|
| 1 | +// Copyright 2025-Present Datadog, Inc. https://www.datadoghq.com/ |
| 2 | +// SPDX-License-Identifier: Apache-2.0 |
| 3 | +#![cfg_attr(not(test), deny(clippy::panic))] |
| 4 | +#![cfg_attr(not(test), deny(clippy::unwrap_used))] |
| 5 | +#![cfg_attr(not(test), deny(clippy::expect_used))] |
| 6 | +#![cfg_attr(not(test), deny(clippy::todo))] |
| 7 | +#![cfg_attr(not(test), deny(clippy::unimplemented))] |
| 8 | + |
| 9 | +use std::{str::FromStr, vec}; |
| 10 | + |
| 11 | +use datadog_remote_config::{ |
| 12 | + fetch::{ConfigInvariants, SingleChangesFetcher}, |
| 13 | + file_change_tracker::{Change, FilePath}, |
| 14 | + file_storage::{ParsedFileStorage, RawFileStorage}, |
| 15 | + RemoteConfigData, RemoteConfigProduct, Target, |
| 16 | +}; |
| 17 | +use ddcommon::Endpoint; |
| 18 | + |
| 19 | +/// Represent error that can happen while using the tracer flare. |
| 20 | +#[derive(Debug, PartialEq)] |
| 21 | +pub enum FlareError { |
| 22 | + /// Send the flare was asking without being prepared. |
| 23 | + NoFlare(String), |
| 24 | + /// Listening to the RemoteConfig failed. |
| 25 | + ListeningError(String), |
| 26 | +} |
| 27 | + |
| 28 | +impl std::fmt::Display for FlareError { |
| 29 | + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { |
| 30 | + match self { |
| 31 | + FlareError::NoFlare(msg) => write!(f, "No flare prepared to send: {}", msg), |
| 32 | + FlareError::ListeningError(msg) => write!(f, "Listening failed with: {}", msg), |
| 33 | + } |
| 34 | + } |
| 35 | +} |
| 36 | + |
| 37 | +/// Enum that hold the different log level possible |
| 38 | +#[derive(Debug)] |
| 39 | +pub enum LogLevel { |
| 40 | + Trace = 0, |
| 41 | + Debug = 1, |
| 42 | + Info = 2, |
| 43 | + Warn = 3, |
| 44 | + Error = 4, |
| 45 | + Critical = 5, |
| 46 | + Off = 6, |
| 47 | +} |
| 48 | + |
| 49 | +/// Enum that hold the different returned action to do after listening |
| 50 | +#[derive(Debug)] |
| 51 | +pub enum ReturnAction { |
| 52 | + None, |
| 53 | + StartTrace, |
| 54 | + StartDebug, |
| 55 | + StartInfo, |
| 56 | + StartWarn, |
| 57 | + StartError, |
| 58 | + StartCritical, |
| 59 | + StartOff, |
| 60 | + Stop, |
| 61 | +} |
| 62 | + |
| 63 | +impl From<LogLevel> for ReturnAction { |
| 64 | + fn from(level: LogLevel) -> Self { |
| 65 | + match level { |
| 66 | + LogLevel::Trace => ReturnAction::StartTrace, |
| 67 | + LogLevel::Debug => ReturnAction::StartDebug, |
| 68 | + LogLevel::Info => ReturnAction::StartInfo, |
| 69 | + LogLevel::Warn => ReturnAction::StartWarn, |
| 70 | + LogLevel::Error => ReturnAction::StartError, |
| 71 | + LogLevel::Critical => ReturnAction::StartCritical, |
| 72 | + LogLevel::Off => ReturnAction::StartOff, |
| 73 | + } |
| 74 | + } |
| 75 | +} |
| 76 | + |
| 77 | +pub type Listener = SingleChangesFetcher<RawFileStorage<Result<RemoteConfigData, anyhow::Error>>>; |
| 78 | + |
| 79 | +/// Function that init and return a listener of RemoteConfig |
| 80 | +/// |
| 81 | +/// # Arguments |
| 82 | +/// |
| 83 | +/// * `agent_url` - Agent url computed from the environment. |
| 84 | +/// * `language` - Language of the tracer. |
| 85 | +/// * `tracer_version` - Version of the tracer. |
| 86 | +/// * `service` - Service to listen to. |
| 87 | +/// * `env` - Environment. |
| 88 | +/// * `app_version` - Version of the application. |
| 89 | +/// * `runtime_id` - Runtime id. |
| 90 | +/// |
| 91 | +/// These arguments will be used to listen to the remote config endpoint. |
| 92 | +pub fn init_remote_config_listener( |
| 93 | + agent_url: String, |
| 94 | + language: String, |
| 95 | + tracer_version: String, |
| 96 | + service: String, |
| 97 | + env: String, |
| 98 | + app_version: String, |
| 99 | + runtime_id: String, |
| 100 | +) -> Result<Listener, FlareError> { |
| 101 | + let agent_url = match hyper::Uri::from_str(&agent_url) { |
| 102 | + Ok(uri) => uri, |
| 103 | + Err(_) => { |
| 104 | + return Err(FlareError::ListeningError("Invalid agent url".to_string())); |
| 105 | + } |
| 106 | + }; |
| 107 | + let remote_config_endpoint = Endpoint { |
| 108 | + url: agent_url, |
| 109 | + ..Default::default() |
| 110 | + }; |
| 111 | + let config_to_fetch = ConfigInvariants { |
| 112 | + language, |
| 113 | + tracer_version, |
| 114 | + endpoint: remote_config_endpoint, |
| 115 | + products: vec![ |
| 116 | + RemoteConfigProduct::AgentConfig, |
| 117 | + RemoteConfigProduct::AgentTask, |
| 118 | + ], |
| 119 | + capabilities: vec![], |
| 120 | + }; |
| 121 | + |
| 122 | + Ok(SingleChangesFetcher::new( |
| 123 | + ParsedFileStorage::default(), |
| 124 | + Target { |
| 125 | + service, |
| 126 | + env, |
| 127 | + app_version, |
| 128 | + tags: vec![], |
| 129 | + }, |
| 130 | + runtime_id, |
| 131 | + config_to_fetch, |
| 132 | + )) |
| 133 | +} |
| 134 | + |
| 135 | +/// Function that listen to RemoteConfig on the agent |
| 136 | +/// |
| 137 | +/// # Arguments |
| 138 | +/// |
| 139 | +/// * `listener` - Listener use to fetch RemoteConfig from the agent with specific config. |
| 140 | +/// |
| 141 | +/// # Returns |
| 142 | +/// |
| 143 | +/// * `Ok(ReturnAction)` - If successful. |
| 144 | +/// * `FlareError(msg)` - If something fail. |
| 145 | +/// |
| 146 | +/// # Examples |
| 147 | +/// |
| 148 | +/// Implementing and using the listener to fetch RemoteConfig from the agent |
| 149 | +/// |
| 150 | +/// ```rust no_run |
| 151 | +/// use datadog_tracer_flare::{init_remote_config_listener, run_remote_config_listener}; |
| 152 | +/// use std::time::Duration; |
| 153 | +/// use tokio::time::sleep; |
| 154 | +/// |
| 155 | +/// #[tokio::main(flavor = "current_thread")] |
| 156 | +/// async fn main() { |
| 157 | +/// // Setup the listener |
| 158 | +/// let mut listener = init_remote_config_listener( |
| 159 | +/// "http://0.0.0.0:8126".to_string(), // agent_url |
| 160 | +/// "rust".to_string(), // language |
| 161 | +/// "1.0.0".to_string(), // tracer_version |
| 162 | +/// "test-service".to_string(), // service |
| 163 | +/// "test-env".to_string(), // env |
| 164 | +/// "1.0.0".to_string(), // app_version |
| 165 | +/// "test-runtime".to_string(), // runtime_id |
| 166 | +/// ) |
| 167 | +/// .unwrap(); |
| 168 | +/// |
| 169 | +/// // Listen every second |
| 170 | +/// loop { |
| 171 | +/// let result = run_remote_config_listener(&mut listener).await; |
| 172 | +/// assert!(result.is_ok()); |
| 173 | +/// // Use the result ... |
| 174 | +/// sleep(Duration::from_secs(1)).await; |
| 175 | +/// } |
| 176 | +/// } |
| 177 | +/// ``` |
| 178 | +pub async fn run_remote_config_listener( |
| 179 | + listener: &mut Listener, |
| 180 | +) -> Result<ReturnAction, FlareError> { |
| 181 | + match listener.fetch_changes().await { |
| 182 | + Ok(changes) => { |
| 183 | + println!("Got {} changes.", changes.len()); |
| 184 | + for change in changes { |
| 185 | + match change { |
| 186 | + Change::Add(file) => { |
| 187 | + println!("Added file: {} (version: {})", file.path(), file.version()); |
| 188 | + println!("Content: {:?}", file.contents().as_ref()); |
| 189 | + } |
| 190 | + Change::Update(file, _) => { |
| 191 | + println!( |
| 192 | + "Got update for file: {} (version: {})", |
| 193 | + file.path(), |
| 194 | + file.version() |
| 195 | + ); |
| 196 | + } |
| 197 | + Change::Remove(file) => { |
| 198 | + println!("Removing file {}", file.path()); |
| 199 | + } |
| 200 | + } |
| 201 | + } |
| 202 | + } |
| 203 | + Err(e) => { |
| 204 | + return Err(FlareError::ListeningError(e.to_string())); |
| 205 | + } |
| 206 | + } |
| 207 | + |
| 208 | + Ok(ReturnAction::None) |
| 209 | +} |
0 commit comments