diff --git a/Cargo.lock b/Cargo.lock
index 7e6b697..9286af6 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -19,7 +19,6 @@ dependencies = [
  "proxy_client",
  "regex",
  "serde",
- "table_log",
  "thiserror",
  "tokio",
  "tokio-util",
@@ -196,7 +195,7 @@ checksum = "c6fa2087f2753a7da8cc1c0dbfcf89579dd57458e36769de5ac750b4671737ca"
 dependencies = [
  "proc-macro2",
  "quote",
- "syn 2.0.65",
+ "syn 2.0.66",
 ]

 [[package]]
@@ -385,7 +384,7 @@ checksum = "4da9a32f3fed317401fa3c862968128267c3106685286e15d5aaa3d7389c2f60"
 dependencies = [
  "proc-macro2",
  "quote",
- "syn 2.0.65",
+ "syn 2.0.66",
 ]

 [[package]]
@@ -501,7 +500,7 @@ dependencies = [
  "heck 0.5.0",
  "proc-macro2",
  "quote",
- "syn 2.0.65",
+ "syn 2.0.66",
 ]

 [[package]]
@@ -539,6 +538,7 @@ dependencies = [
  "bytes",
  "bytesize",
  "duplicate",
+ "file_rotating_log",
  "futures-core",
  "hdv",
  "hdv_derive",
@@ -556,7 +556,6 @@ dependencies = [
  "slotmap",
  "strict-num",
  "swap",
- "table_log",
  "thiserror",
  "tokio",
  "tokio-io-timeout",
@@ -666,18 +665,6 @@ dependencies = [
  "memchr",
 ]

-[[package]]
-name = "csv_logger"
-version = "0.1.0"
-source = "git+https://github.com/Banyc/csv_logger.git?tag=v0.0.1#5483cb14f464bdb08c57be36c9e5d69dfbc67c3a"
-dependencies = [
- "csv",
- "erased-serde",
- "serde",
- "table_log",
- "tempfile",
-]
-
 [[package]]
 name = "dfsql"
 version = "0.10.1"
@@ -741,7 +728,7 @@ dependencies = [
  "once_cell",
  "proc-macro2",
  "quote",
- "syn 2.0.65",
+ "syn 2.0.66",
 ]

 [[package]]
@@ -750,26 +737,6 @@ version = "1.0.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5"

-[[package]]
-name = "erased-serde"
-version = "0.4.5"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "24e2389d65ab4fab27dc2a5de7b191e1f6617d1f1c8855c0dc569c94a4cbb18d"
-dependencies = [
- "serde",
- "typeid",
-]
-
-[[package]]
-name = "errno"
-version = "0.3.9"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "534c5cf6194dfab3db3242765c03bbe257cf92f22b38f6bc0c58d59108a820ba"
-dependencies = [
- "libc",
- "windows-sys 0.52.0",
-]
-
 [[package]]
 name = "ethnum"
 version = "1.5.0"
@@ -789,10 +756,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "95765f67b4b18863968b4a1bd5bb576f732b29a4a28c7cd84c09fa3e2875f33c"

 [[package]]
-name = "fastrand"
-version = "2.1.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "9fc0510504f03c51ada170672ac806f1f105a88aa97a5281117e1ddc3368e51a"
+name = "file_rotating_log"
+version = "0.1.0"
+source = "git+https://github.com/Banyc/file_rotating_log.git?tag=v0.0.1#80532285c8812885abd2ac5d078afe38be941fbb"

 [[package]]
 name = "file_watcher_tokio"
@@ -910,7 +876,7 @@ checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac"
 dependencies = [
  "proc-macro2",
  "quote",
- "syn 2.0.65",
+ "syn 2.0.66",
 ]

 [[package]]
@@ -1039,7 +1005,7 @@ dependencies = [
  "hdv",
  "proc-macro2",
  "quote",
- "syn 2.0.65",
+ "syn 2.0.66",
 ]

 [[package]]
@@ -1302,12 +1268,6 @@ version = "0.2.8"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "4ec2a862134d2a7d32d7983ddcdd1c4923530833c9f2ea1a44fc5fa473989058"

-[[package]]
-name = "linux-raw-sys"
-version = "0.4.14"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "78b3ae25bc7c8c38cec158d1f2757ee79e9b3740fbc7ccf0e59e4b08d793fa89"
-
 [[package]]
 name = "lock_api"
 version = "0.4.12"
@@ -1733,7 +1693,7 @@ checksum = "2f38a4412a78282e09a2cf38d195ea5420d15ba0602cb375210efbc877243965"
 dependencies = [
  "proc-macro2",
  "quote",
- "syn 2.0.65",
+ "syn 2.0.66",
 ]

 [[package]]
@@ -2359,7 +2319,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "76009fbe0614077fc1a2ce255e3a1881a2e3a3527097d5dc6d8212c585e7e38b"
 dependencies = [
  "quote",
- "syn 2.0.65",
+ "syn 2.0.66",
 ]

 [[package]]
@@ -2498,19 +2458,6 @@ version = "1.1.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2"

-[[package]]
-name = "rustix"
-version = "0.38.34"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "70dc5ec042f7a43c4a73241207cecc9873a06d45debb38b329f8541d85c2730f"
-dependencies = [
- "bitflags 2.5.0",
- "errno",
- "libc",
- "linux-raw-sys",
- "windows-sys 0.52.0",
-]
-
 [[package]]
 name = "rustversion"
 version = "1.0.17"
@@ -2576,7 +2523,7 @@ checksum = "6048858004bcff69094cd972ed40a32500f153bd3be9f716b2eed2e8217c4838"
 dependencies = [
  "proc-macro2",
  "quote",
- "syn 2.0.65",
+ "syn 2.0.66",
 ]

 [[package]]
@@ -2643,7 +2590,7 @@ checksum = "82fe9db325bcef1fbcde82e078a5cc4efdf787e96b3b9cf45b50b529f2083d67"
 dependencies = [
  "proc-macro2",
  "quote",
- "syn 2.0.65",
+ "syn 2.0.66",
 ]

 [[package]]
@@ -2655,7 +2602,6 @@ dependencies = [
  "axum",
  "clap",
  "common",
- "csv_logger",
  "dhat",
  "file_watcher_tokio",
  "metrics",
@@ -2857,7 +2803,7 @@ dependencies = [
  "proc-macro2",
  "quote",
  "rustversion",
- "syn 2.0.65",
+ "syn 2.0.66",
 ]

 [[package]]
@@ -2881,9 +2827,9 @@ dependencies = [

 [[package]]
 name = "syn"
-version = "2.0.65"
+version = "2.0.66"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "d2863d96a84c6439701d7a38f9de935ec562c8832cc55d1dde0f513b52fad106"
+checksum = "c42f3f41a2de00b01c0aaad383c5a45241efc8b2d1eda5661812fda5f3cdcff5"
 dependencies = [
  "proc-macro2",
  "quote",
@@ -2916,16 +2862,6 @@ dependencies = [
  "windows",
 ]

-[[package]]
-name = "table_log"
-version = "0.1.0"
-source = "git+https://github.com/Banyc/table_log.git?tag=v0.0.1#fc49af71a17257e03583d93114546065e8f2f470"
-dependencies = [
- "erased-serde",
- "once_cell",
- "serde",
-]
-
 [[package]]
 name = "tap"
 version = "1.0.1"
@@ -2938,18 +2874,6 @@ version = "0.1.6"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "c1bbb9f3c5c463a01705937a24fdabc5047929ac764b2d5b9cf681c1f5041ed5"

-[[package]]
-name = "tempfile"
-version = "3.10.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "85b77fafb263dd9d05cbeac119526425676db3784113aa9295c88498cbf8bff1"
-dependencies = [
- "cfg-if",
- "fastrand",
- "rustix",
- "windows-sys 0.52.0",
-]
-
 [[package]]
 name = "tests"
 version = "0.1.0"
@@ -2984,7 +2908,7 @@ checksum = "46c3384250002a6d5af4d114f2845d37b57521033f30d5c3f46c4d70e1197533"
 dependencies = [
  "proc-macro2",
  "quote",
- "syn 2.0.65",
+ "syn 2.0.66",
 ]

 [[package]]
@@ -3040,7 +2964,7 @@ checksum = "5b8a1e28f2deaa14e508979454cb3a223b10b938b45af148bc0986de36f1923b"
 dependencies = [
  "proc-macro2",
  "quote",
- "syn 2.0.65",
+ "syn 2.0.66",
 ]

 [[package]]
@@ -3191,7 +3115,7 @@ checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7"
 dependencies = [
  "proc-macro2",
  "quote",
- "syn 2.0.65",
+ "syn 2.0.66",
 ]

 [[package]]
@@ -3239,12 +3163,6 @@ version = "0.2.5"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b"

-[[package]]
-name = "typeid"
-version = "1.0.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "059d83cc991e7a42fc37bd50941885db0888e34209f8cfd9aab07ddec03bc9cf"
-
 [[package]]
 name = "udp_listener"
 version = "0.1.0"
@@ -3355,7 +3273,7 @@ dependencies = [
  "once_cell",
  "proc-macro2",
  "quote",
- "syn 2.0.65",
+ "syn 2.0.66",
  "wasm-bindgen-shared",
 ]

@@ -3377,7 +3295,7 @@ checksum = "e94f17b526d0a461a191c78ea52bbce64071ed5c04c9ffe424dcb38f74171bb7"
 dependencies = [
  "proc-macro2",
  "quote",
- "syn 2.0.65",
+ "syn 2.0.66",
  "wasm-bindgen-backend",
  "wasm-bindgen-shared",
 ]
@@ -3619,7 +3537,7 @@ checksum = "15e934569e47891f7d9411f1a451d947a60e000ab3bd24fbb970f000387d1b3b"
 dependencies = [
  "proc-macro2",
  "quote",
- "syn 2.0.65",
+ "syn 2.0.66",
 ]

 [[package]]
diff --git a/Cargo.toml b/Cargo.toml
index 991dcad..a001d5c 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -20,9 +20,9 @@ bincode = "1"
 bytes = "1"
 bytesize = "1"
 clap = "4"
-csv_logger = { git = "https://github.com/Banyc/csv_logger.git", tag = "v0.0.1" }
 dhat = "0.3"
 duplicate = "1"
+file_rotating_log = { git = "https://github.com/Banyc/file_rotating_log.git", tag = "v0.0.1" }
 file_watcher_tokio = "0.3"
 futures = "0.3"
 futures-core = "0.3"
@@ -50,7 +50,6 @@ serial_test = "3"
 slotmap = "1"
 strict-num = "0.2"
 swap = { git = "https://github.com/Banyc/swap.git", rev = "d10a8b5b10503fa6ebac523cfcaa4d62135a665f" }
-table_log = { git = "https://github.com/Banyc/table_log.git", tag = "v0.0.1" }
 thiserror = "1"
 tokio = "1"
 tokio-io-timeout = "1"
diff --git a/access_server/Cargo.toml b/access_server/Cargo.toml
index ebf70f1..1a28613 100644
--- a/access_server/Cargo.toml
+++ b/access_server/Cargo.toml
@@ -18,7 +18,6 @@ protocol = { path = "../protocol" }
 proxy_client = { path = "../proxy_client" }
 regex = { workspace = true }
 serde = { workspace = true, features = ["derive"] }
-table_log = { workspace = true }
 thiserror = { workspace = true }
 tokio = { workspace = true, features = ["full"] }
 tokio_chacha20 = { workspace = true }
diff --git a/access_server/src/stream/streams/http_tunnel/mod.rs b/access_server/src/stream/streams/http_tunnel/mod.rs
index 41df019..2e69da4 100644
--- a/access_server/src/stream/streams/http_tunnel/mod.rs
+++ b/access_server/src/stream/streams/http_tunnel/mod.rs
@@ -6,11 +6,12 @@ use common::{
     addr::{InternetAddr, ParseInternetAddrError},
     config::SharableConfig,
     loading,
+    log::Timing,
     proxy_table::{ProxyAction, ProxyTableBuildError},
     stream::{
         addr::StreamAddr,
         io_copy::{CopyBidirectional, LogContext, DEAD_SESSION_RETENTION_DURATION},
-        log::{SimplifiedStreamLog, SimplifiedStreamProxyLog, StreamRecord},
+        log::{SimplifiedStreamLog, SimplifiedStreamProxyLog, LOGGER},
         metrics::{Session, StreamSessionTable},
         IoAddr, IoStream, StreamServerHook,
     },
@@ -236,19 +237,22 @@ impl HttpAccess {
         };
         let end = std::time::Instant::now();
-        let metrics = SimplifiedStreamProxyLog {
+        let timing = Timing { start, end };
+        let log = SimplifiedStreamProxyLog {
             stream: SimplifiedStreamLog {
-                start,
-                end,
+                timing,
                 upstream_addr: upstream.addr,
                 upstream_sock_addr: upstream.sock_addr,
                 downstream_addr: None,
             },
             destination: addr.address,
         };

-        info!(%metrics, "{} finished", method);
+        info!(%log, "{} finished", method);

-        table_log::log!(&StreamRecord::SimplifiedProxyLog(&metrics));
+        let record = (&log).into();
+        if let Some(x) = LOGGER.lock().unwrap().as_ref() {
+            x.write(&record);
+        }

         res
     }
diff --git a/common/Cargo.toml b/common/Cargo.toml
index 685cfcf..67b6b97 100644
--- a/common/Cargo.toml
+++ b/common/Cargo.toml
@@ -13,6 +13,7 @@ bincode = { workspace = true }
 bytes = { workspace = true }
 bytesize = { workspace = true }
 duplicate = { workspace = true }
+file_rotating_log = { workspace = true }
 futures-core = { workspace = true }
 hdv = { workspace = true }
 hdv_derive = { workspace = true }
@@ -29,7 +30,6 @@ serde = { workspace = true, features = ["derive", "rc"] }
 slotmap = { workspace = true }
 strict-num = { workspace = true }
 swap = { workspace = true }
-table_log = { workspace = true }
 thiserror = { workspace = true }
 tokio = { workspace = true, features = ["full"] }
 tokio-io-timeout = { workspace = true }
diff --git a/common/src/addr.rs b/common/src/addr.rs
index ce192aa..7b14f10 100644
--- a/common/src/addr.rs
+++ b/common/src/addr.rs
@@ -8,6 +8,7 @@ use std::{
     sync::{Arc, Mutex},
 };

+use hdv_derive::HdvSerde;
 use lru::LruCache;
 use once_cell::sync::Lazy;
 use serde::{de::Visitor, Deserialize, Serialize};
@@ -146,6 +147,27 @@ impl InternetAddr {
     }
 }

+#[derive(Debug, Clone, HdvSerde)]
+pub struct InternetAddrHdv {
+    pub host: Arc<str>,
+    pub port: u16,
+}
+impl From<&InternetAddr> for InternetAddrHdv {
+    fn from(value: &InternetAddr) -> Self {
+        let (host, port) = match &value.0 {
+            InternetAddrKind::SocketAddr(x) => return (*x).into(),
+            InternetAddrKind::DomainName { addr, port } => (addr.clone(), *port),
+        };
+        Self { host, port }
+    }
+}
+impl From<SocketAddr> for InternetAddrHdv {
+    fn from(value: SocketAddr) -> Self {
+        let (host, port) = (value.ip().to_string().into(), value.port());
+        Self { host, port }
+    }
+}
+
 #[derive(Debug, Clone)]
 pub struct InternetAddrStr(pub InternetAddr);
 impl AddressString for InternetAddrStr {
diff --git a/common/src/lib.rs b/common/src/lib.rs
index 8e79838..6f0ae3a 100644
--- a/common/src/lib.rs
+++ b/common/src/lib.rs
@@ -10,6 +10,7 @@ pub mod error;
 pub mod filter;
 pub mod header;
 pub mod loading;
+pub mod log;
 pub mod proxy_table;
 pub mod sampling;
 pub mod stream;
diff --git a/common/src/log.rs b/common/src/log.rs
new file mode 100644
index 0000000..ed03a4f
--- /dev/null
+++ b/common/src/log.rs
@@ -0,0 +1,115 @@
+use std::{
+    num::NonZeroUsize,
+    path::{Path, PathBuf},
+    sync::{Arc, Mutex},
+    time::{Duration, Instant, SystemTime, UNIX_EPOCH},
+};
+
+use file_rotating_log::{
+    rotator::{spawn_flusher, LogRotator, RotationPolicy},
+    LogWriter,
+};
+use hdv::{
+    io::text::{HdvTextWriter, HdvTextWriterOptions},
+    serde::{HdvScheme, HdvSerialize},
+};
+use hdv_derive::HdvSerde;
+
+const FLUSH_INTERVAL: Duration = Duration::from_secs(30);
+
+#[derive(Debug)]
+pub struct HdvLogger<T> {
+    rotator: Arc<Mutex<LogRotator<HdvLogWriter<T>>>>,
+}
+impl<T> HdvLogger<T>
+where
+    T: HdvScheme + HdvSerialize + Sync + Send + 'static,
+{
+    pub fn new(output_dir: PathBuf) -> Self {
+        let rotation = RotationPolicy {
+            max_records: NonZeroUsize::new(1024 * 64).unwrap(),
+            max_epochs: 4,
+        };
+        let rotator = LogRotator::new(output_dir, rotation);
+        let rotator = Arc::new(Mutex::new(rotator));
+        spawn_flusher(Arc::clone(&rotator), FLUSH_INTERVAL);
+        Self { rotator }
+    }
+
+    pub fn write(&self, record: &T) {
+        let mut rotator = self.rotator.lock().unwrap();
+        rotator.writer().writer().write(record).unwrap();
+        rotator.incr_record_count();
+    }
+
+    pub fn flush(&self) {
+        self.rotator.lock().unwrap().flush();
+    }
+}
+
+#[derive(Debug)]
+struct HdvLogWriter<T> {
+    writer: HdvTextWriter<std::fs::File, T>,
+}
+impl<T> HdvLogWriter<T> {
+    pub fn writer(&mut self) -> &mut HdvTextWriter<std::fs::File, T> {
+        &mut self.writer
+    }
+}
+impl<T> LogWriter for HdvLogWriter<T>
+where
+    T: HdvScheme + HdvSerialize,
+{
+    fn flush(&mut self) {
+        self.writer.flush().unwrap();
+    }
+
+    fn open(path: impl AsRef<Path>) -> Self {
+        let file = std::fs::File::options()
+            .write(true)
+            .create(true)
+            .truncate(true)
+            .open(path)
+            .expect("Cannot create a log file");
+        let options = HdvTextWriterOptions {
+            is_csv_header: true,
+        };
+        let writer = HdvTextWriter::new(file, options);
+        Self { writer }
+    }

+    fn file_extension() -> &'static str {
+        "csv"
+    }
+}
+
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+pub struct Timing {
+    pub start: (Instant, SystemTime),
+    pub end: Instant,
+}
+impl Timing {
+    pub fn duration(&self) -> Duration {
+        self.end - self.start.0
+    }
+}
+#[derive(Debug, Clone, HdvSerde)]
+pub struct TimingHdv {
+    pub start_ms: u64,
+    pub duration_ms: u64,
+}
+impl From<&Timing> for TimingHdv {
+    fn from(value: &Timing) -> Self {
+        let start_ms = value
+            .start
+            .1
+            .duration_since(UNIX_EPOCH)
+            .unwrap()
+            .as_millis() as u64;
+        let duration_ms = value.duration().as_millis() as u64;
+        Self {
+            start_ms,
+            duration_ms,
+        }
+    }
+}
diff --git a/common/src/stream/addr.rs b/common/src/stream/addr.rs
index d7af0cb..1e457a8 100644
--- a/common/src/stream/addr.rs
+++ b/common/src/stream/addr.rs
@@ -1,8 +1,9 @@
 use std::{fmt::Display, str::FromStr, sync::Arc};

+use hdv_derive::HdvSerde;
 use serde::{de::DeserializeOwned, Deserialize, Serialize};

-use crate::addr::{InternetAddr, ParseInternetAddrError};
+use crate::addr::{InternetAddr, InternetAddrHdv, ParseInternetAddrError};

 pub trait StreamType:
     Clone
@@ -57,3 +58,16 @@ impl<ST: FromStr<Err = ParseInternetAddrError>> FromStr for StreamAddr<ST> {
         })
     }
 }
+
+#[derive(Debug, Clone, HdvSerde)]
+pub struct StreamAddrHdv {
+    pub addr: InternetAddrHdv,
+    pub ty: Arc<str>,
+}
+impl<ST: Display> From<&StreamAddr<ST>> for StreamAddrHdv {
+    fn from(value: &StreamAddr<ST>) -> Self {
+        let addr = (&value.address).into();
+        let ty = value.stream_type.to_string().into();
+        Self { addr, ty }
+    }
+}
diff --git a/common/src/stream/io_copy/mod.rs b/common/src/stream/io_copy/mod.rs
index 97db9cc..8445898 100644
--- a/common/src/stream/io_copy/mod.rs
+++ b/common/src/stream/io_copy/mod.rs
@@ -12,9 +12,11 @@ use tokio::io::{AsyncRead, AsyncWrite};
 use tokio_throughput::{ReadGauge, WriteGauge};
 use tracing::info;

+use crate::log::Timing;
+
 use super::{
     addr::{StreamAddr, StreamType},
-    log::{StreamLog, StreamProxyLog, StreamRecord},
+    log::{StreamLog, StreamProxyLog, LOGGER},
     metrics::{Session, StreamSessionTable},
 };
@@ -155,16 +157,24 @@ pub struct LogContext<ST> {
     pub destination: Option<StreamAddr<ST>>,
 }

-fn log<ST: Display>(metrics: StreamLog<ST>, destination: Option<StreamAddr<ST>>) {
+fn log<ST: Display>(log: StreamLog<ST>, destination: Option<StreamAddr<ST>>) {
     match destination {
         Some(d) => {
-            let metrics = StreamProxyLog {
-                stream: metrics,
+            let log = StreamProxyLog {
+                stream: log,
                 destination: d.address,
             };
-            table_log::log!(&StreamRecord::ProxyLog(&metrics));
+            let record = (&log).into();
+            if let Some(x) = LOGGER.lock().unwrap().as_ref() {
+                x.write(&record);
+            }
+        }
+        None => {
+            let record = (&log).into();
+            if let Some(x) = LOGGER.lock().unwrap().as_ref() {
+                x.write(&record);
+            }
         }
-        None => table_log::log!(&StreamRecord::Log(&metrics)),
     }
 }

@@ -218,11 +228,14 @@ fn get_log_from_copy_result(
 ) -> (StreamLog<ST>, Result<(), tokio_io::CopyBiErrorKind>) {
     let (bytes_uplink, bytes_downlink) = result.amounts;

-    counter!("stream.bytes_uplink").increment(bytes_uplink);
-    counter!("stream.bytes_downlink").increment(bytes_downlink);
-    let metrics = StreamLog {
+    counter!("stream.up.bytes").increment(bytes_uplink);
+    counter!("stream.dn.bytes").increment(bytes_downlink);
+    let timing = Timing {
         start: log_context.start,
         end: result.end,
+    };
+    let log = StreamLog {
+        timing,
         bytes_uplink,
         bytes_downlink,
         upstream_addr: log_context.upstream_addr,
@@ -230,5 +243,5 @@ fn get_log_from_copy_result(
         downstream_addr: log_context.downstream_addr,
     };

-    (metrics, result.io_result)
+    (log, result.io_result)
 }
diff --git a/common/src/stream/log.rs b/common/src/stream/log.rs
index 98c5148..5275d45 100644
--- a/common/src/stream/log.rs
+++ b/common/src/stream/log.rs
@@ -2,19 +2,32 @@ use std::{
     fmt::{self, Display},
     net::SocketAddr,
     ops::Deref,
-    time::{Instant, SystemTime, UNIX_EPOCH},
+    path::PathBuf,
+    sync::{Arc, Mutex},
 };

 use bytesize::ByteSize;
+use hdv_derive::HdvSerde;
+use once_cell::sync::Lazy;

-use crate::addr::{InternetAddr, InternetAddrKind};
+use crate::{
+    addr::{InternetAddr, InternetAddrHdv, InternetAddrKind},
+    log::{HdvLogger, Timing, TimingHdv},
+};
+
+use super::addr::{StreamAddr, StreamAddrHdv};

-use super::addr::StreamAddr;
+pub static LOGGER: Lazy<Arc<Mutex<Option<HdvLogger<StreamLogHdv>>>>> =
+    Lazy::new(|| Arc::new(Mutex::new(None)));
+pub fn init_logger(output_dir: PathBuf) {
+    let output_dir = output_dir.join("stream_record");
+    let logger = HdvLogger::new(output_dir);
+    *LOGGER.lock().unwrap() = Some(logger);
+}

 #[derive(Debug, Clone, PartialEq, Eq, Hash)]
 pub struct StreamLog<ST> {
-    pub start: (Instant, SystemTime),
-    pub end: Instant,
+    pub timing: Timing,
     pub bytes_uplink: u64,
     pub bytes_downlink: u64,
     pub upstream_addr: StreamAddr<ST>,
@@ -24,8 +37,7 @@

 #[derive(Debug, Clone, PartialEq, Eq, Hash)]
 pub struct SimplifiedStreamLog<ST> {
-    pub start: (Instant, SystemTime),
-    pub end: Instant,
+    pub timing: Timing,
     pub upstream_addr: StreamAddr<ST>,
     pub upstream_sock_addr: SocketAddr,
     pub downstream_addr: Option<SocketAddr>,
@@ -43,86 +55,74 @@ pub struct SimplifiedStreamProxyLog<ST> {
     pub destination: InternetAddr,
 }

-pub enum StreamRecord<'caller, ST> {
-    Log(&'caller StreamLog<ST>),
-    ProxyLog(&'caller StreamProxyLog<ST>),
-    SimplifiedLog(&'caller SimplifiedStreamLog<ST>),
-    SimplifiedProxyLog(&'caller SimplifiedStreamProxyLog<ST>),
+#[derive(Debug, Clone, HdvSerde)]
+pub struct StreamLogHdv {
+    pub timing: TimingHdv,
+    pub up_bytes: Option<u64>,
+    pub dn_bytes: Option<u64>,
+    pub upstream_addr: StreamAddrHdv,
+    pub upstream_sock_addr: InternetAddrHdv,
+    pub downstream_addr: Option<InternetAddrHdv>,
+    pub destination: Option<InternetAddrHdv>,
 }
-impl<ST: Display> serde::Serialize for StreamRecord<'_, ST> {
-    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
-    where
-        S: serde::Serializer,
-    {
-        #[derive(serde::Serialize)]
-        struct Record {
-            pub start_ms: u128,
-            pub duration_ms: u128,
-            pub bytes_uplink: Option<u64>,
-            pub bytes_downlink: Option<u64>,
-            pub upstream_addr_type: String,
-            pub upstream_addr: String,
-            pub upstream_sock_addr: SocketAddr,
-            pub downstream_addr: Option<SocketAddr>,
-            pub destination: Option<String>,
+impl<ST: Display> From<&StreamLog<ST>> for StreamLogHdv {
+    fn from(value: &StreamLog<ST>) -> Self {
+        let timing = (&value.timing).into();
+        let up_bytes = Some(value.bytes_uplink);
+        let dn_bytes = Some(value.bytes_downlink);
+        let upstream_addr = (&value.upstream_addr).into();
+        let upstream_sock_addr = value.upstream_sock_addr.into();
+        let downstream_addr = value.downstream_addr.map(|x| x.into());
+        let destination = None;
+        Self {
+            timing,
+            up_bytes,
+            dn_bytes,
+            upstream_addr,
+            upstream_sock_addr,
+            downstream_addr,
+            destination,
         }
-        fn from_log<ST: Display>(s: &StreamLog<ST>) -> Record {
-            let duration = s.end - s.start.0;
-            Record {
-                start_ms: s.start.1.duration_since(UNIX_EPOCH).unwrap().as_millis(),
-                duration_ms: duration.as_millis(),
-                bytes_uplink: Some(s.bytes_uplink),
-                bytes_downlink: Some(s.bytes_downlink),
-                upstream_addr_type: s.upstream_addr.stream_type.to_string(),
-                upstream_addr: s.upstream_addr.address.to_string(),
-                upstream_sock_addr: s.upstream_sock_addr,
-                downstream_addr: s.downstream_addr,
-                destination: None,
-            }
-        }
-        fn from_simplified_log<ST: Display>(s: &SimplifiedStreamLog<ST>) -> Record {
-            let duration = s.end - s.start.0;
-            Record {
-                start_ms: s.start.1.duration_since(UNIX_EPOCH).unwrap().as_millis(),
-                duration_ms: duration.as_millis(),
-                bytes_uplink: None,
-                bytes_downlink: None,
-                upstream_addr_type: s.upstream_addr.stream_type.to_string(),
-                upstream_addr: s.upstream_addr.address.to_string(),
-                upstream_sock_addr: s.upstream_sock_addr,
-                downstream_addr: s.downstream_addr,
-                destination: None,
-            }
+    }
+}
+impl<ST: Display> From<&SimplifiedStreamLog<ST>> for StreamLogHdv {
+    fn from(value: &SimplifiedStreamLog<ST>) -> Self {
+        let timing = (&value.timing).into();
+        let up_bytes = None;
+        let dn_bytes = None;
+        let upstream_addr = (&value.upstream_addr).into();
+        let upstream_sock_addr = value.upstream_sock_addr.into();
+        let downstream_addr = value.downstream_addr.map(|x| x.into());
+        let destination = None;
+        Self {
+            timing,
+            up_bytes,
+            dn_bytes,
+            upstream_addr,
+            upstream_sock_addr,
+            downstream_addr,
+            destination,
         }
-        let record = match &self {
-            Self::Log(s) => from_log(s),
-            Self::ProxyLog(s) => {
-                let mut r = from_log(&s.stream);
-                r.destination = Some(s.destination.to_string());
-                r
-            }
-            Self::SimplifiedLog(s) => from_simplified_log(s),
-            Self::SimplifiedProxyLog(s) => {
-                let mut r = from_simplified_log(&s.stream);
-                r.destination = Some(s.destination.to_string());
-                r
-            }
-        };
-        record.serialize(serializer)
     }
 }
-impl<'erased, ST: fmt::Display + 'erased> table_log::LogRecord<'erased>
-    for StreamRecord<'erased, ST>
-{
-    fn table_name(&self) -> &'static str {
-        "stream_record"
+impl<ST: Display> From<&StreamProxyLog<ST>> for StreamLogHdv {
+    fn from(value: &StreamProxyLog<ST>) -> Self {
+        let mut this: StreamLogHdv = (&value.stream).into();
+        this.destination = Some((&value.destination).into());
+        this
+    }
+}
+impl<ST: Display> From<&SimplifiedStreamProxyLog<ST>> for StreamLogHdv {
+    fn from(value: &SimplifiedStreamProxyLog<ST>) -> Self {
+        let mut this: StreamLogHdv = (&value.stream).into();
+        this.destination = Some((&value.destination).into());
+        this
     }
 }

 impl<ST: Display> Display for StreamLog<ST> {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        let duration = self.end - self.start.0;
-        let duration = duration.as_secs_f64();
+        let duration = self.timing.duration().as_secs_f64();
         let uplink_speed = self.bytes_uplink as f64 / duration;
         let downlink_speed = self.bytes_downlink as f64 / duration;
         let upstream_addrs = match self.upstream_addr.address.deref() {
@@ -150,8 +150,7 @@ impl<ST: Display> Display for StreamLog<ST> {

 impl<ST: Display> Display for SimplifiedStreamLog<ST> {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        let duration = self.end - self.start.0;
-        let duration = duration.as_secs_f64();
+        let duration = self.timing.duration().as_secs_f64();
         let upstream_addrs = match self.upstream_addr.address.deref() {
             InternetAddrKind::SocketAddr(_) => self.upstream_addr.to_string(),
             InternetAddrKind::DomainName { .. } => {
diff --git a/common/src/udp/io_copy.rs b/common/src/udp/io_copy.rs
index 1385158..0d480e3 100644
--- a/common/src/udp/io_copy.rs
+++ b/common/src/udp/io_copy.rs
@@ -2,7 +2,7 @@ use std::{
     io::{self, Write},
     net::SocketAddr,
     sync::{
-        atomic::{AtomicU64, AtomicUsize, Ordering},
+        atomic::{AtomicU64, Ordering},
         Arc, Mutex, RwLock,
     },
     time::{Duration, SystemTime},
@@ -20,7 +20,11 @@ use udp_listener::{AcceptedUdpRead, AcceptedUdpWrite};
 use crate::{
     addr::InternetAddr,
     error::AnyError,
-    udp::{log::FlowRecord, TIMEOUT},
+    log::Timing,
+    udp::{
+        log::{TrafficLog, LOGGER},
+        TIMEOUT,
+    },
 };

 use super::{
@@ -179,8 +183,10 @@ where

     match &res {
         Ok(log) => {
-            let record = FlowRecord(log);
-            table_log::log!(&record);
+            let record = log.into();
+            if let Some(x) = LOGGER.lock().unwrap().as_mut() {
+                x.write(&record)
+            }

             info!(%log, "{log_prefix}: I/O copy finished");
         }
@@ -221,8 +227,8 @@ where

     let bytes_uplink = Arc::new(AtomicU64::new(0));
     let bytes_downlink = Arc::new(AtomicU64::new(0));
-    let packets_uplink = Arc::new(AtomicUsize::new(0));
-    let packets_downlink = Arc::new(AtomicUsize::new(0));
+    let packets_uplink = Arc::new(AtomicU64::new(0));
+    let packets_downlink = Arc::new(AtomicU64::new(0));

     let mut en_dec_buf = [0; BUFFER_LENGTH];

@@ -390,19 +396,27 @@ where
         *last_downlink_packet.read().unwrap(),
         *last_uplink_packet.read().unwrap(),
     );

-    counter!("udp.io_copy.bytes_uplink").increment(bytes_uplink.load(Ordering::Relaxed));
-    counter!("udp.io_copy.bytes_downlink").increment(bytes_downlink.load(Ordering::Relaxed));
-    counter!("udp.io_copy.packets_uplink").increment(packets_uplink.load(Ordering::Relaxed) as _);
-    counter!("udp.io_copy.packets_downlink")
-        .increment(packets_downlink.load(Ordering::Relaxed) as _);
-    Ok(FlowLog {
-        flow,
+    let up = TrafficLog {
+        bytes: bytes_uplink.load(Ordering::Relaxed),
+        packets: packets_uplink.load(Ordering::Relaxed),
+    };
+    let dn = TrafficLog {
+        bytes: bytes_downlink.load(Ordering::Relaxed),
+        packets: packets_downlink.load(Ordering::Relaxed),
+    };
+    let timing = Timing {
         start,
         end: last_packet,
-        bytes_uplink: bytes_uplink.load(Ordering::Relaxed),
-        bytes_downlink: bytes_downlink.load(Ordering::Relaxed),
-        packets_uplink: packets_uplink.load(Ordering::Relaxed),
-        packets_downlink: packets_downlink.load(Ordering::Relaxed),
+    };
+    counter!("udp.io_copy.up.bytes").increment(up.bytes);
+    counter!("udp.io_copy.up.packets").increment(up.packets);
+    counter!("udp.io_copy.dn.bytes").increment(dn.bytes);
+    counter!("udp.io_copy.dn.packets").increment(dn.packets);
+    Ok(FlowLog {
+        flow,
+        timing,
+        up,
+        dn,
     })
 }
diff --git a/common/src/udp/log.rs b/common/src/udp/log.rs
index d977344..4c0ae62 100644
--- a/common/src/udp/log.rs
+++ b/common/src/udp/log.rs
@@ -1,36 +1,53 @@
 use core::fmt;
-use std::time::{Instant, SystemTime, UNIX_EPOCH};
+use std::{
+    path::PathBuf,
+    sync::{Arc, Mutex},
+};

 use bytesize::ByteSize;
+use hdv_derive::HdvSerde;
+use once_cell::sync::Lazy;

-use super::Flow;
+use crate::log::{HdvLogger, Timing, TimingHdv};
+
+use super::{Flow, FlowHdv};
+
+pub static LOGGER: Lazy<Arc<Mutex<Option<HdvLogger<FlowLogHdv>>>>> =
+    Lazy::new(|| Arc::new(Mutex::new(None)));
+pub fn init_logger(output_dir: PathBuf) {
+    let output_dir = output_dir.join("udp_record");
+    let logger = HdvLogger::new(output_dir);
+    *LOGGER.lock().unwrap() = Some(logger);
+}
+
+#[derive(Debug, Clone, PartialEq, Eq, Hash, HdvSerde)]
+pub struct TrafficLog {
+    pub bytes: u64,
+    pub packets: u64,
+}

 #[derive(Debug, Clone, PartialEq, Eq, Hash)]
 pub struct FlowLog {
     pub flow: Flow,
-    pub start: (Instant, SystemTime),
-    pub end: Instant,
-    pub bytes_uplink: u64,
-    pub bytes_downlink: u64,
-    pub packets_uplink: usize,
-    pub packets_downlink: usize,
+    pub timing: Timing,
+    pub up: TrafficLog,
+    pub dn: TrafficLog,
 }

 impl fmt::Display for FlowLog {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        let duration = self.end - self.start.0;
-        let duration = duration.as_secs_f64();
-        let uplink_speed = self.bytes_uplink as f64 / duration;
-        let downlink_speed = self.bytes_downlink as f64 / duration;
+        let duration = self.timing.duration().as_secs_f64();
+        let uplink_speed = self.up.bytes as f64 / duration;
+        let downlink_speed = self.dn.bytes as f64 / duration;
         write!(
             f,
             "{:.1}s,up{{{},{},{}/s}},dn{{{},{},{}/s}},up:{},dn:{}",
             duration,
-            self.packets_uplink,
-            ByteSize::b(self.bytes_uplink),
+            self.up.packets,
+            ByteSize::b(self.up.bytes),
             ByteSize::b(uplink_speed as u64),
-            self.packets_downlink,
-            ByteSize::b(self.bytes_downlink),
+            self.dn.packets,
+            ByteSize::b(self.dn.bytes),
             ByteSize::b(downlink_speed as u64),
             self.flow.upstream.as_ref().unwrap().0,
             self.flow.downstream.0,
@@ -39,45 +56,24 @@ impl fmt::Display for FlowLog {
     }
 }

-pub struct FlowRecord<'caller>(pub &'caller FlowLog);
-impl<'caller> serde::Serialize for FlowRecord<'caller> {
-    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
-    where
-        S: serde::Serializer,
-    {
-        #[derive(serde::Serialize)]
-        struct Record {
-            pub upstream_addr: String,
-            pub downstream_addr: String,
-            pub start_ms: u128,
-            pub duration_ms: u128,
-            pub bytes_uplink: u64,
-            pub bytes_downlink: u64,
-            pub packets_uplink: usize,
-            pub packets_downlink: usize,
-        }
-        let duration = self.0.end - self.0.start.0;
-        Record {
-            upstream_addr: self.0.flow.upstream.as_ref().unwrap().0.to_string(),
-            downstream_addr: self.0.flow.downstream.0.to_string(),
-            start_ms: self
-                .0
-                .start
-                .1
-                .duration_since(UNIX_EPOCH)
-                .unwrap()
-                .as_millis(),
-            duration_ms: duration.as_millis(),
-            bytes_uplink: self.0.bytes_uplink,
-            bytes_downlink: self.0.bytes_downlink,
-            packets_uplink: self.0.packets_uplink,
-            packets_downlink: self.0.packets_downlink,
-        }
-        .serialize(serializer)
-    }
+#[derive(Debug, Clone, HdvSerde)]
+pub struct FlowLogHdv {
+    pub flow: FlowHdv,
+    pub timing: TimingHdv,
+    pub up: TrafficLog,
+    pub dn: TrafficLog,
 }
-impl<'caller> table_log::LogRecord<'caller> for FlowRecord<'caller> {
-    fn table_name(&self) -> &'static str {
-        "udp_record"
+impl From<&FlowLog> for FlowLogHdv {
+    fn from(value: &FlowLog) -> Self {
+        let flow = (&value.flow).into();
+        let timing = (&value.timing).into();
+        let up = value.up.clone();
+        let dn = value.dn.clone();
+        Self {
+            flow,
+            timing,
+            up,
+            dn,
+        }
     }
 }
diff --git a/common/src/udp/mod.rs b/common/src/udp/mod.rs
index fa9a752..d6acaac 100644
--- a/common/src/udp/mod.rs
+++ b/common/src/udp/mod.rs
@@ -1,6 +1,7 @@
 use std::{io, net::SocketAddr, num::NonZeroUsize, sync::Arc, time::Duration};

 use bytes::BytesMut;
+use hdv_derive::HdvSerde;
 use lockfree_object_pool::{LinearObjectPool, LinearOwnedReusable};
 use once_cell::sync::Lazy;
 use thiserror::Error;
@@ -8,7 +9,11 @@ use tokio::{net::UdpSocket, sync::mpsc};
 use tracing::{error, info, instrument, trace, warn};
 use udp_listener::{AcceptedUdp, UdpListener};

-use crate::{addr::InternetAddr, error::AnyResult, loading};
+use crate::{
+    addr::{InternetAddr, InternetAddrHdv},
+    error::AnyResult,
+    loading,
+};

 pub mod context;
 pub mod header;
@@ -178,6 +183,21 @@ pub struct Flow {
     pub upstream: Option<UpstreamAddr>,
     pub downstream: DownstreamAddr,
 }
+#[derive(Debug, Clone, HdvSerde)]
+pub struct FlowHdv {
+    pub upstream: Option<InternetAddrHdv>,
+    pub downstream: InternetAddrHdv,
+}
+impl From<&Flow> for FlowHdv {
+    fn from(value: &Flow) -> Self {
+        let upstream = value.upstream.as_ref().map(|x| (&x.0).into());
+        let downstream = value.downstream.0.into();
+        Self {
+            upstream,
+            downstream,
+        }
+    }
+}

 pub struct Packet {
     buf: LinearOwnedReusable<BytesMut>,
diff --git a/server/Cargo.toml b/server/Cargo.toml
index fabe403..ce7553d 100644
--- a/server/Cargo.toml
+++ b/server/Cargo.toml
@@ -15,7 +15,6 @@ anyhow = { workspace = true }
 axum = { workspace = true }
 clap = { workspace = true, features = ["derive"] }
 common = { path = "../common" }
-csv_logger = { workspace = true }
 dhat = { workspace = true, optional = true }
 file_watcher_tokio = { workspace = true }
 metrics = { workspace = true }
diff --git a/server/src/main.rs b/server/src/main.rs
index 99ada16..39c3f34 100644
--- a/server/src/main.rs
+++ b/server/src/main.rs
@@ -1,4 +1,4 @@
-use std::{net::SocketAddr, num::NonZeroUsize, path::PathBuf, sync::Arc};
+use std::{net::SocketAddr, path::PathBuf, sync::Arc};

 use axum::Router;
 use clap::Parser;
@@ -37,13 +37,8 @@ async fn main() -> AnyResult {
         std::process::abort();
     }
     if let Some(path) = args.csv_log_path {
-        csv_logger::init(
-            path,
-            csv_logger::RotationPolicy {
-                max_records: NonZeroUsize::new(1024 * 64).unwrap(),
-                max_epochs: 4,
-            },
-        );
+        common::stream::log::init_logger(path.clone());
+        common::udp::log::init_logger(path.clone());
     };

     #[cfg(feature = "dhat-heap")]
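
Editor's note, usage sketch (illustrative only, not part of the patch): it assumes the crate paths shown above (`common::stream::log`, `common::udp::log`), and the `install_loggers` helper below is hypothetical. The rotation and flush parameters are the ones hard-coded in `common/src/log.rs` (64 Ki records per file, 4 rotated files, 30 s flush interval).

use std::path::PathBuf;

// Called once at startup, mirroring server/src/main.rs: each call installs a
// global HdvLogger that writes rotating CSV files under `<dir>/stream_record/`
// and `<dir>/udp_record/` respectively.
fn install_loggers(dir: PathBuf) {
    common::stream::log::init_logger(dir.clone());
    common::udp::log::init_logger(dir);
}

// Hot paths then follow the pattern used in io_copy: convert the runtime log
// struct into its `*Hdv` record and write it only if a logger was installed,
// roughly:
//
//     let record: StreamLogHdv = (&stream_log).into();
//     if let Some(logger) = LOGGER.lock().unwrap().as_ref() {
//         logger.write(&record);
//     }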