diff --git a/.gitignore b/.gitignore index ed447fcc..1b46a92f 100644 --- a/.gitignore +++ b/.gitignore @@ -22,4 +22,5 @@ profile.json .secrets *.go *.pb -*.svg \ No newline at end of file +*.svg +deploy/logs/*.log.* \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index 126bfae9..729e06d5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -474,6 +474,7 @@ version = "0.4.19" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "06575e6a9673580f52661c92107baabffbf41e2141373441cbcdc47cb733003c" dependencies = [ + "brotli 7.0.0", "bzip2 0.5.2", "flate2", "futures-core", @@ -876,7 +877,7 @@ dependencies = [ "hyper-util", "pin-project-lite", "rustls 0.21.12", - "rustls 0.23.34", + "rustls 0.23.35", "rustls-native-certs 0.8.2", "rustls-pki-types", "tokio", @@ -1057,9 +1058,9 @@ dependencies = [ [[package]] name = "axum-extra" -version = "0.12.0" +version = "0.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "460c45604cb25834835e3b4d3468510322852783dac36261d642424d75562ff3" +checksum = "5136e6c5e7e7978fe23e9876fb924af2c0f84c72127ac6ac17e7c46f457d362c" dependencies = [ "axum", "axum-core", @@ -1090,7 +1091,7 @@ dependencies = [ "hyper 1.7.0", "hyper-util", "pin-project-lite", - "rustls 0.23.34", + "rustls 0.23.35", "rustls-pemfile 2.2.0", "rustls-pki-types", "tokio", @@ -1275,6 +1276,17 @@ dependencies = [ "syn 2.0.108", ] +[[package]] +name = "brotli" +version = "7.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cc97b8f16f944bba54f0433f07e30be199b6dc2bd25937444bbad560bcea29bd" +dependencies = [ + "alloc-no-stdlib", + "alloc-stdlib", + "brotli-decompressor 4.0.3", +] + [[package]] name = "brotli" version = "8.0.2" @@ -1283,7 +1295,17 @@ checksum = "4bd8b9603c7aa97359dbd97ecf258968c95f3adddd6db2f7e7a5bef101c84560" dependencies = [ "alloc-no-stdlib", "alloc-stdlib", - "brotli-decompressor", + "brotli-decompressor 5.0.0", +] + +[[package]] +name = "brotli-decompressor" 
+version = "4.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a334ef7c9e23abf0ce748e8cd309037da93e606ad52eb372e4ce327a0dcfbdfd" +dependencies = [ + "alloc-no-stdlib", + "alloc-stdlib", ] [[package]] @@ -1433,9 +1455,9 @@ checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5" [[package]] name = "cc" -version = "1.2.43" +version = "1.2.44" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "739eb0f94557554b3ca9a86d2d37bebd49c5e6d0c1d2bda35ba5bdac830befc2" +checksum = "37521ac7aabe3d13122dc382493e20c9416f299d2ccd5b3a5340a2570cdeb0f3" dependencies = [ "find-msvc-tools", "jobserver", @@ -3584,7 +3606,7 @@ dependencies = [ "http 1.3.1", "reqwest", "rustc_version", - "rustls 0.23.34", + "rustls 0.23.35", "rustls-pemfile 2.2.0", "serde", "serde_json", @@ -4155,7 +4177,7 @@ dependencies = [ "hyper 1.7.0", "hyper-util", "log", - "rustls 0.23.34", + "rustls 0.23.35", "rustls-native-certs 0.8.2", "rustls-pki-types", "tokio", @@ -5197,11 +5219,10 @@ dependencies = [ [[package]] name = "num-bigint-dig" -version = "0.8.4" +version = "0.8.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dc84195820f291c7697304f3cbdadd1cb7199c0efc917ff5eafd71225c136151" +checksum = "82c79c15c05d4bf82b6f5ef163104cc81a760d8e874d38ac50ab67c8877b647b" dependencies = [ - "byteorder", "lazy_static", "libm", "num-integer", @@ -5659,7 +5680,7 @@ dependencies = [ "arrow-schema", "arrow-select", "base64 0.22.1", - "brotli", + "brotli 8.0.2", "bytes", "chrono", "flate2", @@ -5982,9 +6003,9 @@ checksum = "439ee305def115ba05938db6eb1644ff94165c5ab5e9420d1c1bcedbba909391" [[package]] name = "ppmd-rust" -version = "1.2.1" +version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c834641d8ad1b348c9ee86dec3b9840d805acd5f24daa5f90c788951a52ff59b" +checksum = "d558c559f0450f16f2a27a1f017ef38468c1090c9ce63c8e51366232d53717b4" [[package]] name = "pprof" @@ -6272,7 
+6293,7 @@ dependencies = [ "quinn-proto", "quinn-udp", "rustc-hash", - "rustls 0.23.34", + "rustls 0.23.35", "socket2 0.6.1", "thiserror 2.0.17", "tokio", @@ -6292,7 +6313,7 @@ dependencies = [ "rand 0.9.2", "ring", "rustc-hash", - "rustls 0.23.34", + "rustls 0.23.35", "rustls-pki-types", "slab", "thiserror 2.0.17", @@ -6548,7 +6569,7 @@ dependencies = [ "percent-encoding", "pin-project-lite", "quinn", - "rustls 0.23.34", + "rustls 0.23.35", "rustls-pki-types", "serde", "serde_json", @@ -6624,7 +6645,7 @@ dependencies = [ "paste", "pin-project-lite", "rmcp-macros", - "schemars 1.0.4", + "schemars 1.0.5", "serde", "serde_json", "thiserror 2.0.17", @@ -6824,8 +6845,9 @@ dependencies = [ "rustfs-targets", "rustfs-utils", "rustfs-zip", - "rustls 0.23.34", + "rustls 0.23.35", "s3s", + "scopeguard", "serde", "serde_json", "serde_urlencoded", @@ -6929,7 +6951,6 @@ version = "0.0.5" dependencies = [ "async-trait", "chrono", - "lazy_static", "path-clean", "rmp-serde", "rustfs-filemeta", @@ -7023,7 +7044,7 @@ dependencies = [ "rustfs-signer", "rustfs-utils", "rustfs-workers", - "rustls 0.23.34", + "rustls 0.23.35", "s3s", "serde", "serde_json", @@ -7167,7 +7188,7 @@ dependencies = [ "clap", "mime_guess", "rmcp", - "schemars 1.0.4", + "schemars 1.0.5", "serde", "serde_json", "tokio", @@ -7228,6 +7249,7 @@ dependencies = [ "thiserror 2.0.17", "tokio", "tracing", + "tracing-appender", "tracing-error", "tracing-opentelemetry", "tracing-subscriber", @@ -7381,7 +7403,7 @@ version = "0.0.5" dependencies = [ "base64-simd", "blake3", - "brotli", + "brotli 8.0.2", "bytes", "convert_case", "crc32fast", @@ -7402,7 +7424,7 @@ dependencies = [ "rand 0.9.2", "regex", "rustfs-config", - "rustls 0.23.34", + "rustls 0.23.35", "rustls-pemfile 2.2.0", "rustls-pki-types", "s3s", @@ -7514,9 +7536,9 @@ dependencies = [ [[package]] name = "rustls" -version = "0.23.34" +version = "0.23.35" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = 
"6a9586e9ee2b4f8fab52a0048ca7334d7024eef48e2cb9407e3497bb7cab7fa7" +checksum = "533f54bc6a7d4f647e46ad909549eda97bf5afc1585190ef692b4286b198bd8f" dependencies = [ "aws-lc-rs", "log", @@ -7718,9 +7740,9 @@ dependencies = [ [[package]] name = "schemars" -version = "1.0.4" +version = "1.0.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "82d20c4491bc164fa2f6c5d44565947a52ad80b9505d8e36f8d54c27c739fcd0" +checksum = "1317c3bf3e7df961da95b0a56a172a02abead31276215a0497241a7624b487ce" dependencies = [ "chrono", "dyn-clone", @@ -7732,9 +7754,9 @@ dependencies = [ [[package]] name = "schemars_derive" -version = "1.0.4" +version = "1.0.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "33d020396d1d138dc19f1165df7545479dcd58d93810dc5d646a16e55abefa80" +checksum = "5f760a6150d45dd66ec044983c124595ae76912e77ed0b44124cb3e415cce5d9" dependencies = [ "proc-macro2", "quote", @@ -7973,7 +7995,7 @@ dependencies = [ "indexmap 1.9.3", "indexmap 2.12.0", "schemars 0.9.0", - "schemars 1.0.4", + "schemars 1.0.5", "serde_core", "serde_json", "serde_with_macros", @@ -8891,7 +8913,7 @@ version = "0.26.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1729aa945f29d91ba541258c8df89027d5792d85a8841fb65e8bf0f4ede4ef61" dependencies = [ - "rustls 0.23.34", + "rustls 0.23.35", "tokio", ] @@ -9086,6 +9108,7 @@ dependencies = [ "tower-layer", "tower-service", "tracing", + "uuid", ] [[package]] @@ -9112,6 +9135,18 @@ dependencies = [ "tracing-core", ] +[[package]] +name = "tracing-appender" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3566e8ce28cc0a3fe42519fc80e6b4c943cc4c8cef275620eb8dac2d3d4e06cf" +dependencies = [ + "crossbeam-channel", + "thiserror 1.0.69", + "time", + "tracing-subscriber", +] + [[package]] name = "tracing-attributes" version = "0.1.30" @@ -9272,9 +9307,9 @@ checksum = "75b844d17643ee918803943289730bec8aac480150456169e647ed0b576ba539" 
[[package]] name = "unicode-ident" -version = "1.0.20" +version = "1.0.22" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "462eeb75aeb73aea900253ce739c8e18a67423fadf006037cd3ff27e82748a06" +checksum = "9312f7c4f6ff9069b165498234ce8be658059c6728633667c526e27dc2cf1df5" [[package]] name = "unicode-segmentation" @@ -9566,9 +9601,9 @@ dependencies = [ [[package]] name = "webpki-roots" -version = "1.0.3" +version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "32b130c0d2d49f8b6889abc456e795e82525204f27c42cf767cf0d7734e089b8" +checksum = "b2878ef029c47c6e8cf779119f20fcf52bde7ad42a731b2a304bc221df17571e" dependencies = [ "rustls-pki-types", ] @@ -10171,9 +10206,9 @@ checksum = "2f06ae92f42f5e5c42443fd094f245eb656abf56dd7cce9b8b263236565e00f2" [[package]] name = "zopfli" -version = "0.8.2" +version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "edfc5ee405f504cd4984ecc6f14d02d55cfda60fa4b689434ef4102aae150cd7" +checksum = "f05cd8797d63865425ff89b5c4a48804f35ba0ce8d125800027ad6017d2b5249" dependencies = [ "bumpalo", "crc32fast", diff --git a/Cargo.toml b/Cargo.toml index 1da101bf..a7d5cdd6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -98,7 +98,7 @@ async-compression = { version = "0.4.19" } async-recursion = "1.1.1" async-trait = "0.1.89" axum = "0.8.6" -axum-extra = "0.12.0" +axum-extra = "0.12.1" axum-server = { version = "0.7.2", features = ["tls-rustls-no-provider"], default-features = false } futures = "0.3.31" futures-core = "0.3.31" @@ -135,7 +135,7 @@ rmp-serde = { version = "1.3.0" } serde = { version = "1.0.228", features = ["derive"] } serde_json = { version = "1.0.145", features = ["raw_value"] } serde_urlencoded = "0.7.1" -schemars = "1.0.4" +schemars = "1.0.5" # Cryptography and Security aes-gcm = { version = "0.10.3", features = ["std"] } @@ -150,7 +150,7 @@ hmac = "0.12.1" jsonwebtoken = { version = "10.1.0", features = ["rust_crypto"] } pbkdf2 = 
"0.12.2" rsa = { version = "0.9.8" } -rustls = { version = "0.23.34", features = ["ring", "logging", "std", "tls12"], default-features = false } +rustls = { version = "0.23.35", features = ["ring", "logging", "std", "tls12"], default-features = false } rustls-pemfile = "2.2.0" rustls-pki-types = "1.13.0" sha1 = "0.10.6" @@ -190,6 +190,7 @@ glob = "0.3.3" google-cloud-storage = "1.2.0" google-cloud-auth = "1.1.0" hashbrown = { version = "0.16.0", features = ["serde", "rayon"] } +heed = { version = "0.22.0" } hex-simd = "0.8.0" highway = { version = "1.3.0" } ipnetwork = { version = "0.21.1", features = ["serde"] } @@ -225,6 +226,7 @@ rumqttc = { version = "0.25.0" } rust-embed = { version = "8.9.0" } rustc-hash = { version = "2.1.1" } s3s = { version = "0.12.0-rc.3", features = ["minio"] } +scopeguard = "1.2.0" serial_test = "3.2.0" shadow-rs = { version = "1.4.0", default-features = false } siphasher = "1.0.1" @@ -241,6 +243,7 @@ tempfile = "3.23.0" test-case = "3.3.1" thiserror = "2.0.17" tracing = { version = "0.1.41" } +tracing-appender = "0.2.3" tracing-error = "0.2.1" tracing-opentelemetry = "0.32.0" tracing-subscriber = { version = "0.3.20", features = ["env-filter", "time"] } @@ -277,7 +280,7 @@ mimalloc = "0.1" [workspace.metadata.cargo-shear] -ignored = ["rustfs", "rustfs-mcp", "tokio-test"] +ignored = ["rustfs", "rustfs-mcp", "tokio-test", "scopeguard"] [profile.release] opt-level = 3 diff --git a/crates/ahm/Cargo.toml b/crates/ahm/Cargo.toml index 9ca10a35..1b6eb36e 100644 --- a/crates/ahm/Cargo.toml +++ b/crates/ahm/Cargo.toml @@ -40,4 +40,4 @@ serde_json = { workspace = true } serial_test = { workspace = true } tracing-subscriber = { workspace = true } tempfile = { workspace = true } -heed = "0.22.0" +heed = { workspace = true } diff --git a/crates/ahm/src/error.rs b/crates/ahm/src/error.rs index 3aa617cc..af2c1e28 100644 --- a/crates/ahm/src/error.rs +++ b/crates/ahm/src/error.rs @@ -14,6 +14,10 @@ use thiserror::Error; +/// Custom error type for AHM 
operations +/// This enum defines various error variants that can occur during +/// the execution of AHM-related tasks, such as I/O errors, storage errors, +/// configuration errors, and specific errors related to healing operations. #[derive(Debug, Error)] pub enum Error { #[error("I/O error: {0}")] Io(#[from] std::io::Error), @@ -85,9 +89,13 @@ pub enum Error { ProgressTrackingFailed { message: String }, } +/// A specialized Result type for AHM operations +/// This type is a convenient alias for results returned by functions in the AHM crate, +/// using the custom Error type defined above. pub type Result = std::result::Result; impl Error { + /// Create an Other error from any error type pub fn other(error: E) -> Self where E: Into>, diff --git a/crates/ahm/src/heal/channel.rs b/crates/ahm/src/heal/channel.rs index ecfeae78..eb377380 100644 --- a/crates/ahm/src/heal/channel.rs +++ b/crates/ahm/src/heal/channel.rs @@ -12,18 +12,17 @@ // See the License for the specific language governing permissions and // limitations under the License.
-use crate::error::Result; +use crate::Result; use crate::heal::{ manager::HealManager, task::{HealOptions, HealPriority, HealRequest, HealType}, }; - use rustfs_common::heal_channel::{ HealChannelCommand, HealChannelPriority, HealChannelReceiver, HealChannelRequest, HealChannelResponse, HealScanMode, }; use std::sync::Arc; use tokio::sync::mpsc; -use tracing::{error, info}; +use tracing::{debug, error, info}; /// Heal channel processor pub struct HealChannelProcessor { @@ -60,7 +59,7 @@ impl HealChannelProcessor { } } None => { - info!("Heal channel receiver closed, stopping processor"); + debug!("Heal channel receiver closed, stopping processor"); break; } } diff --git a/crates/ahm/src/heal/erasure_healer.rs b/crates/ahm/src/heal/erasure_healer.rs index 65065cbb..46bbd06c 100644 --- a/crates/ahm/src/heal/erasure_healer.rs +++ b/crates/ahm/src/heal/erasure_healer.rs @@ -12,12 +12,12 @@ // See the License for the specific language governing permissions and // limitations under the License. -use crate::error::{Error, Result}; use crate::heal::{ progress::HealProgress, resume::{CheckpointManager, ResumeManager, ResumeUtils}, storage::HealStorageAPI, }; +use crate::{Error, Result}; use futures::future::join_all; use rustfs_common::heal_channel::{HealOpts, HealScanMode}; use rustfs_ecstore::disk::DiskStore; @@ -182,7 +182,7 @@ impl ErasureSetHealer { // check cancel status if self.cancel_token.is_cancelled() { - info!("Heal task cancelled"); + warn!("Heal task cancelled"); return Err(Error::TaskCancelled); } @@ -222,7 +222,7 @@ impl ErasureSetHealer { resume_manager: &ResumeManager, checkpoint_manager: &CheckpointManager, ) -> Result<()> { - info!("Starting heal for bucket: {} from object index {}", bucket, current_object_index); + info!(target: "rustfs:ahm:heal_bucket_with_resume" ,"Starting heal for bucket: {} from object index {}", bucket, current_object_index); // 1. get bucket info let _bucket_info = match self.storage.get_bucket_info(bucket).await? 
{ @@ -260,7 +260,7 @@ impl ErasureSetHealer { if !object_exists { info!( - "Object {}/{} no longer exists, skipping heal (likely deleted intentionally)", + target: "rustfs:ahm:heal_bucket_with_resume" ,"Object {}/{} no longer exists, skipping heal (likely deleted intentionally)", bucket, object ); checkpoint_manager.add_processed_object(object.clone()).await?; diff --git a/crates/ahm/src/heal/manager.rs b/crates/ahm/src/heal/manager.rs index 044289e1..3df7eb97 100644 --- a/crates/ahm/src/heal/manager.rs +++ b/crates/ahm/src/heal/manager.rs @@ -12,12 +12,12 @@ // See the License for the specific language governing permissions and // limitations under the License. -use crate::error::{Error, Result}; use crate::heal::{ progress::{HealProgress, HealStatistics}, storage::HealStorageAPI, task::{HealOptions, HealPriority, HealRequest, HealTask, HealTaskStatus, HealType}, }; +use crate::{Error, Result}; use rustfs_ecstore::disk::DiskAPI; use rustfs_ecstore::disk::error::DiskError; use rustfs_ecstore::global::GLOBAL_LOCAL_DISK_MAP; diff --git a/crates/ahm/src/heal/resume.rs b/crates/ahm/src/heal/resume.rs index cf9d5cae..0b15496d 100644 --- a/crates/ahm/src/heal/resume.rs +++ b/crates/ahm/src/heal/resume.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use crate::error::{Error, Result}; +use crate::{Error, Result}; use rustfs_ecstore::disk::{BUCKET_META_PREFIX, DiskAPI, DiskStore, RUSTFS_META_BUCKET}; use serde::{Deserialize, Serialize}; use std::path::Path; diff --git a/crates/ahm/src/heal/storage.rs b/crates/ahm/src/heal/storage.rs index 8d5e9006..6f571a1d 100644 --- a/crates/ahm/src/heal/storage.rs +++ b/crates/ahm/src/heal/storage.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-use crate::error::{Error, Result}; +use crate::{Error, Result}; use async_trait::async_trait; use rustfs_common::heal_channel::{HealOpts, HealScanMode}; use rustfs_ecstore::{ diff --git a/crates/ahm/src/heal/task.rs b/crates/ahm/src/heal/task.rs index 89c71b1b..af53e4d6 100644 --- a/crates/ahm/src/heal/task.rs +++ b/crates/ahm/src/heal/task.rs @@ -12,9 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. -use crate::error::{Error, Result}; use crate::heal::ErasureSetHealer; use crate::heal::{progress::HealProgress, storage::HealStorageAPI}; +use crate::{Error, Result}; use rustfs_common::heal_channel::{HealOpts, HealScanMode}; use serde::{Deserialize, Serialize}; use std::sync::Arc; diff --git a/crates/ahm/src/lib.rs b/crates/ahm/src/lib.rs index 17a70ff4..6010a2a7 100644 --- a/crates/ahm/src/lib.rs +++ b/crates/ahm/src/lib.rs @@ -12,17 +12,16 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::sync::{Arc, OnceLock}; -use tokio_util::sync::CancellationToken; -use tracing::{error, info}; - -pub mod error; +mod error; pub mod heal; pub mod scanner; pub use error::{Error, Result}; pub use heal::{HealManager, HealOptions, HealPriority, HealRequest, HealType, channel::HealChannelProcessor}; pub use scanner::Scanner; +use std::sync::{Arc, OnceLock}; +use tokio_util::sync::CancellationToken; +use tracing::{error, info}; // Global cancellation token for AHM services (scanner and other background tasks) static GLOBAL_AHM_SERVICES_CANCEL_TOKEN: OnceLock = OnceLock::new(); diff --git a/crates/ahm/src/scanner/checkpoint.rs b/crates/ahm/src/scanner/checkpoint.rs index 81009f3f..3304791f 100644 --- a/crates/ahm/src/scanner/checkpoint.rs +++ b/crates/ahm/src/scanner/checkpoint.rs @@ -12,18 +12,16 @@ // See the License for the specific language governing permissions and // limitations under the License. 
+use crate::scanner::node_scanner::ScanProgress; +use crate::{Error, Result}; +use serde::{Deserialize, Serialize}; use std::{ path::{Path, PathBuf}, time::{Duration, SystemTime}, }; - -use serde::{Deserialize, Serialize}; use tokio::sync::RwLock; use tracing::{debug, error, info, warn}; -use super::node_scanner::ScanProgress; -use crate::{Error, error::Result}; - #[derive(Debug, Serialize, Deserialize, Clone)] pub struct CheckpointData { pub version: u32, diff --git a/crates/ahm/src/scanner/data_scanner.rs b/crates/ahm/src/scanner/data_scanner.rs index 6dbc2d58..e4b2ab1a 100644 --- a/crates/ahm/src/scanner/data_scanner.rs +++ b/crates/ahm/src/scanner/data_scanner.rs @@ -12,46 +12,39 @@ // See the License for the specific language governing permissions and // limitations under the License. +// IO throttling component is integrated into NodeScanner +use crate::{ + Error, HealRequest, Result, get_ahm_services_cancel_token, + heal::HealManager, + scanner::{ + BucketMetrics, DecentralizedStatsAggregator, DecentralizedStatsAggregatorConfig, DiskMetrics, MetricsCollector, + NodeScanner, NodeScannerConfig, ScannerMetrics, + lifecycle::ScannerItem, + local_scan::{self, LocalObjectRecord, LocalScanOutcome}, + }, +}; +use rustfs_common::data_usage::{DataUsageInfo, SizeSummary}; +use rustfs_common::metrics::{Metric, Metrics, global_metrics}; +use rustfs_ecstore::{ + self as ecstore, StorageAPI, + bucket::versioning::VersioningApi, + bucket::versioning_sys::BucketVersioningSys, + data_usage::{aggregate_local_snapshots, store_data_usage_in_backend}, + disk::{Disk, DiskAPI, DiskStore, RUSTFS_META_BUCKET, WalkDirOptions}, + set_disk::SetDisks, + store_api::ObjectInfo, +}; +use rustfs_filemeta::{MetacacheReader, VersionType}; +use s3s::dto::{BucketVersioningStatus, VersioningConfiguration}; use std::{ collections::HashMap, sync::Arc, time::{Duration, SystemTime}, }; use time::OffsetDateTime; - -use ecstore::{ - disk::{Disk, DiskAPI, DiskStore, WalkDirOptions}, - 
set_disk::SetDisks, -}; -use rustfs_ecstore::store_api::ObjectInfo; -use rustfs_ecstore::{ - self as ecstore, StorageAPI, - data_usage::{aggregate_local_snapshots, store_data_usage_in_backend}, -}; -use rustfs_filemeta::{MetacacheReader, VersionType}; -use s3s::dto::{BucketVersioningStatus, VersioningConfiguration}; use tokio::sync::{Mutex, RwLock}; use tokio_util::sync::CancellationToken; use tracing::{debug, error, info, warn}; - -use super::metrics::{BucketMetrics, DiskMetrics, MetricsCollector, ScannerMetrics}; -use super::node_scanner::{NodeScanner, NodeScannerConfig}; -use super::stats_aggregator::{DecentralizedStatsAggregator, DecentralizedStatsAggregatorConfig}; -// IO throttling component is integrated into NodeScanner -use crate::heal::HealManager; -use crate::scanner::lifecycle::ScannerItem; -use crate::scanner::local_scan::{self, LocalObjectRecord, LocalScanOutcome}; -use crate::{ - HealRequest, - error::{Error, Result}, - get_ahm_services_cancel_token, -}; - -use rustfs_common::data_usage::{DataUsageInfo, SizeSummary}; -use rustfs_common::metrics::{Metric, Metrics, globalMetrics}; -use rustfs_ecstore::bucket::versioning::VersioningApi; -use rustfs_ecstore::bucket::versioning_sys::BucketVersioningSys; -use rustfs_ecstore::disk::RUSTFS_META_BUCKET; use uuid; /// Custom scan mode enum for AHM scanner @@ -772,7 +765,7 @@ impl Scanner { /// Get global metrics from common crate pub async fn get_global_metrics(&self) -> rustfs_madmin::metrics::ScannerMetrics { - (*globalMetrics).report().await + global_metrics().report().await } /// Perform a single scan cycle using optimized node scanner @@ -802,7 +795,7 @@ impl Scanner { cycle_completed: vec![chrono::Utc::now()], started: chrono::Utc::now(), }; - (*globalMetrics).set_cycle(Some(cycle_info)).await; + global_metrics().set_cycle(Some(cycle_info)).await; self.metrics.set_current_cycle(self.state.read().await.current_cycle); self.metrics.increment_total_cycles(); diff --git a/crates/ahm/src/scanner/histogram.rs 
b/crates/ahm/src/scanner/histogram.rs index f5d7b73b..e5ee6ede 100644 --- a/crates/ahm/src/scanner/histogram.rs +++ b/crates/ahm/src/scanner/histogram.rs @@ -12,13 +12,12 @@ // See the License for the specific language governing permissions and // limitations under the License. +use serde::{Deserialize, Serialize}; use std::{ collections::HashMap, sync::atomic::{AtomicU64, Ordering}, time::{Duration, SystemTime}, }; - -use serde::{Deserialize, Serialize}; use tracing::info; /// Scanner metrics diff --git a/crates/ahm/src/scanner/io_monitor.rs b/crates/ahm/src/scanner/io_monitor.rs index 0db719f2..0d5bbe3b 100644 --- a/crates/ahm/src/scanner/io_monitor.rs +++ b/crates/ahm/src/scanner/io_monitor.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use serde::{Deserialize, Serialize}; use std::{ collections::VecDeque, sync::{ @@ -20,8 +21,6 @@ use std::{ }, time::{Duration, SystemTime}, }; - -use serde::{Deserialize, Serialize}; use tokio::sync::RwLock; use tokio_util::sync::CancellationToken; use tracing::{debug, error, info, warn}; diff --git a/crates/ahm/src/scanner/io_throttler.rs b/crates/ahm/src/scanner/io_throttler.rs index 0c31a2c4..752ce079 100644 --- a/crates/ahm/src/scanner/io_throttler.rs +++ b/crates/ahm/src/scanner/io_throttler.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. 
+use crate::scanner::LoadLevel; use std::{ sync::{ Arc, @@ -19,12 +20,9 @@ use std::{ }, time::{Duration, SystemTime}, }; - use tokio::sync::RwLock; use tracing::{debug, info, warn}; -use super::node_scanner::LoadLevel; - /// IO throttler config #[derive(Debug, Clone)] pub struct IOThrottlerConfig { diff --git a/crates/ahm/src/scanner/lifecycle.rs b/crates/ahm/src/scanner/lifecycle.rs index d2466157..9d410b26 100644 --- a/crates/ahm/src/scanner/lifecycle.rs +++ b/crates/ahm/src/scanner/lifecycle.rs @@ -12,25 +12,28 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::sync::Arc; -use std::sync::atomic::{AtomicU64, Ordering}; - -use crate::error::Result; +use crate::Result; use rustfs_common::data_usage::SizeSummary; use rustfs_common::metrics::IlmAction; -use rustfs_ecstore::bucket::lifecycle::{ - bucket_lifecycle_audit::LcEventSrc, - bucket_lifecycle_ops::{GLOBAL_ExpiryState, apply_lifecycle_action, eval_action_from_lifecycle}, - lifecycle, - lifecycle::Lifecycle, +use rustfs_ecstore::bucket::{ + lifecycle::{ + bucket_lifecycle_audit::LcEventSrc, + bucket_lifecycle_ops::{GLOBAL_ExpiryState, apply_lifecycle_action, eval_action_from_lifecycle}, + lifecycle, + lifecycle::Lifecycle, + }, + metadata_sys::get_object_lock_config, + object_lock::objectlock_sys::{BucketObjectLockSys, enforce_retention_for_deletion}, + versioning::VersioningApi, + versioning_sys::BucketVersioningSys, }; -use rustfs_ecstore::bucket::metadata_sys::get_object_lock_config; -use rustfs_ecstore::bucket::object_lock::objectlock_sys::{BucketObjectLockSys, enforce_retention_for_deletion}; -use rustfs_ecstore::bucket::versioning::VersioningApi; -use rustfs_ecstore::bucket::versioning_sys::BucketVersioningSys; use rustfs_ecstore::store_api::{ObjectInfo, ObjectToDelete}; use rustfs_filemeta::FileInfo; use s3s::dto::{BucketLifecycleConfiguration as LifecycleConfig, VersioningConfiguration}; +use std::sync::{ + Arc, + atomic::{AtomicU64, 
Ordering}, +}; use time::OffsetDateTime; use tracing::info; diff --git a/crates/ahm/src/scanner/local_stats.rs b/crates/ahm/src/scanner/local_stats.rs index 771cb1f6..5529a824 100644 --- a/crates/ahm/src/scanner/local_stats.rs +++ b/crates/ahm/src/scanner/local_stats.rs @@ -12,22 +12,19 @@ // See the License for the specific language governing permissions and // limitations under the License. +use crate::scanner::node_scanner::{BucketStats, DiskStats, LocalScanStats}; +use crate::{Error, Result}; +use rustfs_common::data_usage::DataUsageInfo; +use serde::{Deserialize, Serialize}; use std::{ path::{Path, PathBuf}, sync::Arc, sync::atomic::{AtomicU64, Ordering}, time::{Duration, SystemTime}, }; - -use serde::{Deserialize, Serialize}; use tokio::sync::RwLock; use tracing::{debug, error, info, warn}; -use rustfs_common::data_usage::DataUsageInfo; - -use super::node_scanner::{BucketStats, DiskStats, LocalScanStats}; -use crate::{Error, error::Result}; - /// local stats manager pub struct LocalStatsManager { /// node id diff --git a/crates/ahm/src/scanner/metrics.rs b/crates/ahm/src/scanner/metrics.rs index f5d7b73b..e5ee6ede 100644 --- a/crates/ahm/src/scanner/metrics.rs +++ b/crates/ahm/src/scanner/metrics.rs @@ -12,13 +12,12 @@ // See the License for the specific language governing permissions and // limitations under the License. 
+use serde::{Deserialize, Serialize}; use std::{ collections::HashMap, sync::atomic::{AtomicU64, Ordering}, time::{Duration, SystemTime}, }; - -use serde::{Deserialize, Serialize}; use tracing::info; /// Scanner metrics diff --git a/crates/ahm/src/scanner/mod.rs b/crates/ahm/src/scanner/mod.rs index f3fdd105..2107f45f 100644 --- a/crates/ahm/src/scanner/mod.rs +++ b/crates/ahm/src/scanner/mod.rs @@ -27,8 +27,10 @@ pub mod stats_aggregator; pub use checkpoint::{CheckpointData, CheckpointInfo, CheckpointManager}; pub use data_scanner::{ScanMode, Scanner, ScannerConfig, ScannerState}; pub use io_monitor::{AdvancedIOMonitor, IOMetrics, IOMonitorConfig}; -pub use io_throttler::{AdvancedIOThrottler, IOThrottlerConfig, ResourceAllocation, ThrottleDecision}; +pub use io_throttler::{AdvancedIOThrottler, IOThrottlerConfig, MetricsSnapshot, ResourceAllocation, ThrottleDecision}; pub use local_stats::{BatchScanResult, LocalStatsManager, ScanResultEntry, StatsSummary}; -pub use metrics::ScannerMetrics; +pub use metrics::{BucketMetrics, DiskMetrics, MetricsCollector, ScannerMetrics}; pub use node_scanner::{IOMonitor, IOThrottler, LoadLevel, LocalScanStats, NodeScanner, NodeScannerConfig}; -pub use stats_aggregator::{AggregatedStats, DecentralizedStatsAggregator, NodeClient, NodeInfo}; +pub use stats_aggregator::{ + AggregatedStats, DecentralizedStatsAggregator, DecentralizedStatsAggregatorConfig, NodeClient, NodeInfo, +}; diff --git a/crates/ahm/src/scanner/node_scanner.rs b/crates/ahm/src/scanner/node_scanner.rs index 170ea532..a6a01f95 100644 --- a/crates/ahm/src/scanner/node_scanner.rs +++ b/crates/ahm/src/scanner/node_scanner.rs @@ -12,6 +12,15 @@ // See the License for the specific language governing permissions and // limitations under the License. 
+use crate::Result; +use crate::scanner::{ + AdvancedIOMonitor, AdvancedIOThrottler, BatchScanResult, CheckpointManager, IOMonitorConfig, IOThrottlerConfig, + LocalStatsManager, MetricsSnapshot, ScanResultEntry, +}; +use rustfs_common::data_usage::DataUsageInfo; +use rustfs_ecstore::StorageAPI; +use rustfs_ecstore::disk::{DiskAPI, DiskStore}; +use serde::{Deserialize, Serialize}; use std::{ collections::{HashMap, HashSet}, path::{Path, PathBuf}, @@ -21,22 +30,10 @@ use std::{ }, time::{Duration, SystemTime}, }; - -use serde::{Deserialize, Serialize}; use tokio::sync::RwLock; use tokio_util::sync::CancellationToken; use tracing::{debug, error, info, warn}; -use rustfs_common::data_usage::DataUsageInfo; -use rustfs_ecstore::StorageAPI; -use rustfs_ecstore::disk::{DiskAPI, DiskStore}; // Add this import - -use super::checkpoint::CheckpointManager; -use super::io_monitor::{AdvancedIOMonitor, IOMonitorConfig}; -use super::io_throttler::{AdvancedIOThrottler, IOThrottlerConfig, MetricsSnapshot}; -use super::local_stats::{BatchScanResult, LocalStatsManager, ScanResultEntry}; -use crate::error::Result; - /// SystemTime serde mod system_time_serde { use serde::{Deserialize, Deserializer, Serialize, Serializer}; diff --git a/crates/ahm/src/scanner/stats_aggregator.rs b/crates/ahm/src/scanner/stats_aggregator.rs index 553407db..ed56b549 100644 --- a/crates/ahm/src/scanner/stats_aggregator.rs +++ b/crates/ahm/src/scanner/stats_aggregator.rs @@ -12,24 +12,21 @@ // See the License for the specific language governing permissions and // limitations under the License. 
+use crate::scanner::{ + local_stats::StatsSummary, + node_scanner::{BucketStats, LoadLevel, ScanProgress}, +}; +use crate::{Error, Result}; +use rustfs_common::data_usage::DataUsageInfo; +use serde::{Deserialize, Serialize}; use std::{ collections::HashMap, sync::Arc, time::{Duration, SystemTime}, }; - -use serde::{Deserialize, Serialize}; use tokio::sync::RwLock; use tracing::{debug, info, warn}; -use rustfs_common::data_usage::DataUsageInfo; - -use super::{ - local_stats::StatsSummary, - node_scanner::{BucketStats, LoadLevel, ScanProgress}, -}; -use crate::{Error, error::Result}; - /// node client config #[derive(Debug, Clone)] pub struct NodeClientConfig { diff --git a/crates/ahm/tests/heal_integration_test.rs b/crates/ahm/tests/heal_integration_test.rs index 70ea5398..85ce694e 100644 --- a/crates/ahm/tests/heal_integration_test.rs +++ b/crates/ahm/tests/heal_integration_test.rs @@ -25,9 +25,11 @@ use rustfs_ecstore::{ store_api::{ObjectIO, ObjectOptions, PutObjReader, StorageAPI}, }; use serial_test::serial; -use std::sync::Once; -use std::sync::OnceLock; -use std::{path::PathBuf, sync::Arc, time::Duration}; +use std::{ + path::PathBuf, + sync::{Arc, Once, OnceLock}, + time::Duration, +}; use tokio::fs; use tokio_util::sync::CancellationToken; use tracing::info; diff --git a/crates/ahm/tests/integration_tests.rs b/crates/ahm/tests/integration_tests.rs index 76aa6f93..a5f280e8 100644 --- a/crates/ahm/tests/integration_tests.rs +++ b/crates/ahm/tests/integration_tests.rs @@ -12,19 +12,16 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-use std::{sync::Arc, time::Duration}; -use tempfile::TempDir; - use rustfs_ahm::scanner::{ io_throttler::MetricsSnapshot, local_stats::StatsSummary, node_scanner::{LoadLevel, NodeScanner, NodeScannerConfig}, stats_aggregator::{DecentralizedStatsAggregator, DecentralizedStatsAggregatorConfig, NodeInfo}, }; - -mod scanner_optimization_tests; use scanner_optimization_tests::{PerformanceBenchmark, create_test_scanner}; - +use std::{sync::Arc, time::Duration}; +use tempfile::TempDir; +mod scanner_optimization_tests; #[tokio::test] async fn test_end_to_end_scanner_lifecycle() { let temp_dir = TempDir::new().unwrap(); diff --git a/crates/ahm/tests/lifecycle_cache_test.rs b/crates/ahm/tests/lifecycle_cache_test.rs index a54b2d12..2a61b18b 100644 --- a/crates/ahm/tests/lifecycle_cache_test.rs +++ b/crates/ahm/tests/lifecycle_cache_test.rs @@ -23,16 +23,16 @@ use rustfs_ecstore::{ store_api::{MakeBucketOptions, ObjectIO, ObjectInfo, ObjectOptions, PutObjReader, StorageAPI}, }; use serial_test::serial; -use std::borrow::Cow; -use std::sync::Once; -use std::sync::OnceLock; -use std::{path::PathBuf, sync::Arc}; -use tokio::fs; -use tokio_util::sync::CancellationToken; -use tracing::warn; -use tracing::{debug, info}; +use std::{ + borrow::Cow, + path::PathBuf, + sync::{Arc, Once, OnceLock}, +}; //use heed_traits::Comparator; use time::OffsetDateTime; +use tokio::fs; +use tokio_util::sync::CancellationToken; +use tracing::{debug, info, warn}; use uuid::Uuid; static GLOBAL_ENV: OnceLock<(Vec, Arc)> = OnceLock::new(); diff --git a/crates/ahm/tests/lifecycle_integration_test.rs b/crates/ahm/tests/lifecycle_integration_test.rs index 90a01ed2..21ab3122 100644 --- a/crates/ahm/tests/lifecycle_integration_test.rs +++ b/crates/ahm/tests/lifecycle_integration_test.rs @@ -24,9 +24,11 @@ use rustfs_ecstore::{ tier::tier_config::{TierConfig, TierMinIO, TierType}, }; use serial_test::serial; -use std::sync::Once; -use std::sync::OnceLock; -use std::{path::PathBuf, sync::Arc, time::Duration}; 
+use std::{ + path::PathBuf, + sync::{Arc, Once, OnceLock}, + time::Duration, +}; use tokio::fs; use tokio_util::sync::CancellationToken; use tracing::info; diff --git a/crates/ahm/tests/optimized_scanner_tests.rs b/crates/ahm/tests/optimized_scanner_tests.rs index cd6a23a1..f0aecdd8 100644 --- a/crates/ahm/tests/optimized_scanner_tests.rs +++ b/crates/ahm/tests/optimized_scanner_tests.rs @@ -12,26 +12,23 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::{fs, net::SocketAddr, sync::Arc, sync::OnceLock, time::Duration}; -use tempfile::TempDir; -use tokio_util::sync::CancellationToken; - -use serial_test::serial; - use rustfs_ahm::heal::manager::HealConfig; use rustfs_ahm::scanner::{ Scanner, data_scanner::ScanMode, node_scanner::{LoadLevel, NodeScanner, NodeScannerConfig}, }; - -use rustfs_ecstore::disk::endpoint::Endpoint; -use rustfs_ecstore::endpoints::{EndpointServerPools, Endpoints, PoolEndpoints}; -use rustfs_ecstore::store::ECStore; use rustfs_ecstore::{ StorageAPI, + disk::endpoint::Endpoint, + endpoints::{EndpointServerPools, Endpoints, PoolEndpoints}, + store::ECStore, store_api::{MakeBucketOptions, ObjectIO, PutObjReader}, }; +use serial_test::serial; +use std::{fs, net::SocketAddr, sync::Arc, sync::OnceLock, time::Duration}; +use tempfile::TempDir; +use tokio_util::sync::CancellationToken; // Global test environment cache to avoid repeated initialization static GLOBAL_TEST_ENV: OnceLock<(Vec, Arc)> = OnceLock::new(); diff --git a/crates/ahm/tests/scanner_optimization_tests.rs b/crates/ahm/tests/scanner_optimization_tests.rs index 4f1358f2..ae2db170 100644 --- a/crates/ahm/tests/scanner_optimization_tests.rs +++ b/crates/ahm/tests/scanner_optimization_tests.rs @@ -12,9 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-use std::time::Duration; -use tempfile::TempDir; - use rustfs_ahm::scanner::{ checkpoint::{CheckpointData, CheckpointManager}, io_monitor::{AdvancedIOMonitor, IOMonitorConfig}, @@ -23,6 +20,8 @@ use rustfs_ahm::scanner::{ node_scanner::{LoadLevel, NodeScanner, NodeScannerConfig, ScanProgress}, stats_aggregator::{DecentralizedStatsAggregator, DecentralizedStatsAggregatorConfig}, }; +use std::time::Duration; +use tempfile::TempDir; #[tokio::test] async fn test_checkpoint_manager_save_and_load() { diff --git a/crates/common/Cargo.toml b/crates/common/Cargo.toml index a2b57c66..8a765340 100644 --- a/crates/common/Cargo.toml +++ b/crates/common/Cargo.toml @@ -28,7 +28,6 @@ categories = ["web-programming", "development-tools", "data-structures"] workspace = true [dependencies] -lazy_static = { workspace = true} tokio = { workspace = true } tonic = { workspace = true } uuid = { workspace = true } diff --git a/crates/common/src/bucket_stats.rs b/crates/common/src/bucket_stats.rs index d07aee1b..980586b3 100644 --- a/crates/common/src/bucket_stats.rs +++ b/crates/common/src/bucket_stats.rs @@ -12,9 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. 
+use crate::last_minute::{self}; use std::collections::HashMap; -use crate::last_minute::{self}; pub struct ReplicationLatency { // Delays for single and multipart PUT requests upload_histogram: last_minute::LastMinuteHistogram, diff --git a/crates/common/src/data_usage.rs b/crates/common/src/data_usage.rs index 65f6fd83..6eddc438 100644 --- a/crates/common/src/data_usage.rs +++ b/crates/common/src/data_usage.rs @@ -14,10 +14,10 @@ use path_clean::PathClean; use serde::{Deserialize, Serialize}; -use std::hash::{DefaultHasher, Hash, Hasher}; -use std::path::Path; use std::{ collections::{HashMap, HashSet}, + hash::{DefaultHasher, Hash, Hasher}, + path::Path, time::SystemTime, }; diff --git a/crates/common/src/globals.rs b/crates/common/src/globals.rs index df2fbe10..af0dc312 100644 --- a/crates/common/src/globals.rs +++ b/crates/common/src/globals.rs @@ -16,7 +16,6 @@ use std::collections::HashMap; use std::sync::LazyLock; - use tokio::sync::RwLock; use tonic::transport::Channel; diff --git a/crates/common/src/last_minute.rs b/crates/common/src/last_minute.rs index eee7beca..508ef79f 100644 --- a/crates/common/src/last_minute.rs +++ b/crates/common/src/last_minute.rs @@ -27,11 +27,11 @@ struct TimedAction { #[allow(dead_code)] impl TimedAction { // Avg returns the average time spent on the action. - pub fn avg(&self) -> Option { + pub fn avg(&self) -> Option { if self.count == 0 { return None; } - Some(std::time::Duration::from_nanos(self.acc_time / self.count)) + Some(Duration::from_nanos(self.acc_time / self.count)) } // AvgBytes returns the average bytes processed. 
@@ -860,7 +860,7 @@ impl LastMinuteHistogram { } } - pub fn add(&mut self, size: i64, t: std::time::Duration) { + pub fn add(&mut self, size: i64, t: Duration) { let index = size_to_tag(size); self.histogram[index].add(&t); } diff --git a/crates/common/src/metrics.rs b/crates/common/src/metrics.rs index d88e5a3a..9d2a5f64 100644 --- a/crates/common/src/metrics.rs +++ b/crates/common/src/metrics.rs @@ -12,23 +12,21 @@ // See the License for the specific language governing permissions and // limitations under the License. +use crate::last_minute::{AccElem, LastMinuteLatency}; use chrono::{DateTime, Utc}; -use lazy_static::lazy_static; use rustfs_madmin::metrics::ScannerMetrics as M_ScannerMetrics; use std::{ collections::HashMap, fmt::Display, pin::Pin, sync::{ - Arc, + Arc, OnceLock, atomic::{AtomicU64, Ordering}, }, time::{Duration, SystemTime}, }; use tokio::sync::{Mutex, RwLock}; -use crate::last_minute::{AccElem, LastMinuteLatency}; - #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum IlmAction { NoneAction = 0, @@ -73,8 +71,10 @@ impl Display for IlmAction { } } -lazy_static! 
{ - pub static ref globalMetrics: Arc = Arc::new(Metrics::new()); +pub static GLOBAL_METRICS: OnceLock> = OnceLock::new(); + +pub fn global_metrics() -> &'static Arc { + GLOBAL_METRICS.get_or_init(|| Arc::new(Metrics::new())) } #[derive(Clone, Debug, PartialEq, PartialOrd)] @@ -294,13 +294,13 @@ impl Metrics { let duration = SystemTime::now().duration_since(start_time).unwrap_or_default(); // Update operation count - globalMetrics.operations[metric].fetch_add(1, Ordering::Relaxed); + global_metrics().operations[metric].fetch_add(1, Ordering::Relaxed); // Update latency for realtime metrics (spawn async task for this) if (metric) < Metric::LastRealtime as usize { let metric_index = metric; tokio::spawn(async move { - globalMetrics.latency[metric_index].add(duration).await; + global_metrics().latency[metric_index].add(duration).await; }); } @@ -319,13 +319,13 @@ impl Metrics { let duration = SystemTime::now().duration_since(start_time).unwrap_or_default(); // Update operation count - globalMetrics.operations[metric].fetch_add(1, Ordering::Relaxed); + global_metrics().operations[metric].fetch_add(1, Ordering::Relaxed); // Update latency for realtime metrics with size (spawn async task) if (metric) < Metric::LastRealtime as usize { let metric_index = metric; tokio::spawn(async move { - globalMetrics.latency[metric_index].add_size(duration, size).await; + global_metrics().latency[metric_index].add_size(duration, size).await; }); } } @@ -339,13 +339,13 @@ impl Metrics { let duration = SystemTime::now().duration_since(start_time).unwrap_or_default(); // Update operation count - globalMetrics.operations[metric].fetch_add(1, Ordering::Relaxed); + global_metrics().operations[metric].fetch_add(1, Ordering::Relaxed); // Update latency for realtime metrics (spawn async task) if (metric) < Metric::LastRealtime as usize { let metric_index = metric; tokio::spawn(async move { - globalMetrics.latency[metric_index].add(duration).await; + 
global_metrics().latency[metric_index].add(duration).await; }); } } @@ -360,13 +360,13 @@ impl Metrics { let duration = SystemTime::now().duration_since(start_time).unwrap_or_default(); // Update operation count - globalMetrics.operations[metric].fetch_add(count as u64, Ordering::Relaxed); + global_metrics().operations[metric].fetch_add(count as u64, Ordering::Relaxed); // Update latency for realtime metrics (spawn async task) if (metric) < Metric::LastRealtime as usize { let metric_index = metric; tokio::spawn(async move { - globalMetrics.latency[metric_index].add(duration).await; + global_metrics().latency[metric_index].add(duration).await; }); } }) @@ -384,8 +384,8 @@ impl Metrics { Box::new(move || { let duration = SystemTime::now().duration_since(start).unwrap_or(Duration::from_secs(0)); tokio::spawn(async move { - globalMetrics.actions[a_clone].fetch_add(versions, Ordering::Relaxed); - globalMetrics.actions_latency[a_clone].add(duration).await; + global_metrics().actions[a_clone].fetch_add(versions, Ordering::Relaxed); + global_metrics().actions_latency[a_clone].add(duration).await; }); }) }) @@ -395,11 +395,11 @@ impl Metrics { pub async fn inc_time(metric: Metric, duration: Duration) { let metric = metric as usize; // Update operation count - globalMetrics.operations[metric].fetch_add(1, Ordering::Relaxed); + global_metrics().operations[metric].fetch_add(1, Ordering::Relaxed); // Update latency for realtime metrics if (metric) < Metric::LastRealtime as usize { - globalMetrics.latency[metric].add(duration).await; + global_metrics().latency[metric].add(duration).await; } } @@ -501,7 +501,7 @@ pub fn current_path_updater(disk: &str, initial: &str) -> (UpdateCurrentPathFn, let tracker_clone = Arc::clone(&tracker); let disk_clone = disk_name.clone(); tokio::spawn(async move { - globalMetrics.current_paths.write().await.insert(disk_clone, tracker_clone); + global_metrics().current_paths.write().await.insert(disk_clone, tracker_clone); }); let update_fn = { @@ 
-520,7 +520,7 @@ pub fn current_path_updater(disk: &str, initial: &str) -> (UpdateCurrentPathFn, Arc::new(move || -> Pin + Send>> { let disk_name = disk_name.clone(); Box::pin(async move { - globalMetrics.current_paths.write().await.remove(&disk_name); + global_metrics().current_paths.write().await.remove(&disk_name); }) }) }; diff --git a/crates/ecstore/src/bucket/lifecycle/bucket_lifecycle_ops.rs b/crates/ecstore/src/bucket/lifecycle/bucket_lifecycle_ops.rs index 5dba9b03..45d3fecd 100644 --- a/crates/ecstore/src/bucket/lifecycle/bucket_lifecycle_ops.rs +++ b/crates/ecstore/src/bucket/lifecycle/bucket_lifecycle_ops.rs @@ -18,7 +18,24 @@ #![allow(unused_must_use)] #![allow(clippy::all)] +use crate::bucket::lifecycle::bucket_lifecycle_audit::{LcAuditEvent, LcEventSrc}; +use crate::bucket::lifecycle::lifecycle::{self, ExpirationOptions, Lifecycle, TransitionOptions}; +use crate::bucket::lifecycle::tier_last_day_stats::{DailyAllTierStats, LastDayTierStats}; +use crate::bucket::lifecycle::tier_sweeper::{Jentry, delete_object_from_remote_tier}; +use crate::bucket::object_lock::objectlock_sys::enforce_retention_for_deletion; +use crate::bucket::{metadata_sys::get_lifecycle_config, versioning_sys::BucketVersioningSys}; +use crate::client::object_api_utils::new_getobjectreader; +use crate::error::Error; use crate::error::StorageError; +use crate::error::{error_resp_to_object_err, is_err_object_not_found, is_err_version_not_found, is_network_or_host_down}; +use crate::event::name::EventName; +use crate::event_notification::{EventArgs, send_event}; +use crate::global::GLOBAL_LocalNodeName; +use crate::global::{GLOBAL_LifecycleSys, GLOBAL_TierConfigMgr, get_global_deployment_id}; +use crate::store::ECStore; +use crate::store_api::StorageAPI; +use crate::store_api::{GetObjectReader, HTTPRangeSpec, ObjectInfo, ObjectOptions, ObjectToDelete}; +use crate::tier::warm_backend::WarmBackendGetOpts; use async_channel::{Receiver as A_Receiver, Sender as A_Sender, bounded}; use 
bytes::BytesMut; use futures::Future; @@ -27,10 +44,15 @@ use lazy_static::lazy_static; use rustfs_common::data_usage::TierStats; use rustfs_common::heal_channel::rep_has_active_rules; use rustfs_common::metrics::{IlmAction, Metrics}; -use rustfs_filemeta::fileinfo::{NULL_VERSION_ID, RestoreStatusOps, is_restored_object_on_disk}; +use rustfs_filemeta::{NULL_VERSION_ID, RestoreStatusOps, is_restored_object_on_disk}; use rustfs_utils::path::encode_dir_object; use rustfs_utils::string::strings_has_prefix_fold; use s3s::Body; +use s3s::dto::{ + BucketLifecycleConfiguration, DefaultRetention, ReplicationConfiguration, RestoreRequest, RestoreRequestType, RestoreStatus, + ServerSideEncryption, Timestamp, +}; +use s3s::header::{X_AMZ_RESTORE, X_AMZ_SERVER_SIDE_ENCRYPTION, X_AMZ_STORAGE_CLASS}; use sha2::{Digest, Sha256}; use std::any::Any; use std::collections::HashMap; @@ -47,31 +69,6 @@ use tracing::{debug, error, info}; use uuid::Uuid; use xxhash_rust::xxh64; -//use rustfs_notify::{BucketNotificationConfig, Event, EventName, LogLevel, NotificationError, init_logger}; -//use rustfs_notify::{initialize, notification_system}; -use super::bucket_lifecycle_audit::{LcAuditEvent, LcEventSrc}; -use super::lifecycle::{self, ExpirationOptions, Lifecycle, TransitionOptions}; -use super::tier_last_day_stats::{DailyAllTierStats, LastDayTierStats}; -use super::tier_sweeper::{Jentry, delete_object_from_remote_tier}; -use crate::bucket::object_lock::objectlock_sys::enforce_retention_for_deletion; -use crate::bucket::{metadata_sys::get_lifecycle_config, versioning_sys::BucketVersioningSys}; -use crate::client::object_api_utils::new_getobjectreader; -use crate::error::Error; -use crate::error::{error_resp_to_object_err, is_err_object_not_found, is_err_version_not_found, is_network_or_host_down}; -use crate::event::name::EventName; -use crate::event_notification::{EventArgs, send_event}; -use crate::global::GLOBAL_LocalNodeName; -use crate::global::{GLOBAL_LifecycleSys, 
GLOBAL_TierConfigMgr, get_global_deployment_id}; -use crate::store::ECStore; -use crate::store_api::StorageAPI; -use crate::store_api::{GetObjectReader, HTTPRangeSpec, ObjectInfo, ObjectOptions, ObjectToDelete}; -use crate::tier::warm_backend::WarmBackendGetOpts; -use s3s::dto::{ - BucketLifecycleConfiguration, DefaultRetention, ReplicationConfiguration, RestoreRequest, RestoreRequestType, RestoreStatus, - ServerSideEncryption, Timestamp, -}; -use s3s::header::{X_AMZ_RESTORE, X_AMZ_SERVER_SIDE_ENCRYPTION, X_AMZ_STORAGE_CLASS}; - pub type TimeFn = Arc Pin + Send>> + Send + Sync + 'static>; pub type TraceFn = Arc) -> Pin + Send>> + Send + Sync + 'static>; diff --git a/crates/ecstore/src/client/object_api_utils.rs b/crates/ecstore/src/client/object_api_utils.rs index cc2b8296..73ecf7ce 100644 --- a/crates/ecstore/src/client/object_api_utils.rs +++ b/crates/ecstore/src/client/object_api_utils.rs @@ -21,13 +21,12 @@ use http::HeaderMap; use s3s::dto::ETag; -use std::pin::Pin; use std::{collections::HashMap, io::Cursor, sync::Arc}; use tokio::io::BufReader; use crate::error::ErrorResponse; use crate::store_api::{GetObjectReader, HTTPRangeSpec, ObjectInfo, ObjectOptions}; -use rustfs_filemeta::fileinfo::ObjectPartInfo; +use rustfs_filemeta::ObjectPartInfo; use rustfs_rio::HashReader; use s3s::S3ErrorCode; diff --git a/crates/ecstore/src/disk/os.rs b/crates/ecstore/src/disk/os.rs index 097cd61f..66a05e75 100644 --- a/crates/ecstore/src/disk/os.rs +++ b/crates/ecstore/src/disk/os.rs @@ -25,6 +25,7 @@ use tracing::warn; use super::error::DiskError; +/// Check path length according to OS limits. pub fn check_path_length(path_name: &str) -> Result<()> { // Apple OS X path length is limited to 1016 if cfg!(target_os = "macos") && path_name.len() > 1016 { @@ -64,6 +65,10 @@ pub fn check_path_length(path_name: &str) -> Result<()> { Ok(()) } +/// Check if the given disk path is the root disk. +/// On Windows, always return false. +/// On Unix, compare the disk paths. 
+#[tracing::instrument(level = "debug", skip_all)] pub fn is_root_disk(disk_path: &str, root_disk: &str) -> Result { if cfg!(target_os = "windows") { return Ok(false); @@ -72,6 +77,8 @@ pub fn is_root_disk(disk_path: &str, root_disk: &str) -> Result { rustfs_utils::os::same_disk(disk_path, root_disk).map_err(|e| to_file_error(e).into()) } +/// Create a directory and all its parent components if they are missing. +#[tracing::instrument(level = "debug", skip_all)] pub async fn make_dir_all(path: impl AsRef, base_dir: impl AsRef) -> Result<()> { check_path_length(path.as_ref().to_string_lossy().to_string().as_str())?; @@ -82,11 +89,16 @@ pub async fn make_dir_all(path: impl AsRef, base_dir: impl AsRef) -> Ok(()) } +/// Check if a directory is empty. +/// Only reads one entry to determine if the directory is empty. +#[tracing::instrument(level = "debug", skip_all)] pub async fn is_empty_dir(path: impl AsRef) -> bool { read_dir(path.as_ref(), 1).await.is_ok_and(|v| v.is_empty()) } // read_dir count read limit. when count == 0 unlimit. +/// Return file names in the directory. +#[tracing::instrument(level = "debug", skip_all)] pub async fn read_dir(path: impl AsRef, count: i32) -> std::io::Result> { let mut entries = fs::read_dir(path.as_ref()).await?; @@ -197,6 +209,10 @@ pub async fn reliable_mkdir_all(path: impl AsRef, base_dir: impl AsRef, base_dir: impl AsRef) -> io::Result<()> { if !base_dir.as_ref().to_string_lossy().is_empty() && base_dir.as_ref().starts_with(dir_path.as_ref()) { return Ok(()); @@ -225,6 +241,9 @@ pub async fn os_mkdir_all(dir_path: impl AsRef, base_dir: impl AsRef Ok(()) } +/// Check if a file exists. +/// Returns true if the file exists, false otherwise. 
+#[tracing::instrument(level = "debug", skip_all)] pub fn file_exists(path: impl AsRef) -> bool { std::fs::metadata(path.as_ref()).map(|_| true).unwrap_or(false) } diff --git a/crates/ecstore/src/metrics_realtime.rs b/crates/ecstore/src/metrics_realtime.rs index 730a6172..a0f711e1 100644 --- a/crates/ecstore/src/metrics_realtime.rs +++ b/crates/ecstore/src/metrics_realtime.rs @@ -12,25 +12,23 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::{HashMap, HashSet}; - -use chrono::Utc; -use rustfs_common::{ - globals::{GLOBAL_Local_Node_Name, GLOBAL_Rustfs_Addr}, - heal_channel::DriveState, - metrics::globalMetrics, -}; -use rustfs_madmin::metrics::{DiskIOStats, DiskMetric, RealtimeMetrics}; -use rustfs_utils::os::get_drive_stats; -use serde::{Deserialize, Serialize}; -use tracing::{debug, info}; - use crate::{ admin_server_info::get_local_server_property, new_object_layer_fn, store_api::StorageAPI, // utils::os::get_drive_stats, }; +use chrono::Utc; +use rustfs_common::{ + globals::{GLOBAL_Local_Node_Name, GLOBAL_Rustfs_Addr}, + heal_channel::DriveState, + metrics::global_metrics, +}; +use rustfs_madmin::metrics::{DiskIOStats, DiskMetric, RealtimeMetrics}; +use rustfs_utils::os::get_drive_stats; +use serde::{Deserialize, Serialize}; +use std::collections::{HashMap, HashSet}; +use tracing::{debug, info}; #[derive(Debug, Default, Serialize, Deserialize)] pub struct CollectMetricsOpts { @@ -118,7 +116,7 @@ pub async fn collect_local_metrics(types: MetricType, opts: &CollectMetricsOpts) if types.contains(&MetricType::SCANNER) { debug!("start get scanner metrics"); - let metrics = globalMetrics.report().await; + let metrics = global_metrics().report().await; real_time_metrics.aggregated.scanner = Some(metrics); } diff --git a/crates/ecstore/src/store.rs b/crates/ecstore/src/store.rs index 9fe7ffe5..a39cf9e1 100644 --- a/crates/ecstore/src/store.rs +++ b/crates/ecstore/src/store.rs @@ -72,8 
+72,7 @@ use tokio::select; use tokio::sync::RwLock; use tokio::time::sleep; use tokio_util::sync::CancellationToken; -use tracing::{debug, info}; -use tracing::{error, warn}; +use tracing::{debug, error, info, instrument, warn}; use uuid::Uuid; const MAX_UPLOADS_LIST: usize = 10000; @@ -110,7 +109,7 @@ pub struct ECStore { impl ECStore { #[allow(clippy::new_ret_no_self)] - #[tracing::instrument(level = "debug", skip(endpoint_pools))] + #[instrument(level = "debug", skip(endpoint_pools))] pub async fn new(address: SocketAddr, endpoint_pools: EndpointServerPools, ctx: CancellationToken) -> Result> { // let layouts = DisksLayout::from_volumes(endpoints.as_slice())?; @@ -275,6 +274,7 @@ impl ECStore { Ok(ec) } + #[instrument(level = "debug", skip(self, rx))] pub async fn init(self: &Arc, rx: CancellationToken) -> Result<()> { GLOBAL_BOOT_TIME.get_or_init(|| async { SystemTime::now() }).await; @@ -461,6 +461,7 @@ impl ECStore { // Ok(ress) // } + #[instrument(level = "debug", skip(self))] async fn delete_all(&self, bucket: &str, prefix: &str) -> Result<()> { let mut futures = Vec::new(); for sets in self.pools.iter() { @@ -1077,7 +1078,7 @@ impl Clone for PoolObjInfo { #[async_trait::async_trait] impl ObjectIO for ECStore { - #[tracing::instrument(level = "debug", skip(self))] + #[instrument(level = "debug", skip(self))] async fn get_object_reader( &self, bucket: &str, @@ -1107,7 +1108,7 @@ impl ObjectIO for ECStore { .get_object_reader(bucket, object.as_str(), range, h, &opts) .await } - #[tracing::instrument(level = "debug", skip(self, data))] + #[instrument(level = "debug", skip(self, data))] async fn put_object(&self, bucket: &str, object: &str, data: &mut PutObjReader, opts: &ObjectOptions) -> Result { check_put_object_args(bucket, object)?; @@ -1144,7 +1145,7 @@ lazy_static! 
{ #[async_trait::async_trait] impl StorageAPI for ECStore { - #[tracing::instrument(skip(self))] + #[instrument(skip(self))] async fn backend_info(&self) -> rustfs_madmin::BackendInfo { let (standard_sc_parity, rr_sc_parity) = { if let Some(sc) = GLOBAL_STORAGE_CLASS.get() { @@ -1189,7 +1190,7 @@ impl StorageAPI for ECStore { ..Default::default() } } - #[tracing::instrument(skip(self))] + #[instrument(skip(self))] async fn storage_info(&self) -> rustfs_madmin::StorageInfo { let Some(notification_sy) = get_global_notification_sys() else { return rustfs_madmin::StorageInfo::default(); @@ -1197,7 +1198,7 @@ impl StorageAPI for ECStore { notification_sy.storage_info(self).await } - #[tracing::instrument(skip(self))] + #[instrument(skip(self))] async fn local_storage_info(&self) -> rustfs_madmin::StorageInfo { let mut futures = Vec::with_capacity(self.pools.len()); @@ -1217,7 +1218,7 @@ impl StorageAPI for ECStore { rustfs_madmin::StorageInfo { backend, disks } } - #[tracing::instrument(skip(self))] + #[instrument(skip(self))] async fn make_bucket(&self, bucket: &str, opts: &MakeBucketOptions) -> Result<()> { if !is_meta_bucketname(bucket) { if let Err(err) = check_valid_bucket_name_strict(bucket) { @@ -1265,7 +1266,7 @@ impl StorageAPI for ECStore { Ok(()) } - #[tracing::instrument(skip(self))] + #[instrument(skip(self))] async fn get_bucket_info(&self, bucket: &str, opts: &BucketOptions) -> Result { let mut info = self.peer_sys.get_bucket_info(bucket, opts).await?; @@ -1277,7 +1278,7 @@ impl StorageAPI for ECStore { Ok(info) } - #[tracing::instrument(skip(self))] + #[instrument(skip(self))] async fn list_bucket(&self, opts: &BucketOptions) -> Result> { // TODO: opts.cached @@ -1292,7 +1293,7 @@ impl StorageAPI for ECStore { } Ok(buckets) } - #[tracing::instrument(skip(self))] + #[instrument(skip(self))] async fn delete_bucket(&self, bucket: &str, opts: &DeleteBucketOptions) -> Result<()> { if is_meta_bucketname(bucket) { return 
Err(StorageError::BucketNameInvalid(bucket.to_string())); @@ -1327,7 +1328,7 @@ impl StorageAPI for ECStore { // @start_after as marker when continuation_token empty // @delimiter default="/", empty when recursive // @max_keys limit - #[tracing::instrument(skip(self))] + #[instrument(skip(self))] async fn list_objects_v2( self: Arc, bucket: &str, @@ -1342,7 +1343,7 @@ impl StorageAPI for ECStore { .await } - #[tracing::instrument(skip(self))] + #[instrument(skip(self))] async fn list_object_versions( self: Arc, bucket: &str, @@ -1367,7 +1368,7 @@ impl StorageAPI for ECStore { self.walk_internal(rx, bucket, prefix, result, opts).await } - #[tracing::instrument(skip(self))] + #[instrument(skip(self))] async fn get_object_info(&self, bucket: &str, object: &str, opts: &ObjectOptions) -> Result { check_object_args(bucket, object)?; @@ -1385,7 +1386,7 @@ impl StorageAPI for ECStore { } // TODO: review - #[tracing::instrument(skip(self))] + #[instrument(skip(self))] async fn copy_object( &self, src_bucket: &str, @@ -1452,7 +1453,7 @@ impl StorageAPI for ECStore { "put_object_reader is none".to_owned(), )) } - #[tracing::instrument(skip(self))] + #[instrument(skip(self))] async fn delete_object(&self, bucket: &str, object: &str, opts: ObjectOptions) -> Result { check_del_obj_args(bucket, object)?; @@ -1526,7 +1527,7 @@ impl StorageAPI for ECStore { Err(StorageError::ObjectNotFound(bucket.to_owned(), object.to_owned())) } // TODO: review - #[tracing::instrument(skip(self))] + #[instrument(skip(self))] async fn delete_objects( &self, bucket: &str, @@ -1709,7 +1710,7 @@ impl StorageAPI for ECStore { // Ok((del_objects, del_errs)) } - #[tracing::instrument(skip(self))] + #[instrument(skip(self))] async fn list_object_parts( &self, bucket: &str, @@ -1750,7 +1751,7 @@ impl StorageAPI for ECStore { Err(StorageError::InvalidUploadID(bucket.to_owned(), object.to_owned(), upload_id.to_owned())) } - #[tracing::instrument(skip(self))] + #[instrument(skip(self))] async fn 
list_multipart_uploads( &self, bucket: &str, @@ -1802,7 +1803,7 @@ impl StorageAPI for ECStore { }) } - #[tracing::instrument(skip(self))] + #[instrument(skip(self))] async fn new_multipart_upload(&self, bucket: &str, object: &str, opts: &ObjectOptions) -> Result { check_new_multipart_args(bucket, object)?; @@ -1834,7 +1835,7 @@ impl StorageAPI for ECStore { self.pools[idx].new_multipart_upload(bucket, object, opts).await } - #[tracing::instrument(skip(self))] + #[instrument(skip(self))] async fn add_partial(&self, bucket: &str, object: &str, version_id: &str) -> Result<()> { let object = encode_dir_object(object); @@ -1849,7 +1850,7 @@ impl StorageAPI for ECStore { let _ = self.pools[idx].add_partial(bucket, object.as_str(), version_id).await; Ok(()) } - #[tracing::instrument(skip(self))] + #[instrument(skip(self))] async fn transition_object(&self, bucket: &str, object: &str, opts: &ObjectOptions) -> Result<()> { let object = encode_dir_object(object); if self.single_pool() { @@ -1863,7 +1864,7 @@ impl StorageAPI for ECStore { self.pools[idx].transition_object(bucket, &object, opts).await } - #[tracing::instrument(skip(self))] + #[instrument(skip(self))] async fn restore_transitioned_object(self: Arc, bucket: &str, object: &str, opts: &ObjectOptions) -> Result<()> { let object = encode_dir_object(object); if self.single_pool() { @@ -1880,7 +1881,7 @@ impl StorageAPI for ECStore { .await } - #[tracing::instrument(skip(self))] + #[instrument(skip(self))] async fn copy_object_part( &self, src_bucket: &str, @@ -1902,7 +1903,7 @@ impl StorageAPI for ECStore { unimplemented!() } - #[tracing::instrument(skip(self, data))] + #[instrument(skip(self, data))] async fn put_object_part( &self, bucket: &str, @@ -1944,7 +1945,7 @@ impl StorageAPI for ECStore { Err(StorageError::InvalidUploadID(bucket.to_owned(), object.to_owned(), upload_id.to_owned())) } - #[tracing::instrument(skip(self))] + #[instrument(skip(self))] async fn get_multipart_info( &self, bucket: &str, @@ 
-1976,7 +1977,7 @@ impl StorageAPI for ECStore { Err(StorageError::InvalidUploadID(bucket.to_owned(), object.to_owned(), upload_id.to_owned())) } - #[tracing::instrument(skip(self))] + #[instrument(skip(self))] async fn abort_multipart_upload(&self, bucket: &str, object: &str, upload_id: &str, opts: &ObjectOptions) -> Result<()> { check_abort_multipart_args(bucket, object, upload_id)?; @@ -2007,7 +2008,7 @@ impl StorageAPI for ECStore { Err(StorageError::InvalidUploadID(bucket.to_owned(), object.to_owned(), upload_id.to_owned())) } - #[tracing::instrument(skip(self))] + #[instrument(skip(self))] async fn complete_multipart_upload( self: Arc, bucket: &str, @@ -2050,7 +2051,7 @@ impl StorageAPI for ECStore { Err(StorageError::InvalidUploadID(bucket.to_owned(), object.to_owned(), upload_id.to_owned())) } - #[tracing::instrument(skip(self))] + #[instrument(skip(self))] async fn get_disks(&self, pool_idx: usize, set_idx: usize) -> Result>> { if pool_idx < self.pools.len() && set_idx < self.pools[pool_idx].disk_set.len() { self.pools[pool_idx].disk_set[set_idx].get_disks(0, 0).await @@ -2059,7 +2060,7 @@ impl StorageAPI for ECStore { } } - #[tracing::instrument(skip(self))] + #[instrument(skip(self))] fn set_drive_counts(&self) -> Vec { let mut counts = vec![0; self.pools.len()]; @@ -2068,7 +2069,7 @@ impl StorageAPI for ECStore { } counts } - #[tracing::instrument(skip(self))] + #[instrument(skip(self))] async fn put_object_metadata(&self, bucket: &str, object: &str, opts: &ObjectOptions) -> Result { let object = encode_dir_object(object); if self.single_pool() { @@ -2082,7 +2083,7 @@ impl StorageAPI for ECStore { self.pools[idx].put_object_metadata(bucket, object.as_str(), &opts).await } - #[tracing::instrument(skip(self))] + #[instrument(skip(self))] async fn get_object_tags(&self, bucket: &str, object: &str, opts: &ObjectOptions) -> Result { let object = encode_dir_object(object); @@ -2095,7 +2096,7 @@ impl StorageAPI for ECStore { Ok(oi.user_tags) } - 
#[tracing::instrument(level = "debug", skip(self))] + #[instrument(level = "debug", skip(self))] async fn put_object_tags(&self, bucket: &str, object: &str, tags: &str, opts: &ObjectOptions) -> Result { let object = encode_dir_object(object); @@ -2108,7 +2109,7 @@ impl StorageAPI for ECStore { self.pools[idx].put_object_tags(bucket, object.as_str(), tags, opts).await } - #[tracing::instrument(skip(self))] + #[instrument(skip(self))] async fn delete_object_version(&self, bucket: &str, object: &str, fi: &FileInfo, force_del_marker: bool) -> Result<()> { check_del_obj_args(bucket, object)?; @@ -2122,7 +2123,7 @@ impl StorageAPI for ECStore { Ok(()) } - #[tracing::instrument(skip(self))] + #[instrument(skip(self))] async fn delete_object_tags(&self, bucket: &str, object: &str, opts: &ObjectOptions) -> Result { let object = encode_dir_object(object); @@ -2135,7 +2136,7 @@ impl StorageAPI for ECStore { self.pools[idx].delete_object_tags(bucket, object.as_str(), opts).await } - #[tracing::instrument(skip(self))] + #[instrument(skip(self))] async fn heal_format(&self, dry_run: bool) -> Result<(HealResultItem, Option)> { info!("heal_format"); let mut r = HealResultItem { @@ -2170,13 +2171,13 @@ impl StorageAPI for ECStore { Ok((r, None)) } - #[tracing::instrument(skip(self))] + #[instrument(skip(self))] async fn heal_bucket(&self, bucket: &str, opts: &HealOpts) -> Result { let res = self.peer_sys.heal_bucket(bucket, opts).await?; Ok(res) } - #[tracing::instrument(skip(self))] + #[instrument(skip(self))] async fn heal_object( &self, bucket: &str, @@ -2253,7 +2254,7 @@ impl StorageAPI for ECStore { Ok((HealResultItem::default(), Some(Error::FileNotFound))) } - #[tracing::instrument(skip(self))] + #[instrument(skip(self))] async fn get_pool_and_set(&self, id: &str) -> Result<(Option, Option, Option)> { for (pool_idx, pool) in self.pools.iter().enumerate() { for (set_idx, set) in pool.format.erasure.sets.iter().enumerate() { @@ -2268,7 +2269,7 @@ impl StorageAPI for ECStore { 
Err(Error::DiskNotFound) } - #[tracing::instrument(skip(self))] + #[instrument(skip(self))] async fn check_abandoned_parts(&self, bucket: &str, object: &str, opts: &HealOpts) -> Result<()> { let object = encode_dir_object(object); if self.single_pool() { @@ -2473,7 +2474,7 @@ fn check_abort_multipart_args(bucket: &str, object: &str, upload_id: &str) -> Re check_multipart_object_args(bucket, object, upload_id) } -#[tracing::instrument(level = "debug")] +#[instrument(level = "debug")] fn check_put_object_args(bucket: &str, object: &str) -> Result<()> { if !is_meta_bucketname(bucket) && check_valid_bucket_name_strict(bucket).is_err() { return Err(StorageError::BucketNameInvalid(bucket.to_string())); diff --git a/crates/ecstore/src/store_api.rs b/crates/ecstore/src/store_api.rs index 2dd71992..499f80c0 100644 --- a/crates/ecstore/src/store_api.rs +++ b/crates/ecstore/src/store_api.rs @@ -134,7 +134,7 @@ pub struct GetObjectReader { } impl GetObjectReader { - #[tracing::instrument(level = "debug", skip(reader))] + #[tracing::instrument(level = "debug", skip(reader, rs, opts, _h))] pub fn new( reader: Box, rs: Option, diff --git a/crates/ecstore/src/tier/tier.rs b/crates/ecstore/src/tier/tier.rs index 00a2919e..1d11ad9e 100644 --- a/crates/ecstore/src/tier/tier.rs +++ b/crates/ecstore/src/tier/tier.rs @@ -481,7 +481,7 @@ async fn new_and_save_tiering_config(api: Arc) -> Result) -> std::result::Result { let config_file = format!("{}{}{}", CONFIG_PREFIX, SLASH_SEPARATOR, TIER_CONFIG_FILE); let data = read_config(api.clone(), config_file.as_str()).await; diff --git a/crates/filemeta/src/error.rs b/crates/filemeta/src/error.rs index a156cad6..37f9cdcf 100644 --- a/crates/filemeta/src/error.rs +++ b/crates/filemeta/src/error.rs @@ -12,6 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. +/// FileMeta error type and Result alias. 
+/// This module defines a custom error type `Error` for handling various +/// error scenarios related to file metadata operations. It also provides +/// a `Result` type alias for convenience. pub type Result = core::result::Result; #[derive(thiserror::Error, Debug)] diff --git a/crates/filemeta/src/fileinfo.rs b/crates/filemeta/src/fileinfo.rs index 467a4873..e30c2a2c 100644 --- a/crates/filemeta/src/fileinfo.rs +++ b/crates/filemeta/src/fileinfo.rs @@ -12,17 +12,14 @@ // See the License for the specific language governing permissions and // limitations under the License. -use super::filemeta::TRANSITION_COMPLETE; -use crate::error::{Error, Result}; -use crate::{ReplicationState, ReplicationStatusType, VersionPurgeStatusType}; +use crate::{Error, ReplicationState, ReplicationStatusType, Result, TRANSITION_COMPLETE, VersionPurgeStatusType}; use bytes::Bytes; use rmp_serde::Serializer; use rustfs_utils::HashAlgorithm; use rustfs_utils::http::headers::{RESERVED_METADATA_PREFIX_LOWER, RUSTFS_HEALING}; use s3s::dto::{RestoreStatus, Timestamp}; use s3s::header::X_AMZ_RESTORE; -use serde::Deserialize; -use serde::Serialize; +use serde::{Deserialize, Serialize}; use std::collections::HashMap; use time::{OffsetDateTime, format_description::well_known::Rfc3339}; use uuid::Uuid; diff --git a/crates/filemeta/src/filemeta.rs b/crates/filemeta/src/filemeta.rs index 5d10d759..c63ae87f 100644 --- a/crates/filemeta/src/filemeta.rs +++ b/crates/filemeta/src/filemeta.rs @@ -12,11 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-use crate::error::{Error, Result}; -use crate::fileinfo::{ErasureAlgo, ErasureInfo, FileInfo, FileInfoVersions, ObjectPartInfo, RawFileInfo}; -use crate::filemeta_inline::InlineData; use crate::{ - ReplicationState, ReplicationStatusType, VersionPurgeStatusType, replication_statuses_map, version_purge_statuses_map, + ErasureAlgo, ErasureInfo, Error, FileInfo, FileInfoVersions, InlineData, ObjectPartInfo, RawFileInfo, ReplicationState, + ReplicationStatusType, Result, VersionPurgeStatusType, replication_statuses_map, version_purge_statuses_map, }; use byteorder::ByteOrder; use bytes::Bytes; @@ -3402,7 +3400,7 @@ mod test { ("tabs", "col1\tcol2\tcol3"), ("quotes", "\"quoted\" and 'single'"), ("backslashes", "path\\to\\file"), - ("mixed", "Mixed: Chinese,English, 123, !@#$%"), + ("mixed", "Mixed: Chinese, English, 123, !@#$%"), ]; for (key, value) in special_cases { @@ -3424,7 +3422,7 @@ mod test { ("tabs", "col1\tcol2\tcol3"), ("quotes", "\"quoted\" and 'single'"), ("backslashes", "path\\to\\file"), - ("mixed", "Mixed: Chinese,English, 123, !@#$%"), + ("mixed", "Mixed: Chinese, English, 123, !@#$%"), ] { assert_eq!(obj2.meta_user.get(key), Some(&expected_value.to_string())); } diff --git a/crates/filemeta/src/lib.rs b/crates/filemeta/src/lib.rs index dc7fa4fd..3fca95ea 100644 --- a/crates/filemeta/src/lib.rs +++ b/crates/filemeta/src/lib.rs @@ -13,7 +13,7 @@ // limitations under the License. mod error; -pub mod fileinfo; +mod fileinfo; mod filemeta; mod filemeta_inline; // pub mod headers; diff --git a/crates/filemeta/src/metacache.rs b/crates/filemeta/src/metacache.rs index c4f83e47..c07de472 100644 --- a/crates/filemeta/src/metacache.rs +++ b/crates/filemeta/src/metacache.rs @@ -12,8 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-use crate::error::{Error, Result}; -use crate::{FileInfo, FileInfoVersions, FileMeta, FileMetaShallowVersion, VersionType, merge_file_meta_versions}; +use crate::{Error, FileInfo, FileInfoVersions, FileMeta, FileMetaShallowVersion, Result, VersionType, merge_file_meta_versions}; use rmp::Marker; use serde::{Deserialize, Serialize}; use std::cmp::Ordering; diff --git a/crates/filemeta/src/replication.rs b/crates/filemeta/src/replication.rs index e81cc60c..1f3358c8 100644 --- a/crates/filemeta/src/replication.rs +++ b/crates/filemeta/src/replication.rs @@ -1,3 +1,17 @@ +// Copyright 2024 RustFS Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + use bytes::Bytes; use core::fmt; use regex::Regex; diff --git a/crates/filemeta/src/test_data.rs b/crates/filemeta/src/test_data.rs index cf274255..e6448c69 100644 --- a/crates/filemeta/src/test_data.rs +++ b/crates/filemeta/src/test_data.rs @@ -12,8 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-use crate::error::Result; -use crate::filemeta::*; +use crate::{ChecksumAlgo, FileMeta, FileMetaShallowVersion, FileMetaVersion, MetaDeleteMarker, MetaObject, Result, VersionType}; use std::collections::HashMap; use time::OffsetDateTime; use uuid::Uuid; @@ -257,6 +256,7 @@ pub fn create_xlmeta_with_inline_data() -> Result> { #[cfg(test)] mod tests { use super::*; + use crate::FileMeta; #[test] fn test_create_real_xlmeta() { diff --git a/crates/obs/Cargo.toml b/crates/obs/Cargo.toml index 507455e7..13781e20 100644 --- a/crates/obs/Cargo.toml +++ b/crates/obs/Cargo.toml @@ -50,6 +50,7 @@ opentelemetry-semantic-conventions = { workspace = true, features = ["semconv_ex serde = { workspace = true } smallvec = { workspace = true, features = ["serde"] } tracing = { workspace = true, features = ["std", "attributes"] } +tracing-appender = { workspace = true } tracing-error = { workspace = true } tracing-opentelemetry = { workspace = true } tracing-subscriber = { workspace = true, features = ["registry", "std", "fmt", "env-filter", "tracing-log", "time", "local-time", "json"] } diff --git a/crates/obs/src/telemetry.rs b/crates/obs/src/telemetry.rs index 9507fd4b..e145da71 100644 --- a/crates/obs/src/telemetry.rs +++ b/crates/obs/src/telemetry.rs @@ -74,6 +74,8 @@ pub struct OtelGuard { logger_provider: Option, // Add a flexi_logger handle to keep the logging alive _flexi_logger_handles: Option, + // WorkerGuard for writing tracing files + _tracing_guard: Option, } // Implement debug manually and avoid relying on all fields to implement debug @@ -84,6 +86,7 @@ impl std::fmt::Debug for OtelGuard { .field("meter_provider", &self.meter_provider.is_some()) .field("logger_provider", &self.logger_provider.is_some()) .field("_flexi_logger_handles", &self._flexi_logger_handles.is_some()) + .field("_tracing_guard", &self._tracing_guard.is_some()) .finish() } } @@ -277,13 +280,18 @@ pub(crate) fn init_telemetry(config: &OtelConfig) -> OtelGuard { .with_thread_names(true) 
.with_thread_ids(true) .with_file(true) - .with_line_number(true); - - // Only add full span events tracking in the development environment - if !is_production { - layer = layer.with_span_events(FmtSpan::FULL); - } + .with_line_number(true) + .json() + .with_current_span(true) + .with_span_list(true); + let span_event = if is_production { + FmtSpan::CLOSE + } else { + // Only add full span events tracking in the development environment + FmtSpan::FULL + }; + layer = layer.with_span_events(span_event); layer.with_filter(build_env_filter(logger_level, None)) }; @@ -321,6 +329,7 @@ pub(crate) fn init_telemetry(config: &OtelConfig) -> OtelGuard { meter_provider: Some(meter_provider), logger_provider: Some(logger_provider), _flexi_logger_handles: flexi_logger_handle, + _tracing_guard: None, }; } @@ -352,6 +361,47 @@ pub(crate) fn init_telemetry(config: &OtelConfig) -> OtelGuard { } } + if endpoint.is_empty() && !is_production { + // Create a file appender (rolling by day), add the -tracing suffix to the file name to avoid conflicts + let file_appender = tracing_appender::rolling::hourly(log_directory, format!("{log_filename}-tracing.log")); + let (nb_writer, guard) = tracing_appender::non_blocking(file_appender); + + let enable_color = std::io::stdout().is_terminal(); + let fmt_layer = tracing_subscriber::fmt::layer() + .with_timer(LocalTime::rfc_3339()) + .with_target(true) + .with_ansi(enable_color) + .with_thread_names(true) + .with_thread_ids(true) + .with_file(true) + .with_line_number(true) + .with_writer(nb_writer) // Specify writing file + .json() + .with_current_span(true) + .with_span_list(true) + .with_span_events(FmtSpan::CLOSE); // Log span lifecycle events, including trace_id + + let env_filter = build_env_filter(logger_level, None); + + // Use registry() to register fmt_layer directly to ensure trace_id is output to the log + tracing_subscriber::registry() + .with(env_filter) + .with(ErrorLayer::default()) + .with(fmt_layer) + .init(); + + info!("Tracing 
telemetry initialized for non-production with trace_id logging."); + IS_OBSERVABILITY_ENABLED.set(false).ok(); + + return OtelGuard { + tracer_provider: None, + meter_provider: None, + logger_provider: None, + _flexi_logger_handles: None, + _tracing_guard: Some(guard), + }; + } + // Build log cutting conditions let rotation_criterion = match (config.log_rotation_time.as_deref(), config.log_rotation_size_mb) { // Cut by time and size at the same time @@ -447,7 +497,6 @@ pub(crate) fn init_telemetry(config: &OtelConfig) -> OtelGuard { // Environment-aware stdout configuration flexi_logger_builder = flexi_logger_builder.duplicate_to_stdout(level_filter); - // Only add stdout formatting and startup messages in non-production environments if !is_production { flexi_logger_builder = flexi_logger_builder @@ -496,6 +545,7 @@ pub(crate) fn init_telemetry(config: &OtelConfig) -> OtelGuard { meter_provider: None, logger_provider: None, _flexi_logger_handles: flexi_logger_handle, + _tracing_guard: None, } } diff --git a/rustfs/Cargo.toml b/rustfs/Cargo.toml index 6c59cb94..62dceacc 100644 --- a/rustfs/Cargo.toml +++ b/rustfs/Cargo.toml @@ -79,7 +79,7 @@ tokio-stream.workspace = true tokio-util.workspace = true tonic = { workspace = true } tower.workspace = true -tower-http = { workspace = true, features = ["trace", "compression-deflate", "compression-gzip", "cors", "catch-panic", "timeout", "limit"] } +tower-http = { workspace = true, features = ["trace", "compression-full", "cors", "catch-panic", "timeout", "limit", "request-id"] } # Serialization and Data Formats bytes = { workspace = true } @@ -112,6 +112,7 @@ mime_guess = { workspace = true } pin-project-lite.workspace = true rust-embed = { workspace = true, features = ["interpolate-folder-path"] } s3s.workspace = true +scopeguard.workspace = true shadow-rs = { workspace = true, features = ["build", "metadata"] } sysinfo = { workspace = true, features = ["multithread"] } thiserror = { workspace = true } diff --git 
a/rustfs/src/admin/console.rs b/rustfs/src/admin/console.rs index 517f110f..e467af56 100644 --- a/rustfs/src/admin/console.rs +++ b/rustfs/src/admin/console.rs @@ -14,26 +14,31 @@ use crate::config::build; use crate::license::get_license; -use axum::Json; -use axum::body::Body; -use axum::response::{IntoResponse, Response}; -use axum::{Router, extract::Request, middleware, routing::get}; +use axum::{ + Json, Router, + body::Body, + extract::Request, + middleware, + response::{IntoResponse, Response}, + routing::get, +}; use axum_extra::extract::Host; use axum_server::tls_rustls::RustlsConfig; -use http::{HeaderMap, HeaderName, StatusCode, Uri}; -use http::{HeaderValue, Method}; +use http::{HeaderMap, HeaderName, HeaderValue, Method, StatusCode, Uri}; use mime_guess::from_path; use rust_embed::RustEmbed; use rustfs_config::{RUSTFS_TLS_CERT, RUSTFS_TLS_KEY}; use serde::Serialize; use serde_json::json; -use std::io::Result; -use std::net::{IpAddr, SocketAddr}; -use std::sync::Arc; -use std::sync::OnceLock; -use std::time::Duration; +use std::{ + io::Result, + net::{IpAddr, SocketAddr}, + sync::{Arc, OnceLock}, + time::Duration, +}; use tokio_rustls::rustls::ServerConfig; use tower_http::catch_panic::CatchPanicLayer; +use tower_http::compression::CompressionLayer; use tower_http::cors::{AllowOrigin, Any, CorsLayer}; use tower_http::limit::RequestBodyLimitLayer; use tower_http::timeout::TimeoutLayer; @@ -274,7 +279,6 @@ async fn console_logging_middleware(req: Request, next: axum::middleware::Next) let method = req.method().clone(); let uri = req.uri().clone(); let start = std::time::Instant::now(); - let response = next.run(req).await; let duration = start.elapsed(); @@ -409,6 +413,8 @@ fn setup_console_middleware_stack( app = app .layer(CatchPanicLayer::new()) .layer(TraceLayer::new_for_http()) + // Compress responses + .layer(CompressionLayer::new()) .layer(middleware::from_fn(console_logging_middleware)) .layer(cors_layer) // Add timeout layer - convert auth_timeout 
from seconds to Duration diff --git a/rustfs/src/admin/handlers/event.rs b/rustfs/src/admin/handlers/event.rs index 8fb4f67f..c7887b0d 100644 --- a/rustfs/src/admin/handlers/event.rs +++ b/rustfs/src/admin/handlers/event.rs @@ -28,7 +28,7 @@ use std::net::SocketAddr; use std::path::Path; use tokio::net::lookup_host; use tokio::time::{Duration, sleep}; -use tracing::{debug, error, info, warn}; +use tracing::{Span, debug, error, info, warn}; use url::Url; #[derive(Debug, Deserialize)] @@ -121,6 +121,8 @@ pub struct NotificationTarget {} #[async_trait::async_trait] impl Operation for NotificationTarget { async fn call(&self, req: S3Request, params: Params<'_, '_>) -> S3Result> { + let span = Span::current(); + let _enter = span.enter(); // 1. Analyze query parameters let (target_type, target_name) = extract_target_params(¶ms)?; @@ -274,6 +276,9 @@ impl Operation for NotificationTarget { let mut header = HeaderMap::new(); header.insert(CONTENT_TYPE, "application/json".parse().unwrap()); header.insert(CONTENT_LENGTH, "0".parse().unwrap()); + if let Some(v) = req.headers.get("x-request-id") { + header.insert("x-request-id", v.clone()); + } Ok(S3Response::with_headers((StatusCode::OK, Body::empty()), header)) } } @@ -283,6 +288,8 @@ pub struct ListNotificationTargets {} #[async_trait::async_trait] impl Operation for ListNotificationTargets { async fn call(&self, req: S3Request, _params: Params<'_, '_>) -> S3Result> { + let span = Span::current(); + let _enter = span.enter(); debug!("ListNotificationTargets call start request params: {:?}", req.uri.query()); // 1. 
Permission verification @@ -320,6 +327,9 @@ impl Operation for ListNotificationTargets { debug!("ListNotificationTargets call end, response data length: {}", data.len(),); let mut header = HeaderMap::new(); header.insert(CONTENT_TYPE, "application/json".parse().unwrap()); + if let Some(v) = req.headers.get("x-request-id") { + header.insert("x-request-id", v.clone()); + } Ok(S3Response::with_headers((StatusCode::OK, Body::from(data)), header)) } } @@ -329,6 +339,8 @@ pub struct ListTargetsArns {} #[async_trait::async_trait] impl Operation for ListTargetsArns { async fn call(&self, req: S3Request, _params: Params<'_, '_>) -> S3Result> { + let span = Span::current(); + let _enter = span.enter(); debug!("ListTargetsArns call start request params: {:?}", req.uri.query()); // 1. Permission verification @@ -364,6 +376,9 @@ impl Operation for ListTargetsArns { debug!("ListTargetsArns call end, response data length: {}", data.len(),); let mut header = HeaderMap::new(); header.insert(CONTENT_TYPE, "application/json".parse().unwrap()); + if let Some(v) = req.headers.get("x-request-id") { + header.insert("x-request-id", v.clone()); + } Ok(S3Response::with_headers((StatusCode::OK, Body::from(data)), header)) } } @@ -373,6 +388,8 @@ pub struct RemoveNotificationTarget {} #[async_trait::async_trait] impl Operation for RemoveNotificationTarget { async fn call(&self, req: S3Request, params: Params<'_, '_>) -> S3Result> { + let span = Span::current(); + let _enter = span.enter(); // 1. 
Analyze query parameters let (target_type, target_name) = extract_target_params(¶ms)?; @@ -398,6 +415,9 @@ impl Operation for RemoveNotificationTarget { let mut header = HeaderMap::new(); header.insert(CONTENT_TYPE, "application/json".parse().unwrap()); header.insert(CONTENT_LENGTH, "0".parse().unwrap()); + if let Some(v) = req.headers.get("x-request-id") { + header.insert("x-request-id", v.clone()); + } Ok(S3Response::with_headers((StatusCode::OK, Body::empty()), header)) } } diff --git a/rustfs/src/main.rs b/rustfs/src/main.rs index 1e75f2bd..bc19148e 100644 --- a/rustfs/src/main.rs +++ b/rustfs/src/main.rs @@ -184,7 +184,7 @@ async fn run(opt: config::Opt) -> Result<()> { ); if eps.drives_per_set > 1 { - warn!("WARNING: Host local has more than 0 drives of set. A host failure will result in data becoming unavailable."); + warn!(target: "rustfs::main::run","WARNING: Host local has more than 0 drives of set. A host failure will result in data becoming unavailable."); } } diff --git a/rustfs/src/server/http.rs b/rustfs/src/server/http.rs index da1d4ad2..cb6c450b 100644 --- a/rustfs/src/server/http.rs +++ b/rustfs/src/server/http.rs @@ -43,7 +43,9 @@ use tokio_rustls::TlsAcceptor; use tonic::{Request, Status, metadata::MetadataValue}; use tower::ServiceBuilder; use tower_http::catch_panic::CatchPanicLayer; +use tower_http::compression::CompressionLayer; use tower_http::cors::{AllowOrigin, Any, CorsLayer}; +use tower_http::request_id::{MakeRequestUuid, PropagateRequestIdLayer, SetRequestIdLayer}; use tower_http::trace::TraceLayer; use tracing::{Span, debug, error, info, instrument, warn}; @@ -448,11 +450,18 @@ fn process_connection( let service = hybrid(s3_service, rpc_service); let hybrid_service = ServiceBuilder::new() + .layer(SetRequestIdLayer::x_request_id(MakeRequestUuid)) .layer(CatchPanicLayer::new()) .layer( TraceLayer::new_for_http() .make_span_with(|request: &HttpRequest<_>| { + let trace_id = request + .headers() + 
.get(http::header::HeaderName::from_static("x-request-id")) + .and_then(|v| v.to_str().ok()) + .unwrap_or("unknown"); let span = tracing::info_span!("http-request", + trace_id = %trace_id, status_code = tracing::field::Empty, method = %request.method(), uri = %request.uri(), @@ -466,7 +475,8 @@ fn process_connection( span }) - .on_request(|request: &HttpRequest<_>, _span: &Span| { + .on_request(|request: &HttpRequest<_>, span: &Span| { + let _enter = span.enter(); debug!("http started method: {}, url path: {}", request.method(), request.uri().path()); let labels = [ ("key_request_method", format!("{}", request.method())), @@ -474,23 +484,31 @@ fn process_connection( ]; counter!("rustfs_api_requests_total", &labels).increment(1); }) - .on_response(|response: &Response<_>, latency: Duration, _span: &Span| { - _span.record("http response status_code", tracing::field::display(response.status())); + .on_response(|response: &Response<_>, latency: Duration, span: &Span| { + span.record("status_code", tracing::field::display(response.status())); + let _enter = span.enter(); + histogram!("request.latency.ms").record(latency.as_millis() as f64); debug!("http response generated in {:?}", latency) }) - .on_body_chunk(|chunk: &Bytes, latency: Duration, _span: &Span| { + .on_body_chunk(|chunk: &Bytes, latency: Duration, span: &Span| { + let _enter = span.enter(); histogram!("request.body.len").record(chunk.len() as f64); debug!("http body sending {} bytes in {:?}", chunk.len(), latency); }) - .on_eos(|_trailers: Option<&HeaderMap>, stream_duration: Duration, _span: &Span| { + .on_eos(|_trailers: Option<&HeaderMap>, stream_duration: Duration, span: &Span| { + let _enter = span.enter(); debug!("http stream closed after {:?}", stream_duration) }) - .on_failure(|_error, latency: Duration, _span: &Span| { + .on_failure(|_error, latency: Duration, span: &Span| { + let _enter = span.enter(); counter!("rustfs_api_requests_failure_total").increment(1); debug!("http request failure error: 
{:?} in {:?}", _error, latency) }), ) + .layer(PropagateRequestIdLayer::x_request_id()) .layer(cors_layer) + // Compress responses + .layer(CompressionLayer::new()) .option_layer(if is_console { Some(RedirectLayer) } else { None }) .service(service); diff --git a/rustfs/src/server/layer.rs b/rustfs/src/server/layer.rs index f77f1252..f324d06b 100644 --- a/rustfs/src/server/layer.rs +++ b/rustfs/src/server/layer.rs @@ -49,9 +49,9 @@ where { type Response = Response>; type Error = Box; - type Future = Pin> + Send>>; + type Future = Pin> + Send>>; - fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { self.inner.poll_ready(cx).map_err(Into::into) } diff --git a/rustfs/src/storage/ecfs.rs b/rustfs/src/storage/ecfs.rs index 91a10845..cfaf7441 100644 --- a/rustfs/src/storage/ecfs.rs +++ b/rustfs/src/storage/ecfs.rs @@ -77,7 +77,7 @@ use rustfs_ecstore::{ }, }; use rustfs_filemeta::REPLICATE_INCOMING_DELETE; -use rustfs_filemeta::fileinfo::{ObjectPartInfo, RestoreStatusOps}; +use rustfs_filemeta::{ObjectPartInfo, RestoreStatusOps}; use rustfs_filemeta::{ReplicationStatusType, ReplicationType, VersionPurgeStatusType}; use rustfs_kms::{ DataKey, diff --git a/scripts/run.sh b/scripts/run.sh index 411950d2..ae327d0d 100755 --- a/scripts/run.sh +++ b/scripts/run.sh @@ -58,7 +58,7 @@ export RUSTFS_CONSOLE_ADDRESS=":9001" #export RUSTFS_OBS_SERVICE_NAME=rustfs # Service name #export RUSTFS_OBS_SERVICE_VERSION=0.1.0 # Service version export RUSTFS_OBS_ENVIRONMENT=develop # Environment name -export RUSTFS_OBS_LOGGER_LEVEL=debug # Log level, supports trace, debug, info, warn, error +export RUSTFS_OBS_LOGGER_LEVEL=info # Log level, supports trace, debug, info, warn, error export RUSTFS_OBS_LOCAL_LOGGING_ENABLED=true # Whether to enable local logging export RUSTFS_OBS_LOG_DIRECTORY="$current_dir/deploy/logs" # Log directory export RUSTFS_OBS_LOG_ROTATION_TIME="hour" # Log rotation time unit, can be "second", "minute", 
"hour", "day"