From d99005f8507e3f6a3c9cd9b6a2b2b3ed7e9db87a Mon Sep 17 00:00:00 2001 From: Ammar Abou Zor Date: Thu, 29 Aug 2024 15:35:03 +0200 Subject: [PATCH 1/2] Nested Parsers: Architectures proposal... * Resolver will be passed to resolve method instead of parser returning a template. * Parsing log message is returning an error which is defined by each type implementing LogMessage trait. * Types with no errors sets the error to infallible for compiler optimizations. * This approach enables saving parsing errors. * Nested errors are ignored for now --- application/apps/indexer/Cargo.lock | 15 +- application/apps/indexer/Cargo.toml | 4 +- .../indexer/indexer_cli/src/interactive.rs | 13 +- application/apps/indexer/parsers/Cargo.toml | 4 +- .../apps/indexer/parsers/src/dlt/fmt.rs | 136 +++++++++++++++--- .../apps/indexer/parsers/src/dlt/mod.rs | 29 ++-- application/apps/indexer/parsers/src/lib.rs | 108 +++++++++++++- .../apps/indexer/parsers/src/nested_parser.rs | 90 ++++++++++++ .../apps/indexer/parsers/src/someip.rs | 17 ++- application/apps/indexer/parsers/src/text.rs | 11 +- application/apps/indexer/session/Cargo.toml | 4 +- .../session/src/handlers/observing/mod.rs | 48 ++++++- .../apps/rustcore/rs-bindings/Cargo.lock | 15 +- 13 files changed, 428 insertions(+), 66 deletions(-) create mode 100644 application/apps/indexer/parsers/src/nested_parser.rs diff --git a/application/apps/indexer/Cargo.lock b/application/apps/indexer/Cargo.lock index e7cbfaebfa..453a5d379e 100644 --- a/application/apps/indexer/Cargo.lock +++ b/application/apps/indexer/Cargo.lock @@ -650,9 +650,8 @@ dependencies = [ [[package]] name = "dlt-core" -version = "0.14.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f6fd69a328826e883613fbbcf468a33a53df1198c3e39bae7ad079c449431bfd" +version = "0.16.0" +source = "git+https://github.com/kruss/dlt-core.git?branch=dlt_network_traces#525f591862437ca15c56639143f205a956d01b6a" dependencies = [ "buf_redux 0.8.4 (registry+https://github.com/rust-lang/crates.io-index)", "byteorder", @@ -1686,9 +1685,9 @@ dependencies = [ [[package]] name = "quick-xml" -version = "0.22.0" +version = "0.23.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8533f14c8382aaad0d592c812ac3b826162128b65662331e1127b45c3d18536b" +checksum = "11bafc859c6815fbaffbbbf4229ecb767ac913fecb27f9ad4343662e9ef099ea" dependencies = [ "memchr", ] @@ -2085,12 +2084,12 @@ dependencies = [ [[package]] name = "someip-payload" -version = "0.1.1" -source = "git+https://github.com/esrlabs/someip-payload#9a58a561a3d284e37a25ea2dc52188c70694682d" +version = "0.1.3" +source = "git+https://github.com/kruss/someip-payload.git?branch=robustness#ccd100907b38f60d9e1ac657250111b2e0a69518" dependencies = [ "byteorder", "log", - "quick-xml 0.22.0", + "quick-xml 0.23.1", "regex", "thiserror", "ux", diff --git a/application/apps/indexer/Cargo.toml b/application/apps/indexer/Cargo.toml index ea5581935d..3998b4b67a 100644 --- a/application/apps/indexer/Cargo.toml +++ b/application/apps/indexer/Cargo.toml @@ -22,7 +22,9 @@ thiserror = "1.0" lazy_static = "1.4" tokio = { version = "1", features = ["full"] } tokio-stream = "0.1" -dlt-core = "0.14" +# dlt-core = "0.14" +# TODO https://github.com/esrlabs/dlt-core/pull/24 +dlt-core = { git = "https://github.com/kruss/dlt-core.git", branch = "dlt_network_traces" } crossbeam-channel = "0.5" futures = "0.3" tokio-util = "0.7" diff --git a/application/apps/indexer/indexer_cli/src/interactive.rs b/application/apps/indexer/indexer_cli/src/interactive.rs index a12215d399..32a8e5a41b 100644 --- a/application/apps/indexer/indexer_cli/src/interactive.rs +++ b/application/apps/indexer/indexer_cli/src/interactive.rs @@ -1,6 +1,10 @@ use crate::{duration_report, Instant}; use futures::{pin_mut, stream::StreamExt}; -use parsers::{dlt::DltParser, MessageStreamItem, ParseYield}; +use parsers::{ + dlt::DltParser, + nested_parser::{resolve_log_msg, ParseRestResolver}, + MessageStreamItem, ParseYield, +}; use processor::grabber::LineRange; use rustyline::{error::ReadlineError, DefaultEditor}; use session::session::Session; @@ -46,6 +50,7 @@ pub(crate) async fn handle_interactive_session(input: Option) { let udp_source = UdpSource::new(RECEIVER, vec![]).await.unwrap(); let dlt_parser = DltParser::new(None, None, None, false); let mut dlt_msg_producer = MessageProducer::new(dlt_parser, udp_source, None); + let mut parse_reslover = ParseRestResolver::new(); let msg_stream = dlt_msg_producer.as_stream(); pin_mut!(msg_stream); loop { @@ -56,10 +61,12 @@ pub(crate) async fn handle_interactive_session(input: Option) { } item = msg_stream.next() => { match item { - Some((_, MessageStreamItem::Item(ParseYield::Message(msg)))) => { + Some((_, MessageStreamItem::Item(ParseYield::Message(item)))) => { + let msg = resolve_log_msg(item, &mut parse_reslover); println!("msg: {msg}"); } - Some((_, MessageStreamItem::Item(ParseYield::MessageAndAttachment((msg, attachment))))) => { + Some((_, MessageStreamItem::Item(ParseYield::MessageAndAttachment((item, attachment))))) => { + let msg = resolve_log_msg(item, &mut parse_reslover); println!("msg: {msg}, attachment: {attachment:?}"); } Some((_, MessageStreamItem::Item(ParseYield::Attachment(attachment)))) => { diff --git a/application/apps/indexer/parsers/Cargo.toml b/application/apps/indexer/parsers/Cargo.toml index c2ac11d3ac..8645323e88 100644 --- a/application/apps/indexer/parsers/Cargo.toml +++ b/application/apps/indexer/parsers/Cargo.toml @@ -18,7 +18,9 @@ rand.workspace = true # someip-messages = { path = "../../../../../someip"} someip-messages = { git = "https://github.com/esrlabs/someip" } # someip-payload = { path = "../../../../../someip-payload" } -someip-payload = { git = "https://github.com/esrlabs/someip-payload" } +# someip-payload = { git = "https://github.com/esrlabs/someip-payload" } +# TODO +someip-payload = { git = "https://github.com/kruss/someip-payload.git", branch = "robustness" } [dev-dependencies] stringreader = "0.1.1" diff --git a/application/apps/indexer/parsers/src/dlt/fmt.rs b/application/apps/indexer/parsers/src/dlt/fmt.rs index 52941eee2b..4c3d5deddb 100644 --- a/application/apps/indexer/parsers/src/dlt/fmt.rs +++ b/application/apps/indexer/parsers/src/dlt/fmt.rs @@ -28,10 +28,15 @@ use log::trace; use serde::ser::{Serialize, SerializeStruct, Serializer}; use std::{ - fmt::{self, Formatter}, + fmt::{self, Display, Formatter, Write}, str, }; +use crate::{ + nested_parser::ParseRestResolver, GeneralParseLogError, LogMessage, ParseLogSeverity, + ResolveParseHint, +}; + const DLT_COLUMN_SENTINAL: char = '\u{0004}'; const DLT_ARGUMENT_SENTINAL: char = '\u{0005}'; const DLT_NEWLINE_SENTINAL_SLICE: &[u8] = &[0x6]; @@ -281,6 +286,17 @@ impl<'a> Serialize for FormattableMessage<'a> { None => state.serialize_field("payload", "[Unknown CtrlCommand]")?, } } + PayloadContent::NetworkTrace(slices) => { + state.serialize_field("app-id", &ext_header_app_id)?; + state.serialize_field("context-id", &ext_header_context_id)?; + state.serialize_field("message-type", &ext_header_msg_type)?; + let arg_string = slices + .iter() + .map(|slice| format!("{:02X?}", slice)) + .collect::>() + .join("|"); + state.serialize_field("payload", &arg_string)?; + } } state.end() } @@ -386,12 +402,25 @@ impl<'a> FormattableMessage<'a> { payload_string, )) } + PayloadContent::NetworkTrace(slices) => { + let payload_string = slices + .iter() + .map(|slice| format!("{:02X?}", slice)) + .collect::>() + .join("|"); + Ok(PrintableMessage::new( + ext_h_app_id, + eh_ctx_id, + ext_h_msg_type, + payload_string, + )) + } } } fn write_app_id_context_id_and_message_type( &self, - f: &mut fmt::Formatter, + f: &mut impl std::fmt::Write, ) -> Result<(), fmt::Error> { match self.message.extended_header.as_ref() { Some(ext) => { @@ -419,7 +448,7 @@ impl<'a> FormattableMessage<'a> { &self, id: u32, data: &[u8], - f: &mut fmt::Formatter, + f: &mut impl std::fmt::Write, ) -> fmt::Result { trace!("format_nonverbose_data"); let mut fibex_info_added = false; @@ -511,7 +540,16 @@ impl<'a> FormattableMessage<'a> { } } -impl<'a> fmt::Display for FormattableMessage<'a> { +impl LogMessage for FormattableMessage<'_> { + type ParseError = GeneralParseLogError; + + fn to_writer(&self, writer: &mut W) -> Result { + let bytes = self.message.as_bytes(); + let len = bytes.len(); + writer.write_all(&bytes)?; + Ok(len) + } + /// will format dlt Message with those fields: /// ********* storage-header ******** /// date-time @@ -528,43 +566,101 @@ impl<'a> fmt::Display for FormattableMessage<'a> { /// context-id /// /// payload - fn fmt(&self, f: &mut Formatter) -> Result<(), fmt::Error> { + fn try_resolve( + &self, + resolver: Option<&mut ParseRestResolver>, + ) -> Result { + let mut msg = String::new(); + // Taken from Documentation: string formatting is considered an infallible operation. + // Thus we can ignore `fmt::Error` errors. + // Link from Clippy: 'https://rust-lang.github.io/rust-clippy/master/index.html#/format_push_string' + // TODO: Consider another way of concatenating the string after prototyping. if let Some(h) = &self.message.storage_header { let tz = self.options.map(|o| o.tz); match tz { Some(Some(tz)) => { - write_tz_string(f, &h.timestamp, &tz)?; - write!(f, "{DLT_COLUMN_SENTINAL}{}", h.ecu_id)?; + let _ = write_tz_string(&mut msg, &h.timestamp, &tz); + let _ = write!(msg, "{DLT_COLUMN_SENTINAL}{}", h.ecu_id); + } + _ => { + let _ = write!(msg, "{}", DltStorageHeader(h)); } - _ => write!(f, "{}", DltStorageHeader(h))?, }; } let header = DltStandardHeader(&self.message.header); - write!(f, "{DLT_COLUMN_SENTINAL}",)?; - write!(f, "{header}")?; - write!(f, "{DLT_COLUMN_SENTINAL}",)?; + write!(msg, "{DLT_COLUMN_SENTINAL}",).unwrap(); + write!(msg, "{header}").unwrap(); + write!(msg, "{DLT_COLUMN_SENTINAL}",).unwrap(); match &self.message.payload { PayloadContent::Verbose(arguments) => { - self.write_app_id_context_id_and_message_type(f)?; - arguments - .iter() - .try_for_each(|arg| write!(f, "{}{}", DLT_ARGUMENT_SENTINAL, DltArgument(arg))) + let _ = self.write_app_id_context_id_and_message_type(&mut msg); + arguments.iter().for_each(|arg| { + let _ = write!(msg, "{}{}", DLT_ARGUMENT_SENTINAL, DltArgument(arg)); + }); + } + PayloadContent::NonVerbose(id, data) => { + let _ = self.format_nonverbose_data(*id, data, &mut msg); } - PayloadContent::NonVerbose(id, data) => self.format_nonverbose_data(*id, data, f), PayloadContent::ControlMsg(ctrl_id, _data) => { - self.write_app_id_context_id_and_message_type(f)?; + let _ = self.write_app_id_context_id_and_message_type(&mut msg); match service_id_lookup(ctrl_id.value()) { - Some((name, _desc)) => write!(f, "[{name}]"), - None => write!(f, "[Unknown CtrlCommand]"), + Some((name, _desc)) => { + let _ = write!(msg, "[{name}]"); + } + None => { + let _ = write!(msg, "[Unknown CtrlCommand]"); + } } } + PayloadContent::NetworkTrace(slices) => { + let _ = self.write_app_id_context_id_and_message_type(&mut msg); + let is_someip = self + .message + .extended_header + .as_ref() + .is_some_and(|ext_header| { + matches!( + ext_header.message_type, + MessageType::NetworkTrace(NetworkTraceType::Ipc) + | MessageType::NetworkTrace(NetworkTraceType::Someip) + ) + }); + + if is_someip { + if let Some(resolver) = resolver { + if let Some(slice) = slices.get(1) { + match resolver.try_resolve(slice, ResolveParseHint::SomeIP) { + Some(Ok(resolved)) => { + let _ = write!(msg, "{resolved}"); + return Ok(msg); + } + Some(Err(_)) | None => { + //TODO: Ignore nested Error while prototyping + } + } + } + } + } + + slices.iter().for_each(|slice| { + let _ = write!(msg, "{}{:02X?}", DLT_ARGUMENT_SENTINAL, slice); + }); + + return Err(GeneralParseLogError::new( + msg, + "Error while resolving Network trace payload".into(), + ParseLogSeverity::Error, + )); + } } + + Ok(msg) } } fn write_tz_string( - f: &mut Formatter, + f: &mut impl std::fmt::Write, time_stamp: &DltTimeStamp, tz: &Tz, ) -> Result<(), fmt::Error> { diff --git a/application/apps/indexer/parsers/src/dlt/mod.rs b/application/apps/indexer/parsers/src/dlt/mod.rs index dae82f4ce4..7bd71434d0 100644 --- a/application/apps/indexer/parsers/src/dlt/mod.rs +++ b/application/apps/indexer/parsers/src/dlt/mod.rs @@ -13,19 +13,10 @@ use dlt_core::{ parse::{dlt_consume_msg, dlt_message}, }; use serde::Serialize; -use std::{io::Write, ops::Range}; +use std::{convert::Infallible, fmt::Display, io::Write, ops::Range}; use self::{attachment::FtScanner, fmt::FormatOptions}; -impl LogMessage for FormattableMessage<'_> { - fn to_writer(&self, writer: &mut W) -> Result { - let bytes = self.message.as_bytes(); - let len = bytes.len(); - writer.write_all(&bytes)?; - Ok(len) - } -} - #[derive(Debug, Serialize)] pub struct RawMessage { pub content: Vec, @@ -48,20 +39,38 @@ impl std::fmt::Display for RawMessage { } impl LogMessage for RangeMessage { + type ParseError = Infallible; + /// A RangeMessage only has range information and cannot serialize to bytes fn to_writer(&self, writer: &mut W) -> Result { writer.write_u64::(self.range.start as u64)?; writer.write_u64::(self.range.end as u64)?; Ok(8 + 8) } + + fn try_resolve( + &self, + _resolver: Option<&mut crate::nested_parser::ParseRestResolver>, + ) -> Result { + Ok(self) + } } impl LogMessage for RawMessage { + type ParseError = Infallible; + fn to_writer(&self, writer: &mut W) -> Result { let len = self.content.len(); writer.write_all(&self.content)?; Ok(len) } + + fn try_resolve( + &self, + _resolver: Option<&mut crate::nested_parser::ParseRestResolver>, + ) -> Result { + Ok(self) + } } #[derive(Default)] diff --git a/application/apps/indexer/parsers/src/lib.rs b/application/apps/indexer/parsers/src/lib.rs index 1b7fce8efe..9c8527fd29 100644 --- a/application/apps/indexer/parsers/src/lib.rs +++ b/application/apps/indexer/parsers/src/lib.rs @@ -1,9 +1,11 @@ #![deny(unused_crate_dependencies)] pub mod dlt; +pub mod nested_parser; pub mod someip; pub mod text; +use nested_parser::ParseRestResolver; use serde::Serialize; -use std::{fmt::Display, io::Write}; +use std::{convert::Infallible, fmt::Display, io::Write}; use thiserror::Error; extern crate log; @@ -78,12 +80,6 @@ pub enum ByteRepresentation { Range((usize, usize)), } -pub trait LogMessage: Display + Serialize { - /// Serializes a message directly into a Writer - /// returns the size of the serialized message - fn to_writer(&self, writer: &mut W) -> Result; -} - #[derive(Debug)] pub enum MessageStreamItem { Item(ParseYield), @@ -92,3 +88,101 @@ pub enum MessageStreamItem { Empty, Done, } + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum ParseLogSeverity { + Error, + Warn, + Info, +} + +pub trait ParseLogMsgError { + fn parse_lossy(self) -> String; + fn severity(&self) -> ParseLogSeverity; + fn error_msg(&self) -> String; +} + +impl ParseLogMsgError for Infallible { + fn parse_lossy(self) -> String { + panic!("Infallible can't be instantiated") + } + + fn severity(&self) -> ParseLogSeverity { + panic!("Infallible can't be instantiated") + } + + fn error_msg(&self) -> String { + panic!("Infallible can't be instantiated") + } +} + +#[derive(Debug, Clone)] +//TODO AAZ: Move all those structs to a new module. It's getting crowded here. +pub struct GeneralParseLogError { + content: String, + err_msg: String, + severity: ParseLogSeverity, +} + +impl GeneralParseLogError { + pub fn new(content: String, err_msg: String, severity: ParseLogSeverity) -> Self { + Self { + content, + err_msg, + severity, + } + } + + //TODO: Make sure this converting is enough. + pub fn from_parser_err(bytes: &[u8], severity: ParseLogSeverity, err: Error) -> Self { + let content = format!("{bytes:?}"); + let err_msg = match err { + Error::Parse(parse_err) => format!("Nested Parser Error: Parse Error: {parse_err}"), + Error::Incomplete => "Nested Parser Error: Incomplete".into(), + Error::Eof => "Nested Parser Error: Eof".into(), + }; + + Self { + content, + severity, + err_msg, + } + } +} + +impl ParseLogMsgError for GeneralParseLogError { + fn parse_lossy(self) -> String { + self.content + } + + fn severity(&self) -> ParseLogSeverity { + self.severity + } + + fn error_msg(&self) -> String { + self.err_msg.to_owned() + } +} + +pub trait LogMessage: Serialize { + type ParseError: ParseLogMsgError; + /// Serializes a message directly into a Writer + /// returns the size of the serialized message + fn to_writer(&self, writer: &mut W) -> Result; + + /// Tries to resolve the message to get its text representation, with in optional help from + /// [`ParseRestResolver`] for the parts which can't be parsed. + fn try_resolve( + &self, + // TODO: Remember the point of making the resolver optional, is to avoid infinite + // recursions in case of parsers calling each others + resolver: Option<&mut ParseRestResolver>, + ) -> Result; +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +/// Gives Hint about how the payload rest can be resolved +pub enum ResolveParseHint { + /// The message needs to be parsed with SomeIP Parser. + SomeIP, +} diff --git a/application/apps/indexer/parsers/src/nested_parser.rs b/application/apps/indexer/parsers/src/nested_parser.rs new file mode 100644 index 0000000000..0a434dc819 --- /dev/null +++ b/application/apps/indexer/parsers/src/nested_parser.rs @@ -0,0 +1,90 @@ +use std::convert::Infallible; + +use crate::GeneralParseLogError; +use crate::ParseLogMsgError; +use crate::ParseLogSeverity; +use crate::Parser; +use crate::{someip::SomeipParser, LogMessage, ParseYield, ResolveParseHint}; + +#[derive(Default)] +pub struct ParseRestResolver { + someip_praser: Option, +} + +impl ParseRestResolver { + pub fn new() -> Self { + Self::default() + } + + /// Sets SomeIP parser on the resolver + pub fn with_someip_parser(&mut self, someip_praser: SomeipParser) -> &mut Self { + self.someip_praser = Some(someip_praser); + self + } + + /// Tries to resolve the given error returning the parsed string if succeeded. + pub fn try_resolve( + &mut self, + bytes: &[u8], + resolve_hint: ResolveParseHint, + ) -> Option> { + match resolve_hint { + ResolveParseHint::SomeIP => { + let parser = self.someip_praser.as_mut()?; + //TODO: Proper error handling for parser return + let p_yield = match parser.parse(bytes, None) { + Ok(res) => res.1?, + Err(err) => { + let err = GeneralParseLogError::from_parser_err( + bytes, + ParseLogSeverity::Error, + err, + ); + + return Some(Err(err)); + } + }; + match p_yield { + ParseYield::Message(item) => { + let res = match item.try_resolve(Some(self)) { + Ok(parsed) => parsed, + Err(err) => { + if cfg!(debug_assertions) { + ensure_infalliable(err); + } + panic!("Infallible Error can't be created") + } + }; + + Some(Ok(res.to_string())) + } + // Ignore other parse types for now + ParseYield::Attachment(_) | ParseYield::MessageAndAttachment(_) => { + let err = GeneralParseLogError::new( + format!("{bytes:?}"), + "Found attachment in nested payload".into(), + ParseLogSeverity::Error, + ); + Some(Err(err)) + } + } + } + } + } +} + +// Ensure the type of given argument is Infallible, raising a compile time error if not. +fn ensure_infalliable(_err: Infallible) {} + +/// Get the text message of [`LogMessage`], resolving its rest payloads if existed when possible, +/// TODO: Otherwise it should save the error to the faulty messages store, which need to be +/// implemented as well :) +pub fn resolve_log_msg(item: T, err_resolver: &mut ParseRestResolver) -> String { + match item.try_resolve(Some(err_resolver)) { + Ok(item) => item.to_string(), + Err(err) => { + //TODO: Add error to errors cache. + err.parse_lossy() + } + } +} diff --git a/application/apps/indexer/parsers/src/someip.rs b/application/apps/indexer/parsers/src/someip.rs index 1601017d78..a216452a14 100644 --- a/application/apps/indexer/parsers/src/someip.rs +++ b/application/apps/indexer/parsers/src/someip.rs @@ -1,5 +1,11 @@ use crate::{Error, LogMessage, ParseYield, Parser}; -use std::{borrow::Cow, fmt, fmt::Display, io::Write, path::PathBuf}; +use std::{ + borrow::Cow, + convert::Infallible, + fmt::{self, Display}, + io::Write, + path::PathBuf, +}; use someip_messages::*; use someip_payload::{ @@ -325,10 +331,19 @@ impl SomeipLogMessage { } impl LogMessage for SomeipLogMessage { + type ParseError = Infallible; + fn to_writer(&self, writer: &mut W) -> Result { writer.write_all(&self.bytes)?; Ok(self.bytes.len()) } + + fn try_resolve( + &self, + _resolver: Option<&mut crate::nested_parser::ParseRestResolver>, + ) -> Result { + Ok(self) + } } impl Display for SomeipLogMessage { diff --git a/application/apps/indexer/parsers/src/text.rs b/application/apps/indexer/parsers/src/text.rs index 6a09a52b9c..8022fbdce8 100644 --- a/application/apps/indexer/parsers/src/text.rs +++ b/application/apps/indexer/parsers/src/text.rs @@ -1,6 +1,6 @@ use crate::{Error, LogMessage, ParseYield, Parser}; use serde::Serialize; -use std::{fmt, io::Write}; +use std::{convert::Infallible, fmt, io::Write}; pub struct StringTokenizer {} @@ -16,11 +16,20 @@ impl fmt::Display for StringMessage { } impl LogMessage for StringMessage { + type ParseError = Infallible; + fn to_writer(&self, writer: &mut W) -> Result { let len = self.content.len(); writer.write_all(self.content.as_bytes())?; Ok(len) } + + fn try_resolve( + &self, + _resolver: Option<&mut crate::nested_parser::ParseRestResolver>, + ) -> Result { + Ok(self) + } } impl Parser for StringTokenizer diff --git a/application/apps/indexer/session/Cargo.toml b/application/apps/indexer/session/Cargo.toml index 803f87bd4c..8ae61602b4 100644 --- a/application/apps/indexer/session/Cargo.toml +++ b/application/apps/indexer/session/Cargo.toml @@ -8,7 +8,9 @@ edition = "2021" blake3 = "1.3" crossbeam-channel.workspace = true dirs.workspace = true -dlt-core.workspace = true +# dlt-core.workspace = true +# TODO https://github.com/esrlabs/dlt-core/pull/24 +dlt-core = { git = "https://github.com/kruss/dlt-core.git", branch = "dlt_network_traces", features=["statistics"] } envvars = "0.1" file-tools = { path = "../addons/file-tools" } futures.workspace = true diff --git a/application/apps/indexer/session/src/handlers/observing/mod.rs b/application/apps/indexer/session/src/handlers/observing/mod.rs index 48d8b1a250..8a780b79c1 100644 --- a/application/apps/indexer/session/src/handlers/observing/mod.rs +++ b/application/apps/indexer/session/src/handlers/observing/mod.rs @@ -8,6 +8,7 @@ use crate::{ use log::trace; use parsers::{ dlt::{fmt::FormatOptions, DltParser}, + nested_parser::{resolve_log_msg, ParseRestResolver}, someip::SomeipParser, text::StringTokenizer, LogMessage, MessageStreamItem, ParseYield, Parser, @@ -77,6 +78,7 @@ async fn run_source_intern( rx_sde: Option, rx_tail: Option>>, ) -> OperationResult<()> { + let mut parse_rest_resolver = ParseRestResolver::new(); match parser { ParserType::SomeIp(settings) => { let someip_parser = match &settings.fibex_file_paths { @@ -86,11 +88,27 @@ async fn run_source_intern( None => SomeipParser::new(), }; let producer = MessageProducer::new(someip_parser, source, rx_sde); - run_producer(operation_api, state, source_id, producer, rx_tail).await + run_producer( + operation_api, + state, + source_id, + producer, + rx_tail, + &mut parse_rest_resolver, + ) + .await } ParserType::Text => { let producer = MessageProducer::new(StringTokenizer {}, source, rx_sde); - run_producer(operation_api, state, source_id, producer, rx_tail).await + run_producer( + operation_api, + state, + source_id, + producer, + rx_tail, + &mut parse_rest_resolver, + ) + .await } ParserType::Dlt(settings) => { let fmt_options = Some(FormatOptions::from(settings.tz.as_ref())); @@ -101,7 +119,24 @@ async fn run_source_intern( settings.with_storage_header, ); let producer = MessageProducer::new(dlt_parser, source, rx_sde); - run_producer(operation_api, state, source_id, producer, rx_tail).await + + let someip_parse = match &settings.fibex_file_paths { + Some(paths) => { + SomeipParser::from_fibex_files(paths.iter().map(PathBuf::from).collect()) + } + None => SomeipParser::new(), + }; + parse_rest_resolver.with_someip_parser(someip_parse); + + run_producer( + operation_api, + state, + source_id, + producer, + rx_tail, + &mut parse_rest_resolver, + ) + .await } } } @@ -112,6 +147,7 @@ async fn run_producer, S: ByteSource>( source_id: u16, mut producer: MessageProducer, mut rx_tail: Option>>, + parse_rest_resolver: &mut ParseRestResolver, ) -> OperationResult<()> { use log::debug; state.set_session_file(None).await?; @@ -139,16 +175,18 @@ async fn run_producer, S: ByteSource>( Next::Item(item) => { match item { MessageStreamItem::Item(ParseYield::Message(item)) => { + let msg = resolve_log_msg(item, parse_rest_resolver); state - .write_session_file(source_id, format!("{item}\n")) + .write_session_file(source_id, format!("{msg}\n")) .await?; } MessageStreamItem::Item(ParseYield::MessageAndAttachment(( item, attachment, ))) => { + let msg = resolve_log_msg(item, parse_rest_resolver); state - .write_session_file(source_id, format!("{item}\n")) + .write_session_file(source_id, format!("{msg}\n")) .await?; state.add_attachment(attachment)?; } diff --git a/application/apps/rustcore/rs-bindings/Cargo.lock b/application/apps/rustcore/rs-bindings/Cargo.lock index 56cf3e3c13..a7559efd54 100644 --- a/application/apps/rustcore/rs-bindings/Cargo.lock +++ b/application/apps/rustcore/rs-bindings/Cargo.lock @@ -698,9 +698,8 @@ dependencies = [ [[package]] name = "dlt-core" -version = "0.14.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f6fd69a328826e883613fbbcf468a33a53df1198c3e39bae7ad079c449431bfd" +version = "0.16.0" +source = "git+https://github.com/kruss/dlt-core.git?branch=dlt_network_traces#525f591862437ca15c56639143f205a956d01b6a" dependencies = [ "buf_redux 0.8.4 (registry+https://github.com/rust-lang/crates.io-index)", "byteorder", @@ -2026,9 +2025,9 @@ dependencies = [ [[package]] name = "quick-xml" -version = "0.22.0" +version = "0.23.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8533f14c8382aaad0d592c812ac3b826162128b65662331e1127b45c3d18536b" +checksum = "11bafc859c6815fbaffbbbf4229ecb767ac913fecb27f9ad4343662e9ef099ea" dependencies = [ "memchr", ] @@ -2476,12 +2475,12 @@ dependencies = [ [[package]] name = "someip-payload" -version = "0.1.1" -source = "git+https://github.com/esrlabs/someip-payload#9a58a561a3d284e37a25ea2dc52188c70694682d" +version = "0.1.3" +source = "git+https://github.com/kruss/someip-payload.git?branch=robustness#ccd100907b38f60d9e1ac657250111b2e0a69518" dependencies = [ "byteorder", "log", - "quick-xml 0.22.0", + "quick-xml 0.23.1", "regex", "thiserror", "ux", From 1b3abb0c16ad0cbfadeba84ea50c8a4d7e007b1a Mon Sep 17 00:00:00 2001 From: Ammar Abou Zor Date: Mon, 9 Sep 2024 12:42:18 +0200 Subject: [PATCH 2/2] Nested Parsers: Separate parsing in sessions and CLI * Sessions needs to keep track on errors but indexer CLI doesn't need that for now. * Each part should responsible of how errors for parsing log messages should be handled. * Added reminder for handling resolver in indexer CLI. --- .../indexer/indexer_cli/src/interactive.rs | 18 +++++++++++++----- .../apps/indexer/parsers/src/nested_parser.rs | 13 ------------- .../session/src/handlers/observing/mod.rs | 17 +++++++++++++++-- 3 files changed, 28 insertions(+), 20 deletions(-) diff --git a/application/apps/indexer/indexer_cli/src/interactive.rs b/application/apps/indexer/indexer_cli/src/interactive.rs index 32a8e5a41b..e1dbea9165 100644 --- a/application/apps/indexer/indexer_cli/src/interactive.rs +++ b/application/apps/indexer/indexer_cli/src/interactive.rs @@ -1,9 +1,8 @@ use crate::{duration_report, Instant}; use futures::{pin_mut, stream::StreamExt}; use parsers::{ - dlt::DltParser, - nested_parser::{resolve_log_msg, ParseRestResolver}, - MessageStreamItem, ParseYield, + dlt::DltParser, nested_parser::ParseRestResolver, LogMessage, MessageStreamItem, + ParseLogMsgError, ParseYield, }; use processor::grabber::LineRange; use rustyline::{error::ReadlineError, DefaultEditor}; @@ -50,6 +49,7 @@ pub(crate) async fn handle_interactive_session(input: Option) { let udp_source = UdpSource::new(RECEIVER, vec![]).await.unwrap(); let dlt_parser = DltParser::new(None, None, None, false); let mut dlt_msg_producer = MessageProducer::new(dlt_parser, udp_source, None); + //TODO AAZ: Make sure we need to provide the resolver in indexer CLI. let mut parse_reslover = ParseRestResolver::new(); let msg_stream = dlt_msg_producer.as_stream(); pin_mut!(msg_stream); @@ -62,11 +62,11 @@ pub(crate) async fn handle_interactive_session(input: Option) { item = msg_stream.next() => { match item { Some((_, MessageStreamItem::Item(ParseYield::Message(item)))) => { - let msg = resolve_log_msg(item, &mut parse_reslover); + let msg = parse_log_msg_lossy(item, &mut parse_reslover); println!("msg: {msg}"); } Some((_, MessageStreamItem::Item(ParseYield::MessageAndAttachment((item, attachment))))) => { - let msg = resolve_log_msg(item, &mut parse_reslover); + let msg = parse_log_msg_lossy(item, &mut parse_reslover); println!("msg: {msg}, attachment: {attachment:?}"); } Some((_, MessageStreamItem::Item(ParseYield::Attachment(attachment)))) => { @@ -201,3 +201,11 @@ async fn collect_user_input(tx: mpsc::UnboundedSender) -> JoinHandle<() println!("done with readline loop"); }) } + +/// Parse log messages without registering errors and calling [`ParseLogMsgError::parse_lossy()`] on errors +pub fn parse_log_msg_lossy(item: T, err_resolver: &mut ParseRestResolver) -> String { + match item.try_resolve(Some(err_resolver)) { + Ok(item) => item.to_string(), + Err(err) => err.parse_lossy(), + } +} diff --git a/application/apps/indexer/parsers/src/nested_parser.rs b/application/apps/indexer/parsers/src/nested_parser.rs index 0a434dc819..3327ed56c3 100644 --- a/application/apps/indexer/parsers/src/nested_parser.rs +++ b/application/apps/indexer/parsers/src/nested_parser.rs @@ -75,16 +75,3 @@ impl ParseRestResolver { // Ensure the type of given argument is Infallible, raising a compile time error if not. fn ensure_infalliable(_err: Infallible) {} - -/// Get the text message of [`LogMessage`], resolving its rest payloads if existed when possible, -/// TODO: Otherwise it should save the error to the faulty messages store, which need to be -/// implemented as well :) -pub fn resolve_log_msg(item: T, err_resolver: &mut ParseRestResolver) -> String { - match item.try_resolve(Some(err_resolver)) { - Ok(item) => item.to_string(), - Err(err) => { - //TODO: Add error to errors cache. - err.parse_lossy() - } - } -} diff --git a/application/apps/indexer/session/src/handlers/observing/mod.rs b/application/apps/indexer/session/src/handlers/observing/mod.rs index 8a780b79c1..b452f59bc1 100644 --- a/application/apps/indexer/session/src/handlers/observing/mod.rs +++ b/application/apps/indexer/session/src/handlers/observing/mod.rs @@ -8,10 +8,10 @@ use crate::{ use log::trace; use parsers::{ dlt::{fmt::FormatOptions, DltParser}, - nested_parser::{resolve_log_msg, ParseRestResolver}, + nested_parser::ParseRestResolver, someip::SomeipParser, text::StringTokenizer, - LogMessage, MessageStreamItem, ParseYield, Parser, + LogMessage, MessageStreamItem, ParseLogMsgError, ParseYield, Parser, }; use sources::{ factory::ParserType, @@ -240,3 +240,16 @@ async fn run_producer, S: ByteSource>( debug!("listen done"); Ok(None) } + +/// Get the text message of [`LogMessage`], resolving its rest payloads if existed when possible, +/// TODO: Otherwise it should save the error to the faulty messages store, which need to be +/// implemented as well :) +pub fn resolve_log_msg(item: T, err_resolver: &mut ParseRestResolver) -> String { + match item.try_resolve(Some(err_resolver)) { + Ok(item) => item.to_string(), + Err(err) => { + //TODO: Add error to errors cache. + err.parse_lossy() + } + } +}