diff --git a/rust/rti/src/federate_info.rs b/rust/rti/src/federate_info.rs index daf5b48..97ac56c 100644 --- a/rust/rti/src/federate_info.rs +++ b/rust/rti/src/federate_info.rs @@ -31,7 +31,7 @@ pub struct FederateInfo { // a federate when handling lf_request_stop(). // TODO: lf_thread_t thread_id; stream: Option, // The TCP socket descriptor for communicating with this federate. - // TODO: struct sockaddr_in UDP_addr; + udp_addr: SocketAddr, clock_synchronization_enabled: bool, // Indicates the status of clock synchronization // for this federate. Enabled by default. in_transit_message_tags: InTransitMessageQueue, // Record of in-transit messages to this federate that are not @@ -54,6 +54,7 @@ impl FederateInfo { enclave: SchedulingNode::new(), requested_stop: false, stream: None::, + udp_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080), clock_synchronization_enabled: true, in_transit_message_tags: InTransitMessageQueue::new(), server_hostname: String::from("localhost"), @@ -78,6 +79,14 @@ impl FederateInfo { &self.stream } + pub fn stream_mut(&mut self) -> &mut Option { + &mut self.stream + } + + pub fn udp_addr(&self) -> SocketAddr { + self.udp_addr.clone() + } + pub fn clock_synchronization_enabled(&self) -> bool { self.clock_synchronization_enabled } @@ -110,6 +119,10 @@ impl FederateInfo { self.stream = Some(stream); } + pub fn set_udp_addr(&mut self, udp_addr: SocketAddr) { + self.udp_addr = udp_addr; + } + pub fn set_clock_synchronization_enabled(&mut self, clock_synchronization_enabled: bool) { self.clock_synchronization_enabled = clock_synchronization_enabled; } diff --git a/rust/rti/src/lib.rs b/rust/rti/src/lib.rs index 3f7ab20..3053c51 100644 --- a/rust/rti/src/lib.rs +++ b/rust/rti/src/lib.rs @@ -21,6 +21,7 @@ use std::error::Error; use crate::constants::*; use crate::federate_info::*; +use crate::net_common::SocketType; use crate::rti_common::*; use crate::rti_remote::*; use crate::trace::Trace; @@ -29,7 +30,7 @@ use server::Server; const RTI_TRACE_FILE_NAME: &str = "rti.lft"; -#[derive(PartialEq, PartialOrd, Clone)] +#[derive(PartialEq, PartialOrd, Clone, Debug)] pub enum ClockSyncStat { ClockSyncOff, ClockSyncInit, @@ -125,7 +126,7 @@ pub fn process_args(rti: &mut RTIRemote, argv: &[String]) -> Result<(), &'static return Err("Fail to handle clock_sync option"); } idx += 1; - // TODO: idx += process_clock_sync_args(); + idx += process_clock_sync_args(rti, argc - idx, &argv[idx..]); } else if arg == "-t" || arg == "--tracing" { rti.base_mut().set_tracing_enabled(true); } else if arg == " " { @@ -175,6 +176,74 @@ fn usage(argc: usize, argv: &[String]) { } } +fn process_clock_sync_args(rti: &mut RTIRemote, argc: usize, argv: &[String]) -> usize { + println!("argc = {}", argc); + for mut i in 0..argc { + let arg = argv[i].as_str(); + if arg == "off" { + rti.set_clock_sync_global_status(ClockSyncStat::ClockSyncOff); + println!("RTI: Clock sync: off"); + } else if arg == "init" || arg == "initial" { + rti.set_clock_sync_global_status(ClockSyncStat::ClockSyncInit); + println!("RTI: Clock sync: init"); + } else if arg == "on" { + rti.set_clock_sync_global_status(ClockSyncStat::ClockSyncOn); + println!("RTI: Clock sync: on"); + } else if arg == "period" { + if rti.clock_sync_global_status() != ClockSyncStat::ClockSyncOn { + println!("[ERROR] clock sync period can only be set if --clock-sync is set to on."); + usage(argc, argv); + i += 1; + continue; // Try to parse the rest of the arguments as clock sync args. + } else if argc < i + 2 { + println!("[ERROR] clock sync period needs a time (in nanoseconds) argument."); + usage(argc, argv); + continue; + } + i += 1; + let period_ns: u64 = 10; + if period_ns == 0 || period_ns == u64::MAX { + println!("[ERROR] clock sync period value is invalid."); + continue; // Try to parse the rest of the arguments as clock sync args. + } + rti.set_clock_sync_period_ns(period_ns); + println!("RTI: Clock sync period: {}", rti.clock_sync_period_ns()); + } else if argv[i] == "exchanges-per-interval" { + if rti.clock_sync_global_status() != ClockSyncStat::ClockSyncOn + && rti.clock_sync_global_status() != ClockSyncStat::ClockSyncInit + { + println!("[ERROR] clock sync exchanges-per-interval can only be set if\n--clock-sync is set to on or init."); + usage(argc, argv); + continue; // Try to parse the rest of the arguments as clock sync args. + } else if argc < i + 2 { + println!("[ERROR] clock sync exchanges-per-interval needs an integer argument."); + usage(argc, argv); + continue; // Try to parse the rest of the arguments as clock sync args. + } + i += 1; + let exchanges: u32 = 10; + if exchanges == 0 || exchanges == u32::MAX || exchanges == u32::MIN { + println!("[ERROR] clock sync exchanges-per-interval value is invalid."); + continue; // Try to parse the rest of the arguments as clock sync args. + } + rti.set_clock_sync_exchanges_per_interval(exchanges); // FIXME: Loses numbers on 64-bit machines + println!( + "RTI: Clock sync exchanges per interval: {}", + rti.clock_sync_exchanges_per_interval() + ); + } else if arg == " " { + // Tolerate spaces + continue; + } else { + // Either done with the clock sync args or there is an invalid + // character. In either case, let the parent function deal with + // the rest of the characters; + return i; + } + } + argc +} + pub fn initialize_federates(rti: &mut RTIRemote) { if rti.base().tracing_enabled() { let _lf_number_of_workers = rti.base().number_of_scheduling_nodes(); @@ -205,9 +274,12 @@ fn initialize_federate(fed: &mut FederateInfo, id: u16) { pub fn start_rti_server(_f_rti: &mut RTIRemote) -> Result> { // TODO: _lf_initialize_clock(); - Ok(Server::create_server( - _f_rti.user_specified_port().to_string(), - )) + let server = Server::create_rti_server(_f_rti, _f_rti.user_specified_port(), SocketType::TCP); + if _f_rti.clock_sync_global_status() >= ClockSyncStat::ClockSyncOn { + let final_tcp_port = u16::from(_f_rti.final_port_tcp()); + Server::create_rti_server(_f_rti, final_tcp_port + 1, SocketType::UDP); + } + Ok(server) } /** diff --git a/rust/rti/src/net_common.rs b/rust/rti/src/net_common.rs index 8dfe47d..d33c8d1 100644 --- a/rust/rti/src/net_common.rs +++ b/rust/rti/src/net_common.rs @@ -103,6 +103,10 @@ pub enum MsgType { AddressAdvertisement, P2pSendingFedId, P2pTaggedMessage, + ClockSyncT1, + ClockSyncT3, + ClockSyncT4, + ClockSyncCodedProbe, PortAbsent, NeighborStructure, Ignore, @@ -129,6 +133,10 @@ impl MsgType { MsgType::AddressAdvertisement => 14, MsgType::P2pSendingFedId => 15, MsgType::P2pTaggedMessage => 17, + MsgType::ClockSyncT1 => 19, + MsgType::ClockSyncT3 => 20, + MsgType::ClockSyncT4 => 21, + MsgType::ClockSyncCodedProbe => 22, MsgType::PortAbsent => 23, MsgType::NeighborStructure => 24, MsgType::Ignore => 250, @@ -150,6 +158,10 @@ impl MsgType { 12 => MsgType::StopGranted, 13 => MsgType::AddressQuery, 14 => MsgType::AddressAdvertisement, + 19 => MsgType::ClockSyncT1, + 20 => MsgType::ClockSyncT3, + 21 => MsgType::ClockSyncT4, + 22 => MsgType::ClockSyncCodedProbe, 23 => MsgType::PortAbsent, _ => MsgType::Ignore, } @@ -183,6 +195,12 @@ impl ErrType { } } +#[derive(PartialEq, Clone)] +pub enum SocketType { + TCP, + UDP, +} + #[cfg(test)] mod tests { use super::*; diff --git a/rust/rti/src/net_util.rs b/rust/rti/src/net_util.rs index bd53636..e366efc 100644 --- a/rust/rti/src/net_util.rs +++ b/rust/rti/src/net_util.rs @@ -31,7 +31,7 @@ impl NetUtil { } {} } - pub fn read_from_stream(stream: &mut TcpStream, buffer: &mut Vec, fed_id: u16) -> usize { + pub fn read_from_socket(stream: &mut TcpStream, buffer: &mut Vec, fed_id: u16) -> usize { let mut bytes_read = 0; while match stream.read(buffer) { Ok(msg_size) => { @@ -209,7 +209,7 @@ mod tests { } #[test] - fn test_read_from_stream_positive() { + fn test_read_from_socket_positive() { let port_num = 35642; let tcp_server_mocker = TcpServerMocker::new(port_num).unwrap(); let mut ip_address = LOCAL_HOST.to_owned(); @@ -225,7 +225,7 @@ mod tests { ), ); let mut buffer = vec![0 as u8; buffer_size]; - let read_size = NetUtil::read_from_stream(&mut stream, &mut buffer, 0); + let read_size = NetUtil::read_from_socket(&mut stream, &mut buffer, 0); assert!(buffer == msg); assert!(buffer_size == read_size); } diff --git a/rust/rti/src/rti_remote.rs b/rust/rti/src/rti_remote.rs index 724537f..7ea44f7 100644 --- a/rust/rti/src/rti_remote.rs +++ b/rust/rti/src/rti_remote.rs @@ -16,6 +16,8 @@ use crate::constants::*; use crate::ClockSyncStat; use crate::RTICommon; +use std::net::UdpSocket; + /** * Structure that an RTI instance uses to keep track of its own and its * corresponding federates' state. @@ -61,7 +63,7 @@ pub struct RTIRemote { final_port_udp: u16, /** The UDP socket descriptor for the socket server. */ - socket_descriptor_udp: i32, + socket_descriptor_udp: Option, /************* Clock synchronization information *************/ /* Thread performing PTP clock sync sessions periodically. */ @@ -80,7 +82,7 @@ pub struct RTIRemote { /** * Number of messages exchanged for each clock sync attempt. */ - clock_sync_exchanges_per_interval: i32, + clock_sync_exchanges_per_interval: u32, /** * Boolean indicating that authentication is enabled. @@ -105,7 +107,7 @@ impl RTIRemote { final_port_tcp: 0, socket_descriptor_tcp: -1, final_port_udp: u16::MAX, - socket_descriptor_udp: -1, + socket_descriptor_udp: None, clock_sync_global_status: ClockSyncStat::ClockSyncInit, clock_sync_period_ns: 10 * 1000000, clock_sync_exchanges_per_interval: 10, @@ -138,6 +140,14 @@ impl RTIRemote { self.user_specified_port } + pub fn final_port_tcp(&self) -> u16 { + self.final_port_tcp + } + + pub fn socket_descriptor_udp(&mut self) -> &mut Option { + &mut self.socket_descriptor_udp + } + pub fn final_port_udp(&self) -> u16 { self.final_port_udp } @@ -146,6 +156,14 @@ impl RTIRemote { self.clock_sync_global_status.clone() } + pub fn clock_sync_period_ns(&self) -> u64 { + self.clock_sync_period_ns + } + + pub fn clock_sync_exchanges_per_interval(&self) -> u32 { + self.clock_sync_exchanges_per_interval + } + pub fn stop_in_progress(&self) -> bool { self.stop_in_progress } @@ -167,6 +185,33 @@ impl RTIRemote { self.user_specified_port = user_specified_port; } + pub fn set_final_port_tcp(&mut self, final_port_tcp: u16) { + self.final_port_tcp = final_port_tcp; + } + + pub fn set_socket_descriptor_udp(&mut self, socket_descriptor_udp: Option) { + self.socket_descriptor_udp = socket_descriptor_udp; + } + + pub fn set_final_port_udp(&mut self, final_port_udp: u16) { + self.final_port_udp = final_port_udp; + } + + pub fn set_clock_sync_global_status(&mut self, clock_sync_global_status: ClockSyncStat) { + self.clock_sync_global_status = clock_sync_global_status; + } + + pub fn set_clock_sync_period_ns(&mut self, clock_sync_period_ns: u64) { + self.clock_sync_period_ns = clock_sync_period_ns; + } + + pub fn set_clock_sync_exchanges_per_interval( + &mut self, + clock_sync_exchanges_per_interval: u32, + ) { + self.clock_sync_exchanges_per_interval = clock_sync_exchanges_per_interval; + } + pub fn set_stop_in_progress(&mut self, stop_in_progress: bool) { self.stop_in_progress = stop_in_progress; } diff --git a/rust/rti/src/server.rs b/rust/rti/src/server.rs index 5e1c16f..9d5d8b6 100644 --- a/rust/rti/src/server.rs +++ b/rust/rti/src/server.rs @@ -8,10 +8,11 @@ */ use std::io::Write; use std::mem; -use std::net::{IpAddr, Shutdown, TcpListener, TcpStream}; +use std::net::{IpAddr, Ipv4Addr, Shutdown, SocketAddr, TcpListener, TcpStream, UdpSocket}; use std::sync::{Arc, Condvar, Mutex, RwLock}; use std::thread; use std::thread::JoinHandle; +use std::time::Duration; use crate::in_transit_message_queue::InTransitMessageQueue; use crate::net_common; @@ -53,18 +54,50 @@ impl StopGranted { pub struct Server { port: String, + socket_type: SocketType, } impl Server { - pub fn create_server(port: String) -> Server { - // TODO: handle TCP and UDP cases - Server { port } + pub fn create_rti_server( + rti_remote: &mut RTIRemote, + port: u16, + socket_type: SocketType, + ) -> Server { + let mut type_str = String::from("TCP"); + if socket_type == SocketType::UDP { + type_str = String::from("UDP"); + + // TODO: Handle unwrap() properly. + let mut address = String::from("0.0.0.0:"); + address.push_str(port.to_string().as_str()); + let socket = UdpSocket::bind(address).unwrap(); + rti_remote.set_socket_descriptor_udp(Some(socket)); + } + println!( + "RTI using {} port {} for federation {}.", + type_str, + port, + rti_remote.federation_id() + ); + + if socket_type == SocketType::TCP { + rti_remote.set_final_port_tcp(port); + } else if socket_type == SocketType::UDP { + rti_remote.set_final_port_udp(port); + // No need to listen on the UDP socket + } + + Server { + port: port.to_string(), + socket_type, + } } pub fn wait_for_federates(&mut self, _f_rti: RTIRemote) { println!("Server listening on port {}", self.port); let mut address = String::from("0.0.0.0:"); address.push_str(self.port.as_str()); + // TODO: Handle unwrap() properly. let socket = TcpListener::bind(address).unwrap(); let start_time = Arc::new(Mutex::new(StartTime::new())); let received_start_times = Arc::new((Mutex::new(false), Condvar::new())); @@ -187,7 +220,7 @@ impl Server { } // Read no more than one byte to get the message type. // FIXME: Handle unwrap properly. - let bytes_read = NetUtil::read_from_stream( + let bytes_read = NetUtil::read_from_socket( &mut stream, &mut buffer, fed_id.try_into().unwrap(), @@ -324,24 +357,205 @@ impl Server { println!("All federates have connected to RTI."); let cloned_rti = Arc::clone(&arc_rti); - let locked_rti = cloned_rti.read().unwrap(); - let clock_sync_global_status = locked_rti.clock_sync_global_status(); + let clock_sync_global_status; + let number_of_scheduling_nodes; + let final_port_udp; + { + let locked_rti = cloned_rti.read().unwrap(); + clock_sync_global_status = locked_rti.clock_sync_global_status(); + number_of_scheduling_nodes = locked_rti.base().number_of_scheduling_nodes(); + final_port_udp = locked_rti.final_port_udp(); + } + println!("clock_sync_global_status = {:?}", clock_sync_global_status); if clock_sync_global_status >= ClockSyncStat::ClockSyncOn { + println!("clock_sync_global_status >= ClockSyncStat::ClockSyncOn"); // Create the thread that performs periodic PTP clock synchronization sessions // over the UDP channel, but only if the UDP channel is open and at least one // federate_info is performing runtime clock synchronization. let mut clock_sync_enabled = false; - for i in 0..locked_rti.base().number_of_scheduling_nodes() { - if locked_rti.base().scheduling_nodes()[i as usize].clock_synchronization_enabled() + for i in 0..number_of_scheduling_nodes { { - clock_sync_enabled = true; - break; + let locked_rti = cloned_rti.read().unwrap(); + if locked_rti.base().scheduling_nodes()[i as usize] + .clock_synchronization_enabled() + { + clock_sync_enabled = true; + break; + } } } - if locked_rti.final_port_udp() != u16::MAX && clock_sync_enabled { - println!("\tNEED to create clock_synchronization_thread thread.."); - // TODO: Implement the following. - // lf_thread_create(&_f_rti->clock_thread, clock_synchronization_thread, NULL); + println!( + "After inspecting federates... final_port_udp = {}, clock_sync_enabled = {}", + final_port_udp, clock_sync_enabled + ); + // let cloned_start_time = Arc::clone(&start_time); + // let cloned_received_start_times = Arc::clone(&received_start_times); + + if final_port_udp != u16::MAX && clock_sync_enabled { + let handle = thread::spawn(move || { + println!("Thread is spawned!!!"); + // Wait until all federates have been notified of the start time. + // FIXME: Use lf_ version of this when merged with master. + { + let locked_rti = cloned_rti.read().unwrap(); + while locked_rti.num_feds_proposed_start() + < locked_rti.base().number_of_scheduling_nodes() + { + // Need to wait here. + let received_start_times_notifier = Arc::clone(&received_start_times); + let (lock, condvar) = &*received_start_times_notifier; + let mut notified = lock.lock().unwrap(); + while !*notified { + notified = condvar.wait(notified).unwrap(); + } + } + } + + // Wait until the start time before starting clock synchronization. + // The above wait ensures that start_time has been set. + println!("After cond wait"); + let start_time_value; + { + let locked_start_time = start_time.lock().unwrap(); + start_time_value = locked_start_time.start_time(); + } + let ns_to_wait = start_time_value - Tag::lf_time_physical(); + + if ns_to_wait > 0 { + // TODO: Handle unwrap() properly. + let ns = Duration::from_nanos(ns_to_wait.try_into().unwrap()); + thread::sleep(ns); + } + println!("After sleep for {} ns", ns_to_wait); + // Initiate a clock synchronization every rti->clock_sync_period_ns + // Initiate a clock synchronization every rti->clock_sync_period_ns + // let sleep_time = {(time_t)rti_remote->clock_sync_period_ns / BILLION, + // rti_remote->clock_sync_period_ns % BILLION}; + // let remaining_time; + + let mut any_federates_connected = true; + while any_federates_connected { + // Sleep + let clock_sync_period_ns; + let number_of_scheduling_nodes; + { + let locked_rti = cloned_rti.read().unwrap(); + clock_sync_period_ns = locked_rti.clock_sync_period_ns(); + number_of_scheduling_nodes = + locked_rti.base().number_of_scheduling_nodes(); + } + let ns = Duration::from_nanos(clock_sync_period_ns); // Can be interrupted + thread::sleep(ns); + any_federates_connected = false; + for fed_id in 0..number_of_scheduling_nodes { + let state; + let clock_synchronization_enabled; + { + let locked_rti = cloned_rti.read().unwrap(); + let idx: usize = fed_id as usize; + let fed = &locked_rti.base().scheduling_nodes()[idx]; + state = fed.enclave().state(); + clock_synchronization_enabled = fed.clock_synchronization_enabled(); + } + if state == SchedulingNodeState::NotConnected { + // FIXME: We need better error handling here, but clock sync failure + // should not stop execution. + println!( + "[ERROR] Clock sync failed with federate {}. Not connected.", + fed_id + ); + continue; + } else if !clock_synchronization_enabled { + continue; + } + // Send the RTI's current physical time to the federate + // Send on UDP. + println!( + "[DEBUG:474] RTI sending T1 message to initiate clock sync round." + ); + // TODO: Handle unwrap() properly. + Self::send_physical_clock( + fed_id.try_into().unwrap(), + cloned_rti.clone(), + MsgType::ClockSyncT1.to_byte(), + SocketType::UDP, + ); + + // Listen for reply message, which should be T3. + let message_size = 1 + std::mem::size_of::(); + let mut buffer = vec![0 as u8; message_size]; + // Maximum number of messages that we discard before giving up on this cycle. + // If the T3 message from this federate does not arrive and we keep receiving + // other messages, then give up on this federate and move to the next federate. + let mut remaining_attempts = 5; + while remaining_attempts > 0 { + remaining_attempts -= 1; + let mut read_failed = true; + { + let mut locked_rti = cloned_rti.write().unwrap(); + // TODO: Handle unwrap() properly. + let udp_socket = + locked_rti.socket_descriptor_udp().as_mut().unwrap(); + // let idx: usize = fed_id as usize; + // let fed: &mut FederateInfo = + // &mut locked_rti.base_mut().scheduling_nodes_mut()[idx]; + // let stream: &mut TcpStream = + // &mut fed.stream_mut().as_mut().unwrap(); + // TODO: Handle unwrap() properly. + match udp_socket.recv(&mut buffer) { + Ok(read_bytes) => { + read_failed = false; + } + Err(..) => { + println!("[ERROR] Failed to read from an UDP socket."); + } + } + } + // If any errors occur, either discard the message or the clock sync round. + if !read_failed { + if buffer[0] == MsgType::ClockSyncT3.to_byte() { + // TODO: Change from_le_bytes properly. + let fed_id_2 = i32::from_le_bytes( + buffer[1..1 + std::mem::size_of::()] + .try_into() + .unwrap(), + ); + // Check that this message came from the correct federate. + if fed_id_2 != fed_id { + // Message is from the wrong federate. Discard the message. + println!("[WARNING] Clock sync: Received T3 message from federate {}, but expected one from {}. Discarding message.", + fed_id_2, fed_id); + continue; + } + println!("[DEBUG:530] Clock sync: RTI received T3 message from federate {}.", fed_id_2); + // TODO: Handle unwrap() properly. + Self::handle_physical_clock_sync_message( + fed_id_2.try_into().unwrap(), + cloned_rti.clone(), + SocketType::UDP, + ); + break; + } else { + // The message is not a T3 message. Discard the message and + // continue waiting for the T3 message. This is possibly a message + // from a previous cycle that was discarded. + println!("[WARNING] Clock sync: Unexpected UDP message {}. Expected MsgType::ClockSyncT3 from federate {}. Discarding message.", + buffer[0], fed_id); + continue; + } + } else { + println!("[WARNING] Clock sync: Read from UDP socket failed: Skipping clock sync round for federate {}.", + fed_id); + remaining_attempts -= 1; + } + } + if remaining_attempts > 0 { + any_federates_connected = true; + } + } + } + }); + handle_list.push(handle); } } @@ -591,6 +805,7 @@ impl Server { fed_id ); let cloned_rti = Arc::clone(&_f_rti); + // TODO: Handle unwrap() properly. let mut connection_info_header = vec![0 as u8; MSG_TYPE_NEIGHBOR_STRUCTURE_HEADER_SIZE.try_into().unwrap()]; NetUtil::read_from_socket_fail_on_error( @@ -708,9 +923,11 @@ impl Server { } else { let cloned_rti = Arc::clone(&_f_rti); let clock_sync_global_status; + let clock_sync_exchanges_per_interval; { let locked_rti = cloned_rti.read().unwrap(); clock_sync_global_status = locked_rti.clock_sync_global_status(); + clock_sync_exchanges_per_interval = locked_rti.clock_sync_exchanges_per_interval(); } if clock_sync_global_status >= ClockSyncStat::ClockSyncInit { @@ -725,9 +942,54 @@ impl Server { ); // A port number of UINT16_MAX means initial clock sync should not be performed. if federate_udp_port_number != u16::MAX { - // TODO: Implement this if body + // Perform the initialization clock synchronization with the federate. + // Send the required number of messages for clock synchronization + for i in 0..clock_sync_exchanges_per_interval { + // Send the RTI's current physical time T1 to the federate. + Self::send_physical_clock_with_stream( + fed_id, + _f_rti.clone(), + MsgType::ClockSyncT1.to_byte(), + SocketType::TCP, + stream, + ); + + // Listen for reply message, which should be T3. + let message_size = 1 + std::mem::size_of::(); + let mut buffer = vec![0 as u8; message_size]; + NetUtil::read_from_socket_fail_on_error( + stream, + &mut buffer, + fed_id, + "T3 messages", + ); + if buffer[0] == MsgType::ClockSyncT3.to_byte() { + let fed_id = i32::from_le_bytes( + buffer[1..1 + std::mem::size_of::()] + .try_into() + .unwrap(), + ); + println!( + "[DEBUG:956] RTI received T3 clock sync message from federate {}.", + fed_id + ); + Self::handle_physical_clock_sync_message_with_stream( + fed_id.try_into().unwrap(), + cloned_rti.clone(), + SocketType::TCP, + stream, + ); + } else { + println!( + "[ERROR] Unexpected message {} from federate {}.", + buffer[0], fed_id + ); + Self::send_reject(stream, ErrType::UnexpectedMessage.to_byte()); + return false; + } + } println!( - "RTI finished initial clock synchronization with federate_info {}.", + "[DEBUG] RTI finished initial clock synchronization with federate {}.", fed_id ); } @@ -739,6 +1001,19 @@ impl Server { // fed.UDP_addr.sin_family = AF_INET; // fed.UDP_addr.sin_port = htons(federate_udp_port_number); // fed.UDP_addr.sin_addr = fed->server_ip_addr; + println!( + "Before Setting UDP address... port = {}", + federate_udp_port_number + ); + let mut locked_rti = cloned_rti.write().unwrap(); + let idx: usize = fed_id.into(); + let fed: &mut FederateInfo = + &mut locked_rti.base_mut().scheduling_nodes_mut()[idx]; + // let ip_addr = fed.server_ip_addr(); + fed.set_udp_addr(SocketAddr::new( + IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), + federate_udp_port_number, + )); } } else { // Disable clock sync after initial round. @@ -776,7 +1051,7 @@ impl Server { ) { let mut buffer = vec![0 as u8; mem::size_of::()]; // Read bytes from the socket. We need 8 bytes. - let bytes_read = NetUtil::read_from_stream(stream, &mut buffer, fed_id); + let bytes_read = NetUtil::read_from_socket(stream, &mut buffer, fed_id); if bytes_read < mem::size_of::() { println!("ERROR reading timestamp from federate_info {}.", fed_id); } @@ -806,6 +1081,10 @@ impl Server { locked_rti.set_max_start_time(timestamp); } } + println!( + "num_feds_proposed_start = {}, number_of_enclaves = {}", + num_feds_proposed_start, number_of_enclaves + ); if num_feds_proposed_start == number_of_enclaves { // All federates have proposed a start time. let received_start_times_notifier = Arc::clone(&received_start_times); @@ -2020,4 +2299,171 @@ impl Server { ); } } + + fn send_physical_clock( + fed_id: u16, + _f_rti: Arc>, + message_type: u8, + socket_type: SocketType, + ) { + let state; + { + let locked_rti = _f_rti.read().unwrap(); + let idx: usize = fed_id.into(); + let fed = &locked_rti.base().scheduling_nodes()[idx]; + state = fed.enclave().state(); + } + if state == SchedulingNodeState::NotConnected { + println!("[WARNING] Clock sync: RTI failed to send physical time to federate {}. Socket not connected.\n", + fed_id); + return; + } + let mut buffer = vec![0 as u8; std::mem::size_of::() + 1]; + buffer[0] = message_type; + let current_physical_time = Tag::lf_time_physical(); + NetUtil::encode_int64(current_physical_time, &mut buffer, 1); + + // Send the message + // if socket_type == SocketType::UDP { + { + println!( + "[DEBUG] Clock sync: RTI sending UDP message type {}.", + buffer[0] + ); + let mut locked_rti = _f_rti.write().unwrap(); + let idx: usize = fed_id.into(); + let fed = &locked_rti.base().scheduling_nodes()[idx]; + // FIXME: udp_addr is initialized as 0.0.0.0. + let udp_addr = fed.udp_addr(); + let socket = locked_rti.socket_descriptor_udp().as_mut().unwrap(); + println!("fed.udp_addr = {:?}", udp_addr); + match socket.send_to(&buffer, udp_addr) { + Ok(bytes_written) => { + if bytes_written < 1 + std::mem::size_of::() { + println!("[WARNING] Clock sync: RTI failed to send physical time to federate {}: \n", fed_id); + return; + } + } + Err(_) => { + println!("Failed to send an UDP message."); + return; + } + } + } + // } else if socket_type == SocketType::TCP { + // println!( + // "[DEBUG] Clock sync: RTI sending TCP message type {}.", + // buffer[0] + // ); + // let locked_rti = _f_rti.read().unwrap(); + // let idx: usize = fed_id.into(); + // let fed = &locked_rti.base().scheduling_nodes()[idx]; + // let stream = fed.stream().as_ref().unwrap(); + // NetUtil::write_to_socket_fail_on_error(stream, &buffer, fed_id, "physical time"); + // } + println!("[DEBUG] Clock sync: RTI sent PHYSICAL_TIME_SYNC_MESSAGE with timestamp ({}) to federate {}.", + current_physical_time, fed_id); + } + + fn handle_physical_clock_sync_message( + fed_id: u16, + _f_rti: Arc>, + socket_type: SocketType, + ) { + // Lock the mutex to prevent interference between sending the two + // coded probe messages. + // let _locked_rti = _f_rti.write().unwrap(); + // Reply with a T4 type message + Self::send_physical_clock( + fed_id, + _f_rti.clone(), + MsgType::ClockSyncT4.to_byte(), + socket_type.clone(), + ); + // Send the corresponding coded probe immediately after, + // but only if this is a UDP channel. + if socket_type == SocketType::UDP { + Self::send_physical_clock( + fed_id, + _f_rti.clone(), + MsgType::ClockSyncCodedProbe.to_byte(), + socket_type, + ); + } + } + + fn send_physical_clock_with_stream( + fed_id: u16, + _f_rti: Arc>, + message_type: u8, + socket_type: SocketType, + stream: &mut TcpStream, + ) { + let state; + { + let locked_rti = _f_rti.read().unwrap(); + let idx: usize = fed_id.into(); + let fed = &locked_rti.base().scheduling_nodes()[idx]; + state = fed.enclave().state(); + } + if state == SchedulingNodeState::NotConnected { + println!("[WARNING] Clock sync: RTI failed to send physical time to federate {}. Socket not connected.\n", + fed_id); + return; + } + let mut buffer = vec![0 as u8; std::mem::size_of::() + 1]; + buffer[0] = message_type; + let current_physical_time = Tag::lf_time_physical(); + NetUtil::encode_int64(current_physical_time, &mut buffer, 1); + + // Send the message + // if socket_type == SocketType::UDP { + // println!("Sending messages through UDP is not supported yet."); + // // FIXME: UDP_addr is never initialized. + // println!("[Debug] Clock sync: RTI sending UDP message type %u.", buffer[0]); + // let bytes_written = sendto(rti_remote->socket_descriptor_UDP, buffer, 1 + sizeof(int64_t), 0, + // (struct sockaddr*)&fed->UDP_addr, sizeof(fed->UDP_addr)); + // if bytes_written < std::mem::size_of() + 1 { + // println!("[WARNING] Clock sync: RTI failed to send physical time to federate {}: \n", fed_id); + // return; + // } + // } else if socket_type == SocketType::TCP { + println!( + "[DEBUG] Clock sync: RTI sending TCP message type {}.", + buffer[0] + ); + NetUtil::write_to_socket_fail_on_error(stream, &buffer, fed_id, "physical time"); + // } + println!("[DEBUG] Clock sync: RTI sent PHYSICAL_TIME_SYNC_MESSAGE with timestamp ({}) to federate {}.", + current_physical_time, fed_id); + } + + fn handle_physical_clock_sync_message_with_stream( + fed_id: u16, + _f_rti: Arc>, + socket_type: SocketType, + stream: &mut TcpStream, + ) { + // Lock the mutex to prevent interference between sending the two + // coded probe messages. + // Reply with a T4 type message + Self::send_physical_clock_with_stream( + fed_id, + _f_rti.clone(), + MsgType::ClockSyncT4.to_byte(), + socket_type.clone(), + stream, + ); + // Send the corresponding coded probe immediately after, + // but only if this is a UDP channel. + // if socket_type == SocketType::UDP { + // Self::send_physical_clock_with_stream( + // fed_id, + // _f_rti.clone(), + // MsgType::ClockSyncCodedProbe.to_byte(), + // socket_type, + // stream + // ); + // } + } }