Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -549,6 +549,7 @@ cargo_metadata = "0.19"
fast-float2 = "0.2.3"
gix = "0.71.0"
indent = "0.1.1"
inventory = "0.3.15"
logos = "0.12.1"
nom = "7.1.1"
nom-rule = "0.4"
Expand Down
1 change: 1 addition & 0 deletions src/common/tracing/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ defer = { workspace = true }
fastrace = { workspace = true }
fastrace-opentelemetry = { workspace = true }
itertools = { workspace = true }
inventory = { workspace = true }
jiff = { workspace = true }
libc = { workspace = true }
log = { workspace = true }
Expand Down
1 change: 1 addition & 0 deletions src/common/tracing/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ mod crash_hook;
mod filter;
mod init;
mod loggers;
pub mod module_tag;
mod panic_hook;
mod remote_log;
mod structlog;
Expand Down
23 changes: 11 additions & 12 deletions src/common/tracing/src/loggers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ use logforth::Layout;
use serde_json::Map;
use serde_json::Value as JsonValue;

use crate::module_tag::label_for_module;

const PRINTER: DateTimePrinter = DateTimePrinter::new().precision(Some(6));

pub fn format_timestamp(zdt: &Zoned) -> String {
Expand Down Expand Up @@ -123,18 +125,15 @@ impl<const FIXED_TIME: bool> Layout for TextLayout<FIXED_TIME> {
PRINTER.print_timestamp_with_offset(&timestamp, offset, &mut buf)?;
}

write!(
buf,
" {:>5} {}: {}:{} {}",
record.level(),
record.module_path().unwrap_or(""),
Path::new(record.file().unwrap_or_default())
.file_name()
.and_then(|name| name.to_str())
.unwrap_or_default(),
record.line().unwrap_or(0),
record.args(),
)?;
let level = record.level();
let module = record.module_path().map(label_for_module).unwrap_or("");
let log_file = Path::new(record.file().unwrap_or_default())
.file_name()
.and_then(|name| name.to_str())
.unwrap_or_default();
let line = record.line().unwrap_or(0);
let msg = record.args();
write!(buf, " {level:>5} {module}: {log_file}:{line} {msg}")?;
record.key_values().visit(&mut KvWriter(&mut buf))?;

Ok(buf)
Expand Down
89 changes: 89 additions & 0 deletions src/common/tracing/src/module_tag.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

/// A mapping between a module path prefix and a short log label.
///
/// Both fields are `'static` string slices, so a tag is a small plain-data
/// value that is cheap to copy and can be compared directly.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ModuleTag {
    /// Module path prefix this tag applies to.
    pub prefix: &'static str,
    /// Short label returned for module paths matching `prefix`.
    pub label: &'static str,
}

// Declare `ModuleTag` as an `inventory`-collected type; `label_for_module`
// below iterates the collected entries via `inventory::iter::<ModuleTag>`.
inventory::collect!(ModuleTag);

/// Resolve the best matching label for the provided module path.
///
/// Every registered [`ModuleTag`] whose `prefix` matches `full_name` on a
/// module boundary is a candidate, and the candidate with the longest prefix
/// wins. A prefix matches on a boundary when, after stripping it from the
/// front of `full_name`, the remainder is empty or starts with `::`. An empty
/// prefix, or one that already ends with `::`, also matches, so explicitly
/// registered catch-all or namespace prefixes keep working.
///
/// This prevents a tag registered for `foo::bar` from also labelling the
/// unrelated sibling module `foo::bar_baz`.
///
/// Returns `full_name` unchanged when no registered tag matches.
pub fn label_for_module<'a>(full_name: &'a str) -> &'a str {
    inventory::iter::<ModuleTag>
        .into_iter()
        .filter(|entry| {
            let prefix = entry.prefix;
            full_name.strip_prefix(prefix).is_some_and(|rest| {
                // Accept only matches that end on a module path boundary.
                rest.is_empty()
                    || rest.starts_with("::")
                    || prefix.is_empty()
                    || prefix.ends_with("::")
            })
        })
        // The most specific (longest) prefix takes precedence.
        .max_by_key(|entry| entry.prefix.len())
        .map(|entry| entry.label)
        .unwrap_or(full_name)
}

/// Register a short log label for a module path prefix.
///
/// * `register_module_tag!("[LABEL]")` registers the label for the invoking
///   module, using `module_path!()` as the prefix.
/// * `register_module_tag!("[LABEL]", "some::prefix")` registers the label for
///   an explicit prefix.
///
/// The submitted value is named through `$crate`, so the expansion resolves
/// `ModuleTag` regardless of the name under which a dependent crate imports
/// this crate.
///
/// NOTE(review): the expansion still invokes `inventory::submit!` by its bare
/// name, so the calling crate presumably needs `inventory` resolvable in its
/// own scope — confirm against the `inventory` version in use.
#[macro_export]
macro_rules! register_module_tag {
    // Label only: tag the module that invokes the macro.
    ($label:expr $(,)?) => {
        $crate::register_module_tag!($label, module_path!());
    };
    // Label plus an explicit module path prefix.
    ($label:expr, $prefix:expr $(,)?) => {
        inventory::submit! {
            #![crate = databend_common_tracing]

            $crate::module_tag::ModuleTag {
                prefix: $prefix,
                label: $label,
            }
        }
    };
}

#[cfg(test)]
mod tests {
    use super::label_for_module;
    use super::ModuleTag;

    // Two overlapping registrations: the shorter prefix is also a prefix of
    // the longer one, so lookups under `module::submodule` see both entries.
    inventory::submit! {
        ModuleTag {
            prefix: "module",
            label: "[MODULE]",
        }
    }

    inventory::submit! {
        ModuleTag {
            prefix: "module::submodule",
            label: "[SUB]",
        }
    }

    /// The longer of two matching prefixes decides the label.
    #[test]
    fn chooses_longest_prefix() {
        let resolved = label_for_module("module::submodule::leaf");
        assert_eq!(resolved, "[SUB]");
    }

    /// A path without any registered prefix is returned as-is.
    #[test]
    fn falls_back_to_full_module_name() {
        let unregistered = "unknown::module";
        assert_eq!(label_for_module(unregistered), "unknown::module");
    }
}
38 changes: 19 additions & 19 deletions src/query/ee/src/storages/fuse/operations/vacuum_table_v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.

// Logs from this module will show up as "[FUSE-VACUUM2] ...".
databend_common_tracing::register_module_tag!("[FUSE-VACUUM2]");

use std::collections::HashSet;
use std::sync::Arc;

Expand Down Expand Up @@ -89,7 +92,7 @@ pub async fn do_vacuum2(
{
if ctx.txn_mgr().lock().is_active() {
info!(
"[FUSE-VACUUM2] Transaction is active, skipping vacuum, target table {}",
"Transaction is active, skipping vacuum, target table {}",
table.get_table_info().desc
);
return Ok(vec![]);
Expand All @@ -100,7 +103,7 @@ pub async fn do_vacuum2(

let Some(latest_snapshot) = fuse_table.read_table_snapshot().await? else {
info!(
"[FUSE-VACUUM2] Table {} has no snapshot, stopping vacuum",
"Table {} has no snapshot, stopping vacuum",
fuse_table.get_table_info().desc
);
return Ok(vec![]);
Expand All @@ -116,10 +119,7 @@ pub async fn do_vacuum2(

let snapshots_before_lvt = match retention_policy {
RetentionPolicy::ByTimePeriod(delta_duration) => {
info!(
"[FUSE-VACUUM2] Using ByTimePeriod policy {:?}",
delta_duration
);
info!("Using ByTimePeriod policy {:?}", delta_duration);
let retention_period = if fuse_table.is_transient() {
// For transient table, keep no history data
TimeDelta::zero()
Expand All @@ -141,7 +141,7 @@ pub async fn do_vacuum2(
}

ctx.set_status_info(&format!(
"[FUSE-VACUUM2] Set LVT for table {}, elapsed: {:?}, LVT: {:?}",
"Set LVT for table {}, elapsed: {:?}, LVT: {:?}",
fuse_table.get_table_info().desc,
start.elapsed(),
lvt
Expand All @@ -153,7 +153,7 @@ pub async fn do_vacuum2(
}
RetentionPolicy::ByNumOfSnapshotsToKeep(num_snapshots_to_keep) => {
info!(
"[FUSE-VACUUM2] Using ByNumOfSnapshotsToKeep policy {:?}",
"Using ByNumOfSnapshotsToKeep policy {:?}",
num_snapshots_to_keep
);
// List the snapshot order by timestamp asc, till the current snapshot(inclusively).
Expand Down Expand Up @@ -195,7 +195,7 @@ pub async fn do_vacuum2(

let elapsed = start.elapsed();
ctx.set_status_info(&format!(
"[FUSE-VACUUM2] Listed snapshots for table {}, elapsed: {:?}, snapshots_dir: {:?}, snapshots: {:?}",
"Listed snapshots for table {}, elapsed: {:?}, snapshots_dir: {:?}, snapshots: {:?}",
fuse_table.get_table_info().desc,
elapsed,
fuse_table
Expand All @@ -217,7 +217,7 @@ pub async fn do_vacuum2(
return Ok(vec![]);
};
ctx.set_status_info(&format!(
"[FUSE-VACUUM2] Selected gc_root for table {}, elapsed: {:?}, gc_root: {:?}, snapshots_to_gc: {:?}",
"Selected gc_root for table {}, elapsed: {:?}, gc_root: {:?}, snapshots_to_gc: {:?}",
fuse_table.get_table_info().desc,
start.elapsed(),
gc_root,
Expand Down Expand Up @@ -246,7 +246,7 @@ pub async fn do_vacuum2(
.collect::<Vec<_>>();

ctx.set_status_info(&format!(
"[FUSE-VACUUM2] Listed segments before gc_root for table {}, elapsed: {:?}, segment_dir: {:?}, gc_root_timestamp: {:?}, segments: {:?}",
"Listed segments before gc_root for table {}, elapsed: {:?}, segment_dir: {:?}, gc_root_timestamp: {:?}, segments: {:?}",
fuse_table.get_table_info().desc,
start.elapsed(),
fuse_table.meta_location_generator().segment_location_prefix(),
Expand All @@ -264,7 +264,7 @@ pub async fn do_vacuum2(
.map(|v| TableMetaLocationGenerator::gen_segment_stats_location_from_segment_location(v))
.collect::<Vec<_>>();
ctx.set_status_info(&format!(
"[FUSE-VACUUM2] Filtered segments_to_gc for table {}, elapsed: {:?}, segments_to_gc: {:?}, stats_to_gc: {:?}",
"Filtered segments_to_gc for table {}, elapsed: {:?}, segments_to_gc: {:?}, stats_to_gc: {:?}",
fuse_table.get_table_info().desc,
start.elapsed(),
slice_summary(&segments_to_gc),
Expand All @@ -282,7 +282,7 @@ pub async fn do_vacuum2(
gc_root_blocks.extend(segment?.block_metas()?.iter().map(|b| b.location.0.clone()));
}
ctx.set_status_info(&format!(
"[FUSE-VACUUM2] Read segments for table {}, elapsed: {:?}",
"Read segments for table {}, elapsed: {:?}",
fuse_table.get_table_info().desc,
start.elapsed(),
));
Expand All @@ -301,7 +301,7 @@ pub async fn do_vacuum2(
.collect::<Vec<_>>();

ctx.set_status_info(&format!(
"[FUSE-VACUUM2] Listed blocks before gc_root for table {}, elapsed: {:?}, block_dir: {:?}, gc_root_timestamp: {:?}, blocks: {:?}",
"Listed blocks before gc_root for table {}, elapsed: {:?}, block_dir: {:?}, gc_root_timestamp: {:?}, blocks: {:?}",
fuse_table.get_table_info().desc,
start.elapsed(),
fuse_table.meta_location_generator().block_location_prefix(),
Expand All @@ -315,7 +315,7 @@ pub async fn do_vacuum2(
.filter(|b| !gc_root_blocks.contains(b))
.collect();
ctx.set_status_info(&format!(
"[FUSE-VACUUM2] Filtered blocks_to_gc for table {}, elapsed: {:?}, blocks_to_gc: {:?}",
"Filtered blocks_to_gc for table {}, elapsed: {:?}, blocks_to_gc: {:?}",
fuse_table.get_table_info().desc,
start.elapsed(),
slice_summary(&blocks_to_gc)
Expand Down Expand Up @@ -355,7 +355,7 @@ pub async fn do_vacuum2(
}

ctx.set_status_info(&format!(
"[FUSE-VACUUM2] Collected indexes_to_gc for table {}, elapsed: {:?}, indexes_to_gc: {:?}",
"Collected indexes_to_gc for table {}, elapsed: {:?}, indexes_to_gc: {:?}",
fuse_table.get_table_info().desc,
start.elapsed(),
slice_summary(&indexes_to_gc)
Expand Down Expand Up @@ -399,7 +399,7 @@ pub async fn do_vacuum2(
.chain(indexes_to_gc.into_iter())
.collect();
ctx.set_status_info(&format!(
"[FUSE-VACUUM2] Removed files for table {}, elapsed: {:?}, files_to_gc: {:?}",
"Removed files for table {}, elapsed: {:?}, files_to_gc: {:?}",
fuse_table.get_table_info().desc,
start.elapsed(),
slice_summary(&files_to_gc)
Expand Down Expand Up @@ -450,7 +450,7 @@ async fn set_lvt(
) -> Result<Option<DateTime<Utc>>> {
if !is_uuid_v7(&latest_snapshot.snapshot_id) {
info!(
"[FUSE-VACUUM2] Latest snapshot is not v7, stopping vacuum: {:?}",
"Latest snapshot is not v7, stopping vacuum: {:?}",
latest_snapshot.snapshot_id
);
return Ok(None);
Expand Down Expand Up @@ -482,7 +482,7 @@ async fn list_until_prefix(
need_one_more: bool,
gc_root_meta_ts: Option<DateTime<Utc>>,
) -> Result<Vec<Entry>> {
info!("[FUSE-VACUUM2] Listing files until prefix: {}", until);
info!("Listing files until prefix: {}", until);
let dal = fuse_table.get_operator_ref();

match dal.info().scheme() {
Expand Down
1 change: 1 addition & 0 deletions src/query/pipeline/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ async-trait = { workspace = true }
databend-common-base = { workspace = true }
databend-common-exception = { workspace = true }
databend-common-expression = { workspace = true }
databend-common-tracing = { workspace = true }
fastrace = { workspace = true }
futures = { workspace = true }
log = { workspace = true }
Expand Down
7 changes: 5 additions & 2 deletions src/query/pipeline/core/src/processors/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.

// Logs from this module will show up as "[PIPELINE-EXECUTOR] ...".
databend_common_tracing::register_module_tag!("[PIPELINE-EXECUTOR]");

use std::any::Any;
use std::cell::UnsafeCell;
use std::ops::Deref;
Expand Down Expand Up @@ -175,7 +178,7 @@ impl ProcessorPtr {
("error.message", err.display_text()),
]
});
log::info!(error = err.to_string(); "[PIPELINE-EXECUTOR] Error in process");
log::info!(error = err.to_string(); "Error in process");
Err(err)
}
}
Expand Down Expand Up @@ -215,7 +218,7 @@ impl ProcessorPtr {
("error.message", err.display_text()),
]
});
log::info!(error = err.to_string(); "[PIPELINE-EXECUTOR] Error in process");
log::info!(error = err.to_string(); "Error in process");
Err(err)
}
}
Expand Down
1 change: 1 addition & 0 deletions src/query/pipeline/transforms/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ databend-common-functions = { workspace = true }
databend-common-license = { workspace = true }
databend-common-pipeline-core = { workspace = true }
databend-common-sql = { workspace = true }
databend-common-tracing = { workspace = true }
databend-storages-common-cache = { workspace = true }
enum-as-inner = { workspace = true }
fastrace = { workspace = true }
Expand Down
Loading