Skip to content

Commit

Permalink
Continue ExecutionEngine in Rust
Browse files Browse the repository at this point in the history
  • Loading branch information
cjdsellers committed Dec 20, 2024
1 parent 6f0653b commit 9a2ed76
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 47 deletions.
57 changes: 40 additions & 17 deletions nautilus_core/common/src/msgbus/switchboard.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use std::collections::HashMap;

use nautilus_model::{
data::{BarType, DataType},
identifiers::InstrumentId,
identifiers::{ClientOrderId, InstrumentId},
};
use ustr::Ustr;

Expand All @@ -31,11 +31,12 @@ pub struct MessagingSwitchboard {
custom_topics: HashMap<DataType, Ustr>,
instrument_topics: HashMap<InstrumentId, Ustr>,
deltas_topics: HashMap<InstrumentId, Ustr>,
snapshots_topics: HashMap<InstrumentId, Ustr>,
book_snapshots_topics: HashMap<InstrumentId, Ustr>,
depth_topics: HashMap<InstrumentId, Ustr>,
quote_topics: HashMap<InstrumentId, Ustr>,
trade_topics: HashMap<InstrumentId, Ustr>,
bar_topics: HashMap<BarType, Ustr>,
order_snapshots_topics: HashMap<ClientOrderId, Ustr>,
}

impl Default for MessagingSwitchboard {
Expand All @@ -49,11 +50,12 @@ impl Default for MessagingSwitchboard {
custom_topics: HashMap::new(),
instrument_topics: HashMap::new(),
deltas_topics: HashMap::new(),
snapshots_topics: HashMap::new(),
book_snapshots_topics: HashMap::new(),
depth_topics: HashMap::new(),
quote_topics: HashMap::new(),
trade_topics: HashMap::new(),
bar_topics: HashMap::new(),
order_snapshots_topics: HashMap::new(),
}
}
}
Expand Down Expand Up @@ -101,9 +103,9 @@ impl MessagingSwitchboard {
}

#[must_use]
pub fn get_snapshots_topic(&mut self, instrument_id: InstrumentId) -> Ustr {
pub fn get_book_snapshots_topic(&mut self, instrument_id: InstrumentId) -> Ustr {
*self
.snapshots_topics
.book_snapshots_topics
.entry(instrument_id)
.or_insert_with(|| {
Ustr::from(&format!(
Expand All @@ -114,7 +116,7 @@ impl MessagingSwitchboard {
}

#[must_use]
pub fn get_quote_topic(&mut self, instrument_id: InstrumentId) -> Ustr {
pub fn get_quotes_topic(&mut self, instrument_id: InstrumentId) -> Ustr {
*self.quote_topics.entry(instrument_id).or_insert_with(|| {
Ustr::from(&format!(
"data.quotes.{}.{}",
Expand All @@ -124,7 +126,7 @@ impl MessagingSwitchboard {
}

#[must_use]
pub fn get_trade_topic(&mut self, instrument_id: InstrumentId) -> Ustr {
pub fn get_trades_topic(&mut self, instrument_id: InstrumentId) -> Ustr {
*self.trade_topics.entry(instrument_id).or_insert_with(|| {
Ustr::from(&format!(
"data.trades.{}.{}",
Expand All @@ -134,12 +136,20 @@ impl MessagingSwitchboard {
}

#[must_use]
pub fn get_bar_topic(&mut self, bar_type: BarType) -> Ustr {
pub fn get_bars_topic(&mut self, bar_type: BarType) -> Ustr {
*self
.bar_topics
.entry(bar_type)
.or_insert_with(|| Ustr::from(&format!("data.bars.{bar_type}")))
}

#[must_use]
pub fn get_order_snapshots_topic(&mut self, client_order_id: ClientOrderId) -> Ustr {
*self
.order_snapshots_topics
.entry(client_order_id)
.or_insert_with(|| Ustr::from(&format!("order.snapshots.{client_order_id}")))
}
}

////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -194,14 +204,16 @@ mod tests {
}

#[rstest]
fn test_get_snapshots_topic(
fn test_get_book_snapshots_topic(
mut switchboard: MessagingSwitchboard,
instrument_id: InstrumentId,
) {
let expected_topic = Ustr::from("data.book.snapshots.XCME.ESZ24");
let result = switchboard.get_snapshots_topic(instrument_id);
let result = switchboard.get_book_snapshots_topic(instrument_id);
assert_eq!(result, expected_topic);
assert!(switchboard.snapshots_topics.contains_key(&instrument_id));
assert!(switchboard
.book_snapshots_topics
.contains_key(&instrument_id));
}

#[rstest]
Expand All @@ -213,27 +225,38 @@ mod tests {
}

#[rstest]
fn test_get_quote_topic(mut switchboard: MessagingSwitchboard, instrument_id: InstrumentId) {
fn test_get_quotes_topic(mut switchboard: MessagingSwitchboard, instrument_id: InstrumentId) {
let expected_topic = Ustr::from("data.quotes.XCME.ESZ24");
let result = switchboard.get_quote_topic(instrument_id);
let result = switchboard.get_quotes_topic(instrument_id);
assert_eq!(result, expected_topic);
assert!(switchboard.quote_topics.contains_key(&instrument_id));
}

#[rstest]
fn test_get_trade_topic(mut switchboard: MessagingSwitchboard, instrument_id: InstrumentId) {
fn test_get_trades_topic(mut switchboard: MessagingSwitchboard, instrument_id: InstrumentId) {
let expected_topic = Ustr::from("data.trades.XCME.ESZ24");
let result = switchboard.get_trade_topic(instrument_id);
let result = switchboard.get_trades_topic(instrument_id);
assert_eq!(result, expected_topic);
assert!(switchboard.trade_topics.contains_key(&instrument_id));
}

#[rstest]
fn test_get_bar_topic(mut switchboard: MessagingSwitchboard) {
fn test_get_bars_topic(mut switchboard: MessagingSwitchboard) {
let bar_type = BarType::from("ESZ24.XCME-1-MINUTE-LAST-INTERNAL");
let expected_topic = Ustr::from(&format!("data.bars.{bar_type}"));
let result = switchboard.get_bar_topic(bar_type);
let result = switchboard.get_bars_topic(bar_type);
assert_eq!(result, expected_topic);
assert!(switchboard.bar_topics.contains_key(&bar_type));
}

#[rstest]
fn test_get_order_snapshots_topic(mut switchboard: MessagingSwitchboard) {
let client_order_id = ClientOrderId::from("O-123456789");
let expected_topic = Ustr::from(&format!("order.snapshots.{client_order_id}"));
let result = switchboard.get_order_snapshots_topic(client_order_id);
assert_eq!(result, expected_topic);
assert!(switchboard
.order_snapshots_topics
.contains_key(&client_order_id));
}
}
14 changes: 7 additions & 7 deletions nautilus_core/data/src/engine/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -519,7 +519,7 @@ impl DataEngine {
// TODO: Handle synthetics

let mut msgbus = self.msgbus.borrow_mut();
let topic = msgbus.switchboard.get_quote_topic(quote.instrument_id);
let topic = msgbus.switchboard.get_quotes_topic(quote.instrument_id);
msgbus.publish(&topic, &quote as &dyn Any); // TODO: Optimize
}

Expand All @@ -531,7 +531,7 @@ impl DataEngine {
// TODO: Handle synthetics

let mut msgbus = self.msgbus.borrow_mut();
let topic = msgbus.switchboard.get_trade_topic(trade.instrument_id);
let topic = msgbus.switchboard.get_trades_topic(trade.instrument_id);
msgbus.publish(&topic, &trade as &dyn Any); // TODO: Optimize
}

Expand Down Expand Up @@ -562,7 +562,7 @@ impl DataEngine {
}

let mut msgbus = self.msgbus.borrow_mut();
let topic = msgbus.switchboard.get_bar_topic(bar.bar_type);
let topic = msgbus.switchboard.get_bars_topic(bar.bar_type);
msgbus.publish(&topic, &bar as &dyn Any); // TODO: Optimize
}

Expand Down Expand Up @@ -626,7 +626,7 @@ impl DataEngine {
if !self.book_intervals.contains_key(&interval_ms) {
let interval_ns = millis_to_nanos(interval_ms.get() as f64);
let mut msgbus = self.msgbus.borrow_mut();
let topic = msgbus.switchboard.get_snapshots_topic(instrument_id);
let topic = msgbus.switchboard.get_book_snapshots_topic(instrument_id);

let snap_info = BookSnapshotInfo {
instrument_id,
Expand Down Expand Up @@ -715,7 +715,7 @@ impl DataEngine {
vec![
msgbus.switchboard.get_deltas_topic(instrument_id),
msgbus.switchboard.get_depth_topic(instrument_id),
msgbus.switchboard.get_snapshots_topic(instrument_id),
msgbus.switchboard.get_book_snapshots_topic(instrument_id),
]
};

Expand Down Expand Up @@ -746,7 +746,7 @@ impl DataEngine {
vec![
msgbus.switchboard.get_deltas_topic(instrument_id),
msgbus.switchboard.get_depth_topic(instrument_id),
msgbus.switchboard.get_snapshots_topic(instrument_id),
msgbus.switchboard.get_book_snapshots_topic(instrument_id),
]
};

Expand Down Expand Up @@ -791,7 +791,7 @@ impl DataEngine {
if let Some(snapshotter) = self.book_snapshotters.get(instrument_id) {
let mut msgbus = self.msgbus.borrow_mut();

let topic = msgbus.switchboard.get_snapshots_topic(*instrument_id);
let topic = msgbus.switchboard.get_book_snapshots_topic(*instrument_id);

// Check remaining snapshot subscriptions, if none then remove snapshotter
if msgbus.subscriptions_count(topic) == 0 {
Expand Down
6 changes: 3 additions & 3 deletions nautilus_core/data/src/engine/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -788,7 +788,7 @@ fn test_process_quote_tick(
let handler = get_message_saving_handler::<QuoteTick>(None);
{
let mut msgbus = msgbus.borrow_mut();
let topic = msgbus.switchboard.get_quote_topic(quote.instrument_id);
let topic = msgbus.switchboard.get_quotes_topic(quote.instrument_id);
msgbus.subscribe(topic, handler.clone(), None);
}

Expand Down Expand Up @@ -840,7 +840,7 @@ fn test_process_trade_tick(
let handler = get_message_saving_handler::<TradeTick>(None);
{
let mut msgbus = msgbus.borrow_mut();
let topic = msgbus.switchboard.get_trade_topic(trade.instrument_id);
let topic = msgbus.switchboard.get_trades_topic(trade.instrument_id);
msgbus.subscribe(topic, handler.clone(), None);
}

Expand Down Expand Up @@ -891,7 +891,7 @@ fn test_process_bar(
let handler = get_message_saving_handler::<Bar>(None);
{
let mut msgbus = msgbus.borrow_mut();
let topic = msgbus.switchboard.get_bar_topic(bar.bar_type);
let topic = msgbus.switchboard.get_bars_topic(bar.bar_type);
msgbus.subscribe(topic, handler.clone(), None);
}

Expand Down
41 changes: 21 additions & 20 deletions nautilus_core/execution/src/engine/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,18 @@
// -------------------------------------------------------------------------------------------------

//! Provides a generic `ExecutionEngine` for all environments.
//!
//! The execution engines primary responsibility is to orchestrate interactions
//! between the `ExecutionClient` instances, and the rest of the platform. This
//! includes sending commands to, and receiving events from, the trading venue
//! endpoints via its registered execution clients.
// Under development
#![allow(dead_code)]
#![allow(unused_variables)]

pub mod config;

#[cfg(test)]
mod tests;

use std::{
cell::RefCell,
collections::{HashMap, HashSet},
Expand Down Expand Up @@ -264,17 +266,18 @@ impl ExecutionEngine {
}

pub fn handle_submit_order_list(&self, client: &ExecutionClient, command: SubmitOrderList) {
let mut cache = self.cache.borrow_mut();

for order in &command.order_list.orders {
if !self.cache.borrow().order_exists(&order.client_order_id()) {
self.cache
.borrow_mut()
.add_order(
order.clone(),
command.position_id,
Some(command.client_id),
true,
)
.unwrap();
if !cache.order_exists(&order.client_order_id()) {
if let Err(e) = cache.add_order(
order.clone(),
command.position_id,
Some(command.client_id),
true,
) {
log::error!("Error on cache insert: {e}");
}

if self.config.snapshot_orders {
self.create_order_state_snapshot(order);
Expand Down Expand Up @@ -310,14 +313,12 @@ impl ExecutionEngine {
todo!();
}

// TODO
fn create_order_state_snapshot(&self, order: &OrderAny) {
todo!()
// let mut msgbus = self.msgbus.borrow_mut();
// let topic = msgbus
// .switchboard
// .get_order_snapshot_topic(order.client_order_id());
// msgbus.publish(&topic, order);
let mut msgbus = self.msgbus.borrow_mut();
let topic = msgbus
.switchboard
.get_order_snapshots_topic(order.client_order_id());
msgbus.publish(&topic, order);
}

// -- EVENT HANDLERS ----------------------------------------------------
Expand Down

0 comments on commit 9a2ed76

Please sign in to comment.