Skip to content

Commit

Permalink
Merge pull request #1 from Auterion/timestamp_fix
Browse files Browse the repository at this point in the history
fix stream_parser: allow for message formats without timestamp
  • Loading branch information
BWStearns authored Apr 5, 2024
2 parents 9a31b19 + 91b5e46 commit 41b5b61
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 31 deletions.
78 changes: 62 additions & 16 deletions src/stream_parser/file_reader.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,10 @@
use crate::stream_parser::model::{ParseErrorType, UlogParseError};
use std::cell::RefCell;
use std::cell::{Cell, RefCell};
use std::collections::HashMap;
use std::collections::HashSet;
use std::io::Read;
use std::iter::FromIterator;
use std::ops::DerefMut;
use std::rc::Rc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

use super::model;
use crate::unpack;
Expand Down Expand Up @@ -220,8 +217,27 @@ impl<'c> LogParser<'c> {
"flag bits at bad position",
));
}
let flag_bits = parse_flag_bits(&msg)?;

// Check for incompatible flag bits. If there's any unknown bits set, we cannot
// parse the log
const ULOG_INCOMPAT_FLAG0_DATA_APPENDED_MASK: u8 = 1u8 << 0;
if (flag_bits.incompat_flags[0] & !ULOG_INCOMPAT_FLAG0_DATA_APPENDED_MASK) != 0 {
return Err(UlogParseError::new(
ParseErrorType::Other,
&format!("Cannot parse log, incompatible flag bits set (idx={}, value={})", 0, flag_bits.incompat_flags[0])
));
}
for flag_idx in 1..flag_bits.incompat_flags.len() {
if flag_bits.incompat_flags[flag_idx] != 0 {
return Err(UlogParseError::new(
ParseErrorType::Other,
&format!("Cannot parse log, incompatible flag bits set (idx={}, value={})", flag_idx, flag_bits.incompat_flags[flag_idx])
));
}
}

self.status = ParseStatus::InDefinitions;
//TODO: read message
}
model::MessageType::Format => {
let format = parse_format(&msg)?;
Expand Down Expand Up @@ -361,8 +377,11 @@ impl<'c> LogParser<'c> {
),
));
}
let current_timestamp =
flattened_format.timestamp_field.parse_timestamp(msg.data());
let timestamp_field = flattened_format.timestamp_field.as_ref().ok_or_else(|| UlogParseError::new(
ParseErrorType::Other,
&format!("Message does not have a timestamp field {}", flattened_format.message_name),
))?;
let current_timestamp = timestamp_field.parse_timestamp(msg.data());
if *last_timestamp < current_timestamp {
*last_timestamp = current_timestamp;
if let Some(cb) = &mut self.data_message_callback {
Expand Down Expand Up @@ -452,6 +471,33 @@ impl MaybeRepeatedType {
}
}

#[derive(Debug)]
struct FlagBits {
compat_flags: [u8; 8],
incompat_flags: [u8; 8],
appended_offsets: [u64; 3],
}

fn parse_flag_bits(message: &model::ULogMessage) -> Result<FlagBits, UlogParseError> {
const MINIMUM_MESSAGE_LENGTH: usize = 40;
if message.data().len() < MINIMUM_MESSAGE_LENGTH {
return Err(UlogParseError::new(
ParseErrorType::Other,
&format!("FlagBits message too small: {} < 40", message.data.len()),
));
}

let mut compat_flags: [u8; 8] = Default::default();
compat_flags.copy_from_slice(&message.data[0..8]);
let mut incompat_flags: [u8; 8] = Default::default();
incompat_flags.copy_from_slice(&message.data[8..16]);
Ok(FlagBits {
compat_flags,
incompat_flags,
appended_offsets: [unpack::as_u64_le( & message.data[16..24]), unpack::as_u64_le( & message.data[24..32]), unpack::as_u64_le( & message.data[32..40])],
})
}

#[derive(Debug)]
struct Field {
field_name: String,
Expand Down Expand Up @@ -780,27 +826,27 @@ pub fn read_file_with_simple_callback<CB: FnMut(&Message) -> SimpleCallbackResul
file_path: &str,
c: &mut CB,
) -> Result<usize, std::io::Error> {
let stop_reading = Arc::new(AtomicBool::new(false));
let c_cell: Rc<RefCell<&mut CB>> = Rc::new(RefCell::new(c));
let stop_reading = Cell::new(false);
let c_cell: RefCell<&mut CB> = RefCell::new(c);
let mut wrapped_data_message_callback = |data_message: &DataMessage| {
if let SimpleCallbackResult::Stop =
c_cell.as_ref().borrow_mut().deref_mut()(&Message::Data(&data_message))
c_cell.borrow_mut().deref_mut()(&Message::Data(&data_message))
{
stop_reading.store(true, Ordering::Relaxed)
stop_reading.set(true);
}
};
let mut wrapped_string_message_callback = |data_message: &model::LoggedStringMessage| {
if let SimpleCallbackResult::Stop =
c_cell.as_ref().borrow_mut().deref_mut()(&Message::LoggedMessage(&data_message))
c_cell.borrow_mut().deref_mut()(&Message::LoggedMessage(&data_message))
{
stop_reading.store(true, Ordering::Relaxed)
stop_reading.set(true);
}
};
let mut wrapped_parameter_message_callback = |parameter_message: &model::ParameterMessage| {
if let SimpleCallbackResult::Stop =
c_cell.as_ref().borrow_mut().deref_mut()(&Message::ParameterMessage(&parameter_message))
c_cell.borrow_mut().deref_mut()(&Message::ParameterMessage(&parameter_message))
{
stop_reading.store(true, Ordering::Relaxed)
stop_reading.set(true);
}
};
let mut log_parser = LogParser::default();
Expand All @@ -812,7 +858,7 @@ pub fn read_file_with_simple_callback<CB: FnMut(&Message) -> SimpleCallbackResul
let mut f = std::fs::File::open(file_path)?;
const READ_START: usize = 64 * 1024;
let mut buf = [0u8; 1024 * 1024];
while !stop_reading.load(Ordering::Relaxed) {
while !stop_reading.get() {
let num_bytes_read = f.read(&mut buf[READ_START..])?;
if num_bytes_read == 0 {
break;
Expand Down
25 changes: 10 additions & 15 deletions src/stream_parser/model.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ pub struct FlattenedFormat {
pub message_name: String,
pub fields: Vec<FlattenedField>,
name_to_field: HashMap<String, FlattenedField>,
pub timestamp_field: TimestampField,
pub timestamp_field: Option<TimestampField>,
size: u16,
}

Expand All @@ -187,7 +187,7 @@ impl FlattenedFormat {
.iter()
.map(|f| (f.flattened_field_name.to_string(), (*f).clone()))
.collect();
match name_to_field
let timestamp_field = name_to_field
.get("timestamp")
.and_then(|field| match field.field_type {
FlattenedFieldType::UInt8 => Some(TimestampField {
Expand All @@ -207,19 +207,14 @@ impl FlattenedFormat {
offset: field.offset,
}),
_ => None,
}) {
Some(timestamp_field) => Ok(Self {
message_name,
fields,
name_to_field,
timestamp_field,
size,
}),
None => Err(UlogParseError::new(
ParseErrorType::Other,
&format!("Message does not have a timestamp field {}", message_name),
)),
}
});
Ok(Self {
message_name,
fields,
name_to_field,
timestamp_field,
size,
})
}

pub fn get_field_offset(
Expand Down

0 comments on commit 41b5b61

Please sign in to comment.