Skip to content

feat: Geneva uploader - Add ingestion service #235

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 45 commits into from
May 2, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
c6e5f86
initial commit
lalitb Mar 29, 2025
c658654
fix ci
lalitb Mar 29, 2025
4344c66
add tests
lalitb Mar 29, 2025
b23f990
ignore unit-test temp
lalitb Mar 29, 2025
3b867ae
Add more tests, and docs
lalitb Mar 29, 2025
619fbd5
fix cert generatrion
lalitb Mar 29, 2025
4979614
Fix
lalitb Mar 30, 2025
c0c5e67
add test against real server
lalitb Mar 30, 2025
edc7ca0
Merge branch 'main' into geneva-uploader-config-service
lalitb Mar 31, 2025
2e59d5d
fix tests
lalitb Mar 31, 2025
036c3cd
Merge branch 'geneva-uploader-config-service' of github.com:lalitb/op…
lalitb Mar 31, 2025
e9593be
disable tests for windows
lalitb Mar 31, 2025
cc6c7ee
optimize response parsing
lalitb Mar 31, 2025
09e2516
add openssl for windows
lalitb Mar 31, 2025
7ede3cd
missed updating CI yaml
lalitb Mar 31, 2025
74da53f
move tokio to dev dependency
lalitb Mar 31, 2025
6c1f057
vendored openssl
lalitb Mar 31, 2025
0006549
update cargo
lalitb Mar 31, 2025
304f75a
,
lalitb Mar 31, 2025
591520e
use monikers
lalitb Mar 31, 2025
dc0aa53
fix test
lalitb Apr 2, 2025
cea7390
Merge branch 'main' into geneva-uploader-config-service
lalitb Apr 2, 2025
ac58faf
add readme
lalitb Apr 2, 2025
088a08e
nit update to readme
lalitb Apr 2, 2025
47751ac
fix access specifier
lalitb Apr 2, 2025
7b28dfa
Merge branch 'main' into geneva-uploader-config-service
lalitb Apr 2, 2025
6c0752f
update the code to extract token endpoint
lalitb Apr 2, 2025
f91a04e
add unit-test
lalitb Apr 3, 2025
5c15ab0
Merge branch 'main' into geneva-uploader-ingestion-service
lalitb Apr 25, 2025
2e43692
Update mod.rs
lalitb Apr 25, 2025
d087e2c
Update client.rs
lalitb Apr 25, 2025
21af272
fix lint
lalitb Apr 25, 2025
d8ce8ef
fix lint and conflicts
lalitb Apr 25, 2025
6c323fc
lint
lalitb Apr 25, 2025
430e9b3
make schema_id as non-optional
lalitb Apr 25, 2025
a076b8c
review comments
lalitb Apr 28, 2025
85b5827
fail test if parallel upload fail
lalitb Apr 28, 2025
4dc9ca2
check token in every request
lalitb Apr 28, 2025
d0d5cc4
lint error, fix the result comment
lalitb Apr 28, 2025
d175caf
fix lint
lalitb Apr 29, 2025
42d9968
review comments
lalitb May 2, 2025
009674f
Merge branch 'main' into geneva-uploader-ingestion-service
lalitb May 2, 2025
4b3e78b
review comment
lalitb May 2, 2025
9633d01
Merge branch 'geneva-uploader-ingestion-service' of github.com:lalitb…
lalitb May 2, 2025
3168b3e
lint error
lalitb May 2, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions opentelemetry-exporter-geneva/geneva-uploader/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ reqwest = { version = "0.12", features = ["native-tls", "native-tls-alpn"]}
native-tls = "0.2"
thiserror = "2.0"
chrono = "0.4"
url = "2.2"


[features]
self_signed_certs = [] # Empty by default for security
Expand All @@ -25,6 +27,8 @@ rcgen = "0.13"
openssl = { version = "0.10", features = ["vendored"] }
tempfile = "3.5"
wiremock = "0.6"
futures = "0.3"
num_cpus = "1.16"

[lints]
workspace = true
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@

use chrono::{DateTime, Utc};
use native_tls::{Identity, Protocol};
use std::fmt;
use std::fmt::Write;
use std::fs;
use std::path::PathBuf;
use std::sync::RwLock;
Expand Down Expand Up @@ -43,7 +45,7 @@
/// openssl pkcs12 -export -in cert.pem -inkey key.pem -out client.p12 -name "alias"
/// ```
#[allow(dead_code)]
#[derive(Clone)]
#[derive(Clone, Debug)]
pub(crate) enum AuthMethod {
/// Certificate-based authentication
///
Expand Down Expand Up @@ -116,7 +118,7 @@
/// };
/// ```
#[allow(dead_code)]
#[derive(Clone)]
#[derive(Clone, Debug)]
pub(crate) struct GenevaConfigClientConfig {
pub(crate) endpoint: String,
pub(crate) environment: String,
Expand Down Expand Up @@ -191,6 +193,18 @@
static_headers: HeaderMap,
}

impl fmt::Debug for GenevaConfigClient {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("GenevaConfigClient")
.field("config", &self.config)
.field("precomputed_url_prefix", &self.precomputed_url_prefix)
.field("agent_identity", &self.agent_identity)
.field("agent_version", &self.agent_version)
.field("static_headers", &self.static_headers)
.finish()
}

Check warning on line 205 in opentelemetry-exporter-geneva/geneva-uploader/src/config_service/client.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-exporter-geneva/geneva-uploader/src/config_service/client.rs#L197-L205

Added lines #L197 - L205 were not covered by tests
}

/// Client for interacting with the Geneva Configuration Service.
///
/// This client handles authentication and communication with the Geneva Config
Expand Down Expand Up @@ -250,21 +264,18 @@
let version_str = format!("Ver{}v0", config.config_major_version);

let mut pre_url = String::with_capacity(config.endpoint.len() + 200);
pre_url.push_str(config.endpoint.trim_end_matches('/'));
pre_url.push_str("/api/agent/v3/");
pre_url.push_str(&config.environment);
pre_url.push('/');
pre_url.push_str(&config.account);
pre_url.push_str("/MonitoringStorageKeys/?Namespace=");
pre_url.push_str(&config.namespace);
pre_url.push_str("&Region=");
pre_url.push_str(&config.region);
pre_url.push_str("&Identity=");
pre_url.push_str(&encoded_identity);
pre_url.push_str("&OSType=");
pre_url.push_str(get_os_type());
pre_url.push_str("&ConfigMajorVersion=");
pre_url.push_str(&version_str);
write!(
&mut pre_url,
"{}/api/agent/v3/{}/{}/MonitoringStorageKeys/?Namespace={}&Region={}&Identity={}&OSType={}&ConfigMajorVersion={}",
config.endpoint.trim_end_matches('/'),
config.environment,
config.account,
config.namespace,
config.region,
encoded_identity,
get_os_type(),
version_str
).map_err(|e| GenevaConfigClientError::InternalError(format!("Failed to write URL: {e}")))?;

let http_client = client_builder.build()?;

Expand Down Expand Up @@ -405,9 +416,9 @@
async fn fetch_ingestion_info(&self) -> Result<(IngestionGatewayInfo, MonikerInfo)> {
let tag_id = Uuid::new_v4().to_string(); //TODO - uuid is costly, check if counter is enough?
let mut url = String::with_capacity(self.precomputed_url_prefix.len() + 50); // Pre-allocate with reasonable capacity
url.push_str(&self.precomputed_url_prefix);
url.push_str("&TagId=");
url.push_str(&tag_id);
write!(&mut url, "{}&TagId={}", self.precomputed_url_prefix, tag_id).map_err(|e| {
GenevaConfigClientError::InternalError(format!("Failed to write URL: {e}"))

Check warning on line 420 in opentelemetry-exporter-geneva/geneva-uploader/src/config_service/client.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-exporter-geneva/geneva-uploader/src/config_service/client.rs#L420

Added line #L420 was not covered by tests
})?;

let req_id = Uuid::new_v4().to_string();

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
mod client;
pub(crate) mod client;

#[cfg(test)]
mod tests {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,236 @@
pub(crate) mod uploader;

#[cfg(test)]
mod tests {
use std::time::Instant;

mod test_helpers {
use crate::{
AuthMethod, GenevaConfigClient, GenevaConfigClientConfig, GenevaUploader,
GenevaUploaderConfig,
};
use std::env;
use std::fs;
use std::sync::Arc;

pub struct TestUploadContext {
pub data: Vec<u8>,
pub uploader: GenevaUploader,
pub event_name: String,
pub event_version: String,
}

pub async fn build_test_upload_context() -> TestUploadContext {
// Load binary blob
let blob_path =
env::var("GENEVA_BLOB_PATH").expect("GENEVA_BLOB_PATH env var is required");
let data = fs::read(&blob_path).expect("Failed to read binary blob");

// Read config from environment
let endpoint = env::var("GENEVA_ENDPOINT").expect("GENEVA_ENDPOINT is required");
let environment =
env::var("GENEVA_ENVIRONMENT").expect("GENEVA_ENVIRONMENT is required");
let account = env::var("GENEVA_ACCOUNT").expect("GENEVA_ACCOUNT is required");
let namespace = env::var("GENEVA_NAMESPACE").expect("GENEVA_NAMESPACE is required");
let region = env::var("GENEVA_REGION").expect("GENEVA_REGION is required");
let cert_path = std::path::PathBuf::from(
std::env::var("GENEVA_CERT_PATH").expect("GENEVA_CERT_PATH is required"),
);
let cert_password = env::var("GENEVA_CERT_PASSWORD").unwrap_or_default();
let config_major_version = env::var("GENEVA_CONFIG_MAJOR_VERSION")
.expect("GENEVA_CONFIG_MAJOR_VERSION is required")
.parse::<u32>()
.expect("GENEVA_CONFIG_MAJOR_VERSION must be a u32");
let source_identity = env::var("GENEVA_SOURCE_IDENTITY").unwrap_or_else(|_| {
"Tenant=Default/Role=Uploader/RoleInstance=devhost".to_string()
});
let schema_ids =
"c1ce0ecea020359624c493bbe97f9e80;0da22cabbee419e000541a5eda732eb3".to_string();

// Define uploader config
let uploader_config = GenevaUploaderConfig {
namespace: namespace.clone(),
source_identity,
environment: environment.clone(),
schema_ids,
};

let config = GenevaConfigClientConfig {
endpoint,
environment,
account,
namespace,
region,
config_major_version,
auth_method: AuthMethod::Certificate {
path: cert_path,
password: cert_password,
},
};

// Build client and uploader
let config_client =
GenevaConfigClient::new(config).expect("Failed to create config client");
let uploader =
GenevaUploader::from_config_client(Arc::new(config_client), uploader_config)
.await
.expect("Failed to create uploader");

// Event name/version
let event_name = "Log".to_string();
let event_version = "Ver2v0".to_string();

TestUploadContext {
data,
uploader,
event_name,
event_version,
}
}

Check warning on line 89 in opentelemetry-exporter-geneva/geneva-uploader/src/ingestion_service/mod.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-exporter-geneva/geneva-uploader/src/ingestion_service/mod.rs#L23-L89

Added lines #L23 - L89 were not covered by tests
}

#[tokio::test]
/// To run this test against a real Geneva Config Service and GIG, set the following environment variables:
///
/// ```bash
/// export GENEVA_ENDPOINT="xxxhttps://<gcs-endpoint>"
/// export GENEVA_ENVIRONMENT="Test"
/// export GENEVA_ACCOUNT="YourAccountName"
/// export GENEVA_NAMESPACE="YourNamespace"
/// export GENEVA_REGION="YourRegion"
/// export GENEVA_CONFIG_MAJOR_VERSION="2"
/// export GENEVA_CERT_PATH="/path/to/client.p12"
/// export GENEVA_CERT_PASSWORD="your-cert-password"
/// export GENEVA_SOURCE_IDENTITY="Tenant=YourTenant/Role=YourRole/RoleInstance=YourInstance"
/// export GENEVA_BLOB_PATH="/path/to/blob.bin"
///
/// cargo test test_upload_to_gig_real_server -- --ignored --nocapture
/// ```
#[ignore]
async fn test_upload_to_gig_real_server() {
let ctx = test_helpers::build_test_upload_context().await;
println!("✅ Loaded blob ({} bytes)", ctx.data.len());
// below call is only for logging purposes, to get endpoint and auth info.
let (auth_info, _, _) = ctx
.uploader
.config_client
.get_ingestion_info()
.await
.unwrap();
println!("🚀 Uploading to: {}", auth_info.endpoint);

let start = Instant::now();
let response = ctx
.uploader
.upload(ctx.data, &ctx.event_name, &ctx.event_version)
.await
.expect("Upload failed");

println!(
"✅ Upload complete in {:.2?}. Ticket: {}",
start.elapsed(),
response.ticket
);
}

Check warning on line 134 in opentelemetry-exporter-geneva/geneva-uploader/src/ingestion_service/mod.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-exporter-geneva/geneva-uploader/src/ingestion_service/mod.rs#L110-L134

Added lines #L110 - L134 were not covered by tests

/// To run this test with parallel uploads:
///
/// ```bash
/// export GENEVA_ENDPOINT="https://<gcs-endpoint>"
/// export GENEVA_ENVIRONMENT="Test"
/// export GENEVA_ACCOUNT="YourAccount"
/// export GENEVA_NAMESPACE="YourNamespace"
/// export GENEVA_REGION="YourRegion"
/// export GENEVA_CONFIG_MAJOR_VERSION="2"
/// export GENEVA_CERT_PATH="/path/to/client.p12"
/// export GENEVA_CERT_PASSWORD="your-password"
/// export GENEVA_SOURCE_IDENTITY="Tenant=YourTenant/Role=Role/RoleInstance=Instance"
/// export GENEVA_BLOB_PATH="/path/to/blob.bin"
/// export GENEVA_PARALLEL_UPLOADS="10"
///
/// cargo test test_parallel_uploads -- --ignored --nocapture
/// Output:
// 🔥 Performing warm-up upload...
// 🔥 Warm-up upload complete in 222.42ms
// 🚀 Launching 5 parallel uploads...
// ✅ Upload 2 complete in 120.43ms. Ticket: ...
// ✅ Upload 4 complete in 120.35ms. Ticket: ...
// ✅ Upload 3 complete in 120.50ms. Ticket: ...
// ✅ Upload 1 complete in 154.62ms. Ticket: ...
// ✅ Upload 0 complete in 154.65ms. Ticket: ...
// 📊 Average upload duration: 133.60 ms
// ⏱️ Total elapsed for 5 parallel uploads: 154.93ms

#[tokio::test(flavor = "multi_thread")]
#[ignore]
async fn test_parallel_uploads() {
use std::env;
use std::time::Instant;

// Read parallelism level from env
// Use env variable if provided, else saturate all tokio threads by default (num_cpus::get())
let parallel_uploads: usize = env::var("GENEVA_PARALLEL_UPLOADS")
.ok()
.and_then(|v| v.parse().ok())
.unwrap_or_else(num_cpus::get);
let ctx = test_helpers::build_test_upload_context().await;

// --- Warm-up: do the first upload to populate the token cache ---
println!("🔥 Performing warm-up upload...");
let start_warmup = Instant::now();
let _ = ctx
.uploader
.upload(ctx.data.clone(), &ctx.event_name, &ctx.event_version)
.await
.expect("Warm-up upload failed");
println!(
"🔥 Warm-up upload complete in {:.2?}",
start_warmup.elapsed()
);

println!("🚀 Launching {parallel_uploads} parallel uploads...");

let start_all = Instant::now();

let mut handles = vec![];
for i in 0..parallel_uploads {
let uploader = ctx.uploader.clone();
let data = ctx.data.clone();
let event_name = ctx.event_name.to_string();
let event_version = ctx.event_version.to_string();

let handle = tokio::spawn(async move {
let start = Instant::now();
let resp = uploader
.upload(data, &event_name, &event_version)
.await
.unwrap_or_else(|_| panic!("Upload {} failed", i));
let elapsed = start.elapsed();
println!(
"✅ Upload {} complete in {:.2?}. Ticket: {}",
i, elapsed, resp.ticket
);
elapsed
});

handles.push(handle);
}

let durations: Vec<_> = futures::future::join_all(handles)
.await
.into_iter()
.map(|res| res.expect("Join error in upload task"))
.collect();

let total_time = start_all.elapsed();

let avg_ms =
durations.iter().map(|d| d.as_millis()).sum::<u128>() as f64 / durations.len() as f64;
println!("📊 Average upload duration: {:.2} ms", avg_ms);

println!(
"⏱️ Total elapsed for {} parallel uploads: {:.2?}",
parallel_uploads, total_time
);
}

Check warning on line 235 in opentelemetry-exporter-geneva/geneva-uploader/src/ingestion_service/mod.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-exporter-geneva/geneva-uploader/src/ingestion_service/mod.rs#L166-L235

Added lines #L166 - L235 were not covered by tests
}
Loading