From 91e973844dcb202e4a795258942efa108ec4fb2f Mon Sep 17 00:00:00 2001
From: DmitryAstafyev <dvastafyev@gmail.com>
Date: Tue, 27 Aug 2024 13:12:47 +0200
Subject: [PATCH] Support nested parsers on Producer scope

---
 application/apps/indexer/Cargo.lock           |  15 ++-
 application/apps/indexer/Cargo.toml           |   4 +-
 application/apps/indexer/parsers/Cargo.toml   |   4 +-
 .../apps/indexer/parsers/src/dlt/fmt.rs       | 102 ++++++++++++++++++
 .../apps/indexer/parsers/src/dlt/mod.rs       |  37 +++++--
 application/apps/indexer/parsers/src/lib.rs   |  14 +++
 application/apps/indexer/session/Cargo.toml   |   4 +-
 .../session/src/handlers/observing/mod.rs     |  45 +++++++-
 .../apps/rustcore/rs-bindings/Cargo.lock      |  15 ++-
 9 files changed, 210 insertions(+), 30 deletions(-)

diff --git a/application/apps/indexer/Cargo.lock b/application/apps/indexer/Cargo.lock
index e7cbfaebfa..453a5d379e 100644
--- a/application/apps/indexer/Cargo.lock
+++ b/application/apps/indexer/Cargo.lock
@@ -650,9 +650,8 @@ dependencies = [
 
 [[package]]
 name = "dlt-core"
-version = "0.14.5"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "f6fd69a328826e883613fbbcf468a33a53df1198c3e39bae7ad079c449431bfd"
+version = "0.16.0"
+source = "git+https://github.com/kruss/dlt-core.git?branch=dlt_network_traces#525f591862437ca15c56639143f205a956d01b6a"
 dependencies = [
  "buf_redux 0.8.4 (registry+https://github.com/rust-lang/crates.io-index)",
  "byteorder",
@@ -1686,9 +1685,9 @@ dependencies = [
 
 [[package]]
 name = "quick-xml"
-version = "0.22.0"
+version = "0.23.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "8533f14c8382aaad0d592c812ac3b826162128b65662331e1127b45c3d18536b"
+checksum = "11bafc859c6815fbaffbbbf4229ecb767ac913fecb27f9ad4343662e9ef099ea"
 dependencies = [
  "memchr",
 ]
@@ -2085,12 +2084,12 @@ dependencies = [
 
 [[package]]
 name = "someip-payload"
-version = "0.1.1"
-source = "git+https://github.com/esrlabs/someip-payload#9a58a561a3d284e37a25ea2dc52188c70694682d"
+version = "0.1.3"
+source = "git+https://github.com/kruss/someip-payload.git?branch=robustness#ccd100907b38f60d9e1ac657250111b2e0a69518"
 dependencies = [
  "byteorder",
  "log",
- "quick-xml 0.22.0",
+ "quick-xml 0.23.1",
  "regex",
  "thiserror",
  "ux",
diff --git a/application/apps/indexer/Cargo.toml b/application/apps/indexer/Cargo.toml
index ea5581935d..3998b4b67a 100644
--- a/application/apps/indexer/Cargo.toml
+++ b/application/apps/indexer/Cargo.toml
@@ -22,7 +22,9 @@ thiserror = "1.0"
 lazy_static = "1.4"
 tokio = { version = "1", features = ["full"] }
 tokio-stream = "0.1"
-dlt-core = "0.14"
+# dlt-core = "0.14"
+# TODO https://github.com/esrlabs/dlt-core/pull/24
+dlt-core = { git = "https://github.com/kruss/dlt-core.git", branch = "dlt_network_traces" }
 crossbeam-channel = "0.5"
 futures = "0.3"
 tokio-util = "0.7"
diff --git a/application/apps/indexer/parsers/Cargo.toml b/application/apps/indexer/parsers/Cargo.toml
index c2ac11d3ac..8645323e88 100644
--- a/application/apps/indexer/parsers/Cargo.toml
+++ b/application/apps/indexer/parsers/Cargo.toml
@@ -18,7 +18,9 @@ rand.workspace = true
 # someip-messages = { path = "../../../../../someip"}
 someip-messages = { git = "https://github.com/esrlabs/someip" }
 # someip-payload = { path = "../../../../../someip-payload" }
-someip-payload = { git = "https://github.com/esrlabs/someip-payload" }
+# someip-payload = { git = "https://github.com/esrlabs/someip-payload" }
+# TODO
+someip-payload = { git = "https://github.com/kruss/someip-payload.git", branch = "robustness" }
 
 [dev-dependencies]
 stringreader = "0.1.1"
diff --git a/application/apps/indexer/parsers/src/dlt/fmt.rs b/application/apps/indexer/parsers/src/dlt/fmt.rs
index 52941eee2b..656426f532 100644
--- a/application/apps/indexer/parsers/src/dlt/fmt.rs
+++ b/application/apps/indexer/parsers/src/dlt/fmt.rs
@@ -28,6 +28,7 @@ use log::trace;
 use serde::ser::{Serialize, SerializeStruct, Serializer};
 
 use std::{
+    collections::HashMap,
     fmt::{self, Formatter},
     str,
 };
@@ -193,11 +194,47 @@ impl From<Option<&String>> for FormatOptions {
     }
 }
 
+#[derive(Hash, Clone, Copy, PartialEq, Eq)]
+pub enum UnresolvedDataType {
+    SomeIpPayload,
+}
+
+pub struct UnresolvedData {
+    data: Vec<UnresolvedDataType>,
+    requested: Option<UnresolvedDataType>,
+}
+
+impl UnresolvedData {
+    pub fn next_unresolved(&mut self) -> Option<UnresolvedDataType> {
+        if self.data.is_empty() {
+            self.requested = None;
+            None
+        } else {
+            let requested = self.data.remove(0);
+            self.requested = Some(requested);
+            Some(requested)
+        }
+    }
+    pub fn requested(&self) -> Option<UnresolvedDataType> {
+        self.requested
+    }
+}
+
+impl Default for UnresolvedData {
+    fn default() -> Self {
+        Self {
+            data: vec![UnresolvedDataType::SomeIpPayload],
+            requested: None,
+        }
+    }
+}
 /// A dlt message that can be formatted with optional FIBEX data support
 pub struct FormattableMessage<'a> {
     pub message: Message,
     pub fibex_metadata: Option<&'a FibexMetadata>,
     pub options: Option<&'a FormatOptions>,
+    pub unresolved: UnresolvedData,
+    pub resolved: HashMap<UnresolvedDataType, Result<String, String>>,
 }
 
 impl<'a> Serialize for FormattableMessage<'a> {
@@ -281,6 +318,17 @@ impl<'a> Serialize for FormattableMessage<'a> {
                     None => state.serialize_field("payload", "[Unknown CtrlCommand]")?,
                 }
             }
+            PayloadContent::NetworkTrace(slices) => {
+                state.serialize_field("app-id", &ext_header_app_id)?;
+                state.serialize_field("context-id", &ext_header_context_id)?;
+                state.serialize_field("message-type", &ext_header_msg_type)?;
+                let arg_string = slices
+                    .iter()
+                    .map(|slice| format!("{:02X?}", slice))
+                    .collect::<Vec<String>>()
+                    .join("|");
+                state.serialize_field("payload", &arg_string)?;
+            }
         }
         state.end()
     }
@@ -292,6 +340,8 @@ impl<'a> From<Message> for FormattableMessage<'a> {
             message,
             fibex_metadata: None,
             options: None,
+            unresolved: UnresolvedData::default(),
+            resolved: HashMap::new(),
         }
     }
 }
@@ -320,6 +370,27 @@ impl<'a> PrintableMessage<'a> {
 }
 
 impl<'a> FormattableMessage<'a> {
+    pub fn get_someip_payload(&self) -> Option<&[u8]> {
+        if let PayloadContent::NetworkTrace(slices) = &self.message.payload {
+            if self
+                .message
+                .extended_header
+                .as_ref()
+                .is_some_and(|ext_header| {
+                    matches!(
+                        ext_header.message_type,
+                        MessageType::NetworkTrace(NetworkTraceType::Ipc)
+                            | MessageType::NetworkTrace(NetworkTraceType::Someip)
+                    )
+                })
+            {
+                if let Some(slice) = slices.get(1) {
+                    return Some(slice);
+                }
+            }
+        }
+        None
+    }
     pub fn printable_parts<'b>(
         &'b self,
         ext_h_app_id: &'b str,
@@ -386,6 +457,19 @@ impl<'a> FormattableMessage<'a> {
                     payload_string,
                 ))
             }
+            PayloadContent::NetworkTrace(slices) => {
+                let payload_string = slices
+                    .iter()
+                    .map(|slice| format!("{:02X?}", slice))
+                    .collect::<Vec<String>>()
+                    .join("|");
+                Ok(PrintableMessage::new(
+                    ext_h_app_id,
+                    eh_ctx_id,
+                    ext_h_msg_type,
+                    payload_string,
+                ))
+            }
         }
     }
 
@@ -559,6 +643,24 @@ impl<'a> fmt::Display for FormattableMessage<'a> {
                     None => write!(f, "[Unknown CtrlCommand]"),
                 }
             }
+            PayloadContent::NetworkTrace(slices) => {
+                self.write_app_id_context_id_and_message_type(f)?;
+                if let Some(resolved) = self.resolved.get(&UnresolvedDataType::SomeIpPayload) {
+                    match resolved {
+                        Ok(msg) => write!(f, "SOME/IP: {}", msg.replace("", " | ")),
+                        Err(err) => {
+                            write!(f, "SOME/IP PARSING ERROR: {err}; ORIGIN: ")?;
+                            slices.iter().try_for_each(|slice| {
+                                write!(f, "{}{:02X?}", DLT_ARGUMENT_SENTINAL, slice)
+                            })
+                        }
+                    }
+                } else {
+                    slices
+                        .iter()
+                        .try_for_each(|slice| write!(f, "{}{:02X?}", DLT_ARGUMENT_SENTINAL, slice))
+                }
+            }
         }
     }
 }
diff --git a/application/apps/indexer/parsers/src/dlt/mod.rs b/application/apps/indexer/parsers/src/dlt/mod.rs
index dae82f4ce4..613b95aa1a 100644
--- a/application/apps/indexer/parsers/src/dlt/mod.rs
+++ b/application/apps/indexer/parsers/src/dlt/mod.rs
@@ -1,19 +1,22 @@
 pub mod attachment;
 pub mod fmt;
 
-use crate::{dlt::fmt::FormattableMessage, Error, LogMessage, ParseYield, Parser};
+use crate::{
+    dlt::fmt::FormattableMessage, Error, LogMessage, ParseYield, Parser, ParserInstanceAlias,
+};
 use byteorder::{BigEndian, WriteBytesExt};
+use dlt_core::{
+    dlt,
+    parse::{dlt_consume_msg, dlt_message},
+};
 pub use dlt_core::{
     dlt::LogLevel,
     fibex::{gather_fibex_data, FibexConfig, FibexMetadata},
     filtering::{DltFilterConfig, ProcessedDltFilterConfig},
 };
-use dlt_core::{
-    dlt::{self},
-    parse::{dlt_consume_msg, dlt_message},
-};
+use fmt::{UnresolvedData, UnresolvedDataType};
 use serde::Serialize;
-use std::{io::Write, ops::Range};
+use std::{collections::HashMap, io::Write, ops::Range};
 
 use self::{attachment::FtScanner, fmt::FormatOptions};
 
@@ -24,6 +27,26 @@ impl LogMessage for FormattableMessage<'_> {
         writer.write_all(&bytes)?;
         Ok(len)
     }
+    fn next_unresolved(&mut self) -> Option<(ParserInstanceAlias, &[u8])> {
+        if let Some(next) = self.unresolved.next_unresolved() {
+            match next {
+                UnresolvedDataType::SomeIpPayload => {
+                    if let Some(payload) = self.get_someip_payload() {
+                        return Some((ParserInstanceAlias::SomeIp, payload));
+                    }
+                }
+            }
+            None
+        } else {
+            None
+        }
+    }
+    fn resolve(&mut self, result: Option<Result<String, String>>) {
+        let (Some(reqested), Some(parsed)) = (self.unresolved.requested(), result) else {
+            return;
+        };
+        self.resolved.insert(reqested, parsed);
+    }
 }
 
 #[derive(Debug, Serialize)]
@@ -143,6 +166,8 @@ impl<'m> Parser<FormattableMessage<'m>> for DltParser<'m> {
                     message: msg_with_storage_header,
                     fibex_metadata: self.fibex_metadata,
                     options: self.fmt_options,
+                    unresolved: UnresolvedData::default(),
+                    resolved: HashMap::new(),
                 };
                 self.offset += input.len() - rest.len();
                 Ok((
diff --git a/application/apps/indexer/parsers/src/lib.rs b/application/apps/indexer/parsers/src/lib.rs
index 1b7fce8efe..c53af15f82 100644
--- a/application/apps/indexer/parsers/src/lib.rs
+++ b/application/apps/indexer/parsers/src/lib.rs
@@ -8,6 +8,16 @@ use thiserror::Error;
 
 extern crate log;
 
+#[derive(Debug, Clone, Copy)]
+pub enum ParserInstanceAlias {
+    SomeIp,
+}
+
+#[allow(clippy::large_enum_variant)]
+pub enum ParserInstance {
+    SomeIp(someip::SomeipParser),
+}
+
 #[derive(Error, Debug)]
 pub enum Error {
     #[error("Parse error: {0}")]
@@ -82,6 +92,10 @@ pub trait LogMessage: Display + Serialize {
     /// Serializes a message directly into a Writer
     /// returns the size of the serialized message
     fn to_writer<W: Write>(&self, writer: &mut W) -> Result<usize, std::io::Error>;
+    fn next_unresolved(&mut self) -> Option<(ParserInstanceAlias, &[u8])> {
+        None
+    }
+    fn resolve(&mut self, _result: Option<Result<String, String>>) {}
 }
 
 #[derive(Debug)]
diff --git a/application/apps/indexer/session/Cargo.toml b/application/apps/indexer/session/Cargo.toml
index 803f87bd4c..8ae61602b4 100644
--- a/application/apps/indexer/session/Cargo.toml
+++ b/application/apps/indexer/session/Cargo.toml
@@ -8,7 +8,9 @@ edition = "2021"
 blake3 = "1.3"
 crossbeam-channel.workspace = true
 dirs.workspace = true
-dlt-core.workspace = true
+# dlt-core.workspace = true
+# TODO https://github.com/esrlabs/dlt-core/pull/24
+dlt-core = { git = "https://github.com/kruss/dlt-core.git", branch = "dlt_network_traces", features=["statistics"] }
 envvars = "0.1"
 file-tools = { path = "../addons/file-tools" }
 futures.workspace = true
diff --git a/application/apps/indexer/session/src/handlers/observing/mod.rs b/application/apps/indexer/session/src/handlers/observing/mod.rs
index 48d8b1a250..02743e072c 100644
--- a/application/apps/indexer/session/src/handlers/observing/mod.rs
+++ b/application/apps/indexer/session/src/handlers/observing/mod.rs
@@ -10,7 +10,7 @@ use parsers::{
     dlt::{fmt::FormatOptions, DltParser},
     someip::SomeipParser,
     text::StringTokenizer,
-    LogMessage, MessageStreamItem, ParseYield, Parser,
+    LogMessage, MessageStreamItem, ParseYield, Parser, ParserInstance, ParserInstanceAlias,
 };
 use sources::{
     factory::ParserType,
@@ -77,6 +77,7 @@ async fn run_source_intern<S: ByteSource>(
     rx_sde: Option<SdeReceiver>,
     rx_tail: Option<Receiver<Result<(), tail::Error>>>,
 ) -> OperationResult<()> {
+    let mut nested: Vec<ParserInstance> = Vec::new();
     match parser {
         ParserType::SomeIp(settings) => {
             let someip_parser = match &settings.fibex_file_paths {
@@ -86,13 +87,19 @@ async fn run_source_intern<S: ByteSource>(
                 None => SomeipParser::new(),
             };
             let producer = MessageProducer::new(someip_parser, source, rx_sde);
-            run_producer(operation_api, state, source_id, producer, rx_tail).await
+            run_producer(operation_api, state, source_id, nested, producer, rx_tail).await
         }
         ParserType::Text => {
             let producer = MessageProducer::new(StringTokenizer {}, source, rx_sde);
-            run_producer(operation_api, state, source_id, producer, rx_tail).await
+            run_producer(operation_api, state, source_id, nested, producer, rx_tail).await
         }
         ParserType::Dlt(settings) => {
+            nested.push(ParserInstance::SomeIp(match &settings.fibex_file_paths {
+                Some(paths) => {
+                    SomeipParser::from_fibex_files(paths.iter().map(PathBuf::from).collect())
+                }
+                None => SomeipParser::new(),
+            }));
             let fmt_options = Some(FormatOptions::from(settings.tz.as_ref()));
             let dlt_parser = DltParser::new(
                 settings.filter_config.as_ref().map(|f| f.into()),
@@ -101,7 +108,7 @@ async fn run_source_intern<S: ByteSource>(
                 settings.with_storage_header,
             );
             let producer = MessageProducer::new(dlt_parser, source, rx_sde);
-            run_producer(operation_api, state, source_id, producer, rx_tail).await
+            run_producer(operation_api, state, source_id, nested, producer, rx_tail).await
         }
     }
 }
@@ -110,6 +117,7 @@ async fn run_producer<T: LogMessage, P: Parser<T>, S: ByteSource>(
     operation_api: OperationAPI,
     state: SessionStateAPI,
     source_id: u16,
+    mut nested: Vec<ParserInstance>,
     mut producer: MessageProducer<T, P, S>,
     mut rx_tail: Option<Receiver<Result<(), tail::Error>>>,
 ) -> OperationResult<()> {
@@ -138,7 +146,34 @@ async fn run_producer<T: LogMessage, P: Parser<T>, S: ByteSource>(
         match next {
             Next::Item(item) => {
                 match item {
-                    MessageStreamItem::Item(ParseYield::Message(item)) => {
+                    MessageStreamItem::Item(ParseYield::Message(mut item)) => {
+                        while let Some((required, data)) = item.next_unresolved() {
+                            match required {
+                                ParserInstanceAlias::SomeIp => {
+                                    if let Some(ParserInstance::SomeIp(parser)) = nested
+                                        .iter_mut()
+                                        .find(|p| matches!(p, ParserInstance::SomeIp(..)))
+                                    {
+                                        match parser.parse(data, None) {
+                                            Ok((_, Some(ParseYield::Message(resolved)))) => {
+                                                item.resolve(Some(Ok(resolved.to_string())));
+                                                continue;
+                                            }
+                                            Err(e) => {
+                                                item.resolve(Some(Err(e.to_string())));
+                                                continue;
+                                            }
+                                            _ => {
+                                                item.resolve(Some(Err(String::from(
+                                                    "Fail to extract printable message",
+                                                ))));
+                                            }
+                                        }
+                                    }
+                                }
+                            };
+                            item.resolve(None);
+                        }
                         state
                             .write_session_file(source_id, format!("{item}\n"))
                             .await?;
diff --git a/application/apps/rustcore/rs-bindings/Cargo.lock b/application/apps/rustcore/rs-bindings/Cargo.lock
index 56cf3e3c13..a7559efd54 100644
--- a/application/apps/rustcore/rs-bindings/Cargo.lock
+++ b/application/apps/rustcore/rs-bindings/Cargo.lock
@@ -698,9 +698,8 @@ dependencies = [
 
 [[package]]
 name = "dlt-core"
-version = "0.14.5"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "f6fd69a328826e883613fbbcf468a33a53df1198c3e39bae7ad079c449431bfd"
+version = "0.16.0"
+source = "git+https://github.com/kruss/dlt-core.git?branch=dlt_network_traces#525f591862437ca15c56639143f205a956d01b6a"
 dependencies = [
  "buf_redux 0.8.4 (registry+https://github.com/rust-lang/crates.io-index)",
  "byteorder",
@@ -2026,9 +2025,9 @@ dependencies = [
 
 [[package]]
 name = "quick-xml"
-version = "0.22.0"
+version = "0.23.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "8533f14c8382aaad0d592c812ac3b826162128b65662331e1127b45c3d18536b"
+checksum = "11bafc859c6815fbaffbbbf4229ecb767ac913fecb27f9ad4343662e9ef099ea"
 dependencies = [
  "memchr",
 ]
@@ -2476,12 +2475,12 @@ dependencies = [
 
 [[package]]
 name = "someip-payload"
-version = "0.1.1"
-source = "git+https://github.com/esrlabs/someip-payload#9a58a561a3d284e37a25ea2dc52188c70694682d"
+version = "0.1.3"
+source = "git+https://github.com/kruss/someip-payload.git?branch=robustness#ccd100907b38f60d9e1ac657250111b2e0a69518"
 dependencies = [
  "byteorder",
  "log",
- "quick-xml 0.22.0",
+ "quick-xml 0.23.1",
  "regex",
  "thiserror",
  "ux",