mirror of
https://github.com/rustfs/rustfs.git
synced 2026-03-17 14:24:08 +00:00
chore(obs): Improve tracing instrumentation (#2086)
Co-authored-by: loverustfs <hello@rustfs.com>
This commit is contained in:
@@ -56,8 +56,7 @@ use tokio::sync::mpsc::Sender;
|
||||
use tokio::task::JoinHandle;
|
||||
use tokio::time::Duration;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::info;
|
||||
use tracing::warn;
|
||||
use tracing::{info, instrument, warn};
|
||||
|
||||
// Worker limits
|
||||
pub const WORKER_MAX_LIMIT: usize = 500;
|
||||
@@ -796,6 +795,7 @@ impl<S: StorageAPI> ReplicationPool<S> {
|
||||
}
|
||||
|
||||
/// Load bucket replication resync statuses into memory
|
||||
#[instrument(skip(cancellation_token))]
|
||||
async fn load_resync(self: Arc<Self>, buckets: &[String], cancellation_token: CancellationToken) -> Result<(), EcstoreError> {
|
||||
// TODO: add leader_lock
|
||||
// Make sure only one node running resync on the cluster
|
||||
|
||||
@@ -73,7 +73,7 @@ use tokio::task::JoinSet;
|
||||
use tokio::time::Duration as TokioDuration;
|
||||
use tokio_util::io::ReaderStream;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{error, info, warn};
|
||||
use tracing::{error, info, instrument, warn};
|
||||
|
||||
const REPLICATION_DIR: &str = ".replication";
|
||||
const RESYNC_FILE_NAME: &str = "resync.bin";
|
||||
@@ -294,6 +294,7 @@ impl ReplicationResyncer {
|
||||
// TODO: Metrics
|
||||
}
|
||||
|
||||
#[instrument(skip(cancellation_token, storage))]
|
||||
pub async fn resync_bucket<S: StorageAPI>(
|
||||
self: Arc<Self>,
|
||||
cancellation_token: CancellationToken,
|
||||
|
||||
@@ -22,7 +22,7 @@ use rustfs_utils::path::SLASH_SEPARATOR;
|
||||
use std::collections::HashSet;
|
||||
use std::sync::Arc;
|
||||
use std::sync::LazyLock;
|
||||
use tracing::{error, warn};
|
||||
use tracing::{error, instrument, warn};
|
||||
|
||||
pub const CONFIG_PREFIX: &str = "config";
|
||||
const CONFIG_FILE: &str = "config.json";
|
||||
@@ -36,6 +36,8 @@ static SUB_SYSTEMS_DYNAMIC: LazyLock<HashSet<String>> = LazyLock::new(|| {
|
||||
h.insert(STORAGE_CLASS_SUB_SYS.to_owned());
|
||||
h
|
||||
});
|
||||
|
||||
#[instrument(skip(api))]
|
||||
pub async fn read_config<S: StorageAPI>(api: Arc<S>, file: &str) -> Result<Vec<u8>> {
|
||||
let (data, _obj) = read_config_with_metadata(api, file, &ObjectOptions::default()).await?;
|
||||
Ok(data)
|
||||
@@ -68,6 +70,7 @@ pub async fn read_config_with_metadata<S: StorageAPI>(
|
||||
Ok((data, rd.object_info))
|
||||
}
|
||||
|
||||
#[instrument(skip(api, data))]
|
||||
pub async fn save_config<S: StorageAPI>(api: Arc<S>, file: &str, data: Vec<u8>) -> Result<()> {
|
||||
save_config_with_opts(
|
||||
api,
|
||||
@@ -81,6 +84,7 @@ pub async fn save_config<S: StorageAPI>(api: Arc<S>, file: &str, data: Vec<u8>)
|
||||
.await
|
||||
}
|
||||
|
||||
#[instrument(skip(api))]
|
||||
pub async fn delete_config<S: StorageAPI>(api: Arc<S>, file: &str) -> Result<()> {
|
||||
match api
|
||||
.delete_object(
|
||||
|
||||
@@ -34,7 +34,7 @@ use std::{
|
||||
};
|
||||
use tokio::fs;
|
||||
use tokio::sync::RwLock;
|
||||
use tracing::{debug, error, info};
|
||||
use tracing::{debug, error, info, instrument};
|
||||
|
||||
// Data usage storage constants
|
||||
pub const DATA_USAGE_ROOT: &str = SLASH_SEPARATOR;
|
||||
@@ -77,6 +77,7 @@ lazy_static::lazy_static! {
|
||||
}
|
||||
|
||||
/// Store data usage info to backend storage
|
||||
#[instrument(skip(store))]
|
||||
pub async fn store_data_usage_in_backend(data_usage_info: DataUsageInfo, store: Arc<ECStore>) -> Result<(), Error> {
|
||||
// Prevent older data from overwriting newer persisted stats
|
||||
if let Ok(buf) = read_config(store.clone(), &DATA_USAGE_OBJ_NAME_PATH).await
|
||||
@@ -103,6 +104,7 @@ pub async fn store_data_usage_in_backend(data_usage_info: DataUsageInfo, store:
|
||||
}
|
||||
|
||||
/// Load data usage info from backend storage
|
||||
#[instrument(skip(store))]
|
||||
pub async fn load_data_usage_from_backend(store: Arc<ECStore>) -> Result<DataUsageInfo, Error> {
|
||||
let buf: Vec<u8> = match read_config(store.clone(), &DATA_USAGE_OBJ_NAME_PATH).await {
|
||||
Ok(data) => data,
|
||||
@@ -633,6 +635,7 @@ pub async fn load_data_usage_cache(store: &crate::set_disk::SetDisks, name: &str
|
||||
Ok(d)
|
||||
}
|
||||
|
||||
#[instrument(skip(cache))]
|
||||
pub async fn save_data_usage_cache(cache: &DataUsageCache, name: &str) -> crate::error::Result<()> {
|
||||
use crate::config::com::save_config;
|
||||
use crate::disk::BUCKET_META_PREFIX;
|
||||
|
||||
@@ -1317,39 +1317,34 @@ fn normalize_path_components(path: impl AsRef<Path>) -> PathBuf {
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl DiskAPI for LocalDisk {
|
||||
#[tracing::instrument(skip(self))]
|
||||
fn to_string(&self) -> String {
|
||||
self.root.to_string_lossy().to_string()
|
||||
}
|
||||
#[tracing::instrument(skip(self))]
|
||||
|
||||
fn is_local(&self) -> bool {
|
||||
true
|
||||
}
|
||||
#[tracing::instrument(skip(self))]
|
||||
|
||||
fn host_name(&self) -> String {
|
||||
self.endpoint.host_port()
|
||||
}
|
||||
#[tracing::instrument(skip(self))]
|
||||
|
||||
async fn is_online(&self) -> bool {
|
||||
true
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip(self))]
|
||||
fn endpoint(&self) -> Endpoint {
|
||||
self.endpoint.clone()
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip(self))]
|
||||
async fn close(&self) -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip(self))]
|
||||
fn path(&self) -> PathBuf {
|
||||
self.root.clone()
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip(self))]
|
||||
fn get_disk_location(&self) -> DiskLocation {
|
||||
DiskLocation {
|
||||
pool_idx: {
|
||||
@@ -1437,7 +1432,6 @@ impl DiskAPI for LocalDisk {
|
||||
Ok(Some(disk_id))
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip(self))]
|
||||
async fn set_disk_id(&self, _id: Option<Uuid>) -> Result<()> {
|
||||
// No setup is required locally
|
||||
Ok(())
|
||||
@@ -2601,6 +2595,7 @@ impl DiskAPI for LocalDisk {
|
||||
ScanGuard(Arc::clone(&self.scanning))
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip(self))]
|
||||
async fn read_metadata(&self, volume: &str, path: &str) -> Result<Bytes> {
|
||||
// Try to use cached file content reading for better performance, with safe fallback
|
||||
let file_path = self.get_object_path(volume, path)?;
|
||||
@@ -2617,6 +2612,7 @@ impl DiskAPI for LocalDisk {
|
||||
}
|
||||
}
|
||||
|
||||
#[tracing::instrument]
|
||||
async fn get_disk_info(drive_path: PathBuf) -> Result<(rustfs_utils::os::DiskInfo, bool)> {
|
||||
let drive_path = drive_path.to_string_lossy().to_string();
|
||||
check_path_length(&drive_path)?;
|
||||
|
||||
@@ -60,7 +60,6 @@ pub enum Disk {
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl DiskAPI for Disk {
|
||||
#[tracing::instrument(skip(self))]
|
||||
fn to_string(&self) -> String {
|
||||
match self {
|
||||
Disk::Local(local_disk) => local_disk.to_string(),
|
||||
@@ -68,7 +67,6 @@ impl DiskAPI for Disk {
|
||||
}
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip(self))]
|
||||
async fn is_online(&self) -> bool {
|
||||
match self {
|
||||
Disk::Local(local_disk) => local_disk.is_online().await,
|
||||
@@ -76,7 +74,6 @@ impl DiskAPI for Disk {
|
||||
}
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip(self))]
|
||||
fn is_local(&self) -> bool {
|
||||
match self {
|
||||
Disk::Local(local_disk) => local_disk.is_local(),
|
||||
@@ -84,7 +81,6 @@ impl DiskAPI for Disk {
|
||||
}
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip(self))]
|
||||
fn host_name(&self) -> String {
|
||||
match self {
|
||||
Disk::Local(local_disk) => local_disk.host_name(),
|
||||
@@ -92,7 +88,6 @@ impl DiskAPI for Disk {
|
||||
}
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip(self))]
|
||||
fn endpoint(&self) -> Endpoint {
|
||||
match self {
|
||||
Disk::Local(local_disk) => local_disk.endpoint(),
|
||||
@@ -100,7 +95,6 @@ impl DiskAPI for Disk {
|
||||
}
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip(self))]
|
||||
async fn close(&self) -> Result<()> {
|
||||
match self {
|
||||
Disk::Local(local_disk) => local_disk.close().await,
|
||||
@@ -108,7 +102,6 @@ impl DiskAPI for Disk {
|
||||
}
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip(self))]
|
||||
async fn get_disk_id(&self) -> Result<Option<Uuid>> {
|
||||
match self {
|
||||
Disk::Local(local_disk) => local_disk.get_disk_id().await,
|
||||
@@ -116,7 +109,6 @@ impl DiskAPI for Disk {
|
||||
}
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip(self))]
|
||||
async fn set_disk_id(&self, id: Option<Uuid>) -> Result<()> {
|
||||
match self {
|
||||
Disk::Local(local_disk) => local_disk.set_disk_id(id).await,
|
||||
@@ -124,7 +116,6 @@ impl DiskAPI for Disk {
|
||||
}
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip(self))]
|
||||
fn path(&self) -> PathBuf {
|
||||
match self {
|
||||
Disk::Local(local_disk) => local_disk.path(),
|
||||
@@ -132,7 +123,6 @@ impl DiskAPI for Disk {
|
||||
}
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip(self))]
|
||||
fn get_disk_location(&self) -> DiskLocation {
|
||||
match self {
|
||||
Disk::Local(local_disk) => local_disk.get_disk_location(),
|
||||
@@ -164,7 +154,6 @@ impl DiskAPI for Disk {
|
||||
}
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip(self))]
|
||||
async fn stat_volume(&self, volume: &str) -> Result<VolumeInfo> {
|
||||
match self {
|
||||
Disk::Local(local_disk) => local_disk.stat_volume(volume).await,
|
||||
|
||||
@@ -321,7 +321,7 @@ impl Erasure {
|
||||
///
|
||||
/// # Returns
|
||||
/// A vector of encoded shards as `Bytes`.
|
||||
#[tracing::instrument(level = "info", skip_all, fields(data_len=data.len()))]
|
||||
#[tracing::instrument(level = "debug", skip_all, fields(data_len=data.len()))]
|
||||
pub fn encode_data(&self, data: &[u8]) -> io::Result<Vec<Bytes>> {
|
||||
// let shard_size = self.shard_size();
|
||||
// let total_size = shard_size * self.total_shard_count();
|
||||
|
||||
@@ -519,7 +519,7 @@ impl ObjectIO for SetDisks {
|
||||
Ok(reader)
|
||||
}
|
||||
|
||||
#[tracing::instrument(level = "debug", skip(self, data,))]
|
||||
#[tracing::instrument(skip(self, data,))]
|
||||
async fn put_object(&self, bucket: &str, object: &str, data: &mut PutObjReader, opts: &ObjectOptions) -> Result<ObjectInfo> {
|
||||
let disks = self.get_disks_internal().await;
|
||||
|
||||
|
||||
@@ -680,6 +680,7 @@ impl SetDisks {
|
||||
Ok((result, None))
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip(self))]
|
||||
pub(super) async fn heal_object_dir(
|
||||
&self,
|
||||
bucket: &str,
|
||||
|
||||
@@ -381,7 +381,6 @@ impl BucketOperations for Sets {
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl ObjectOperations for Sets {
|
||||
#[tracing::instrument(skip(self))]
|
||||
async fn get_object_info(&self, bucket: &str, object: &str, opts: &ObjectOptions) -> Result<ObjectInfo> {
|
||||
self.get_disks_by_key(object).get_object_info(bucket, object, opts).await
|
||||
}
|
||||
@@ -826,7 +825,6 @@ impl HealOperations for Sets {
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl StorageAPI for Sets {
|
||||
#[tracing::instrument(skip(self))]
|
||||
async fn new_ns_lock(&self, bucket: &str, object: &str) -> Result<NamespaceLockWrapper> {
|
||||
self.disk_set[0].new_ns_lock(bucket, object).await
|
||||
}
|
||||
|
||||
@@ -281,7 +281,6 @@ impl BucketOperations for ECStore {
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl ObjectOperations for ECStore {
|
||||
#[instrument(skip(self))]
|
||||
async fn get_object_info(&self, bucket: &str, object: &str, opts: &ObjectOptions) -> Result<ObjectInfo> {
|
||||
self.handle_get_object_info(bucket, object, opts).await
|
||||
}
|
||||
@@ -561,7 +560,6 @@ impl HealOperations for ECStore {
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl StorageAPI for ECStore {
|
||||
#[instrument(skip(self))]
|
||||
async fn new_ns_lock(&self, bucket: &str, object: &str) -> Result<NamespaceLockWrapper> {
|
||||
self.handle_new_ns_lock(bucket, object).await
|
||||
}
|
||||
|
||||
@@ -430,7 +430,7 @@ impl Store for ObjectStore {
|
||||
/// # Returns
|
||||
///
|
||||
/// * `Result<()>` - `Ok(())` on success, or an `Error` if all attempts fail.
|
||||
#[tracing::instrument(level = "debug", skip(self, item, path))]
|
||||
#[tracing::instrument(skip(self, item, path))]
|
||||
async fn save_iam_config<Item: Serialize + Send>(&self, item: Item, path: impl AsRef<str> + Send) -> Result<()> {
|
||||
let mut data = serde_json::to_vec(&item)?;
|
||||
data = Self::encrypt_data(&data)?;
|
||||
|
||||
@@ -32,7 +32,7 @@ use std::sync::OnceLock;
|
||||
use std::time::{Duration, Instant};
|
||||
use sysinfo::{Pid, ProcessRefreshKind, ProcessesToUpdate, System};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::warn;
|
||||
use tracing::{instrument, warn};
|
||||
|
||||
/// Process start time for calculating uptime.
|
||||
static PROCESS_START: OnceLock<Instant> = OnceLock::new();
|
||||
@@ -44,6 +44,7 @@ fn get_process_start() -> &'static Instant {
|
||||
}
|
||||
|
||||
/// Collect cluster statistics from the storage layer.
|
||||
#[instrument]
|
||||
async fn collect_cluster_stats() -> ClusterStats {
|
||||
let Some(store) = new_object_layer_fn() else {
|
||||
return ClusterStats::default();
|
||||
|
||||
@@ -32,7 +32,7 @@ use serde::{Deserialize, Serialize};
|
||||
use tokio::sync::mpsc;
|
||||
use tokio::time::Duration;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{debug, error, info, warn};
|
||||
use tracing::{debug, error, info, instrument, warn};
|
||||
|
||||
fn data_scanner_start_delay() -> Duration {
|
||||
let secs = rustfs_utils::get_env_u64(ENV_DATA_SCANNER_START_DELAY_SECS, DEFAULT_DATA_SCANNER_START_DELAY_SECS);
|
||||
@@ -100,6 +100,7 @@ pub async fn read_background_heal_info(storeapi: Arc<ECStore>) -> BackgroundHeal
|
||||
}
|
||||
|
||||
/// Save background healing information to storage
|
||||
#[instrument(skip(storeapi))]
|
||||
pub async fn save_background_heal_info(storeapi: Arc<ECStore>, info: BackgroundHealInfo) {
|
||||
// Skip for ErasureSD setup
|
||||
if is_erasure_sd().await {
|
||||
@@ -128,6 +129,77 @@ fn get_lock_acquire_timeout() -> Duration {
|
||||
Duration::from_secs(rustfs_utils::get_env_u64("RUSTFS_LOCK_ACQUIRE_TIMEOUT", 5))
|
||||
}
|
||||
|
||||
#[instrument(skip_all)]
|
||||
async fn run_scan_cycle(ctx: &CancellationToken, storeapi: &Arc<ECStore>, cycle_info: &mut CurrentCycle) {
|
||||
cycle_info.current = cycle_info.next;
|
||||
cycle_info.started = Utc::now();
|
||||
|
||||
global_metrics().set_cycle(Some(cycle_info.clone())).await;
|
||||
|
||||
let background_heal_info = read_background_heal_info(storeapi.clone()).await;
|
||||
|
||||
let scan_mode = get_cycle_scan_mode(
|
||||
cycle_info.current,
|
||||
background_heal_info.bitrot_start_cycle,
|
||||
background_heal_info.bitrot_start_time,
|
||||
);
|
||||
if background_heal_info.current_scan_mode != scan_mode {
|
||||
let mut new_heal_info = background_heal_info.clone();
|
||||
new_heal_info.current_scan_mode = scan_mode;
|
||||
|
||||
if scan_mode == HealScanMode::Deep {
|
||||
new_heal_info.bitrot_start_cycle = cycle_info.current;
|
||||
new_heal_info.bitrot_start_time = Some(Utc::now());
|
||||
}
|
||||
|
||||
save_background_heal_info(storeapi.clone(), new_heal_info).await;
|
||||
}
|
||||
|
||||
let (sender, receiver) = mpsc::channel::<DataUsageInfo>(1);
|
||||
let storeapi_clone = storeapi.clone();
|
||||
let ctx_clone = ctx.clone();
|
||||
tokio::spawn(async move {
|
||||
store_data_usage_in_backend(ctx_clone, storeapi_clone, receiver).await;
|
||||
});
|
||||
|
||||
let done_cycle = Metrics::time(Metric::ScanCycle);
|
||||
let cycle_start = std::time::Instant::now();
|
||||
if let Err(e) = storeapi
|
||||
.clone()
|
||||
.nsscanner(ctx.clone(), sender, cycle_info.current, scan_mode)
|
||||
.await
|
||||
{
|
||||
error!("Failed to scan namespace: {e}");
|
||||
emit_scan_cycle_complete(false, cycle_start.elapsed());
|
||||
} else {
|
||||
done_cycle();
|
||||
emit_scan_cycle_complete(true, cycle_start.elapsed());
|
||||
info!("Namespace scanned successfully");
|
||||
|
||||
cycle_info.next += 1;
|
||||
cycle_info.current = 0;
|
||||
cycle_info.cycle_completed.push(Utc::now());
|
||||
|
||||
if cycle_info.cycle_completed.len() >= data_usage_update_dir_cycles() as usize {
|
||||
cycle_info.cycle_completed = cycle_info.cycle_completed.split_off(data_usage_update_dir_cycles() as usize);
|
||||
}
|
||||
|
||||
global_metrics().set_cycle(Some(cycle_info.clone())).await;
|
||||
|
||||
let cycle_info_buf = cycle_info.marshal().unwrap_or_default();
|
||||
|
||||
let mut buf = Vec::with_capacity(cycle_info_buf.len() + 8);
|
||||
buf.extend_from_slice(&cycle_info.next.to_le_bytes());
|
||||
buf.extend_from_slice(&cycle_info_buf);
|
||||
|
||||
if let Err(e) = save_config(storeapi.clone(), &DATA_USAGE_BLOOM_NAME_PATH, buf).await {
|
||||
error!("Failed to save data usage bloom name to {}: {}", &*DATA_USAGE_BLOOM_NAME_PATH, e);
|
||||
} else {
|
||||
info!("Data usage bloom name saved successfully");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn run_data_scanner(ctx: CancellationToken, storeapi: Arc<ECStore>) -> Result<(), ScannerError> {
|
||||
// Acquire leader lock (write lock) to ensure only one scanner runs
|
||||
let _guard = match storeapi.new_ns_lock(RUSTFS_META_BUCKET, "leader.lock").await {
|
||||
@@ -167,73 +239,7 @@ pub async fn run_data_scanner(ctx: CancellationToken, storeapi: Arc<ECStore>) ->
|
||||
break;
|
||||
}
|
||||
_ = ticker.tick() => {
|
||||
|
||||
cycle_info.current = cycle_info.next;
|
||||
cycle_info.started = Utc::now();
|
||||
|
||||
global_metrics().set_cycle(Some(cycle_info.clone())).await;
|
||||
|
||||
let background_heal_info = read_background_heal_info(storeapi.clone()).await;
|
||||
|
||||
let scan_mode = get_cycle_scan_mode(cycle_info.current, background_heal_info.bitrot_start_cycle, background_heal_info.bitrot_start_time);
|
||||
if background_heal_info.current_scan_mode != scan_mode {
|
||||
let mut new_heal_info = background_heal_info.clone();
|
||||
new_heal_info.current_scan_mode = scan_mode;
|
||||
|
||||
if scan_mode == HealScanMode::Deep {
|
||||
new_heal_info.bitrot_start_cycle = cycle_info.current;
|
||||
new_heal_info.bitrot_start_time = Some(Utc::now());
|
||||
}
|
||||
|
||||
save_background_heal_info(storeapi.clone(), new_heal_info).await;
|
||||
}
|
||||
|
||||
|
||||
|
||||
let (sender, receiver) = mpsc::channel::<DataUsageInfo>(1);
|
||||
let storeapi_clone = storeapi.clone();
|
||||
let ctx_clone = ctx.clone();
|
||||
tokio::spawn(async move {
|
||||
store_data_usage_in_backend(ctx_clone, storeapi_clone, receiver).await;
|
||||
});
|
||||
|
||||
|
||||
let done_cycle = Metrics::time(Metric::ScanCycle);
|
||||
let cycle_start = std::time::Instant::now();
|
||||
if let Err(e) = storeapi.clone().nsscanner(ctx.clone(), sender, cycle_info.current, scan_mode).await {
|
||||
error!("Failed to scan namespace: {e}");
|
||||
emit_scan_cycle_complete(false, cycle_start.elapsed());
|
||||
} else {
|
||||
done_cycle();
|
||||
emit_scan_cycle_complete(true, cycle_start.elapsed());
|
||||
info!("Namespace scanned successfully");
|
||||
|
||||
cycle_info.next +=1;
|
||||
cycle_info.current = 0;
|
||||
cycle_info.cycle_completed.push(Utc::now());
|
||||
|
||||
if cycle_info.cycle_completed.len() >= data_usage_update_dir_cycles() as usize {
|
||||
cycle_info.cycle_completed = cycle_info.cycle_completed.split_off(data_usage_update_dir_cycles() as usize);
|
||||
}
|
||||
|
||||
global_metrics().set_cycle(Some(cycle_info.clone())).await;
|
||||
|
||||
let cycle_info_buf = cycle_info.marshal().unwrap_or_default();
|
||||
|
||||
let mut buf = Vec::with_capacity(cycle_info_buf.len() + 8);
|
||||
buf.extend_from_slice(&cycle_info.next.to_le_bytes());
|
||||
buf.extend_from_slice(&cycle_info_buf);
|
||||
|
||||
|
||||
if let Err(e) = save_config(storeapi.clone(), &DATA_USAGE_BLOOM_NAME_PATH, buf).await {
|
||||
error!("Failed to save data usage bloom name to {}: {}", &*DATA_USAGE_BLOOM_NAME_PATH, e);
|
||||
} else {
|
||||
info!("Data usage bloom name saved successfully");
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
run_scan_cycle(&ctx, &storeapi, &mut cycle_info).await;
|
||||
ticker.reset();
|
||||
}
|
||||
}
|
||||
@@ -247,6 +253,7 @@ pub async fn run_data_scanner(ctx: CancellationToken, storeapi: Arc<ECStore>) ->
|
||||
}
|
||||
|
||||
/// Store data usage info in backend. Will store all objects sent on the receiver until closed.
|
||||
#[instrument(skip(ctx, storeapi))]
|
||||
pub async fn store_data_usage_in_backend(
|
||||
ctx: CancellationToken,
|
||||
storeapi: Arc<ECStore>,
|
||||
|
||||
@@ -86,6 +86,7 @@ pub trait ScannerIODisk: Send + Sync + Debug + 'static {
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl ScannerIO for ECStore {
|
||||
#[tracing::instrument(skip(self, updates))]
|
||||
async fn nsscanner(
|
||||
&self,
|
||||
ctx: CancellationToken,
|
||||
@@ -222,6 +223,7 @@ impl ScannerIO for ECStore {
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl ScannerIOCache for SetDisks {
|
||||
#[tracing::instrument(skip(self, updates))]
|
||||
async fn nsscanner_cache(
|
||||
self: Arc<Self>,
|
||||
ctx: CancellationToken,
|
||||
@@ -562,6 +564,8 @@ impl ScannerIODisk for Disk {
|
||||
|
||||
Ok(size_summary)
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip(self, updates, cache))]
|
||||
async fn nsscanner_disk(
|
||||
&self,
|
||||
ctx: CancellationToken,
|
||||
|
||||
@@ -19,7 +19,7 @@ use http::Request;
|
||||
use std::sync::Arc;
|
||||
use std::task::{Context, Poll};
|
||||
use tower::Service;
|
||||
use tracing::{Span, debug, instrument};
|
||||
use tracing::debug;
|
||||
|
||||
/// Tower Service for the trusted proxy middleware.
|
||||
#[derive(Clone)]
|
||||
@@ -61,25 +61,7 @@ where
|
||||
self.inner.poll_ready(cx)
|
||||
}
|
||||
|
||||
#[instrument(
|
||||
name = "trusted_proxy_middleware",
|
||||
skip_all,
|
||||
fields(
|
||||
http.method = %req.method(),
|
||||
http.uri = %req.uri(),
|
||||
http.version = ?req.version(),
|
||||
enabled = self.enabled,
|
||||
peer.addr = tracing::field::Empty,
|
||||
client.ip = tracing::field::Empty,
|
||||
client.trusted = tracing::field::Empty,
|
||||
client.hops = tracing::field::Empty,
|
||||
error = tracing::field::Empty,
|
||||
error.message = tracing::field::Empty,
|
||||
)
|
||||
)]
|
||||
fn call(&mut self, mut req: Request<ReqBody>) -> Self::Future {
|
||||
let span = Span::current();
|
||||
|
||||
// If the middleware is disabled, pass the request through immediately.
|
||||
if !self.enabled {
|
||||
debug!("Trusted proxy middleware is disabled");
|
||||
@@ -91,17 +73,9 @@ where
|
||||
// Extract the direct peer address from the request extensions.
|
||||
let peer_addr = req.extensions().get::<std::net::SocketAddr>().copied();
|
||||
|
||||
if let Some(addr) = peer_addr {
|
||||
span.record("peer.addr", addr.to_string());
|
||||
}
|
||||
|
||||
// Validate the request and extract client information.
|
||||
match self.validator.validate_request(peer_addr, req.headers()) {
|
||||
Ok(client_info) => {
|
||||
span.record("client.ip", client_info.real_ip.to_string());
|
||||
span.record("client.trusted", client_info.is_from_trusted_proxy);
|
||||
span.record("client.hops", client_info.proxy_hops as i64);
|
||||
|
||||
// Insert the verified client info into the request extensions.
|
||||
req.extensions_mut().insert(client_info);
|
||||
|
||||
@@ -109,9 +83,6 @@ where
|
||||
debug!("Proxy validation successful in {:?}", duration);
|
||||
}
|
||||
Err(err) => {
|
||||
span.record("error", true);
|
||||
span.record("error.message", err.to_string());
|
||||
|
||||
// If the error is recoverable, fallback to a direct connection info.
|
||||
if err.is_recoverable() {
|
||||
let client_info = ClientInfo::direct(
|
||||
|
||||
@@ -30,7 +30,7 @@ use rustfs_kms::{
|
||||
};
|
||||
use rustfs_policy::policy::action::{Action, AdminAction};
|
||||
use s3s::{Body, S3Request, S3Response, S3Result, s3_error};
|
||||
use tracing::{error, info, warn};
|
||||
use tracing::{error, info, instrument, warn};
|
||||
|
||||
/// Path to store KMS configuration in the cluster metadata
|
||||
const KMS_CONFIG_PATH: &str = "config/kms_config.json";
|
||||
@@ -43,6 +43,7 @@ fn kms_service_manager_from_context() -> std::sync::Arc<rustfs_kms::KmsServiceMa
|
||||
}
|
||||
|
||||
/// Save KMS configuration to cluster storage
|
||||
#[instrument(skip(config))]
|
||||
async fn save_kms_config(config: &KmsConfig) -> Result<(), String> {
|
||||
let Some(store) = new_object_layer_fn() else {
|
||||
return Err("Storage layer not initialized".to_string());
|
||||
@@ -59,6 +60,7 @@ async fn save_kms_config(config: &KmsConfig) -> Result<(), String> {
|
||||
}
|
||||
|
||||
/// Load KMS configuration from cluster storage
|
||||
#[instrument]
|
||||
pub async fn load_kms_config() -> Option<KmsConfig> {
|
||||
let Some(store) = new_object_layer_fn() else {
|
||||
warn!("Storage layer not initialized, cannot load KMS config");
|
||||
|
||||
@@ -648,6 +648,9 @@ fn process_connection(
|
||||
uri = %request.uri(),
|
||||
version = ?request.version(),
|
||||
);
|
||||
if span.is_disabled() {
|
||||
return span;
|
||||
}
|
||||
if let Err(e) = span.set_parent(parent_context) {
|
||||
warn!("Failed to propagate tracing context: `{:?}`", e);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user