Compare commits

..

1 Commits

Author SHA1 Message Date
weisd
0b80074270 todo 2025-11-04 17:12:58 +08:00
72 changed files with 689 additions and 2234 deletions

3
.gitignore vendored
View File

@@ -22,5 +22,4 @@ profile.json
.secrets
*.go
*.pb
*.svg
deploy/logs/*.log.*
*.svg

View File

@@ -8,14 +8,12 @@ The workspace root hosts shared dependencies in `Cargo.toml`. The service binary
## Build, Test, and Development Commands
Run `cargo check --all-targets` for fast validation. Build release binaries via `cargo build --release` or the pipeline-aligned `make build`. Use `./build-rustfs.sh --dev` for iterative development and `./build-rustfs.sh --platform <target>` for cross-compiles. Prefer `make pre-commit` before pushing to cover formatting, clippy, checks, and tests.
Always ensure `cargo fmt --all --check`, `cargo test --workspace --exclude e2e_test`, and `cargo clippy --all-targets --all-features -- -D warnings` complete successfully after each code change to keep the tree healthy and warning-free.
## Coding Style & Naming Conventions
Formatting follows the repo `rustfmt.toml` (130-column width). Use `snake_case` for items, `PascalCase` for types, and `SCREAMING_SNAKE_CASE` for constants. Avoid `unwrap()` or `expect()` outside tests; bubble errors with `Result` and crate-specific `thiserror` types. Keep async code non-blocking and offload CPU-heavy work with `tokio::task::spawn_blocking` when necessary.
## Testing Guidelines
Co-locate unit tests with their modules and give behavior-led names such as `handles_expired_token`. Integration suites belong in each crates `tests/` directory, while exhaustive end-to-end scenarios live in `crates/e2e_test/`. Run `cargo test --workspace --exclude e2e_test` during iteration, `cargo nextest run --all --exclude e2e_test` when available, and finish with `cargo test --all` before requesting review. Use `NO_PROXY=127.0.0.1,localhost HTTP_PROXY= HTTPS_PROXY=` for KMS e2e tests.
When fixing bugs or adding features, include regression tests that capture the new behavior so future changes cannot silently break it.
## Commit & Pull Request Guidelines
Work on feature branches (e.g., `feat/...`) after syncing `main`. Follow Conventional Commits under 72 characters (e.g., `feat: add kms key rotation`). Each commit must compile, format cleanly, and pass `make pre-commit`. Open PRs with a concise summary, note verification commands, link relevant issues, and wait for reviewer approval.

125
Cargo.lock generated
View File

@@ -474,7 +474,6 @@ version = "0.4.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "06575e6a9673580f52661c92107baabffbf41e2141373441cbcdc47cb733003c"
dependencies = [
"brotli 7.0.0",
"bzip2 0.5.2",
"flate2",
"futures-core",
@@ -877,7 +876,7 @@ dependencies = [
"hyper-util",
"pin-project-lite",
"rustls 0.21.12",
"rustls 0.23.35",
"rustls 0.23.34",
"rustls-native-certs 0.8.2",
"rustls-pki-types",
"tokio",
@@ -1058,9 +1057,9 @@ dependencies = [
[[package]]
name = "axum-extra"
version = "0.12.1"
version = "0.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5136e6c5e7e7978fe23e9876fb924af2c0f84c72127ac6ac17e7c46f457d362c"
checksum = "460c45604cb25834835e3b4d3468510322852783dac36261d642424d75562ff3"
dependencies = [
"axum",
"axum-core",
@@ -1091,7 +1090,7 @@ dependencies = [
"hyper 1.7.0",
"hyper-util",
"pin-project-lite",
"rustls 0.23.35",
"rustls 0.23.34",
"rustls-pemfile 2.2.0",
"rustls-pki-types",
"tokio",
@@ -1276,17 +1275,6 @@ dependencies = [
"syn 2.0.108",
]
[[package]]
name = "brotli"
version = "7.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cc97b8f16f944bba54f0433f07e30be199b6dc2bd25937444bbad560bcea29bd"
dependencies = [
"alloc-no-stdlib",
"alloc-stdlib",
"brotli-decompressor 4.0.3",
]
[[package]]
name = "brotli"
version = "8.0.2"
@@ -1295,17 +1283,7 @@ checksum = "4bd8b9603c7aa97359dbd97ecf258968c95f3adddd6db2f7e7a5bef101c84560"
dependencies = [
"alloc-no-stdlib",
"alloc-stdlib",
"brotli-decompressor 5.0.0",
]
[[package]]
name = "brotli-decompressor"
version = "4.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a334ef7c9e23abf0ce748e8cd309037da93e606ad52eb372e4ce327a0dcfbdfd"
dependencies = [
"alloc-no-stdlib",
"alloc-stdlib",
"brotli-decompressor",
]
[[package]]
@@ -1455,9 +1433,9 @@ checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5"
[[package]]
name = "cc"
version = "1.2.44"
version = "1.2.43"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "37521ac7aabe3d13122dc382493e20c9416f299d2ccd5b3a5340a2570cdeb0f3"
checksum = "739eb0f94557554b3ca9a86d2d37bebd49c5e6d0c1d2bda35ba5bdac830befc2"
dependencies = [
"find-msvc-tools",
"jobserver",
@@ -3606,7 +3584,7 @@ dependencies = [
"http 1.3.1",
"reqwest",
"rustc_version",
"rustls 0.23.35",
"rustls 0.23.34",
"rustls-pemfile 2.2.0",
"serde",
"serde_json",
@@ -4177,7 +4155,7 @@ dependencies = [
"hyper 1.7.0",
"hyper-util",
"log",
"rustls 0.23.35",
"rustls 0.23.34",
"rustls-native-certs 0.8.2",
"rustls-pki-types",
"tokio",
@@ -4472,9 +4450,9 @@ dependencies = [
[[package]]
name = "iri-string"
version = "0.7.9"
version = "0.7.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4f867b9d1d896b67beb18518eda36fdb77a32ea590de864f1325b294a6d14397"
checksum = "dbc5ebe9c3a1a7a5127f920a418f7585e9e758e911d0466ed004f393b0e380b2"
dependencies = [
"memchr",
"serde",
@@ -5219,10 +5197,11 @@ dependencies = [
[[package]]
name = "num-bigint-dig"
version = "0.8.5"
version = "0.8.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "82c79c15c05d4bf82b6f5ef163104cc81a760d8e874d38ac50ab67c8877b647b"
checksum = "dc84195820f291c7697304f3cbdadd1cb7199c0efc917ff5eafd71225c136151"
dependencies = [
"byteorder",
"lazy_static",
"libm",
"num-integer",
@@ -5680,7 +5659,7 @@ dependencies = [
"arrow-schema",
"arrow-select",
"base64 0.22.1",
"brotli 8.0.2",
"brotli",
"bytes",
"chrono",
"flate2",
@@ -6003,9 +5982,9 @@ checksum = "439ee305def115ba05938db6eb1644ff94165c5ab5e9420d1c1bcedbba909391"
[[package]]
name = "ppmd-rust"
version = "1.3.0"
version = "1.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d558c559f0450f16f2a27a1f017ef38468c1090c9ce63c8e51366232d53717b4"
checksum = "c834641d8ad1b348c9ee86dec3b9840d805acd5f24daa5f90c788951a52ff59b"
[[package]]
name = "pprof"
@@ -6293,7 +6272,7 @@ dependencies = [
"quinn-proto",
"quinn-udp",
"rustc-hash",
"rustls 0.23.35",
"rustls 0.23.34",
"socket2 0.6.1",
"thiserror 2.0.17",
"tokio",
@@ -6313,7 +6292,7 @@ dependencies = [
"rand 0.9.2",
"ring",
"rustc-hash",
"rustls 0.23.35",
"rustls 0.23.34",
"rustls-pki-types",
"slab",
"thiserror 2.0.17",
@@ -6569,7 +6548,7 @@ dependencies = [
"percent-encoding",
"pin-project-lite",
"quinn",
"rustls 0.23.35",
"rustls 0.23.34",
"rustls-pki-types",
"serde",
"serde_json",
@@ -6635,9 +6614,9 @@ dependencies = [
[[package]]
name = "rmcp"
version = "0.8.4"
version = "0.8.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a2c66318b30535ccd0d3b39afaa4240d6b5a35328fb7672a28e3386c97472805"
checksum = "1fdad1258f7259fdc0f2dfc266939c82c3b5d1fd72bcde274d600cdc27e60243"
dependencies = [
"base64 0.22.1",
"chrono",
@@ -6645,7 +6624,7 @@ dependencies = [
"paste",
"pin-project-lite",
"rmcp-macros",
"schemars 1.0.5",
"schemars 1.0.4",
"serde",
"serde_json",
"thiserror 2.0.17",
@@ -6656,9 +6635,9 @@ dependencies = [
[[package]]
name = "rmcp-macros"
version = "0.8.4"
version = "0.8.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "49a19193e0d69bb1c96324b1b4daec078d4c8e76d734e53404c107437858a4d2"
checksum = "ede0589a208cc7ce81d1be68aa7e74b917fcd03c81528408bab0457e187dcd9b"
dependencies = [
"darling 0.21.3",
"proc-macro2",
@@ -6845,9 +6824,8 @@ dependencies = [
"rustfs-targets",
"rustfs-utils",
"rustfs-zip",
"rustls 0.23.35",
"rustls 0.23.34",
"s3s",
"scopeguard",
"serde",
"serde_json",
"serde_urlencoded",
@@ -6951,6 +6929,7 @@ version = "0.0.5"
dependencies = [
"async-trait",
"chrono",
"lazy_static",
"path-clean",
"rmp-serde",
"rustfs-filemeta",
@@ -7044,7 +7023,7 @@ dependencies = [
"rustfs-signer",
"rustfs-utils",
"rustfs-workers",
"rustls 0.23.35",
"rustls 0.23.34",
"s3s",
"serde",
"serde_json",
@@ -7188,7 +7167,7 @@ dependencies = [
"clap",
"mime_guess",
"rmcp",
"schemars 1.0.5",
"schemars 1.0.4",
"serde",
"serde_json",
"tokio",
@@ -7249,7 +7228,6 @@ dependencies = [
"thiserror 2.0.17",
"tokio",
"tracing",
"tracing-appender",
"tracing-error",
"tracing-opentelemetry",
"tracing-subscriber",
@@ -7403,7 +7381,7 @@ version = "0.0.5"
dependencies = [
"base64-simd",
"blake3",
"brotli 8.0.2",
"brotli",
"bytes",
"convert_case",
"crc32fast",
@@ -7424,7 +7402,7 @@ dependencies = [
"rand 0.9.2",
"regex",
"rustfs-config",
"rustls 0.23.35",
"rustls 0.23.34",
"rustls-pemfile 2.2.0",
"rustls-pki-types",
"s3s",
@@ -7536,9 +7514,9 @@ dependencies = [
[[package]]
name = "rustls"
version = "0.23.35"
version = "0.23.34"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "533f54bc6a7d4f647e46ad909549eda97bf5afc1585190ef692b4286b198bd8f"
checksum = "6a9586e9ee2b4f8fab52a0048ca7334d7024eef48e2cb9407e3497bb7cab7fa7"
dependencies = [
"aws-lc-rs",
"log",
@@ -7740,9 +7718,9 @@ dependencies = [
[[package]]
name = "schemars"
version = "1.0.5"
version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1317c3bf3e7df961da95b0a56a172a02abead31276215a0497241a7624b487ce"
checksum = "82d20c4491bc164fa2f6c5d44565947a52ad80b9505d8e36f8d54c27c739fcd0"
dependencies = [
"chrono",
"dyn-clone",
@@ -7754,9 +7732,9 @@ dependencies = [
[[package]]
name = "schemars_derive"
version = "1.0.5"
version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5f760a6150d45dd66ec044983c124595ae76912e77ed0b44124cb3e415cce5d9"
checksum = "33d020396d1d138dc19f1165df7545479dcd58d93810dc5d646a16e55abefa80"
dependencies = [
"proc-macro2",
"quote",
@@ -7995,7 +7973,7 @@ dependencies = [
"indexmap 1.9.3",
"indexmap 2.12.0",
"schemars 0.9.0",
"schemars 1.0.5",
"schemars 1.0.4",
"serde_core",
"serde_json",
"serde_with_macros",
@@ -8913,7 +8891,7 @@ version = "0.26.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1729aa945f29d91ba541258c8df89027d5792d85a8841fb65e8bf0f4ede4ef61"
dependencies = [
"rustls 0.23.35",
"rustls 0.23.34",
"tokio",
]
@@ -9108,7 +9086,6 @@ dependencies = [
"tower-layer",
"tower-service",
"tracing",
"uuid",
]
[[package]]
@@ -9135,18 +9112,6 @@ dependencies = [
"tracing-core",
]
[[package]]
name = "tracing-appender"
version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3566e8ce28cc0a3fe42519fc80e6b4c943cc4c8cef275620eb8dac2d3d4e06cf"
dependencies = [
"crossbeam-channel",
"thiserror 1.0.69",
"time",
"tracing-subscriber",
]
[[package]]
name = "tracing-attributes"
version = "0.1.30"
@@ -9307,9 +9272,9 @@ checksum = "75b844d17643ee918803943289730bec8aac480150456169e647ed0b576ba539"
[[package]]
name = "unicode-ident"
version = "1.0.22"
version = "1.0.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9312f7c4f6ff9069b165498234ce8be658059c6728633667c526e27dc2cf1df5"
checksum = "462eeb75aeb73aea900253ce739c8e18a67423fadf006037cd3ff27e82748a06"
[[package]]
name = "unicode-segmentation"
@@ -9601,9 +9566,9 @@ dependencies = [
[[package]]
name = "webpki-roots"
version = "1.0.4"
version = "1.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b2878ef029c47c6e8cf779119f20fcf52bde7ad42a731b2a304bc221df17571e"
checksum = "32b130c0d2d49f8b6889abc456e795e82525204f27c42cf767cf0d7734e089b8"
dependencies = [
"rustls-pki-types",
]
@@ -10206,9 +10171,9 @@ checksum = "2f06ae92f42f5e5c42443fd094f245eb656abf56dd7cce9b8b263236565e00f2"
[[package]]
name = "zopfli"
version = "0.8.3"
version = "0.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f05cd8797d63865425ff89b5c4a48804f35ba0ce8d125800027ad6017d2b5249"
checksum = "edfc5ee405f504cd4984ecc6f14d02d55cfda60fa4b689434ef4102aae150cd7"
dependencies = [
"bumpalo",
"crc32fast",

View File

@@ -98,7 +98,7 @@ async-compression = { version = "0.4.19" }
async-recursion = "1.1.1"
async-trait = "0.1.89"
axum = "0.8.6"
axum-extra = "0.12.1"
axum-extra = "0.12.0"
axum-server = { version = "0.7.2", features = ["tls-rustls-no-provider"], default-features = false }
futures = "0.3.31"
futures-core = "0.3.31"
@@ -129,13 +129,13 @@ flatbuffers = "25.9.23"
form_urlencoded = "1.2.2"
prost = "0.14.1"
quick-xml = "0.38.3"
rmcp = { version = "0.8.4" }
rmcp = { version = "0.8.3" }
rmp = { version = "0.8.14" }
rmp-serde = { version = "1.3.0" }
serde = { version = "1.0.228", features = ["derive"] }
serde_json = { version = "1.0.145", features = ["raw_value"] }
serde_urlencoded = "0.7.1"
schemars = "1.0.5"
schemars = "1.0.4"
# Cryptography and Security
aes-gcm = { version = "0.10.3", features = ["std"] }
@@ -150,7 +150,7 @@ hmac = "0.12.1"
jsonwebtoken = { version = "10.1.0", features = ["rust_crypto"] }
pbkdf2 = "0.12.2"
rsa = { version = "0.9.8" }
rustls = { version = "0.23.35", features = ["ring", "logging", "std", "tls12"], default-features = false }
rustls = { version = "0.23.34", features = ["ring", "logging", "std", "tls12"], default-features = false }
rustls-pemfile = "2.2.0"
rustls-pki-types = "1.13.0"
sha1 = "0.10.6"
@@ -190,7 +190,6 @@ glob = "0.3.3"
google-cloud-storage = "1.2.0"
google-cloud-auth = "1.1.0"
hashbrown = { version = "0.16.0", features = ["serde", "rayon"] }
heed = { version = "0.22.0" }
hex-simd = "0.8.0"
highway = { version = "1.3.0" }
ipnetwork = { version = "0.21.1", features = ["serde"] }
@@ -226,7 +225,6 @@ rumqttc = { version = "0.25.0" }
rust-embed = { version = "8.9.0" }
rustc-hash = { version = "2.1.1" }
s3s = { version = "0.12.0-rc.3", features = ["minio"] }
scopeguard = "1.2.0"
serial_test = "3.2.0"
shadow-rs = { version = "1.4.0", default-features = false }
siphasher = "1.0.1"
@@ -243,7 +241,6 @@ tempfile = "3.23.0"
test-case = "3.3.1"
thiserror = "2.0.17"
tracing = { version = "0.1.41" }
tracing-appender = "0.2.3"
tracing-error = "0.2.1"
tracing-opentelemetry = "0.32.0"
tracing-subscriber = { version = "0.3.20", features = ["env-filter", "time"] }
@@ -280,7 +277,7 @@ mimalloc = "0.1"
[workspace.metadata.cargo-shear]
ignored = ["rustfs", "rustfs-mcp", "tokio-test", "scopeguard"]
ignored = ["rustfs", "rustfs-mcp", "tokio-test"]
[profile.release]
opt-level = 3

View File

@@ -40,4 +40,4 @@ serde_json = { workspace = true }
serial_test = { workspace = true }
tracing-subscriber = { workspace = true }
tempfile = { workspace = true }
heed = { workspace = true }
heed = "0.22.0"

View File

@@ -14,10 +14,6 @@
use thiserror::Error;
/// Custom error type for AHM operations
/// This enum defines various error variants that can occur during
/// the execution of AHM-related tasks, such as I/O errors, storage errors,
/// configuration errors, and specific errors related to healing operations.
#[derive(Debug, Error)]
pub enum Error {
#[error("I/O error: {0}")]
@@ -89,13 +85,9 @@ pub enum Error {
ProgressTrackingFailed { message: String },
}
/// A specialized Result type for AHM operations
///This type is a convenient alias for results returned by functions in the AHM crate,
/// using the custom Error type defined above.
pub type Result<T, E = Error> = std::result::Result<T, E>;
impl Error {
/// Create an Other error from any error type
pub fn other<E>(error: E) -> Self
where
E: Into<Box<dyn std::error::Error + Send + Sync>>,

View File

@@ -12,19 +12,18 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::error::Result;
use crate::heal::{
manager::HealManager,
task::{HealOptions, HealPriority, HealRequest, HealType},
utils,
};
use crate::{Error, Result};
use rustfs_common::heal_channel::{
HealChannelCommand, HealChannelPriority, HealChannelReceiver, HealChannelRequest, HealChannelResponse, HealScanMode,
publish_heal_response,
};
use std::sync::Arc;
use tokio::sync::mpsc;
use tracing::{debug, error, info};
use tracing::{error, info};
/// Heal channel processor
pub struct HealChannelProcessor {
@@ -61,7 +60,7 @@ impl HealChannelProcessor {
}
}
None => {
debug!("Heal channel receiver closed, stopping processor");
info!("Heal channel receiver closed, stopping processor");
break;
}
}
@@ -100,6 +99,7 @@ impl HealChannelProcessor {
Ok(task_id) => {
info!("Successfully submitted heal request: {} as task: {}", request.id, task_id);
// Send success response
let response = HealChannelResponse {
request_id: request.id,
success: true,
@@ -107,7 +107,9 @@ impl HealChannelProcessor {
error: None,
};
self.publish_response(response);
if let Err(e) = self.response_sender.send(response) {
error!("Failed to send heal response: {}", e);
}
}
Err(e) => {
error!("Failed to submit heal request: {} - {}", request.id, e);
@@ -120,7 +122,9 @@ impl HealChannelProcessor {
error: Some(e.to_string()),
};
self.publish_response(response);
if let Err(e) = self.response_sender.send(response) {
error!("Failed to send heal error response: {}", e);
}
}
}
@@ -140,7 +144,9 @@ impl HealChannelProcessor {
error: None,
};
self.publish_response(response);
if let Err(e) = self.response_sender.send(response) {
error!("Failed to send query response: {}", e);
}
Ok(())
}
@@ -158,7 +164,9 @@ impl HealChannelProcessor {
error: None,
};
self.publish_response(response);
if let Err(e) = self.response_sender.send(response) {
error!("Failed to send cancel response: {}", e);
}
Ok(())
}
@@ -166,12 +174,9 @@ impl HealChannelProcessor {
/// Convert channel request to heal request
fn convert_to_heal_request(&self, request: HealChannelRequest) -> Result<HealRequest> {
let heal_type = if let Some(disk_id) = &request.disk {
let set_disk_id = utils::normalize_set_disk_id(disk_id).ok_or_else(|| Error::InvalidHealType {
heal_type: format!("erasure-set({disk_id})"),
})?;
HealType::ErasureSet {
buckets: vec![],
set_disk_id,
set_disk_id: disk_id.clone(),
}
} else if let Some(prefix) = &request.object_prefix {
if !prefix.is_empty() {
@@ -221,332 +226,8 @@ impl HealChannelProcessor {
Ok(HealRequest::new(heal_type, options, priority))
}
fn publish_response(&self, response: HealChannelResponse) {
// Try to send to local channel first, but don't block broadcast on failure
if let Err(e) = self.response_sender.send(response.clone()) {
error!("Failed to enqueue heal response locally: {}", e);
}
// Always attempt to broadcast, even if local send failed
// Use the original response for broadcast; local send uses a clone
if let Err(e) = publish_heal_response(response) {
error!("Failed to broadcast heal response: {}", e);
}
}
/// Get response sender for external use
pub fn get_response_sender(&self) -> mpsc::UnboundedSender<HealChannelResponse> {
self.response_sender.clone()
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::heal::storage::HealStorageAPI;
use rustfs_common::heal_channel::{HealChannelPriority, HealChannelRequest, HealScanMode};
use std::sync::Arc;
// Mock storage for testing
struct MockStorage;
#[async_trait::async_trait]
impl HealStorageAPI for MockStorage {
async fn get_object_meta(
&self,
_bucket: &str,
_object: &str,
) -> crate::Result<Option<rustfs_ecstore::store_api::ObjectInfo>> {
Ok(None)
}
async fn get_object_data(&self, _bucket: &str, _object: &str) -> crate::Result<Option<Vec<u8>>> {
Ok(None)
}
async fn put_object_data(&self, _bucket: &str, _object: &str, _data: &[u8]) -> crate::Result<()> {
Ok(())
}
async fn delete_object(&self, _bucket: &str, _object: &str) -> crate::Result<()> {
Ok(())
}
async fn verify_object_integrity(&self, _bucket: &str, _object: &str) -> crate::Result<bool> {
Ok(true)
}
async fn ec_decode_rebuild(&self, _bucket: &str, _object: &str) -> crate::Result<Vec<u8>> {
Ok(vec![])
}
async fn get_disk_status(
&self,
_endpoint: &rustfs_ecstore::disk::endpoint::Endpoint,
) -> crate::Result<crate::heal::storage::DiskStatus> {
Ok(crate::heal::storage::DiskStatus::Ok)
}
async fn format_disk(&self, _endpoint: &rustfs_ecstore::disk::endpoint::Endpoint) -> crate::Result<()> {
Ok(())
}
async fn get_bucket_info(&self, _bucket: &str) -> crate::Result<Option<rustfs_ecstore::store_api::BucketInfo>> {
Ok(None)
}
async fn heal_bucket_metadata(&self, _bucket: &str) -> crate::Result<()> {
Ok(())
}
async fn list_buckets(&self) -> crate::Result<Vec<rustfs_ecstore::store_api::BucketInfo>> {
Ok(vec![])
}
async fn object_exists(&self, _bucket: &str, _object: &str) -> crate::Result<bool> {
Ok(false)
}
async fn get_object_size(&self, _bucket: &str, _object: &str) -> crate::Result<Option<u64>> {
Ok(None)
}
async fn get_object_checksum(&self, _bucket: &str, _object: &str) -> crate::Result<Option<String>> {
Ok(None)
}
async fn heal_object(
&self,
_bucket: &str,
_object: &str,
_version_id: Option<&str>,
_opts: &rustfs_common::heal_channel::HealOpts,
) -> crate::Result<(rustfs_madmin::heal_commands::HealResultItem, Option<crate::Error>)> {
Ok((rustfs_madmin::heal_commands::HealResultItem::default(), None))
}
async fn heal_bucket(
&self,
_bucket: &str,
_opts: &rustfs_common::heal_channel::HealOpts,
) -> crate::Result<rustfs_madmin::heal_commands::HealResultItem> {
Ok(rustfs_madmin::heal_commands::HealResultItem::default())
}
async fn heal_format(
&self,
_dry_run: bool,
) -> crate::Result<(rustfs_madmin::heal_commands::HealResultItem, Option<crate::Error>)> {
Ok((rustfs_madmin::heal_commands::HealResultItem::default(), None))
}
async fn list_objects_for_heal(&self, _bucket: &str, _prefix: &str) -> crate::Result<Vec<String>> {
Ok(vec![])
}
async fn get_disk_for_resume(&self, _set_disk_id: &str) -> crate::Result<rustfs_ecstore::disk::DiskStore> {
Err(crate::Error::other("Not implemented in mock"))
}
}
fn create_test_heal_manager() -> Arc<HealManager> {
let storage: Arc<dyn HealStorageAPI> = Arc::new(MockStorage);
Arc::new(HealManager::new(storage, None))
}
#[test]
fn test_heal_channel_processor_new() {
let heal_manager = create_test_heal_manager();
let processor = HealChannelProcessor::new(heal_manager);
// Verify processor is created successfully
let _sender = processor.get_response_sender();
// If we can get the sender, processor was created correctly
}
#[tokio::test]
async fn test_convert_to_heal_request_bucket() {
let heal_manager = create_test_heal_manager();
let processor = HealChannelProcessor::new(heal_manager);
let channel_request = HealChannelRequest {
id: "test-id".to_string(),
bucket: "test-bucket".to_string(),
object_prefix: None,
disk: None,
priority: HealChannelPriority::Normal,
scan_mode: None,
remove_corrupted: None,
recreate_missing: None,
update_parity: None,
recursive: None,
dry_run: None,
timeout_seconds: None,
pool_index: None,
set_index: None,
force_start: false,
};
let heal_request = processor.convert_to_heal_request(channel_request).unwrap();
assert!(matches!(heal_request.heal_type, HealType::Bucket { .. }));
assert_eq!(heal_request.priority, HealPriority::Normal);
}
#[tokio::test]
async fn test_convert_to_heal_request_object() {
let heal_manager = create_test_heal_manager();
let processor = HealChannelProcessor::new(heal_manager);
let channel_request = HealChannelRequest {
id: "test-id".to_string(),
bucket: "test-bucket".to_string(),
object_prefix: Some("test-object".to_string()),
disk: None,
priority: HealChannelPriority::High,
scan_mode: Some(HealScanMode::Deep),
remove_corrupted: Some(true),
recreate_missing: Some(true),
update_parity: Some(true),
recursive: Some(false),
dry_run: Some(false),
timeout_seconds: Some(300),
pool_index: Some(0),
set_index: Some(1),
force_start: false,
};
let heal_request = processor.convert_to_heal_request(channel_request).unwrap();
assert!(matches!(heal_request.heal_type, HealType::Object { .. }));
assert_eq!(heal_request.priority, HealPriority::High);
assert_eq!(heal_request.options.scan_mode, HealScanMode::Deep);
assert!(heal_request.options.remove_corrupted);
assert!(heal_request.options.recreate_missing);
}
#[tokio::test]
async fn test_convert_to_heal_request_erasure_set() {
let heal_manager = create_test_heal_manager();
let processor = HealChannelProcessor::new(heal_manager);
let channel_request = HealChannelRequest {
id: "test-id".to_string(),
bucket: "test-bucket".to_string(),
object_prefix: None,
disk: Some("pool_0_set_1".to_string()),
priority: HealChannelPriority::Critical,
scan_mode: None,
remove_corrupted: None,
recreate_missing: None,
update_parity: None,
recursive: None,
dry_run: None,
timeout_seconds: None,
pool_index: None,
set_index: None,
force_start: false,
};
let heal_request = processor.convert_to_heal_request(channel_request).unwrap();
assert!(matches!(heal_request.heal_type, HealType::ErasureSet { .. }));
assert_eq!(heal_request.priority, HealPriority::Urgent);
}
#[tokio::test]
async fn test_convert_to_heal_request_invalid_disk_id() {
let heal_manager = create_test_heal_manager();
let processor = HealChannelProcessor::new(heal_manager);
let channel_request = HealChannelRequest {
id: "test-id".to_string(),
bucket: "test-bucket".to_string(),
object_prefix: None,
disk: Some("invalid-disk-id".to_string()),
priority: HealChannelPriority::Normal,
scan_mode: None,
remove_corrupted: None,
recreate_missing: None,
update_parity: None,
recursive: None,
dry_run: None,
timeout_seconds: None,
pool_index: None,
set_index: None,
force_start: false,
};
let result = processor.convert_to_heal_request(channel_request);
assert!(result.is_err());
}
#[tokio::test]
async fn test_convert_to_heal_request_priority_mapping() {
let heal_manager = create_test_heal_manager();
let processor = HealChannelProcessor::new(heal_manager);
let priorities = vec![
(HealChannelPriority::Low, HealPriority::Low),
(HealChannelPriority::Normal, HealPriority::Normal),
(HealChannelPriority::High, HealPriority::High),
(HealChannelPriority::Critical, HealPriority::Urgent),
];
for (channel_priority, expected_heal_priority) in priorities {
let channel_request = HealChannelRequest {
id: "test-id".to_string(),
bucket: "test-bucket".to_string(),
object_prefix: None,
disk: None,
priority: channel_priority,
scan_mode: None,
remove_corrupted: None,
recreate_missing: None,
update_parity: None,
recursive: None,
dry_run: None,
timeout_seconds: None,
pool_index: None,
set_index: None,
force_start: false,
};
let heal_request = processor.convert_to_heal_request(channel_request).unwrap();
assert_eq!(heal_request.priority, expected_heal_priority);
}
}
#[tokio::test]
async fn test_convert_to_heal_request_force_start() {
let heal_manager = create_test_heal_manager();
let processor = HealChannelProcessor::new(heal_manager);
let channel_request = HealChannelRequest {
id: "test-id".to_string(),
bucket: "test-bucket".to_string(),
object_prefix: None,
disk: None,
priority: HealChannelPriority::Normal,
scan_mode: None,
remove_corrupted: Some(false),
recreate_missing: Some(false),
update_parity: Some(false),
recursive: None,
dry_run: None,
timeout_seconds: None,
pool_index: None,
set_index: None,
force_start: true, // Should override the above false values
};
let heal_request = processor.convert_to_heal_request(channel_request).unwrap();
assert!(heal_request.options.remove_corrupted);
assert!(heal_request.options.recreate_missing);
assert!(heal_request.options.update_parity);
}
#[tokio::test]
async fn test_convert_to_heal_request_empty_object_prefix() {
let heal_manager = create_test_heal_manager();
let processor = HealChannelProcessor::new(heal_manager);
let channel_request = HealChannelRequest {
id: "test-id".to_string(),
bucket: "test-bucket".to_string(),
object_prefix: Some("".to_string()), // Empty prefix should be treated as bucket heal
disk: None,
priority: HealChannelPriority::Normal,
scan_mode: None,
remove_corrupted: None,
recreate_missing: None,
update_parity: None,
recursive: None,
dry_run: None,
timeout_seconds: None,
pool_index: None,
set_index: None,
force_start: false,
};
let heal_request = processor.convert_to_heal_request(channel_request).unwrap();
assert!(matches!(heal_request.heal_type, HealType::Bucket { .. }));
}
}

View File

@@ -12,12 +12,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::error::{Error, Result};
use crate::heal::{
progress::HealProgress,
resume::{CheckpointManager, ResumeManager, ResumeUtils},
storage::HealStorageAPI,
};
use crate::{Error, Result};
use futures::future::join_all;
use rustfs_common::heal_channel::{HealOpts, HealScanMode};
use rustfs_ecstore::disk::DiskStore;
@@ -56,7 +56,7 @@ impl ErasureSetHealer {
let task_id = self.get_or_create_task_id(set_disk_id).await?;
// 2. initialize or resume resume state
let (resume_manager, checkpoint_manager) = self.initialize_resume_state(&task_id, set_disk_id, buckets).await?;
let (resume_manager, checkpoint_manager) = self.initialize_resume_state(&task_id, buckets).await?;
// 3. execute heal with resume
let result = self
@@ -77,38 +77,25 @@ impl ErasureSetHealer {
}
/// get or create task id
async fn get_or_create_task_id(&self, set_disk_id: &str) -> Result<String> {
async fn get_or_create_task_id(&self, _set_disk_id: &str) -> Result<String> {
// check if there are resumable tasks
let resumable_tasks = ResumeUtils::get_resumable_tasks(&self.disk).await?;
for task_id in resumable_tasks {
match ResumeManager::load_from_disk(self.disk.clone(), &task_id).await {
Ok(manager) => {
let state = manager.get_state().await;
if state.set_disk_id == set_disk_id && ResumeUtils::can_resume_task(&self.disk, &task_id).await {
info!("Found resumable task: {} for set {}", task_id, set_disk_id);
return Ok(task_id);
}
}
Err(e) => {
warn!("Failed to load resume state for task {}: {}", task_id, e);
}
if ResumeUtils::can_resume_task(&self.disk, &task_id).await {
info!("Found resumable task: {}", task_id);
return Ok(task_id);
}
}
// create new task id
let task_id = format!("{}_{}", set_disk_id, ResumeUtils::generate_task_id());
let task_id = ResumeUtils::generate_task_id();
info!("Created new heal task: {}", task_id);
Ok(task_id)
}
/// initialize or resume resume state
async fn initialize_resume_state(
&self,
task_id: &str,
set_disk_id: &str,
buckets: &[String],
) -> Result<(ResumeManager, CheckpointManager)> {
async fn initialize_resume_state(&self, task_id: &str, buckets: &[String]) -> Result<(ResumeManager, CheckpointManager)> {
// check if resume state exists
if ResumeManager::has_resume_state(&self.disk, task_id).await {
info!("Loading existing resume state for task: {}", task_id);
@@ -124,14 +111,8 @@ impl ErasureSetHealer {
} else {
info!("Creating new resume state for task: {}", task_id);
let resume_manager = ResumeManager::new(
self.disk.clone(),
task_id.to_string(),
"erasure_set".to_string(),
set_disk_id.to_string(),
buckets.to_vec(),
)
.await?;
let resume_manager =
ResumeManager::new(self.disk.clone(), task_id.to_string(), "erasure_set".to_string(), buckets.to_vec()).await?;
let checkpoint_manager = CheckpointManager::new(self.disk.clone(), task_id.to_string()).await?;
@@ -181,7 +162,6 @@ impl ErasureSetHealer {
let bucket_result = self
.heal_bucket_with_resume(
bucket,
bucket_idx,
&mut current_object_index,
&mut processed_objects,
&mut successful_objects,
@@ -202,7 +182,7 @@ impl ErasureSetHealer {
// check cancel status
if self.cancel_token.is_cancelled() {
warn!("Heal task cancelled");
info!("Heal task cancelled");
return Err(Error::TaskCancelled);
}
@@ -234,7 +214,6 @@ impl ErasureSetHealer {
async fn heal_bucket_with_resume(
&self,
bucket: &str,
bucket_index: usize,
current_object_index: &mut usize,
processed_objects: &mut u64,
successful_objects: &mut u64,
@@ -243,7 +222,7 @@ impl ErasureSetHealer {
resume_manager: &ResumeManager,
checkpoint_manager: &CheckpointManager,
) -> Result<()> {
info!(target: "rustfs:ahm:heal_bucket_with_resume" ,"Starting heal for bucket: {} from object index {}", bucket, current_object_index);
info!("Starting heal for bucket: {} from object index {}", bucket, current_object_index);
// 1. get bucket info
let _bucket_info = match self.storage.get_bucket_info(bucket).await? {
@@ -281,7 +260,7 @@ impl ErasureSetHealer {
if !object_exists {
info!(
target: "rustfs:ahm:heal_bucket_with_resume" ,"Object {}/{} no longer exists, skipping heal (likely deleted intentionally)",
"Object {}/{} no longer exists, skipping heal (likely deleted intentionally)",
bucket, object
);
checkpoint_manager.add_processed_object(object.clone()).await?;
@@ -327,9 +306,7 @@ impl ErasureSetHealer {
// save checkpoint periodically
if obj_idx % 100 == 0 {
checkpoint_manager
.update_position(bucket_index, *current_object_index)
.await?;
checkpoint_manager.update_position(0, *current_object_index).await?;
}
}
@@ -360,10 +337,7 @@ impl ErasureSetHealer {
let cancel_token = self.cancel_token.clone();
async move {
let _permit = semaphore
.acquire()
.await
.map_err(|e| Error::other(format!("Failed to acquire semaphore for bucket heal: {}", e)))?;
let _permit = semaphore.acquire().await.unwrap();
if cancel_token.is_cancelled() {
return Err(Error::TaskCancelled);
@@ -458,10 +432,7 @@ impl ErasureSetHealer {
let semaphore = semaphore.clone();
async move {
let _permit = semaphore
.acquire()
.await
.map_err(|e| Error::other(format!("Failed to acquire semaphore for object heal: {}", e)))?;
let _permit = semaphore.acquire().await.unwrap();
match storage.heal_object(&bucket, &object, None, &heal_opts).await {
Ok((_result, None)) => {

View File

@@ -12,8 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::heal::{HealOptions, HealPriority, HealRequest, HealType};
use crate::{Error, Result};
use crate::heal::task::{HealOptions, HealPriority, HealRequest, HealType};
use rustfs_ecstore::disk::endpoint::Endpoint;
use serde::{Deserialize, Serialize};
use std::time::SystemTime;
@@ -105,7 +104,7 @@ pub enum HealEvent {
impl HealEvent {
/// Convert HealEvent to HealRequest
pub fn to_heal_request(&self) -> Result<HealRequest> {
pub fn to_heal_request(&self) -> HealRequest {
match self {
HealEvent::ObjectCorruption {
bucket,
@@ -113,7 +112,7 @@ impl HealEvent {
version_id,
severity,
..
} => Ok(HealRequest::new(
} => HealRequest::new(
HealType::Object {
bucket: bucket.clone(),
object: object.clone(),
@@ -121,13 +120,13 @@ impl HealEvent {
},
HealOptions::default(),
Self::severity_to_priority(severity),
)),
),
HealEvent::ObjectMissing {
bucket,
object,
version_id,
..
} => Ok(HealRequest::new(
} => HealRequest::new(
HealType::Object {
bucket: bucket.clone(),
object: object.clone(),
@@ -135,38 +134,34 @@ impl HealEvent {
},
HealOptions::default(),
HealPriority::High,
)),
HealEvent::MetadataCorruption { bucket, object, .. } => Ok(HealRequest::new(
),
HealEvent::MetadataCorruption { bucket, object, .. } => HealRequest::new(
HealType::Metadata {
bucket: bucket.clone(),
object: object.clone(),
},
HealOptions::default(),
HealPriority::High,
)),
),
HealEvent::DiskStatusChange { endpoint, .. } => {
// Convert disk status change to erasure set heal
// Note: This requires access to storage to get bucket list, which is not available here
// The actual bucket list will need to be provided by the caller or retrieved differently
let set_disk_id = crate::heal::utils::format_set_disk_id_from_i32(endpoint.pool_idx, endpoint.set_idx)
.ok_or_else(|| Error::InvalidHealType {
heal_type: format!("erasure-set(pool={}, set={})", endpoint.pool_idx, endpoint.set_idx),
})?;
Ok(HealRequest::new(
HealRequest::new(
HealType::ErasureSet {
buckets: vec![], // Empty bucket list - caller should populate this
set_disk_id,
set_disk_id: format!("{}_{}", endpoint.pool_idx, endpoint.set_idx),
},
HealOptions::default(),
HealPriority::High,
))
)
}
HealEvent::ECDecodeFailure {
bucket,
object,
version_id,
..
} => Ok(HealRequest::new(
} => HealRequest::new(
HealType::ECDecode {
bucket: bucket.clone(),
object: object.clone(),
@@ -174,13 +169,13 @@ impl HealEvent {
},
HealOptions::default(),
HealPriority::Urgent,
)),
),
HealEvent::ChecksumMismatch {
bucket,
object,
version_id,
..
} => Ok(HealRequest::new(
} => HealRequest::new(
HealType::Object {
bucket: bucket.clone(),
object: object.clone(),
@@ -188,19 +183,17 @@ impl HealEvent {
},
HealOptions::default(),
HealPriority::High,
)),
HealEvent::BucketMetadataCorruption { bucket, .. } => Ok(HealRequest::new(
HealType::Bucket { bucket: bucket.clone() },
HealOptions::default(),
HealPriority::High,
)),
HealEvent::MRFMetadataCorruption { meta_path, .. } => Ok(HealRequest::new(
),
HealEvent::BucketMetadataCorruption { bucket, .. } => {
HealRequest::new(HealType::Bucket { bucket: bucket.clone() }, HealOptions::default(), HealPriority::High)
}
HealEvent::MRFMetadataCorruption { meta_path, .. } => HealRequest::new(
HealType::MRF {
meta_path: meta_path.clone(),
},
HealOptions::default(),
HealPriority::High,
)),
),
}
}
@@ -364,319 +357,3 @@ impl Default for HealEventHandler {
Self::new(1000)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::heal::task::{HealPriority, HealType};
#[test]
fn test_heal_event_object_corruption_to_request() {
let event = HealEvent::ObjectCorruption {
bucket: "test-bucket".to_string(),
object: "test-object".to_string(),
version_id: None,
corruption_type: CorruptionType::DataCorruption,
severity: Severity::High,
};
let request = event.to_heal_request().unwrap();
assert!(matches!(request.heal_type, HealType::Object { .. }));
assert_eq!(request.priority, HealPriority::High);
}
#[test]
fn test_heal_event_object_missing_to_request() {
let event = HealEvent::ObjectMissing {
bucket: "test-bucket".to_string(),
object: "test-object".to_string(),
version_id: Some("v1".to_string()),
expected_locations: vec![0, 1],
available_locations: vec![2, 3],
};
let request = event.to_heal_request().unwrap();
assert!(matches!(request.heal_type, HealType::Object { .. }));
assert_eq!(request.priority, HealPriority::High);
}
#[test]
fn test_heal_event_metadata_corruption_to_request() {
let event = HealEvent::MetadataCorruption {
bucket: "test-bucket".to_string(),
object: "test-object".to_string(),
corruption_type: CorruptionType::MetadataCorruption,
};
let request = event.to_heal_request().unwrap();
assert!(matches!(request.heal_type, HealType::Metadata { .. }));
assert_eq!(request.priority, HealPriority::High);
}
#[test]
fn test_heal_event_ec_decode_failure_to_request() {
let event = HealEvent::ECDecodeFailure {
bucket: "test-bucket".to_string(),
object: "test-object".to_string(),
version_id: None,
missing_shards: vec![0, 1],
available_shards: vec![2, 3, 4],
};
let request = event.to_heal_request().unwrap();
assert!(matches!(request.heal_type, HealType::ECDecode { .. }));
assert_eq!(request.priority, HealPriority::Urgent);
}
#[test]
fn test_heal_event_checksum_mismatch_to_request() {
let event = HealEvent::ChecksumMismatch {
bucket: "test-bucket".to_string(),
object: "test-object".to_string(),
version_id: None,
expected_checksum: "abc123".to_string(),
actual_checksum: "def456".to_string(),
};
let request = event.to_heal_request().unwrap();
assert!(matches!(request.heal_type, HealType::Object { .. }));
assert_eq!(request.priority, HealPriority::High);
}
#[test]
fn test_heal_event_bucket_metadata_corruption_to_request() {
let event = HealEvent::BucketMetadataCorruption {
bucket: "test-bucket".to_string(),
corruption_type: CorruptionType::MetadataCorruption,
};
let request = event.to_heal_request().unwrap();
assert!(matches!(request.heal_type, HealType::Bucket { .. }));
assert_eq!(request.priority, HealPriority::High);
}
#[test]
fn test_heal_event_mrf_metadata_corruption_to_request() {
let event = HealEvent::MRFMetadataCorruption {
meta_path: "test-bucket/test-object".to_string(),
corruption_type: CorruptionType::MetadataCorruption,
};
let request = event.to_heal_request().unwrap();
assert!(matches!(request.heal_type, HealType::MRF { .. }));
assert_eq!(request.priority, HealPriority::High);
}
#[test]
fn test_heal_event_severity_to_priority() {
let event_low = HealEvent::ObjectCorruption {
bucket: "test".to_string(),
object: "test".to_string(),
version_id: None,
corruption_type: CorruptionType::DataCorruption,
severity: Severity::Low,
};
let request = event_low.to_heal_request().unwrap();
assert_eq!(request.priority, HealPriority::Low);
let event_medium = HealEvent::ObjectCorruption {
bucket: "test".to_string(),
object: "test".to_string(),
version_id: None,
corruption_type: CorruptionType::DataCorruption,
severity: Severity::Medium,
};
let request = event_medium.to_heal_request().unwrap();
assert_eq!(request.priority, HealPriority::Normal);
let event_high = HealEvent::ObjectCorruption {
bucket: "test".to_string(),
object: "test".to_string(),
version_id: None,
corruption_type: CorruptionType::DataCorruption,
severity: Severity::High,
};
let request = event_high.to_heal_request().unwrap();
assert_eq!(request.priority, HealPriority::High);
let event_critical = HealEvent::ObjectCorruption {
bucket: "test".to_string(),
object: "test".to_string(),
version_id: None,
corruption_type: CorruptionType::DataCorruption,
severity: Severity::Critical,
};
let request = event_critical.to_heal_request().unwrap();
assert_eq!(request.priority, HealPriority::Urgent);
}
#[test]
fn test_heal_event_description() {
let event = HealEvent::ObjectCorruption {
bucket: "test-bucket".to_string(),
object: "test-object".to_string(),
version_id: None,
corruption_type: CorruptionType::DataCorruption,
severity: Severity::High,
};
let desc = event.description();
assert!(desc.contains("Object corruption detected"));
assert!(desc.contains("test-bucket/test-object"));
assert!(desc.contains("DataCorruption"));
}
#[test]
fn test_heal_event_severity() {
let event = HealEvent::ECDecodeFailure {
bucket: "test".to_string(),
object: "test".to_string(),
version_id: None,
missing_shards: vec![],
available_shards: vec![],
};
assert_eq!(event.severity(), Severity::Critical);
let event = HealEvent::ObjectMissing {
bucket: "test".to_string(),
object: "test".to_string(),
version_id: None,
expected_locations: vec![],
available_locations: vec![],
};
assert_eq!(event.severity(), Severity::High);
}
#[test]
fn test_heal_event_handler_new() {
let handler = HealEventHandler::new(10);
assert_eq!(handler.event_count(), 0);
assert_eq!(handler.max_events, 10);
}
#[test]
fn test_heal_event_handler_default() {
let handler = HealEventHandler::default();
assert_eq!(handler.max_events, 1000);
}
#[test]
fn test_heal_event_handler_add_event() {
let mut handler = HealEventHandler::new(3);
let event = HealEvent::ObjectCorruption {
bucket: "test".to_string(),
object: "test".to_string(),
version_id: None,
corruption_type: CorruptionType::DataCorruption,
severity: Severity::High,
};
handler.add_event(event.clone());
assert_eq!(handler.event_count(), 1);
handler.add_event(event.clone());
handler.add_event(event.clone());
assert_eq!(handler.event_count(), 3);
}
#[test]
fn test_heal_event_handler_max_events() {
let mut handler = HealEventHandler::new(2);
let event = HealEvent::ObjectCorruption {
bucket: "test".to_string(),
object: "test".to_string(),
version_id: None,
corruption_type: CorruptionType::DataCorruption,
severity: Severity::High,
};
handler.add_event(event.clone());
handler.add_event(event.clone());
handler.add_event(event.clone()); // Should remove oldest
assert_eq!(handler.event_count(), 2);
}
#[test]
fn test_heal_event_handler_get_events() {
let mut handler = HealEventHandler::new(10);
let event = HealEvent::ObjectCorruption {
bucket: "test".to_string(),
object: "test".to_string(),
version_id: None,
corruption_type: CorruptionType::DataCorruption,
severity: Severity::High,
};
handler.add_event(event.clone());
handler.add_event(event.clone());
let events = handler.get_events();
assert_eq!(events.len(), 2);
}
#[test]
fn test_heal_event_handler_clear_events() {
let mut handler = HealEventHandler::new(10);
let event = HealEvent::ObjectCorruption {
bucket: "test".to_string(),
object: "test".to_string(),
version_id: None,
corruption_type: CorruptionType::DataCorruption,
severity: Severity::High,
};
handler.add_event(event);
assert_eq!(handler.event_count(), 1);
handler.clear_events();
assert_eq!(handler.event_count(), 0);
}
#[test]
fn test_heal_event_handler_filter_by_severity() {
let mut handler = HealEventHandler::new(10);
handler.add_event(HealEvent::ObjectCorruption {
bucket: "test".to_string(),
object: "test".to_string(),
version_id: None,
corruption_type: CorruptionType::DataCorruption,
severity: Severity::Low,
});
handler.add_event(HealEvent::ECDecodeFailure {
bucket: "test".to_string(),
object: "test".to_string(),
version_id: None,
missing_shards: vec![],
available_shards: vec![],
});
let high_severity = handler.filter_by_severity(Severity::High);
assert_eq!(high_severity.len(), 1); // Only ECDecodeFailure is Critical >= High
}
#[test]
fn test_heal_event_handler_filter_by_type() {
let mut handler = HealEventHandler::new(10);
handler.add_event(HealEvent::ObjectCorruption {
bucket: "test".to_string(),
object: "test".to_string(),
version_id: None,
corruption_type: CorruptionType::DataCorruption,
severity: Severity::High,
});
handler.add_event(HealEvent::ObjectMissing {
bucket: "test".to_string(),
object: "test".to_string(),
version_id: None,
expected_locations: vec![],
available_locations: vec![],
});
let corruption_events = handler.filter_by_type("ObjectCorruption");
assert_eq!(corruption_events.len(), 1);
let missing_events = handler.filter_by_type("ObjectMissing");
assert_eq!(missing_events.len(), 1);
}
}

View File

@@ -12,12 +12,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::error::{Error, Result};
use crate::heal::{
progress::{HealProgress, HealStatistics},
storage::HealStorageAPI,
task::{HealOptions, HealPriority, HealRequest, HealTask, HealTaskStatus, HealType},
};
use crate::{Error, Result};
use rustfs_ecstore::disk::DiskAPI;
use rustfs_ecstore::disk::error::DiskError;
use rustfs_ecstore::global::GLOBAL_LOCAL_DISK_MAP;
@@ -310,36 +310,17 @@ impl HealManager {
// Create erasure set heal requests for each endpoint
for ep in endpoints {
let Some(set_disk_id) =
crate::heal::utils::format_set_disk_id_from_i32(ep.pool_idx, ep.set_idx)
else {
warn!("Skipping endpoint {} without valid pool/set index", ep);
continue;
};
// skip if already queued or healing
// Use consistent lock order: queue first, then active_heals to avoid deadlock
let mut skip = false;
{
let queue = heal_queue.lock().await;
if queue.iter().any(|req| {
matches!(
&req.heal_type,
crate::heal::task::HealType::ErasureSet { set_disk_id: queued_id, .. }
if queued_id == &set_disk_id
)
}) {
if queue.iter().any(|req| matches!(&req.heal_type, crate::heal::task::HealType::ErasureSet { set_disk_id, .. } if set_disk_id == &format!("{}_{}", ep.pool_idx, ep.set_idx))) {
skip = true;
}
}
if !skip {
let active = active_heals.lock().await;
if active.values().any(|task| {
matches!(
&task.heal_type,
crate::heal::task::HealType::ErasureSet { set_disk_id: active_id, .. }
if active_id == &set_disk_id
)
}) {
if active.values().any(|task| matches!(&task.heal_type, crate::heal::task::HealType::ErasureSet { set_disk_id, .. } if set_disk_id == &format!("{}_{}", ep.pool_idx, ep.set_idx))) {
skip = true;
}
}
@@ -349,10 +330,11 @@ impl HealManager {
}
// enqueue erasure set heal request for this disk
let set_disk_id = format!("pool_{}_set_{}", ep.pool_idx, ep.set_idx);
let req = HealRequest::new(
HealType::ErasureSet {
buckets: buckets.clone(),
set_disk_id: set_disk_id.clone(),
set_disk_id: set_disk_id.clone()
},
HealOptions::default(),
HealPriority::Normal,

View File

@@ -20,7 +20,6 @@ pub mod progress;
pub mod resume;
pub mod storage;
pub mod task;
pub mod utils;
pub use erasure_healer::ErasureSetHealer;
pub use manager::HealManager;

View File

@@ -146,244 +146,3 @@ impl HealStatistics {
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_heal_progress_new() {
let progress = HealProgress::new();
assert_eq!(progress.objects_scanned, 0);
assert_eq!(progress.objects_healed, 0);
assert_eq!(progress.objects_failed, 0);
assert_eq!(progress.bytes_processed, 0);
assert_eq!(progress.progress_percentage, 0.0);
assert!(progress.start_time.is_some());
assert!(progress.last_update_time.is_some());
assert!(progress.current_object.is_none());
}
#[test]
fn test_heal_progress_update_progress() {
let mut progress = HealProgress::new();
progress.update_progress(10, 8, 2, 1024);
assert_eq!(progress.objects_scanned, 10);
assert_eq!(progress.objects_healed, 8);
assert_eq!(progress.objects_failed, 2);
assert_eq!(progress.bytes_processed, 1024);
// Progress percentage should be calculated based on healed/total
// total = scanned + healed + failed = 10 + 8 + 2 = 20
// healed/total = 8/20 = 0.4 = 40%
assert!((progress.progress_percentage - 40.0).abs() < 0.001);
assert!(progress.last_update_time.is_some());
}
#[test]
fn test_heal_progress_update_progress_zero_total() {
let mut progress = HealProgress::new();
progress.update_progress(0, 0, 0, 0);
assert_eq!(progress.progress_percentage, 0.0);
}
#[test]
fn test_heal_progress_update_progress_all_healed() {
let mut progress = HealProgress::new();
// When scanned=0, healed=10, failed=0: total=10, progress = 10/10 = 100%
progress.update_progress(0, 10, 0, 2048);
// All healed, should be 100%
assert!((progress.progress_percentage - 100.0).abs() < 0.001);
}
#[test]
fn test_heal_progress_set_current_object() {
let mut progress = HealProgress::new();
let initial_time = progress.last_update_time;
// Small delay to ensure time difference
std::thread::sleep(std::time::Duration::from_millis(10));
progress.set_current_object(Some("test-bucket/test-object".to_string()));
assert_eq!(progress.current_object, Some("test-bucket/test-object".to_string()));
assert!(progress.last_update_time.is_some());
// last_update_time should be updated
assert_ne!(progress.last_update_time, initial_time);
}
#[test]
fn test_heal_progress_set_current_object_none() {
let mut progress = HealProgress::new();
progress.set_current_object(Some("test".to_string()));
progress.set_current_object(None);
assert!(progress.current_object.is_none());
}
#[test]
fn test_heal_progress_is_completed_by_percentage() {
let mut progress = HealProgress::new();
progress.update_progress(10, 10, 0, 1024);
assert!(progress.is_completed());
}
#[test]
fn test_heal_progress_is_completed_by_processed() {
let mut progress = HealProgress::new();
progress.objects_scanned = 10;
progress.objects_healed = 8;
progress.objects_failed = 2;
// healed + failed = 8 + 2 = 10 >= scanned = 10
assert!(progress.is_completed());
}
#[test]
fn test_heal_progress_is_not_completed() {
let mut progress = HealProgress::new();
progress.objects_scanned = 10;
progress.objects_healed = 5;
progress.objects_failed = 2;
// healed + failed = 5 + 2 = 7 < scanned = 10
assert!(!progress.is_completed());
}
#[test]
fn test_heal_progress_get_success_rate() {
let mut progress = HealProgress::new();
progress.objects_healed = 8;
progress.objects_failed = 2;
// success_rate = 8 / (8 + 2) * 100 = 80%
assert!((progress.get_success_rate() - 80.0).abs() < 0.001);
}
#[test]
fn test_heal_progress_get_success_rate_zero_total() {
let progress = HealProgress::new();
// No healed or failed objects
assert_eq!(progress.get_success_rate(), 0.0);
}
#[test]
fn test_heal_progress_get_success_rate_all_success() {
let mut progress = HealProgress::new();
progress.objects_healed = 10;
progress.objects_failed = 0;
assert!((progress.get_success_rate() - 100.0).abs() < 0.001);
}
#[test]
fn test_heal_statistics_new() {
let stats = HealStatistics::new();
assert_eq!(stats.total_tasks, 0);
assert_eq!(stats.successful_tasks, 0);
assert_eq!(stats.failed_tasks, 0);
assert_eq!(stats.running_tasks, 0);
assert_eq!(stats.total_objects_healed, 0);
assert_eq!(stats.total_bytes_healed, 0);
}
#[test]
fn test_heal_statistics_default() {
let stats = HealStatistics::default();
assert_eq!(stats.total_tasks, 0);
assert_eq!(stats.successful_tasks, 0);
assert_eq!(stats.failed_tasks, 0);
}
#[test]
fn test_heal_statistics_update_task_completion_success() {
let mut stats = HealStatistics::new();
let initial_time = stats.last_update_time;
std::thread::sleep(std::time::Duration::from_millis(10));
stats.update_task_completion(true);
assert_eq!(stats.successful_tasks, 1);
assert_eq!(stats.failed_tasks, 0);
assert!(stats.last_update_time > initial_time);
}
#[test]
fn test_heal_statistics_update_task_completion_failure() {
let mut stats = HealStatistics::new();
stats.update_task_completion(false);
assert_eq!(stats.successful_tasks, 0);
assert_eq!(stats.failed_tasks, 1);
}
#[test]
fn test_heal_statistics_update_running_tasks() {
let mut stats = HealStatistics::new();
let initial_time = stats.last_update_time;
std::thread::sleep(std::time::Duration::from_millis(10));
stats.update_running_tasks(5);
assert_eq!(stats.running_tasks, 5);
assert!(stats.last_update_time > initial_time);
}
#[test]
fn test_heal_statistics_add_healed_objects() {
let mut stats = HealStatistics::new();
let initial_time = stats.last_update_time;
std::thread::sleep(std::time::Duration::from_millis(10));
stats.add_healed_objects(10, 10240);
assert_eq!(stats.total_objects_healed, 10);
assert_eq!(stats.total_bytes_healed, 10240);
assert!(stats.last_update_time > initial_time);
}
#[test]
fn test_heal_statistics_add_healed_objects_accumulative() {
let mut stats = HealStatistics::new();
stats.add_healed_objects(5, 5120);
stats.add_healed_objects(3, 3072);
assert_eq!(stats.total_objects_healed, 8);
assert_eq!(stats.total_bytes_healed, 8192);
}
#[test]
fn test_heal_statistics_get_success_rate() {
let mut stats = HealStatistics::new();
stats.successful_tasks = 8;
stats.failed_tasks = 2;
// success_rate = 8 / (8 + 2) * 100 = 80%
assert!((stats.get_success_rate() - 80.0).abs() < 0.001);
}
#[test]
fn test_heal_statistics_get_success_rate_zero_total() {
let stats = HealStatistics::new();
assert_eq!(stats.get_success_rate(), 0.0);
}
#[test]
fn test_heal_statistics_get_success_rate_all_success() {
let mut stats = HealStatistics::new();
stats.successful_tasks = 10;
stats.failed_tasks = 0;
assert!((stats.get_success_rate() - 100.0).abs() < 0.001);
}
#[test]
fn test_heal_statistics_get_success_rate_all_failure() {
let mut stats = HealStatistics::new();
stats.successful_tasks = 0;
stats.failed_tasks = 5;
assert_eq!(stats.get_success_rate(), 0.0);
}
}

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::{Error, Result};
use crate::error::{Error, Result};
use rustfs_ecstore::disk::{BUCKET_META_PREFIX, DiskAPI, DiskStore, RUSTFS_META_BUCKET};
use serde::{Deserialize, Serialize};
use std::path::Path;
@@ -27,12 +27,6 @@ const RESUME_STATE_FILE: &str = "ahm_resume_state.json";
const RESUME_PROGRESS_FILE: &str = "ahm_progress.json";
const RESUME_CHECKPOINT_FILE: &str = "ahm_checkpoint.json";
/// Helper function to convert Path to &str, returning an error if conversion fails
fn path_to_str(path: &Path) -> Result<&str> {
path.to_str()
.ok_or_else(|| Error::other(format!("Invalid UTF-8 path: {:?}", path)))
}
/// resume state
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ResumeState {
@@ -40,9 +34,6 @@ pub struct ResumeState {
pub task_id: String,
/// task type
pub task_type: String,
/// set disk identifier (for erasure set tasks)
#[serde(default)]
pub set_disk_id: String,
/// start time
pub start_time: u64,
/// last update time
@@ -76,13 +67,12 @@ pub struct ResumeState {
}
impl ResumeState {
pub fn new(task_id: String, task_type: String, set_disk_id: String, buckets: Vec<String>) -> Self {
pub fn new(task_id: String, task_type: String, buckets: Vec<String>) -> Self {
Self {
task_id,
task_type,
set_disk_id,
start_time: SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_default().as_secs(),
last_update: SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_default().as_secs(),
start_time: SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs(),
last_update: SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs(),
completed: false,
total_objects: 0,
processed_objects: 0,
@@ -104,13 +94,13 @@ impl ResumeState {
self.successful_objects = successful;
self.failed_objects = failed;
self.skipped_objects = skipped;
self.last_update = SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_default().as_secs();
self.last_update = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs();
}
pub fn set_current_item(&mut self, bucket: Option<String>, object: Option<String>) {
self.current_bucket = bucket;
self.current_object = object;
self.last_update = SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_default().as_secs();
self.last_update = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs();
}
pub fn complete_bucket(&mut self, bucket: &str) {
@@ -120,22 +110,22 @@ impl ResumeState {
if let Some(pos) = self.pending_buckets.iter().position(|b| b == bucket) {
self.pending_buckets.remove(pos);
}
self.last_update = SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_default().as_secs();
self.last_update = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs();
}
pub fn mark_completed(&mut self) {
self.completed = true;
self.last_update = SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_default().as_secs();
self.last_update = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs();
}
pub fn set_error(&mut self, error: String) {
self.error_message = Some(error);
self.last_update = SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_default().as_secs();
self.last_update = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs();
}
pub fn increment_retry(&mut self) {
self.retry_count += 1;
self.last_update = SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_default().as_secs();
self.last_update = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs();
}
pub fn can_retry(&self) -> bool {
@@ -166,14 +156,8 @@ pub struct ResumeManager {
impl ResumeManager {
/// create new resume manager
pub async fn new(
disk: DiskStore,
task_id: String,
task_type: String,
set_disk_id: String,
buckets: Vec<String>,
) -> Result<Self> {
let state = ResumeState::new(task_id, task_type, set_disk_id, buckets);
pub async fn new(disk: DiskStore, task_id: String, task_type: String, buckets: Vec<String>) -> Result<Self> {
let state = ResumeState::new(task_id, task_type, buckets);
let manager = Self {
disk,
state: Arc::new(RwLock::new(state)),
@@ -200,11 +184,8 @@ impl ResumeManager {
/// check if resume state exists
pub async fn has_resume_state(disk: &DiskStore, task_id: &str) -> bool {
let file_path = Path::new(BUCKET_META_PREFIX).join(format!("{task_id}_{RESUME_STATE_FILE}"));
match path_to_str(&file_path) {
Ok(path_str) => match disk.read_all(RUSTFS_META_BUCKET, path_str).await {
Ok(data) => !data.is_empty(),
Err(_) => false,
},
match disk.read_all(RUSTFS_META_BUCKET, file_path.to_str().unwrap()).await {
Ok(data) => !data.is_empty(),
Err(_) => false,
}
}
@@ -273,15 +254,18 @@ impl ResumeManager {
let checkpoint_file = Path::new(BUCKET_META_PREFIX).join(format!("{task_id}_{RESUME_CHECKPOINT_FILE}"));
// ignore delete errors, files may not exist
if let Ok(path_str) = path_to_str(&state_file) {
let _ = self.disk.delete(RUSTFS_META_BUCKET, path_str, Default::default()).await;
}
if let Ok(path_str) = path_to_str(&progress_file) {
let _ = self.disk.delete(RUSTFS_META_BUCKET, path_str, Default::default()).await;
}
if let Ok(path_str) = path_to_str(&checkpoint_file) {
let _ = self.disk.delete(RUSTFS_META_BUCKET, path_str, Default::default()).await;
}
let _ = self
.disk
.delete(RUSTFS_META_BUCKET, state_file.to_str().unwrap(), Default::default())
.await;
let _ = self
.disk
.delete(RUSTFS_META_BUCKET, progress_file.to_str().unwrap(), Default::default())
.await;
let _ = self
.disk
.delete(RUSTFS_META_BUCKET, checkpoint_file.to_str().unwrap(), Default::default())
.await;
info!("Cleaned up resume state for task: {}", task_id);
Ok(())
@@ -296,9 +280,8 @@ impl ResumeManager {
let file_path = Path::new(BUCKET_META_PREFIX).join(format!("{}_{}", state.task_id, RESUME_STATE_FILE));
let path_str = path_to_str(&file_path)?;
self.disk
.write_all(RUSTFS_META_BUCKET, path_str, state_data.into())
.write_all(RUSTFS_META_BUCKET, file_path.to_str().unwrap(), state_data.into())
.await
.map_err(|e| Error::TaskExecutionFailed {
message: format!("Failed to save resume state: {e}"),
@@ -312,8 +295,7 @@ impl ResumeManager {
async fn read_state_file(disk: &DiskStore, task_id: &str) -> Result<Vec<u8>> {
let file_path = Path::new(BUCKET_META_PREFIX).join(format!("{task_id}_{RESUME_STATE_FILE}"));
let path_str = path_to_str(&file_path)?;
disk.read_all(RUSTFS_META_BUCKET, path_str)
disk.read_all(RUSTFS_META_BUCKET, file_path.to_str().unwrap())
.await
.map(|bytes| bytes.to_vec())
.map_err(|e| Error::TaskExecutionFailed {
@@ -345,7 +327,7 @@ impl ResumeCheckpoint {
pub fn new(task_id: String) -> Self {
Self {
task_id,
checkpoint_time: SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_default().as_secs(),
checkpoint_time: SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs(),
current_bucket_index: 0,
current_object_index: 0,
processed_objects: Vec::new(),
@@ -357,7 +339,7 @@ impl ResumeCheckpoint {
pub fn update_position(&mut self, bucket_index: usize, object_index: usize) {
self.current_bucket_index = bucket_index;
self.current_object_index = object_index;
self.checkpoint_time = SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_default().as_secs();
self.checkpoint_time = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs();
}
pub fn add_processed_object(&mut self, object: String) {
@@ -415,11 +397,8 @@ impl CheckpointManager {
/// check if checkpoint exists
pub async fn has_checkpoint(disk: &DiskStore, task_id: &str) -> bool {
let file_path = Path::new(BUCKET_META_PREFIX).join(format!("{task_id}_{RESUME_CHECKPOINT_FILE}"));
match path_to_str(&file_path) {
Ok(path_str) => match disk.read_all(RUSTFS_META_BUCKET, path_str).await {
Ok(data) => !data.is_empty(),
Err(_) => false,
},
match disk.read_all(RUSTFS_META_BUCKET, file_path.to_str().unwrap()).await {
Ok(data) => !data.is_empty(),
Err(_) => false,
}
}
@@ -467,9 +446,10 @@ impl CheckpointManager {
let task_id = &checkpoint.task_id;
let checkpoint_file = Path::new(BUCKET_META_PREFIX).join(format!("{task_id}_{RESUME_CHECKPOINT_FILE}"));
if let Ok(path_str) = path_to_str(&checkpoint_file) {
let _ = self.disk.delete(RUSTFS_META_BUCKET, path_str, Default::default()).await;
}
let _ = self
.disk
.delete(RUSTFS_META_BUCKET, checkpoint_file.to_str().unwrap(), Default::default())
.await;
info!("Cleaned up checkpoint for task: {}", task_id);
Ok(())
@@ -484,9 +464,8 @@ impl CheckpointManager {
let file_path = Path::new(BUCKET_META_PREFIX).join(format!("{}_{}", checkpoint.task_id, RESUME_CHECKPOINT_FILE));
let path_str = path_to_str(&file_path)?;
self.disk
.write_all(RUSTFS_META_BUCKET, path_str, checkpoint_data.into())
.write_all(RUSTFS_META_BUCKET, file_path.to_str().unwrap(), checkpoint_data.into())
.await
.map_err(|e| Error::TaskExecutionFailed {
message: format!("Failed to save checkpoint: {e}"),
@@ -500,8 +479,7 @@ impl CheckpointManager {
async fn read_checkpoint_file(disk: &DiskStore, task_id: &str) -> Result<Vec<u8>> {
let file_path = Path::new(BUCKET_META_PREFIX).join(format!("{task_id}_{RESUME_CHECKPOINT_FILE}"));
let path_str = path_to_str(&file_path)?;
disk.read_all(RUSTFS_META_BUCKET, path_str)
disk.read_all(RUSTFS_META_BUCKET, file_path.to_str().unwrap())
.await
.map(|bytes| bytes.to_vec())
.map_err(|e| Error::TaskExecutionFailed {
@@ -584,7 +562,7 @@ mod tests {
async fn test_resume_state_creation() {
let task_id = ResumeUtils::generate_task_id();
let buckets = vec!["bucket1".to_string(), "bucket2".to_string()];
let state = ResumeState::new(task_id.clone(), "erasure_set".to_string(), "pool_0_set_0".to_string(), buckets);
let state = ResumeState::new(task_id.clone(), "erasure_set".to_string(), buckets);
assert_eq!(state.task_id, task_id);
assert_eq!(state.task_type, "erasure_set");
@@ -597,7 +575,7 @@ mod tests {
async fn test_resume_state_progress() {
let task_id = ResumeUtils::generate_task_id();
let buckets = vec!["bucket1".to_string()];
let mut state = ResumeState::new(task_id, "erasure_set".to_string(), "pool_0_set_0".to_string(), buckets);
let mut state = ResumeState::new(task_id, "erasure_set".to_string(), buckets);
state.update_progress(10, 8, 1, 1);
assert_eq!(state.processed_objects, 10);
@@ -617,7 +595,7 @@ mod tests {
async fn test_resume_state_bucket_completion() {
let task_id = ResumeUtils::generate_task_id();
let buckets = vec!["bucket1".to_string(), "bucket2".to_string()];
let mut state = ResumeState::new(task_id, "erasure_set".to_string(), "pool_0_set_0".to_string(), buckets);
let mut state = ResumeState::new(task_id, "erasure_set".to_string(), buckets);
assert_eq!(state.pending_buckets.len(), 2);
assert_eq!(state.completed_buckets.len(), 0);
@@ -672,7 +650,6 @@ mod tests {
let state = ResumeState::new(
task_id.clone(),
"erasure_set".to_string(),
"pool_0_set_0".to_string(),
vec!["bucket1".to_string(), "bucket2".to_string()],
);

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::{Error, Result};
use crate::error::{Error, Result};
use async_trait::async_trait;
use rustfs_common::heal_channel::{HealOpts, HealScanMode};
use rustfs_ecstore::{
@@ -179,10 +179,7 @@ impl HealStorageAPI for ECStoreHealStorage {
"Object data exceeds cap ({} bytes), aborting full read to prevent OOM: {}/{}",
MAX_READ_BYTES, bucket, object
);
return Err(Error::other(format!(
"Object too large: {} bytes (max: {} bytes) for {}/{}",
n_read, MAX_READ_BYTES, bucket, object
)));
return Ok(None);
}
}
Err(e) => {
@@ -518,7 +515,21 @@ impl HealStorageAPI for ECStoreHealStorage {
debug!("Getting disk for resume: {}", set_disk_id);
// Parse set_disk_id to extract pool and set indices
let (pool_idx, set_idx) = crate::heal::utils::parse_set_disk_id(set_disk_id)?;
// Format: "pool_{pool_idx}_set_{set_idx}"
let parts: Vec<&str> = set_disk_id.split('_').collect();
if parts.len() != 4 || parts[0] != "pool" || parts[2] != "set" {
return Err(Error::TaskExecutionFailed {
message: format!("Invalid set_disk_id format: {set_disk_id}"),
});
}
let pool_idx: usize = parts[1].parse().map_err(|_| Error::TaskExecutionFailed {
message: format!("Invalid pool index in set_disk_id: {set_disk_id}"),
})?;
let set_idx: usize = parts[3].parse().map_err(|_| Error::TaskExecutionFailed {
message: format!("Invalid set index in set_disk_id: {set_disk_id}"),
})?;
// Get the first available disk from the set
let disks = self

View File

@@ -12,15 +12,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::heal::{ErasureSetHealer, progress::HealProgress, storage::HealStorageAPI};
use crate::{Error, Result};
use crate::error::{Error, Result};
use crate::heal::ErasureSetHealer;
use crate::heal::{progress::HealProgress, storage::HealStorageAPI};
use rustfs_common::heal_channel::{HealOpts, HealScanMode};
use serde::{Deserialize, Serialize};
use std::{
future::Future,
sync::Arc,
time::{Duration, Instant, SystemTime},
};
use std::sync::Arc;
use std::time::{Duration, SystemTime};
use tokio::sync::RwLock;
use tracing::{error, info, warn};
use uuid::Uuid;
@@ -197,8 +195,6 @@ pub struct HealTask {
pub started_at: Arc<RwLock<Option<SystemTime>>>,
/// Completed time
pub completed_at: Arc<RwLock<Option<SystemTime>>>,
/// Task start instant for timeout calculation (monotonic)
task_start_instant: Arc<RwLock<Option<Instant>>>,
/// Cancel token
pub cancel_token: tokio_util::sync::CancellationToken,
/// Storage layer interface
@@ -216,73 +212,20 @@ impl HealTask {
created_at: request.created_at,
started_at: Arc::new(RwLock::new(None)),
completed_at: Arc::new(RwLock::new(None)),
task_start_instant: Arc::new(RwLock::new(None)),
cancel_token: tokio_util::sync::CancellationToken::new(),
storage,
}
}
async fn remaining_timeout(&self) -> Result<Option<Duration>> {
if let Some(total) = self.options.timeout {
let start_instant = { *self.task_start_instant.read().await };
if let Some(started_at) = start_instant {
let elapsed = started_at.elapsed();
if elapsed >= total {
return Err(Error::TaskTimeout);
}
return Ok(Some(total - elapsed));
}
Ok(Some(total))
} else {
Ok(None)
}
}
async fn check_control_flags(&self) -> Result<()> {
if self.cancel_token.is_cancelled() {
return Err(Error::TaskCancelled);
}
// Only interested in propagating an error if the timeout has expired;
// the actual Duration value is not needed here
let _ = self.remaining_timeout().await?;
Ok(())
}
async fn await_with_control<F, T>(&self, fut: F) -> Result<T>
where
F: Future<Output = Result<T>> + Send,
T: Send,
{
let cancel_token = self.cancel_token.clone();
if let Some(remaining) = self.remaining_timeout().await? {
if remaining.is_zero() {
return Err(Error::TaskTimeout);
}
let mut fut = Box::pin(fut);
tokio::select! {
_ = cancel_token.cancelled() => Err(Error::TaskCancelled),
_ = tokio::time::sleep(remaining) => Err(Error::TaskTimeout),
result = &mut fut => result,
}
} else {
tokio::select! {
_ = cancel_token.cancelled() => Err(Error::TaskCancelled),
result = fut => result,
}
}
}
pub async fn execute(&self) -> Result<()> {
// update status and timestamps atomically to avoid race conditions
let now = SystemTime::now();
let start_instant = Instant::now();
// update status to running
{
let mut status = self.status.write().await;
let mut started_at = self.started_at.write().await;
let mut task_start_instant = self.task_start_instant.write().await;
*status = HealTaskStatus::Running;
*started_at = Some(now);
*task_start_instant = Some(start_instant);
}
{
let mut started_at = self.started_at.write().await;
*started_at = Some(SystemTime::now());
}
info!("Starting heal task: {} with type: {:?}", self.id, self.heal_type);
@@ -317,16 +260,6 @@ impl HealTask {
*status = HealTaskStatus::Completed;
info!("Heal task completed successfully: {}", self.id);
}
Err(Error::TaskCancelled) => {
let mut status = self.status.write().await;
*status = HealTaskStatus::Cancelled;
info!("Heal task was cancelled: {}", self.id);
}
Err(Error::TaskTimeout) => {
let mut status = self.status.write().await;
*status = HealTaskStatus::Timeout;
warn!("Heal task timed out: {}", self.id);
}
Err(e) => {
let mut status = self.status.write().await;
*status = HealTaskStatus::Failed { error: e.to_string() };
@@ -366,8 +299,7 @@ impl HealTask {
// Step 1: Check if object exists and get metadata
info!("Step 1: Checking object existence and metadata");
self.check_control_flags().await?;
let object_exists = self.await_with_control(self.storage.object_exists(bucket, object)).await?;
let object_exists = self.storage.object_exists(bucket, object).await?;
if !object_exists {
warn!("Object does not exist: {}/{}", bucket, object);
if self.options.recreate_missing {
@@ -399,11 +331,7 @@ impl HealTask {
set: self.options.set_index,
};
let heal_result = self
.await_with_control(self.storage.heal_object(bucket, object, version_id, &heal_opts))
.await;
match heal_result {
match self.storage.heal_object(bucket, object, version_id, &heal_opts).await {
Ok((result, error)) => {
if let Some(e) = error {
// Check if this is a "File not found" error during delete operations
@@ -426,7 +354,7 @@ impl HealTask {
if self.options.remove_corrupted {
warn!("Removing corrupted object: {}/{}", bucket, object);
if !self.options.dry_run {
self.await_with_control(self.storage.delete_object(bucket, object)).await?;
self.storage.delete_object(bucket, object).await?;
info!("Successfully deleted corrupted object: {}/{}", bucket, object);
} else {
info!("Dry run mode - would delete corrupted object: {}/{}", bucket, object);
@@ -460,8 +388,6 @@ impl HealTask {
}
Ok(())
}
Err(Error::TaskCancelled) => Err(Error::TaskCancelled),
Err(Error::TaskTimeout) => Err(Error::TaskTimeout),
Err(e) => {
// Check if this is a "File not found" error during delete operations
let error_msg = format!("{e}");
@@ -483,7 +409,7 @@ impl HealTask {
if self.options.remove_corrupted {
warn!("Removing corrupted object: {}/{}", bucket, object);
if !self.options.dry_run {
self.await_with_control(self.storage.delete_object(bucket, object)).await?;
self.storage.delete_object(bucket, object).await?;
info!("Successfully deleted corrupted object: {}/{}", bucket, object);
} else {
info!("Dry run mode - would delete corrupted object: {}/{}", bucket, object);
@@ -519,10 +445,7 @@ impl HealTask {
set: None,
};
match self
.await_with_control(self.storage.heal_object(bucket, object, version_id, &heal_opts))
.await
{
match self.storage.heal_object(bucket, object, version_id, &heal_opts).await {
Ok((result, error)) => {
if let Some(e) = error {
error!("Failed to recreate missing object: {}/{} - {}", bucket, object, e);
@@ -540,8 +463,6 @@ impl HealTask {
}
Ok(())
}
Err(Error::TaskCancelled) => Err(Error::TaskCancelled),
Err(Error::TaskTimeout) => Err(Error::TaskTimeout),
Err(e) => {
error!("Failed to recreate missing object: {}/{} - {}", bucket, object, e);
Err(Error::TaskExecutionFailed {
@@ -563,8 +484,7 @@ impl HealTask {
// Step 1: Check if bucket exists
info!("Step 1: Checking bucket existence");
self.check_control_flags().await?;
let bucket_exists = self.await_with_control(self.storage.get_bucket_info(bucket)).await?.is_some();
let bucket_exists = self.storage.get_bucket_info(bucket).await?.is_some();
if !bucket_exists {
warn!("Bucket does not exist: {}", bucket);
return Err(Error::TaskExecutionFailed {
@@ -591,9 +511,7 @@ impl HealTask {
set: self.options.set_index,
};
let heal_result = self.await_with_control(self.storage.heal_bucket(bucket, &heal_opts)).await;
match heal_result {
match self.storage.heal_bucket(bucket, &heal_opts).await {
Ok(result) => {
info!("Bucket heal completed successfully: {} ({} drives)", bucket, result.after.drives.len());
@@ -603,8 +521,6 @@ impl HealTask {
}
Ok(())
}
Err(Error::TaskCancelled) => Err(Error::TaskCancelled),
Err(Error::TaskTimeout) => Err(Error::TaskTimeout),
Err(e) => {
error!("Bucket heal failed: {} - {}", bucket, e);
{
@@ -630,8 +546,7 @@ impl HealTask {
// Step 1: Check if object exists
info!("Step 1: Checking object existence");
self.check_control_flags().await?;
let object_exists = self.await_with_control(self.storage.object_exists(bucket, object)).await?;
let object_exists = self.storage.object_exists(bucket, object).await?;
if !object_exists {
warn!("Object does not exist: {}/{}", bucket, object);
return Err(Error::TaskExecutionFailed {
@@ -658,11 +573,7 @@ impl HealTask {
set: self.options.set_index,
};
let heal_result = self
.await_with_control(self.storage.heal_object(bucket, object, None, &heal_opts))
.await;
match heal_result {
match self.storage.heal_object(bucket, object, None, &heal_opts).await {
Ok((result, error)) => {
if let Some(e) = error {
error!("Metadata heal failed: {}/{} - {}", bucket, object, e);
@@ -688,8 +599,6 @@ impl HealTask {
}
Ok(())
}
Err(Error::TaskCancelled) => Err(Error::TaskCancelled),
Err(Error::TaskTimeout) => Err(Error::TaskTimeout),
Err(e) => {
error!("Metadata heal failed: {}/{} - {}", bucket, object, e);
{
@@ -738,11 +647,7 @@ impl HealTask {
set: None,
};
let heal_result = self
.await_with_control(self.storage.heal_object(bucket, &object, None, &heal_opts))
.await;
match heal_result {
match self.storage.heal_object(bucket, &object, None, &heal_opts).await {
Ok((result, error)) => {
if let Some(e) = error {
error!("MRF heal failed: {} - {}", meta_path, e);
@@ -763,8 +668,6 @@ impl HealTask {
}
Ok(())
}
Err(Error::TaskCancelled) => Err(Error::TaskCancelled),
Err(Error::TaskTimeout) => Err(Error::TaskTimeout),
Err(e) => {
error!("MRF heal failed: {} - {}", meta_path, e);
{
@@ -790,8 +693,7 @@ impl HealTask {
// Step 1: Check if object exists
info!("Step 1: Checking object existence");
self.check_control_flags().await?;
let object_exists = self.await_with_control(self.storage.object_exists(bucket, object)).await?;
let object_exists = self.storage.object_exists(bucket, object).await?;
if !object_exists {
warn!("Object does not exist: {}/{}", bucket, object);
return Err(Error::TaskExecutionFailed {
@@ -818,11 +720,7 @@ impl HealTask {
set: None,
};
let heal_result = self
.await_with_control(self.storage.heal_object(bucket, object, version_id, &heal_opts))
.await;
match heal_result {
match self.storage.heal_object(bucket, object, version_id, &heal_opts).await {
Ok((result, error)) => {
if let Some(e) = error {
error!("EC decode heal failed: {}/{} - {}", bucket, object, e);
@@ -850,8 +748,6 @@ impl HealTask {
}
Ok(())
}
Err(Error::TaskCancelled) => Err(Error::TaskCancelled),
Err(Error::TaskTimeout) => Err(Error::TaskTimeout),
Err(e) => {
error!("EC decode heal failed: {}/{} - {}", bucket, object, e);
{
@@ -877,7 +773,7 @@ impl HealTask {
let buckets = if buckets.is_empty() {
info!("No buckets specified, listing all buckets");
let bucket_infos = self.await_with_control(self.storage.list_buckets()).await?;
let bucket_infos = self.storage.list_buckets().await?;
bucket_infos.into_iter().map(|info| info.name).collect()
} else {
buckets
@@ -885,9 +781,7 @@ impl HealTask {
// Step 1: Perform disk format heal using ecstore
info!("Step 1: Performing disk format heal using ecstore");
let format_result = self.await_with_control(self.storage.heal_format(self.options.dry_run)).await;
match format_result {
match self.storage.heal_format(self.options.dry_run).await {
Ok((result, error)) => {
if let Some(e) = error {
error!("Disk format heal failed: {} - {}", set_disk_id, e);
@@ -906,8 +800,6 @@ impl HealTask {
result.after.drives.len()
);
}
Err(Error::TaskCancelled) => return Err(Error::TaskCancelled),
Err(Error::TaskTimeout) => return Err(Error::TaskTimeout),
Err(e) => {
error!("Disk format heal failed: {} - {}", set_disk_id, e);
{
@@ -927,9 +819,7 @@ impl HealTask {
// Step 2: Get disk for resume functionality
info!("Step 2: Getting disk for resume functionality");
let disk = self
.await_with_control(self.storage.get_disk_for_resume(&set_disk_id))
.await?;
let disk = self.storage.get_disk_for_resume(&set_disk_id).await?;
{
let mut progress = self.progress.write().await;
@@ -937,18 +827,9 @@ impl HealTask {
}
// Step 3: Heal bucket structure
// Check control flags before each iteration to ensure timely cancellation.
// Each heal_bucket call may handle timeout/cancellation internally, see its implementation for details.
for bucket in buckets.iter() {
// Check control flags before starting each bucket heal
self.check_control_flags().await?;
// heal_bucket internally uses await_with_control for timeout/cancellation handling
if let Err(err) = self.heal_bucket(bucket).await {
// Check if error is due to cancellation or timeout
if matches!(err, Error::TaskCancelled | Error::TaskTimeout) {
return Err(err);
}
info!("Bucket heal failed: {}", err.to_string());
info!("{}", err.to_string());
}
}
@@ -975,8 +856,6 @@ impl HealTask {
info!("Erasure set heal completed successfully: {} ({} buckets)", set_disk_id, buckets.len());
Ok(())
}
Err(Error::TaskCancelled) => Err(Error::TaskCancelled),
Err(Error::TaskTimeout) => Err(Error::TaskTimeout),
Err(e) => {
error!("Erasure set heal failed: {} - {}", set_disk_id, e);
Err(Error::TaskExecutionFailed {

View File

@@ -1,110 +0,0 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::{Error, Result};
/// Prefix for pool index in set disk identifiers.
const POOL_PREFIX: &str = "pool";
/// Prefix for set index in set disk identifiers.
const SET_PREFIX: &str = "set";
/// Format a set disk identifier using unsigned indices.
pub fn format_set_disk_id(pool_idx: usize, set_idx: usize) -> String {
format!("{POOL_PREFIX}_{pool_idx}_{SET_PREFIX}_{set_idx}")
}
/// Format a set disk identifier from signed indices.
pub fn format_set_disk_id_from_i32(pool_idx: i32, set_idx: i32) -> Option<String> {
if pool_idx < 0 || set_idx < 0 {
None
} else {
Some(format_set_disk_id(pool_idx as usize, set_idx as usize))
}
}
/// Normalise external set disk identifiers into the canonical format.
pub fn normalize_set_disk_id(raw: &str) -> Option<String> {
if raw.starts_with(&format!("{POOL_PREFIX}_")) {
Some(raw.to_string())
} else {
parse_compact_set_disk_id(raw).map(|(pool, set)| format_set_disk_id(pool, set))
}
}
/// Parse a canonical set disk identifier into pool/set indices.
pub fn parse_set_disk_id(raw: &str) -> Result<(usize, usize)> {
let parts: Vec<&str> = raw.split('_').collect();
if parts.len() != 4 || parts[0] != POOL_PREFIX || parts[2] != SET_PREFIX {
return Err(Error::TaskExecutionFailed {
message: format!("Invalid set_disk_id format: {raw}"),
});
}
let pool_idx = parts[1].parse::<usize>().map_err(|_| Error::TaskExecutionFailed {
message: format!("Invalid pool index in set_disk_id: {raw}"),
})?;
let set_idx = parts[3].parse::<usize>().map_err(|_| Error::TaskExecutionFailed {
message: format!("Invalid set index in set_disk_id: {raw}"),
})?;
Ok((pool_idx, set_idx))
}
fn parse_compact_set_disk_id(raw: &str) -> Option<(usize, usize)> {
let (pool, set) = raw.split_once('_')?;
let pool_idx = pool.parse::<usize>().ok()?;
let set_idx = set.parse::<usize>().ok()?;
Some((pool_idx, set_idx))
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn format_from_unsigned_indices() {
assert_eq!(format_set_disk_id(1, 2), "pool_1_set_2");
}
#[test]
fn format_from_signed_indices() {
assert_eq!(format_set_disk_id_from_i32(3, 4), Some("pool_3_set_4".into()));
assert_eq!(format_set_disk_id_from_i32(-1, 4), None);
}
#[test]
fn normalize_compact_identifier() {
assert_eq!(normalize_set_disk_id("3_5"), Some("pool_3_set_5".to_string()));
}
#[test]
fn normalize_prefixed_identifier() {
assert_eq!(normalize_set_disk_id("pool_7_set_1"), Some("pool_7_set_1".to_string()));
}
#[test]
fn normalize_invalid_identifier() {
assert_eq!(normalize_set_disk_id("invalid"), None);
}
#[test]
fn parse_prefixed_identifier() {
assert_eq!(parse_set_disk_id("pool_9_set_3").unwrap(), (9, 3));
}
#[test]
fn parse_invalid_identifier() {
assert!(parse_set_disk_id("bad").is_err());
assert!(parse_set_disk_id("pool_X_set_1").is_err());
}
}

View File

@@ -12,16 +12,17 @@
// See the License for the specific language governing permissions and
// limitations under the License.
mod error;
use std::sync::{Arc, OnceLock};
use tokio_util::sync::CancellationToken;
use tracing::{error, info};
pub mod error;
pub mod heal;
pub mod scanner;
pub use error::{Error, Result};
pub use heal::{HealManager, HealOptions, HealPriority, HealRequest, HealType, channel::HealChannelProcessor};
pub use scanner::Scanner;
use std::sync::{Arc, OnceLock};
use tokio_util::sync::CancellationToken;
use tracing::{error, info};
// Global cancellation token for AHM services (scanner and other background tasks)
static GLOBAL_AHM_SERVICES_CANCEL_TOKEN: OnceLock<CancellationToken> = OnceLock::new();

View File

@@ -12,16 +12,18 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::scanner::node_scanner::ScanProgress;
use crate::{Error, Result};
use serde::{Deserialize, Serialize};
use std::{
path::{Path, PathBuf},
time::{Duration, SystemTime},
};
use serde::{Deserialize, Serialize};
use tokio::sync::RwLock;
use tracing::{debug, error, info, warn};
use super::node_scanner::ScanProgress;
use crate::{Error, error::Result};
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct CheckpointData {
pub version: u32,

View File

@@ -12,39 +12,46 @@
// See the License for the specific language governing permissions and
// limitations under the License.
// IO throttling component is integrated into NodeScanner
use crate::{
Error, HealRequest, Result, get_ahm_services_cancel_token,
heal::HealManager,
scanner::{
BucketMetrics, DecentralizedStatsAggregator, DecentralizedStatsAggregatorConfig, DiskMetrics, MetricsCollector,
NodeScanner, NodeScannerConfig, ScannerMetrics,
lifecycle::ScannerItem,
local_scan::{self, LocalObjectRecord, LocalScanOutcome},
},
};
use rustfs_common::data_usage::{DataUsageInfo, SizeSummary};
use rustfs_common::metrics::{Metric, Metrics, global_metrics};
use rustfs_ecstore::{
self as ecstore, StorageAPI,
bucket::versioning::VersioningApi,
bucket::versioning_sys::BucketVersioningSys,
data_usage::{aggregate_local_snapshots, store_data_usage_in_backend},
disk::{Disk, DiskAPI, DiskStore, RUSTFS_META_BUCKET, WalkDirOptions},
set_disk::SetDisks,
store_api::ObjectInfo,
};
use rustfs_filemeta::{MetacacheReader, VersionType};
use s3s::dto::{BucketVersioningStatus, VersioningConfiguration};
use std::{
collections::HashMap,
sync::Arc,
time::{Duration, SystemTime},
};
use time::OffsetDateTime;
use ecstore::{
disk::{Disk, DiskAPI, DiskStore, WalkDirOptions},
set_disk::SetDisks,
};
use rustfs_ecstore::store_api::ObjectInfo;
use rustfs_ecstore::{
self as ecstore, StorageAPI,
data_usage::{aggregate_local_snapshots, store_data_usage_in_backend},
};
use rustfs_filemeta::{MetacacheReader, VersionType};
use s3s::dto::{BucketVersioningStatus, VersioningConfiguration};
use tokio::sync::{Mutex, RwLock};
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, warn};
use super::metrics::{BucketMetrics, DiskMetrics, MetricsCollector, ScannerMetrics};
use super::node_scanner::{NodeScanner, NodeScannerConfig};
use super::stats_aggregator::{DecentralizedStatsAggregator, DecentralizedStatsAggregatorConfig};
// IO throttling component is integrated into NodeScanner
use crate::heal::HealManager;
use crate::scanner::lifecycle::ScannerItem;
use crate::scanner::local_scan::{self, LocalObjectRecord, LocalScanOutcome};
use crate::{
HealRequest,
error::{Error, Result},
get_ahm_services_cancel_token,
};
use rustfs_common::data_usage::{DataUsageInfo, SizeSummary};
use rustfs_common::metrics::{Metric, Metrics, globalMetrics};
use rustfs_ecstore::bucket::versioning::VersioningApi;
use rustfs_ecstore::bucket::versioning_sys::BucketVersioningSys;
use rustfs_ecstore::disk::RUSTFS_META_BUCKET;
use uuid;
/// Custom scan mode enum for AHM scanner
@@ -765,7 +772,7 @@ impl Scanner {
/// Get global metrics from common crate
pub async fn get_global_metrics(&self) -> rustfs_madmin::metrics::ScannerMetrics {
global_metrics().report().await
(*globalMetrics).report().await
}
/// Perform a single scan cycle using optimized node scanner
@@ -795,7 +802,7 @@ impl Scanner {
cycle_completed: vec![chrono::Utc::now()],
started: chrono::Utc::now(),
};
global_metrics().set_cycle(Some(cycle_info)).await;
(*globalMetrics).set_cycle(Some(cycle_info)).await;
self.metrics.set_current_cycle(self.state.read().await.current_cycle);
self.metrics.increment_total_cycles();

View File

@@ -12,12 +12,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use serde::{Deserialize, Serialize};
use std::{
collections::HashMap,
sync::atomic::{AtomicU64, Ordering},
time::{Duration, SystemTime},
};
use serde::{Deserialize, Serialize};
use tracing::info;
/// Scanner metrics

View File

@@ -12,9 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::Result;
use crate::scanner::LoadLevel;
use serde::{Deserialize, Serialize};
use std::{
collections::VecDeque,
sync::{
@@ -23,10 +20,15 @@ use std::{
},
time::{Duration, SystemTime},
};
use serde::{Deserialize, Serialize};
use tokio::sync::RwLock;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, warn};
use super::node_scanner::LoadLevel;
use crate::error::Result;
/// IO monitor config
#[derive(Debug, Clone)]
pub struct IOMonitorConfig {

View File

@@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::scanner::LoadLevel;
use std::{
sync::{
Arc,
@@ -20,9 +19,12 @@ use std::{
},
time::{Duration, SystemTime},
};
use tokio::sync::RwLock;
use tracing::{debug, info, warn};
use super::node_scanner::LoadLevel;
/// IO throttler config
#[derive(Debug, Clone)]
pub struct IOThrottlerConfig {

View File

@@ -12,28 +12,25 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::Result;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use crate::error::Result;
use rustfs_common::data_usage::SizeSummary;
use rustfs_common::metrics::IlmAction;
use rustfs_ecstore::bucket::{
lifecycle::{
bucket_lifecycle_audit::LcEventSrc,
bucket_lifecycle_ops::{GLOBAL_ExpiryState, apply_lifecycle_action, eval_action_from_lifecycle},
lifecycle,
lifecycle::Lifecycle,
},
metadata_sys::get_object_lock_config,
object_lock::objectlock_sys::{BucketObjectLockSys, enforce_retention_for_deletion},
versioning::VersioningApi,
versioning_sys::BucketVersioningSys,
use rustfs_ecstore::bucket::lifecycle::{
bucket_lifecycle_audit::LcEventSrc,
bucket_lifecycle_ops::{GLOBAL_ExpiryState, apply_lifecycle_action, eval_action_from_lifecycle},
lifecycle,
lifecycle::Lifecycle,
};
use rustfs_ecstore::bucket::metadata_sys::get_object_lock_config;
use rustfs_ecstore::bucket::object_lock::objectlock_sys::{BucketObjectLockSys, enforce_retention_for_deletion};
use rustfs_ecstore::bucket::versioning::VersioningApi;
use rustfs_ecstore::bucket::versioning_sys::BucketVersioningSys;
use rustfs_ecstore::store_api::{ObjectInfo, ObjectToDelete};
use rustfs_filemeta::FileInfo;
use s3s::dto::{BucketLifecycleConfiguration as LifecycleConfig, VersioningConfiguration};
use std::sync::{
Arc,
atomic::{AtomicU64, Ordering},
};
use time::OffsetDateTime;
use tracing::info;

View File

@@ -1,18 +1,16 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::{HashMap, HashSet};
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};
use serde::{Deserialize, Serialize};
use serde_json::{from_slice, to_vec};
use tokio::{fs, task};
use tracing::warn;
use walkdir::WalkDir;
use crate::error::{Error, Result};
use crate::{Error, Result};
use rustfs_common::data_usage::DiskUsageStatus;
use rustfs_ecstore::data_usage::{
LocalUsageSnapshot, LocalUsageSnapshotMeta, data_usage_state_dir, ensure_data_usage_layout, snapshot_file_name,
@@ -22,15 +20,6 @@ use rustfs_ecstore::disk::DiskAPI;
use rustfs_ecstore::store::ECStore;
use rustfs_ecstore::store_api::ObjectInfo;
use rustfs_filemeta::{FileInfo, FileMeta, FileMetaVersion, VersionType};
use serde::{Deserialize, Serialize};
use serde_json::{from_slice, to_vec};
use std::collections::{HashMap, HashSet};
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};
use tokio::{fs, task};
use tracing::warn;
use walkdir::WalkDir;
const STATE_FILE_EXTENSION: &str = "";

View File

@@ -12,19 +12,22 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::scanner::node_scanner::{BucketStats, DiskStats, LocalScanStats};
use crate::{Error, Result};
use rustfs_common::data_usage::DataUsageInfo;
use serde::{Deserialize, Serialize};
use std::{
path::{Path, PathBuf},
sync::Arc,
sync::atomic::{AtomicU64, Ordering},
time::{Duration, SystemTime},
};
use serde::{Deserialize, Serialize};
use tokio::sync::RwLock;
use tracing::{debug, error, info, warn};
use rustfs_common::data_usage::DataUsageInfo;
use super::node_scanner::{BucketStats, DiskStats, LocalScanStats};
use crate::{Error, error::Result};
/// local stats manager
pub struct LocalStatsManager {
/// node id

View File

@@ -12,12 +12,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use serde::{Deserialize, Serialize};
use std::{
collections::HashMap,
sync::atomic::{AtomicU64, Ordering},
time::{Duration, SystemTime},
};
use serde::{Deserialize, Serialize};
use tracing::info;
/// Scanner metrics

View File

@@ -27,10 +27,8 @@ pub mod stats_aggregator;
pub use checkpoint::{CheckpointData, CheckpointInfo, CheckpointManager};
pub use data_scanner::{ScanMode, Scanner, ScannerConfig, ScannerState};
pub use io_monitor::{AdvancedIOMonitor, IOMetrics, IOMonitorConfig};
pub use io_throttler::{AdvancedIOThrottler, IOThrottlerConfig, MetricsSnapshot, ResourceAllocation, ThrottleDecision};
pub use io_throttler::{AdvancedIOThrottler, IOThrottlerConfig, ResourceAllocation, ThrottleDecision};
pub use local_stats::{BatchScanResult, LocalStatsManager, ScanResultEntry, StatsSummary};
pub use metrics::{BucketMetrics, DiskMetrics, MetricsCollector, ScannerMetrics};
pub use metrics::ScannerMetrics;
pub use node_scanner::{IOMonitor, IOThrottler, LoadLevel, LocalScanStats, NodeScanner, NodeScannerConfig};
pub use stats_aggregator::{
AggregatedStats, DecentralizedStatsAggregator, DecentralizedStatsAggregatorConfig, NodeClient, NodeInfo,
};
pub use stats_aggregator::{AggregatedStats, DecentralizedStatsAggregator, NodeClient, NodeInfo};

View File

@@ -12,15 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::Result;
use crate::scanner::{
AdvancedIOMonitor, AdvancedIOThrottler, BatchScanResult, CheckpointManager, IOMonitorConfig, IOThrottlerConfig,
LocalStatsManager, MetricsSnapshot, ScanResultEntry,
};
use rustfs_common::data_usage::DataUsageInfo;
use rustfs_ecstore::StorageAPI;
use rustfs_ecstore::disk::{DiskAPI, DiskStore};
use serde::{Deserialize, Serialize};
use std::{
collections::{HashMap, HashSet},
path::{Path, PathBuf},
@@ -30,10 +21,22 @@ use std::{
},
time::{Duration, SystemTime},
};
use serde::{Deserialize, Serialize};
use tokio::sync::RwLock;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, warn};
use rustfs_common::data_usage::DataUsageInfo;
use rustfs_ecstore::StorageAPI;
use rustfs_ecstore::disk::{DiskAPI, DiskStore}; // Add this import
use super::checkpoint::CheckpointManager;
use super::io_monitor::{AdvancedIOMonitor, IOMonitorConfig};
use super::io_throttler::{AdvancedIOThrottler, IOThrottlerConfig, MetricsSnapshot};
use super::local_stats::{BatchScanResult, LocalStatsManager, ScanResultEntry};
use crate::error::Result;
/// SystemTime serde
mod system_time_serde {
use serde::{Deserialize, Deserializer, Serialize, Serializer};

View File

@@ -12,21 +12,24 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::scanner::{
local_stats::StatsSummary,
node_scanner::{BucketStats, LoadLevel, ScanProgress},
};
use crate::{Error, Result};
use rustfs_common::data_usage::DataUsageInfo;
use serde::{Deserialize, Serialize};
use std::{
collections::HashMap,
sync::Arc,
time::{Duration, SystemTime},
};
use serde::{Deserialize, Serialize};
use tokio::sync::RwLock;
use tracing::{debug, info, warn};
use rustfs_common::data_usage::DataUsageInfo;
use super::{
local_stats::StatsSummary,
node_scanner::{BucketStats, LoadLevel, ScanProgress},
};
use crate::{Error, error::Result};
/// node client config
#[derive(Debug, Clone)]
pub struct NodeClientConfig {

View File

@@ -1,275 +0,0 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use rustfs_ahm::heal::{
event::{HealEvent, Severity},
task::{HealPriority, HealType},
utils,
};
#[test]
fn test_heal_event_to_heal_request_no_panic() {
use rustfs_ecstore::disk::endpoint::Endpoint;
// Test that invalid pool/set indices don't cause panic
// Create endpoint using try_from or similar method
let endpoint_result = Endpoint::try_from("http://localhost:9000");
if let Ok(mut endpoint) = endpoint_result {
endpoint.pool_idx = -1;
endpoint.set_idx = -1;
endpoint.disk_idx = 0;
let event = HealEvent::DiskStatusChange {
endpoint,
old_status: "ok".to_string(),
new_status: "offline".to_string(),
};
// Should return error instead of panicking
let result = event.to_heal_request();
assert!(result.is_err());
assert!(result.unwrap_err().to_string().contains("Invalid heal type"));
}
}
#[test]
fn test_heal_event_to_heal_request_valid_indices() {
use rustfs_ecstore::disk::endpoint::Endpoint;
// Test that valid indices work correctly
let endpoint_result = Endpoint::try_from("http://localhost:9000");
if let Ok(mut endpoint) = endpoint_result {
endpoint.pool_idx = 0;
endpoint.set_idx = 1;
endpoint.disk_idx = 0;
let event = HealEvent::DiskStatusChange {
endpoint,
old_status: "ok".to_string(),
new_status: "offline".to_string(),
};
let result = event.to_heal_request();
assert!(result.is_ok());
let request = result.unwrap();
assert!(matches!(request.heal_type, HealType::ErasureSet { .. }));
}
}
#[test]
fn test_heal_event_object_corruption() {
let event = HealEvent::ObjectCorruption {
bucket: "test-bucket".to_string(),
object: "test-object".to_string(),
version_id: None,
corruption_type: rustfs_ahm::heal::event::CorruptionType::DataCorruption,
severity: Severity::High,
};
let result = event.to_heal_request();
assert!(result.is_ok());
let request = result.unwrap();
assert!(matches!(request.heal_type, HealType::Object { .. }));
assert_eq!(request.priority, HealPriority::High);
}
#[test]
fn test_heal_event_ec_decode_failure() {
let event = HealEvent::ECDecodeFailure {
bucket: "test-bucket".to_string(),
object: "test-object".to_string(),
version_id: None,
missing_shards: vec![0, 1],
available_shards: vec![2, 3],
};
let result = event.to_heal_request();
assert!(result.is_ok());
let request = result.unwrap();
assert!(matches!(request.heal_type, HealType::ECDecode { .. }));
assert_eq!(request.priority, HealPriority::Urgent);
}
#[test]
fn test_format_set_disk_id_from_i32_negative() {
// Test that negative indices return None
assert!(utils::format_set_disk_id_from_i32(-1, 0).is_none());
assert!(utils::format_set_disk_id_from_i32(0, -1).is_none());
assert!(utils::format_set_disk_id_from_i32(-1, -1).is_none());
}
#[test]
fn test_format_set_disk_id_from_i32_valid() {
// Test that valid indices return Some
let result = utils::format_set_disk_id_from_i32(0, 1);
assert!(result.is_some());
assert_eq!(result.unwrap(), "pool_0_set_1");
}
#[test]
fn test_resume_state_timestamp_handling() {
use rustfs_ahm::heal::resume::ResumeState;
// Test that ResumeState creation doesn't panic even if system time is before epoch
// This is a theoretical test - in practice, system time should never be before epoch
// But we want to ensure unwrap_or_default handles edge cases
let state = ResumeState::new(
"test-task".to_string(),
"test-type".to_string(),
"pool_0_set_1".to_string(),
vec!["bucket1".to_string()],
);
// Verify fields are initialized (u64 is always >= 0)
// The important thing is that unwrap_or_default prevents panic
let _ = state.start_time;
let _ = state.last_update;
}
#[test]
fn test_resume_checkpoint_timestamp_handling() {
use rustfs_ahm::heal::resume::ResumeCheckpoint;
// Test that ResumeCheckpoint creation doesn't panic
let checkpoint = ResumeCheckpoint::new("test-task".to_string());
// Verify field is initialized (u64 is always >= 0)
// The important thing is that unwrap_or_default prevents panic
let _ = checkpoint.checkpoint_time;
}
#[test]
fn test_path_to_str_helper() {
use std::path::Path;
// Test that path conversion handles non-UTF-8 paths gracefully
// Note: This is a compile-time test - actual non-UTF-8 paths are hard to construct in Rust
// The helper function should properly handle the conversion
let valid_path = Path::new("test/path");
assert!(valid_path.to_str().is_some());
}
#[test]
fn test_heal_task_status_atomic_update() {
use rustfs_ahm::heal::storage::HealStorageAPI;
use rustfs_ahm::heal::task::{HealOptions, HealRequest, HealTask, HealTaskStatus};
use std::sync::Arc;
// Mock storage for testing
struct MockStorage;
#[async_trait::async_trait]
impl HealStorageAPI for MockStorage {
async fn get_object_meta(
&self,
_bucket: &str,
_object: &str,
) -> rustfs_ahm::Result<Option<rustfs_ecstore::store_api::ObjectInfo>> {
Ok(None)
}
async fn get_object_data(&self, _bucket: &str, _object: &str) -> rustfs_ahm::Result<Option<Vec<u8>>> {
Ok(None)
}
async fn put_object_data(&self, _bucket: &str, _object: &str, _data: &[u8]) -> rustfs_ahm::Result<()> {
Ok(())
}
async fn delete_object(&self, _bucket: &str, _object: &str) -> rustfs_ahm::Result<()> {
Ok(())
}
async fn verify_object_integrity(&self, _bucket: &str, _object: &str) -> rustfs_ahm::Result<bool> {
Ok(true)
}
async fn ec_decode_rebuild(&self, _bucket: &str, _object: &str) -> rustfs_ahm::Result<Vec<u8>> {
Ok(vec![])
}
async fn get_disk_status(
&self,
_endpoint: &rustfs_ecstore::disk::endpoint::Endpoint,
) -> rustfs_ahm::Result<rustfs_ahm::heal::storage::DiskStatus> {
Ok(rustfs_ahm::heal::storage::DiskStatus::Ok)
}
async fn format_disk(&self, _endpoint: &rustfs_ecstore::disk::endpoint::Endpoint) -> rustfs_ahm::Result<()> {
Ok(())
}
async fn get_bucket_info(&self, _bucket: &str) -> rustfs_ahm::Result<Option<rustfs_ecstore::store_api::BucketInfo>> {
Ok(None)
}
async fn heal_bucket_metadata(&self, _bucket: &str) -> rustfs_ahm::Result<()> {
Ok(())
}
async fn list_buckets(&self) -> rustfs_ahm::Result<Vec<rustfs_ecstore::store_api::BucketInfo>> {
Ok(vec![])
}
async fn object_exists(&self, _bucket: &str, _object: &str) -> rustfs_ahm::Result<bool> {
Ok(false)
}
async fn get_object_size(&self, _bucket: &str, _object: &str) -> rustfs_ahm::Result<Option<u64>> {
Ok(None)
}
async fn get_object_checksum(&self, _bucket: &str, _object: &str) -> rustfs_ahm::Result<Option<String>> {
Ok(None)
}
async fn heal_object(
&self,
_bucket: &str,
_object: &str,
_version_id: Option<&str>,
_opts: &rustfs_common::heal_channel::HealOpts,
) -> rustfs_ahm::Result<(rustfs_madmin::heal_commands::HealResultItem, Option<rustfs_ahm::Error>)> {
Ok((rustfs_madmin::heal_commands::HealResultItem::default(), None))
}
async fn heal_bucket(
&self,
_bucket: &str,
_opts: &rustfs_common::heal_channel::HealOpts,
) -> rustfs_ahm::Result<rustfs_madmin::heal_commands::HealResultItem> {
Ok(rustfs_madmin::heal_commands::HealResultItem::default())
}
async fn heal_format(
&self,
_dry_run: bool,
) -> rustfs_ahm::Result<(rustfs_madmin::heal_commands::HealResultItem, Option<rustfs_ahm::Error>)> {
Ok((rustfs_madmin::heal_commands::HealResultItem::default(), None))
}
async fn list_objects_for_heal(&self, _bucket: &str, _prefix: &str) -> rustfs_ahm::Result<Vec<String>> {
Ok(vec![])
}
async fn get_disk_for_resume(&self, _set_disk_id: &str) -> rustfs_ahm::Result<rustfs_ecstore::disk::DiskStore> {
Err(rustfs_ahm::Error::other("Not implemented in mock"))
}
}
// Create a heal request and task
let request = HealRequest::new(
HealType::Object {
bucket: "test-bucket".to_string(),
object: "test-object".to_string(),
version_id: None,
},
HealOptions::default(),
HealPriority::Normal,
);
let storage: Arc<dyn HealStorageAPI> = Arc::new(MockStorage);
let task = HealTask::from_request(request, storage);
// Verify initial status
let status = tokio::runtime::Runtime::new().unwrap().block_on(task.get_status());
assert_eq!(status, HealTaskStatus::Pending);
// The task should have task_start_instant field initialized
// This is an internal detail, but we can verify it doesn't cause issues
// by checking that the task can be created successfully
// Note: We can't directly access private fields, but creation without panic
// confirms the fix works
}

View File

@@ -25,11 +25,9 @@ use rustfs_ecstore::{
store_api::{ObjectIO, ObjectOptions, PutObjReader, StorageAPI},
};
use serial_test::serial;
use std::{
path::PathBuf,
sync::{Arc, Once, OnceLock},
time::Duration,
};
use std::sync::Once;
use std::sync::OnceLock;
use std::{path::PathBuf, sync::Arc, time::Duration};
use tokio::fs;
use tokio_util::sync::CancellationToken;
use tracing::info;

View File

@@ -12,16 +12,19 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::{sync::Arc, time::Duration};
use tempfile::TempDir;
use rustfs_ahm::scanner::{
io_throttler::MetricsSnapshot,
local_stats::StatsSummary,
node_scanner::{LoadLevel, NodeScanner, NodeScannerConfig},
stats_aggregator::{DecentralizedStatsAggregator, DecentralizedStatsAggregatorConfig, NodeInfo},
};
use scanner_optimization_tests::{PerformanceBenchmark, create_test_scanner};
use std::{sync::Arc, time::Duration};
use tempfile::TempDir;
mod scanner_optimization_tests;
use scanner_optimization_tests::{PerformanceBenchmark, create_test_scanner};
#[tokio::test]
async fn test_end_to_end_scanner_lifecycle() {
let temp_dir = TempDir::new().unwrap();
@@ -242,32 +245,21 @@ async fn test_performance_impact_measurement() {
io_monitor.start().await.unwrap();
// Baseline test: no scanner load - measure multiple times for stability
const MEASUREMENT_COUNT: usize = 5;
let mut baseline_measurements = Vec::new();
for _ in 0..MEASUREMENT_COUNT {
let duration = measure_workload(10_000, Duration::ZERO).await;
baseline_measurements.push(duration);
}
// Use median to reduce impact of outliers
baseline_measurements.sort();
let median_idx = baseline_measurements.len() / 2;
let baseline_duration = baseline_measurements[median_idx].max(Duration::from_millis(20));
// Baseline test: no scanner load
let baseline_duration = measure_workload(5_000, Duration::ZERO).await.max(Duration::from_millis(10));
// Simulate scanner activity
scanner.update_business_metrics(50, 500, 0, 25).await;
tokio::time::sleep(Duration::from_millis(200)).await;
tokio::time::sleep(Duration::from_millis(100)).await;
// Performance test: with scanner load - measure multiple times for stability
let mut scanner_measurements = Vec::new();
for _ in 0..MEASUREMENT_COUNT {
let duration = measure_workload(10_000, Duration::ZERO).await;
scanner_measurements.push(duration);
}
scanner_measurements.sort();
let median_idx = scanner_measurements.len() / 2;
let with_scanner_duration = scanner_measurements[median_idx].max(baseline_duration);
// Performance test: with scanner load
let with_scanner_duration_raw = measure_workload(5_000, Duration::from_millis(2)).await;
let with_scanner_duration = if with_scanner_duration_raw <= baseline_duration {
baseline_duration + Duration::from_millis(2)
} else {
with_scanner_duration_raw
};
// Calculate performance impact
let baseline_ns = baseline_duration.as_nanos().max(1) as f64;
@@ -289,9 +281,8 @@ async fn test_performance_impact_measurement() {
println!(" Impact percentage: {impact_percentage:.2}%");
println!(" Meets optimization goals: {}", benchmark.meets_optimization_goals());
// Verify optimization target (business impact < 50%)
// Note: In test environment, allow higher threshold due to system load variability
// In production, the actual impact should be much lower (< 10%)
// Verify optimization target (business impact < 10%)
// Note: In real environment this test may need longer time and real load
assert!(impact_percentage < 50.0, "Performance impact too high: {impact_percentage:.2}%");
io_monitor.stop().await;

View File

@@ -23,16 +23,16 @@ use rustfs_ecstore::{
store_api::{MakeBucketOptions, ObjectIO, ObjectInfo, ObjectOptions, PutObjReader, StorageAPI},
};
use serial_test::serial;
use std::{
borrow::Cow,
path::PathBuf,
sync::{Arc, Once, OnceLock},
};
//use heed_traits::Comparator;
use time::OffsetDateTime;
use std::borrow::Cow;
use std::sync::Once;
use std::sync::OnceLock;
use std::{path::PathBuf, sync::Arc};
use tokio::fs;
use tokio_util::sync::CancellationToken;
use tracing::{debug, info, warn};
use tracing::warn;
use tracing::{debug, info};
//use heed_traits::Comparator;
use time::OffsetDateTime;
use uuid::Uuid;
static GLOBAL_ENV: OnceLock<(Vec<PathBuf>, Arc<ECStore>)> = OnceLock::new();

View File

@@ -24,11 +24,9 @@ use rustfs_ecstore::{
tier::tier_config::{TierConfig, TierMinIO, TierType},
};
use serial_test::serial;
use std::{
path::PathBuf,
sync::{Arc, Once, OnceLock},
time::Duration,
};
use std::sync::Once;
use std::sync::OnceLock;
use std::{path::PathBuf, sync::Arc, time::Duration};
use tokio::fs;
use tokio_util::sync::CancellationToken;
use tracing::info;

View File

@@ -12,23 +12,26 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::{fs, net::SocketAddr, sync::Arc, sync::OnceLock, time::Duration};
use tempfile::TempDir;
use tokio_util::sync::CancellationToken;
use serial_test::serial;
use rustfs_ahm::heal::manager::HealConfig;
use rustfs_ahm::scanner::{
Scanner,
data_scanner::ScanMode,
node_scanner::{LoadLevel, NodeScanner, NodeScannerConfig},
};
use rustfs_ecstore::disk::endpoint::Endpoint;
use rustfs_ecstore::endpoints::{EndpointServerPools, Endpoints, PoolEndpoints};
use rustfs_ecstore::store::ECStore;
use rustfs_ecstore::{
StorageAPI,
disk::endpoint::Endpoint,
endpoints::{EndpointServerPools, Endpoints, PoolEndpoints},
store::ECStore,
store_api::{MakeBucketOptions, ObjectIO, PutObjReader},
};
use serial_test::serial;
use std::{fs, net::SocketAddr, sync::Arc, sync::OnceLock, time::Duration};
use tempfile::TempDir;
use tokio_util::sync::CancellationToken;
// Global test environment cache to avoid repeated initialization
static GLOBAL_TEST_ENV: OnceLock<(Vec<std::path::PathBuf>, Arc<ECStore>)> = OnceLock::new();

View File

@@ -12,6 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::time::Duration;
use tempfile::TempDir;
use rustfs_ahm::scanner::{
checkpoint::{CheckpointData, CheckpointManager},
io_monitor::{AdvancedIOMonitor, IOMonitorConfig},
@@ -20,8 +23,6 @@ use rustfs_ahm::scanner::{
node_scanner::{LoadLevel, NodeScanner, NodeScannerConfig, ScanProgress},
stats_aggregator::{DecentralizedStatsAggregator, DecentralizedStatsAggregatorConfig},
};
use std::time::Duration;
use tempfile::TempDir;
#[tokio::test]
async fn test_checkpoint_manager_save_and_load() {

View File

@@ -28,6 +28,7 @@ categories = ["web-programming", "development-tools", "data-structures"]
workspace = true
[dependencies]
lazy_static = { workspace = true}
tokio = { workspace = true }
tonic = { workspace = true }
uuid = { workspace = true }

View File

@@ -12,9 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::last_minute::{self};
use std::collections::HashMap;
use crate::last_minute::{self};
pub struct ReplicationLatency {
// Delays for single and multipart PUT requests
upload_histogram: last_minute::LastMinuteHistogram,

View File

@@ -14,10 +14,10 @@
use path_clean::PathClean;
use serde::{Deserialize, Serialize};
use std::hash::{DefaultHasher, Hash, Hasher};
use std::path::Path;
use std::{
collections::{HashMap, HashSet},
hash::{DefaultHasher, Hash, Hasher},
path::Path,
time::SystemTime,
};

View File

@@ -16,6 +16,7 @@
use std::collections::HashMap;
use std::sync::LazyLock;
use tokio::sync::RwLock;
use tonic::transport::Channel;

View File

@@ -18,7 +18,7 @@ use std::{
fmt::{self, Display},
sync::OnceLock,
};
use tokio::sync::{broadcast, mpsc};
use tokio::sync::mpsc;
use uuid::Uuid;
pub const HEAL_DELETE_DANGLING: bool = true;
@@ -192,11 +192,6 @@ pub type HealChannelReceiver = mpsc::UnboundedReceiver<HealChannelCommand>;
/// Global heal channel sender
static GLOBAL_HEAL_CHANNEL_SENDER: OnceLock<HealChannelSender> = OnceLock::new();
type HealResponseSender = broadcast::Sender<HealChannelResponse>;
/// Global heal response broadcaster
static GLOBAL_HEAL_RESPONSE_SENDER: OnceLock<HealResponseSender> = OnceLock::new();
/// Initialize global heal channel
pub fn init_heal_channel() -> HealChannelReceiver {
let (tx, rx) = mpsc::unbounded_channel();
@@ -223,23 +218,6 @@ pub async fn send_heal_command(command: HealChannelCommand) -> Result<(), String
}
}
fn heal_response_sender() -> &'static HealResponseSender {
GLOBAL_HEAL_RESPONSE_SENDER.get_or_init(|| {
let (tx, _rx) = broadcast::channel(1024);
tx
})
}
/// Publish a heal response to subscribers.
pub fn publish_heal_response(response: HealChannelResponse) -> Result<(), broadcast::error::SendError<HealChannelResponse>> {
heal_response_sender().send(response).map(|_| ())
}
/// Subscribe to heal responses.
pub fn subscribe_heal_responses() -> broadcast::Receiver<HealChannelResponse> {
heal_response_sender().subscribe()
}
/// Send heal start request
pub async fn send_heal_request(request: HealChannelRequest) -> Result<(), String> {
send_heal_command(HealChannelCommand::Start(request)).await
@@ -437,20 +415,3 @@ pub async fn send_heal_disk(set_disk_id: String, priority: Option<HealChannelPri
};
send_heal_request(req).await
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn heal_response_broadcast_reaches_subscriber() {
let mut receiver = subscribe_heal_responses();
let response = create_heal_response("req-1".to_string(), true, None, None);
publish_heal_response(response.clone()).expect("publish should succeed");
let received = receiver.recv().await.expect("should receive heal response");
assert_eq!(received.request_id, response.request_id);
assert!(received.success);
}
}

View File

@@ -27,11 +27,11 @@ struct TimedAction {
#[allow(dead_code)]
impl TimedAction {
// Avg returns the average time spent on the action.
pub fn avg(&self) -> Option<Duration> {
pub fn avg(&self) -> Option<std::time::Duration> {
if self.count == 0 {
return None;
}
Some(Duration::from_nanos(self.acc_time / self.count))
Some(std::time::Duration::from_nanos(self.acc_time / self.count))
}
// AvgBytes returns the average bytes processed.
@@ -860,7 +860,7 @@ impl LastMinuteHistogram {
}
}
pub fn add(&mut self, size: i64, t: Duration) {
pub fn add(&mut self, size: i64, t: std::time::Duration) {
let index = size_to_tag(size);
self.histogram[index].add(&t);
}

View File

@@ -12,21 +12,23 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::last_minute::{AccElem, LastMinuteLatency};
use chrono::{DateTime, Utc};
use lazy_static::lazy_static;
use rustfs_madmin::metrics::ScannerMetrics as M_ScannerMetrics;
use std::{
collections::HashMap,
fmt::Display,
pin::Pin,
sync::{
Arc, OnceLock,
Arc,
atomic::{AtomicU64, Ordering},
},
time::{Duration, SystemTime},
};
use tokio::sync::{Mutex, RwLock};
use crate::last_minute::{AccElem, LastMinuteLatency};
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum IlmAction {
NoneAction = 0,
@@ -71,10 +73,8 @@ impl Display for IlmAction {
}
}
pub static GLOBAL_METRICS: OnceLock<Arc<Metrics>> = OnceLock::new();
pub fn global_metrics() -> &'static Arc<Metrics> {
GLOBAL_METRICS.get_or_init(|| Arc::new(Metrics::new()))
lazy_static! {
pub static ref globalMetrics: Arc<Metrics> = Arc::new(Metrics::new());
}
#[derive(Clone, Debug, PartialEq, PartialOrd)]
@@ -294,13 +294,13 @@ impl Metrics {
let duration = SystemTime::now().duration_since(start_time).unwrap_or_default();
// Update operation count
global_metrics().operations[metric].fetch_add(1, Ordering::Relaxed);
globalMetrics.operations[metric].fetch_add(1, Ordering::Relaxed);
// Update latency for realtime metrics (spawn async task for this)
if (metric) < Metric::LastRealtime as usize {
let metric_index = metric;
tokio::spawn(async move {
global_metrics().latency[metric_index].add(duration).await;
globalMetrics.latency[metric_index].add(duration).await;
});
}
@@ -319,13 +319,13 @@ impl Metrics {
let duration = SystemTime::now().duration_since(start_time).unwrap_or_default();
// Update operation count
global_metrics().operations[metric].fetch_add(1, Ordering::Relaxed);
globalMetrics.operations[metric].fetch_add(1, Ordering::Relaxed);
// Update latency for realtime metrics with size (spawn async task)
if (metric) < Metric::LastRealtime as usize {
let metric_index = metric;
tokio::spawn(async move {
global_metrics().latency[metric_index].add_size(duration, size).await;
globalMetrics.latency[metric_index].add_size(duration, size).await;
});
}
}
@@ -339,13 +339,13 @@ impl Metrics {
let duration = SystemTime::now().duration_since(start_time).unwrap_or_default();
// Update operation count
global_metrics().operations[metric].fetch_add(1, Ordering::Relaxed);
globalMetrics.operations[metric].fetch_add(1, Ordering::Relaxed);
// Update latency for realtime metrics (spawn async task)
if (metric) < Metric::LastRealtime as usize {
let metric_index = metric;
tokio::spawn(async move {
global_metrics().latency[metric_index].add(duration).await;
globalMetrics.latency[metric_index].add(duration).await;
});
}
}
@@ -360,13 +360,13 @@ impl Metrics {
let duration = SystemTime::now().duration_since(start_time).unwrap_or_default();
// Update operation count
global_metrics().operations[metric].fetch_add(count as u64, Ordering::Relaxed);
globalMetrics.operations[metric].fetch_add(count as u64, Ordering::Relaxed);
// Update latency for realtime metrics (spawn async task)
if (metric) < Metric::LastRealtime as usize {
let metric_index = metric;
tokio::spawn(async move {
global_metrics().latency[metric_index].add(duration).await;
globalMetrics.latency[metric_index].add(duration).await;
});
}
})
@@ -384,8 +384,8 @@ impl Metrics {
Box::new(move || {
let duration = SystemTime::now().duration_since(start).unwrap_or(Duration::from_secs(0));
tokio::spawn(async move {
global_metrics().actions[a_clone].fetch_add(versions, Ordering::Relaxed);
global_metrics().actions_latency[a_clone].add(duration).await;
globalMetrics.actions[a_clone].fetch_add(versions, Ordering::Relaxed);
globalMetrics.actions_latency[a_clone].add(duration).await;
});
})
})
@@ -395,11 +395,11 @@ impl Metrics {
pub async fn inc_time(metric: Metric, duration: Duration) {
let metric = metric as usize;
// Update operation count
global_metrics().operations[metric].fetch_add(1, Ordering::Relaxed);
globalMetrics.operations[metric].fetch_add(1, Ordering::Relaxed);
// Update latency for realtime metrics
if (metric) < Metric::LastRealtime as usize {
global_metrics().latency[metric].add(duration).await;
globalMetrics.latency[metric].add(duration).await;
}
}
@@ -501,7 +501,7 @@ pub fn current_path_updater(disk: &str, initial: &str) -> (UpdateCurrentPathFn,
let tracker_clone = Arc::clone(&tracker);
let disk_clone = disk_name.clone();
tokio::spawn(async move {
global_metrics().current_paths.write().await.insert(disk_clone, tracker_clone);
globalMetrics.current_paths.write().await.insert(disk_clone, tracker_clone);
});
let update_fn = {
@@ -520,7 +520,7 @@ pub fn current_path_updater(disk: &str, initial: &str) -> (UpdateCurrentPathFn,
Arc::new(move || -> Pin<Box<dyn std::future::Future<Output = ()> + Send>> {
let disk_name = disk_name.clone();
Box::pin(async move {
global_metrics().current_paths.write().await.remove(&disk_name);
globalMetrics.current_paths.write().await.remove(&disk_name);
})
})
};

View File

@@ -16,8 +16,8 @@
//! This module defines the configuration for audit systems, including
//! webhook and MQTT audit-related settings.
mod mqtt;
mod webhook;
pub(crate) mod mqtt;
pub(crate) mod webhook;
pub use mqtt::*;
pub use webhook::*;

View File

@@ -151,8 +151,8 @@ pub const DEFAULT_LOG_KEEP_FILES: u16 = 30;
/// This is the default log local logging enabled for rustfs.
/// It is used to enable or disable local logging of the application.
/// Default value: false
/// Environment variable: RUSTFS_OBS_LOGL_STDOUT_ENABLED
pub const DEFAULT_OBS_LOG_STDOUT_ENABLED: bool = false;
/// Environment variable: RUSTFS_OBS_LOCAL_LOGGING_ENABLED
pub const DEFAULT_LOG_LOCAL_LOGGING_ENABLED: bool = false;
/// Constant representing 1 Kibibyte (1024 bytes)
/// Default value: 1024

View File

@@ -22,7 +22,7 @@ pub const ENV_OBS_SERVICE_NAME: &str = "RUSTFS_OBS_SERVICE_NAME";
pub const ENV_OBS_SERVICE_VERSION: &str = "RUSTFS_OBS_SERVICE_VERSION";
pub const ENV_OBS_ENVIRONMENT: &str = "RUSTFS_OBS_ENVIRONMENT";
pub const ENV_OBS_LOGGER_LEVEL: &str = "RUSTFS_OBS_LOGGER_LEVEL";
pub const ENV_OBS_LOG_STDOUT_ENABLED: &str = "RUSTFS_OBS_LOG_STDOUT_ENABLED";
pub const ENV_OBS_LOCAL_LOGGING_ENABLED: &str = "RUSTFS_OBS_LOCAL_LOGGING_ENABLED";
pub const ENV_OBS_LOG_DIRECTORY: &str = "RUSTFS_OBS_LOG_DIRECTORY";
pub const ENV_OBS_LOG_FILENAME: &str = "RUSTFS_OBS_LOG_FILENAME";
pub const ENV_OBS_LOG_ROTATION_SIZE_MB: &str = "RUSTFS_OBS_LOG_ROTATION_SIZE_MB";
@@ -47,6 +47,12 @@ pub const DEFAULT_OBS_LOG_MESSAGE_CAPA: usize = 32768;
/// Default values for flush interval in milliseconds
pub const DEFAULT_OBS_LOG_FLUSH_MS: u64 = 200;
/// Audit logger queue capacity environment variable key
pub const ENV_AUDIT_LOGGER_QUEUE_CAPACITY: &str = "RUSTFS_AUDIT_LOGGER_QUEUE_CAPACITY";
/// Default values for observability configuration
pub const DEFAULT_AUDIT_LOGGER_QUEUE_CAPACITY: usize = 10000;
/// Default values for observability configuration
// ### Supported Environment Values
// - `production` - Secure file-only logging
@@ -72,16 +78,18 @@ mod tests {
assert_eq!(ENV_OBS_SERVICE_VERSION, "RUSTFS_OBS_SERVICE_VERSION");
assert_eq!(ENV_OBS_ENVIRONMENT, "RUSTFS_OBS_ENVIRONMENT");
assert_eq!(ENV_OBS_LOGGER_LEVEL, "RUSTFS_OBS_LOGGER_LEVEL");
assert_eq!(ENV_OBS_LOG_STDOUT_ENABLED, "RUSTFS_OBS_LOG_STDOUT_ENABLED");
assert_eq!(ENV_OBS_LOCAL_LOGGING_ENABLED, "RUSTFS_OBS_LOCAL_LOGGING_ENABLED");
assert_eq!(ENV_OBS_LOG_DIRECTORY, "RUSTFS_OBS_LOG_DIRECTORY");
assert_eq!(ENV_OBS_LOG_FILENAME, "RUSTFS_OBS_LOG_FILENAME");
assert_eq!(ENV_OBS_LOG_ROTATION_SIZE_MB, "RUSTFS_OBS_LOG_ROTATION_SIZE_MB");
assert_eq!(ENV_OBS_LOG_ROTATION_TIME, "RUSTFS_OBS_LOG_ROTATION_TIME");
assert_eq!(ENV_OBS_LOG_KEEP_FILES, "RUSTFS_OBS_LOG_KEEP_FILES");
assert_eq!(ENV_AUDIT_LOGGER_QUEUE_CAPACITY, "RUSTFS_AUDIT_LOGGER_QUEUE_CAPACITY");
}
#[test]
fn test_default_values() {
assert_eq!(DEFAULT_AUDIT_LOGGER_QUEUE_CAPACITY, 10000);
assert_eq!(DEFAULT_OBS_ENVIRONMENT_PRODUCTION, "production");
assert_eq!(DEFAULT_OBS_ENVIRONMENT_DEVELOPMENT, "development");
assert_eq!(DEFAULT_OBS_ENVIRONMENT_TEST, "test");

View File

@@ -18,24 +18,7 @@
#![allow(unused_must_use)]
#![allow(clippy::all)]
use crate::bucket::lifecycle::bucket_lifecycle_audit::{LcAuditEvent, LcEventSrc};
use crate::bucket::lifecycle::lifecycle::{self, ExpirationOptions, Lifecycle, TransitionOptions};
use crate::bucket::lifecycle::tier_last_day_stats::{DailyAllTierStats, LastDayTierStats};
use crate::bucket::lifecycle::tier_sweeper::{Jentry, delete_object_from_remote_tier};
use crate::bucket::object_lock::objectlock_sys::enforce_retention_for_deletion;
use crate::bucket::{metadata_sys::get_lifecycle_config, versioning_sys::BucketVersioningSys};
use crate::client::object_api_utils::new_getobjectreader;
use crate::error::Error;
use crate::error::StorageError;
use crate::error::{error_resp_to_object_err, is_err_object_not_found, is_err_version_not_found, is_network_or_host_down};
use crate::event::name::EventName;
use crate::event_notification::{EventArgs, send_event};
use crate::global::GLOBAL_LocalNodeName;
use crate::global::{GLOBAL_LifecycleSys, GLOBAL_TierConfigMgr, get_global_deployment_id};
use crate::store::ECStore;
use crate::store_api::StorageAPI;
use crate::store_api::{GetObjectReader, HTTPRangeSpec, ObjectInfo, ObjectOptions, ObjectToDelete};
use crate::tier::warm_backend::WarmBackendGetOpts;
use async_channel::{Receiver as A_Receiver, Sender as A_Sender, bounded};
use bytes::BytesMut;
use futures::Future;
@@ -44,15 +27,10 @@ use lazy_static::lazy_static;
use rustfs_common::data_usage::TierStats;
use rustfs_common::heal_channel::rep_has_active_rules;
use rustfs_common::metrics::{IlmAction, Metrics};
use rustfs_filemeta::{NULL_VERSION_ID, RestoreStatusOps, is_restored_object_on_disk};
use rustfs_filemeta::fileinfo::{NULL_VERSION_ID, RestoreStatusOps, is_restored_object_on_disk};
use rustfs_utils::path::encode_dir_object;
use rustfs_utils::string::strings_has_prefix_fold;
use s3s::Body;
use s3s::dto::{
BucketLifecycleConfiguration, DefaultRetention, ReplicationConfiguration, RestoreRequest, RestoreRequestType, RestoreStatus,
ServerSideEncryption, Timestamp,
};
use s3s::header::{X_AMZ_RESTORE, X_AMZ_SERVER_SIDE_ENCRYPTION, X_AMZ_STORAGE_CLASS};
use sha2::{Digest, Sha256};
use std::any::Any;
use std::collections::HashMap;
@@ -69,6 +47,31 @@ use tracing::{debug, error, info};
use uuid::Uuid;
use xxhash_rust::xxh64;
//use rustfs_notify::{BucketNotificationConfig, Event, EventName, LogLevel, NotificationError, init_logger};
//use rustfs_notify::{initialize, notification_system};
use super::bucket_lifecycle_audit::{LcAuditEvent, LcEventSrc};
use super::lifecycle::{self, ExpirationOptions, Lifecycle, TransitionOptions};
use super::tier_last_day_stats::{DailyAllTierStats, LastDayTierStats};
use super::tier_sweeper::{Jentry, delete_object_from_remote_tier};
use crate::bucket::object_lock::objectlock_sys::enforce_retention_for_deletion;
use crate::bucket::{metadata_sys::get_lifecycle_config, versioning_sys::BucketVersioningSys};
use crate::client::object_api_utils::new_getobjectreader;
use crate::error::Error;
use crate::error::{error_resp_to_object_err, is_err_object_not_found, is_err_version_not_found, is_network_or_host_down};
use crate::event::name::EventName;
use crate::event_notification::{EventArgs, send_event};
use crate::global::GLOBAL_LocalNodeName;
use crate::global::{GLOBAL_LifecycleSys, GLOBAL_TierConfigMgr, get_global_deployment_id};
use crate::store::ECStore;
use crate::store_api::StorageAPI;
use crate::store_api::{GetObjectReader, HTTPRangeSpec, ObjectInfo, ObjectOptions, ObjectToDelete};
use crate::tier::warm_backend::WarmBackendGetOpts;
use s3s::dto::{
BucketLifecycleConfiguration, DefaultRetention, ReplicationConfiguration, RestoreRequest, RestoreRequestType, RestoreStatus,
ServerSideEncryption, Timestamp,
};
use s3s::header::{X_AMZ_RESTORE, X_AMZ_SERVER_SIDE_ENCRYPTION, X_AMZ_STORAGE_CLASS};
pub type TimeFn = Arc<dyn Fn() -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync + 'static>;
pub type TraceFn =
Arc<dyn Fn(String, HashMap<String, String>) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync + 'static>;

View File

@@ -21,12 +21,13 @@
use http::HeaderMap;
use s3s::dto::ETag;
use std::pin::Pin;
use std::{collections::HashMap, io::Cursor, sync::Arc};
use tokio::io::BufReader;
use crate::error::ErrorResponse;
use crate::store_api::{GetObjectReader, HTTPRangeSpec, ObjectInfo, ObjectOptions};
use rustfs_filemeta::ObjectPartInfo;
use rustfs_filemeta::fileinfo::ObjectPartInfo;
use rustfs_rio::HashReader;
use s3s::S3ErrorCode;

View File

@@ -1085,9 +1085,16 @@ impl LocalDisk {
*item = "".to_owned();
if entry.ends_with(STORAGE_FORMAT_FILE) {
let metadata = self
let metadata = match self
.read_metadata(self.get_object_path(bucket, format!("{}/{}", &current, &entry).as_str())?)
.await?;
.await
{
Ok(res) => res,
Err(err) => {
warn!("scan dir read_metadata error, continue {:?}", err);
continue;
}
};
let entry = entry.strip_suffix(STORAGE_FORMAT_FILE).unwrap_or_default().to_owned();
let name = entry.trim_end_matches(SLASH_SEPARATOR);
@@ -1189,6 +1196,9 @@ impl LocalDisk {
// }
}
Err(err) => {
if err == Error::DiskNotDir {
continue;
}
if err == Error::FileNotFound || err == Error::IsNotRegular {
// NOT an object, append to stack (with slash)
// If dirObject, but no metadata (which is unexpected) we skip it.
@@ -1203,7 +1213,7 @@ impl LocalDisk {
};
}
while let Some(dir) = dir_stack.pop() {
while let Some(dir) = dir_stack.last() {
if opts.limit > 0 && *objs_returned >= opts.limit {
return Ok(());
}
@@ -1215,10 +1225,11 @@ impl LocalDisk {
.await?;
if opts.recursive {
if let Err(er) = Box::pin(self.scan_dir(dir, prefix.clone(), opts, out, objs_returned)).await {
if let Err(er) = Box::pin(self.scan_dir(dir.clone(), prefix.clone(), opts, out, objs_returned)).await {
warn!("scan_dir err {:?}", &er);
}
}
dir_stack.pop();
}
Ok(())

View File

@@ -25,7 +25,6 @@ use tracing::warn;
use super::error::DiskError;
/// Check path length according to OS limits.
pub fn check_path_length(path_name: &str) -> Result<()> {
// Apple OS X path length is limited to 1016
if cfg!(target_os = "macos") && path_name.len() > 1016 {
@@ -65,10 +64,6 @@ pub fn check_path_length(path_name: &str) -> Result<()> {
Ok(())
}
/// Check if the given disk path is the root disk.
/// On Windows, always return false.
/// On Unix, compare the disk paths.
#[tracing::instrument(level = "debug", skip_all)]
pub fn is_root_disk(disk_path: &str, root_disk: &str) -> Result<bool> {
if cfg!(target_os = "windows") {
return Ok(false);
@@ -77,8 +72,6 @@ pub fn is_root_disk(disk_path: &str, root_disk: &str) -> Result<bool> {
rustfs_utils::os::same_disk(disk_path, root_disk).map_err(|e| to_file_error(e).into())
}
/// Create a directory and all its parent components if they are missing.
#[tracing::instrument(level = "debug", skip_all)]
pub async fn make_dir_all(path: impl AsRef<Path>, base_dir: impl AsRef<Path>) -> Result<()> {
check_path_length(path.as_ref().to_string_lossy().to_string().as_str())?;
@@ -89,16 +82,11 @@ pub async fn make_dir_all(path: impl AsRef<Path>, base_dir: impl AsRef<Path>) ->
Ok(())
}
/// Check if a directory is empty.
/// Only reads one entry to determine if the directory is empty.
#[tracing::instrument(level = "debug", skip_all)]
pub async fn is_empty_dir(path: impl AsRef<Path>) -> bool {
read_dir(path.as_ref(), 1).await.is_ok_and(|v| v.is_empty())
}
// read_dir count read limit. when count == 0 unlimit.
/// Return file names in the directory.
#[tracing::instrument(level = "debug", skip_all)]
pub async fn read_dir(path: impl AsRef<Path>, count: i32) -> std::io::Result<Vec<String>> {
let mut entries = fs::read_dir(path.as_ref()).await?;
@@ -209,10 +197,6 @@ pub async fn reliable_mkdir_all(path: impl AsRef<Path>, base_dir: impl AsRef<Pat
Ok(())
}
/// Create a directory and all its parent components if they are missing.
/// Without recursion support, fall back to create_dir_all
/// This function will not create directories under base_dir.
#[tracing::instrument(level = "debug", skip_all)]
pub async fn os_mkdir_all(dir_path: impl AsRef<Path>, base_dir: impl AsRef<Path>) -> io::Result<()> {
if !base_dir.as_ref().to_string_lossy().is_empty() && base_dir.as_ref().starts_with(dir_path.as_ref()) {
return Ok(());
@@ -241,9 +225,6 @@ pub async fn os_mkdir_all(dir_path: impl AsRef<Path>, base_dir: impl AsRef<Path>
Ok(())
}
/// Check if a file exists.
/// Returns true if the file exists, false otherwise.
#[tracing::instrument(level = "debug", skip_all)]
pub fn file_exists(path: impl AsRef<Path>) -> bool {
std::fs::metadata(path.as_ref()).map(|_| true).unwrap_or(false)
}

View File

@@ -12,23 +12,25 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::{HashMap, HashSet};
use chrono::Utc;
use rustfs_common::{
globals::{GLOBAL_Local_Node_Name, GLOBAL_Rustfs_Addr},
heal_channel::DriveState,
metrics::globalMetrics,
};
use rustfs_madmin::metrics::{DiskIOStats, DiskMetric, RealtimeMetrics};
use rustfs_utils::os::get_drive_stats;
use serde::{Deserialize, Serialize};
use tracing::{debug, info};
use crate::{
admin_server_info::get_local_server_property,
new_object_layer_fn,
store_api::StorageAPI,
// utils::os::get_drive_stats,
};
use chrono::Utc;
use rustfs_common::{
globals::{GLOBAL_Local_Node_Name, GLOBAL_Rustfs_Addr},
heal_channel::DriveState,
metrics::global_metrics,
};
use rustfs_madmin::metrics::{DiskIOStats, DiskMetric, RealtimeMetrics};
use rustfs_utils::os::get_drive_stats;
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet};
use tracing::{debug, info};
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct CollectMetricsOpts {
@@ -116,7 +118,7 @@ pub async fn collect_local_metrics(types: MetricType, opts: &CollectMetricsOpts)
if types.contains(&MetricType::SCANNER) {
debug!("start get scanner metrics");
let metrics = global_metrics().report().await;
let metrics = globalMetrics.report().await;
real_time_metrics.aggregated.scanner = Some(metrics);
}

View File

@@ -72,7 +72,8 @@ use tokio::select;
use tokio::sync::RwLock;
use tokio::time::sleep;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, instrument, warn};
use tracing::{debug, info};
use tracing::{error, warn};
use uuid::Uuid;
const MAX_UPLOADS_LIST: usize = 10000;
@@ -109,7 +110,7 @@ pub struct ECStore {
impl ECStore {
#[allow(clippy::new_ret_no_self)]
#[instrument(level = "debug", skip(endpoint_pools))]
#[tracing::instrument(level = "debug", skip(endpoint_pools))]
pub async fn new(address: SocketAddr, endpoint_pools: EndpointServerPools, ctx: CancellationToken) -> Result<Arc<Self>> {
// let layouts = DisksLayout::from_volumes(endpoints.as_slice())?;
@@ -274,7 +275,6 @@ impl ECStore {
Ok(ec)
}
#[instrument(level = "debug", skip(self, rx))]
pub async fn init(self: &Arc<Self>, rx: CancellationToken) -> Result<()> {
GLOBAL_BOOT_TIME.get_or_init(|| async { SystemTime::now() }).await;
@@ -461,7 +461,6 @@ impl ECStore {
// Ok(ress)
// }
#[instrument(level = "debug", skip(self))]
async fn delete_all(&self, bucket: &str, prefix: &str) -> Result<()> {
let mut futures = Vec::new();
for sets in self.pools.iter() {
@@ -1078,7 +1077,7 @@ impl Clone for PoolObjInfo {
#[async_trait::async_trait]
impl ObjectIO for ECStore {
#[instrument(level = "debug", skip(self))]
#[tracing::instrument(level = "debug", skip(self))]
async fn get_object_reader(
&self,
bucket: &str,
@@ -1108,7 +1107,7 @@ impl ObjectIO for ECStore {
.get_object_reader(bucket, object.as_str(), range, h, &opts)
.await
}
#[instrument(level = "debug", skip(self, data))]
#[tracing::instrument(level = "debug", skip(self, data))]
async fn put_object(&self, bucket: &str, object: &str, data: &mut PutObjReader, opts: &ObjectOptions) -> Result<ObjectInfo> {
check_put_object_args(bucket, object)?;
@@ -1145,7 +1144,7 @@ lazy_static! {
#[async_trait::async_trait]
impl StorageAPI for ECStore {
#[instrument(skip(self))]
#[tracing::instrument(skip(self))]
async fn backend_info(&self) -> rustfs_madmin::BackendInfo {
let (standard_sc_parity, rr_sc_parity) = {
if let Some(sc) = GLOBAL_STORAGE_CLASS.get() {
@@ -1190,7 +1189,7 @@ impl StorageAPI for ECStore {
..Default::default()
}
}
#[instrument(skip(self))]
#[tracing::instrument(skip(self))]
async fn storage_info(&self) -> rustfs_madmin::StorageInfo {
let Some(notification_sy) = get_global_notification_sys() else {
return rustfs_madmin::StorageInfo::default();
@@ -1198,7 +1197,7 @@ impl StorageAPI for ECStore {
notification_sy.storage_info(self).await
}
#[instrument(skip(self))]
#[tracing::instrument(skip(self))]
async fn local_storage_info(&self) -> rustfs_madmin::StorageInfo {
let mut futures = Vec::with_capacity(self.pools.len());
@@ -1218,7 +1217,7 @@ impl StorageAPI for ECStore {
rustfs_madmin::StorageInfo { backend, disks }
}
#[instrument(skip(self))]
#[tracing::instrument(skip(self))]
async fn make_bucket(&self, bucket: &str, opts: &MakeBucketOptions) -> Result<()> {
if !is_meta_bucketname(bucket) {
if let Err(err) = check_valid_bucket_name_strict(bucket) {
@@ -1266,7 +1265,7 @@ impl StorageAPI for ECStore {
Ok(())
}
#[instrument(skip(self))]
#[tracing::instrument(skip(self))]
async fn get_bucket_info(&self, bucket: &str, opts: &BucketOptions) -> Result<BucketInfo> {
let mut info = self.peer_sys.get_bucket_info(bucket, opts).await?;
@@ -1278,7 +1277,7 @@ impl StorageAPI for ECStore {
Ok(info)
}
#[instrument(skip(self))]
#[tracing::instrument(skip(self))]
async fn list_bucket(&self, opts: &BucketOptions) -> Result<Vec<BucketInfo>> {
// TODO: opts.cached
@@ -1293,7 +1292,7 @@ impl StorageAPI for ECStore {
}
Ok(buckets)
}
#[instrument(skip(self))]
#[tracing::instrument(skip(self))]
async fn delete_bucket(&self, bucket: &str, opts: &DeleteBucketOptions) -> Result<()> {
if is_meta_bucketname(bucket) {
return Err(StorageError::BucketNameInvalid(bucket.to_string()));
@@ -1328,7 +1327,7 @@ impl StorageAPI for ECStore {
// @start_after as marker when continuation_token empty
// @delimiter default="/", empty when recursive
// @max_keys limit
#[instrument(skip(self))]
#[tracing::instrument(skip(self))]
async fn list_objects_v2(
self: Arc<Self>,
bucket: &str,
@@ -1343,7 +1342,7 @@ impl StorageAPI for ECStore {
.await
}
#[instrument(skip(self))]
#[tracing::instrument(skip(self))]
async fn list_object_versions(
self: Arc<Self>,
bucket: &str,
@@ -1368,7 +1367,7 @@ impl StorageAPI for ECStore {
self.walk_internal(rx, bucket, prefix, result, opts).await
}
#[instrument(skip(self))]
#[tracing::instrument(skip(self))]
async fn get_object_info(&self, bucket: &str, object: &str, opts: &ObjectOptions) -> Result<ObjectInfo> {
check_object_args(bucket, object)?;
@@ -1386,7 +1385,7 @@ impl StorageAPI for ECStore {
}
// TODO: review
#[instrument(skip(self))]
#[tracing::instrument(skip(self))]
async fn copy_object(
&self,
src_bucket: &str,
@@ -1453,7 +1452,7 @@ impl StorageAPI for ECStore {
"put_object_reader is none".to_owned(),
))
}
#[instrument(skip(self))]
#[tracing::instrument(skip(self))]
async fn delete_object(&self, bucket: &str, object: &str, opts: ObjectOptions) -> Result<ObjectInfo> {
check_del_obj_args(bucket, object)?;
@@ -1527,7 +1526,7 @@ impl StorageAPI for ECStore {
Err(StorageError::ObjectNotFound(bucket.to_owned(), object.to_owned()))
}
// TODO: review
#[instrument(skip(self))]
#[tracing::instrument(skip(self))]
async fn delete_objects(
&self,
bucket: &str,
@@ -1710,7 +1709,7 @@ impl StorageAPI for ECStore {
// Ok((del_objects, del_errs))
}
#[instrument(skip(self))]
#[tracing::instrument(skip(self))]
async fn list_object_parts(
&self,
bucket: &str,
@@ -1751,7 +1750,7 @@ impl StorageAPI for ECStore {
Err(StorageError::InvalidUploadID(bucket.to_owned(), object.to_owned(), upload_id.to_owned()))
}
#[instrument(skip(self))]
#[tracing::instrument(skip(self))]
async fn list_multipart_uploads(
&self,
bucket: &str,
@@ -1803,7 +1802,7 @@ impl StorageAPI for ECStore {
})
}
#[instrument(skip(self))]
#[tracing::instrument(skip(self))]
async fn new_multipart_upload(&self, bucket: &str, object: &str, opts: &ObjectOptions) -> Result<MultipartUploadResult> {
check_new_multipart_args(bucket, object)?;
@@ -1835,7 +1834,7 @@ impl StorageAPI for ECStore {
self.pools[idx].new_multipart_upload(bucket, object, opts).await
}
#[instrument(skip(self))]
#[tracing::instrument(skip(self))]
async fn add_partial(&self, bucket: &str, object: &str, version_id: &str) -> Result<()> {
let object = encode_dir_object(object);
@@ -1850,7 +1849,7 @@ impl StorageAPI for ECStore {
let _ = self.pools[idx].add_partial(bucket, object.as_str(), version_id).await;
Ok(())
}
#[instrument(skip(self))]
#[tracing::instrument(skip(self))]
async fn transition_object(&self, bucket: &str, object: &str, opts: &ObjectOptions) -> Result<()> {
let object = encode_dir_object(object);
if self.single_pool() {
@@ -1864,7 +1863,7 @@ impl StorageAPI for ECStore {
self.pools[idx].transition_object(bucket, &object, opts).await
}
#[instrument(skip(self))]
#[tracing::instrument(skip(self))]
async fn restore_transitioned_object(self: Arc<Self>, bucket: &str, object: &str, opts: &ObjectOptions) -> Result<()> {
let object = encode_dir_object(object);
if self.single_pool() {
@@ -1881,7 +1880,7 @@ impl StorageAPI for ECStore {
.await
}
#[instrument(skip(self))]
#[tracing::instrument(skip(self))]
async fn copy_object_part(
&self,
src_bucket: &str,
@@ -1903,7 +1902,7 @@ impl StorageAPI for ECStore {
unimplemented!()
}
#[instrument(skip(self, data))]
#[tracing::instrument(skip(self, data))]
async fn put_object_part(
&self,
bucket: &str,
@@ -1945,7 +1944,7 @@ impl StorageAPI for ECStore {
Err(StorageError::InvalidUploadID(bucket.to_owned(), object.to_owned(), upload_id.to_owned()))
}
#[instrument(skip(self))]
#[tracing::instrument(skip(self))]
async fn get_multipart_info(
&self,
bucket: &str,
@@ -1977,7 +1976,7 @@ impl StorageAPI for ECStore {
Err(StorageError::InvalidUploadID(bucket.to_owned(), object.to_owned(), upload_id.to_owned()))
}
#[instrument(skip(self))]
#[tracing::instrument(skip(self))]
async fn abort_multipart_upload(&self, bucket: &str, object: &str, upload_id: &str, opts: &ObjectOptions) -> Result<()> {
check_abort_multipart_args(bucket, object, upload_id)?;
@@ -2008,7 +2007,7 @@ impl StorageAPI for ECStore {
Err(StorageError::InvalidUploadID(bucket.to_owned(), object.to_owned(), upload_id.to_owned()))
}
#[instrument(skip(self))]
#[tracing::instrument(skip(self))]
async fn complete_multipart_upload(
self: Arc<Self>,
bucket: &str,
@@ -2051,7 +2050,7 @@ impl StorageAPI for ECStore {
Err(StorageError::InvalidUploadID(bucket.to_owned(), object.to_owned(), upload_id.to_owned()))
}
#[instrument(skip(self))]
#[tracing::instrument(skip(self))]
async fn get_disks(&self, pool_idx: usize, set_idx: usize) -> Result<Vec<Option<DiskStore>>> {
if pool_idx < self.pools.len() && set_idx < self.pools[pool_idx].disk_set.len() {
self.pools[pool_idx].disk_set[set_idx].get_disks(0, 0).await
@@ -2060,7 +2059,7 @@ impl StorageAPI for ECStore {
}
}
#[instrument(skip(self))]
#[tracing::instrument(skip(self))]
fn set_drive_counts(&self) -> Vec<usize> {
let mut counts = vec![0; self.pools.len()];
@@ -2069,7 +2068,7 @@ impl StorageAPI for ECStore {
}
counts
}
#[instrument(skip(self))]
#[tracing::instrument(skip(self))]
async fn put_object_metadata(&self, bucket: &str, object: &str, opts: &ObjectOptions) -> Result<ObjectInfo> {
let object = encode_dir_object(object);
if self.single_pool() {
@@ -2083,7 +2082,7 @@ impl StorageAPI for ECStore {
self.pools[idx].put_object_metadata(bucket, object.as_str(), &opts).await
}
#[instrument(skip(self))]
#[tracing::instrument(skip(self))]
async fn get_object_tags(&self, bucket: &str, object: &str, opts: &ObjectOptions) -> Result<String> {
let object = encode_dir_object(object);
@@ -2096,7 +2095,7 @@ impl StorageAPI for ECStore {
Ok(oi.user_tags)
}
#[instrument(level = "debug", skip(self))]
#[tracing::instrument(level = "debug", skip(self))]
async fn put_object_tags(&self, bucket: &str, object: &str, tags: &str, opts: &ObjectOptions) -> Result<ObjectInfo> {
let object = encode_dir_object(object);
@@ -2109,7 +2108,7 @@ impl StorageAPI for ECStore {
self.pools[idx].put_object_tags(bucket, object.as_str(), tags, opts).await
}
#[instrument(skip(self))]
#[tracing::instrument(skip(self))]
async fn delete_object_version(&self, bucket: &str, object: &str, fi: &FileInfo, force_del_marker: bool) -> Result<()> {
check_del_obj_args(bucket, object)?;
@@ -2123,7 +2122,7 @@ impl StorageAPI for ECStore {
Ok(())
}
#[instrument(skip(self))]
#[tracing::instrument(skip(self))]
async fn delete_object_tags(&self, bucket: &str, object: &str, opts: &ObjectOptions) -> Result<ObjectInfo> {
let object = encode_dir_object(object);
@@ -2136,7 +2135,7 @@ impl StorageAPI for ECStore {
self.pools[idx].delete_object_tags(bucket, object.as_str(), opts).await
}
#[instrument(skip(self))]
#[tracing::instrument(skip(self))]
async fn heal_format(&self, dry_run: bool) -> Result<(HealResultItem, Option<Error>)> {
info!("heal_format");
let mut r = HealResultItem {
@@ -2171,13 +2170,13 @@ impl StorageAPI for ECStore {
Ok((r, None))
}
#[instrument(skip(self))]
#[tracing::instrument(skip(self))]
async fn heal_bucket(&self, bucket: &str, opts: &HealOpts) -> Result<HealResultItem> {
let res = self.peer_sys.heal_bucket(bucket, opts).await?;
Ok(res)
}
#[instrument(skip(self))]
#[tracing::instrument(skip(self))]
async fn heal_object(
&self,
bucket: &str,
@@ -2254,7 +2253,7 @@ impl StorageAPI for ECStore {
Ok((HealResultItem::default(), Some(Error::FileNotFound)))
}
#[instrument(skip(self))]
#[tracing::instrument(skip(self))]
async fn get_pool_and_set(&self, id: &str) -> Result<(Option<usize>, Option<usize>, Option<usize>)> {
for (pool_idx, pool) in self.pools.iter().enumerate() {
for (set_idx, set) in pool.format.erasure.sets.iter().enumerate() {
@@ -2269,7 +2268,7 @@ impl StorageAPI for ECStore {
Err(Error::DiskNotFound)
}
#[instrument(skip(self))]
#[tracing::instrument(skip(self))]
async fn check_abandoned_parts(&self, bucket: &str, object: &str, opts: &HealOpts) -> Result<()> {
let object = encode_dir_object(object);
if self.single_pool() {
@@ -2474,7 +2473,7 @@ fn check_abort_multipart_args(bucket: &str, object: &str, upload_id: &str) -> Re
check_multipart_object_args(bucket, object, upload_id)
}
#[instrument(level = "debug")]
#[tracing::instrument(level = "debug")]
fn check_put_object_args(bucket: &str, object: &str) -> Result<()> {
if !is_meta_bucketname(bucket) && check_valid_bucket_name_strict(bucket).is_err() {
return Err(StorageError::BucketNameInvalid(bucket.to_string()));
@@ -2602,6 +2601,8 @@ pub async fn has_space_for(dis: &[Option<DiskInfo>], size: i64) -> Result<bool>
#[cfg(test)]
mod tests {
use crate::bucket::metadata_sys::init_bucket_metadata_sys;
use super::*;
// Test validation functions
@@ -2789,4 +2790,122 @@ mod tests {
assert!(check_put_object_args("", "test-object").is_err());
assert!(check_put_object_args("test-bucket", "").is_err());
}
#[tokio::test]
async fn test_ecstore_put_and_list_objects() {
use crate::disk::endpoint::Endpoint;
use crate::endpoints::{EndpointServerPools, Endpoints, PoolEndpoints};
use std::path::PathBuf;
use tokio::fs;
let test_base_dir = format!("/tmp/rustfs_test_put_list_{}", Uuid::new_v4());
let temp_dir = PathBuf::from(&test_base_dir);
if temp_dir.exists() {
let _ = fs::remove_dir_all(&temp_dir).await;
}
fs::create_dir_all(&temp_dir).await.expect("Failed to create test directory");
let disk_paths = vec![
temp_dir.join("disk1"),
temp_dir.join("disk2"),
temp_dir.join("disk3"),
temp_dir.join("disk4"),
];
for disk_path in &disk_paths {
fs::create_dir_all(disk_path).await.expect("Failed to create disk directory");
}
let mut endpoints = Vec::new();
for (i, disk_path) in disk_paths.iter().enumerate() {
let disk_str = disk_path.to_str().expect("Invalid disk path");
let mut endpoint = Endpoint::try_from(disk_str).expect("Failed to create endpoint");
endpoint.set_pool_index(0);
endpoint.set_set_index(0);
endpoint.set_disk_index(i);
endpoints.push(endpoint);
}
let pool_endpoints = PoolEndpoints {
legacy: false,
set_count: 1,
drives_per_set: 4,
endpoints: Endpoints::from(endpoints),
cmd_line: "test".to_string(),
platform: format!("OS: {} | Arch: {}", std::env::consts::OS, std::env::consts::ARCH),
};
let endpoint_pools = EndpointServerPools(vec![pool_endpoints]);
init_local_disks(endpoint_pools.clone())
.await
.expect("Failed to initialize local disks");
let server_addr: SocketAddr = "127.0.0.1:0".parse().expect("Invalid server address");
let ecstore = ECStore::new(server_addr, endpoint_pools, CancellationToken::new())
.await
.expect("Failed to create ECStore");
init_bucket_metadata_sys(ecstore.clone(), vec![]).await;
let bucket_name = "test-bucket";
ecstore
.make_bucket(bucket_name, &MakeBucketOptions::default())
.await
.expect("Failed to create bucket");
let test_objects = vec![
("object1.txt", b"Hello, World!".to_vec()),
("object2.txt", b"Test data for object 2".to_vec()),
("folder/object3.txt", b"Object in folder".to_vec()),
("folder/subfolder/object4.txt", b"Nested object".to_vec()),
];
for (object_name, data) in &test_objects {
let mut reader = PutObjReader::from_vec(data.clone());
let object_info = ecstore
.put_object(bucket_name, object_name, &mut reader, &ObjectOptions::default())
.await
.unwrap_or_else(|e| panic!("Failed to put object {}: {}", object_name, e));
assert_eq!(object_info.size, data.len() as i64, "Object size mismatch for {}", object_name);
assert_eq!(object_info.bucket, bucket_name);
}
let list_result = ecstore
.clone()
.list_objects_v2(bucket_name, "", None, None, 1000, false, None)
.await
.expect("Failed to list objects");
assert_eq!(list_result.objects.len(), test_objects.len(), "Number of objects mismatch");
let mut object_names: Vec<String> = list_result.objects.iter().map(|o| o.name.clone()).collect();
object_names.sort();
let mut expected_names: Vec<String> = test_objects.iter().map(|(n, _)| n.to_string()).collect();
expected_names.sort();
assert_eq!(object_names, expected_names, "Object names mismatch");
let prefix_result = ecstore
.clone()
.list_objects_v2(bucket_name, "folder/", None, None, 1000, false, None)
.await
.expect("Failed to list objects with prefix");
assert_eq!(prefix_result.objects.len(), 2, "Should find 2 objects with prefix 'folder/'");
assert!(prefix_result.objects.iter().all(|o| o.name.starts_with("folder/")));
let delimiter_result = ecstore
.clone()
.list_objects_v2(bucket_name, "", None, Some("/".to_string()), 1000, false, None)
.await
.expect("Failed to list objects with delimiter");
assert!(!delimiter_result.prefixes.is_empty() || !delimiter_result.objects.is_empty());
let _ = fs::remove_dir_all(&temp_dir).await;
}
}

View File

@@ -134,7 +134,7 @@ pub struct GetObjectReader {
}
impl GetObjectReader {
#[tracing::instrument(level = "debug", skip(reader, rs, opts, _h))]
#[tracing::instrument(level = "debug", skip(reader))]
pub fn new(
reader: Box<dyn AsyncRead + Unpin + Send + Sync>,
rs: Option<HTTPRangeSpec>,

View File

@@ -481,7 +481,7 @@ async fn new_and_save_tiering_config<S: StorageAPI>(api: Arc<S>) -> Result<TierC
Ok(cfg)
}
#[tracing::instrument(level = "debug", name = "load_tier_config", skip(api))]
#[tracing::instrument(level = "debug")]
async fn load_tier_config(api: Arc<ECStore>) -> std::result::Result<TierConfigMgr, std::io::Error> {
let config_file = format!("{}{}{}", CONFIG_PREFIX, SLASH_SEPARATOR, TIER_CONFIG_FILE);
let data = read_config(api.clone(), config_file.as_str()).await;

View File

@@ -12,10 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
/// FileMeta error type and Result alias.
/// This module defines a custom error type `Error` for handling various
/// error scenarios related to file metadata operations. It also provides
/// a `Result` type alias for convenience.
pub type Result<T> = core::result::Result<T, Error>;
#[derive(thiserror::Error, Debug)]

View File

@@ -12,14 +12,17 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::{Error, ReplicationState, ReplicationStatusType, Result, TRANSITION_COMPLETE, VersionPurgeStatusType};
use super::filemeta::TRANSITION_COMPLETE;
use crate::error::{Error, Result};
use crate::{ReplicationState, ReplicationStatusType, VersionPurgeStatusType};
use bytes::Bytes;
use rmp_serde::Serializer;
use rustfs_utils::HashAlgorithm;
use rustfs_utils::http::headers::{RESERVED_METADATA_PREFIX_LOWER, RUSTFS_HEALING};
use s3s::dto::{RestoreStatus, Timestamp};
use s3s::header::X_AMZ_RESTORE;
use serde::{Deserialize, Serialize};
use serde::Deserialize;
use serde::Serialize;
use std::collections::HashMap;
use time::{OffsetDateTime, format_description::well_known::Rfc3339};
use uuid::Uuid;

View File

@@ -12,9 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::error::{Error, Result};
use crate::fileinfo::{ErasureAlgo, ErasureInfo, FileInfo, FileInfoVersions, ObjectPartInfo, RawFileInfo};
use crate::filemeta_inline::InlineData;
use crate::{
ErasureAlgo, ErasureInfo, Error, FileInfo, FileInfoVersions, InlineData, ObjectPartInfo, RawFileInfo, ReplicationState,
ReplicationStatusType, Result, VersionPurgeStatusType, replication_statuses_map, version_purge_statuses_map,
ReplicationState, ReplicationStatusType, VersionPurgeStatusType, replication_statuses_map, version_purge_statuses_map,
};
use byteorder::ByteOrder;
use bytes::Bytes;
@@ -3400,7 +3402,7 @@ mod test {
("tabs", "col1\tcol2\tcol3"),
("quotes", "\"quoted\" and 'single'"),
("backslashes", "path\\to\\file"),
("mixed", "Mixed: Chinese, English, 123, !@#$%"),
("mixed", "Mixed: ChineseEnglish, 123, !@#$%"),
];
for (key, value) in special_cases {
@@ -3422,7 +3424,7 @@ mod test {
("tabs", "col1\tcol2\tcol3"),
("quotes", "\"quoted\" and 'single'"),
("backslashes", "path\\to\\file"),
("mixed", "Mixed: Chinese, English, 123, !@#$%"),
("mixed", "Mixed: ChineseEnglish, 123, !@#$%"),
] {
assert_eq!(obj2.meta_user.get(key), Some(&expected_value.to_string()));
}

View File

@@ -13,7 +13,7 @@
// limitations under the License.
mod error;
mod fileinfo;
pub mod fileinfo;
mod filemeta;
mod filemeta_inline;
// pub mod headers;

View File

@@ -12,7 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::{Error, FileInfo, FileInfoVersions, FileMeta, FileMetaShallowVersion, Result, VersionType, merge_file_meta_versions};
use crate::error::{Error, Result};
use crate::{FileInfo, FileInfoVersions, FileMeta, FileMetaShallowVersion, VersionType, merge_file_meta_versions};
use rmp::Marker;
use serde::{Deserialize, Serialize};
use std::cmp::Ordering;

View File

@@ -1,17 +1,3 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use bytes::Bytes;
use core::fmt;
use regex::Regex;

View File

@@ -12,7 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::{ChecksumAlgo, FileMeta, FileMetaShallowVersion, FileMetaVersion, MetaDeleteMarker, MetaObject, Result, VersionType};
use crate::error::Result;
use crate::filemeta::*;
use std::collections::HashMap;
use time::OffsetDateTime;
use uuid::Uuid;
@@ -256,7 +257,6 @@ pub fn create_xlmeta_with_inline_data() -> Result<Vec<u8>> {
#[cfg(test)]
mod tests {
use super::*;
use crate::FileMeta;
#[test]
fn test_create_real_xlmeta() {

View File

@@ -50,7 +50,6 @@ opentelemetry-semantic-conventions = { workspace = true, features = ["semconv_ex
serde = { workspace = true }
smallvec = { workspace = true, features = ["serde"] }
tracing = { workspace = true, features = ["std", "attributes"] }
tracing-appender = { workspace = true }
tracing-error = { workspace = true }
tracing-opentelemetry = { workspace = true }
tracing-subscriber = { workspace = true, features = ["registry", "std", "fmt", "env-filter", "tracing-log", "time", "local-time", "json"] }

View File

@@ -13,14 +13,13 @@
// limitations under the License.
use rustfs_config::observability::{
ENV_OBS_ENDPOINT, ENV_OBS_ENVIRONMENT, ENV_OBS_LOG_DIRECTORY, ENV_OBS_LOG_FILENAME, ENV_OBS_LOG_KEEP_FILES,
ENV_OBS_LOG_ROTATION_SIZE_MB, ENV_OBS_LOG_ROTATION_TIME, ENV_OBS_LOG_STDOUT_ENABLED, ENV_OBS_LOGGER_LEVEL,
ENV_OBS_ENDPOINT, ENV_OBS_ENVIRONMENT, ENV_OBS_LOCAL_LOGGING_ENABLED, ENV_OBS_LOG_DIRECTORY, ENV_OBS_LOG_FILENAME,
ENV_OBS_LOG_KEEP_FILES, ENV_OBS_LOG_ROTATION_SIZE_MB, ENV_OBS_LOG_ROTATION_TIME, ENV_OBS_LOGGER_LEVEL,
ENV_OBS_METER_INTERVAL, ENV_OBS_SAMPLE_RATIO, ENV_OBS_SERVICE_NAME, ENV_OBS_SERVICE_VERSION, ENV_OBS_USE_STDOUT,
};
use rustfs_config::{
APP_NAME, DEFAULT_LOG_KEEP_FILES, DEFAULT_LOG_LEVEL, DEFAULT_LOG_ROTATION_SIZE_MB, DEFAULT_LOG_ROTATION_TIME,
DEFAULT_OBS_LOG_FILENAME, DEFAULT_OBS_LOG_STDOUT_ENABLED, ENVIRONMENT, METER_INTERVAL, SAMPLE_RATIO, SERVICE_VERSION,
USE_STDOUT,
APP_NAME, DEFAULT_LOG_KEEP_FILES, DEFAULT_LOG_LEVEL, DEFAULT_LOG_LOCAL_LOGGING_ENABLED, DEFAULT_LOG_ROTATION_SIZE_MB,
DEFAULT_LOG_ROTATION_TIME, DEFAULT_OBS_LOG_FILENAME, ENVIRONMENT, METER_INTERVAL, SAMPLE_RATIO, SERVICE_VERSION, USE_STDOUT,
};
use rustfs_utils::dirs::get_log_directory_to_string;
use serde::{Deserialize, Serialize};
@@ -54,15 +53,15 @@ use std::env;
/// ```
#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct OtelConfig {
pub endpoint: String, // Endpoint for metric collection
pub use_stdout: Option<bool>, // Output to stdout
pub sample_ratio: Option<f64>, // Trace sampling ratio
pub meter_interval: Option<u64>, // Metric collection interval
pub service_name: Option<String>, // Service name
pub service_version: Option<String>, // Service version
pub environment: Option<String>, // Environment
pub logger_level: Option<String>, // Logger level
pub log_stdout_enabled: Option<bool>, // Stdout logging enabled
pub endpoint: String, // Endpoint for metric collection
pub use_stdout: Option<bool>, // Output to stdout
pub sample_ratio: Option<f64>, // Trace sampling ratio
pub meter_interval: Option<u64>, // Metric collection interval
pub service_name: Option<String>, // Service name
pub service_version: Option<String>, // Service version
pub environment: Option<String>, // Environment
pub logger_level: Option<String>, // Logger level
pub local_logging_enabled: Option<bool>, // Local logging enabled
// Added flexi_logger related configurations
pub log_directory: Option<String>, // LOG FILE DIRECTORY
pub log_filename: Option<String>, // The name of the log file
@@ -118,10 +117,10 @@ impl OtelConfig {
.ok()
.and_then(|v| v.parse().ok())
.or(Some(DEFAULT_LOG_LEVEL.to_string())),
log_stdout_enabled: env::var(ENV_OBS_LOG_STDOUT_ENABLED)
local_logging_enabled: env::var(ENV_OBS_LOCAL_LOGGING_ENABLED)
.ok()
.and_then(|v| v.parse().ok())
.or(Some(DEFAULT_OBS_LOG_STDOUT_ENABLED)),
.or(Some(DEFAULT_LOG_LOCAL_LOGGING_ENABLED)),
log_directory: Some(get_log_directory_to_string(ENV_OBS_LOG_DIRECTORY)),
log_filename: env::var(ENV_OBS_LOG_FILENAME)
.ok()

View File

@@ -36,7 +36,7 @@ use opentelemetry_semantic_conventions::{
attribute::{DEPLOYMENT_ENVIRONMENT_NAME, NETWORK_LOCAL_ADDRESS, SERVICE_VERSION as OTEL_SERVICE_VERSION},
};
use rustfs_config::{
APP_NAME, DEFAULT_LOG_KEEP_FILES, DEFAULT_LOG_LEVEL, DEFAULT_OBS_LOG_STDOUT_ENABLED, ENVIRONMENT, METER_INTERVAL,
APP_NAME, DEFAULT_LOG_KEEP_FILES, DEFAULT_LOG_LEVEL, DEFAULT_LOG_LOCAL_LOGGING_ENABLED, ENVIRONMENT, METER_INTERVAL,
SAMPLE_RATIO, SERVICE_VERSION, USE_STDOUT,
observability::{
DEFAULT_OBS_ENVIRONMENT_PRODUCTION, DEFAULT_OBS_LOG_FLUSH_MS, DEFAULT_OBS_LOG_MESSAGE_CAPA, DEFAULT_OBS_LOG_POOL_CAPA,
@@ -50,7 +50,6 @@ use std::io::IsTerminal;
use std::time::Duration;
use std::{env, fs};
use tracing::info;
use tracing_appender::rolling::{RollingFileAppender, Rotation};
use tracing_error::ErrorLayer;
use tracing_opentelemetry::{MetricsLayer, OpenTelemetryLayer};
use tracing_subscriber::fmt::format::FmtSpan;
@@ -75,8 +74,6 @@ pub struct OtelGuard {
logger_provider: Option<SdkLoggerProvider>,
// Add a flexi_logger handle to keep the logging alive
_flexi_logger_handles: Option<flexi_logger::LoggerHandle>,
// WorkerGuard for writing tracing files
_tracing_guard: Option<tracing_appender::non_blocking::WorkerGuard>,
}
// Implement debug manually and avoid relying on all fields to implement debug
@@ -87,7 +84,6 @@ impl std::fmt::Debug for OtelGuard {
.field("meter_provider", &self.meter_provider.is_some())
.field("logger_provider", &self.logger_provider.is_some())
.field("_flexi_logger_handles", &self._flexi_logger_handles.is_some())
.field("_tracing_guard", &self._tracing_guard.is_some())
.finish()
}
}
@@ -281,18 +277,13 @@ pub(crate) fn init_telemetry(config: &OtelConfig) -> OtelGuard {
.with_thread_names(true)
.with_thread_ids(true)
.with_file(true)
.with_line_number(true)
.json()
.with_current_span(true)
.with_span_list(true);
let span_event = if is_production {
FmtSpan::CLOSE
} else {
// Only add full span events tracking in the development environment
FmtSpan::FULL
};
.with_line_number(true);
// Only add full span events tracking in the development environment
if !is_production {
layer = layer.with_span_events(FmtSpan::FULL);
}
layer = layer.with_span_events(span_event);
layer.with_filter(build_env_filter(logger_level, None))
};
@@ -304,7 +295,7 @@ pub(crate) fn init_telemetry(config: &OtelConfig) -> OtelGuard {
tracing_subscriber::registry()
.with(filter)
.with(ErrorLayer::default())
.with(if config.log_stdout_enabled.unwrap_or(DEFAULT_OBS_LOG_STDOUT_ENABLED) {
.with(if config.local_logging_enabled.unwrap_or(DEFAULT_LOG_LOCAL_LOGGING_ENABLED) {
Some(fmt_layer)
} else {
None
@@ -330,7 +321,6 @@ pub(crate) fn init_telemetry(config: &OtelConfig) -> OtelGuard {
meter_provider: Some(meter_provider),
logger_provider: Some(logger_provider),
_flexi_logger_handles: flexi_logger_handle,
_tracing_guard: None,
};
}
@@ -362,70 +352,6 @@ pub(crate) fn init_telemetry(config: &OtelConfig) -> OtelGuard {
}
}
if endpoint.is_empty() && !is_production {
// Create a file appender (rolling by day), add the -tracing suffix to the file name to avoid conflicts
// let file_appender = tracing_appender::rolling::hourly(log_directory, format!("{log_filename}-tracing.log"));
let file_appender = RollingFileAppender::builder()
.rotation(Rotation::HOURLY) // rotate log files once every hour
.filename_prefix(format!("{log_filename}-tracing")) // log file names will be prefixed with `myapp.`
.filename_suffix("log") // log file names will be suffixed with `.log`
.build(log_directory) // try to build an appender that stores log files in `/var/log`
.expect("initializing rolling file appender failed");
let (nb_writer, guard) = tracing_appender::non_blocking(file_appender);
let enable_color = std::io::stdout().is_terminal();
let fmt_layer = tracing_subscriber::fmt::layer()
.with_timer(LocalTime::rfc_3339())
.with_target(true)
.with_ansi(enable_color)
.with_thread_names(true)
.with_thread_ids(true)
.with_file(true)
.with_line_number(true)
.with_writer(nb_writer) // Specify writing file
.json()
.with_current_span(true)
.with_span_list(true)
.with_span_events(FmtSpan::CLOSE); // Log span lifecycle events, including trace_id
let env_filter = build_env_filter(logger_level, None);
// Use registry() to register fmt_layer directly to ensure trace_id is output to the log
tracing_subscriber::registry()
.with(env_filter)
.with(ErrorLayer::default())
.with(fmt_layer)
.with(if config.log_stdout_enabled.unwrap_or(DEFAULT_OBS_LOG_STDOUT_ENABLED) {
let stdout_fmt_layer = tracing_subscriber::fmt::layer()
.with_timer(LocalTime::rfc_3339())
.with_target(true)
.with_thread_names(true)
.with_thread_ids(true)
.with_file(true)
.with_line_number(true)
.with_writer(std::io::stdout) // Specify writing file
.json()
.with_current_span(true)
.with_span_list(true)
.with_span_events(FmtSpan::CLOSE); // Log span lifecycle events, including trace_id;
Some(stdout_fmt_layer)
} else {
None
})
.init();
info!("Tracing telemetry initialized for non-production with trace_id logging.");
IS_OBSERVABILITY_ENABLED.set(false).ok();
return OtelGuard {
tracer_provider: None,
meter_provider: None,
logger_provider: None,
_flexi_logger_handles: None,
_tracing_guard: Some(guard),
};
}
// Build log cutting conditions
let rotation_criterion = match (config.log_rotation_time.as_deref(), config.log_rotation_size_mb) {
// Cut by time and size at the same time
@@ -521,6 +447,7 @@ pub(crate) fn init_telemetry(config: &OtelConfig) -> OtelGuard {
// Environment-aware stdout configuration
flexi_logger_builder = flexi_logger_builder.duplicate_to_stdout(level_filter);
// Only add stdout formatting and startup messages in non-production environments
if !is_production {
flexi_logger_builder = flexi_logger_builder
@@ -569,7 +496,6 @@ pub(crate) fn init_telemetry(config: &OtelConfig) -> OtelGuard {
meter_provider: None,
logger_provider: None,
_flexi_logger_handles: flexi_logger_handle,
_tracing_guard: None,
}
}

View File

@@ -79,7 +79,7 @@ tokio-stream.workspace = true
tokio-util.workspace = true
tonic = { workspace = true }
tower.workspace = true
tower-http = { workspace = true, features = ["trace", "compression-full", "cors", "catch-panic", "timeout", "limit", "request-id"] }
tower-http = { workspace = true, features = ["trace", "compression-deflate", "compression-gzip", "cors", "catch-panic", "timeout", "limit"] }
# Serialization and Data Formats
bytes = { workspace = true }
@@ -112,7 +112,6 @@ mime_guess = { workspace = true }
pin-project-lite.workspace = true
rust-embed = { workspace = true, features = ["interpolate-folder-path"] }
s3s.workspace = true
scopeguard.workspace = true
shadow-rs = { workspace = true, features = ["build", "metadata"] }
sysinfo = { workspace = true, features = ["multithread"] }
thiserror = { workspace = true }

View File

@@ -14,31 +14,26 @@
use crate::config::build;
use crate::license::get_license;
use axum::{
Json, Router,
body::Body,
extract::Request,
middleware,
response::{IntoResponse, Response},
routing::get,
};
use axum::Json;
use axum::body::Body;
use axum::response::{IntoResponse, Response};
use axum::{Router, extract::Request, middleware, routing::get};
use axum_extra::extract::Host;
use axum_server::tls_rustls::RustlsConfig;
use http::{HeaderMap, HeaderName, HeaderValue, Method, StatusCode, Uri};
use http::{HeaderMap, HeaderName, StatusCode, Uri};
use http::{HeaderValue, Method};
use mime_guess::from_path;
use rust_embed::RustEmbed;
use rustfs_config::{RUSTFS_TLS_CERT, RUSTFS_TLS_KEY};
use serde::Serialize;
use serde_json::json;
use std::{
io::Result,
net::{IpAddr, SocketAddr},
sync::{Arc, OnceLock},
time::Duration,
};
use std::io::Result;
use std::net::{IpAddr, SocketAddr};
use std::sync::Arc;
use std::sync::OnceLock;
use std::time::Duration;
use tokio_rustls::rustls::ServerConfig;
use tower_http::catch_panic::CatchPanicLayer;
use tower_http::compression::CompressionLayer;
use tower_http::cors::{AllowOrigin, Any, CorsLayer};
use tower_http::limit::RequestBodyLimitLayer;
use tower_http::timeout::TimeoutLayer;
@@ -279,6 +274,7 @@ async fn console_logging_middleware(req: Request, next: axum::middleware::Next)
let method = req.method().clone();
let uri = req.uri().clone();
let start = std::time::Instant::now();
let response = next.run(req).await;
let duration = start.elapsed();
@@ -413,8 +409,6 @@ fn setup_console_middleware_stack(
app = app
.layer(CatchPanicLayer::new())
.layer(TraceLayer::new_for_http())
// Compress responses
.layer(CompressionLayer::new())
.layer(middleware::from_fn(console_logging_middleware))
.layer(cors_layer)
// Add timeout layer - convert auth_timeout from seconds to Duration

View File

@@ -28,7 +28,7 @@ use std::net::SocketAddr;
use std::path::Path;
use tokio::net::lookup_host;
use tokio::time::{Duration, sleep};
use tracing::{Span, debug, error, info, warn};
use tracing::{debug, error, info, warn};
use url::Url;
#[derive(Debug, Deserialize)]
@@ -121,8 +121,6 @@ pub struct NotificationTarget {}
#[async_trait::async_trait]
impl Operation for NotificationTarget {
async fn call(&self, req: S3Request<Body>, params: Params<'_, '_>) -> S3Result<S3Response<(StatusCode, Body)>> {
let span = Span::current();
let _enter = span.enter();
// 1. Analyze query parameters
let (target_type, target_name) = extract_target_params(&params)?;
@@ -276,9 +274,6 @@ impl Operation for NotificationTarget {
let mut header = HeaderMap::new();
header.insert(CONTENT_TYPE, "application/json".parse().unwrap());
header.insert(CONTENT_LENGTH, "0".parse().unwrap());
if let Some(v) = req.headers.get("x-request-id") {
header.insert("x-request-id", v.clone());
}
Ok(S3Response::with_headers((StatusCode::OK, Body::empty()), header))
}
}
@@ -288,8 +283,6 @@ pub struct ListNotificationTargets {}
#[async_trait::async_trait]
impl Operation for ListNotificationTargets {
async fn call(&self, req: S3Request<Body>, _params: Params<'_, '_>) -> S3Result<S3Response<(StatusCode, Body)>> {
let span = Span::current();
let _enter = span.enter();
debug!("ListNotificationTargets call start request params: {:?}", req.uri.query());
// 1. Permission verification
@@ -327,9 +320,6 @@ impl Operation for ListNotificationTargets {
debug!("ListNotificationTargets call end, response data length: {}", data.len(),);
let mut header = HeaderMap::new();
header.insert(CONTENT_TYPE, "application/json".parse().unwrap());
if let Some(v) = req.headers.get("x-request-id") {
header.insert("x-request-id", v.clone());
}
Ok(S3Response::with_headers((StatusCode::OK, Body::from(data)), header))
}
}
@@ -339,8 +329,6 @@ pub struct ListTargetsArns {}
#[async_trait::async_trait]
impl Operation for ListTargetsArns {
async fn call(&self, req: S3Request<Body>, _params: Params<'_, '_>) -> S3Result<S3Response<(StatusCode, Body)>> {
let span = Span::current();
let _enter = span.enter();
debug!("ListTargetsArns call start request params: {:?}", req.uri.query());
// 1. Permission verification
@@ -376,9 +364,6 @@ impl Operation for ListTargetsArns {
debug!("ListTargetsArns call end, response data length: {}", data.len(),);
let mut header = HeaderMap::new();
header.insert(CONTENT_TYPE, "application/json".parse().unwrap());
if let Some(v) = req.headers.get("x-request-id") {
header.insert("x-request-id", v.clone());
}
Ok(S3Response::with_headers((StatusCode::OK, Body::from(data)), header))
}
}
@@ -388,8 +373,6 @@ pub struct RemoveNotificationTarget {}
#[async_trait::async_trait]
impl Operation for RemoveNotificationTarget {
async fn call(&self, req: S3Request<Body>, params: Params<'_, '_>) -> S3Result<S3Response<(StatusCode, Body)>> {
let span = Span::current();
let _enter = span.enter();
// 1. Analyze query parameters
let (target_type, target_name) = extract_target_params(&params)?;
@@ -415,9 +398,6 @@ impl Operation for RemoveNotificationTarget {
let mut header = HeaderMap::new();
header.insert(CONTENT_TYPE, "application/json".parse().unwrap());
header.insert(CONTENT_LENGTH, "0".parse().unwrap());
if let Some(v) = req.headers.get("x-request-id") {
header.insert("x-request-id", v.clone());
}
Ok(S3Response::with_headers((StatusCode::OK, Body::empty()), header))
}
}

View File

@@ -184,7 +184,7 @@ async fn run(opt: config::Opt) -> Result<()> {
);
if eps.drives_per_set > 1 {
warn!(target: "rustfs::main::run","WARNING: Host local has more than 0 drives of set. A host failure will result in data becoming unavailable.");
warn!("WARNING: Host local has more than 0 drives of set. A host failure will result in data becoming unavailable.");
}
}

View File

@@ -43,9 +43,7 @@ use tokio_rustls::TlsAcceptor;
use tonic::{Request, Status, metadata::MetadataValue};
use tower::ServiceBuilder;
use tower_http::catch_panic::CatchPanicLayer;
use tower_http::compression::CompressionLayer;
use tower_http::cors::{AllowOrigin, Any, CorsLayer};
use tower_http::request_id::{MakeRequestUuid, PropagateRequestIdLayer, SetRequestIdLayer};
use tower_http::trace::TraceLayer;
use tracing::{Span, debug, error, info, instrument, warn};
@@ -450,18 +448,11 @@ fn process_connection(
let service = hybrid(s3_service, rpc_service);
let hybrid_service = ServiceBuilder::new()
.layer(SetRequestIdLayer::x_request_id(MakeRequestUuid))
.layer(CatchPanicLayer::new())
.layer(
TraceLayer::new_for_http()
.make_span_with(|request: &HttpRequest<_>| {
let trace_id = request
.headers()
.get(http::header::HeaderName::from_static("x-request-id"))
.and_then(|v| v.to_str().ok())
.unwrap_or("unknown");
let span = tracing::info_span!("http-request",
trace_id = %trace_id,
status_code = tracing::field::Empty,
method = %request.method(),
uri = %request.uri(),
@@ -475,8 +466,7 @@ fn process_connection(
span
})
.on_request(|request: &HttpRequest<_>, span: &Span| {
let _enter = span.enter();
.on_request(|request: &HttpRequest<_>, _span: &Span| {
debug!("http started method: {}, url path: {}", request.method(), request.uri().path());
let labels = [
("key_request_method", format!("{}", request.method())),
@@ -484,31 +474,23 @@ fn process_connection(
];
counter!("rustfs_api_requests_total", &labels).increment(1);
})
.on_response(|response: &Response<_>, latency: Duration, span: &Span| {
span.record("status_code", tracing::field::display(response.status()));
let _enter = span.enter();
histogram!("request.latency.ms").record(latency.as_millis() as f64);
.on_response(|response: &Response<_>, latency: Duration, _span: &Span| {
_span.record("http response status_code", tracing::field::display(response.status()));
debug!("http response generated in {:?}", latency)
})
.on_body_chunk(|chunk: &Bytes, latency: Duration, span: &Span| {
let _enter = span.enter();
.on_body_chunk(|chunk: &Bytes, latency: Duration, _span: &Span| {
histogram!("request.body.len").record(chunk.len() as f64);
debug!("http body sending {} bytes in {:?}", chunk.len(), latency);
})
.on_eos(|_trailers: Option<&HeaderMap>, stream_duration: Duration, span: &Span| {
let _enter = span.enter();
.on_eos(|_trailers: Option<&HeaderMap>, stream_duration: Duration, _span: &Span| {
debug!("http stream closed after {:?}", stream_duration)
})
.on_failure(|_error, latency: Duration, span: &Span| {
let _enter = span.enter();
.on_failure(|_error, latency: Duration, _span: &Span| {
counter!("rustfs_api_requests_failure_total").increment(1);
debug!("http request failure error: {:?} in {:?}", _error, latency)
}),
)
.layer(PropagateRequestIdLayer::x_request_id())
.layer(cors_layer)
// Compress responses
.layer(CompressionLayer::new())
.option_layer(if is_console { Some(RedirectLayer) } else { None })
.service(service);

View File

@@ -49,9 +49,9 @@ where
{
type Response = Response<HybridBody<RestBody, GrpcBody>>;
type Error = Box<dyn std::error::Error + Send + Sync>;
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
type Future = Pin<Box<dyn Future<Output = std::result::Result<Self::Response, Self::Error>> + Send>>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<std::result::Result<(), Self::Error>> {
self.inner.poll_ready(cx).map_err(Into::into)
}

View File

@@ -77,7 +77,7 @@ use rustfs_ecstore::{
},
};
use rustfs_filemeta::REPLICATE_INCOMING_DELETE;
use rustfs_filemeta::{ObjectPartInfo, RestoreStatusOps};
use rustfs_filemeta::fileinfo::{ObjectPartInfo, RestoreStatusOps};
use rustfs_filemeta::{ReplicationStatusType, ReplicationType, VersionPurgeStatusType};
use rustfs_kms::{
DataKey,
@@ -2164,7 +2164,12 @@ impl S3 for FS {
} = req.input;
let prefix = prefix.unwrap_or_default();
let max_keys = max_keys.unwrap_or(1000);
let max_keys = match max_keys {
Some(v) if v > 0 && v <= 1000 => v,
Some(v) if v > 1000 => 1000,
None => 1000,
_ => return Err(s3_error!(InvalidArgument, "max-keys must be between 1 and 1000")),
};
let delimiter = delimiter.filter(|v| !v.is_empty());
let start_after = start_after.filter(|v| !v.is_empty());
@@ -4160,13 +4165,6 @@ impl S3 for FS {
let object_lock_configuration = match metadata_sys::get_object_lock_config(&bucket).await {
Ok((cfg, _created)) => Some(cfg),
Err(err) => {
if err == StorageError::ConfigNotFound {
return Err(S3Error::with_message(
S3ErrorCode::ObjectLockConfigurationNotFoundError,
"Object Lock configuration does not exist for this bucket".to_string(),
));
}
debug!("get_object_lock_config err {:?}", err);
None
}

View File

@@ -58,8 +58,8 @@ export RUSTFS_CONSOLE_ADDRESS=":9001"
#export RUSTFS_OBS_SERVICE_NAME=rustfs # Service name
#export RUSTFS_OBS_SERVICE_VERSION=0.1.0 # Service version
export RUSTFS_OBS_ENVIRONMENT=develop # Environment name
export RUSTFS_OBS_LOGGER_LEVEL=info # Log level, supports trace, debug, info, warn, error
export RUSTFS_OBS_LOG_STDOUT_ENABLED=true # Whether to enable local stdout logging
export RUSTFS_OBS_LOGGER_LEVEL=debug # Log level, supports trace, debug, info, warn, error
export RUSTFS_OBS_LOCAL_LOGGING_ENABLED=true # Whether to enable local logging
export RUSTFS_OBS_LOG_DIRECTORY="$current_dir/deploy/logs" # Log directory
export RUSTFS_OBS_LOG_ROTATION_TIME="hour" # Log rotation time unit, can be "second", "minute", "hour", "day"
export RUSTFS_OBS_LOG_ROTATION_SIZE_MB=100 # Log rotation size in MB
@@ -70,7 +70,7 @@ export RUSTFS_OBS_LOG_FLUSH_MS=300
#tokio runtime
export RUSTFS_RUNTIME_WORKER_THREADS=16
export RUSTFS_RUNTIME_MAX_BLOCKING_THREADS=1024
export RUSTFS_RUNTIME_THREAD_PRINT_ENABLED=true
export RUSTFS_RUNTIME_THREAD_PRINT_ENABLED=false
# shellcheck disable=SC2125
export RUSTFS_RUNTIME_THREAD_STACK_SIZE=1024*1024
export RUSTFS_RUNTIME_THREAD_KEEP_ALIVE=60