diff --git a/src/stream_parser/file_reader.rs b/src/stream_parser/file_reader.rs index 78f53d4..331bdae 100644 --- a/src/stream_parser/file_reader.rs +++ b/src/stream_parser/file_reader.rs @@ -1,13 +1,10 @@ use crate::stream_parser::model::{ParseErrorType, UlogParseError}; -use std::cell::RefCell; +use std::cell::{Cell, RefCell}; use std::collections::HashMap; use std::collections::HashSet; use std::io::Read; use std::iter::FromIterator; use std::ops::DerefMut; -use std::rc::Rc; -use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::Arc; use super::model; use crate::unpack; @@ -220,8 +217,27 @@ impl<'c> LogParser<'c> { "flag bits at bad position", )); } + let flag_bits = parse_flag_bits(&msg)?; + + // Check for incompatible flag bits. If there's any unknown bits set, we cannot + // parse the log + const ULOG_INCOMPAT_FLAG0_DATA_APPENDED_MASK: u8 = 1u8 << 0; + if (flag_bits.incompat_flags[0] & !ULOG_INCOMPAT_FLAG0_DATA_APPENDED_MASK) != 0 { + return Err(UlogParseError::new( + ParseErrorType::Other, + &format!("Cannot parse log, incompatible flag bits set (idx={}, value={})", 0, flag_bits.incompat_flags[0]) + )); + } + for flag_idx in 1..flag_bits.incompat_flags.len() { + if flag_bits.incompat_flags[flag_idx] != 0 { + return Err(UlogParseError::new( + ParseErrorType::Other, + &format!("Cannot parse log, incompatible flag bits set (idx={}, value={})", flag_idx, flag_bits.incompat_flags[flag_idx]) + )); + } + } + self.status = ParseStatus::InDefinitions; - //TODO: read message } model::MessageType::Format => { let format = parse_format(&msg)?; @@ -361,8 +377,11 @@ impl<'c> LogParser<'c> { ), )); } - let current_timestamp = - flattened_format.timestamp_field.parse_timestamp(msg.data()); + let timestamp_field = flattened_format.timestamp_field.as_ref().ok_or_else(|| UlogParseError::new( + ParseErrorType::Other, + &format!("Message does not have a timestamp field {}", flattened_format.message_name), + ))?; + let current_timestamp = timestamp_field.parse_timestamp(msg.data()); if *last_timestamp < current_timestamp { *last_timestamp = current_timestamp; if let Some(cb) = &mut self.data_message_callback { @@ -452,6 +471,33 @@ impl MaybeRepeatedType { } } +#[derive(Debug)] +struct FlagBits { + compat_flags: [u8; 8], + incompat_flags: [u8; 8], + appended_offsets: [u64; 3], +} + +fn parse_flag_bits(message: &model::ULogMessage) -> Result { + const MINIMUM_MESSAGE_LENGTH: usize = 40; + if message.data().len() < MINIMUM_MESSAGE_LENGTH { + return Err(UlogParseError::new( + ParseErrorType::Other, + &format!("FlagBits message too small: {} < 40", message.data.len()), + )); + } + + let mut compat_flags: [u8; 8] = Default::default(); + compat_flags.copy_from_slice(&message.data[0..8]); + let mut incompat_flags: [u8; 8] = Default::default(); + incompat_flags.copy_from_slice(&message.data[8..16]); + Ok(FlagBits { + compat_flags, + incompat_flags, + appended_offsets: [unpack::as_u64_le( & message.data[16..24]), unpack::as_u64_le( & message.data[24..32]), unpack::as_u64_le( & message.data[32..40])], + }) +} + #[derive(Debug)] struct Field { field_name: String, @@ -780,27 +826,27 @@ pub fn read_file_with_simple_callback SimpleCallbackResul file_path: &str, c: &mut CB, ) -> Result { - let stop_reading = Arc::new(AtomicBool::new(false)); - let c_cell: Rc> = Rc::new(RefCell::new(c)); + let stop_reading = Cell::new(false); + let c_cell: RefCell<&mut CB> = RefCell::new(c); let mut wrapped_data_message_callback = |data_message: &DataMessage| { if let SimpleCallbackResult::Stop = - c_cell.as_ref().borrow_mut().deref_mut()(&Message::Data(&data_message)) + c_cell.borrow_mut().deref_mut()(&Message::Data(&data_message)) { - stop_reading.store(true, Ordering::Relaxed) + stop_reading.set(true); } }; let mut wrapped_string_message_callback = |data_message: &model::LoggedStringMessage| { if let SimpleCallbackResult::Stop = - c_cell.as_ref().borrow_mut().deref_mut()(&Message::LoggedMessage(&data_message)) + c_cell.borrow_mut().deref_mut()(&Message::LoggedMessage(&data_message)) { - stop_reading.store(true, Ordering::Relaxed) + stop_reading.set(true); } }; let mut wrapped_parameter_message_callback = |parameter_message: &model::ParameterMessage| { if let SimpleCallbackResult::Stop = - c_cell.as_ref().borrow_mut().deref_mut()(&Message::ParameterMessage(¶meter_message)) + c_cell.borrow_mut().deref_mut()(&Message::ParameterMessage(¶meter_message)) { - stop_reading.store(true, Ordering::Relaxed) + stop_reading.set(true); } }; let mut log_parser = LogParser::default(); @@ -812,7 +858,7 @@ pub fn read_file_with_simple_callback SimpleCallbackResul let mut f = std::fs::File::open(file_path)?; const READ_START: usize = 64 * 1024; let mut buf = [0u8; 1024 * 1024]; - while !stop_reading.load(Ordering::Relaxed) { + while !stop_reading.get() { let num_bytes_read = f.read(&mut buf[READ_START..])?; if num_bytes_read == 0 { break; diff --git a/src/stream_parser/model.rs b/src/stream_parser/model.rs index 1e944fd..d9a988e 100644 --- a/src/stream_parser/model.rs +++ b/src/stream_parser/model.rs @@ -168,7 +168,7 @@ pub struct FlattenedFormat { pub message_name: String, pub fields: Vec, name_to_field: HashMap, - pub timestamp_field: TimestampField, + pub timestamp_field: Option, size: u16, } @@ -187,7 +187,7 @@ impl FlattenedFormat { .iter() .map(|f| (f.flattened_field_name.to_string(), (*f).clone())) .collect(); - match name_to_field + let timestamp_field = name_to_field .get("timestamp") .and_then(|field| match field.field_type { FlattenedFieldType::UInt8 => Some(TimestampField { @@ -207,19 +207,14 @@ impl FlattenedFormat { offset: field.offset, }), _ => None, - }) { - Some(timestamp_field) => Ok(Self { - message_name, - fields, - name_to_field, - timestamp_field, - size, - }), - None => Err(UlogParseError::new( - ParseErrorType::Other, - &format!("Message does not have a timestamp field {}", message_name), - )), - } + }); + Ok(Self { + message_name, + fields, + name_to_field, + timestamp_field, + size, + }) } pub fn get_field_offset(