Skip to content

Commit

Permalink
ignore non-increasing timestamps
Browse files Browse the repository at this point in the history
  • Loading branch information
Thomas Etter committed Mar 3, 2020
1 parent 5ea526c commit b3c3443
Show file tree
Hide file tree
Showing 3 changed files with 119 additions and 49 deletions.
2 changes: 1 addition & 1 deletion src/full_parser/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ pub fn read_file(file_path: &str) -> Result<ParsedData, std::io::Error> {
.consume_bytes(&buf[READ_START..(READ_START + num_bytes_read)])
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, format!("err: {:?}", e)))?;
}
let data_format = parser.get_final_data_format();
let mut data_format = parser.get_final_data_format();

let mut messages = HashMap::<String, HashMap<MultiId, HashMap<String, SomeVec>>>::new();
for msg_id in 0..reader.messages.len() {
Expand Down
66 changes: 29 additions & 37 deletions src/stream_parser/file_reader.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::stream_parser::model::{ParseErrorType, UlogParseError};
use std::cell::RefCell;
use std::collections::HashMap;
use std::collections::HashSet;
Expand Down Expand Up @@ -33,7 +34,8 @@ impl Default for ParseStatus {
#[derive(Default)]
pub struct DataFormat {
flattened_format: HashMap<String, FlattenedFormat>,
registered_messages: HashMap<u16, (FlattenedFormat, MultiId)>,
// msg_id -> (flattened_format, multi_id, last_timestamp)
registered_messages: HashMap<u16, (FlattenedFormat, MultiId, u64)>,
}

impl DataFormat {
Expand All @@ -51,10 +53,10 @@ impl DataFormat {
multi_id: u8,
) -> Result<(), UlogParseError> {
if let Some(flattened_message) = self.flattened_format.get(message_name) {
if let Some(preexisting_message) = self
.registered_messages
.insert(msg_id, (flattened_message.clone(), MultiId::new(multi_id)))
{
if let Some(preexisting_message) = self.registered_messages.insert(
msg_id,
(flattened_message.clone(), MultiId::new(multi_id), 0),
) {
return Err(UlogParseError::new(
ParseErrorType::Other,
&format!(
Expand All @@ -78,8 +80,11 @@ impl DataFormat {
}

// This should actually never return None
pub fn get_message_description(&self, msg_id: u16) -> Option<&(FlattenedFormat, MultiId)> {
self.registered_messages.get(&msg_id)
pub fn get_message_description(
&mut self,
msg_id: u16,
) -> Option<&mut (FlattenedFormat, MultiId, u64)> {
self.registered_messages.get_mut(&msg_id)
}
}

Expand All @@ -99,27 +104,6 @@ pub struct LogParser<'c> {
const MAX_MESSAGE_SIZE: usize = 2 + 1 + (u16::max_value() as usize);
const HEADER_BYTES: [u8; 7] = [85, 76, 111, 103, 1, 18, 53];

#[derive(Debug)]
pub struct UlogParseError {
error_type: ParseErrorType,
description: String,
}

impl UlogParseError {
fn new(error_type: ParseErrorType, description: &str) -> Self {
Self {
error_type,
description: description.to_string(),
}
}
}

#[derive(Debug)]
pub enum ParseErrorType {
InvalidFile,
Other,
}

impl<'c> LogParser<'c> {
pub fn set_data_message_callback<CB: FnMut(&model::DataMessage)>(&mut self, c: &'c mut CB) {
self.data_message_callback = Some(c)
Expand Down Expand Up @@ -358,7 +342,7 @@ impl<'c> LogParser<'c> {
));
}
let msg_id = unpack::as_u16_le(&msg.data[0..2]);
let (flattened_format, multi_id) = self
let (ref mut flattened_format, ref mut multi_id, ref mut last_timestamp) = self
.flattened_format
.get_message_description(msg_id)
.ok_or_else(|| {
Expand All @@ -377,13 +361,21 @@ impl<'c> LogParser<'c> {
),
));
}
if let Some(cb) = &mut self.data_message_callback {
cb(&DataMessage {
msg_id,
multi_id: multi_id.clone(),
data: msg.data(),
flattened_format,
});
let current_timestamp =
flattened_format.timestamp_field.parse_timestamp(msg.data());
if *last_timestamp < current_timestamp {
*last_timestamp = current_timestamp;
if let Some(cb) = &mut self.data_message_callback {
cb(&DataMessage {
msg_id,
multi_id: multi_id.clone(),
data: msg.data(),
flattened_format,
});
}
} else {
// TODO: have some failure state for this.
// Encountered bad timestamp, ignore
}
}

Expand Down Expand Up @@ -766,7 +758,7 @@ fn flatten_format(
}
result.insert(
message_name.to_string(),
FlattenedFormat::new(message_name.to_string(), flattened_fields, u16_offset),
FlattenedFormat::new(message_name.to_string(), flattened_fields, u16_offset)?,
);
}

Expand Down
100 changes: 89 additions & 11 deletions src/stream_parser/model.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,17 +111,64 @@ pub struct FlattenedField {
pub offset: u16, // relative to the beginning of the message ()
}

#[derive(Clone, Debug, PartialEq)]
pub enum TimestampFieldType {
UInt8,
UInt16,
UInt32,
UInt64,
}

#[derive(Clone, Debug)]
pub struct TimestampField {
pub field_type: TimestampFieldType,
pub offset: u16, // relative to the beginning of the message ()
}

impl TimestampField {
pub fn parse_timestamp(&self, data: &[u8]) -> u64 {
match self.field_type {
TimestampFieldType::UInt8 => u8::parse(&data[self.offset as usize..]) as u64,
TimestampFieldType::UInt16 => u16::parse(&data[self.offset as usize..]) as u64,
TimestampFieldType::UInt32 => u32::parse(&data[self.offset as usize..]) as u64,
TimestampFieldType::UInt64 => u64::parse(&data[self.offset as usize..]),
}
}
}

#[derive(Debug)]
pub enum FieldLookupError {
MissingField,
TypeMismatch,
}

#[derive(Debug)]
pub struct UlogParseError {
error_type: ParseErrorType,
description: String,
}

impl UlogParseError {
pub fn new(error_type: ParseErrorType, description: &str) -> Self {
Self {
error_type,
description: description.to_string(),
}
}
}

#[derive(Debug)]
pub enum ParseErrorType {
InvalidFile,
Other,
}

#[derive(Clone, Debug)]
pub struct FlattenedFormat {
pub message_name: String,
pub fields: Vec<FlattenedField>,
name_to_field: HashMap<String, FlattenedField>,
pub timestamp_field: TimestampField,
size: u16,
}

Expand All @@ -131,16 +178,47 @@ pub trait ParseableFieldType: LittleEndianParser + FlattenedFieldTypeMatcher {}
impl<T: LittleEndianParser + FlattenedFieldTypeMatcher> ParseableFieldType for T {}

impl FlattenedFormat {
pub fn new(message_name: String, fields: Vec<FlattenedField>, size: u16) -> Self {
pub fn new(
message_name: String,
fields: Vec<FlattenedField>,
size: u16,
) -> Result<Self, UlogParseError> {
let name_to_field: HashMap<String, FlattenedField> = fields
.iter()
.map(|f| (f.flattened_field_name.to_string(), (*f).clone()))
.collect();
Self {
message_name,
fields,
name_to_field,
size,
match name_to_field
.get("timestamp")
.and_then(|field| match field.field_type {
FlattenedFieldType::UInt8 => Some(TimestampField {
field_type: TimestampFieldType::UInt8,
offset: field.offset,
}),
FlattenedFieldType::UInt16 => Some(TimestampField {
field_type: TimestampFieldType::UInt16,
offset: field.offset,
}),
FlattenedFieldType::UInt32 => Some(TimestampField {
field_type: TimestampFieldType::UInt32,
offset: field.offset,
}),
FlattenedFieldType::UInt64 => Some(TimestampField {
field_type: TimestampFieldType::UInt64,
offset: field.offset,
}),
_ => None,
}) {
Some(timestamp_field) => Ok(Self {
message_name,
fields,
name_to_field,
timestamp_field,
size,
}),
None => Err(UlogParseError::new(
ParseErrorType::Other,
&format!("Message does not have a timestamp field {}", message_name),
)),
}
}

Expand Down Expand Up @@ -252,16 +330,16 @@ mod tests {
use super::*;

#[test]
fn parses_i32() {
fn parses_u32() {
let mut data: [u8; 256] = [0; 256];
data[13] = 1;
let field = FlattenedField {
flattened_field_name: "field0".to_string(),
field_type: FlattenedFieldType::Int32,
flattened_field_name: "timestamp".to_string(),
field_type: FlattenedFieldType::UInt32,
offset: 10, // relative to the beginning of the message ()
};
let flattened_format =
FlattenedFormat::new("message".to_string(), vec![field.clone()], 500);
FlattenedFormat::new("message".to_string(), vec![field.clone()], 500).unwrap();
let data_msg = DataMessage {
msg_id: 1,
multi_id: MultiId(10),
Expand All @@ -270,7 +348,7 @@ mod tests {
};
let parser = data_msg
.flattened_format
.get_field_parser::<i32>("field0")
.get_field_parser::<u32>("timestamp")
.expect("could not get parser");
assert_eq!(10, parser.offset());
assert_eq!(0x01000000, parser.parse(&data));
Expand Down

0 comments on commit b3c3443

Please sign in to comment.