From fa780e93ced87fba5561ee219a4af4c08a61e7a7 Mon Sep 17 00:00:00 2001 From: Daiki Ueno Date: Sat, 18 Nov 2023 16:06:31 +0900 Subject: [PATCH] Replace tarpc dependency with directly using tokio-serde While tarpc works nicely, it pulls in a number of dependencies through its opentelemetry support, which prevents packaging the event-broker in Fedora. This switches to using tokio-serde directly for RPC. Signed-off-by: Daiki Ueno --- Cargo.lock | 192 +------------------- README.md | 1 - crypto-auditing/Cargo.toml | 8 +- crypto-auditing/src/event_broker.rs | 2 - crypto-auditing/src/event_broker/client.rs | 66 ++++--- crypto-auditing/src/event_broker/service.rs | 10 - event-broker/Cargo.toml | 2 +- event-broker/src/main.rs | 67 +++---- event-broker/src/service.rs | 10 - 9 files changed, 83 insertions(+), 275 deletions(-) delete mode 100644 crypto-auditing/src/event_broker/service.rs delete mode 100644 event-broker/src/service.rs diff --git a/Cargo.lock b/Cargo.lock index cbd7792..3d73ae8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -108,17 +108,6 @@ version = "1.0.75" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a4668cab20f66d8d020e1fbc0ebe47217433c1b6c8f2040faf858554e394ace6" -[[package]] -name = "async-trait" -version = "0.1.73" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bc00ceb34980c03614e35a3a4e218276a0a824e911d07651cd0d858a51e8c0f0" -dependencies = [ - "proc-macro2", - "quote", - "syn 2.0.32", -] - [[package]] name = "autocfg" version = "1.1.0" @@ -378,25 +367,6 @@ dependencies = [ "libc", ] -[[package]] -name = "crossbeam-channel" -version = "0.5.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a33c2bf77f2df06183c3aa30d1e96c0695a313d4f9c453cc3762a6db39f99200" -dependencies = [ - "cfg-if", - "crossbeam-utils", -] - -[[package]] -name = "crossbeam-utils" -version = "0.8.16" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5a22b2d63d4d1dc0b7f1b6b2747dd0088008a9be28b6ddf0b1e7d335e3037294" -dependencies = [ - "cfg-if", -] - [[package]] name = "crypto-auditing" version = "0.1.0" @@ -409,11 +379,11 @@ dependencies = [ "serde", "serde_cbor", "serde_with", - "tarpc", "thiserror", "tokio", "tokio-serde", "tokio-stream", + "tokio-util", "tracing", ] @@ -473,11 +443,11 @@ dependencies = [ "inotify", "libsystemd", "serde_cbor", - "tarpc", "tempfile", "tokio", "tokio-serde", "tokio-stream", + "tokio-util", "toml", "tracing", "tracing-subscriber", @@ -746,17 +716,6 @@ dependencies = [ "version_check", ] -[[package]] -name = "getrandom" -version = "0.2.10" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "be4136b2a15dd319360be1c07d9933517ccf0be8f16bf62a3bee4f0d618df427" -dependencies = [ - "cfg-if", - "libc", - "wasi", -] - [[package]] name = "gimli" version = "0.28.0" @@ -823,12 +782,6 @@ dependencies = [ "windows-sys", ] -[[package]] -name = "humantime" -version = "2.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" - [[package]] name = "iana-time-zone" version = "0.1.57" @@ -1259,49 +1212,6 @@ dependencies = [ "vcpkg", ] -[[package]] -name = "opentelemetry" -version = "0.18.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "69d6c3d7288a106c0a363e4b0e8d308058d56902adefb16f4936f417ffef086e" -dependencies = [ - "opentelemetry_api", - "opentelemetry_sdk", -] - -[[package]] -name = "opentelemetry_api" -version = "0.18.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c24f96e21e7acc813c7a8394ee94978929db2bcc46cf6b5014fc612bf7760c22" -dependencies = [ - "futures-channel", - "futures-util", - "indexmap 1.9.3", - "js-sys", - "once_cell", - "pin-project-lite", - "thiserror", -] - -[[package]] -name = "opentelemetry_sdk" -version = "0.18.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1ca41c4933371b61c2a2f214bf16931499af4ec90543604ec828f7a625c09113" -dependencies = [ - "async-trait", - "crossbeam-channel", - "futures-channel", - "futures-executor", - "futures-util", - "once_cell", - "opentelemetry_api", - "percent-encoding", - "rand", - "thiserror", -] - [[package]] name = "os_str_bytes" version = "6.5.1" @@ -1330,12 +1240,6 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "19b17cddbe7ec3f8bc800887bab5e717348c95ea2ca0b1bf0837fb964dc67099" -[[package]] -name = "percent-encoding" -version = "2.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9b2a4787296e9989611394c33f193f676704af1686e70b8f8033ab5ba9a35a94" - [[package]] name = "pin-project" version = "1.1.3" @@ -1380,12 +1284,6 @@ version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b4596b6d070b27117e987119b4dac604f3c58cfb0b191112e24771b2faeac1a6" -[[package]] -name = "ppv-lite86" -version = "0.2.17" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" - [[package]] name = "probe" version = "0.3.0" @@ -1444,36 +1342,6 @@ dependencies = [ "proc-macro2", ] -[[package]] -name = "rand" -version = "0.8.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" -dependencies = [ - "libc", - "rand_chacha", - "rand_core", -] - -[[package]] -name = "rand_chacha" -version = "0.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" -dependencies = [ - "ppv-lite86", - "rand_core", -] - -[[package]] -name = "rand_core" -version = "0.6.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c" -dependencies = [ - "getrandom", -] - [[package]] name = "redox_syscall" version = "0.3.5" @@ -1744,12 +1612,6 @@ dependencies = [ "windows-sys", ] -[[package]] -name = "static_assertions" -version = "1.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" - [[package]] name = "strsim" version = "0.10.0" @@ -1797,41 +1659,6 @@ dependencies = [ "unicode-ident", ] -[[package]] -name = "tarpc" -version = "0.33.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6f41bce44d290df0598ae4b9cd6ea7f58f651fd3aa4af1b26060c4fa32b08af7" -dependencies = [ - "anyhow", - "fnv", - "futures", - "humantime", - "opentelemetry", - "pin-project", - "rand", - "serde", - "static_assertions", - "tarpc-plugins", - "thiserror", - "tokio", - "tokio-serde", - "tokio-util", - "tracing", - "tracing-opentelemetry", -] - -[[package]] -name = "tarpc-plugins" -version = "0.12.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0ee42b4e559f17bce0385ebf511a7beb67d5cc33c12c96b7f4e9789919d9c10f" -dependencies = [ - "proc-macro2", - "quote", - "syn 1.0.109", -] - [[package]] name = "tempfile" version = "3.8.0" @@ -1990,7 +1817,6 @@ dependencies = [ "futures-core", "futures-sink", "pin-project-lite", - "slab", "tokio", "tracing", ] @@ -2053,7 +1879,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8ce8c33a8d48bd45d624a6e523445fd21ec13d3653cd51f681abf67418f54eb8" dependencies = [ "cfg-if", - "log", "pin-project-lite", "tracing-attributes", "tracing-core", @@ -2091,19 +1916,6 @@ dependencies = [ "tracing-core", ] -[[package]] -name = "tracing-opentelemetry" -version = "0.18.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "21ebb87a95ea13271332df069020513ab70bdb5637ca42d6e492dc3bbbad48de" -dependencies = [ - "once_cell", - "opentelemetry", - "tracing", - "tracing-core", - "tracing-subscriber", -] - [[package]] name = "tracing-subscriber" version = "0.3.17" diff --git a/README.md b/README.md index 1171b9c..f13ad32 100644 --- a/README.md +++ b/README.md @@ -213,4 +213,3 @@ You can open the generated `flamegraph.html` with your browser. - [libbpf-async](https://github.com/fujita/libbpf-async) for asynchronous BPF ringbuf implementation over libbpf-rs - [rust-keylime](https://github.com/keylime/rust-keylime/) for permissions management code -- [tarpc](https://github.com/google/tarpc) for the pubsub example implementation diff --git a/crypto-auditing/Cargo.toml b/crypto-auditing/Cargo.toml index 79c85a0..20b22b0 100644 --- a/crypto-auditing/Cargo.toml +++ b/crypto-auditing/Cargo.toml @@ -12,11 +12,11 @@ libc = "0.2" serde = { version = "1.0", features = ["derive"] } serde_cbor = "0.11" serde_with = "2.2" -tarpc = { version = "0.33", features = ["serde-transport", "unix"] } thiserror = "1.0" -tokio = "1.23" -tokio-serde = { version = "0.8", features=["cbor"] } +tokio = { version = "1.23", features = ["net", "rt"] } +tokio-serde = { version = "0.8", features = ["cbor"] } tokio-stream = "0.1" +tokio-util = { version = "0.7", features = ["codec"] } tracing = "0.1" [build-dependencies] @@ -24,7 +24,7 @@ bindgen = "0.63" [dev-dependencies] anyhow = "1.0" -clap = { version = "4", features=["derive"] } +clap = { version = "4", features = ["derive"] } [[example]] name = "client" diff --git a/crypto-auditing/src/event_broker.rs b/crypto-auditing/src/event_broker.rs index 1d6dd90..5be1ed9 100644 --- a/crypto-auditing/src/event_broker.rs +++ b/crypto-auditing/src/event_broker.rs @@ -4,8 +4,6 @@ mod error; pub use error::{Error, Result}; -mod service; - mod client; pub use client::{Client, ClientHandle}; diff --git a/crypto-auditing/src/event_broker/client.rs b/crypto-auditing/src/event_broker/client.rs index e65d39f..9e8c541 100644 --- a/crypto-auditing/src/event_broker/client.rs +++ b/crypto-auditing/src/event_broker/client.rs @@ -1,20 +1,19 @@ // SPDX-License-Identifier: GPL-3.0-or-later // Copyright (C) 2022-2023 The crypto-auditing developers. -use crate::event_broker::{error::Result, service::Subscriber as _, SOCKET_PATH}; +use crate::event_broker::{error::Result, SOCKET_PATH}; use crate::types::EventGroup; use futures::{ future::{self, AbortHandle}, stream::Stream, + SinkExt, TryStreamExt, }; use std::path::{Path, PathBuf}; -use tarpc::{ - context, - server::{self, Channel}, - tokio_serde::formats::Cbor, -}; +use tokio::net::UnixStream; use tokio::sync::mpsc::{self, Receiver, Sender}; +use tokio_serde::{formats::SymmetricalCbor, SymmetricallyFramed}; use tokio_stream::wrappers::ReceiverStream; +use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec}; use tracing::info; #[derive(Clone, Debug)] @@ -54,20 +53,6 @@ pub struct Client { receiver: Receiver, } -#[tarpc::server] -impl crate::event_broker::service::Subscriber for ClientInner { - async fn scopes(self, _: context::Context) -> Vec { - self.scopes.clone() - } - - async fn receive(self, _: context::Context, group: EventGroup) { - if let Err(e) = self.sender.send(group).await { - info!(error = %e, - "unable to send event"); - } - } -} - /// A handle for the client connection, which will be aborted once /// the ownership is dropped pub struct ClientHandle(AbortHandle); @@ -110,11 +95,42 @@ impl Client { /// This returns a tuple consisting a [`ClientHandle`] and a [`Stream`] /// which generates a sequence of event groups. pub async fn start(self) -> Result<(ClientHandle, impl Stream)> { - let server = tarpc::serde_transport::unix::connect(&self.address, Cbor::default).await?; - let local_addr = server.local_addr()?; - let handler = server::BaseChannel::with_defaults(server).requests(); - let (handler, abort_handle) = - future::abortable(handler.execute(self.inner.clone().serve())); + let stream = UnixStream::connect(&self.address).await?; + let local_addr = stream.local_addr()?; + + let (de, ser) = stream.into_split(); + + let ser = FramedWrite::new(ser, LengthDelimitedCodec::new()); + let de = FramedRead::new(de, LengthDelimitedCodec::new()); + + let mut ser = SymmetricallyFramed::new(ser, SymmetricalCbor::>::default()); + let mut de = SymmetricallyFramed::new(de, SymmetricalCbor::::default()); + + let inner = self.inner.clone(); + let (handler, abort_handle) = future::abortable(async move { + if let Err(e) = ser.send(inner.scopes).await { + info!(error = %e, + "unable to send subscription request"); + } + loop { + let group = match de.try_next().await { + Ok(group) => group, + Err(e) => { + info!(error = %e, + "unable to deserialize event"); + break; + } + }; + + if let Some(group) = group { + if let Err(e) = inner.sender.send(group).await { + info!(error = %e, + "unable to send event"); + break; + } + } + } + }); tokio::spawn(async move { match handler.await { Ok(()) | Err(future::Aborted) => info!(?local_addr, "client shutdown."), diff --git a/crypto-auditing/src/event_broker/service.rs b/crypto-auditing/src/event_broker/service.rs deleted file mode 100644 index 79c6ea5..0000000 --- a/crypto-auditing/src/event_broker/service.rs +++ /dev/null @@ -1,10 +0,0 @@ -// SPDX-License-Identifier: GPL-3.0-or-later -// Copyright (C) 2022-2023 The crypto-auditing developers. - -use crate::types::EventGroup; - -#[tarpc::service] -pub trait Subscriber { - async fn scopes() -> Vec; - async fn receive(group: EventGroup); -} diff --git a/event-broker/Cargo.toml b/event-broker/Cargo.toml index a667605..8b28aa4 100644 --- a/event-broker/Cargo.toml +++ b/event-broker/Cargo.toml @@ -19,10 +19,10 @@ futures = "0.3" inotify = "0.10.2" libsystemd = { version = "0.6", optional = true } serde_cbor = "0.11" -tarpc = { version = "0.33", features = ["serde-transport", "unix"] } tokio = { version = "1.23", features = ["macros", "rt-multi-thread"] } tokio-serde = { version = "0.8", features = ["cbor"] } tokio-stream = "0.1" +tokio-util = "0.7" toml = "0.6" tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["env-filter"] } diff --git a/event-broker/src/main.rs b/event-broker/src/main.rs index ae48e4f..4dec61d 100644 --- a/event-broker/src/main.rs +++ b/event-broker/src/main.rs @@ -5,7 +5,7 @@ use anyhow::bail; use anyhow::{Context as _, Result}; use crypto_auditing::types::EventGroup; -use futures::{future, stream::StreamExt, try_join}; +use futures::{future, stream::StreamExt, try_join, SinkExt, TryStreamExt}; use inotify::{EventMask, Inotify, WatchMask}; #[cfg(feature = "libsystemd")] use libsystemd::activation::receive_descriptors; @@ -17,18 +17,16 @@ use std::os::fd::{FromRawFd, IntoRawFd}; use std::os::unix::net::UnixListener as StdUnixListener; use std::path::{Path, PathBuf}; use std::sync::{Arc, RwLock}; -use tarpc::{client, context, tokio_serde::formats::Cbor}; -use tokio::net::UnixListener; +use tokio::net::{unix::OwnedWriteHalf, UnixListener}; use tokio::sync::mpsc::{self, Receiver, Sender}; +use tokio_serde::{formats::SymmetricalCbor, SymmetricallyFramed}; use tokio_stream::wrappers::ReceiverStream; +use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec}; use tracing::{debug, info}; use tracing_subscriber::{fmt, prelude::*, EnvFilter}; mod config; -mod service; -use service::SubscriberClient; - struct Reader { log_file: PathBuf, } @@ -70,13 +68,18 @@ impl Reader { } } -#[derive(Clone, Debug)] +#[derive(Debug)] struct Subscription { - client: SubscriberClient, + stream: SymmetricallyFramed< + FramedWrite, + EventGroup, + SymmetricalCbor, + >, scopes: Vec, + errored: bool, } -#[derive(Clone, Debug)] +#[derive(Debug)] struct Publisher { socket_path: PathBuf, subscriptions: Arc>>, @@ -119,34 +122,27 @@ impl Publisher { tokio::spawn(async move { while let Ok((stream, _sock_addr)) = listener.accept().await { - let conn = tarpc::serde_transport::Transport::from((stream, Cbor::default())); - let subscriber_fd = conn.get_ref().as_raw_fd(); - - let tarpc::client::NewClient { - client: subscriber, - dispatch, - } = SubscriberClient::new(client::Config::default(), conn); + let subscriber_fd = stream.as_raw_fd(); debug!(socket = subscriber_fd, "subscriber connected"); - let subscriptions2 = subscriptions.clone(); - tokio::spawn(async move { - if let Err(e) = dispatch.await { - info!(error = %e, - "subscriber connection broken"); - } + let (de, ser) = stream.into_split(); - debug!(socket = %subscriber_fd, "closing connection"); - subscriptions2.write().unwrap().remove(&subscriber_fd); - }); + let ser = FramedWrite::new(ser, LengthDelimitedCodec::new()); + let de = FramedRead::new(de, LengthDelimitedCodec::new()); + + let ser = SymmetricallyFramed::new(ser, SymmetricalCbor::::default()); + let mut de = + SymmetricallyFramed::new(de, SymmetricalCbor::>::default()); // Populate the scopes - if let Ok(scopes) = subscriber.scopes(context::current()).await { + if let Some(scopes) = de.try_next().await.unwrap() { subscriptions.write().unwrap().insert( subscriber_fd, Subscription { - client: subscriber, - scopes: scopes.clone(), + stream: ser, + scopes, + errored: Default::default(), }, ); } @@ -154,20 +150,27 @@ impl Publisher { }); let mut stream = ReceiverStream::new(receiver); - let mut subscriptions; while let Some(group) = stream.next().await { + let mut subscriptions = self.subscriptions.write().unwrap(); let mut publications = Vec::new(); - subscriptions = self.subscriptions.read().unwrap().clone(); - for subscription in subscriptions.values() { + for (_, subscription) in subscriptions.iter_mut() { let mut group = group.clone(); group.events_filtered(&subscription.scopes); if !group.events().is_empty() { - publications.push(subscription.client.receive(context::current(), group)); + publications.push(async move { + if let Err(e) = subscription.stream.send(group).await { + info!(error = %e, "unable to send event"); + subscription.errored = true; + } + }); } } future::join_all(publications).await; + + // Remove errored subscriptions + subscriptions.retain(|_, v| !v.errored); } Ok(()) diff --git a/event-broker/src/service.rs b/event-broker/src/service.rs deleted file mode 100644 index aec5ccb..0000000 --- a/event-broker/src/service.rs +++ /dev/null @@ -1,10 +0,0 @@ -// SPDX-License-Identifier: GPL-3.0-or-later -// Copyright (C) 2022-2023 The crypto-auditing developers. - -use crypto_auditing::types::EventGroup; - -#[tarpc::service] -pub trait Subscriber { - async fn scopes() -> Vec; - async fn receive(group: EventGroup); -}