Mirror of https://github.com/rustfs/rustfs.git (synced 2026-01-17 09:40:32 +00:00)

Compare commits: fix/axum-t...weisd/scan (29 commits)
| SHA1 |
|---|
| 1be390809d |
| 234cc3f248 |
| bd6daf9b3f |
| 9c0a2ebf81 |
| 3567175723 |
| b6521210f7 |
| 1c4af735ae |
| e981e39f8a |
| 3c66561779 |
| e012984ebd |
| 7df872cd58 |
| 68821d82f7 |
| bd660f88a2 |
| 39dd0276b0 |
| d99861940d |
| 9d79fb25e4 |
| 0d5f080223 |
| 314373e6a9 |
| fef0fac62f |
| dea457c0a7 |
| cb0860753f |
| 87a73d9c36 |
| d30675f376 |
| 0aad1ed6aa |
| e437d42d31 |
| 008be7d061 |
| 8f227b2691 |
| c0cdad2192 |
| 6466cdbc54 |
Cargo.lock (generated, 34 changed lines)
@@ -7639,6 +7639,7 @@ dependencies = [
 "rustfs-rio",
 "rustfs-s3select-api",
 "rustfs-s3select-query",
 "rustfs-scanner",
 "rustfs-targets",
 "rustfs-utils",
 "rustfs-zip",

@@ -8182,6 +8183,39 @@ dependencies = [
 "tracing",
]

[[package]]
name = "rustfs-scanner"
version = "0.0.5"
dependencies = [
 "anyhow",
 "async-trait",
 "chrono",
 "futures",
 "http 1.4.0",
 "path-clean",
 "rand 0.10.0-rc.6",
 "rmp-serde",
 "rustfs-common",
 "rustfs-config",
 "rustfs-ecstore",
 "rustfs-filemeta",
 "rustfs-madmin",
 "rustfs-utils",
 "s3s",
 "serde",
 "serde_json",
 "serial_test",
 "tempfile",
 "thiserror 2.0.17",
 "time",
 "tokio",
 "tokio-test",
 "tokio-util",
 "tracing",
 "tracing-subscriber",
 "uuid",
]

[[package]]
name = "rustfs-signer"
version = "0.0.5"
Cargo.toml

@@ -35,6 +35,7 @@ members = [
"crates/targets", # Target-specific configurations and utilities
"crates/s3select-api", # S3 Select API interface
"crates/s3select-query", # S3 Select query engine
"crates/scanner", # Scanner for data integrity checks and health monitoring
"crates/signer", # client signer
"crates/checksums", # client checksums
"crates/utils", # Utility functions and helpers

@@ -88,6 +89,7 @@ rustfs-protos = { path = "crates/protos", version = "0.0.5" }
rustfs-rio = { path = "crates/rio", version = "0.0.5" }
rustfs-s3select-api = { path = "crates/s3select-api", version = "0.0.5" }
rustfs-s3select-query = { path = "crates/s3select-query", version = "0.0.5" }
rustfs-scanner = { path = "crates/scanner", version = "0.0.5" }
rustfs-signer = { path = "crates/signer", version = "0.0.5" }
rustfs-targets = { path = "crates/targets", version = "0.0.5" }
rustfs-utils = { path = "crates/utils", version = "0.0.5" }
@@ -183,7 +183,7 @@ impl HealChannelProcessor {
HealType::Object {
bucket: request.bucket.clone(),
object: prefix.clone(),
version_id: None,
version_id: request.object_version_id.clone(),
}
} else {
HealType::Bucket {

@@ -366,6 +366,7 @@ mod tests {
id: "test-id".to_string(),
bucket: "test-bucket".to_string(),
object_prefix: None,
object_version_id: None,
disk: None,
priority: HealChannelPriority::Normal,
scan_mode: None,

@@ -394,6 +395,7 @@ mod tests {
id: "test-id".to_string(),
bucket: "test-bucket".to_string(),
object_prefix: Some("test-object".to_string()),
object_version_id: None,
disk: None,
priority: HealChannelPriority::High,
scan_mode: Some(HealScanMode::Deep),

@@ -425,6 +427,7 @@ mod tests {
id: "test-id".to_string(),
bucket: "test-bucket".to_string(),
object_prefix: None,
object_version_id: None,
disk: Some("pool_0_set_1".to_string()),
priority: HealChannelPriority::Critical,
scan_mode: None,

@@ -453,6 +456,7 @@ mod tests {
id: "test-id".to_string(),
bucket: "test-bucket".to_string(),
object_prefix: None,
object_version_id: None,
disk: Some("invalid-disk-id".to_string()),
priority: HealChannelPriority::Normal,
scan_mode: None,

@@ -488,6 +492,7 @@ mod tests {
id: "test-id".to_string(),
bucket: "test-bucket".to_string(),
object_prefix: None,
object_version_id: None,
disk: None,
priority: channel_priority,
scan_mode: None,

@@ -516,6 +521,7 @@ mod tests {
id: "test-id".to_string(),
bucket: "test-bucket".to_string(),
object_prefix: None,
object_version_id: None,
disk: None,
priority: HealChannelPriority::Normal,
scan_mode: None,

@@ -545,6 +551,7 @@ mod tests {
id: "test-id".to_string(),
bucket: "test-bucket".to_string(),
object_prefix: Some("".to_string()), // Empty prefix should be treated as bucket heal
object_version_id: None,
disk: None,
priority: HealChannelPriority::Normal,
scan_mode: None,
@@ -50,7 +50,7 @@ impl Display for HealItemType {
}
}

#[derive(Clone, Copy, Debug, Serialize, Deserialize)]
#[derive(Debug, Serialize, Deserialize)]
pub enum DriveState {
Ok,
Offline,

@@ -59,7 +59,7 @@ pub enum DriveState {
PermissionDenied,
Faulty,
RootMount,
Unknown,
Unknown(String),
Unformatted, // only returned by disk
}

@@ -73,12 +73,28 @@ impl DriveState {
DriveState::PermissionDenied => "permission-denied",
DriveState::Faulty => "faulty",
DriveState::RootMount => "root-mount",
DriveState::Unknown => "unknown",
DriveState::Unknown(reason) => reason,
DriveState::Unformatted => "unformatted",
}
}
}

impl Clone for DriveState {
fn clone(&self) -> Self {
match self {
DriveState::Unknown(reason) => DriveState::Unknown(reason.clone()),
DriveState::Ok => DriveState::Ok,
DriveState::Offline => DriveState::Offline,
DriveState::Corrupt => DriveState::Corrupt,
DriveState::Missing => DriveState::Missing,
DriveState::PermissionDenied => DriveState::PermissionDenied,
DriveState::Faulty => DriveState::Faulty,
DriveState::RootMount => DriveState::RootMount,
DriveState::Unformatted => DriveState::Unformatted,
}
}
}

impl Display for DriveState {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", self.to_str())
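The `Unknown` variant now carries the underlying reason instead of collapsing to a fixed "unknown" string, which is also why `Copy` is dropped and `Clone` is written by hand above. A minimal sketch of the new behavior (the reason text is hypothetical, not from this change set):

```rust
fn demo_unknown_reason() {
    // Hypothetical diagnostic string; any reason can be carried now.
    let state = DriveState::Unknown("ioctl failed: EIO".to_string());
    assert_eq!(state.to_str(), "ioctl failed: EIO"); // previously always "unknown"
    let _copy = state.clone(); // `Copy` no longer applies; cloning allocates the String
}
```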
@@ -212,6 +228,8 @@ pub struct HealChannelRequest {
pub bucket: String,
/// Object prefix (optional)
pub object_prefix: Option<String>,
/// Object version ID (optional)
pub object_version_id: Option<String>,
/// Force start heal
pub force_start: bool,
/// Priority

@@ -346,6 +364,7 @@ pub fn create_heal_request(
id: Uuid::new_v4().to_string(),
bucket,
object_prefix,
object_version_id: None,
force_start,
priority: priority.unwrap_or_default(),
pool_index: None,

@@ -374,6 +393,7 @@ pub fn create_heal_request_with_options(
id: Uuid::new_v4().to_string(),
bucket,
object_prefix,
object_version_id: None,
force_start,
priority: priority.unwrap_or_default(),
pool_index,

@@ -501,6 +521,7 @@ pub async fn send_heal_disk(set_disk_id: String, priority: Option<HealChannelPri
bucket: "".to_string(),
object_prefix: None,
disk: Some(set_disk_id),
object_version_id: None,
force_start: false,
priority: priority.unwrap_or_default(),
pool_index: None,

@@ -23,5 +23,6 @@ pub(crate) mod profiler;
pub(crate) mod protocols;
pub(crate) mod quota;
pub(crate) mod runtime;
pub(crate) mod scanner;
pub(crate) mod targets;
pub(crate) mod tls;
crates/config/src/constants/scanner.rs (new file, 28 lines)
@@ -0,0 +1,28 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

/// Environment variable name that specifies the data scanner start delay in seconds.
/// - Purpose: Define the delay between data scanner operations.
/// - Unit: seconds (u64).
/// - Valid values: any positive integer.
/// - Semantics: This delay controls how frequently the data scanner checks for and processes data; shorter delays lead to more responsive scanning but may increase system load.
/// - Example: `export RUSTFS_DATA_SCANNER_START_DELAY_SECS=10`
/// - Note: Choose an appropriate delay that balances scanning responsiveness with overall system performance.
pub const ENV_DATA_SCANNER_START_DELAY_SECS: &str = "RUSTFS_DATA_SCANNER_START_DELAY_SECS";

/// Default data scanner start delay in seconds if not specified in the environment variable.
/// - Value: 10 seconds.
/// - Rationale: This default interval provides a reasonable balance between scanning responsiveness and system load for most deployments.
/// - Adjustments: Users may modify this value via the `RUSTFS_DATA_SCANNER_START_DELAY_SECS` environment variable based on their specific scanning requirements and system performance.
pub const DEFAULT_DATA_SCANNER_START_DELAY_SECS: u64 = 60;
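A minimal sketch of how a caller might resolve the start delay from these two constants, assuming only the standard library; the helper function itself is not part of this change set:

```rust
use std::time::Duration;

// Hypothetical helper, shown only to illustrate how the env var and default interact.
fn scanner_start_delay() -> Duration {
    let secs = std::env::var(ENV_DATA_SCANNER_START_DELAY_SECS)
        .ok()
        .and_then(|v| v.parse::<u64>().ok())
        .unwrap_or(DEFAULT_DATA_SCANNER_START_DELAY_SECS);
    Duration::from_secs(secs)
}
```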
@@ -37,6 +37,8 @@ pub use constants::quota::*;
#[cfg(feature = "constants")]
pub use constants::runtime::*;
#[cfg(feature = "constants")]
pub use constants::scanner::*;
#[cfg(feature = "constants")]
pub use constants::targets::*;
#[cfg(feature = "constants")]
pub use constants::tls::*;
@@ -1154,10 +1154,17 @@ impl TargetClient {
body: ByteStream,
opts: &PutObjectOptions,
) -> Result<(), S3ClientError> {
let headers = opts.header();
let mut headers = opts.header();

let builder = self.client.put_object();

let version_id = opts.internal.source_version_id.clone();
if !version_id.is_empty()
&& let Ok(header_value) = HeaderValue::from_str(&version_id)
{
headers.insert(RUSTFS_BUCKET_SOURCE_VERSION_ID, header_value);
}

match builder
.bucket(bucket)
.key(object)

@@ -1185,9 +1192,33 @@ impl TargetClient {
&self,
bucket: &str,
object: &str,
_opts: &PutObjectOptions,
opts: &PutObjectOptions,
) -> Result<String, S3ClientError> {
match self.client.create_multipart_upload().bucket(bucket).key(object).send().await {
let mut headers = HeaderMap::new();
let version_id = opts.internal.source_version_id.clone();
if !version_id.is_empty()
&& let Ok(header_value) = HeaderValue::from_str(&version_id)
{
headers.insert(RUSTFS_BUCKET_SOURCE_VERSION_ID, header_value);
}

match self
.client
.create_multipart_upload()
.bucket(bucket)
.key(object)
.customize()
.map_request(move |mut req| {
for (k, v) in headers.clone().into_iter() {
let key_str = k.unwrap().as_str().to_string();
let value_str = v.to_str().unwrap_or("").to_string();
req.headers_mut().insert(key_str, value_str);
}
Result::<_, aws_smithy_types::error::operation::BuildError>::Ok(req)
})
.send()
.await
{
Ok(res) => Ok(res.upload_id.unwrap_or_default()),
Err(e) => Err(e.into()),
}
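The `create_multipart_upload` call above threads extra headers into the raw HTTP request through the SDK's `customize().map_request(..)` hook. A standalone sketch of that pattern (function name and error handling are illustrative, not from the diff):

```rust
use aws_sdk_s3::Client;
use http::HeaderMap;

async fn create_upload_with_headers(
    client: &Client,
    bucket: &str,
    key: &str,
    headers: HeaderMap,
) -> Result<Option<String>, aws_sdk_s3::Error> {
    let out = client
        .create_multipart_upload()
        .bucket(bucket)
        .key(key)
        .customize()
        .map_request(move |mut req| {
            // Copy each extra header onto the outgoing request.
            for (name, value) in headers.iter() {
                req.headers_mut()
                    .insert(name.as_str().to_string(), value.to_str().unwrap_or("").to_string());
            }
            Result::<_, aws_smithy_types::error::operation::BuildError>::Ok(req)
        })
        .send()
        .await?;
    Ok(out.upload_id)
}
```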
@@ -953,7 +953,7 @@ impl LifecycleOps for ObjectInfo {
lifecycle::ObjectOpts {
name: self.name.clone(),
user_tags: self.user_tags.clone(),
version_id: self.version_id.map(|v| v.to_string()).unwrap_or_default(),
version_id: self.version_id.clone(),
mod_time: self.mod_time,
size: self.size as usize,
is_latest: self.is_latest,

@@ -1067,7 +1067,7 @@ pub async fn eval_action_from_lifecycle(
event
}

async fn apply_transition_rule(event: &lifecycle::Event, src: &LcEventSrc, oi: &ObjectInfo) -> bool {
pub async fn apply_transition_rule(event: &lifecycle::Event, src: &LcEventSrc, oi: &ObjectInfo) -> bool {
if oi.delete_marker || oi.is_dir {
return false;
}

@@ -1161,7 +1161,7 @@ pub async fn apply_expiry_on_non_transitioned_objects(
true
}

async fn apply_expiry_rule(event: &lifecycle::Event, src: &LcEventSrc, oi: &ObjectInfo) -> bool {
pub async fn apply_expiry_rule(event: &lifecycle::Event, src: &LcEventSrc, oi: &ObjectInfo) -> bool {
let mut expiry_state = GLOBAL_ExpiryState.write().await;
expiry_state.enqueue_by_days(oi, event, src).await;
true
crates/ecstore/src/bucket/lifecycle/evaluator.rs (new file, 191 lines)
@@ -0,0 +1,191 @@
|
||||
// Copyright 2024 RustFS Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use s3s::dto::{
|
||||
BucketLifecycleConfiguration, ObjectLockConfiguration, ObjectLockEnabled, ObjectLockLegalHoldStatus, ObjectLockRetentionMode,
|
||||
};
|
||||
use time::OffsetDateTime;
|
||||
use tracing::info;
|
||||
|
||||
use crate::bucket::lifecycle::lifecycle::{Event, Lifecycle, ObjectOpts};
|
||||
use crate::bucket::object_lock::ObjectLockStatusExt;
|
||||
use crate::bucket::object_lock::objectlock::{get_object_legalhold_meta, get_object_retention_meta, utc_now_ntp};
|
||||
use crate::bucket::replication::ReplicationConfig;
|
||||
use rustfs_common::metrics::IlmAction;
|
||||
|
||||
/// Evaluator - evaluates lifecycle policy on objects for the given lifecycle
|
||||
/// configuration, lock retention configuration and replication configuration.
|
||||
pub struct Evaluator {
|
||||
policy: Arc<BucketLifecycleConfiguration>,
|
||||
lock_retention: Option<Arc<ObjectLockConfiguration>>,
|
||||
repl_cfg: Option<Arc<ReplicationConfig>>,
|
||||
}
|
||||
|
||||
impl Evaluator {
|
||||
/// NewEvaluator - creates a new evaluator with the given lifecycle
|
||||
pub fn new(policy: Arc<BucketLifecycleConfiguration>) -> Self {
|
||||
Self {
|
||||
policy,
|
||||
lock_retention: None,
|
||||
repl_cfg: None,
|
||||
}
|
||||
}
|
||||
|
||||
/// WithLockRetention - sets the lock retention configuration for the evaluator
|
||||
pub fn with_lock_retention(mut self, lr: Option<Arc<ObjectLockConfiguration>>) -> Self {
|
||||
self.lock_retention = lr;
|
||||
self
|
||||
}
|
||||
|
||||
/// WithReplicationConfig - sets the replication configuration for the evaluator
|
||||
pub fn with_replication_config(mut self, rcfg: Option<Arc<ReplicationConfig>>) -> Self {
|
||||
self.repl_cfg = rcfg;
|
||||
self
|
||||
}
|
||||
|
||||
/// IsPendingReplication checks if the object is pending replication.
|
||||
pub fn is_pending_replication(&self, obj: &ObjectOpts) -> bool {
|
||||
use crate::bucket::replication::ReplicationConfigurationExt;
|
||||
if self.repl_cfg.is_none() {
|
||||
return false;
|
||||
}
|
||||
if let Some(rcfg) = &self.repl_cfg
|
||||
&& rcfg
|
||||
.config
|
||||
.as_ref()
|
||||
.is_some_and(|config| config.has_active_rules(obj.name.as_str(), true))
|
||||
&& !obj.version_purge_status.is_empty()
|
||||
{
|
||||
return true;
|
||||
}
|
||||
false
|
||||
}
|
||||
|
||||
/// IsObjectLocked checks if it is appropriate to remove an
|
||||
/// object according to locking configuration when this is lifecycle/ bucket quota asking.
|
||||
/// (copied over from enforceRetentionForDeletion)
|
||||
pub fn is_object_locked(&self, obj: &ObjectOpts) -> bool {
|
||||
if self.lock_retention.as_ref().is_none_or(|v| {
|
||||
v.object_lock_enabled
|
||||
.as_ref()
|
||||
.is_none_or(|v| v.as_str() != ObjectLockEnabled::ENABLED)
|
||||
}) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if obj.delete_marker {
|
||||
return false;
|
||||
}
|
||||
|
||||
let lhold = get_object_legalhold_meta(obj.user_defined.clone());
|
||||
if lhold
|
||||
.status
|
||||
.is_some_and(|v| v.valid() && v.as_str() == ObjectLockLegalHoldStatus::ON)
|
||||
{
|
||||
return true;
|
||||
}
|
||||
|
||||
let ret = get_object_retention_meta(obj.user_defined.clone());
|
||||
if ret
|
||||
.mode
|
||||
.is_some_and(|v| matches!(v.as_str(), ObjectLockRetentionMode::COMPLIANCE | ObjectLockRetentionMode::GOVERNANCE))
|
||||
{
|
||||
let t = utc_now_ntp();
|
||||
if let Some(retain_until) = ret.retain_until_date
|
||||
&& OffsetDateTime::from(retain_until).gt(&t)
|
||||
{
|
||||
return true;
|
||||
}
|
||||
}
|
||||
false
|
||||
}
|
||||
|
||||
/// eval will return a lifecycle event for each object in objs for a given time.
|
||||
async fn eval_inner(&self, objs: &[ObjectOpts], now: OffsetDateTime) -> Vec<Event> {
|
||||
let mut events = vec![Event::default(); objs.len()];
|
||||
let mut newer_noncurrent_versions = 0;
|
||||
|
||||
'top_loop: {
|
||||
for (i, obj) in objs.iter().enumerate() {
|
||||
let mut event = self.policy.eval_inner(obj, now, newer_noncurrent_versions).await;
|
||||
match event.action {
|
||||
IlmAction::DeleteAllVersionsAction | IlmAction::DelMarkerDeleteAllVersionsAction => {
|
||||
// Skip if bucket has object locking enabled; To prevent the
|
||||
// possibility of violating an object retention on one of the
|
||||
// noncurrent versions of this object.
|
||||
if self.lock_retention.as_ref().is_some_and(|v| {
|
||||
v.object_lock_enabled
|
||||
.as_ref()
|
||||
.is_some_and(|v| v.as_str() == ObjectLockEnabled::ENABLED)
|
||||
}) {
|
||||
event = Event::default();
|
||||
} else {
|
||||
// No need to evaluate remaining versions' lifecycle
|
||||
// events after DeleteAllVersionsAction*
|
||||
events[i] = event;
|
||||
|
||||
info!("eval_inner: skipping remaining versions' lifecycle events after DeleteAllVersionsAction*");
|
||||
|
||||
break 'top_loop;
|
||||
}
|
||||
}
|
||||
IlmAction::DeleteVersionAction | IlmAction::DeleteRestoredVersionAction => {
|
||||
// Defensive code, should never happen
|
||||
if obj.version_id.is_none_or(|v| v.is_nil()) {
|
||||
event.action = IlmAction::NoneAction;
|
||||
}
|
||||
if self.is_object_locked(obj) {
|
||||
event = Event::default();
|
||||
}
|
||||
|
||||
if self.is_pending_replication(obj) {
|
||||
event = Event::default();
|
||||
}
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
|
||||
if !obj.is_latest {
|
||||
match event.action {
|
||||
IlmAction::DeleteVersionAction => {
|
||||
// this noncurrent version will be expired, nothing to add
|
||||
}
|
||||
_ => {
|
||||
// this noncurrent version will be spared
|
||||
newer_noncurrent_versions += 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
events[i] = event;
|
||||
}
|
||||
}
|
||||
events
|
||||
}
|
||||
|
||||
/// Eval will return a lifecycle event for each object in objs
|
||||
pub async fn eval(&self, objs: &[ObjectOpts]) -> Result<Vec<Event>, std::io::Error> {
|
||||
if objs.is_empty() {
|
||||
return Ok(vec![]);
|
||||
}
|
||||
if objs.len() != objs[0].num_versions {
|
||||
return Err(std::io::Error::new(
|
||||
std::io::ErrorKind::InvalidInput,
|
||||
format!("number of versions mismatch, expected {}, got {}", objs[0].num_versions, objs.len()),
|
||||
));
|
||||
}
|
||||
Ok(self.eval_inner(objs, OffsetDateTime::now_utc()).await)
|
||||
}
|
||||
}
|
||||
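A sketch of how the new `Evaluator` is meant to be driven, assuming the caller already has every version of one object as `ObjectOpts` (newest first) so that the `num_versions` check in `eval` passes; the surrounding function is illustrative, not part of the diff:

```rust
use std::sync::Arc;

// Hypothetical call site for the evaluator defined above.
async fn lifecycle_events_for_versions(
    lifecycle: Arc<s3s::dto::BucketLifecycleConfiguration>,
    versions: &[ObjectOpts],
) -> Result<Vec<Event>, std::io::Error> {
    Evaluator::new(lifecycle)
        .with_lock_retention(None)     // no object-lock configuration
        .with_replication_config(None) // no replication configuration
        .eval(versions)
        .await
}
```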
@@ -19,28 +19,31 @@
|
||||
#![allow(clippy::all)]
|
||||
|
||||
use crate::bucket::lifecycle::rule::TransitionOps;
|
||||
use crate::store_api::ObjectInfo;
|
||||
use rustfs_filemeta::{ReplicationStatusType, VersionPurgeStatusType};
|
||||
use s3s::dto::{
|
||||
BucketLifecycleConfiguration, ExpirationStatus, LifecycleExpiration, LifecycleRule, NoncurrentVersionTransition,
|
||||
ObjectLockConfiguration, ObjectLockEnabled, RestoreRequest, Transition,
|
||||
};
|
||||
use std::cmp::Ordering;
|
||||
use std::collections::HashMap;
|
||||
use std::env;
|
||||
use std::fmt::Display;
|
||||
use std::sync::Arc;
|
||||
use time::macros::{datetime, offset};
|
||||
use time::{self, Duration, OffsetDateTime};
|
||||
use tracing::info;
|
||||
use uuid::Uuid;
|
||||
|
||||
pub const TRANSITION_COMPLETE: &str = "complete";
|
||||
pub const TRANSITION_PENDING: &str = "pending";
|
||||
|
||||
const ERR_LIFECYCLE_TOO_MANY_RULES: &str = "Lifecycle configuration allows a maximum of 1000 rules";
|
||||
const ERR_LIFECYCLE_NO_RULE: &str = "Lifecycle configuration should have at least one rule";
|
||||
const ERR_LIFECYCLE_DUPLICATE_ID: &str = "Rule ID must be unique. Found same ID for more than one rule";
|
||||
const _ERR_XML_NOT_WELL_FORMED: &str =
|
||||
"The XML you provided was not well-formed or did not validate against our published schema";
|
||||
const ERR_LIFECYCLE_BUCKET_LOCKED: &str =
|
||||
"ExpiredObjectAllVersions element and DelMarkerExpiration action cannot be used on an retention bucket";
|
||||
const ERR_LIFECYCLE_TOO_MANY_RULES: &str = "Lifecycle configuration should have at most 1000 rules";
|
||||
|
||||
pub use rustfs_common::metrics::IlmAction;
|
||||
|
||||
@@ -130,11 +133,11 @@ impl RuleValidate for LifecycleRule {
|
||||
pub trait Lifecycle {
|
||||
async fn has_transition(&self) -> bool;
|
||||
fn has_expiry(&self) -> bool;
|
||||
async fn has_active_rules(&self, prefix: &str) -> bool;
|
||||
fn has_active_rules(&self, prefix: &str) -> bool;
|
||||
async fn validate(&self, lr: &ObjectLockConfiguration) -> Result<(), std::io::Error>;
|
||||
async fn filter_rules(&self, obj: &ObjectOpts) -> Option<Vec<LifecycleRule>>;
|
||||
async fn eval(&self, obj: &ObjectOpts) -> Event;
|
||||
async fn eval_inner(&self, obj: &ObjectOpts, now: OffsetDateTime) -> Event;
|
||||
async fn eval_inner(&self, obj: &ObjectOpts, now: OffsetDateTime, newer_noncurrent_versions: usize) -> Event;
|
||||
//fn set_prediction_headers(&self, w: http.ResponseWriter, obj: ObjectOpts);
|
||||
async fn noncurrent_versions_expiration_limit(self: Arc<Self>, obj: &ObjectOpts) -> Event;
|
||||
}
|
||||
@@ -159,7 +162,7 @@ impl Lifecycle for BucketLifecycleConfiguration {
|
||||
false
|
||||
}
|
||||
|
||||
async fn has_active_rules(&self, prefix: &str) -> bool {
|
||||
fn has_active_rules(&self, prefix: &str) -> bool {
|
||||
if self.rules.len() == 0 {
|
||||
return false;
|
||||
}
|
||||
@@ -273,10 +276,10 @@ impl Lifecycle for BucketLifecycleConfiguration {
|
||||
}
|
||||
|
||||
async fn eval(&self, obj: &ObjectOpts) -> Event {
|
||||
self.eval_inner(obj, OffsetDateTime::now_utc()).await
|
||||
self.eval_inner(obj, OffsetDateTime::now_utc(), 0).await
|
||||
}
|
||||
|
||||
async fn eval_inner(&self, obj: &ObjectOpts, now: OffsetDateTime) -> Event {
|
||||
async fn eval_inner(&self, obj: &ObjectOpts, now: OffsetDateTime, newer_noncurrent_versions: usize) -> Event {
|
||||
let mut events = Vec::<Event>::new();
|
||||
info!(
|
||||
"eval_inner: object={}, mod_time={:?}, now={:?}, is_latest={}, delete_marker={}",
|
||||
@@ -435,10 +438,10 @@ impl Lifecycle for BucketLifecycleConfiguration {
|
||||
obj.is_latest,
|
||||
obj.delete_marker,
|
||||
obj.version_id,
|
||||
(obj.is_latest || obj.version_id.is_empty()) && !obj.delete_marker
|
||||
(obj.is_latest || obj.version_id.is_none_or(|v| v.is_nil())) && !obj.delete_marker
|
||||
);
|
||||
// Allow expiration for latest objects OR non-versioned objects (empty version_id)
|
||||
if (obj.is_latest || obj.version_id.is_empty()) && !obj.delete_marker {
|
||||
if (obj.is_latest || obj.version_id.is_none_or(|v| v.is_nil())) && !obj.delete_marker {
|
||||
info!("eval_inner: entering expiration check");
|
||||
if let Some(ref expiration) = rule.expiration {
|
||||
if let Some(ref date) = expiration.date {
|
||||
@@ -658,7 +661,7 @@ pub struct ObjectOpts {
|
||||
pub user_tags: String,
|
||||
pub mod_time: Option<OffsetDateTime>,
|
||||
pub size: usize,
|
||||
pub version_id: String,
|
||||
pub version_id: Option<Uuid>,
|
||||
pub is_latest: bool,
|
||||
pub delete_marker: bool,
|
||||
pub num_versions: usize,
|
||||
@@ -668,12 +671,37 @@ pub struct ObjectOpts {
|
||||
pub restore_expires: Option<OffsetDateTime>,
|
||||
pub versioned: bool,
|
||||
pub version_suspended: bool,
|
||||
pub user_defined: HashMap<String, String>,
|
||||
pub version_purge_status: VersionPurgeStatusType,
|
||||
pub replication_status: ReplicationStatusType,
|
||||
}
|
||||
|
||||
impl ObjectOpts {
|
||||
pub fn expired_object_deletemarker(&self) -> bool {
|
||||
self.delete_marker && self.num_versions == 1
|
||||
}
|
||||
|
||||
pub fn from_object_info(oi: &ObjectInfo) -> Self {
|
||||
Self {
|
||||
name: oi.name.clone(),
|
||||
user_tags: oi.user_tags.clone(),
|
||||
mod_time: oi.mod_time,
|
||||
size: oi.size as usize,
|
||||
version_id: oi.version_id.clone(),
|
||||
is_latest: oi.is_latest,
|
||||
delete_marker: oi.delete_marker,
|
||||
num_versions: oi.num_versions,
|
||||
successor_mod_time: oi.successor_mod_time,
|
||||
transition_status: oi.transitioned_object.status.clone(),
|
||||
restore_ongoing: oi.restore_ongoing,
|
||||
restore_expires: oi.restore_expires,
|
||||
versioned: false,
|
||||
version_suspended: false,
|
||||
user_defined: oi.user_defined.clone(),
|
||||
version_purge_status: oi.version_purge_status.clone(),
|
||||
replication_status: oi.replication_status.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
|
||||
@@ -14,6 +14,7 @@
|
||||
|
||||
pub mod bucket_lifecycle_audit;
|
||||
pub mod bucket_lifecycle_ops;
|
||||
pub mod evaluator;
|
||||
pub mod lifecycle;
|
||||
pub mod rule;
|
||||
pub mod tier_last_day_stats;
|
||||
|
||||
@@ -24,6 +24,7 @@ use crate::global::GLOBAL_TierConfigMgr;
|
||||
use sha2::{Digest, Sha256};
|
||||
use std::any::Any;
|
||||
use std::io::Write;
|
||||
use uuid::Uuid;
|
||||
use xxhash_rust::xxh64;
|
||||
|
||||
static XXHASH_SEED: u64 = 0;
|
||||
@@ -33,7 +34,7 @@ static XXHASH_SEED: u64 = 0;
|
||||
struct ObjSweeper {
|
||||
object: String,
|
||||
bucket: String,
|
||||
version_id: String,
|
||||
version_id: Option<Uuid>,
|
||||
versioned: bool,
|
||||
suspended: bool,
|
||||
transition_status: String,
|
||||
@@ -53,8 +54,8 @@ impl ObjSweeper {
|
||||
})
|
||||
}
|
||||
|
||||
pub fn with_version(&mut self, vid: String) -> &Self {
|
||||
self.version_id = vid;
|
||||
pub fn with_version(&mut self, vid: Option<Uuid>) -> &Self {
|
||||
self.version_id = vid.clone();
|
||||
self
|
||||
}
|
||||
|
||||
@@ -71,8 +72,8 @@ impl ObjSweeper {
|
||||
version_suspended: self.suspended,
|
||||
..Default::default()
|
||||
};
|
||||
if self.suspended && self.version_id == "" {
|
||||
opts.version_id = String::from("");
|
||||
if self.suspended && self.version_id.is_none_or(|v| v.is_nil()) {
|
||||
opts.version_id = None;
|
||||
}
|
||||
opts
|
||||
}
|
||||
@@ -93,7 +94,7 @@ impl ObjSweeper {
|
||||
if !self.versioned || self.suspended {
|
||||
// 1, 2.a, 2.b
|
||||
del_tier = true;
|
||||
} else if self.versioned && self.version_id != "" {
|
||||
} else if self.versioned && self.version_id.is_some_and(|v| !v.is_nil()) {
|
||||
// 3.a
|
||||
del_tier = true;
|
||||
}
|
||||
|
||||
@@ -180,6 +180,13 @@ pub async fn created_at(bucket: &str) -> Result<OffsetDateTime> {
|
||||
bucket_meta_sys.created_at(bucket).await
|
||||
}
|
||||
|
||||
pub async fn list_bucket_targets(bucket: &str) -> Result<BucketTargets> {
|
||||
let bucket_meta_sys_lock = get_bucket_metadata_sys()?;
|
||||
let bucket_meta_sys = bucket_meta_sys_lock.read().await;
|
||||
|
||||
bucket_meta_sys.get_bucket_targets_config(bucket).await
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct BucketMetadataSys {
|
||||
metadata_map: RwLock<HashMap<String, Arc<BucketMetadata>>>,
|
||||
|
||||
@@ -15,7 +15,7 @@
|
||||
pub mod objectlock;
|
||||
pub mod objectlock_sys;
|
||||
|
||||
use s3s::dto::{ObjectLockConfiguration, ObjectLockEnabled};
|
||||
use s3s::dto::{ObjectLockConfiguration, ObjectLockEnabled, ObjectLockLegalHoldStatus};
|
||||
|
||||
pub trait ObjectLockApi {
|
||||
fn enabled(&self) -> bool;
|
||||
@@ -28,3 +28,13 @@ impl ObjectLockApi for ObjectLockConfiguration {
|
||||
.is_some_and(|v| v.as_str() == ObjectLockEnabled::ENABLED)
|
||||
}
|
||||
}
|
||||
|
||||
pub trait ObjectLockStatusExt {
|
||||
fn valid(&self) -> bool;
|
||||
}
|
||||
|
||||
impl ObjectLockStatusExt for ObjectLockLegalHoldStatus {
|
||||
fn valid(&self) -> bool {
|
||||
matches!(self.as_str(), ObjectLockLegalHoldStatus::ON | ObjectLockLegalHoldStatus::OFF)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -13,12 +13,15 @@
|
||||
// limitations under the License.
|
||||
|
||||
use crate::StorageAPI;
|
||||
use crate::bucket::bucket_target_sys::BucketTargetSys;
|
||||
use crate::bucket::metadata_sys;
|
||||
use crate::bucket::replication::ResyncOpts;
|
||||
use crate::bucket::replication::ResyncStatusType;
|
||||
use crate::bucket::replication::replicate_delete;
|
||||
use crate::bucket::replication::replicate_object;
|
||||
use crate::bucket::replication::replication_resyncer::{
|
||||
BucketReplicationResyncStatus, DeletedObjectReplicationInfo, ReplicationResyncer,
|
||||
BucketReplicationResyncStatus, DeletedObjectReplicationInfo, ReplicationConfig, ReplicationResyncer,
|
||||
get_heal_replicate_object_info,
|
||||
};
|
||||
use crate::bucket::replication::replication_state::ReplicationStats;
|
||||
use crate::config::com::read_config;
|
||||
@@ -34,8 +37,10 @@ use rustfs_filemeta::ReplicationStatusType;
|
||||
use rustfs_filemeta::ReplicationType;
|
||||
use rustfs_filemeta::ReplicationWorkerOperation;
|
||||
use rustfs_filemeta::ResyncDecision;
|
||||
use rustfs_filemeta::VersionPurgeStatusType;
|
||||
use rustfs_filemeta::replication_statuses_map;
|
||||
use rustfs_filemeta::version_purge_statuses_map;
|
||||
use rustfs_filemeta::{REPLICATE_EXISTING, REPLICATE_HEAL, REPLICATE_HEAL_DELETE};
|
||||
use rustfs_utils::http::RESERVED_METADATA_PREFIX_LOWER;
|
||||
use std::any::Any;
|
||||
use std::sync::Arc;
|
||||
@@ -1041,3 +1046,152 @@ pub async fn schedule_replication_delete(dv: DeletedObjectReplicationInfo) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// QueueReplicationHeal is a wrapper for queue_replication_heal_internal
|
||||
pub async fn queue_replication_heal(bucket: &str, oi: ObjectInfo, retry_count: u32) {
|
||||
// ignore modtime zero objects
|
||||
if oi.mod_time.is_none() || oi.mod_time == Some(OffsetDateTime::UNIX_EPOCH) {
|
||||
return;
|
||||
}
|
||||
|
||||
let rcfg = match metadata_sys::get_replication_config(bucket).await {
|
||||
Ok((config, _)) => config,
|
||||
Err(err) => {
|
||||
warn!("Failed to get replication config for bucket {}: {}", bucket, err);
|
||||
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
let tgts = match BucketTargetSys::get().list_bucket_targets(bucket).await {
|
||||
Ok(targets) => Some(targets),
|
||||
Err(err) => {
|
||||
warn!("Failed to list bucket targets for bucket {}: {}", bucket, err);
|
||||
None
|
||||
}
|
||||
};
|
||||
|
||||
let rcfg_wrapper = ReplicationConfig::new(Some(rcfg), tgts);
|
||||
queue_replication_heal_internal(bucket, oi, rcfg_wrapper, retry_count).await;
|
||||
}
|
||||
|
||||
/// queue_replication_heal_internal enqueues objects that failed replication OR eligible for resyncing through
|
||||
/// an ongoing resync operation or via existing objects replication configuration setting.
|
||||
pub async fn queue_replication_heal_internal(
|
||||
_bucket: &str,
|
||||
oi: ObjectInfo,
|
||||
rcfg: ReplicationConfig,
|
||||
retry_count: u32,
|
||||
) -> ReplicateObjectInfo {
|
||||
let mut roi = ReplicateObjectInfo::default();
|
||||
|
||||
// ignore modtime zero objects
|
||||
if oi.mod_time.is_none() || oi.mod_time == Some(OffsetDateTime::UNIX_EPOCH) {
|
||||
return roi;
|
||||
}
|
||||
|
||||
if rcfg.config.is_none() || rcfg.remotes.is_none() {
|
||||
return roi;
|
||||
}
|
||||
|
||||
roi = get_heal_replicate_object_info(&oi, &rcfg).await;
|
||||
roi.retry_count = retry_count;
|
||||
|
||||
if !roi.dsc.replicate_any() {
|
||||
return roi;
|
||||
}
|
||||
|
||||
// early return if replication already done, otherwise we need to determine if this
|
||||
// version is an existing object that needs healing.
|
||||
if roi.replication_status == ReplicationStatusType::Completed
|
||||
&& roi.version_purge_status.is_empty()
|
||||
&& !roi.existing_obj_resync.must_resync()
|
||||
{
|
||||
return roi;
|
||||
}
|
||||
|
||||
if roi.delete_marker || !roi.version_purge_status.is_empty() {
|
||||
let (version_id, dm_version_id) = if roi.version_purge_status.is_empty() {
|
||||
(None, roi.version_id)
|
||||
} else {
|
||||
(roi.version_id, None)
|
||||
};
|
||||
|
||||
let dv = DeletedObjectReplicationInfo {
|
||||
delete_object: crate::store_api::DeletedObject {
|
||||
object_name: roi.name.clone(),
|
||||
delete_marker_version_id: dm_version_id,
|
||||
version_id,
|
||||
replication_state: roi.replication_state.clone(),
|
||||
delete_marker_mtime: roi.mod_time,
|
||||
delete_marker: roi.delete_marker,
|
||||
..Default::default()
|
||||
},
|
||||
bucket: roi.bucket.clone(),
|
||||
op_type: ReplicationType::Heal,
|
||||
event_type: REPLICATE_HEAL_DELETE.to_string(),
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
// heal delete marker replication failure or versioned delete replication failure
|
||||
if roi.replication_status == ReplicationStatusType::Pending
|
||||
|| roi.replication_status == ReplicationStatusType::Failed
|
||||
|| roi.version_purge_status == VersionPurgeStatusType::Failed
|
||||
|| roi.version_purge_status == VersionPurgeStatusType::Pending
|
||||
{
|
||||
if let Some(pool) = GLOBAL_REPLICATION_POOL.get() {
|
||||
pool.queue_replica_delete_task(dv).await;
|
||||
}
|
||||
return roi;
|
||||
}
|
||||
|
||||
// if replication status is Complete on DeleteMarker and existing object resync required
|
||||
let existing_obj_resync = roi.existing_obj_resync.clone();
|
||||
if existing_obj_resync.must_resync()
|
||||
&& (roi.replication_status == ReplicationStatusType::Completed || roi.replication_status.is_empty())
|
||||
{
|
||||
queue_replicate_deletes_wrapper(dv, existing_obj_resync).await;
|
||||
return roi;
|
||||
}
|
||||
|
||||
return roi;
|
||||
}
|
||||
|
||||
if roi.existing_obj_resync.must_resync() {
|
||||
roi.op_type = ReplicationType::ExistingObject;
|
||||
}
|
||||
|
||||
match roi.replication_status {
|
||||
ReplicationStatusType::Pending | ReplicationStatusType::Failed => {
|
||||
roi.event_type = REPLICATE_HEAL.to_string();
|
||||
if let Some(pool) = GLOBAL_REPLICATION_POOL.get() {
|
||||
pool.queue_replica_task(roi.clone()).await;
|
||||
}
|
||||
return roi;
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
|
||||
if roi.existing_obj_resync.must_resync() {
|
||||
roi.event_type = REPLICATE_EXISTING.to_string();
|
||||
if let Some(pool) = GLOBAL_REPLICATION_POOL.get() {
|
||||
pool.queue_replica_task(roi.clone()).await;
|
||||
}
|
||||
}
|
||||
|
||||
roi
|
||||
}
|
||||
|
||||
/// Wrapper function for queueing replicate deletes with resync decision
|
||||
async fn queue_replicate_deletes_wrapper(doi: DeletedObjectReplicationInfo, existing_obj_resync: ResyncDecision) {
|
||||
for (k, v) in existing_obj_resync.targets.iter() {
|
||||
if v.replicate {
|
||||
let mut dv = doi.clone();
|
||||
dv.reset_id = v.reset_id.clone();
|
||||
dv.target_arn = k.clone();
|
||||
if let Some(pool) = GLOBAL_REPLICATION_POOL.get() {
|
||||
pool.queue_replica_delete_task(dv).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -754,7 +754,7 @@ impl ReplicationWorkerOperation for DeletedObjectReplicationInfo {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Default)]
|
||||
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
|
||||
pub struct ReplicationConfig {
|
||||
pub config: Option<ReplicationConfiguration>,
|
||||
pub remotes: Option<BucketTargets>,
|
||||
@@ -2174,35 +2174,122 @@ fn is_standard_header(k: &str) -> bool {
|
||||
STANDARD_HEADERS.iter().any(|h| h.eq_ignore_ascii_case(k))
|
||||
}
|
||||
|
||||
// Valid SSE replication headers mapping from internal to replication headers
|
||||
static VALID_SSE_REPLICATION_HEADERS: &[(&str, &str)] = &[
|
||||
(
|
||||
"X-Rustfs-Internal-Server-Side-Encryption-Sealed-Key",
|
||||
"X-Rustfs-Replication-Server-Side-Encryption-Sealed-Key",
|
||||
),
|
||||
(
|
||||
"X-Rustfs-Internal-Server-Side-Encryption-Seal-Algorithm",
|
||||
"X-Rustfs-Replication-Server-Side-Encryption-Seal-Algorithm",
|
||||
),
|
||||
(
|
||||
"X-Rustfs-Internal-Server-Side-Encryption-Iv",
|
||||
"X-Rustfs-Replication-Server-Side-Encryption-Iv",
|
||||
),
|
||||
("X-Rustfs-Internal-Encrypted-Multipart", "X-Rustfs-Replication-Encrypted-Multipart"),
|
||||
("X-Rustfs-Internal-Actual-Object-Size", "X-Rustfs-Replication-Actual-Object-Size"),
|
||||
];
|
||||
|
||||
const REPLICATION_SSEC_CHECKSUM_HEADER: &str = "X-Rustfs-Replication-Ssec-Crc";
|
||||
|
||||
fn is_valid_sse_header(k: &str) -> Option<&str> {
|
||||
VALID_SSE_REPLICATION_HEADERS
|
||||
.iter()
|
||||
.find(|(internal, _)| k.eq_ignore_ascii_case(internal))
|
||||
.map(|(_, replication)| *replication)
|
||||
}
|
||||
|
||||
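The lookup above is case-insensitive and returns the matching replication header name, or `None` for anything that is not an internal SSE header. An illustrative check (hypothetical test, not part of the diff):

```rust
#[test]
fn maps_internal_sse_headers_to_replication_headers() {
    assert_eq!(
        is_valid_sse_header("x-rustfs-internal-server-side-encryption-iv"),
        Some("X-Rustfs-Replication-Server-Side-Encryption-Iv")
    );
    assert_eq!(is_valid_sse_header("Content-Type"), None);
}
```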
fn put_replication_opts(sc: &str, object_info: &ObjectInfo) -> Result<(PutObjectOptions, bool)> {
|
||||
use crate::config::storageclass::{RRS, STANDARD};
|
||||
use base64::{Engine, engine::general_purpose::STANDARD as BASE64_STANDARD};
|
||||
use rustfs_utils::http::{
|
||||
AMZ_CHECKSUM_TYPE, AMZ_CHECKSUM_TYPE_FULL_OBJECT, AMZ_SERVER_SIDE_ENCRYPTION, AMZ_SERVER_SIDE_ENCRYPTION_KMS_ID,
|
||||
};
|
||||
|
||||
let mut meta = HashMap::new();
|
||||
let is_ssec = is_ssec_encrypted(&object_info.user_defined);
|
||||
|
||||
// Process user-defined metadata
|
||||
for (k, v) in object_info.user_defined.iter() {
|
||||
if strings_has_prefix_fold(k, RESERVED_METADATA_PREFIX) {
|
||||
continue;
|
||||
let has_valid_sse_header = is_valid_sse_header(k).is_some();
|
||||
|
||||
// In case of SSE-C objects copy the allowed internal headers as well
|
||||
if !is_ssec || !has_valid_sse_header {
|
||||
if strings_has_prefix_fold(k, RESERVED_METADATA_PREFIX) {
|
||||
continue;
|
||||
}
|
||||
if is_standard_header(k) {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
if is_standard_header(k) {
|
||||
continue;
|
||||
if let Some(replication_header) = is_valid_sse_header(k) {
|
||||
meta.insert(replication_header.to_string(), v.to_string());
|
||||
} else {
|
||||
meta.insert(k.to_string(), v.to_string());
|
||||
}
|
||||
|
||||
meta.insert(k.to_string(), v.to_string());
|
||||
}
|
||||
|
||||
let is_multipart = object_info.is_multipart();
|
||||
let mut is_multipart = object_info.is_multipart();
|
||||
|
||||
// Handle checksum
|
||||
if let Some(checksum_data) = &object_info.checksum
|
||||
&& !checksum_data.is_empty()
|
||||
{
|
||||
// Add encrypted CRC to metadata for SSE-C objects
|
||||
if is_ssec {
|
||||
let encoded = BASE64_STANDARD.encode(checksum_data);
|
||||
meta.insert(REPLICATION_SSEC_CHECKSUM_HEADER.to_string(), encoded);
|
||||
} else {
|
||||
// Get checksum metadata for non-SSE-C objects
|
||||
let (cs_meta, is_mp) = object_info.decrypt_checksums(0, &http::HeaderMap::new())?;
|
||||
is_multipart = is_mp;
|
||||
|
||||
// Set object checksum metadata
|
||||
for (k, v) in cs_meta.iter() {
|
||||
if k != AMZ_CHECKSUM_TYPE {
|
||||
meta.insert(k.clone(), v.clone());
|
||||
}
|
||||
}
|
||||
|
||||
// For objects where checksum is full object, use the cheaper PutObject replication
|
||||
if !object_info.is_multipart()
|
||||
&& cs_meta
|
||||
.get(AMZ_CHECKSUM_TYPE)
|
||||
.map(|v| v.as_str() == AMZ_CHECKSUM_TYPE_FULL_OBJECT)
|
||||
.unwrap_or(false)
|
||||
{
|
||||
is_multipart = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Handle storage class default
|
||||
let storage_class = if sc.is_empty() {
|
||||
let obj_sc = object_info.storage_class.as_deref().unwrap_or_default();
|
||||
if obj_sc == STANDARD || obj_sc == RRS {
|
||||
obj_sc.to_string()
|
||||
} else {
|
||||
sc.to_string()
|
||||
}
|
||||
} else {
|
||||
sc.to_string()
|
||||
};
|
||||
|
||||
let mut put_op = PutObjectOptions {
|
||||
user_metadata: meta,
|
||||
content_type: object_info.content_type.clone().unwrap_or_default(),
|
||||
content_encoding: object_info.content_encoding.clone().unwrap_or_default(),
|
||||
expires: object_info.expires.unwrap_or(OffsetDateTime::UNIX_EPOCH),
|
||||
storage_class: sc.to_string(),
|
||||
storage_class,
|
||||
internal: AdvancedPutOptions {
|
||||
source_version_id: object_info.version_id.map(|v| v.to_string()).unwrap_or_default(),
|
||||
source_etag: object_info.etag.clone().unwrap_or_default(),
|
||||
source_mtime: object_info.mod_time.unwrap_or(OffsetDateTime::UNIX_EPOCH),
|
||||
replication_status: ReplicationStatusType::Pending,
|
||||
replication_request: true,
|
||||
replication_status: ReplicationStatusType::Replica, // Changed from Pending to Replica
|
||||
replication_request: true, // always set this to distinguish between replication and normal PUT operation
|
||||
..Default::default()
|
||||
},
|
||||
..Default::default()
|
||||
@@ -2213,36 +2300,43 @@ fn put_replication_opts(sc: &str, object_info: &ObjectInfo) -> Result<(PutObject
|
||||
|
||||
if !tags.is_empty() {
|
||||
put_op.user_tags = tags;
|
||||
// set tag timestamp in opts
|
||||
put_op.internal.tagging_timestamp = if let Some(ts) = object_info
|
||||
.user_defined
|
||||
.get(&format!("{RESERVED_METADATA_PREFIX}tagging-timestamp"))
|
||||
.get(&format!("{RESERVED_METADATA_PREFIX_LOWER}tagging-timestamp"))
|
||||
{
|
||||
OffsetDateTime::parse(ts, &Rfc3339).unwrap_or(OffsetDateTime::UNIX_EPOCH)
|
||||
OffsetDateTime::parse(ts, &Rfc3339)
|
||||
.map_err(|e| Error::other(format!("Failed to parse tagging timestamp: {}", e)))?
|
||||
} else {
|
||||
object_info.mod_time.unwrap_or(OffsetDateTime::UNIX_EPOCH)
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(lang) = object_info.user_defined.lookup(headers::CONTENT_LANGUAGE) {
|
||||
// Use case-insensitive lookup for headers
|
||||
let lk_map = object_info.user_defined.clone();
|
||||
|
||||
if let Some(lang) = lk_map.lookup(headers::CONTENT_LANGUAGE) {
|
||||
put_op.content_language = lang.to_string();
|
||||
}
|
||||
|
||||
if let Some(cd) = object_info.user_defined.lookup(headers::CONTENT_DISPOSITION) {
|
||||
if let Some(cd) = lk_map.lookup(headers::CONTENT_DISPOSITION) {
|
||||
put_op.content_disposition = cd.to_string();
|
||||
}
|
||||
|
||||
if let Some(v) = object_info.user_defined.lookup(headers::CACHE_CONTROL) {
|
||||
if let Some(v) = lk_map.lookup(headers::CACHE_CONTROL) {
|
||||
put_op.cache_control = v.to_string();
|
||||
}
|
||||
|
||||
if let Some(v) = object_info.user_defined.lookup(headers::AMZ_OBJECT_LOCK_MODE) {
|
||||
if let Some(v) = lk_map.lookup(headers::AMZ_OBJECT_LOCK_MODE) {
|
||||
let mode = v.to_string().to_uppercase();
|
||||
put_op.mode = Some(aws_sdk_s3::types::ObjectLockRetentionMode::from(mode.as_str()));
|
||||
}
|
||||
|
||||
if let Some(v) = object_info.user_defined.lookup(headers::AMZ_OBJECT_LOCK_RETAIN_UNTIL_DATE) {
|
||||
put_op.retain_until_date = OffsetDateTime::parse(v, &Rfc3339).unwrap_or(OffsetDateTime::UNIX_EPOCH);
|
||||
if let Some(v) = lk_map.lookup(headers::AMZ_OBJECT_LOCK_RETAIN_UNTIL_DATE) {
|
||||
put_op.retain_until_date =
|
||||
OffsetDateTime::parse(v, &Rfc3339).map_err(|e| Error::other(format!("Failed to parse retain until date: {}", e)))?;
|
||||
// set retention timestamp in opts
|
||||
put_op.internal.retention_timestamp = if let Some(v) = object_info
|
||||
.user_defined
|
||||
.get(&format!("{RESERVED_METADATA_PREFIX_LOWER}objectlock-retention-timestamp"))
|
||||
@@ -2253,9 +2347,10 @@ fn put_replication_opts(sc: &str, object_info: &ObjectInfo) -> Result<(PutObject
|
||||
};
|
||||
}
|
||||
|
||||
if let Some(v) = object_info.user_defined.lookup(headers::AMZ_OBJECT_LOCK_LEGAL_HOLD) {
|
||||
if let Some(v) = lk_map.lookup(headers::AMZ_OBJECT_LOCK_LEGAL_HOLD) {
|
||||
let hold = v.to_uppercase();
|
||||
put_op.legalhold = Some(ObjectLockLegalHoldStatus::from(hold.as_str()));
|
||||
// set legalhold timestamp in opts
|
||||
put_op.internal.legalhold_timestamp = if let Some(v) = object_info
|
||||
.user_defined
|
||||
.get(&format!("{RESERVED_METADATA_PREFIX_LOWER}objectlock-legalhold-timestamp"))
|
||||
@@ -2266,7 +2361,34 @@ fn put_replication_opts(sc: &str, object_info: &ObjectInfo) -> Result<(PutObject
|
||||
};
|
||||
}
|
||||
|
||||
// TODO: is encrypted
|
||||
// Handle SSE-S3 encryption
|
||||
if object_info
|
||||
.user_defined
|
||||
.get(AMZ_SERVER_SIDE_ENCRYPTION)
|
||||
.map(|v| v.eq_ignore_ascii_case("AES256"))
|
||||
.unwrap_or(false)
|
||||
{
|
||||
// SSE-S3 detected - set ServerSideEncryption
|
||||
// Note: This requires the PutObjectOptions to support SSE
|
||||
// TODO: Implement SSE-S3 support in PutObjectOptions if not already present
|
||||
}
|
||||
|
||||
// Handle SSE-KMS encryption
|
||||
if object_info.user_defined.contains_key(AMZ_SERVER_SIDE_ENCRYPTION_KMS_ID) {
|
||||
// SSE-KMS detected
|
||||
// If KMS key ID replication is enabled (as by default)
|
||||
// we include the object's KMS key ID. In any case, we
|
||||
// always set the SSE-KMS header. If no KMS key ID is
|
||||
// specified, MinIO is supposed to use whatever default
|
||||
// config applies on the site or bucket.
|
||||
// TODO: Implement SSE-KMS support with key ID replication
|
||||
// let key_id = if kms::replicate_key_id() {
|
||||
// object_info.kms_key_id()
|
||||
// } else {
|
||||
// None
|
||||
// };
|
||||
// TODO: Set SSE-KMS encryption in put_op
|
||||
}
|
||||
|
||||
Ok((put_op, is_multipart))
|
||||
}
|
||||
|
||||
@@ -15,7 +15,8 @@
|
||||
use crate::disk::{
|
||||
CheckPartsResp, DeleteOptions, DiskAPI, DiskError, DiskInfo, DiskInfoOptions, DiskLocation, Endpoint, Error,
|
||||
FileInfoVersions, ReadMultipleReq, ReadMultipleResp, ReadOptions, RenameDataResp, Result, UpdateMetadataOpts, VolumeInfo,
|
||||
WalkDirOptions, local::LocalDisk,
|
||||
WalkDirOptions,
|
||||
local::{LocalDisk, ScanGuard},
|
||||
};
|
||||
use bytes::Bytes;
|
||||
use rustfs_filemeta::{FileInfo, ObjectPartInfo, RawFileInfo};
|
||||
@@ -259,6 +260,7 @@ impl LocalDiskWrapper {
|
||||
let test_obj = format!("health-check-{}", Uuid::new_v4());
|
||||
if Self::perform_health_check(disk.clone(), &TEST_BUCKET, &test_obj, &TEST_DATA, true, CHECK_TIMEOUT_DURATION).await.is_err() && health.swap_ok_to_faulty() {
|
||||
// Health check failed, disk is considered faulty
|
||||
warn!("health check: failed, disk is considered faulty");
|
||||
|
||||
health.increment_waiting(); // Balance the increment from failed operation
|
||||
|
||||
@@ -429,7 +431,7 @@ impl LocalDiskWrapper {
|
||||
{
|
||||
// Check if disk is faulty
|
||||
if self.health.is_faulty() {
|
||||
warn!("disk {} health is faulty, returning error", self.to_string());
|
||||
warn!("local disk {} health is faulty, returning error", self.to_string());
|
||||
return Err(DiskError::FaultyDisk);
|
||||
}
|
||||
|
||||
@@ -476,6 +478,15 @@ impl LocalDiskWrapper {
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl DiskAPI for LocalDiskWrapper {
|
||||
async fn read_metadata(&self, volume: &str, path: &str) -> Result<Bytes> {
|
||||
self.track_disk_health(|| async { self.disk.read_metadata(volume, path).await }, Duration::ZERO)
|
||||
.await
|
||||
}
|
||||
|
||||
fn start_scan(&self) -> ScanGuard {
|
||||
self.disk.start_scan()
|
||||
}
|
||||
|
||||
fn to_string(&self) -> String {
|
||||
self.disk.to_string()
|
||||
}
|
||||
|
||||
@@ -82,7 +82,7 @@ pub struct LocalDisk {
|
||||
pub format_info: RwLock<FormatInfo>,
|
||||
pub endpoint: Endpoint,
|
||||
pub disk_info_cache: Arc<Cache<DiskInfo>>,
|
||||
pub scanning: AtomicU32,
|
||||
pub scanning: Arc<AtomicU32>,
|
||||
pub rotational: bool,
|
||||
pub fstype: String,
|
||||
pub major: u64,
|
||||
@@ -209,7 +209,7 @@ impl LocalDisk {
|
||||
format_path,
|
||||
format_info: RwLock::new(format_info),
|
||||
disk_info_cache: Arc::new(cache),
|
||||
scanning: AtomicU32::new(0),
|
||||
scanning: Arc::new(AtomicU32::new(0)),
|
||||
rotational: Default::default(),
|
||||
fstype: Default::default(),
|
||||
minor: Default::default(),
|
||||
@@ -664,6 +664,7 @@ impl LocalDisk {
|
||||
match self.read_metadata_with_dmtime(meta_path).await {
|
||||
Ok(res) => Ok(res),
|
||||
Err(err) => {
|
||||
warn!("read_raw: error: {:?}", err);
|
||||
if err == Error::FileNotFound
|
||||
&& !skip_access_checks(volume_dir.as_ref().to_string_lossy().to_string().as_str())
|
||||
&& let Err(e) = access(volume_dir.as_ref()).await
|
||||
@@ -687,20 +688,6 @@ impl LocalDisk {
|
||||
Ok((buf, mtime))
|
||||
}
|
||||
|
||||
async fn read_metadata(&self, file_path: impl AsRef<Path>) -> Result<Vec<u8>> {
|
||||
// Try to use cached file content reading for better performance, with safe fallback
|
||||
let path = file_path.as_ref().to_path_buf();
|
||||
|
||||
// First, try the cache
|
||||
if let Ok(bytes) = get_global_file_cache().get_file_content(path.clone()).await {
|
||||
return Ok(bytes.to_vec());
|
||||
}
|
||||
|
||||
// Fallback to direct read if cache fails
|
||||
let (data, _) = self.read_metadata_with_dmtime(file_path.as_ref()).await?;
|
||||
Ok(data)
|
||||
}
|
||||
|
||||
async fn read_metadata_with_dmtime(&self, file_path: impl AsRef<Path>) -> Result<(Vec<u8>, Option<OffsetDateTime>)> {
|
||||
check_path_length(file_path.as_ref().to_string_lossy().as_ref())?;
|
||||
|
||||
@@ -1052,7 +1039,7 @@ impl LocalDisk {
|
||||
|
||||
if entry.ends_with(STORAGE_FORMAT_FILE) {
|
||||
let metadata = self
|
||||
.read_metadata(self.get_object_path(bucket, format!("{}/{}", ¤t, &entry).as_str())?)
|
||||
.read_metadata(bucket, format!("{}/{}", ¤t, &entry).as_str())
|
||||
.await?;
|
||||
|
||||
let entry = entry.strip_suffix(STORAGE_FORMAT_FILE).unwrap_or_default().to_owned();
|
||||
@@ -1068,7 +1055,7 @@ impl LocalDisk {
|
||||
|
||||
out.write_obj(&MetaCacheEntry {
|
||||
name: name.clone(),
|
||||
metadata,
|
||||
metadata: metadata.to_vec(),
|
||||
..Default::default()
|
||||
})
|
||||
.await?;
|
||||
@@ -1135,14 +1122,14 @@ impl LocalDisk {
|
||||
|
||||
let fname = format!("{}/{}", &meta.name, STORAGE_FORMAT_FILE);
|
||||
|
||||
match self.read_metadata(self.get_object_path(&opts.bucket, fname.as_str())?).await {
|
||||
match self.read_metadata(&opts.bucket, fname.as_str()).await {
|
||||
Ok(res) => {
|
||||
if is_dir_obj {
|
||||
meta.name = meta.name.trim_end_matches(GLOBAL_DIR_SUFFIX_WITH_SLASH).to_owned();
|
||||
meta.name.push_str(SLASH_SEPARATOR_STR);
|
||||
}
|
||||
|
||||
meta.metadata = res;
|
||||
meta.metadata = res.to_vec();
|
||||
|
||||
out.write_obj(&meta).await?;
|
||||
|
||||
@@ -1189,6 +1176,14 @@ impl LocalDisk {
|
||||
}
|
||||
}
|
||||
|
||||
pub struct ScanGuard(pub Arc<AtomicU32>);

impl Drop for ScanGuard {
fn drop(&mut self) {
self.0.fetch_sub(1, Ordering::Relaxed);
}
}
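`start_scan` (added to `DiskAPI` further down) increments the per-disk `scanning` counter and hands back this guard, so the counter drops automatically when the scan ends, even on early return or unwind. A small self-contained illustration of that contract (hypothetical, not in the diff):

```rust
use std::sync::Arc;
use std::sync::atomic::{AtomicU32, Ordering};

fn scan_guard_demo() {
    let scanning = Arc::new(AtomicU32::new(0));

    // What `start_scan` does: bump the counter and wrap it in a guard.
    scanning.fetch_add(1, Ordering::Relaxed);
    let guard = ScanGuard(Arc::clone(&scanning));
    assert_eq!(scanning.load(Ordering::Relaxed), 1); // a scan is in flight

    drop(guard); // scan finished
    assert_eq!(scanning.load(Ordering::Relaxed), 0);
}
```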
fn is_root_path(path: impl AsRef<Path>) -> bool {
|
||||
path.as_ref().components().count() == 1 && path.as_ref().has_root()
|
||||
}
|
||||
@@ -1579,6 +1574,8 @@ impl DiskAPI for LocalDisk {
|
||||
.as_str(),
|
||||
)?;
|
||||
|
||||
info!("check_parts: part_path: {:?}", &part_path);
|
||||
|
||||
match lstat(&part_path).await {
|
||||
Ok(st) => {
|
||||
if st.is_dir() {
|
||||
@@ -1593,6 +1590,8 @@ impl DiskAPI for LocalDisk {
|
||||
resp.results[i] = CHECK_PART_SUCCESS;
|
||||
}
|
||||
Err(err) => {
|
||||
info!("check_parts: failed to stat file: {:?}, error: {:?}", &part_path, &err);
|
||||
|
||||
let e: DiskError = to_file_error(err).into();
|
||||
|
||||
if e == DiskError::FileNotFound {
|
||||
@@ -1882,19 +1881,20 @@ impl DiskAPI for LocalDisk {
|
||||
let mut objs_returned = 0;
|
||||
|
||||
if opts.base_dir.ends_with(SLASH_SEPARATOR_STR) {
|
||||
let fpath = self.get_object_path(
|
||||
&opts.bucket,
|
||||
path_join_buf(&[
|
||||
format!("{}{}", opts.base_dir.trim_end_matches(SLASH_SEPARATOR_STR), GLOBAL_DIR_SUFFIX).as_str(),
|
||||
STORAGE_FORMAT_FILE,
|
||||
])
|
||||
.as_str(),
|
||||
)?;
|
||||
|
||||
if let Ok(data) = self.read_metadata(fpath).await {
|
||||
if let Ok(data) = self
|
||||
.read_metadata(
|
||||
&opts.bucket,
|
||||
path_join_buf(&[
|
||||
format!("{}{}", opts.base_dir.trim_end_matches(SLASH_SEPARATOR_STR), GLOBAL_DIR_SUFFIX).as_str(),
|
||||
STORAGE_FORMAT_FILE,
|
||||
])
|
||||
.as_str(),
|
||||
)
|
||||
.await
|
||||
{
|
||||
let meta = MetaCacheEntry {
|
||||
name: opts.base_dir.clone(),
|
||||
metadata: data,
|
||||
metadata: data.to_vec(),
|
||||
..Default::default()
|
||||
};
|
||||
out.write_obj(&meta).await?;
|
||||
@@ -2549,6 +2549,26 @@ impl DiskAPI for LocalDisk {
|
||||
|
||||
Ok(info)
|
||||
}
|
||||
#[tracing::instrument(skip(self))]
|
||||
fn start_scan(&self) -> ScanGuard {
|
||||
self.scanning.fetch_add(1, Ordering::Relaxed);
|
||||
ScanGuard(Arc::clone(&self.scanning))
|
||||
}
|
||||
|
||||
async fn read_metadata(&self, volume: &str, path: &str) -> Result<Bytes> {
|
||||
// Try to use cached file content reading for better performance, with safe fallback
|
||||
let file_path = self.get_object_path(volume, path)?;
|
||||
// let file_path = file_path.join(Path::new(STORAGE_FORMAT_FILE));
|
||||
|
||||
// First, try the cache
|
||||
if let Ok(bytes) = get_global_file_cache().get_file_content(file_path.clone()).await {
|
||||
return Ok(bytes);
|
||||
}
|
||||
|
||||
// Fallback to direct read if cache fails
|
||||
let (data, _) = self.read_metadata_with_dmtime(file_path).await?;
|
||||
Ok(data.into())
|
||||
}
|
||||
}
|
||||
|
||||
async fn get_disk_info(drive_path: PathBuf) -> Result<(rustfs_utils::os::DiskInfo, bool)> {
|
||||
|
||||
@@ -32,6 +32,7 @@ pub const STORAGE_FORMAT_FILE: &str = "xl.meta";
|
||||
pub const STORAGE_FORMAT_FILE_BACKUP: &str = "xl.meta.bkp";
|
||||
|
||||
use crate::disk::disk_store::LocalDiskWrapper;
|
||||
use crate::disk::local::ScanGuard;
|
||||
use crate::rpc::RemoteDisk;
|
||||
use bytes::Bytes;
|
||||
use endpoint::Endpoint;
|
||||
@@ -395,6 +396,20 @@ impl DiskAPI for Disk {
|
||||
Disk::Remote(remote_disk) => remote_disk.disk_info(opts).await,
|
||||
}
|
||||
}
|
||||
|
||||
fn start_scan(&self) -> ScanGuard {
|
||||
match self {
|
||||
Disk::Local(local_disk) => local_disk.start_scan(),
|
||||
Disk::Remote(remote_disk) => remote_disk.start_scan(),
|
||||
}
|
||||
}
|
||||
|
||||
async fn read_metadata(&self, volume: &str, path: &str) -> Result<Bytes> {
|
||||
match self {
|
||||
Disk::Local(local_disk) => local_disk.read_metadata(volume, path).await,
|
||||
Disk::Remote(remote_disk) => remote_disk.read_metadata(volume, path).await,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn new_disk(ep: &Endpoint, opt: &DiskOption) -> Result<DiskStore> {
|
||||
@@ -458,6 +473,7 @@ pub trait DiskAPI: Debug + Send + Sync + 'static {
|
||||
opts: &ReadOptions,
|
||||
) -> Result<FileInfo>;
|
||||
async fn read_xl(&self, volume: &str, path: &str, read_data: bool) -> Result<RawFileInfo>;
|
||||
async fn read_metadata(&self, volume: &str, path: &str) -> Result<Bytes>;
|
||||
async fn rename_data(
|
||||
&self,
|
||||
src_volume: &str,
|
||||
@@ -489,6 +505,7 @@ pub trait DiskAPI: Debug + Send + Sync + 'static {
|
||||
async fn write_all(&self, volume: &str, path: &str, data: Bytes) -> Result<()>;
|
||||
async fn read_all(&self, volume: &str, path: &str) -> Result<Bytes>;
|
||||
async fn disk_info(&self, opts: &DiskInfoOptions) -> Result<DiskInfo>;
|
||||
fn start_scan(&self) -> ScanGuard;
|
||||
}
|
||||
|
||||
#[derive(Debug, Default, Serialize, Deserialize)]
|
||||
@@ -694,6 +711,10 @@ pub fn has_part_err(part_errs: &[usize]) -> bool {
|
||||
part_errs.iter().any(|err| *err != CHECK_PART_SUCCESS)
|
||||
}
|
||||
|
||||
pub fn count_part_not_success(part_errs: &[usize]) -> usize {
|
||||
part_errs.iter().filter(|err| **err != CHECK_PART_SUCCESS).count()
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
@@ -442,11 +442,11 @@ impl PoolMeta {
|
||||
}
|
||||
}
|
||||
|
||||
fn path2_bucket_object(name: &str) -> (String, String) {
|
||||
pub fn path2_bucket_object(name: &str) -> (String, String) {
|
||||
path2_bucket_object_with_base_path("", name)
|
||||
}
|
||||
|
||||
fn path2_bucket_object_with_base_path(base_path: &str, path: &str) -> (String, String) {
|
||||
pub fn path2_bucket_object_with_base_path(base_path: &str, path: &str) -> (String, String) {
|
||||
// Trim the base path and leading slash
|
||||
let trimmed_path = path
|
||||
.strip_prefix(base_path)
|
||||
@@ -454,7 +454,9 @@ fn path2_bucket_object_with_base_path(base_path: &str, path: &str) -> (String, S
|
||||
.strip_prefix(SLASH_SEPARATOR_STR)
|
||||
.unwrap_or(path);
|
||||
// Find the position of the first '/'
|
||||
let pos = trimmed_path.find(SLASH_SEPARATOR_STR).unwrap_or(trimmed_path.len());
|
||||
let Some(pos) = trimmed_path.find(SLASH_SEPARATOR_STR) else {
|
||||
return (trimmed_path.to_string(), "".to_string());
|
||||
};
|
||||
// Split into bucket and prefix
|
||||
let bucket = &trimmed_path[0..pos];
|
||||
let prefix = &trimmed_path[pos + 1..]; // +1 to skip the '/' character if it exists
|
||||
|
||||
@@ -12,23 +12,19 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use crate::{
|
||||
disk::{
|
||||
CheckPartsResp, DeleteOptions, DiskAPI, DiskInfo, DiskInfoOptions, DiskLocation, DiskOption, FileInfoVersions,
|
||||
FileReader, FileWriter, ReadMultipleReq, ReadMultipleResp, ReadOptions, RenameDataResp, UpdateMetadataOpts, VolumeInfo,
|
||||
WalkDirOptions,
|
||||
disk_store::{
|
||||
CHECK_EVERY, CHECK_TIMEOUT_DURATION, ENV_RUSTFS_DRIVE_ACTIVE_MONITORING, SKIP_IF_SUCCESS_BEFORE,
|
||||
get_max_timeout_duration,
|
||||
},
|
||||
endpoint::Endpoint,
|
||||
{
|
||||
disk_store::DiskHealthTracker,
|
||||
error::{DiskError, Error, Result},
|
||||
},
|
||||
use crate::disk::{
|
||||
CheckPartsResp, DeleteOptions, DiskAPI, DiskInfo, DiskInfoOptions, DiskLocation, DiskOption, FileInfoVersions, FileReader,
|
||||
FileWriter, ReadMultipleReq, ReadMultipleResp, ReadOptions, RenameDataResp, UpdateMetadataOpts, VolumeInfo, WalkDirOptions,
|
||||
disk_store::{
|
||||
CHECK_EVERY, CHECK_TIMEOUT_DURATION, ENV_RUSTFS_DRIVE_ACTIVE_MONITORING, SKIP_IF_SUCCESS_BEFORE, get_max_timeout_duration,
|
||||
},
|
||||
endpoint::Endpoint,
|
||||
};
|
||||
use crate::disk::{disk_store::DiskHealthTracker, error::DiskError, local::ScanGuard};
|
||||
use crate::rpc::client::{TonicInterceptor, gen_tonic_signature_interceptor, node_service_time_out_client};
|
||||
use crate::{
|
||||
disk::error::{Error, Result},
|
||||
rpc::build_auth_headers,
|
||||
rpc::client::{TonicInterceptor, gen_tonic_signature_interceptor, node_service_time_out_client},
|
||||
};
|
||||
use bytes::Bytes;
|
||||
use futures::lock::Mutex;
|
||||
@@ -38,15 +34,18 @@ use rustfs_protos::proto_gen::node_service::RenamePartRequest;
|
||||
use rustfs_protos::proto_gen::node_service::{
|
||||
CheckPartsRequest, DeletePathsRequest, DeleteRequest, DeleteVersionRequest, DeleteVersionsRequest, DeleteVolumeRequest,
|
||||
DiskInfoRequest, ListDirRequest, ListVolumesRequest, MakeVolumeRequest, MakeVolumesRequest, ReadAllRequest,
|
||||
ReadMultipleRequest, ReadPartsRequest, ReadVersionRequest, ReadXlRequest, RenameDataRequest, RenameFileRequest,
|
||||
StatVolumeRequest, UpdateMetadataRequest, VerifyFileRequest, WriteAllRequest, WriteMetadataRequest,
|
||||
ReadMetadataRequest, ReadMultipleRequest, ReadPartsRequest, ReadVersionRequest, ReadXlRequest, RenameDataRequest,
|
||||
RenameFileRequest, StatVolumeRequest, UpdateMetadataRequest, VerifyFileRequest, WriteAllRequest, WriteMetadataRequest,
|
||||
node_service_client::NodeServiceClient,
|
||||
};
|
||||
use rustfs_rio::{HttpReader, HttpWriter};
|
||||
use rustfs_utils::string::parse_bool_with_default;
|
||||
use std::{
|
||||
path::PathBuf,
|
||||
sync::{Arc, atomic::Ordering},
|
||||
sync::{
|
||||
Arc,
|
||||
atomic::{AtomicU32, Ordering},
|
||||
},
|
||||
time::Duration,
|
||||
};
|
||||
use tokio::time;
|
||||
@@ -60,9 +59,8 @@ use uuid::Uuid;
|
||||
pub struct RemoteDisk {
|
||||
pub id: Mutex<Option<Uuid>>,
|
||||
pub addr: String,
|
||||
pub url: url::Url,
|
||||
pub root: PathBuf,
|
||||
endpoint: Endpoint,
|
||||
pub scanning: Arc<AtomicU32>,
|
||||
/// Whether health checking is enabled
|
||||
health_check: bool,
|
||||
/// Health tracker for connection monitoring
|
||||
@@ -73,8 +71,6 @@ pub struct RemoteDisk {
|
||||
|
||||
impl RemoteDisk {
|
||||
pub async fn new(ep: &Endpoint, opt: &DiskOption) -> Result<Self> {
|
||||
// let root = fs::canonicalize(ep.url.path()).await?;
|
||||
let root = PathBuf::from(ep.get_file_path());
|
||||
let addr = if let Some(port) = ep.url.port() {
|
||||
format!("{}://{}:{}", ep.url.scheme(), ep.url.host_str().unwrap(), port)
|
||||
} else {
|
||||
@@ -88,9 +84,8 @@ impl RemoteDisk {
|
||||
let disk = Self {
|
||||
id: Mutex::new(None),
|
||||
addr: addr.clone(),
|
||||
url: ep.url.clone(),
|
||||
root,
|
||||
endpoint: ep.clone(),
|
||||
scanning: Arc::new(AtomicU32::new(0)),
|
||||
health_check: opt.health_check && env_health_check,
|
||||
health: Arc::new(DiskHealthTracker::new()),
|
||||
cancel_token: CancellationToken::new(),
|
||||
@@ -227,7 +222,7 @@ impl RemoteDisk {
|
||||
{
|
||||
// Check if disk is faulty
|
||||
if self.health.is_faulty() {
|
||||
warn!("disk {} health is faulty, returning error", self.to_string());
|
||||
warn!("remote disk {} health is faulty, returning error", self.to_string());
|
||||
return Err(DiskError::FaultyDisk);
|
||||
}
|
||||
|
||||
@@ -313,7 +308,7 @@ impl DiskAPI for RemoteDisk {
|
||||
|
||||
#[tracing::instrument(skip(self))]
|
||||
fn path(&self) -> PathBuf {
|
||||
self.root.clone()
|
||||
PathBuf::from(self.endpoint.get_file_path())
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip(self))]
|
||||
@@ -740,6 +735,26 @@ impl DiskAPI for RemoteDisk {
|
||||
.await
|
||||
}
|
||||
|
||||
async fn read_metadata(&self, volume: &str, path: &str) -> Result<Bytes> {
|
||||
let mut client = self
|
||||
.get_client()
|
||||
.await
|
||||
.map_err(|err| Error::other(format!("can not get client, err: {err}")))?;
|
||||
let request = Request::new(ReadMetadataRequest {
|
||||
volume: volume.to_string(),
|
||||
path: path.to_string(),
|
||||
disk: self.endpoint.to_string(),
|
||||
});
|
||||
|
||||
let response = client.read_metadata(request).await?.into_inner();
|
||||
|
||||
if !response.success {
|
||||
return Err(response.error.unwrap_or_default().into());
|
||||
}
|
||||
|
||||
Ok(response.data)
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip(self))]
|
||||
async fn update_metadata(&self, volume: &str, path: &str, fi: FileInfo, opts: &UpdateMetadataOpts) -> Result<()> {
|
||||
info!("update_metadata");
|
||||
@@ -1360,6 +1375,12 @@ impl DiskAPI for RemoteDisk {
|
||||
|
||||
Ok(disk_info)
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip(self))]
|
||||
fn start_scan(&self) -> ScanGuard {
|
||||
self.scanning.fetch_add(1, Ordering::Relaxed);
|
||||
ScanGuard(Arc::clone(&self.scanning))
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
||||
@@ -22,12 +22,12 @@ use crate::bucket::replication::check_replicate_delete;
|
||||
use crate::bucket::versioning::VersioningApi;
|
||||
use crate::bucket::versioning_sys::BucketVersioningSys;
|
||||
use crate::client::{object_api_utils::get_raw_etag, transition_api::ReaderImpl};
|
||||
use crate::disk::STORAGE_FORMAT_FILE;
|
||||
use crate::disk::error_reduce::{OBJECT_OP_IGNORED_ERRS, reduce_read_quorum_errs, reduce_write_quorum_errs};
|
||||
use crate::disk::{
|
||||
self, CHECK_PART_DISK_NOT_FOUND, CHECK_PART_FILE_CORRUPT, CHECK_PART_FILE_NOT_FOUND, CHECK_PART_SUCCESS, CHECK_PART_UNKNOWN,
|
||||
conv_part_err_to_int, has_part_err,
|
||||
};
|
||||
use crate::disk::{STORAGE_FORMAT_FILE, count_part_not_success};
|
||||
use crate::erasure_coding;
|
||||
use crate::erasure_coding::bitrot_verify;
|
||||
use crate::error::{Error, Result, is_err_version_not_found};
|
||||
@@ -73,6 +73,7 @@ use rustfs_filemeta::{
|
||||
FileInfo, FileMeta, FileMetaShallowVersion, MetaCacheEntries, MetaCacheEntry, MetadataResolutionParams, ObjectPartInfo,
|
||||
RawFileInfo, ReplicationStatusType, VersionPurgeStatusType, file_info_from_raw, merge_file_meta_versions,
|
||||
};
|
||||
use rustfs_lock::FastLockGuard;
|
||||
use rustfs_lock::fast_lock::types::LockResult;
|
||||
use rustfs_madmin::heal_commands::{HealDriveInfo, HealResultItem};
|
||||
use rustfs_rio::{EtagResolvable, HashReader, HashReaderMut, TryGetIndex as _, WarpReader};
|
||||
@@ -2147,6 +2148,18 @@ impl SetDisks {
|
||||
shuffled_disks
|
||||
}
|
||||
|
||||
fn shuffle_check_parts(parts_errs: &[usize], distribution: &[usize]) -> Vec<usize> {
|
||||
if distribution.is_empty() {
|
||||
return parts_errs.to_vec();
|
||||
}
|
||||
let mut shuffled_parts_errs = vec![0; parts_errs.len()];
|
||||
for (i, v) in parts_errs.iter().enumerate() {
|
||||
let idx = distribution[i];
|
||||
shuffled_parts_errs[idx - 1] = *v;
|
||||
}
|
||||
shuffled_parts_errs
|
||||
}
|
||||
|
||||
#[tracing::instrument(level = "debug", skip(self))]
|
||||
async fn get_object_fileinfo(
|
||||
&self,
|
||||
@@ -2602,18 +2615,25 @@ impl SetDisks {
|
||||
opts: &HealOpts,
|
||||
) -> disk::error::Result<(HealResultItem, Option<DiskError>)> {
|
||||
info!(?opts, "Starting heal_object");
|
||||
|
||||
let disks = self.get_disks_internal().await;
|
||||
|
||||
let mut result = HealResultItem {
|
||||
heal_item_type: HealItemType::Object.to_string(),
|
||||
bucket: bucket.to_string(),
|
||||
object: object.to_string(),
|
||||
version_id: version_id.to_string(),
|
||||
disk_count: self.disks.read().await.len(),
|
||||
disk_count: disks.len(),
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
let _write_lock_guard = if !opts.no_lock {
|
||||
info!("Acquiring write lock for object: {}, owner: {}", object, self.locker_owner);
|
||||
|
||||
// let fast_lock_guard = self.new_ns_lock(bucket, object).await?;
|
||||
|
||||
// Some(fast_lock_guard)
|
||||
|
||||
// Check if lock is already held
|
||||
let key = rustfs_lock::fast_lock::types::ObjectKey::new(bucket, object);
|
||||
let mut reuse_existing_lock = false;
|
||||
@@ -2675,39 +2695,16 @@ impl SetDisks {
|
||||
}
|
||||
};
|
||||
|
||||
let disks = { self.disks.read().await.clone() };
|
||||
let (mut parts_metadata, errs) = Self::read_all_fileinfo(&disks, "", bucket, object, version_id, true, true).await?;
|
||||
|
||||
let (mut parts_metadata, errs) = {
|
||||
let mut retry_count = 0;
|
||||
loop {
|
||||
let (parts, errs) = Self::read_all_fileinfo(&disks, "", bucket, object, version_id, true, true).await?;
|
||||
|
||||
// Check if we have enough valid metadata to proceed
|
||||
// If we have too many errors, and we haven't exhausted retries, try again
|
||||
let valid_count = errs.iter().filter(|e| e.is_none()).count();
|
||||
// Simple heuristic: if valid_count is less than expected quorum (e.g. half disks), retry
|
||||
// But we don't know the exact quorum yet. Let's just retry on high error rate if possible.
|
||||
// Actually, read_all_fileinfo shouldn't fail easily.
|
||||
// Let's just retry if we see ANY non-NotFound errors that might be transient (like timeouts)
|
||||
|
||||
let has_transient_error = errs
|
||||
.iter()
|
||||
.any(|e| matches!(e, Some(DiskError::SourceStalled) | Some(DiskError::Timeout)));
|
||||
|
||||
if !has_transient_error || retry_count >= 3 {
|
||||
break (parts, errs);
|
||||
}
|
||||
|
||||
info!(
|
||||
"read_all_fileinfo encountered transient errors, retrying (attempt {}/3). Errs: {:?}",
|
||||
retry_count + 1,
|
||||
errs
|
||||
);
|
||||
tokio::time::sleep(Duration::from_millis(50 * (retry_count as u64 + 1))).await;
|
||||
retry_count += 1;
|
||||
}
|
||||
};
|
||||
info!(parts_count = parts_metadata.len(), ?errs, "File info read complete");
|
||||
info!(
|
||||
parts_count = parts_metadata.len(),
|
||||
bucket = bucket,
|
||||
object = object,
|
||||
version_id = version_id,
|
||||
?errs,
|
||||
"File info read complete"
|
||||
);
|
||||
if DiskError::is_all_not_found(&errs) {
|
||||
warn!(
|
||||
"heal_object failed, all obj part not found, bucket: {}, obj: {}, version_id: {}",
|
||||
@@ -2726,25 +2723,31 @@ impl SetDisks {
|
||||
));
|
||||
}
|
||||
|
||||
info!(parts_count = parts_metadata.len(), "Initiating quorum check");
|
||||
info!(parts_count = parts_metadata.len(), "heal_object Initiating quorum check");
|
||||
match Self::object_quorum_from_meta(&parts_metadata, &errs, self.default_parity_count) {
|
||||
Ok((read_quorum, _)) => {
|
||||
result.parity_blocks = result.disk_count - read_quorum as usize;
|
||||
result.data_blocks = read_quorum as usize;
|
||||
|
||||
let ((online_disks, mod_time, etag), disk_len) = {
|
||||
let ((mut online_disks, quorum_mod_time, quorum_etag), disk_len) = {
|
||||
let disks = self.disks.read().await;
|
||||
let disk_len = disks.len();
|
||||
(Self::list_online_disks(&disks, &parts_metadata, &errs, read_quorum as usize), disk_len)
|
||||
};
|
||||
|
||||
match Self::pick_valid_fileinfo(&parts_metadata, mod_time, etag, read_quorum as usize) {
|
||||
info!(?parts_metadata, ?errs, ?read_quorum, ?disk_len, "heal_object List disks metadata");
|
||||
|
||||
info!(?online_disks, ?quorum_mod_time, ?quorum_etag, "heal_object List online disks");
|
||||
|
||||
let filter_by_etag = quorum_etag.is_some();
|
||||
match Self::pick_valid_fileinfo(&parts_metadata, quorum_mod_time, quorum_etag.clone(), read_quorum as usize) {
|
||||
Ok(latest_meta) => {
|
||||
let (available_disks, data_errs_by_disk, data_errs_by_part) = disks_with_all_parts(
|
||||
&online_disks,
|
||||
let (data_errs_by_disk, data_errs_by_part) = disks_with_all_parts(
|
||||
&mut online_disks,
|
||||
&mut parts_metadata,
|
||||
&errs,
|
||||
&latest_meta,
|
||||
filter_by_etag,
|
||||
bucket,
|
||||
object,
|
||||
opts.scan_mode,
|
||||
@@ -2752,9 +2755,9 @@ impl SetDisks {
|
||||
.await?;
|
||||
|
||||
info!(
|
||||
"disks_with_all_parts results: available_disks count={}, total_disks={}",
|
||||
available_disks.iter().filter(|d| d.is_some()).count(),
|
||||
available_disks.len()
|
||||
"disks_with_all_parts heal_object results: available_disks count={}, total_disks={}",
|
||||
online_disks.iter().filter(|d| d.is_some()).count(),
|
||||
online_disks.len()
|
||||
);
|
||||
|
||||
let erasure = if !latest_meta.deleted && !latest_meta.is_remote() {
|
||||
@@ -2773,12 +2776,16 @@ impl SetDisks {
|
||||
// Loop to find number of disks with valid data, per-drive
|
||||
// data state and a list of outdated disks on which data needs
|
||||
// to be healed.
|
||||
let mut outdate_disks = vec![None; disk_len];
|
||||
let mut out_dated_disks = vec![None; disk_len];
|
||||
let mut disks_to_heal_count = 0;
|
||||
let mut meta_to_heal_count = 0;
|
||||
|
||||
info!("Checking {} disks for healing needs (bucket={}, object={})", disk_len, bucket, object);
|
||||
for index in 0..available_disks.len() {
|
||||
let (yes, reason) = should_heal_object_on_disk(
|
||||
info!(
|
||||
"heal_object Checking {} disks for healing needs (bucket={}, object={})",
|
||||
disk_len, bucket, object
|
||||
);
|
||||
for index in 0..online_disks.len() {
|
||||
let (yes, is_meta, reason) = should_heal_object_on_disk(
|
||||
&errs[index],
|
||||
&data_errs_by_disk[&index],
|
||||
&parts_metadata[index],
|
||||
@@ -2786,14 +2793,17 @@ impl SetDisks {
|
||||
);
|
||||
|
||||
info!(
|
||||
"Disk {} heal check: should_heal={}, reason={:?}, err={:?}, endpoint={}",
|
||||
"heal_object Disk {} heal check: should_heal={}, reason={:?}, err={:?}, endpoint={}",
|
||||
index, yes, reason, errs[index], self.set_endpoints[index]
|
||||
);
|
||||
|
||||
if yes {
|
||||
outdate_disks[index] = disks[index].clone();
|
||||
out_dated_disks[index] = disks[index].clone();
|
||||
disks_to_heal_count += 1;
|
||||
info!("Disk {} marked for healing (endpoint={})", index, self.set_endpoints[index]);
|
||||
if is_meta {
|
||||
meta_to_heal_count += 1;
|
||||
}
|
||||
info!("heal_object Disk {} marked for healing (endpoint={})", index, self.set_endpoints[index]);
|
||||
}
|
||||
|
||||
let drive_state = match reason {
|
||||
@@ -2804,7 +2814,8 @@ impl SetDisks {
|
||||
| DiskError::VolumeNotFound
|
||||
| DiskError::PartMissingOrCorrupt
|
||||
| DiskError::OutdatedXLMeta => DriveState::Missing.to_string(),
|
||||
_ => DriveState::Corrupt.to_string(),
|
||||
DiskError::FileCorrupt => DriveState::Corrupt.to_string(),
|
||||
_ => DriveState::Unknown(err.to_string()).to_string(),
|
||||
},
|
||||
None => DriveState::Ok.to_string(),
|
||||
};
|
||||
@@ -2826,24 +2837,6 @@ impl SetDisks {
|
||||
disks_to_heal_count, disk_len, bucket, object
|
||||
);
|
||||
|
||||
if DiskError::is_all_not_found(&errs) {
|
||||
warn!(
|
||||
"heal_object failed, all obj part not found, bucket: {}, obj: {}, version_id: {}",
|
||||
bucket, object, version_id
|
||||
);
|
||||
let err = if !version_id.is_empty() {
|
||||
DiskError::FileVersionNotFound
|
||||
} else {
|
||||
DiskError::FileNotFound
|
||||
};
|
||||
|
||||
return Ok((
|
||||
self.default_heal_result(FileInfo::default(), &errs, bucket, object, version_id)
|
||||
.await,
|
||||
Some(err),
|
||||
));
|
||||
}
|
||||
|
||||
if disks_to_heal_count == 0 {
|
||||
info!("No disks to heal, returning early");
|
||||
return Ok((result, None));
|
||||
@@ -2859,7 +2852,21 @@ impl SetDisks {
|
||||
disks_to_heal_count, opts.dry_run
|
||||
);
|
||||
|
||||
if !latest_meta.deleted && disks_to_heal_count > latest_meta.erasure.parity_blocks {
|
||||
let mut cannot_heal = !latest_meta.deleted && disks_to_heal_count > latest_meta.erasure.parity_blocks;
|
||||
if cannot_heal && quorum_etag.is_some() {
|
||||
cannot_heal = false;
|
||||
}
|
||||
|
||||
if !latest_meta.deleted && !latest_meta.is_remote() {
|
||||
for (_, part_errs) in data_errs_by_part.iter() {
|
||||
if count_part_not_success(part_errs) > latest_meta.erasure.parity_blocks {
|
||||
cannot_heal = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if cannot_heal {
|
||||
let total_disks = parts_metadata.len();
|
||||
let healthy_count = total_disks.saturating_sub(disks_to_heal_count);
|
||||
let required_data = total_disks.saturating_sub(latest_meta.erasure.parity_blocks);
|
||||
@@ -2918,10 +2925,10 @@ impl SetDisks {
|
||||
};
|
||||
}
|
||||
|
||||
if !latest_meta.deleted && latest_meta.erasure.distribution.len() != available_disks.len() {
|
||||
if !latest_meta.deleted && latest_meta.erasure.distribution.len() != online_disks.len() {
|
||||
let err_str = format!(
|
||||
"unexpected file distribution ({:?}) from available disks ({:?}), looks like backend disks have been manually modified refusing to heal {}/{}({})",
|
||||
latest_meta.erasure.distribution, available_disks, bucket, object, version_id
|
||||
latest_meta.erasure.distribution, online_disks, bucket, object, version_id
|
||||
);
|
||||
warn!(err_str);
|
||||
let err = DiskError::other(err_str);
|
||||
@@ -2931,11 +2938,11 @@ impl SetDisks {
|
||||
));
|
||||
}
|
||||
|
||||
let latest_disks = Self::shuffle_disks(&available_disks, &latest_meta.erasure.distribution);
|
||||
if !latest_meta.deleted && latest_meta.erasure.distribution.len() != outdate_disks.len() {
|
||||
let latest_disks = Self::shuffle_disks(&online_disks, &latest_meta.erasure.distribution);
|
||||
if !latest_meta.deleted && latest_meta.erasure.distribution.len() != out_dated_disks.len() {
|
||||
let err_str = format!(
|
||||
"unexpected file distribution ({:?}) from outdated disks ({:?}), looks like backend disks have been manually modified refusing to heal {}/{}({})",
|
||||
latest_meta.erasure.distribution, outdate_disks, bucket, object, version_id
|
||||
latest_meta.erasure.distribution, out_dated_disks, bucket, object, version_id
|
||||
);
|
||||
warn!(err_str);
|
||||
let err = DiskError::other(err_str);
|
||||
@@ -2962,7 +2969,7 @@ impl SetDisks {
|
||||
));
|
||||
}
|
||||
|
||||
let out_dated_disks = Self::shuffle_disks(&outdate_disks, &latest_meta.erasure.distribution);
|
||||
out_dated_disks = Self::shuffle_disks(&out_dated_disks, &latest_meta.erasure.distribution);
|
||||
let mut parts_metadata = Self::shuffle_parts_metadata(&parts_metadata, &latest_meta.erasure.distribution);
|
||||
let mut copy_parts_metadata = vec![None; parts_metadata.len()];
|
||||
for (index, disk) in latest_disks.iter().enumerate() {
|
||||
@@ -2999,21 +3006,28 @@ impl SetDisks {
|
||||
latest_meta.is_remote()
|
||||
);
|
||||
if !latest_meta.deleted && !latest_meta.is_remote() {
|
||||
let erasure_info = latest_meta.erasure;
|
||||
for part in latest_meta.parts.iter() {
|
||||
let erasure_info = latest_meta.erasure.clone();
|
||||
for (part_index, part) in latest_meta.parts.iter().enumerate() {
|
||||
let till_offset = erasure.shard_file_offset(0, part.size, part.size);
|
||||
let checksum_algo = erasure_info.get_checksum_info(part.number).algorithm;
|
||||
let mut readers = Vec::with_capacity(latest_disks.len());
|
||||
let mut writers = Vec::with_capacity(out_dated_disks.len());
|
||||
// let mut errors = Vec::with_capacity(out_dated_disks.len());
|
||||
|
||||
let mut prefer = vec![false; latest_disks.len()];
|
||||
for (index, disk) in latest_disks.iter().enumerate() {
|
||||
let this_part_errs =
|
||||
Self::shuffle_check_parts(&data_errs_by_part[&index], &erasure_info.distribution);
|
||||
if this_part_errs[part_index] != CHECK_PART_SUCCESS {
|
||||
continue;
|
||||
}
|
||||
|
||||
if let (Some(disk), Some(metadata)) = (disk, ©_parts_metadata[index]) {
|
||||
match create_bitrot_reader(
|
||||
metadata.data.as_deref(),
|
||||
Some(disk),
|
||||
bucket,
|
||||
&format!("{}/{}/part.{}", object, src_data_dir, part.number),
|
||||
&path_join_buf(&[object, &src_data_dir, &format!("part.{}", part.number)]),
|
||||
0,
|
||||
till_offset,
|
||||
erasure.shard_size(),
|
||||
@@ -3056,14 +3070,18 @@ impl SetDisks {
|
||||
latest_disks.len(),
|
||||
out_dated_disks.len()
|
||||
);
|
||||
for (index, disk) in latest_disks.iter().enumerate() {
|
||||
if let Some(outdated_disk) = &out_dated_disks[index] {
|
||||
for (index, disk_op) in out_dated_disks.iter().enumerate() {
|
||||
if let Some(outdated_disk) = disk_op {
|
||||
info!(disk_index = index, "Creating writer for outdated disk");
|
||||
let writer = match create_bitrot_writer(
|
||||
is_inline_buffer,
|
||||
Some(outdated_disk),
|
||||
RUSTFS_META_TMP_BUCKET,
|
||||
&format!("{}/{}/part.{}", tmp_id, dst_data_dir, part.number),
|
||||
&path_join_buf(&[
|
||||
&tmp_id.to_string(),
|
||||
&dst_data_dir.to_string(),
|
||||
&format!("part.{}", part.number),
|
||||
]),
|
||||
erasure.shard_file_size(part.size as i64),
|
||||
erasure.shard_size(),
|
||||
HashAlgorithm::HighwayHash256,
|
||||
@@ -3086,56 +3104,6 @@ impl SetDisks {
|
||||
info!(disk_index = index, "Skipping writer (disk not outdated)");
|
||||
writers.push(None);
|
||||
}
|
||||
|
||||
// if let Some(disk) = disk {
|
||||
// // let filewriter = {
|
||||
// // if is_inline_buffer {
|
||||
// // Box::new(Cursor::new(Vec::new()))
|
||||
// // } else {
|
||||
// // let disk = disk.clone();
|
||||
// // let part_path = format!("{}/{}/part.{}", object, src_data_dir, part.number);
|
||||
// // disk.create_file("", RUSTFS_META_TMP_BUCKET, &part_path, 0).await?
|
||||
// // }
|
||||
// // };
|
||||
|
||||
// if is_inline_buffer {
|
||||
// let writer = BitrotWriter::new(
|
||||
// Writer::from_cursor(Cursor::new(Vec::new())),
|
||||
// erasure.shard_size(),
|
||||
// HashAlgorithm::HighwayHash256,
|
||||
// );
|
||||
// writers.push(Some(writer));
|
||||
// } else {
|
||||
// let f = disk
|
||||
// .create_file(
|
||||
// "",
|
||||
// RUSTFS_META_TMP_BUCKET,
|
||||
// &format!("{}/{}/part.{}", tmp_id, dst_data_dir, part.number),
|
||||
// 0,
|
||||
// )
|
||||
// .await?;
|
||||
// let writer = BitrotWriter::new(
|
||||
// Writer::from_tokio_writer(f),
|
||||
// erasure.shard_size(),
|
||||
// HashAlgorithm::HighwayHash256,
|
||||
// );
|
||||
// writers.push(Some(writer));
|
||||
// }
|
||||
|
||||
// // let writer = new_bitrot_filewriter(
|
||||
// // disk.clone(),
|
||||
// // RUSTFS_META_TMP_BUCKET,
|
||||
// // format!("{}/{}/part.{}", tmp_id, dst_data_dir, part.number).as_str(),
|
||||
// // is_inline_buffer,
|
||||
// // DEFAULT_BITROT_ALGO,
|
||||
// // erasure.shard_size(erasure.block_size),
|
||||
// // )
|
||||
// // .await?;
|
||||
|
||||
// // writers.push(Some(writer));
|
||||
// } else {
|
||||
// writers.push(None);
|
||||
// }
|
||||
}
|
||||
// Heal each part. erasure.Heal() will write the healed
|
||||
// part to .rustfs/tmp/uuid/ which needs to be renamed
|
||||
@@ -3143,13 +3111,13 @@ impl SetDisks {
|
||||
erasure.heal(&mut writers, readers, part.size, &prefer).await?;
|
||||
// close_bitrot_writers(&mut writers).await?;
|
||||
|
||||
for (index, disk) in out_dated_disks.iter().enumerate() {
|
||||
if disk.is_none() {
|
||||
for (index, disk_op) in out_dated_disks.iter_mut().enumerate() {
|
||||
if disk_op.is_none() {
|
||||
continue;
|
||||
}
|
||||
|
||||
if writers[index].is_none() {
|
||||
outdate_disks[index] = None;
|
||||
*disk_op = None;
|
||||
disks_to_heal_count -= 1;
|
||||
continue;
|
||||
}
|
||||
@@ -3218,45 +3186,49 @@ impl SetDisks {
|
||||
"Rename failed, attempting fallback"
|
||||
);
|
||||
|
||||
// Preserve temp files for safety
|
||||
info!(temp_uuid = %tmp_id, "Rename failed, preserving temporary files for safety");
|
||||
self.delete_all(RUSTFS_META_TMP_BUCKET, &tmp_id)
|
||||
.await
|
||||
.map_err(DiskError::other)?;
|
||||
|
||||
let healthy_index = latest_disks.iter().position(|d| d.is_some()).unwrap_or(0);
|
||||
// // Preserve temp files for safety
|
||||
// info!(temp_uuid = %tmp_id, "Rename failed, preserving temporary files for safety");
|
||||
|
||||
if let Some(healthy_disk) = &latest_disks[healthy_index] {
|
||||
let xlmeta_path = format!("{object}/xl.meta");
|
||||
// let healthy_index = latest_disks.iter().position(|d| d.is_some()).unwrap_or(0);
|
||||
|
||||
match healthy_disk.read_all(bucket, &xlmeta_path).await {
|
||||
Ok(xlmeta_bytes) => {
|
||||
if let Err(e) = disk.write_all(bucket, &xlmeta_path, xlmeta_bytes).await {
|
||||
info!("fallback xl.meta overwrite failed: {}", e.to_string());
|
||||
// if let Some(healthy_disk) = &latest_disks[healthy_index] {
|
||||
// let xlmeta_path = format!("{object}/xl.meta");
|
||||
|
||||
return Ok((
|
||||
result,
|
||||
Some(DiskError::other(format!("fallback xl.meta overwrite failed: {e}"))),
|
||||
));
|
||||
} else {
|
||||
info!("fallback xl.meta overwrite succeeded for disk {}", disk.to_string());
|
||||
}
|
||||
}
|
||||
// match healthy_disk.read_all(bucket, &xlmeta_path).await {
|
||||
// Ok(xlmeta_bytes) => {
|
||||
// if let Err(e) = disk.write_all(bucket, &xlmeta_path, xlmeta_bytes).await {
|
||||
// info!("fallback xl.meta overwrite failed: {}", e.to_string());
|
||||
|
||||
Err(e) => {
|
||||
info!("read healthy xl.meta failed: {}", e.to_string());
|
||||
// return Ok((
|
||||
// result,
|
||||
// Some(DiskError::other(format!("fallback xl.meta overwrite failed: {e}"))),
|
||||
// ));
|
||||
// } else {
|
||||
// info!("fallback xl.meta overwrite succeeded for disk {}", disk.to_string());
|
||||
// }
|
||||
// }
|
||||
|
||||
return Ok((
|
||||
result,
|
||||
Some(DiskError::other(format!("read healthy xl.meta failed: {e}"))),
|
||||
));
|
||||
}
|
||||
}
|
||||
} else {
|
||||
info!("no healthy disk found for xl.meta fallback overwrite");
|
||||
// Err(e) => {
|
||||
// info!("read healthy xl.meta failed: {}", e.to_string());
|
||||
|
||||
return Ok((
|
||||
result,
|
||||
Some(DiskError::other("no healthy disk found for xl.meta fallback overwrite")),
|
||||
));
|
||||
}
|
||||
// return Ok((
|
||||
// result,
|
||||
// Some(DiskError::other(format!("read healthy xl.meta failed: {e}"))),
|
||||
// ));
|
||||
// }
|
||||
// }
|
||||
// } else {
|
||||
// info!("no healthy disk found for xl.meta fallback overwrite");
|
||||
|
||||
// return Ok((
|
||||
// result,
|
||||
// Some(DiskError::other("no healthy disk found for xl.meta fallback overwrite")),
|
||||
// ));
|
||||
// }
|
||||
} else {
|
||||
info!(
|
||||
"Successfully renamed healed data for disk {} (endpoint={}), removing temp files from volume={}, path={}",
|
||||
@@ -3534,73 +3506,99 @@ impl SetDisks {
|
||||
data_errs_by_part: &HashMap<usize, Vec<usize>>,
|
||||
opts: ObjectOptions,
|
||||
) -> disk::error::Result<FileInfo> {
|
||||
if let Ok(m) = is_object_dang_ling(meta_arr, errs, data_errs_by_part) {
|
||||
let mut tags = HashMap::new();
|
||||
tags.insert("set", self.set_index.to_string());
|
||||
tags.insert("pool", self.pool_index.to_string());
|
||||
tags.insert("merrs", join_errs(errs));
|
||||
tags.insert("derrs", format!("{data_errs_by_part:?}"));
|
||||
if m.is_valid() {
|
||||
tags.insert("sz", m.size.to_string());
|
||||
tags.insert(
|
||||
"mt",
|
||||
m.mod_time
|
||||
.as_ref()
|
||||
.map_or(String::new(), |mod_time| mod_time.unix_timestamp().to_string()),
|
||||
);
|
||||
tags.insert("d:p", format!("{}:{}", m.erasure.data_blocks, m.erasure.parity_blocks));
|
||||
} else {
|
||||
tags.insert("invalid", "1".to_string());
|
||||
tags.insert(
|
||||
"d:p",
|
||||
format!("{}:{}", self.set_drive_count - self.default_parity_count, self.default_parity_count),
|
||||
);
|
||||
let (m, can_heal) = is_object_dang_ling(meta_arr, errs, data_errs_by_part);
|
||||
|
||||
if !can_heal {
|
||||
return Err(DiskError::ErasureReadQuorum);
|
||||
}
|
||||
|
||||
let mut tags: HashMap<String, String> = HashMap::new();
|
||||
tags.insert("set".to_string(), self.set_index.to_string());
|
||||
tags.insert("pool".to_string(), self.pool_index.to_string());
|
||||
tags.insert("merrs".to_string(), join_errs(errs));
|
||||
tags.insert("derrs".to_string(), format!("{data_errs_by_part:?}"));
|
||||
if m.is_valid() {
|
||||
tags.insert("sz".to_string(), m.size.to_string());
|
||||
tags.insert(
|
||||
"mt".to_string(),
|
||||
m.mod_time
|
||||
.as_ref()
|
||||
.map_or(String::new(), |mod_time| mod_time.unix_timestamp().to_string()),
|
||||
);
|
||||
tags.insert("d:p".to_string(), format!("{}:{}", m.erasure.data_blocks, m.erasure.parity_blocks));
|
||||
} else {
|
||||
tags.insert("invalid".to_string(), "1".to_string());
|
||||
tags.insert(
|
||||
"d:p".to_string(),
|
||||
format!("{}:{}", self.set_drive_count - self.default_parity_count, self.default_parity_count),
|
||||
);
|
||||
}
|
||||
let mut offline = 0;
|
||||
for (i, err) in errs.iter().enumerate() {
|
||||
let mut found = false;
|
||||
if let Some(err) = err
|
||||
&& err == &DiskError::DiskNotFound
|
||||
{
|
||||
found = true;
|
||||
}
|
||||
let mut offline = 0;
|
||||
for (i, err) in errs.iter().enumerate() {
|
||||
let mut found = false;
|
||||
if let Some(err) = err
|
||||
&& err == &DiskError::DiskNotFound
|
||||
for p in data_errs_by_part {
|
||||
if let Some(v) = p.1.get(i)
|
||||
&& *v == CHECK_PART_DISK_NOT_FOUND
|
||||
{
|
||||
found = true;
|
||||
}
|
||||
for p in data_errs_by_part {
|
||||
if let Some(v) = p.1.get(i)
|
||||
&& *v == CHECK_PART_DISK_NOT_FOUND
|
||||
{
|
||||
found = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if found {
|
||||
offline += 1;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if offline > 0 {
|
||||
tags.insert("offline", offline.to_string());
|
||||
if found {
|
||||
offline += 1;
|
||||
}
|
||||
|
||||
// TODO: audit
|
||||
let mut fi = FileInfo::default();
|
||||
if let Some(ref version_id) = opts.version_id {
|
||||
fi.version_id = Uuid::parse_str(version_id).ok();
|
||||
}
|
||||
// TODO: tier
|
||||
for disk in self.disks.read().await.iter().flatten() {
|
||||
let _ = disk
|
||||
.delete_version(bucket, object, fi.clone(), false, DeleteOptions::default())
|
||||
.await;
|
||||
}
|
||||
Ok(m)
|
||||
} else {
|
||||
error!(
|
||||
"Object {}/{} is corrupted but not dangling (some parts exist). Preserving data for potential manual recovery. Errors: {:?}",
|
||||
bucket, object, errs
|
||||
);
|
||||
Err(DiskError::ErasureReadQuorum)
|
||||
}
|
||||
|
||||
if offline > 0 {
|
||||
tags.insert("offline".to_string(), offline.to_string());
|
||||
}
|
||||
|
||||
let mut fi = FileInfo::default();
|
||||
if let Some(ref version_id) = opts.version_id {
|
||||
fi.version_id = Uuid::parse_str(version_id).ok();
|
||||
}
|
||||
|
||||
fi.set_tier_free_version_id(&Uuid::new_v4().to_string());
|
||||
|
||||
let disks = self.get_disks_internal().await;
|
||||
|
||||
let mut futures = Vec::with_capacity(disks.len());
|
||||
for disk_op in disks.iter() {
|
||||
let bucket = bucket.to_string();
|
||||
let object = object.to_string();
|
||||
let fi = fi.clone();
|
||||
futures.push(async move {
|
||||
if let Some(disk) = disk_op {
|
||||
disk.delete_version(&bucket, &object, fi, false, DeleteOptions::default())
|
||||
.await
|
||||
} else {
|
||||
Err(DiskError::DiskNotFound)
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
let results = join_all(futures).await;
|
||||
for (index, result) in results.into_iter().enumerate() {
|
||||
let key = format!("ddisk-{index}");
|
||||
match result {
|
||||
Ok(_) => {
|
||||
tags.insert(key, "<nil>".to_string());
|
||||
}
|
||||
Err(e) => {
|
||||
tags.insert(key, e.to_string());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: audit
|
||||
|
||||
Ok(m)
|
||||
}
|
||||
|
||||
async fn delete_prefix(&self, bucket: &str, prefix: &str) -> disk::error::Result<()> {
|
||||
@@ -4067,6 +4065,14 @@ impl ObjectIO for SetDisks {
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl StorageAPI for SetDisks {
|
||||
#[tracing::instrument(skip(self))]
|
||||
async fn new_ns_lock(&self, bucket: &str, object: &str) -> Result<FastLockGuard> {
|
||||
self.fast_lock_manager
|
||||
.acquire_write_lock(bucket, object, self.locker_owner.as_str())
|
||||
.await
|
||||
.map_err(|e| Error::other(self.format_lock_error(bucket, object, "write", &e)))
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip(self))]
|
||||
async fn backend_info(&self) -> rustfs_madmin::BackendInfo {
|
||||
unimplemented!()
|
||||
@@ -6314,8 +6320,7 @@ fn is_object_dang_ling(
|
||||
meta_arr: &[FileInfo],
|
||||
errs: &[Option<DiskError>],
|
||||
data_errs_by_part: &HashMap<usize, Vec<usize>>,
|
||||
) -> disk::error::Result<FileInfo> {
|
||||
let mut valid_meta = FileInfo::default();
|
||||
) -> (FileInfo, bool) {
|
||||
let (not_found_meta_errs, non_actionable_meta_errs) = dang_ling_meta_errs_count(errs);
|
||||
|
||||
let (mut not_found_parts_errs, mut non_actionable_parts_errs) = (0, 0);
|
||||
@@ -6327,42 +6332,42 @@ fn is_object_dang_ling(
|
||||
}
|
||||
});
|
||||
|
||||
meta_arr.iter().for_each(|fi| {
|
||||
let mut valid_meta = FileInfo::default();
|
||||
|
||||
for fi in meta_arr.iter() {
|
||||
if fi.is_valid() {
|
||||
valid_meta = fi.clone();
|
||||
break;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
if !valid_meta.is_valid() {
|
||||
let data_blocks = meta_arr.len().div_ceil(2);
|
||||
if not_found_parts_errs > data_blocks {
|
||||
return Ok(valid_meta);
|
||||
return (valid_meta, true);
|
||||
}
|
||||
|
||||
return Err(DiskError::other("not ok"));
|
||||
return (valid_meta, false);
|
||||
}
|
||||
|
||||
if non_actionable_meta_errs > 0 || non_actionable_parts_errs > 0 {
|
||||
return Err(DiskError::other("not ok"));
|
||||
return (valid_meta, false);
|
||||
}
|
||||
|
||||
if valid_meta.deleted {
|
||||
let data_blocks = errs.len().div_ceil(2);
|
||||
if not_found_meta_errs > data_blocks {
|
||||
return Ok(valid_meta);
|
||||
}
|
||||
return Err(DiskError::other("not ok"));
|
||||
return (valid_meta, not_found_meta_errs > data_blocks);
|
||||
}
|
||||
|
||||
if not_found_meta_errs > 0 && not_found_meta_errs > valid_meta.erasure.parity_blocks {
|
||||
return Ok(valid_meta);
|
||||
return (valid_meta, true);
|
||||
}
|
||||
|
||||
if !valid_meta.is_remote() && not_found_parts_errs > 0 && not_found_parts_errs > valid_meta.erasure.parity_blocks {
|
||||
return Ok(valid_meta);
|
||||
return (valid_meta, true);
|
||||
}
|
||||
|
||||
Err(DiskError::other("not ok"))
|
||||
(valid_meta, false)
|
||||
}
|
||||
|
||||
fn dang_ling_meta_errs_count(cerrs: &[Option<DiskError>]) -> (usize, usize) {
|
||||
@@ -6436,15 +6441,17 @@ fn join_errs(errs: &[Option<DiskError>]) -> String {
|
||||
/// It sets partsMetadata and onlineDisks when xl.meta is nonexistent/corrupted or outdated.
/// It also checks the status of each part (corrupted, missing, ok) in each drive.
|
||||
/// Returns (availableDisks, dataErrsByDisk, dataErrsByPart).
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
async fn disks_with_all_parts(
|
||||
online_disks: &[Option<DiskStore>],
|
||||
online_disks: &mut [Option<DiskStore>],
|
||||
parts_metadata: &mut [FileInfo],
|
||||
errs: &[Option<DiskError>],
|
||||
latest_meta: &FileInfo,
|
||||
filter_by_etag: bool,
|
||||
bucket: &str,
|
||||
object: &str,
|
||||
scan_mode: HealScanMode,
|
||||
) -> disk::error::Result<(Vec<Option<DiskStore>>, HashMap<usize, Vec<usize>>, HashMap<usize, Vec<usize>>)> {
|
||||
) -> disk::error::Result<(HashMap<usize, Vec<usize>>, HashMap<usize, Vec<usize>>)> {
|
||||
let object_name = latest_meta.name.clone();
|
||||
debug!(
|
||||
"disks_with_all_partsv2: starting with object_name={}, online_disks.len()={}, scan_mode={:?}",
|
||||
@@ -6453,16 +6460,14 @@ async fn disks_with_all_parts(
|
||||
scan_mode
|
||||
);
|
||||
|
||||
let mut available_disks = vec![None; online_disks.len()];
|
||||
|
||||
// Initialize dataErrsByDisk and dataErrsByPart with 0 (CHECK_PART_UNKNOWN) to match Go
|
||||
let mut data_errs_by_disk: HashMap<usize, Vec<usize>> = HashMap::new();
|
||||
for i in 0..online_disks.len() {
|
||||
data_errs_by_disk.insert(i, vec![CHECK_PART_SUCCESS; latest_meta.parts.len()]);
|
||||
data_errs_by_disk.insert(i, vec![CHECK_PART_UNKNOWN; latest_meta.parts.len()]);
|
||||
}
|
||||
let mut data_errs_by_part: HashMap<usize, Vec<usize>> = HashMap::new();
|
||||
for i in 0..latest_meta.parts.len() {
|
||||
data_errs_by_part.insert(i, vec![CHECK_PART_SUCCESS; online_disks.len()]);
|
||||
data_errs_by_part.insert(i, vec![CHECK_PART_UNKNOWN; online_disks.len()]);
|
||||
}
|
||||
|
||||
// Check for inconsistent erasure distribution
|
||||
@@ -6498,49 +6503,63 @@ async fn disks_with_all_parts(
|
||||
meta_errs.push(None);
|
||||
}
|
||||
|
||||
let online_disks_len = online_disks.len();
|
||||
|
||||
// Process meta errors
|
||||
for (index, disk) in online_disks.iter().enumerate() {
|
||||
for (index, disk_op) in online_disks.iter_mut().enumerate() {
|
||||
if let Some(err) = &errs[index] {
|
||||
meta_errs[index] = Some(err.clone());
|
||||
continue;
|
||||
}
|
||||
|
||||
let disk = if let Some(disk) = disk {
|
||||
disk
|
||||
} else {
|
||||
meta_errs[index] = Some(DiskError::DiskNotFound);
|
||||
continue;
|
||||
};
|
||||
|
||||
if !disk.is_online().await {
|
||||
if disk_op.is_none() {
|
||||
meta_errs[index] = Some(DiskError::DiskNotFound);
|
||||
continue;
|
||||
}
|
||||
|
||||
let meta = &parts_metadata[index];
|
||||
// Check if metadata is corrupted (equivalent to filterByETag=false in Go)
|
||||
let corrupted = !meta.mod_time.eq(&latest_meta.mod_time) || !meta.data_dir.eq(&latest_meta.data_dir);
|
||||
|
||||
let corrupted = if filter_by_etag {
|
||||
latest_meta.get_etag() != meta.get_etag()
|
||||
} else {
|
||||
!meta.mod_time.eq(&latest_meta.mod_time) || !meta.data_dir.eq(&latest_meta.data_dir)
|
||||
};
|
||||
|
||||
if corrupted {
|
||||
warn!(
|
||||
"disks_with_all_partsv2: metadata is corrupted, object_name={}, index: {index}",
|
||||
object_name
|
||||
);
|
||||
meta_errs[index] = Some(DiskError::FileCorrupt);
|
||||
parts_metadata[index] = FileInfo::default();
|
||||
*disk_op = None;
|
||||
|
||||
continue;
|
||||
}
|
||||
|
||||
if erasure_distribution_reliable {
|
||||
if !meta.is_valid() {
|
||||
warn!(
|
||||
"disks_with_all_partsv2: metadata is not valid, object_name={}, index: {index}",
|
||||
object_name
|
||||
);
|
||||
parts_metadata[index] = FileInfo::default();
|
||||
meta_errs[index] = Some(DiskError::FileCorrupt);
|
||||
*disk_op = None;
|
||||
continue;
|
||||
}
|
||||
|
||||
#[allow(clippy::collapsible_if)]
|
||||
if !meta.deleted && meta.erasure.distribution.len() != online_disks.len() {
|
||||
if !meta.deleted && meta.erasure.distribution.len() != online_disks_len {
|
||||
// Erasure distribution is not the same as onlineDisks
|
||||
// attempt a fix if possible, assuming other entries
|
||||
// might have the right erasure distribution.
|
||||
warn!(
|
||||
"disks_with_all_partsv2: erasure distribution is not the same as onlineDisks, object_name={}, index: {index}",
|
||||
object_name
|
||||
);
|
||||
parts_metadata[index] = FileInfo::default();
|
||||
meta_errs[index] = Some(DiskError::FileCorrupt);
|
||||
*disk_op = None;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
@@ -6631,6 +6650,7 @@ async fn disks_with_all_parts(
|
||||
} else {
|
||||
match disk.check_parts(bucket, object, meta).await {
|
||||
Ok(v) => {
|
||||
info!("check_parts: verify_resp: {:?}", v);
|
||||
verify_resp = v;
|
||||
}
|
||||
Err(err) => {
|
||||
@@ -6676,48 +6696,20 @@ async fn disks_with_all_parts(
|
||||
|
||||
// Build dataErrsByDisk from dataErrsByPart
|
||||
for (part, disks) in data_errs_by_part.iter() {
|
||||
for (disk_idx, disk_err) in disks.iter().enumerate() {
|
||||
if let Some(vec) = data_errs_by_disk.get_mut(&disk_idx)
|
||||
&& *part < vec.len()
|
||||
for disk_idx in disks.iter() {
|
||||
if let Some(parts) = data_errs_by_disk.get_mut(disk_idx)
|
||||
&& *part < parts.len()
|
||||
{
|
||||
vec[*part] = *disk_err;
|
||||
parts[*part] = disks[*disk_idx];
|
||||
info!(
|
||||
"data_errs_by_disk: update data_errs_by_disk: object_name={}, part: {part}, disk_idx: {disk_idx}, disk_err: {disk_err}",
|
||||
object_name,
|
||||
"data_errs_by_disk: update data_errs_by_disk: object_name={}, part: {part}, disk_idx: {disk_idx}, disk_err: {}",
|
||||
object_name, parts[*part]
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Calculate available_disks based on meta_errs and data_errs_by_disk
|
||||
for (i, disk) in online_disks.iter().enumerate() {
|
||||
if let Some(disk_errs) = data_errs_by_disk.get(&i) {
|
||||
if meta_errs[i].is_none() && disk.is_some() && !has_part_err(disk_errs) {
|
||||
available_disks[i] = Some(disk.clone().unwrap());
|
||||
} else {
|
||||
warn!(
|
||||
"disks_with_all_partsv2: disk is not available, object_name={}, index: {}, meta_errs={:?}, disk_errs={:?}, disk_is_some={:?}",
|
||||
object_name,
|
||||
i,
|
||||
meta_errs[i],
|
||||
disk_errs,
|
||||
disk.is_some(),
|
||||
);
|
||||
parts_metadata[i] = FileInfo::default();
|
||||
}
|
||||
} else {
|
||||
warn!(
|
||||
"disks_with_all_partsv2: data_errs_by_disk missing entry for object_name={},index {}, meta_errs={:?}, disk_is_some={:?}",
|
||||
object_name,
|
||||
i,
|
||||
meta_errs[i],
|
||||
disk.is_some(),
|
||||
);
|
||||
parts_metadata[i] = FileInfo::default();
|
||||
}
|
||||
}
|
||||
|
||||
Ok((available_disks, data_errs_by_disk, data_errs_by_part))
|
||||
Ok((data_errs_by_disk, data_errs_by_part))
|
||||
}
|
||||
|
||||
pub fn should_heal_object_on_disk(
|
||||
@@ -6725,30 +6717,30 @@ pub fn should_heal_object_on_disk(
|
||||
parts_errs: &[usize],
|
||||
meta: &FileInfo,
|
||||
latest_meta: &FileInfo,
|
||||
) -> (bool, Option<DiskError>) {
|
||||
) -> (bool, bool, Option<DiskError>) {
|
||||
if let Some(err) = err
|
||||
&& (err == &DiskError::FileNotFound || err == &DiskError::FileVersionNotFound || err == &DiskError::FileCorrupt)
|
||||
{
|
||||
return (true, Some(err.clone()));
|
||||
return (true, true, Some(err.clone()));
|
||||
}
|
||||
|
||||
if latest_meta.volume != meta.volume
|
||||
|| latest_meta.name != meta.name
|
||||
|| latest_meta.version_id != meta.version_id
|
||||
|| latest_meta.deleted != meta.deleted
|
||||
{
|
||||
info!("latest_meta not Eq meta, latest_meta: {:?}, meta: {:?}", latest_meta, meta);
|
||||
return (true, Some(DiskError::OutdatedXLMeta));
|
||||
if err.is_some() {
|
||||
return (false, false, err.clone());
|
||||
}
|
||||
|
||||
if !meta.equals(latest_meta) {
|
||||
return (true, true, Some(DiskError::OutdatedXLMeta));
|
||||
}
|
||||
|
||||
if !meta.deleted && !meta.is_remote() {
|
||||
let err_vec = [CHECK_PART_FILE_NOT_FOUND, CHECK_PART_FILE_CORRUPT];
|
||||
for part_err in parts_errs.iter() {
|
||||
if err_vec.contains(part_err) {
|
||||
return (true, Some(DiskError::PartMissingOrCorrupt));
|
||||
return (true, false, Some(DiskError::PartMissingOrCorrupt));
|
||||
}
|
||||
}
|
||||
}
|
||||
(false, err.clone())
|
||||
(false, false, None)
|
||||
}
|
||||
|
||||
async fn get_disks_info(disks: &[Option<DiskStore>], eps: &[Endpoint]) -> Vec<rustfs_madmin::Disk> {
|
||||
@@ -7285,15 +7277,15 @@ mod tests {
|
||||
|
||||
// Test with file not found error
|
||||
let err = Some(DiskError::FileNotFound);
|
||||
let (should_heal, _) = should_heal_object_on_disk(&err, &[], &meta, &latest_meta);
|
||||
let (should_heal, _, _) = should_heal_object_on_disk(&err, &[], &meta, &latest_meta);
|
||||
assert!(should_heal);
|
||||
|
||||
// Test with no error and no part errors
|
||||
let (should_heal, _) = should_heal_object_on_disk(&None, &[CHECK_PART_SUCCESS], &meta, &latest_meta);
|
||||
let (should_heal, _, _) = should_heal_object_on_disk(&None, &[CHECK_PART_SUCCESS], &meta, &latest_meta);
|
||||
assert!(!should_heal);
|
||||
|
||||
// Test with part corruption
|
||||
let (should_heal, _) = should_heal_object_on_disk(&None, &[CHECK_PART_FILE_CORRUPT], &meta, &latest_meta);
|
||||
let (should_heal, _, _) = should_heal_object_on_disk(&None, &[CHECK_PART_FILE_CORRUPT], &meta, &latest_meta);
|
||||
assert!(should_heal);
|
||||
}
|
||||
|
||||
|
||||
@@ -42,6 +42,7 @@ use rustfs_common::{
|
||||
heal_channel::{DriveState, HealItemType},
|
||||
};
|
||||
use rustfs_filemeta::FileInfo;
|
||||
use rustfs_lock::FastLockGuard;
|
||||
use rustfs_madmin::heal_commands::{HealDriveInfo, HealResultItem};
|
||||
use rustfs_utils::{crc_hash, path::path_join_buf, sip_hash};
|
||||
use std::{collections::HashMap, sync::Arc};
|
||||
@@ -363,6 +364,10 @@ impl ObjectIO for Sets {
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl StorageAPI for Sets {
|
||||
#[tracing::instrument(skip(self))]
|
||||
async fn new_ns_lock(&self, bucket: &str, object: &str) -> Result<FastLockGuard> {
|
||||
self.disk_set[0].new_ns_lock(bucket, object).await
|
||||
}
|
||||
#[tracing::instrument(skip(self))]
|
||||
async fn backend_info(&self) -> rustfs_madmin::BackendInfo {
|
||||
unimplemented!()
|
||||
|
||||
@@ -69,6 +69,7 @@ use rand::Rng as _;
|
||||
use rustfs_common::heal_channel::{HealItemType, HealOpts};
|
||||
use rustfs_common::{GLOBAL_LOCAL_NODE_NAME, GLOBAL_RUSTFS_HOST, GLOBAL_RUSTFS_PORT};
|
||||
use rustfs_filemeta::FileInfo;
|
||||
use rustfs_lock::FastLockGuard;
|
||||
use rustfs_madmin::heal_commands::HealResultItem;
|
||||
use rustfs_utils::path::{decode_dir_object, encode_dir_object, path_join_buf};
|
||||
use s3s::dto::{BucketVersioningStatus, ObjectLockConfiguration, ObjectLockEnabled, VersioningConfiguration};
|
||||
@@ -1203,6 +1204,10 @@ lazy_static! {
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl StorageAPI for ECStore {
|
||||
#[instrument(skip(self))]
|
||||
async fn new_ns_lock(&self, bucket: &str, object: &str) -> Result<FastLockGuard> {
|
||||
self.pools[0].new_ns_lock(bucket, object).await
|
||||
}
|
||||
#[instrument(skip(self))]
|
||||
async fn backend_info(&self) -> rustfs_madmin::BackendInfo {
|
||||
let (standard_sc_parity, rr_sc_parity) = {
|
||||
|
||||
@@ -31,6 +31,7 @@ use rustfs_filemeta::{
|
||||
ReplicationStatusType, RestoreStatusOps as _, VersionPurgeStatusType, parse_restore_obj_status, replication_statuses_map,
|
||||
version_purge_statuses_map,
|
||||
};
|
||||
use rustfs_lock::FastLockGuard;
|
||||
use rustfs_madmin::heal_commands::HealResultItem;
|
||||
use rustfs_rio::Checksum;
|
||||
use rustfs_rio::{DecompressReader, HashReader, LimitReader, WarpReader};
|
||||
@@ -1349,6 +1350,7 @@ pub trait ObjectIO: Send + Sync + Debug + 'static {
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub trait StorageAPI: ObjectIO + Debug {
|
||||
// NewNSLock TODO:
|
||||
async fn new_ns_lock(&self, bucket: &str, object: &str) -> Result<FastLockGuard>;
|
||||
// Shutdown TODO:
|
||||
// NSScanner TODO:
|
||||
|
||||
|
||||
@@ -1005,6 +1005,41 @@ impl FileMeta {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get_file_info_versions(&self, volume: &str, path: &str, include_free_versions: bool) -> Result<FileInfoVersions> {
|
||||
let mut versions = self.into_file_info_versions(volume, path, true)?;
|
||||
|
||||
let mut n = 0;
|
||||
|
||||
let mut versions_vec = Vec::new();
|
||||
|
||||
for fi in versions.versions.iter() {
|
||||
if fi.tier_free_version() {
|
||||
if !include_free_versions {
|
||||
versions.free_versions.push(fi.clone());
|
||||
}
|
||||
} else {
|
||||
if !include_free_versions {
|
||||
versions_vec.push(fi.clone());
|
||||
}
|
||||
n += 1;
|
||||
}
|
||||
}
|
||||
|
||||
if !include_free_versions {
|
||||
versions.versions = versions_vec;
|
||||
}
|
||||
|
||||
for fi in versions.free_versions.iter_mut() {
|
||||
fi.num_versions = n;
|
||||
}
|
||||
|
||||
Ok(versions)
|
||||
}
|
||||
|
||||
pub fn get_all_file_info_versions(&self, volume: &str, path: &str, all_parts: bool) -> Result<FileInfoVersions> {
|
||||
self.into_file_info_versions(volume, path, all_parts)
|
||||
}
|
||||
|
||||
pub fn into_file_info_versions(&self, volume: &str, path: &str, all_parts: bool) -> Result<FileInfoVersions> {
|
||||
let mut versions = Vec::new();
|
||||
for version in self.versions.iter() {
|
||||
|
||||
@@ -708,7 +708,7 @@ pub fn parse_replicate_decision(_bucket: &str, s: &str) -> std::io::Result<Repli
|
||||
// }
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
|
||||
pub struct ReplicateObjectInfo {
|
||||
pub name: String,
|
||||
pub size: i64,
|
||||
|
||||
@@ -438,6 +438,24 @@ pub struct DeletePathsResponse {
|
||||
pub error: ::core::option::Option<Error>,
|
||||
}
|
||||
#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
|
||||
pub struct ReadMetadataRequest {
|
||||
#[prost(string, tag = "1")]
|
||||
pub disk: ::prost::alloc::string::String,
|
||||
#[prost(string, tag = "2")]
|
||||
pub volume: ::prost::alloc::string::String,
|
||||
#[prost(string, tag = "3")]
|
||||
pub path: ::prost::alloc::string::String,
|
||||
}
|
||||
#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
|
||||
pub struct ReadMetadataResponse {
|
||||
#[prost(bool, tag = "1")]
|
||||
pub success: bool,
|
||||
#[prost(message, optional, tag = "2")]
|
||||
pub error: ::core::option::Option<Error>,
|
||||
#[prost(bytes = "bytes", tag = "3")]
|
||||
pub data: ::prost::bytes::Bytes,
|
||||
}
|
||||
#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
|
||||
pub struct UpdateMetadataRequest {
|
||||
#[prost(string, tag = "1")]
|
||||
pub disk: ::prost::alloc::string::String,
|
||||
@@ -1524,6 +1542,21 @@ pub mod node_service_client {
|
||||
.insert(GrpcMethod::new("node_service.NodeService", "UpdateMetadata"));
|
||||
self.inner.unary(req, path, codec).await
|
||||
}
|
||||
pub async fn read_metadata(
|
||||
&mut self,
|
||||
request: impl tonic::IntoRequest<super::ReadMetadataRequest>,
|
||||
) -> std::result::Result<tonic::Response<super::ReadMetadataResponse>, tonic::Status> {
|
||||
self.inner
|
||||
.ready()
|
||||
.await
|
||||
.map_err(|e| tonic::Status::unknown(format!("Service was not ready: {}", e.into())))?;
|
||||
let codec = tonic_prost::ProstCodec::default();
|
||||
let path = http::uri::PathAndQuery::from_static("/node_service.NodeService/ReadMetadata");
|
||||
let mut req = request.into_request();
|
||||
req.extensions_mut()
|
||||
.insert(GrpcMethod::new("node_service.NodeService", "ReadMetadata"));
|
||||
self.inner.unary(req, path, codec).await
|
||||
}
|
||||
pub async fn write_metadata(
|
||||
&mut self,
|
||||
request: impl tonic::IntoRequest<super::WriteMetadataRequest>,
|
||||
@@ -2403,6 +2436,10 @@ pub mod node_service_server {
|
||||
&self,
|
||||
request: tonic::Request<super::UpdateMetadataRequest>,
|
||||
) -> std::result::Result<tonic::Response<super::UpdateMetadataResponse>, tonic::Status>;
|
||||
async fn read_metadata(
|
||||
&self,
|
||||
request: tonic::Request<super::ReadMetadataRequest>,
|
||||
) -> std::result::Result<tonic::Response<super::ReadMetadataResponse>, tonic::Status>;
|
||||
async fn write_metadata(
|
||||
&self,
|
||||
request: tonic::Request<super::WriteMetadataRequest>,
|
||||
@@ -3407,6 +3444,34 @@ pub mod node_service_server {
|
||||
};
|
||||
Box::pin(fut)
|
||||
}
|
||||
"/node_service.NodeService/ReadMetadata" => {
|
||||
#[allow(non_camel_case_types)]
|
||||
struct ReadMetadataSvc<T: NodeService>(pub Arc<T>);
|
||||
impl<T: NodeService> tonic::server::UnaryService<super::ReadMetadataRequest> for ReadMetadataSvc<T> {
|
||||
type Response = super::ReadMetadataResponse;
|
||||
type Future = BoxFuture<tonic::Response<Self::Response>, tonic::Status>;
|
||||
fn call(&mut self, request: tonic::Request<super::ReadMetadataRequest>) -> Self::Future {
|
||||
let inner = Arc::clone(&self.0);
|
||||
let fut = async move { <T as NodeService>::read_metadata(&inner, request).await };
|
||||
Box::pin(fut)
|
||||
}
|
||||
}
|
||||
let accept_compression_encodings = self.accept_compression_encodings;
|
||||
let send_compression_encodings = self.send_compression_encodings;
|
||||
let max_decoding_message_size = self.max_decoding_message_size;
|
||||
let max_encoding_message_size = self.max_encoding_message_size;
|
||||
let inner = self.inner.clone();
|
||||
let fut = async move {
|
||||
let method = ReadMetadataSvc(inner);
|
||||
let codec = tonic_prost::ProstCodec::default();
|
||||
let mut grpc = tonic::server::Grpc::new(codec)
|
||||
.apply_compression_config(accept_compression_encodings, send_compression_encodings)
|
||||
.apply_max_message_size_config(max_decoding_message_size, max_encoding_message_size);
|
||||
let res = grpc.unary(method, req).await;
|
||||
Ok(res)
|
||||
};
|
||||
Box::pin(fut)
|
||||
}
|
||||
"/node_service.NodeService/WriteMetadata" => {
|
||||
#[allow(non_camel_case_types)]
|
||||
struct WriteMetadataSvc<T: NodeService>(pub Arc<T>);
|
||||
|
||||
@@ -313,6 +313,18 @@ message DeletePathsResponse {
|
||||
optional Error error = 2;
|
||||
}
|
||||
|
||||
message ReadMetadataRequest {
|
||||
string disk = 1;
|
||||
string volume = 2;
|
||||
string path = 3;
|
||||
}
|
||||
|
||||
message ReadMetadataResponse {
|
||||
bool success = 1;
|
||||
optional Error error = 2;
|
||||
bytes data = 3;
|
||||
}
|
||||
|
||||
message UpdateMetadataRequest {
|
||||
string disk = 1;
|
||||
string volume = 2;
|
||||
@@ -786,6 +798,7 @@ service NodeService {
|
||||
rpc StatVolume(StatVolumeRequest) returns (StatVolumeResponse) {};
|
||||
rpc DeletePaths(DeletePathsRequest) returns (DeletePathsResponse) {};
|
||||
rpc UpdateMetadata(UpdateMetadataRequest) returns (UpdateMetadataResponse) {};
|
||||
rpc ReadMetadata(ReadMetadataRequest) returns (ReadMetadataResponse) {};
|
||||
rpc WriteMetadata(WriteMetadataRequest) returns (WriteMetadataResponse) {};
|
||||
rpc ReadVersion(ReadVersionRequest) returns (ReadVersionResponse) {};
|
||||
rpc ReadXL(ReadXLRequest) returns (ReadXLResponse) {};
|
||||
@@ -794,6 +807,7 @@ service NodeService {
|
||||
rpc ReadMultiple(ReadMultipleRequest) returns (ReadMultipleResponse) {};
|
||||
rpc DeleteVolume(DeleteVolumeRequest) returns (DeleteVolumeResponse) {};
|
||||
rpc DiskInfo(DiskInfoRequest) returns (DiskInfoResponse) {};
|
||||
|
||||
|
||||
/* -------------------------------lock service-------------------------- */
|
||||
|
||||
|
||||
61
crates/scanner/Cargo.toml
Normal file
@@ -0,0 +1,61 @@
# Copyright 2024 RustFS Team
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

[package]
name = "rustfs-scanner"
version.workspace = true
edition.workspace = true
authors = ["RustFS Team"]
license.workspace = true
repository.workspace = true
rust-version.workspace = true
homepage.workspace = true
description = "RustFS Scanner provides scanning capabilities for data integrity checks, health monitoring, and storage analysis."
keywords = ["RustFS", "scanner", "health-monitoring", "data-integrity", "storage-analysis", "Minio"]
categories = ["web-programming", "development-tools", "filesystem"]
documentation = "https://docs.rs/rustfs-scanner/latest/rustfs_scanner/"

[lints]
workspace = true

[dependencies]
rustfs-config = { workspace = true }
rustfs-common = { workspace = true }
rustfs-utils = { workspace = true }
tokio = { workspace = true, features = ["full"] }
tracing = { workspace = true }
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
thiserror = { workspace = true }
uuid = { workspace = true, features = ["v4", "serde"] }
anyhow = { workspace = true }
async-trait = { workspace = true }
futures = { workspace = true }
time = { workspace = true }
chrono = { workspace = true }
path-clean = { workspace = true }
rmp-serde = { workspace = true }
rustfs-filemeta = { workspace = true }
rustfs-madmin = { workspace = true }
tokio-util = { workspace = true }
rustfs-ecstore = { workspace = true }
http = { workspace = true }
rand = { workspace = true }
s3s = { workspace = true }

[dev-dependencies]
tokio-test = { workspace = true }
tracing-subscriber = { workspace = true }
tempfile = { workspace = true }
serial_test = { workspace = true }
36
crates/scanner/README.md
Normal file
@@ -0,0 +1,36 @@
# RustFS Scanner

RustFS Scanner provides scanning capabilities for data integrity checks, health monitoring, and storage analysis.

## Features

- Data integrity scanning
- Health monitoring
- Storage analysis
- Extensible scanning framework

## Usage Example

```rust
use rustfs_scanner::ScannerError;

// TODO: add a usage example
```
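Pending the TODO above, here is a minimal hedged sketch of working with `ScannerError`. It assumes only the variants declared in `crates/scanner/src/error.rs` (`Config`, `Io`, `Serialization`, `Other`) and uses `serde_json`, which the crate already depends on; the function and its inputs are hypothetical.

```rust
use rustfs_scanner::ScannerError;

fn parse_scanner_config(raw: &str) -> Result<serde_json::Value, ScannerError> {
    if raw.trim().is_empty() {
        // Configuration problems are reported through the Config variant.
        return Err(ScannerError::Config("empty scanner configuration".to_string()));
    }
    // serde_json failures convert automatically via the #[from] Serialization variant.
    Ok(serde_json::from_str(raw)?)
}

fn main() {
    match parse_scanner_config("") {
        Ok(value) => println!("parsed config: {value}"),
        Err(ScannerError::Config(msg)) => eprintln!("configuration error: {msg}"),
        // ScannerError is #[non_exhaustive], so always keep a catch-all arm.
        Err(other) => eprintln!("scanner error: {other}"),
    }
}
```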
## Development

### Build

```bash
cargo build --package rustfs-scanner
```

### Test

```bash
cargo test --package rustfs-scanner
```

## License

Apache License 2.0
1601
crates/scanner/src/data_usage_define.rs
Normal file
File diff suppressed because it is too large
36
crates/scanner/src/error.rs
Normal file
@@ -0,0 +1,36 @@
|
||||
// Copyright 2024 RustFS Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use thiserror::Error;
|
||||
|
||||
/// Scanner-related errors
|
||||
#[derive(Error, Debug)]
|
||||
#[non_exhaustive]
|
||||
pub enum ScannerError {
|
||||
/// Configuration error
|
||||
#[error("Configuration error: {0}")]
|
||||
Config(String),
|
||||
|
||||
/// I/O error
|
||||
#[error("I/O error: {0}")]
|
||||
Io(#[from] std::io::Error),
|
||||
|
||||
/// Serialization error
|
||||
#[error("Serialization error: {0}")]
|
||||
Serialization(#[from] serde_json::Error),
|
||||
|
||||
/// Generic error
|
||||
#[error("Scanner error: {0}")]
|
||||
Other(String),
|
||||
}
|
||||
886
crates/scanner/src/last_minute.rs
Normal file
@@ -0,0 +1,886 @@
|
||||
// Copyright 2024 RustFS Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::time::{Duration, SystemTime, UNIX_EPOCH};
|
||||
|
||||
#[allow(dead_code)]
|
||||
#[derive(Debug, Default)]
|
||||
pub struct TimedAction {
|
||||
count: u64,
|
||||
acc_time: u64,
|
||||
min_time: Option<u64>,
|
||||
max_time: Option<u64>,
|
||||
bytes: u64,
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
impl TimedAction {
|
||||
/// Returns the average time spent on the action.
pub fn avg(&self) -> Option<Duration> {
if self.count == 0 {
return None;
}
Some(Duration::from_nanos(self.acc_time / self.count))
}

/// Returns the average number of bytes processed per recorded action.
pub fn avg_bytes(&self) -> u64 {
if self.count == 0 {
return 0;
}
self.bytes / self.count
}

/// Merges `other` into `self`.
|
||||
pub fn merge(&mut self, other: TimedAction) {
|
||||
self.count += other.count;
|
||||
self.acc_time += other.acc_time;
|
||||
self.bytes += other.bytes;
|
||||
|
||||
if self.count == 0 {
|
||||
self.min_time = other.min_time;
|
||||
}
|
||||
if let Some(other_min) = other.min_time {
|
||||
self.min_time = self.min_time.map_or(Some(other_min), |min| Some(min.min(other_min)));
|
||||
}
|
||||
|
||||
self.max_time = self
|
||||
.max_time
|
||||
.map_or(other.max_time, |max| Some(max.max(other.max_time.unwrap_or(0))));
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
#[derive(Debug)]
|
||||
enum SizeCategory {
|
||||
SizeLessThan1KiB = 0,
|
||||
SizeLessThan1MiB,
|
||||
SizeLessThan10MiB,
|
||||
SizeLessThan100MiB,
|
||||
SizeLessThan1GiB,
|
||||
SizeGreaterThan1GiB,
|
||||
// Add new entries here
|
||||
SizeLastElemMarker,
|
||||
}
|
||||
|
||||
impl std::fmt::Display for SizeCategory {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
let s = match *self {
|
||||
SizeCategory::SizeLessThan1KiB => "SizeLessThan1KiB",
|
||||
SizeCategory::SizeLessThan1MiB => "SizeLessThan1MiB",
|
||||
SizeCategory::SizeLessThan10MiB => "SizeLessThan10MiB",
|
||||
SizeCategory::SizeLessThan100MiB => "SizeLessThan100MiB",
|
||||
SizeCategory::SizeLessThan1GiB => "SizeLessThan1GiB",
|
||||
SizeCategory::SizeGreaterThan1GiB => "SizeGreaterThan1GiB",
|
||||
SizeCategory::SizeLastElemMarker => "SizeLastElemMarker",
|
||||
};
|
||||
write!(f, "{s}")
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Default, Copy)]
|
||||
pub struct AccElem {
|
||||
pub total: u64,
|
||||
pub size: u64,
|
||||
pub n: u64,
|
||||
}
|
||||
|
||||
impl AccElem {
|
||||
pub fn add(&mut self, dur: &Duration) {
|
||||
let dur = dur.as_secs();
|
||||
self.total = self.total.wrapping_add(dur);
|
||||
self.n = self.n.wrapping_add(1);
|
||||
}
|
||||
|
||||
pub fn merge(&mut self, b: &AccElem) {
|
||||
self.n = self.n.wrapping_add(b.n);
|
||||
self.total = self.total.wrapping_add(b.total);
|
||||
self.size = self.size.wrapping_add(b.size);
|
||||
}
|
||||
|
||||
pub fn avg(&self) -> Duration {
|
||||
if self.n >= 1 && self.total > 0 {
|
||||
return Duration::from_secs(self.total / self.n);
|
||||
}
|
||||
Duration::from_secs(0)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct LastMinuteLatency {
|
||||
pub totals: Vec<AccElem>,
|
||||
pub last_sec: u64,
|
||||
}
|
||||
|
||||
impl Default for LastMinuteLatency {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
totals: vec![AccElem::default(); 60],
|
||||
last_sec: Default::default(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl LastMinuteLatency {
|
||||
pub fn merge(&mut self, o: &LastMinuteLatency) -> LastMinuteLatency {
|
||||
let mut merged = LastMinuteLatency::default();
|
||||
let mut x = o.clone();
|
||||
if self.last_sec > o.last_sec {
|
||||
x.forward_to(self.last_sec);
|
||||
merged.last_sec = self.last_sec;
|
||||
} else {
|
||||
self.forward_to(o.last_sec);
|
||||
merged.last_sec = o.last_sec;
|
||||
}
|
||||
|
||||
for i in 0..merged.totals.len() {
|
||||
merged.totals[i] = AccElem {
|
||||
total: self.totals[i].total + o.totals[i].total,
|
||||
n: self.totals[i].n + o.totals[i].n,
|
||||
size: self.totals[i].size + o.totals[i].size,
|
||||
}
|
||||
}
|
||||
merged
|
||||
}
|
||||
|
||||
pub fn add(&mut self, t: &Duration) {
|
||||
let sec = SystemTime::now()
|
||||
.duration_since(UNIX_EPOCH)
|
||||
.expect("Time went backwards")
|
||||
.as_secs();
|
||||
self.forward_to(sec);
|
||||
let win_idx = sec % 60;
|
||||
self.totals[win_idx as usize].add(t);
|
||||
self.last_sec = sec;
|
||||
}
|
||||
|
||||
pub fn add_all(&mut self, sec: u64, a: &AccElem) {
|
||||
self.forward_to(sec);
|
||||
let win_idx = sec % 60;
|
||||
self.totals[win_idx as usize].merge(a);
|
||||
self.last_sec = sec;
|
||||
}
|
||||
|
||||
pub fn get_total(&mut self) -> AccElem {
|
||||
let mut res = AccElem::default();
|
||||
let sec = SystemTime::now()
|
||||
.duration_since(UNIX_EPOCH)
|
||||
.expect("Time went backwards")
|
||||
.as_secs();
|
||||
self.forward_to(sec);
|
||||
for elem in self.totals.iter() {
|
||||
res.merge(elem);
|
||||
}
|
||||
res
|
||||
}
|
||||
|
||||
pub fn forward_to(&mut self, t: u64) {
|
||||
if self.last_sec >= t {
|
||||
return;
|
||||
}
|
||||
if t - self.last_sec >= 60 {
|
||||
self.totals = vec![AccElem::default(); 60];
|
||||
self.last_sec = t;
|
||||
return;
|
||||
}
|
||||
while self.last_sec != t {
|
||||
let idx = (self.last_sec + 1) % 60;
|
||||
self.totals[idx as usize] = AccElem::default();
|
||||
self.last_sec += 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use std::time::Duration;
|
||||
|
||||
#[test]
|
||||
fn test_acc_elem_default() {
|
||||
let elem = AccElem::default();
|
||||
assert_eq!(elem.total, 0);
|
||||
assert_eq!(elem.size, 0);
|
||||
assert_eq!(elem.n, 0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_acc_elem_add_single_duration() {
|
||||
let mut elem = AccElem::default();
|
||||
let duration = Duration::from_secs(5);
|
||||
|
||||
elem.add(&duration);
|
||||
|
||||
assert_eq!(elem.total, 5);
|
||||
assert_eq!(elem.n, 1);
|
||||
assert_eq!(elem.size, 0); // size is not modified by add
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_acc_elem_add_multiple_durations() {
|
||||
let mut elem = AccElem::default();
|
||||
|
||||
elem.add(&Duration::from_secs(3));
|
||||
elem.add(&Duration::from_secs(7));
|
||||
elem.add(&Duration::from_secs(2));
|
||||
|
||||
assert_eq!(elem.total, 12);
|
||||
assert_eq!(elem.n, 3);
|
||||
assert_eq!(elem.size, 0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_acc_elem_add_zero_duration() {
|
||||
let mut elem = AccElem::default();
|
||||
let duration = Duration::from_secs(0);
|
||||
|
||||
elem.add(&duration);
|
||||
|
||||
assert_eq!(elem.total, 0);
|
||||
assert_eq!(elem.n, 1);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_acc_elem_add_subsecond_duration() {
|
||||
let mut elem = AccElem::default();
|
||||
// Duration less than 1 second should be truncated to 0
|
||||
let duration = Duration::from_millis(500);
|
||||
|
||||
elem.add(&duration);
|
||||
|
||||
assert_eq!(elem.total, 0); // as_secs() truncates subsecond values
|
||||
assert_eq!(elem.n, 1);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_acc_elem_merge_empty_elements() {
|
||||
let mut elem1 = AccElem::default();
|
||||
let elem2 = AccElem::default();
|
||||
|
||||
elem1.merge(&elem2);
|
||||
|
||||
assert_eq!(elem1.total, 0);
|
||||
assert_eq!(elem1.size, 0);
|
||||
assert_eq!(elem1.n, 0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_acc_elem_merge_with_data() {
|
||||
let mut elem1 = AccElem {
|
||||
total: 10,
|
||||
size: 100,
|
||||
n: 2,
|
||||
};
|
||||
let elem2 = AccElem {
|
||||
total: 15,
|
||||
size: 200,
|
||||
n: 3,
|
||||
};
|
||||
|
||||
elem1.merge(&elem2);
|
||||
|
||||
assert_eq!(elem1.total, 25);
|
||||
assert_eq!(elem1.size, 300);
|
||||
assert_eq!(elem1.n, 5);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_acc_elem_merge_one_empty() {
|
||||
let mut elem1 = AccElem {
|
||||
total: 10,
|
||||
size: 100,
|
||||
n: 2,
|
||||
};
|
||||
let elem2 = AccElem::default();
|
||||
|
||||
elem1.merge(&elem2);
|
||||
|
||||
assert_eq!(elem1.total, 10);
|
||||
assert_eq!(elem1.size, 100);
|
||||
assert_eq!(elem1.n, 2);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_acc_elem_avg_with_data() {
|
||||
let elem = AccElem {
|
||||
total: 15,
|
||||
size: 0,
|
||||
n: 3,
|
||||
};
|
||||
|
||||
let avg = elem.avg();
|
||||
assert_eq!(avg, Duration::from_secs(5)); // 15 / 3 = 5
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_acc_elem_avg_zero_count() {
|
||||
let elem = AccElem {
|
||||
total: 10,
|
||||
size: 0,
|
||||
n: 0,
|
||||
};
|
||||
|
||||
let avg = elem.avg();
|
||||
assert_eq!(avg, Duration::from_secs(0));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_acc_elem_avg_zero_total() {
|
||||
let elem = AccElem { total: 0, size: 0, n: 5 };
|
||||
|
||||
let avg = elem.avg();
|
||||
assert_eq!(avg, Duration::from_secs(0));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_acc_elem_avg_rounding() {
|
||||
let elem = AccElem {
|
||||
total: 10,
|
||||
size: 0,
|
||||
n: 3,
|
||||
};
|
||||
|
||||
let avg = elem.avg();
|
||||
assert_eq!(avg, Duration::from_secs(3)); // 10 / 3 = 3 (integer division)
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_last_minute_latency_default() {
|
||||
let latency = LastMinuteLatency::default();
|
||||
|
||||
assert_eq!(latency.totals.len(), 60);
|
||||
assert_eq!(latency.last_sec, 0);
|
||||
|
||||
// All elements should be default (empty)
|
||||
for elem in &latency.totals {
|
||||
assert_eq!(elem.total, 0);
|
||||
assert_eq!(elem.size, 0);
|
||||
assert_eq!(elem.n, 0);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_last_minute_latency_forward_to_same_time() {
|
||||
let mut latency = LastMinuteLatency {
|
||||
last_sec: 100,
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
// Add some data to verify it's not cleared
|
||||
latency.totals[0].total = 10;
|
||||
latency.totals[0].n = 1;
|
||||
|
||||
latency.forward_to(100); // Same time
|
||||
|
||||
assert_eq!(latency.last_sec, 100);
|
||||
assert_eq!(latency.totals[0].total, 10); // Data should remain
|
||||
assert_eq!(latency.totals[0].n, 1);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_last_minute_latency_forward_to_past_time() {
|
||||
let mut latency = LastMinuteLatency {
|
||||
last_sec: 100,
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
// Add some data to verify it's not cleared
|
||||
latency.totals[0].total = 10;
|
||||
latency.totals[0].n = 1;
|
||||
|
||||
latency.forward_to(50); // Past time
|
||||
|
||||
assert_eq!(latency.last_sec, 100); // Should not change
|
||||
assert_eq!(latency.totals[0].total, 10); // Data should remain
|
||||
assert_eq!(latency.totals[0].n, 1);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_last_minute_latency_forward_to_large_gap() {
|
||||
let mut latency = LastMinuteLatency {
|
||||
last_sec: 100,
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
// Add some data to verify it's cleared
|
||||
latency.totals[0].total = 10;
|
||||
latency.totals[0].n = 1;
|
||||
|
||||
latency.forward_to(200); // Gap >= 60 seconds
|
||||
|
||||
assert_eq!(latency.last_sec, 200); // last_sec should be updated to target time
|
||||
|
||||
// All data should be cleared
|
||||
for elem in &latency.totals {
|
||||
assert_eq!(elem.total, 0);
|
||||
assert_eq!(elem.size, 0);
|
||||
assert_eq!(elem.n, 0);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_last_minute_latency_forward_to_small_gap() {
|
||||
let mut latency = LastMinuteLatency {
|
||||
last_sec: 100,
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
// Add data at specific indices
|
||||
latency.totals[41].total = 10; // (100 + 1) % 60 = 41
|
||||
latency.totals[42].total = 20; // (100 + 2) % 60 = 42
|
||||
|
||||
latency.forward_to(102); // Forward by 2 seconds
|
||||
|
||||
assert_eq!(latency.last_sec, 102);
|
||||
|
||||
// The slots that were advanced should be cleared
|
||||
assert_eq!(latency.totals[41].total, 0); // Cleared during forward
|
||||
assert_eq!(latency.totals[42].total, 0); // Cleared during forward
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_last_minute_latency_add_all() {
|
||||
let mut latency = LastMinuteLatency::default();
|
||||
let acc_elem = AccElem {
|
||||
total: 15,
|
||||
size: 100,
|
||||
n: 3,
|
||||
};
|
||||
|
||||
latency.add_all(1000, &acc_elem);
|
||||
|
||||
assert_eq!(latency.last_sec, 1000);
|
||||
let idx = 1000 % 60; // Should be 40
|
||||
assert_eq!(latency.totals[idx as usize].total, 15);
|
||||
assert_eq!(latency.totals[idx as usize].size, 100);
|
||||
assert_eq!(latency.totals[idx as usize].n, 3);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_last_minute_latency_add_all_multiple() {
|
||||
let mut latency = LastMinuteLatency::default();
|
||||
|
||||
let acc_elem1 = AccElem {
|
||||
total: 10,
|
||||
size: 50,
|
||||
n: 2,
|
||||
};
|
||||
let acc_elem2 = AccElem {
|
||||
total: 20,
|
||||
size: 100,
|
||||
n: 4,
|
||||
};
|
||||
|
||||
latency.add_all(1000, &acc_elem1);
|
||||
latency.add_all(1000, &acc_elem2); // Same second
|
||||
|
||||
let idx = 1000 % 60;
|
||||
assert_eq!(latency.totals[idx as usize].total, 30); // 10 + 20
|
||||
assert_eq!(latency.totals[idx as usize].size, 150); // 50 + 100
|
||||
assert_eq!(latency.totals[idx as usize].n, 6); // 2 + 4
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_last_minute_latency_merge_same_time() {
|
||||
let mut latency1 = LastMinuteLatency::default();
|
||||
let mut latency2 = LastMinuteLatency::default();
|
||||
|
||||
latency1.last_sec = 1000;
|
||||
latency2.last_sec = 1000;
|
||||
|
||||
// Add data to both
|
||||
latency1.totals[0].total = 10;
|
||||
latency1.totals[0].n = 2;
|
||||
latency2.totals[0].total = 20;
|
||||
latency2.totals[0].n = 3;
|
||||
|
||||
let merged = latency1.merge(&latency2);
|
||||
|
||||
assert_eq!(merged.last_sec, 1000);
|
||||
assert_eq!(merged.totals[0].total, 30); // 10 + 20
|
||||
assert_eq!(merged.totals[0].n, 5); // 2 + 3
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_last_minute_latency_merge_different_times() {
|
||||
let mut latency1 = LastMinuteLatency::default();
|
||||
let mut latency2 = LastMinuteLatency::default();
|
||||
|
||||
latency1.last_sec = 1000;
|
||||
latency2.last_sec = 1010; // 10 seconds later
|
||||
|
||||
// Add data to both
|
||||
latency1.totals[0].total = 10;
|
||||
latency2.totals[0].total = 20;
|
||||
|
||||
let merged = latency1.merge(&latency2);
|
||||
|
||||
assert_eq!(merged.last_sec, 1010); // Should use the later time
|
||||
assert_eq!(merged.totals[0].total, 30);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_last_minute_latency_merge_empty() {
|
||||
let mut latency1 = LastMinuteLatency::default();
|
||||
let latency2 = LastMinuteLatency::default();
|
||||
|
||||
let merged = latency1.merge(&latency2);
|
||||
|
||||
assert_eq!(merged.last_sec, 0);
|
||||
for elem in &merged.totals {
|
||||
assert_eq!(elem.total, 0);
|
||||
assert_eq!(elem.size, 0);
|
||||
assert_eq!(elem.n, 0);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_last_minute_latency_window_wraparound() {
|
||||
let mut latency = LastMinuteLatency::default();
|
||||
|
||||
// Test that indices wrap around correctly
|
||||
for sec in 0..120 {
|
||||
// Test for 2 minutes
|
||||
let acc_elem = AccElem {
|
||||
total: sec,
|
||||
size: 0,
|
||||
n: 1,
|
||||
};
|
||||
latency.add_all(sec, &acc_elem);
|
||||
|
||||
let expected_idx = sec % 60;
|
||||
assert_eq!(latency.totals[expected_idx as usize].total, sec);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_last_minute_latency_time_progression() {
|
||||
let mut latency = LastMinuteLatency::default();
|
||||
|
||||
// Add data at time 1000
|
||||
latency.add_all(
|
||||
1000,
|
||||
&AccElem {
|
||||
total: 10,
|
||||
size: 0,
|
||||
n: 1,
|
||||
},
|
||||
);
|
||||
|
||||
// Forward to time 1030 (30 seconds later)
|
||||
latency.forward_to(1030);
|
||||
|
||||
// Original data should still be there
|
||||
let idx_1000 = 1000 % 60;
|
||||
assert_eq!(latency.totals[idx_1000 as usize].total, 10);
|
||||
|
||||
// Forward to time 1070 (70 seconds from original, > 60 seconds)
|
||||
latency.forward_to(1070);
|
||||
|
||||
// All data should be cleared due to large gap
|
||||
for elem in &latency.totals {
|
||||
assert_eq!(elem.total, 0);
|
||||
assert_eq!(elem.n, 0);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_last_minute_latency_realistic_scenario() {
|
||||
let mut latency = LastMinuteLatency::default();
|
||||
let base_time = 1000u64;
|
||||
|
||||
// Add data for exactly 60 seconds to fill the window
|
||||
for i in 0..60 {
|
||||
let current_time = base_time + i;
|
||||
let duration_secs = i % 10 + 1; // Varying durations 1-10 seconds
|
||||
let acc_elem = AccElem {
|
||||
total: duration_secs,
|
||||
size: 1024 * (i % 5 + 1), // Varying sizes
|
||||
n: 1,
|
||||
};
|
||||
|
||||
latency.add_all(current_time, &acc_elem);
|
||||
}
|
||||
|
||||
// Count non-empty slots after filling the window
|
||||
let mut non_empty_count = 0;
|
||||
let mut total_n = 0;
|
||||
let mut total_sum = 0;
|
||||
|
||||
for elem in &latency.totals {
|
||||
if elem.n > 0 {
|
||||
non_empty_count += 1;
|
||||
total_n += elem.n;
|
||||
total_sum += elem.total;
|
||||
}
|
||||
}
|
||||
|
||||
// We should have exactly 60 non-empty slots (one for each second in the window)
|
||||
assert_eq!(non_empty_count, 60);
|
||||
assert_eq!(total_n, 60); // 60 data points total
|
||||
assert!(total_sum > 0);
|
||||
|
||||
// Test manual total calculation (get_total uses system time which interferes with test)
|
||||
let mut manual_total = AccElem::default();
|
||||
for elem in &latency.totals {
|
||||
manual_total.merge(elem);
|
||||
}
|
||||
assert_eq!(manual_total.n, 60);
|
||||
assert_eq!(manual_total.total, total_sum);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_acc_elem_clone_and_debug() {
|
||||
let elem = AccElem {
|
||||
total: 100,
|
||||
size: 200,
|
||||
n: 5,
|
||||
};
|
||||
|
||||
let cloned = elem;
|
||||
assert_eq!(elem.total, cloned.total);
|
||||
assert_eq!(elem.size, cloned.size);
|
||||
assert_eq!(elem.n, cloned.n);
|
||||
|
||||
// Test Debug trait
|
||||
let debug_str = format!("{elem:?}");
|
||||
assert!(debug_str.contains("100"));
|
||||
assert!(debug_str.contains("200"));
|
||||
assert!(debug_str.contains("5"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_last_minute_latency_clone() {
|
||||
let mut latency = LastMinuteLatency {
|
||||
last_sec: 1000,
|
||||
..Default::default()
|
||||
};
|
||||
latency.totals[0].total = 100;
|
||||
latency.totals[0].n = 5;
|
||||
|
||||
let cloned = latency.clone();
|
||||
assert_eq!(latency.last_sec, cloned.last_sec);
|
||||
assert_eq!(latency.totals[0].total, cloned.totals[0].total);
|
||||
assert_eq!(latency.totals[0].n, cloned.totals[0].n);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_edge_case_max_values() {
|
||||
let mut elem = AccElem {
|
||||
total: u64::MAX - 50,
|
||||
size: u64::MAX - 50,
|
||||
n: u64::MAX - 50,
|
||||
};
|
||||
|
||||
let other = AccElem {
|
||||
total: 100,
|
||||
size: 100,
|
||||
n: 100,
|
||||
};
|
||||
|
||||
// This should not panic due to overflow, values will wrap around
|
||||
elem.merge(&other);
|
||||
|
||||
// Values should wrap around due to overflow (wrapping_add behavior)
|
||||
assert_eq!(elem.total, 49); // (u64::MAX - 50) + 100 wraps to 49
|
||||
assert_eq!(elem.size, 49);
|
||||
assert_eq!(elem.n, 49);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_forward_to_boundary_conditions() {
|
||||
let mut latency = LastMinuteLatency {
|
||||
last_sec: 59,
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
// Add data at the last slot
|
||||
latency.totals[59].total = 100;
|
||||
latency.totals[59].n = 1;
|
||||
|
||||
// Forward exactly 60 seconds (boundary case)
|
||||
latency.forward_to(119);
|
||||
|
||||
// All data should be cleared
|
||||
for elem in &latency.totals {
|
||||
assert_eq!(elem.total, 0);
|
||||
assert_eq!(elem.n, 0);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_get_total_with_data() {
|
||||
let mut latency = LastMinuteLatency::default();
|
||||
|
||||
// Set a recent timestamp to avoid forward_to clearing data
|
||||
let current_time = SystemTime::now()
|
||||
.duration_since(UNIX_EPOCH)
|
||||
.expect("Time went backwards")
|
||||
.as_secs();
|
||||
latency.last_sec = current_time;
|
||||
|
||||
// Add data to multiple slots
|
||||
latency.totals[0] = AccElem {
|
||||
total: 10,
|
||||
size: 100,
|
||||
n: 1,
|
||||
};
|
||||
latency.totals[1] = AccElem {
|
||||
total: 20,
|
||||
size: 200,
|
||||
n: 2,
|
||||
};
|
||||
latency.totals[59] = AccElem {
|
||||
total: 30,
|
||||
size: 300,
|
||||
n: 3,
|
||||
};
|
||||
|
||||
let total = latency.get_total();
|
||||
|
||||
assert_eq!(total.total, 60);
|
||||
assert_eq!(total.size, 600);
|
||||
assert_eq!(total.n, 6);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_window_index_calculation() {
|
||||
// Test that window index calculation works correctly
|
||||
let _latency = LastMinuteLatency::default();
|
||||
|
||||
let acc_elem = AccElem { total: 1, size: 1, n: 1 };
|
||||
|
||||
// Test various timestamps
|
||||
let test_cases = [(0, 0), (1, 1), (59, 59), (60, 0), (61, 1), (119, 59), (120, 0)];
|
||||
|
||||
for (timestamp, expected_idx) in test_cases {
|
||||
let mut test_latency = LastMinuteLatency::default();
|
||||
test_latency.add_all(timestamp, &acc_elem);
|
||||
|
||||
assert_eq!(
|
||||
test_latency.totals[expected_idx].n, 1,
|
||||
"Failed for timestamp {timestamp} (expected index {expected_idx})"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_concurrent_safety_simulation() {
|
||||
// Simulate concurrent access patterns
|
||||
let mut latency = LastMinuteLatency::default();
|
||||
|
||||
// Use current time to ensure data doesn't get cleared by get_total
|
||||
let current_time = SystemTime::now()
|
||||
.duration_since(UNIX_EPOCH)
|
||||
.expect("Time went backwards")
|
||||
.as_secs();
|
||||
|
||||
// Simulate rapid additions within a 60-second window
|
||||
for i in 0..1000 {
|
||||
let acc_elem = AccElem {
|
||||
total: (i % 10) + 1, // Ensure non-zero values
|
||||
size: (i % 100) + 1,
|
||||
n: 1,
|
||||
};
|
||||
// Keep all timestamps within the current minute window
|
||||
latency.add_all(current_time - (i % 60), &acc_elem);
|
||||
}
|
||||
|
||||
let total = latency.get_total();
|
||||
assert!(total.n > 0, "Total count should be greater than 0");
|
||||
assert!(total.total > 0, "Total time should be greater than 0");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_acc_elem_debug_format() {
|
||||
let elem = AccElem {
|
||||
total: 123,
|
||||
size: 456,
|
||||
n: 789,
|
||||
};
|
||||
|
||||
let debug_str = format!("{elem:?}");
|
||||
assert!(debug_str.contains("123"));
|
||||
assert!(debug_str.contains("456"));
|
||||
assert!(debug_str.contains("789"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_large_values() {
|
||||
let mut elem = AccElem::default();
|
||||
|
||||
// Test with large duration values
|
||||
let large_duration = Duration::from_secs(u64::MAX / 2);
|
||||
elem.add(&large_duration);
|
||||
|
||||
assert_eq!(elem.total, u64::MAX / 2);
|
||||
assert_eq!(elem.n, 1);
|
||||
|
||||
// Test average calculation with large values
|
||||
let avg = elem.avg();
|
||||
assert_eq!(avg, Duration::from_secs(u64::MAX / 2));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_zero_duration_handling() {
|
||||
let mut elem = AccElem::default();
|
||||
|
||||
let zero_duration = Duration::from_secs(0);
|
||||
elem.add(&zero_duration);
|
||||
|
||||
assert_eq!(elem.total, 0);
|
||||
assert_eq!(elem.n, 1);
|
||||
assert_eq!(elem.avg(), Duration::from_secs(0));
|
||||
}
|
||||
}
|
||||
|
||||
const SIZE_LAST_ELEM_MARKER: usize = 10; // Assumed number of size histogram buckets; adjust if SizeCategory::SizeLastElemMarker changes.
|
||||
|
||||
#[allow(dead_code)]
|
||||
#[derive(Debug, Default)]
|
||||
pub struct LastMinuteHistogram {
|
||||
histogram: Vec<LastMinuteLatency>,
|
||||
size: u32,
|
||||
}
|
||||
|
||||
impl LastMinuteHistogram {
|
||||
pub fn merge(&mut self, other: &LastMinuteHistogram) {
|
||||
for i in 0..self.histogram.len() {
|
||||
self.histogram[i].merge(&other.histogram[i]);
|
||||
}
|
||||
}
|
||||
|
||||
pub fn add(&mut self, size: i64, t: Duration) {
|
||||
let index = size_to_tag(size);
|
||||
self.histogram[index].add(&t);
|
||||
}
|
||||
|
||||
pub fn get_avg_data(&mut self) -> [AccElem; SIZE_LAST_ELEM_MARKER] {
|
||||
let mut res = [AccElem::default(); SIZE_LAST_ELEM_MARKER];
|
||||
for (i, elem) in self.histogram.iter_mut().enumerate() {
|
||||
res[i] = elem.get_total();
|
||||
}
|
||||
res
|
||||
}
|
||||
}
|
||||
|
||||
fn size_to_tag(size: i64) -> usize {
|
||||
match size {
|
||||
_ if size < 1024 => 0, // sizeLessThan1KiB
|
||||
_ if size < 1024 * 1024 => 1, // sizeLessThan1MiB
|
||||
_ if size < 10 * 1024 * 1024 => 2, // sizeLessThan10MiB
|
||||
_ if size < 100 * 1024 * 1024 => 3, // sizeLessThan100MiB
|
||||
_ if size < 1024 * 1024 * 1024 => 4, // sizeLessThan1GiB
|
||||
_ => 5, // sizeGreaterThan1GiB
|
||||
}
|
||||
}
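As a brief aside (not part of the diff), a hedged sketch of how the sliding one-minute window in last_minute.rs is meant to be used; it relies only on the public `add`, `get_total`, and `avg` methods shown above, and the workload values are illustrative.

```rust
use std::time::Duration;
use rustfs_scanner::last_minute::LastMinuteLatency;

fn main() {
    let mut window = LastMinuteLatency::default();

    // Record three operations; samples are bucketed by the current wall-clock second.
    window.add(&Duration::from_secs(2));
    window.add(&Duration::from_secs(4));
    window.add(&Duration::from_secs(6));

    // get_total() first forwards the window to "now", dropping buckets older
    // than 60 seconds, then merges the remaining per-second buckets.
    let total = window.get_total();
    assert_eq!(total.n, 3);
    println!("samples: {}, average latency: {:?}", total.n, total.avg());
}
```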
33
crates/scanner/src/lib.rs
Normal file
@@ -0,0 +1,33 @@
|
||||
// Copyright 2024 RustFS Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#![cfg_attr(docsrs, feature(doc_auto_cfg))]
|
||||
#![warn(
|
||||
// missing_docs,
|
||||
rustdoc::missing_crate_level_docs,
|
||||
unreachable_pub,
|
||||
rust_2018_idioms
|
||||
)]
|
||||
|
||||
pub mod data_usage_define;
|
||||
pub mod error;
|
||||
pub mod last_minute;
|
||||
pub mod metrics;
|
||||
pub mod scanner;
|
||||
pub mod scanner_folder;
|
||||
pub mod scanner_io;
|
||||
|
||||
pub use data_usage_define::*;
|
||||
pub use error::ScannerError;
|
||||
pub use scanner::init_data_scanner;
|
||||
576
crates/scanner/src/metrics.rs
Normal file
@@ -0,0 +1,576 @@
|
||||
// Copyright 2024 RustFS Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use crate::last_minute::{AccElem, LastMinuteLatency};
|
||||
use chrono::{DateTime, Utc};
|
||||
use rustfs_madmin::metrics::ScannerMetrics as M_ScannerMetrics;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
fmt::Display,
|
||||
pin::Pin,
|
||||
sync::{
|
||||
Arc, OnceLock,
|
||||
atomic::{AtomicU64, Ordering},
|
||||
},
|
||||
time::{Duration, SystemTime},
|
||||
};
|
||||
use tokio::sync::{Mutex, RwLock};
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
pub enum IlmAction {
|
||||
NoneAction = 0,
|
||||
DeleteAction,
|
||||
DeleteVersionAction,
|
||||
TransitionAction,
|
||||
TransitionVersionAction,
|
||||
DeleteRestoredAction,
|
||||
DeleteRestoredVersionAction,
|
||||
DeleteAllVersionsAction,
|
||||
DelMarkerDeleteAllVersionsAction,
|
||||
ActionCount,
|
||||
}
|
||||
|
||||
impl IlmAction {
|
||||
pub fn delete_restored(&self) -> bool {
|
||||
*self == Self::DeleteRestoredAction || *self == Self::DeleteRestoredVersionAction
|
||||
}
|
||||
|
||||
pub fn delete_versioned(&self) -> bool {
|
||||
*self == Self::DeleteVersionAction || *self == Self::DeleteRestoredVersionAction
|
||||
}
|
||||
|
||||
pub fn delete_all(&self) -> bool {
|
||||
*self == Self::DeleteAllVersionsAction || *self == Self::DelMarkerDeleteAllVersionsAction
|
||||
}
|
||||
|
||||
pub fn delete(&self) -> bool {
|
||||
if self.delete_restored() {
|
||||
return true;
|
||||
}
|
||||
*self == Self::DeleteVersionAction
|
||||
|| *self == Self::DeleteAction
|
||||
|| *self == Self::DeleteAllVersionsAction
|
||||
|| *self == Self::DelMarkerDeleteAllVersionsAction
|
||||
}
|
||||
}
|
||||
|
||||
impl Display for IlmAction {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(f, "{self:?}")
|
||||
}
|
||||
}
|
||||
|
||||
pub static GLOBAL_METRICS: OnceLock<Arc<Metrics>> = OnceLock::new();
|
||||
|
||||
pub fn global_metrics() -> &'static Arc<Metrics> {
|
||||
GLOBAL_METRICS.get_or_init(|| Arc::new(Metrics::new()))
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, PartialEq, PartialOrd)]
|
||||
pub enum Metric {
|
||||
// START Realtime metrics, which only record
// last minute latencies and total operation count.
|
||||
ReadMetadata = 0,
|
||||
CheckMissing,
|
||||
SaveUsage,
|
||||
ApplyAll,
|
||||
ApplyVersion,
|
||||
TierObjSweep,
|
||||
HealCheck,
|
||||
Ilm,
|
||||
CheckReplication,
|
||||
Yield,
|
||||
CleanAbandoned,
|
||||
ApplyNonCurrent,
|
||||
HealAbandonedVersion,
|
||||
|
||||
// START Trace metrics:
|
||||
StartTrace,
|
||||
ScanObject, // Scan object. All operations included.
|
||||
HealAbandonedObject,
|
||||
|
||||
// END realtime metrics:
|
||||
LastRealtime,
|
||||
|
||||
// Trace only metrics:
|
||||
ScanFolder, // Scan a folder on disk, recursively.
|
||||
ScanCycle, // Full cycle, cluster global.
|
||||
ScanBucketDrive, // Single bucket on one drive.
|
||||
CompactFolder, // Folder compacted.
|
||||
|
||||
// Must be last:
|
||||
Last,
|
||||
}
|
||||
|
||||
impl Metric {
|
||||
/// Convert to string representation for metrics
|
||||
pub fn as_str(self) -> &'static str {
|
||||
match self {
|
||||
Self::ReadMetadata => "read_metadata",
|
||||
Self::CheckMissing => "check_missing",
|
||||
Self::SaveUsage => "save_usage",
|
||||
Self::ApplyAll => "apply_all",
|
||||
Self::ApplyVersion => "apply_version",
|
||||
Self::TierObjSweep => "tier_obj_sweep",
|
||||
Self::HealCheck => "heal_check",
|
||||
Self::Ilm => "ilm",
|
||||
Self::CheckReplication => "check_replication",
|
||||
Self::Yield => "yield",
|
||||
Self::CleanAbandoned => "clean_abandoned",
|
||||
Self::ApplyNonCurrent => "apply_non_current",
|
||||
Self::HealAbandonedVersion => "heal_abandoned_version",
|
||||
Self::StartTrace => "start_trace",
|
||||
Self::ScanObject => "scan_object",
|
||||
Self::HealAbandonedObject => "heal_abandoned_object",
|
||||
Self::LastRealtime => "last_realtime",
|
||||
Self::ScanFolder => "scan_folder",
|
||||
Self::ScanCycle => "scan_cycle",
|
||||
Self::ScanBucketDrive => "scan_bucket_drive",
|
||||
Self::CompactFolder => "compact_folder",
|
||||
Self::Last => "last",
|
||||
}
|
||||
}
|
||||
|
||||
/// Convert from index back to enum (safe version)
|
||||
pub fn from_index(index: usize) -> Option<Self> {
|
||||
if index >= Self::Last as usize {
|
||||
return None;
|
||||
}
|
||||
// Safe conversion using match instead of unsafe transmute
|
||||
match index {
|
||||
0 => Some(Self::ReadMetadata),
|
||||
1 => Some(Self::CheckMissing),
|
||||
2 => Some(Self::SaveUsage),
|
||||
3 => Some(Self::ApplyAll),
|
||||
4 => Some(Self::ApplyVersion),
|
||||
5 => Some(Self::TierObjSweep),
|
||||
6 => Some(Self::HealCheck),
|
||||
7 => Some(Self::Ilm),
|
||||
8 => Some(Self::CheckReplication),
|
||||
9 => Some(Self::Yield),
|
||||
10 => Some(Self::CleanAbandoned),
|
||||
11 => Some(Self::ApplyNonCurrent),
|
||||
12 => Some(Self::HealAbandonedVersion),
|
||||
13 => Some(Self::StartTrace),
|
||||
14 => Some(Self::ScanObject),
|
||||
15 => Some(Self::HealAbandonedObject),
|
||||
16 => Some(Self::LastRealtime),
|
||||
17 => Some(Self::ScanFolder),
|
||||
18 => Some(Self::ScanCycle),
|
||||
19 => Some(Self::ScanBucketDrive),
|
||||
20 => Some(Self::CompactFolder),
|
||||
21 => Some(Self::Last),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Thread-safe wrapper for LastMinuteLatency with atomic operations
|
||||
#[derive(Default)]
|
||||
pub struct LockedLastMinuteLatency {
|
||||
latency: Arc<Mutex<LastMinuteLatency>>,
|
||||
}
|
||||
|
||||
impl Clone for LockedLastMinuteLatency {
|
||||
fn clone(&self) -> Self {
|
||||
Self {
|
||||
latency: Arc::clone(&self.latency),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl LockedLastMinuteLatency {
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
latency: Arc::new(Mutex::new(LastMinuteLatency::default())),
|
||||
}
|
||||
}
|
||||
|
||||
/// Add a duration measurement
|
||||
pub async fn add(&self, duration: Duration) {
|
||||
self.add_size(duration, 0).await;
|
||||
}
|
||||
|
||||
/// Add a duration measurement with size
|
||||
pub async fn add_size(&self, duration: Duration, size: u64) {
|
||||
let mut latency = self.latency.lock().await;
|
||||
let now = SystemTime::now()
|
||||
.duration_since(SystemTime::UNIX_EPOCH)
|
||||
.unwrap_or_default()
|
||||
.as_secs();
|
||||
|
||||
let elem = AccElem {
|
||||
n: 1,
|
||||
total: duration.as_secs(),
|
||||
size,
|
||||
};
|
||||
latency.add_all(now, &elem);
|
||||
}
|
||||
|
||||
/// Get total accumulated metrics for the last minute
|
||||
pub async fn total(&self) -> AccElem {
|
||||
let mut latency = self.latency.lock().await;
|
||||
latency.get_total()
|
||||
}
|
||||
}
|
||||
|
||||
/// Current path tracker for monitoring active scan paths
|
||||
struct CurrentPathTracker {
|
||||
current_path: Arc<RwLock<String>>,
|
||||
}
|
||||
|
||||
impl CurrentPathTracker {
|
||||
fn new(initial_path: String) -> Self {
|
||||
Self {
|
||||
current_path: Arc::new(RwLock::new(initial_path)),
|
||||
}
|
||||
}
|
||||
|
||||
async fn update_path(&self, path: String) {
|
||||
*self.current_path.write().await = path;
|
||||
}
|
||||
|
||||
async fn get_path(&self) -> String {
|
||||
self.current_path.read().await.clone()
|
||||
}
|
||||
}
|
||||
|
||||
/// Main scanner metrics structure
|
||||
pub struct Metrics {
|
||||
// All fields must be accessed atomically and aligned.
|
||||
operations: Vec<AtomicU64>,
|
||||
latency: Vec<LockedLastMinuteLatency>,
|
||||
actions: Vec<AtomicU64>,
|
||||
actions_latency: Vec<LockedLastMinuteLatency>,
|
||||
// Current paths contains disk -> tracker mappings
|
||||
current_paths: Arc<RwLock<HashMap<String, Arc<CurrentPathTracker>>>>,
|
||||
|
||||
// Cycle information
|
||||
cycle_info: Arc<RwLock<Option<CurrentCycle>>>,
|
||||
}
|
||||
|
||||
// Tracks the scanner's current cycle; serialized with rmp-serde and persisted between scanner runs.
|
||||
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
|
||||
pub struct CurrentCycle {
|
||||
pub current: u64,
|
||||
pub next: u64,
|
||||
pub cycle_completed: Vec<DateTime<Utc>>,
|
||||
pub started: DateTime<Utc>,
|
||||
}
|
||||
|
||||
impl CurrentCycle {
|
||||
pub fn unmarshal(&mut self, buf: &[u8]) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
||||
*self = rmp_serde::from_slice(buf)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn marshal(&self) -> Result<Vec<u8>, Box<dyn std::error::Error + Send + Sync>> {
|
||||
Ok(rmp_serde::to_vec(self)?)
|
||||
}
|
||||
}
|
||||
|
||||
impl Metrics {
|
||||
pub fn new() -> Self {
|
||||
let operations = (0..Metric::Last as usize).map(|_| AtomicU64::new(0)).collect();
|
||||
|
||||
let latency = (0..Metric::LastRealtime as usize)
|
||||
.map(|_| LockedLastMinuteLatency::new())
|
||||
.collect();
|
||||
|
||||
Self {
|
||||
operations,
|
||||
latency,
|
||||
actions: (0..IlmAction::ActionCount as usize).map(|_| AtomicU64::new(0)).collect(),
|
||||
actions_latency: vec![LockedLastMinuteLatency::default(); IlmAction::ActionCount as usize],
|
||||
current_paths: Arc::new(RwLock::new(HashMap::new())),
|
||||
cycle_info: Arc::new(RwLock::new(None)),
|
||||
}
|
||||
}
|
||||
|
||||
/// Log scanner action with custom metadata - compatible with existing usage
|
||||
pub fn log(metric: Metric) -> impl Fn(&HashMap<String, String>) {
|
||||
let metric = metric as usize;
|
||||
let start_time = SystemTime::now();
|
||||
move |_custom: &HashMap<String, String>| {
|
||||
let duration = SystemTime::now().duration_since(start_time).unwrap_or_default();
|
||||
|
||||
// Update operation count
|
||||
global_metrics().operations[metric].fetch_add(1, Ordering::Relaxed);
|
||||
|
||||
// Update latency for realtime metrics (spawn async task for this)
|
||||
if (metric) < Metric::LastRealtime as usize {
|
||||
let metric_index = metric;
|
||||
tokio::spawn(async move {
|
||||
global_metrics().latency[metric_index].add(duration).await;
|
||||
});
|
||||
}
|
||||
|
||||
// Log trace metrics
|
||||
if metric as u8 > Metric::StartTrace as u8 {
|
||||
//debug!(metric = metric.as_str(), duration_ms = duration.as_millis(), "Scanner trace metric");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Time scanner action with size - returns function that takes size
|
||||
pub fn time_size(metric: Metric) -> impl Fn(u64) {
|
||||
let metric = metric as usize;
|
||||
let start_time = SystemTime::now();
|
||||
move |size: u64| {
|
||||
let duration = SystemTime::now().duration_since(start_time).unwrap_or_default();
|
||||
|
||||
// Update operation count
|
||||
global_metrics().operations[metric].fetch_add(1, Ordering::Relaxed);
|
||||
|
||||
// Update latency for realtime metrics with size (spawn async task)
|
||||
if (metric) < Metric::LastRealtime as usize {
|
||||
let metric_index = metric;
|
||||
tokio::spawn(async move {
|
||||
global_metrics().latency[metric_index].add_size(duration, size).await;
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Time a scanner action - returns a closure to call when done
|
||||
pub fn time(metric: Metric) -> impl Fn() {
|
||||
let metric = metric as usize;
|
||||
let start_time = SystemTime::now();
|
||||
move || {
|
||||
let duration = SystemTime::now().duration_since(start_time).unwrap_or_default();
|
||||
|
||||
// Update operation count
|
||||
global_metrics().operations[metric].fetch_add(1, Ordering::Relaxed);
|
||||
|
||||
// Update latency for realtime metrics (spawn async task)
|
||||
if (metric) < Metric::LastRealtime as usize {
|
||||
let metric_index = metric;
|
||||
tokio::spawn(async move {
|
||||
global_metrics().latency[metric_index].add(duration).await;
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Time N scanner actions - returns function that takes count, then returns completion function
|
||||
pub fn time_n(metric: Metric) -> Box<dyn Fn(usize) -> Box<dyn Fn() + Send + Sync> + Send + Sync> {
|
||||
let metric = metric as usize;
|
||||
let start_time = SystemTime::now();
|
||||
Box::new(move |count: usize| {
|
||||
Box::new(move || {
|
||||
let duration = SystemTime::now().duration_since(start_time).unwrap_or_default();
|
||||
|
||||
// Update operation count
|
||||
global_metrics().operations[metric].fetch_add(count as u64, Ordering::Relaxed);
|
||||
|
||||
// Update latency for realtime metrics (spawn async task)
|
||||
if (metric) < Metric::LastRealtime as usize {
|
||||
let metric_index = metric;
|
||||
tokio::spawn(async move {
|
||||
global_metrics().latency[metric_index].add(duration).await;
|
||||
});
|
||||
}
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
/// Time ILM action with versions - returns function that takes versions, then returns completion function
|
||||
pub fn time_ilm(a: IlmAction) -> Box<dyn Fn(u64) -> Box<dyn Fn() + Send + Sync> + Send + Sync> {
|
||||
let a_clone = a as usize;
|
||||
if a_clone == IlmAction::NoneAction as usize || a_clone >= IlmAction::ActionCount as usize {
|
||||
return Box::new(move |_: u64| Box::new(move || {}));
|
||||
}
|
||||
let start = SystemTime::now();
|
||||
Box::new(move |versions: u64| {
|
||||
Box::new(move || {
|
||||
let duration = SystemTime::now().duration_since(start).unwrap_or(Duration::from_secs(0));
|
||||
tokio::spawn(async move {
|
||||
global_metrics().actions[a_clone].fetch_add(versions, Ordering::Relaxed);
|
||||
global_metrics().actions_latency[a_clone].add(duration).await;
|
||||
});
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
/// Increment time with specific duration
|
||||
pub async fn inc_time(metric: Metric, duration: Duration) {
|
||||
let metric = metric as usize;
|
||||
// Update operation count
|
||||
global_metrics().operations[metric].fetch_add(1, Ordering::Relaxed);
|
||||
|
||||
// Update latency for realtime metrics
|
||||
if (metric) < Metric::LastRealtime as usize {
|
||||
global_metrics().latency[metric].add(duration).await;
|
||||
}
|
||||
}
|
||||
|
||||
/// Get lifetime operation count for a metric
|
||||
pub fn lifetime(&self, metric: Metric) -> u64 {
|
||||
let metric = metric as usize;
|
||||
if (metric) >= Metric::Last as usize {
|
||||
return 0;
|
||||
}
|
||||
self.operations[metric].load(Ordering::Relaxed)
|
||||
}
|
||||
|
||||
/// Get last minute statistics for a metric
|
||||
pub async fn last_minute(&self, metric: Metric) -> AccElem {
|
||||
let metric = metric as usize;
|
||||
if (metric) >= Metric::LastRealtime as usize {
|
||||
return AccElem::default();
|
||||
}
|
||||
self.latency[metric].total().await
|
||||
}
|
||||
|
||||
/// Set current cycle information
|
||||
pub async fn set_cycle(&self, cycle: Option<CurrentCycle>) {
|
||||
*self.cycle_info.write().await = cycle;
|
||||
}
|
||||
|
||||
/// Get current cycle information
|
||||
pub async fn get_cycle(&self) -> Option<CurrentCycle> {
|
||||
self.cycle_info.read().await.clone()
|
||||
}
|
||||
|
||||
/// Get current active paths
|
||||
pub async fn get_current_paths(&self) -> Vec<String> {
|
||||
let mut result = Vec::new();
|
||||
let paths = self.current_paths.read().await;
|
||||
|
||||
for (disk, tracker) in paths.iter() {
|
||||
let path = tracker.get_path().await;
|
||||
result.push(format!("{disk}/{path}"));
|
||||
}
|
||||
|
||||
result
|
||||
}
|
||||
|
||||
/// Get number of active drives
|
||||
pub async fn active_drives(&self) -> usize {
|
||||
self.current_paths.read().await.len()
|
||||
}
|
||||
|
||||
/// Generate metrics report
|
||||
pub async fn report(&self) -> M_ScannerMetrics {
|
||||
let mut metrics = M_ScannerMetrics::default();
|
||||
|
||||
// Set cycle information
|
||||
if let Some(cycle) = self.get_cycle().await {
|
||||
metrics.current_cycle = cycle.current;
|
||||
metrics.cycles_completed_at = cycle.cycle_completed;
|
||||
metrics.current_started = cycle.started;
|
||||
}
|
||||
|
||||
metrics.collected_at = Utc::now();
|
||||
metrics.active_paths = self.get_current_paths().await;
|
||||
|
||||
// Lifetime operations
|
||||
for i in 0..Metric::Last as usize {
|
||||
let count = self.operations[i].load(Ordering::Relaxed);
|
||||
if count > 0
|
||||
&& let Some(metric) = Metric::from_index(i)
|
||||
{
|
||||
metrics.life_time_ops.insert(metric.as_str().to_string(), count);
|
||||
}
|
||||
}
|
||||
|
||||
// Last minute statistics for realtime metrics
|
||||
for i in 0..Metric::LastRealtime as usize {
|
||||
let last_min = self.latency[i].total().await;
|
||||
if last_min.n > 0
|
||||
&& let Some(_metric) = Metric::from_index(i)
|
||||
{
|
||||
// Convert to madmin TimedAction format if needed
|
||||
// This would require implementing the conversion
|
||||
}
|
||||
}
|
||||
|
||||
metrics
|
||||
}
|
||||
}
|
||||
|
||||
// Type aliases for compatibility with existing code
|
||||
pub type UpdateCurrentPathFn = Arc<dyn Fn(&str) -> Pin<Box<dyn std::future::Future<Output = ()> + Send>> + Send + Sync>;
|
||||
pub type CloseDiskFn = Arc<dyn Fn() -> Pin<Box<dyn std::future::Future<Output = ()> + Send>> + Send + Sync>;
|
||||
|
||||
/// Create a current path updater for tracking scan progress
|
||||
pub fn current_path_updater(disk: &str, initial: &str) -> (UpdateCurrentPathFn, CloseDiskFn) {
|
||||
let tracker = Arc::new(CurrentPathTracker::new(initial.to_string()));
|
||||
let disk_name = disk.to_string();
|
||||
|
||||
// Store the tracker in global metrics
|
||||
let tracker_clone = Arc::clone(&tracker);
|
||||
let disk_clone = disk_name.clone();
|
||||
tokio::spawn(async move {
|
||||
global_metrics().current_paths.write().await.insert(disk_clone, tracker_clone);
|
||||
});
|
||||
|
||||
let update_fn = {
|
||||
let tracker = Arc::clone(&tracker);
|
||||
Arc::new(move |path: &str| -> Pin<Box<dyn std::future::Future<Output = ()> + Send>> {
|
||||
let tracker = Arc::clone(&tracker);
|
||||
let path = path.to_string();
|
||||
Box::pin(async move {
|
||||
tracker.update_path(path).await;
|
||||
})
|
||||
})
|
||||
};
|
||||
|
||||
let done_fn = {
|
||||
let disk_name = disk_name.clone();
|
||||
Arc::new(move || -> Pin<Box<dyn std::future::Future<Output = ()> + Send>> {
|
||||
let disk_name = disk_name.clone();
|
||||
Box::pin(async move {
|
||||
global_metrics().current_paths.write().await.remove(&disk_name);
|
||||
})
|
||||
})
|
||||
};
|
||||
|
||||
(update_fn, done_fn)
|
||||
}
|
||||
|
||||
impl Default for Metrics {
|
||||
fn default() -> Self {
|
||||
Self::new()
|
||||
}
|
||||
}
|
||||
|
||||
pub struct CloseDiskGuard(CloseDiskFn);
|
||||
|
||||
impl CloseDiskGuard {
|
||||
pub fn new(close_disk: CloseDiskFn) -> Self {
|
||||
Self(close_disk)
|
||||
}
|
||||
|
||||
pub async fn close(&self) {
|
||||
(self.0)().await;
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for CloseDiskGuard {
|
||||
fn drop(&mut self) {
|
||||
// Drop cannot be async, so we spawn the async cleanup task
|
||||
// The task will run in the background and complete asynchronously
|
||||
if let Ok(handle) = tokio::runtime::Handle::try_current() {
|
||||
let close_fn = self.0.clone();
|
||||
handle.spawn(async move {
|
||||
close_fn().await;
|
||||
});
|
||||
} else {
|
||||
// If we're not in a tokio runtime context, we can't spawn
|
||||
// This is a best-effort cleanup, so we just skip it
|
||||
}
|
||||
}
|
||||
}
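Another illustrative aside (not part of the diff): a hedged sketch of timing an operation with the metrics module above, using only `Metrics::time`, `global_metrics`, and `lifetime` as defined in metrics.rs. The `#[tokio::main]` runtime is needed because the completion closure spawns an async task to record latency.

```rust
use rustfs_scanner::metrics::{global_metrics, Metric, Metrics};

#[tokio::main]
async fn main() {
    // Start a timer for a realtime metric; the returned closure records the
    // elapsed time and bumps the operation counter when invoked.
    let done = Metrics::time(Metric::ReadMetadata);
    // ... do the work being measured ...
    done();

    // Lifetime counters can be read back from the global metrics instance.
    let ops = global_metrics().lifetime(Metric::ReadMetadata);
    println!("read_metadata operations so far: {ops}");
}
```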
269
crates/scanner/src/scanner.rs
Normal file
@@ -0,0 +1,269 @@
|
||||
// Copyright 2024 RustFS Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use crate::data_usage_define::{BACKGROUND_HEAL_INFO_PATH, DATA_USAGE_BLOOM_NAME_PATH, DATA_USAGE_OBJ_NAME_PATH};
|
||||
use crate::metrics::CurrentCycle;
|
||||
use crate::metrics::global_metrics;
|
||||
use crate::scanner_folder::data_usage_update_dir_cycles;
|
||||
use crate::scanner_io::ScannerIO;
|
||||
use crate::{DataUsageInfo, ScannerError};
|
||||
use chrono::{DateTime, Utc};
|
||||
use rustfs_common::heal_channel::HealScanMode;
|
||||
use rustfs_config::{DEFAULT_DATA_SCANNER_START_DELAY_SECS, ENV_DATA_SCANNER_START_DELAY_SECS};
|
||||
use rustfs_ecstore::StorageAPI as _;
|
||||
use rustfs_ecstore::config::com::{read_config, save_config};
|
||||
use rustfs_ecstore::disk::RUSTFS_META_BUCKET;
|
||||
use rustfs_ecstore::error::Error as EcstoreError;
|
||||
use rustfs_ecstore::global::is_erasure_sd;
|
||||
use rustfs_ecstore::store::ECStore;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tokio::sync::mpsc;
|
||||
use tokio::time::Duration;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{debug, error, info, warn};
|
||||
|
||||
fn data_scanner_start_delay() -> Duration {
|
||||
let secs = rustfs_utils::get_env_u64(ENV_DATA_SCANNER_START_DELAY_SECS, DEFAULT_DATA_SCANNER_START_DELAY_SECS);
|
||||
Duration::from_secs(secs)
|
||||
}
|
||||
|
||||
pub async fn init_data_scanner(ctx: CancellationToken, storeapi: Arc<ECStore>) {
|
||||
let ctx_clone = ctx.clone();
|
||||
let storeapi_clone = storeapi.clone();
|
||||
tokio::spawn(async move {
|
||||
let sleep_time = Duration::from_secs(rand::random::<u64>() % 5);
|
||||
tokio::time::sleep(sleep_time).await;
|
||||
|
||||
loop {
|
||||
if ctx_clone.is_cancelled() {
|
||||
break;
|
||||
}
|
||||
|
||||
if let Err(e) = run_data_scanner(ctx_clone.clone(), storeapi_clone.clone()).await {
|
||||
error!("Failed to run data scanner: {e}");
|
||||
}
|
||||
tokio::time::sleep(data_scanner_start_delay()).await;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
fn get_cycle_scan_mode(_current_cycle: u64, _bitrot_start_cycle: u64, _bitrot_start_time: Option<DateTime<Utc>>) -> HealScanMode {
|
||||
// TODO: from config
|
||||
HealScanMode::Normal
|
||||
}
|
||||
|
||||
/// Background healing information
|
||||
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct BackgroundHealInfo {
|
||||
/// Bitrot scan start time
|
||||
pub bitrot_start_time: Option<DateTime<Utc>>,
|
||||
/// Bitrot scan start cycle
|
||||
pub bitrot_start_cycle: u64,
|
||||
/// Current scan mode
|
||||
pub current_scan_mode: HealScanMode,
|
||||
}
|
||||
|
||||
/// Read background healing information from storage
|
||||
pub async fn read_background_heal_info(storeapi: Arc<ECStore>) -> BackgroundHealInfo {
|
||||
// Skip for ErasureSD setup
|
||||
if is_erasure_sd().await {
|
||||
return BackgroundHealInfo::default();
|
||||
}
|
||||
|
||||
// Get last healing information
|
||||
match read_config(storeapi, &BACKGROUND_HEAL_INFO_PATH).await {
|
||||
Ok(buf) => match serde_json::from_slice::<BackgroundHealInfo>(&buf) {
|
||||
Ok(info) => info,
|
||||
Err(e) => {
|
||||
error!("Failed to unmarshal background heal info from {}: {}", &*BACKGROUND_HEAL_INFO_PATH, e);
|
||||
BackgroundHealInfo::default()
|
||||
}
|
||||
},
|
||||
Err(e) => {
|
||||
// Only log if it's not a ConfigNotFound error
|
||||
if e != EcstoreError::ConfigNotFound {
|
||||
warn!("Failed to read background heal info from {}: {}", &*BACKGROUND_HEAL_INFO_PATH, e);
|
||||
}
|
||||
BackgroundHealInfo::default()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Save background healing information to storage
|
||||
pub async fn save_background_heal_info(storeapi: Arc<ECStore>, info: BackgroundHealInfo) {
|
||||
// Skip for ErasureSD setup
|
||||
if is_erasure_sd().await {
|
||||
return;
|
||||
}
|
||||
|
||||
// Serialize to JSON
|
||||
let data = match serde_json::to_vec(&info) {
|
||||
Ok(data) => data,
|
||||
Err(e) => {
|
||||
error!("Failed to marshal background heal info: {}", e);
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
// Save configuration
|
||||
if let Err(e) = save_config(storeapi, &BACKGROUND_HEAL_INFO_PATH, data).await {
|
||||
warn!("Failed to save background heal info to {}: {}", &*BACKGROUND_HEAL_INFO_PATH, e);
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn run_data_scanner(ctx: CancellationToken, storeapi: Arc<ECStore>) -> Result<(), ScannerError> {
    // TODO: leader lock
    let _guard = match storeapi.new_ns_lock(RUSTFS_META_BUCKET, "leader.lock").await {
        Ok(guard) => guard,
        Err(e) => {
            error!("run_data_scanner: other node is running, failed to acquire leader lock: {e}");
            return Ok(());
        }
    };

    let mut cycle_info = CurrentCycle::default();
    let buf = read_config(storeapi.clone(), &DATA_USAGE_BLOOM_NAME_PATH)
        .await
        .unwrap_or_default();
    if buf.len() == 8 {
        cycle_info.next = u64::from_le_bytes(buf.try_into().unwrap_or_default());
    } else if buf.len() > 8 {
        cycle_info.next = u64::from_le_bytes(buf[0..8].try_into().unwrap_or_default());
        if let Err(e) = cycle_info.unmarshal(&buf[8..]) {
            warn!("Failed to unmarshal cycle info: {e}");
        }
    }

    let mut ticker = tokio::time::interval(data_scanner_start_delay());
    loop {
        tokio::select! {
            _ = ctx.cancelled() => {
                break;
            }
            _ = ticker.tick() => {
                cycle_info.current = cycle_info.next;
                cycle_info.started = Utc::now();

                global_metrics().set_cycle(Some(cycle_info.clone())).await;

                let background_heal_info = read_background_heal_info(storeapi.clone()).await;

                let scan_mode = get_cycle_scan_mode(cycle_info.current, background_heal_info.bitrot_start_cycle, background_heal_info.bitrot_start_time);
                if background_heal_info.current_scan_mode != scan_mode {
                    let mut new_heal_info = background_heal_info.clone();
                    new_heal_info.current_scan_mode = scan_mode;

                    if scan_mode == HealScanMode::Deep {
                        new_heal_info.bitrot_start_cycle = cycle_info.current;
                        new_heal_info.bitrot_start_time = Some(Utc::now());
                    }

                    save_background_heal_info(storeapi.clone(), new_heal_info).await;
                }

                let (sender, receiver) = tokio::sync::mpsc::channel::<DataUsageInfo>(1);
                let storeapi_clone = storeapi.clone();
                let ctx_clone = ctx.clone();
                tokio::spawn(async move {
                    store_data_usage_in_backend(ctx_clone, storeapi_clone, receiver).await;
                });

                if let Err(e) = storeapi.clone().nsscanner(ctx.clone(), sender, cycle_info.current, scan_mode).await {
                    error!("Failed to scan namespace: {e}");
                } else {
                    info!("Namespace scanned successfully");

                    cycle_info.next += 1;
                    cycle_info.current = 0;
                    cycle_info.cycle_completed.push(Utc::now());

                    if cycle_info.cycle_completed.len() >= data_usage_update_dir_cycles() as usize {
                        cycle_info.cycle_completed = cycle_info.cycle_completed.split_off(data_usage_update_dir_cycles() as usize);
                    }

                    global_metrics().set_cycle(Some(cycle_info.clone())).await;

                    let cycle_info_buf = cycle_info.marshal().unwrap_or_default();

                    let mut buf = Vec::with_capacity(cycle_info_buf.len() + 8);
                    buf.extend_from_slice(&cycle_info.next.to_le_bytes());
                    buf.extend_from_slice(&cycle_info_buf);

                    if let Err(e) = save_config(storeapi.clone(), &DATA_USAGE_BLOOM_NAME_PATH, buf).await {
                        error!("Failed to save data usage bloom name to {}: {}", &*DATA_USAGE_BLOOM_NAME_PATH, e);
                    } else {
                        info!("Data usage bloom name saved successfully");
                    }
                }

                ticker.reset();
            }
        }
    }

    global_metrics().set_cycle(None).await;
    Ok(())
}

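// Persistence layout sketch (illustrative, not part of the diff): the cycle state
// saved above is the little-endian `next` counter followed by the marshalled
// `CurrentCycle`, which is exactly what the `buf.len() == 8` / `buf.len() > 8`
// branches at the top of `run_data_scanner` decode again on restart.
#[allow(dead_code)]
fn decode_cycle_state_example(buf: &[u8]) -> (u64, Option<CurrentCycle>) {
    if buf.len() < 8 {
        return (0, None);
    }
    let next = u64::from_le_bytes(buf[0..8].try_into().unwrap_or_default());
    let mut cycle = CurrentCycle::default();
    if buf.len() > 8 && cycle.unmarshal(&buf[8..]).is_ok() {
        (next, Some(cycle))
    } else {
        (next, None)
    }
}
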
/// Store data usage info in backend. Will store all objects sent on the receiver until closed.
pub async fn store_data_usage_in_backend(
    ctx: CancellationToken,
    storeapi: Arc<ECStore>,
    mut receiver: mpsc::Receiver<DataUsageInfo>,
) {
    let mut attempts = 1u32;

    while let Some(data_usage_info) = receiver.recv().await {
        if ctx.is_cancelled() {
            break;
        }

        debug!("store_data_usage_in_backend: received data usage info: {:?}", &data_usage_info);

        // Serialize to JSON
        let data = match serde_json::to_vec(&data_usage_info) {
            Ok(data) => data,
            Err(e) => {
                error!("Failed to marshal data usage info: {}", e);
                continue;
            }
        };

        // Save a backup every 10th update
        if attempts > 10 {
            let backup_path = format!("{}.bkp", &*DATA_USAGE_OBJ_NAME_PATH);
            if let Err(e) = save_config(storeapi.clone(), &backup_path, data.clone()).await {
                warn!("Failed to save data usage backup to {}: {}", backup_path, e);
            }
            attempts = 1;
        }

        // Save main configuration
        if let Err(e) = save_config(storeapi.clone(), &DATA_USAGE_OBJ_NAME_PATH, data).await {
            error!("Failed to save data usage info to {}: {e}", &*DATA_USAGE_OBJ_NAME_PATH);
        }

        attempts += 1;
    }
}
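
// Counterpart sketch (illustrative): the object written above is plain JSON, so it
// can be read back with the same read_config/serde_json pair used elsewhere in this
// module, assuming DataUsageInfo also derives Deserialize. Error handling is elided.
#[allow(dead_code)]
async fn load_data_usage_from_backend_example(storeapi: Arc<ECStore>) -> Option<DataUsageInfo> {
    let buf = read_config(storeapi, &DATA_USAGE_OBJ_NAME_PATH).await.ok()?;
    serde_json::from_slice::<DataUsageInfo>(&buf).ok()
}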
1209  crates/scanner/src/scanner_folder.rs  (new file; diff suppressed because it is too large)
629   crates/scanner/src/scanner_io.rs  (new file)
@@ -0,0 +1,629 @@
use crate::scanner_folder::{ScannerItem, scan_data_folder};
use crate::{
    DATA_USAGE_CACHE_NAME, DATA_USAGE_ROOT, DataUsageCache, DataUsageCacheInfo, DataUsageEntry, DataUsageEntryInfo,
    DataUsageInfo, SizeSummary, TierStats,
};
use futures::future::join_all;
use rand::seq::SliceRandom as _;
use rustfs_common::heal_channel::HealScanMode;
use rustfs_ecstore::bucket::bucket_target_sys::BucketTargetSys;
use rustfs_ecstore::bucket::lifecycle::lifecycle::Lifecycle;
use rustfs_ecstore::bucket::metadata_sys::{get_lifecycle_config, get_object_lock_config, get_replication_config};
use rustfs_ecstore::bucket::replication::{ReplicationConfig, ReplicationConfigurationExt};
use rustfs_ecstore::bucket::versioning::VersioningApi as _;
use rustfs_ecstore::bucket::versioning_sys::BucketVersioningSys;
use rustfs_ecstore::config::storageclass;
use rustfs_ecstore::disk::STORAGE_FORMAT_FILE;
use rustfs_ecstore::disk::{Disk, DiskAPI};
use rustfs_ecstore::error::{Error, StorageError};
use rustfs_ecstore::global::GLOBAL_TierConfigMgr;
use rustfs_ecstore::new_object_layer_fn;
use rustfs_ecstore::set_disk::SetDisks;
use rustfs_ecstore::store_api::{BucketInfo, BucketOptions, ObjectInfo};
use rustfs_ecstore::{StorageAPI, error::Result, store::ECStore};
use rustfs_filemeta::FileMeta;
use rustfs_utils::path::{SLASH_SEPARATOR_STR, path_join_buf};
use s3s::dto::{BucketLifecycleConfiguration, ReplicationConfiguration};
use std::collections::HashMap;
use std::time::SystemTime;
use std::{fmt::Debug, sync::Arc};
use time::OffsetDateTime;
use tokio::sync::{Mutex, mpsc};
use tokio::time::Duration;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, warn};

#[async_trait::async_trait]
pub trait ScannerIO: Send + Sync + Debug + 'static {
    async fn nsscanner(
        &self,
        ctx: CancellationToken,
        updates: mpsc::Sender<DataUsageInfo>,
        want_cycle: u64,
        scan_mode: HealScanMode,
    ) -> Result<()>;
}

#[async_trait::async_trait]
pub trait ScannerIOCache: Send + Sync + Debug + 'static {
    async fn nsscanner_cache(
        self: Arc<Self>,
        ctx: CancellationToken,
        buckets: Vec<BucketInfo>,
        updates: mpsc::Sender<DataUsageCache>,
        want_cycle: u64,
        scan_mode: HealScanMode,
    ) -> Result<()>;
}

#[async_trait::async_trait]
pub trait ScannerIODisk: Send + Sync + Debug + 'static {
    async fn nsscanner_disk(
        &self,
        ctx: CancellationToken,
        cache: DataUsageCache,
        updates: Option<mpsc::Sender<DataUsageEntry>>,
        scan_mode: HealScanMode,
    ) -> Result<DataUsageCache>;

    async fn get_size(&self, item: ScannerItem) -> Result<SizeSummary>;
}

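// Illustrative test double (not part of the diff): the three traits layer the scan
// from the whole store (ScannerIO), over each erasure set (ScannerIOCache), down to
// a single disk (ScannerIODisk). A trivial ScannerIO implementation that only
// reports an empty usage snapshot could look like this.
#[derive(Debug)]
struct NoopScanner;

#[async_trait::async_trait]
impl ScannerIO for NoopScanner {
    async fn nsscanner(
        &self,
        _ctx: CancellationToken,
        updates: mpsc::Sender<DataUsageInfo>,
        _want_cycle: u64,
        _scan_mode: HealScanMode,
    ) -> Result<()> {
        // Report an empty usage snapshot and finish immediately.
        let _ = updates.send(DataUsageInfo::default()).await;
        Ok(())
    }
}
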
#[async_trait::async_trait]
impl ScannerIO for ECStore {
    async fn nsscanner(
        &self,
        ctx: CancellationToken,
        updates: mpsc::Sender<DataUsageInfo>,
        want_cycle: u64,
        scan_mode: HealScanMode,
    ) -> Result<()> {
        let child_token = ctx.child_token();

        let all_buckets = self.list_bucket(&BucketOptions::default()).await?;

        if all_buckets.is_empty() {
            if let Err(e) = updates.send(DataUsageInfo::default()).await {
                error!("Failed to send data usage info: {}", e);
            }
            return Ok(());
        }

        let mut total_results = 0;
        for pool in self.pools.iter() {
            total_results += pool.disk_set.len();
        }

        let results = vec![DataUsageCache::default(); total_results];
        let results_mutex: Arc<Mutex<Vec<DataUsageCache>>> = Arc::new(Mutex::new(results));
        let first_err_mutex: Arc<Mutex<Option<Error>>> = Arc::new(Mutex::new(None));
        let mut results_index: i32 = -1_i32;
        let mut wait_futs = Vec::new();

        for pool in self.pools.iter() {
            for set in pool.disk_set.iter() {
                results_index += 1;

                let results_index_clone = results_index as usize;
                // Clone the Arc to move it into the spawned task
                let set_clone: Arc<SetDisks> = Arc::clone(set);

                let child_token_clone = child_token.clone();
                let want_cycle_clone = want_cycle;
                let scan_mode_clone = scan_mode;
                let results_mutex_clone = results_mutex.clone();
                let first_err_mutex_clone = first_err_mutex.clone();

                let (tx, mut rx) = tokio::sync::mpsc::channel::<DataUsageCache>(1);

                // Spawn task to receive and store results
                let receiver_fut = tokio::spawn(async move {
                    while let Some(result) = rx.recv().await {
                        let mut results = results_mutex_clone.lock().await;
                        results[results_index_clone] = result;
                    }
                });
                wait_futs.push(receiver_fut);

                let all_buckets_clone = all_buckets.clone();
                // Spawn task to run the scanner
                let scanner_fut = tokio::spawn(async move {
                    if let Err(e) = set_clone
                        .nsscanner_cache(child_token_clone.clone(), all_buckets_clone, tx, want_cycle_clone, scan_mode_clone)
                        .await
                    {
                        error!("Failed to scan set: {e}");
                        let _ = first_err_mutex_clone.lock().await.insert(e);
                        child_token_clone.cancel();
                    }
                });
                wait_futs.push(scanner_fut);
            }
        }

        let (update_tx, mut update_rx) = tokio::sync::oneshot::channel::<()>();

        let all_buckets_clone = all_buckets.iter().map(|b| b.name.clone()).collect::<Vec<String>>();
        tokio::spawn(async move {
            let mut last_update = SystemTime::now();

            let mut ticker = tokio::time::interval(Duration::from_secs(30));
            loop {
                tokio::select! {
                    _ = child_token.cancelled() => {
                        break;
                    }
                    res = &mut update_rx => {
                        if res.is_err() {
                            break;
                        }

                        let results = results_mutex.lock().await;
                        let mut all_merged = DataUsageCache::default();
                        for result in results.iter() {
                            if result.info.last_update.is_none() {
                                return;
                            }
                            all_merged.merge(result);
                        }

                        if all_merged.root().is_some() && all_merged.info.last_update.unwrap() > last_update
                            && let Err(e) = updates
                                .send(all_merged.dui(&all_merged.info.name, &all_buckets_clone))
                                .await
                        {
                            error!("Failed to send data usage info: {}", e);
                        }
                        break;
                    }
                    _ = ticker.tick() => {
                        let results = results_mutex.lock().await;
                        let mut all_merged = DataUsageCache::default();
                        for result in results.iter() {
                            if result.info.last_update.is_none() {
                                return;
                            }
                            all_merged.merge(result);
                        }

                        if all_merged.root().is_some() && all_merged.info.last_update.unwrap() > last_update {
                            if let Err(e) = updates
                                .send(all_merged.dui(&all_merged.info.name, &all_buckets_clone))
                                .await
                            {
                                error!("Failed to send data usage info: {}", e);
                            }
                            last_update = all_merged.info.last_update.unwrap();
                        }
                    }
                }
            }
        });

        let _ = join_all(wait_futs).await;

        let _ = update_tx.send(());

        Ok(())
    }
}

#[async_trait::async_trait]
impl ScannerIOCache for SetDisks {
    async fn nsscanner_cache(
        self: Arc<Self>,
        ctx: CancellationToken,
        buckets: Vec<BucketInfo>,
        updates: mpsc::Sender<DataUsageCache>,
        want_cycle: u64,
        scan_mode: HealScanMode,
    ) -> Result<()> {
        if buckets.is_empty() {
            return Ok(());
        }

        let (disks, healing) = self.get_online_disks_with_healing(false).await;
        if disks.is_empty() {
            info!("No online disks available for set");
            return Ok(());
        }

        let mut old_cache = DataUsageCache::default();
        old_cache.load(self.clone(), DATA_USAGE_CACHE_NAME).await?;

        let mut cache = DataUsageCache {
            info: DataUsageCacheInfo {
                name: DATA_USAGE_ROOT.to_string(),
                next_cycle: old_cache.info.next_cycle,
                ..Default::default()
            },
            cache: HashMap::new(),
        };

        let (bucket_tx, bucket_rx) = mpsc::channel::<BucketInfo>(buckets.len());

        let mut permutes = buckets.clone();
        permutes.shuffle(&mut rand::rng());

        for bucket in permutes.iter() {
            if old_cache.find(&bucket.name).is_none()
                && let Err(e) = bucket_tx.send(bucket.clone()).await
            {
                error!("Failed to send bucket info: {}", e);
            }
        }

        for bucket in permutes.iter() {
            if let Some(c) = old_cache.find(&bucket.name) {
                cache.replace(&bucket.name, DATA_USAGE_ROOT, c.clone());

                if let Err(e) = bucket_tx.send(bucket.clone()).await {
                    error!("Failed to send bucket info: {}", e);
                }
            }
        }

        drop(bucket_tx);

        let cache_mutex: Arc<Mutex<DataUsageCache>> = Arc::new(Mutex::new(cache));

        let (bucket_result_tx, mut bucket_result_rx) = mpsc::channel::<DataUsageEntryInfo>(disks.len());

        let cache_mutex_clone = cache_mutex.clone();
        let store_clone = self.clone();
        let ctx_clone = ctx.clone();
        let send_update_fut = tokio::spawn(async move {
            let mut ticker = tokio::time::interval(Duration::from_secs(3 + rand::random::<u64>() % 10));

            let mut last_update = None;

            loop {
                tokio::select! {
                    _ = ctx_clone.cancelled() => {
                        break;
                    }
                    _ = ticker.tick() => {
                        let cache = cache_mutex_clone.lock().await;
                        if cache.info.last_update == last_update {
                            continue;
                        }

                        if let Err(e) = cache.save(store_clone.clone(), DATA_USAGE_CACHE_NAME).await {
                            error!("Failed to save data usage cache: {}", e);
                        }

                        if let Err(e) = updates.send(cache.clone()).await {
                            error!("Failed to send data usage cache: {}", e);
                        }

                        last_update = cache.info.last_update;
                    }
                    res = bucket_result_rx.recv() => {
                        if let Some(result) = res {
                            let mut cache = cache_mutex_clone.lock().await;
                            cache.replace(&result.name, &result.parent, result.entry);
                            cache.info.last_update = Some(SystemTime::now());
                        } else {
                            let mut cache = cache_mutex_clone.lock().await;
                            cache.info.next_cycle = want_cycle;
                            cache.info.last_update = Some(SystemTime::now());

                            if let Err(e) = cache.save(store_clone.clone(), DATA_USAGE_CACHE_NAME).await {
                                error!("Failed to save data usage cache: {}", e);
                            }

                            if let Err(e) = updates.send(cache.clone()).await {
                                error!("Failed to send data usage cache: {}", e);
                            }

                            return;
                        }
                    }
                }
            }
        });

        let mut futs = Vec::new();

        let bucket_rx_mutex: Arc<Mutex<mpsc::Receiver<BucketInfo>>> = Arc::new(Mutex::new(bucket_rx));
        let bucket_result_tx_clone: Arc<Mutex<mpsc::Sender<DataUsageEntryInfo>>> = Arc::new(Mutex::new(bucket_result_tx));
        for disk in disks.into_iter() {
            let bucket_rx_mutex_clone = bucket_rx_mutex.clone();
            let ctx_clone = ctx.clone();
            let store_clone_clone = self.clone();
            let bucket_result_tx_clone_clone = bucket_result_tx_clone.clone();
            futs.push(tokio::spawn(async move {
                while let Some(bucket) = bucket_rx_mutex_clone.lock().await.recv().await {
                    if ctx_clone.is_cancelled() {
                        break;
                    }

                    debug!("nsscanner_disk: got bucket: {}", bucket.name);

                    let cache_name = path_join_buf(&[&bucket.name, DATA_USAGE_CACHE_NAME]);

                    let mut cache = DataUsageCache::default();
                    if let Err(e) = cache.load(store_clone_clone.clone(), &cache_name).await {
                        error!("Failed to load data usage cache: {}", e);
                    }

                    if cache.info.name.is_empty() {
                        cache.info.name = bucket.name.clone();
                    }

                    cache.info.skip_healing = healing;
                    cache.info.next_cycle = want_cycle;
                    if cache.info.name != bucket.name {
                        cache.info = DataUsageCacheInfo {
                            name: bucket.name.clone(),
                            next_cycle: want_cycle,
                            ..Default::default()
                        };
                    }

                    warn!("nsscanner_disk: cache.info.name: {:?}", cache.info.name);

                    let (updates_tx, mut updates_rx) = mpsc::channel::<DataUsageEntry>(1);

                    let ctx_clone_clone = ctx_clone.clone();
                    let bucket_name_clone = bucket.name.clone();
                    let bucket_result_tx_clone_clone_clone = bucket_result_tx_clone_clone.clone();
                    let update_fut = tokio::spawn(async move {
                        while let Some(result) = updates_rx.recv().await {
                            if ctx_clone_clone.is_cancelled() {
                                break;
                            }

                            if let Err(e) = bucket_result_tx_clone_clone_clone
                                .lock()
                                .await
                                .send(DataUsageEntryInfo {
                                    name: bucket_name_clone.clone(),
                                    parent: DATA_USAGE_ROOT.to_string(),
                                    entry: result,
                                })
                                .await
                            {
                                error!("Failed to send data usage entry info: {}", e);
                            }
                        }
                    });

                    let before = cache.info.last_update;

                    cache = match disk
                        .clone()
                        .nsscanner_disk(ctx_clone.clone(), cache.clone(), Some(updates_tx), scan_mode)
                        .await
                    {
                        Ok(cache) => cache,
                        Err(e) => {
                            error!("Failed to scan disk: {}", e);

                            if let (Some(last_update), Some(before_update)) = (cache.info.last_update, before)
                                && last_update > before_update
                                && let Err(e) = cache.save(store_clone_clone.clone(), cache_name.as_str()).await
                            {
                                error!("Failed to save data usage cache: {}", e);
                            }

                            if let Err(e) = update_fut.await {
                                error!("Failed to update data usage cache: {}", e);
                            }
                            continue;
                        }
                    };

                    debug!("nsscanner_disk: got cache: {}", cache.info.name);

                    if let Err(e) = update_fut.await {
                        error!("nsscanner_disk: Failed to update data usage cache: {}", e);
                    }

                    let root = if let Some(r) = cache.root() {
                        cache.flatten(&r)
                    } else {
                        DataUsageEntry::default()
                    };

                    if ctx_clone.is_cancelled() {
                        break;
                    }

                    debug!("nsscanner_disk: sending data usage entry info: {}", cache.info.name);

                    if let Err(e) = bucket_result_tx_clone_clone
                        .lock()
                        .await
                        .send(DataUsageEntryInfo {
                            name: cache.info.name.clone(),
                            parent: DATA_USAGE_ROOT.to_string(),
                            entry: root,
                        })
                        .await
                    {
                        error!("nsscanner_disk: Failed to send data usage entry info: {}", e);
                    }

                    if let Err(e) = cache.save(store_clone_clone.clone(), &cache_name).await {
                        error!("nsscanner_disk: Failed to save data usage cache: {}", e);
                    }
                }
            }));
        }

        let _ = join_all(futs).await;

        warn!("nsscanner_cache: joining all futures");

        drop(bucket_result_tx_clone);

        warn!("nsscanner_cache: dropping bucket result tx");

        send_update_fut.await?;

        warn!("nsscanner_cache: done");

        Ok(())
    }
}

#[async_trait::async_trait]
impl ScannerIODisk for Disk {
    async fn get_size(&self, mut item: ScannerItem) -> Result<SizeSummary> {
        if !item.path.ends_with(&format!("{SLASH_SEPARATOR_STR}{STORAGE_FORMAT_FILE}")) {
            return Err(StorageError::other("skip file".to_string()));
        }

        debug!("get_size: reading metadata for {}/{}", &item.bucket, &item.object_path());

        let data = match self.read_metadata(&item.bucket, &item.object_path()).await {
            Ok(data) => data,
            Err(e) => return Err(StorageError::other(format!("Failed to read metadata: {e}"))),
        };

        item.transform_meta_dir();

        let meta = FileMeta::load(&data)?;
        let fivs = match meta.get_file_info_versions(item.bucket.as_str(), item.object_path().as_str(), false) {
            Ok(versions) => versions,
            Err(e) => {
                error!("Failed to get file info versions: {}", e);
                return Err(StorageError::other("skip file".to_string()));
            }
        };

        let versioned = BucketVersioningSys::get(&item.bucket)
            .await
            .map(|v| v.versioned(&item.object_path()))
            .unwrap_or(false);

        let object_infos = fivs
            .versions
            .iter()
            .map(|v| ObjectInfo::from_file_info(v, item.bucket.as_str(), item.object_path().as_str(), versioned))
            .collect::<Vec<ObjectInfo>>();

        let mut size_summary = SizeSummary::default();

        let tiers = {
            let tier_config_mgr = GLOBAL_TierConfigMgr.read().await;
            tier_config_mgr.list_tiers()
        };

        for tier in tiers.iter() {
            size_summary.tier_stats.insert(tier.name.clone(), TierStats::default());
        }
        if !size_summary.tier_stats.is_empty() {
            size_summary
                .tier_stats
                .insert(storageclass::STANDARD.to_string(), TierStats::default());
            size_summary
                .tier_stats
                .insert(storageclass::RRS.to_string(), TierStats::default());
        }

        let lock_config = match get_object_lock_config(&item.bucket).await {
            Ok((cfg, _)) => Some(Arc::new(cfg)),
            Err(_) => None,
        };

        let Some(ecstore) = new_object_layer_fn() else {
            error!("ECStore not available");
            return Err(StorageError::other("ECStore not available".to_string()));
        };

        item.apply_actions(ecstore, object_infos, lock_config, &mut size_summary)
            .await;

        // TODO: enqueueFreeVersion

        Ok(size_summary)
    }

    async fn nsscanner_disk(
        &self,
        ctx: CancellationToken,
        cache: DataUsageCache,
        updates: Option<mpsc::Sender<DataUsageEntry>>,
        scan_mode: HealScanMode,
    ) -> Result<DataUsageCache> {
        // match self {
        //     Disk::Local(local_disk) => local_disk.nsscanner_disk(ctx, cache, updates, scan_mode).await,
        //     Disk::Remote(remote_disk) => remote_disk.nsscanner_disk(ctx, cache, updates, scan_mode).await,
        // }

        let _guard = self.start_scan();

        let mut cache = cache;

        let (lifecycle_config, _) = get_lifecycle_config(&cache.info.name)
            .await
            .unwrap_or((BucketLifecycleConfiguration::default(), OffsetDateTime::now_utc()));

        if lifecycle_config.has_active_rules("") {
            cache.info.lifecycle = Some(Arc::new(lifecycle_config));
        }

        let (replication_config, _) = get_replication_config(&cache.info.name).await.unwrap_or((
            ReplicationConfiguration {
                role: "".to_string(),
                rules: vec![],
            },
            OffsetDateTime::now_utc(),
        ));

        if replication_config.has_active_rules("", true)
            && let Ok(targets) = BucketTargetSys::get().list_bucket_targets(&cache.info.name).await
        {
            cache.info.replication = Some(Arc::new(ReplicationConfig {
                config: Some(replication_config),
                remotes: Some(targets),
            }));
        }

        // TODO: object lock

        let Some(ecstore) = new_object_layer_fn() else {
            error!("ECStore not available");
            return Err(StorageError::other("ECStore not available".to_string()));
        };

        let disk_location = self.get_disk_location();

        let (Some(pool_idx), Some(set_idx)) = (disk_location.pool_idx, disk_location.set_idx) else {
            error!("Disk location not available");
            return Err(StorageError::other("Disk location not available".to_string()));
        };

        let disks_result = ecstore.get_disks(pool_idx, set_idx).await?;

        let Some(disk_idx) = disk_location.disk_idx else {
            error!("Disk index not available");
            return Err(StorageError::other("Disk index not available".to_string()));
        };

        let local_disk = if let Some(Some(local_disk)) = disks_result.get(disk_idx) {
            local_disk.clone()
        } else {
            error!("Local disk not available");
            return Err(StorageError::other("Local disk not available".to_string()));
        };

        let disks = disks_result.into_iter().flatten().collect::<Vec<Arc<Disk>>>();

        // Create we_sleep function (always return false for now, can be enhanced later)
        let we_sleep: Box<dyn Fn() -> bool + Send + Sync> = Box::new(|| false);

        let result = scan_data_folder(ctx, disks, local_disk, cache, updates, scan_mode, we_sleep).await;

        match result {
            Ok(mut data_usage_info) => {
                data_usage_info.info.last_update = Some(SystemTime::now());
                Ok(data_usage_info)
            }
            Err(e) => Err(StorageError::other(format!("Failed to scan data folder: {e}"))),
        }
    }
}
@@ -61,6 +61,7 @@ rustfs-s3select-query = { workspace = true }
rustfs-targets = { workspace = true }
rustfs-utils = { workspace = true, features = ["full"] }
rustfs-zip = { workspace = true }
rustfs-scanner = { workspace = true }

# Async Runtime and Networking
async-trait = { workspace = true }

@@ -696,7 +696,6 @@ impl Stream for MetricsStream {
    type Item = Result<Bytes, StdError>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        info!("MetricsStream poll_next");
        let this = Pin::into_inner(self);
        this.inner.poll_next_unpin(cx)
    }
@@ -758,7 +757,6 @@ impl Operation for MetricsHandler {
        let body = Body::from(in_stream);
        spawn(async move {
            while n > 0 {
                info!("loop, n: {n}");
                let mut m = RealtimeMetrics::default();
                let m_local = collect_local_metrics(types, &opts).await;
                m.merge(m_local);
@@ -775,7 +773,6 @@ impl Operation for MetricsHandler {
                // todo write resp
                match serde_json::to_vec(&m) {
                    Ok(re) => {
                        info!("got metrics, send it to client, m: {m:?}");
                        let _ = tx.send(Ok(Bytes::from(re))).await;
                    }
                    Err(e) => {

@@ -36,10 +36,7 @@ use crate::server::{
};
use clap::Parser;
use license::init_license;
use rustfs_ahm::{
    Scanner, create_ahm_services_cancel_token, heal::storage::ECStoreHealStorage, init_heal_manager,
    scanner::data_scanner::ScannerConfig, shutdown_ahm_services,
};
use rustfs_ahm::{create_ahm_services_cancel_token, heal::storage::ECStoreHealStorage, init_heal_manager, shutdown_ahm_services};
use rustfs_common::{GlobalReadiness, SystemStage, set_global_addr};
use rustfs_credentials::init_global_action_credentials;
use rustfs_ecstore::{
@@ -59,6 +56,7 @@ use rustfs_ecstore::{
};
use rustfs_iam::init_iam_sys;
use rustfs_obs::{init_obs, set_global_guard};
use rustfs_scanner::init_data_scanner;
use rustfs_utils::net::parse_and_resolve_address;
use std::io::{Error, Result};
use std::sync::Arc;
@@ -334,23 +332,29 @@ async fn run(opt: config::Opt) -> Result<()> {

    // Initialize heal manager and scanner based on environment variables
    if enable_heal || enable_scanner {
        if enable_heal {
            // Initialize heal manager with channel processor
            let heal_storage = Arc::new(ECStoreHealStorage::new(store.clone()));
            let heal_manager = init_heal_manager(heal_storage, None).await?;
        let heal_storage = Arc::new(ECStoreHealStorage::new(store.clone()));

            if enable_scanner {
                info!(target: "rustfs::main::run","Starting scanner with heal manager...");
                let scanner = Scanner::new(Some(ScannerConfig::default()), Some(heal_manager));
                scanner.start().await?;
            } else {
                info!(target: "rustfs::main::run","Scanner disabled, but heal manager is initialized and available");
            }
        } else if enable_scanner {
            info!("Starting scanner without heal manager...");
            let scanner = Scanner::new(Some(ScannerConfig::default()), None);
            scanner.start().await?;
        }
        init_heal_manager(heal_storage, None).await?;

        init_data_scanner(ctx.clone(), store.clone()).await;

        // if enable_heal {
        //     // Initialize heal manager with channel processor
        //     let heal_storage = Arc::new(ECStoreHealStorage::new(store.clone()));
        //     let heal_manager = init_heal_manager(heal_storage, None).await?;

        //     if enable_scanner {
        //         info!(target: "rustfs::main::run","Starting scanner with heal manager...");
        //         let scanner = Scanner::new(Some(ScannerConfig::default()), Some(heal_manager));
        //         scanner.start().await?;
        //     } else {
        //         info!(target: "rustfs::main::run","Scanner disabled, but heal manager is initialized and available");
        //     }
        // } else if enable_scanner {
        //     info!("Starting scanner without heal manager...");
        //     let scanner = Scanner::new(Some(ScannerConfig::default()), None);
        //     scanner.start().await?;
        // }
    } else {
        info!(target: "rustfs::main::run","Both scanner and heal are disabled, skipping AHM service initialization");
    }

@@ -58,7 +58,7 @@ pub async fn del_opts(
    let vid = if vid.is_none() {
        headers
            .get(RUSTFS_BUCKET_SOURCE_VERSION_ID)
            .map(|v| v.to_str().unwrap().to_owned())
            .and_then(|v| v.to_str().ok().map(|s| s.to_owned()))
    } else {
        vid
    };
@@ -194,7 +194,7 @@ pub async fn put_opts(
    let vid = if vid.is_none() {
        headers
            .get(RUSTFS_BUCKET_SOURCE_VERSION_ID)
            .map(|v| v.to_str().unwrap().to_owned())
            .and_then(|v| v.to_str().ok().map(|s| s.to_owned()))
    } else {
        vid
    };

@@ -1072,6 +1072,29 @@ impl Node for NodeService {
            }))
        }
    }
    async fn read_metadata(&self, request: Request<ReadMetadataRequest>) -> Result<Response<ReadMetadataResponse>, Status> {
        let request = request.into_inner();
        if let Some(disk) = self.find_disk(&request.disk).await {
            match disk.read_metadata(&request.volume, &request.path).await {
                Ok(data) => Ok(Response::new(ReadMetadataResponse {
                    success: true,
                    data,
                    error: None,
                })),
                Err(err) => Ok(Response::new(ReadMetadataResponse {
                    success: false,
                    data: Bytes::new(),
                    error: Some(err.into()),
                })),
            }
        } else {
            Ok(Response::new(ReadMetadataResponse {
                success: false,
                data: Bytes::new(),
                error: Some(DiskError::other("can not find disk".to_string()).into()),
            }))
        }
    }

    async fn update_metadata(&self, request: Request<UpdateMetadataRequest>) -> Result<Response<UpdateMetadataResponse>, Status> {
        let request = request.into_inner();

@@ -79,7 +79,7 @@ export RUSTFS_OBS_LOG_FLUSH_MS=300 # Log flush interval in milliseconds
#tokio runtime
export RUSTFS_RUNTIME_WORKER_THREADS=16
export RUSTFS_RUNTIME_MAX_BLOCKING_THREADS=1024
export RUSTFS_RUNTIME_THREAD_PRINT_ENABLED=true
export RUSTFS_RUNTIME_THREAD_PRINT_ENABLED=false
# shellcheck disable=SC2125
export RUSTFS_RUNTIME_THREAD_STACK_SIZE=1024*1024
export RUSTFS_RUNTIME_THREAD_KEEP_ALIVE=60
@@ -90,30 +90,30 @@ export OTEL_INSTRUMENTATION_VERSION="0.1.1"
export OTEL_INSTRUMENTATION_SCHEMA_URL="https://opentelemetry.io/schemas/1.31.0"
export OTEL_INSTRUMENTATION_ATTRIBUTES="env=production"

## notify
#export RUSTFS_NOTIFY_WEBHOOK_ENABLE="on" # Whether to enable webhook notification
#export RUSTFS_NOTIFY_WEBHOOK_ENDPOINT="http://127.0.0.1:3020/webhook" # Webhook notification address
#export RUSTFS_NOTIFY_WEBHOOK_QUEUE_DIR="$current_dir/deploy/logs/notify"
#
#export RUSTFS_NOTIFY_WEBHOOK_ENABLE_PRIMARY="on" # Whether to enable webhook notification
#export RUSTFS_NOTIFY_WEBHOOK_ENDPOINT_PRIMARY="http://127.0.0.1:3020/webhook" # Webhook notification address
#export RUSTFS_NOTIFY_WEBHOOK_QUEUE_DIR_PRIMARY="$current_dir/deploy/logs/notify"
#
#export RUSTFS_NOTIFY_WEBHOOK_ENABLE_MASTER="on" # Whether to enable webhook notification
#export RUSTFS_NOTIFY_WEBHOOK_ENDPOINT_MASTER="http://127.0.0.1:3020/webhook" # Webhook notification address
#export RUSTFS_NOTIFY_WEBHOOK_QUEUE_DIR_MASTER="$current_dir/deploy/logs/notify"
#
#export RUSTFS_AUDIT_WEBHOOK_ENABLE="on" # Whether to enable webhook audit
#export RUSTFS_AUDIT_WEBHOOK_ENDPOINT="http://127.0.0.1:3020/webhook" # Webhook audit address
#export RUSTFS_AUDIT_WEBHOOK_QUEUE_DIR="$current_dir/deploy/logs/audit"
#
#export RUSTFS_AUDIT_WEBHOOK_ENABLE_PRIMARY="on" # Whether to enable webhook audit
#export RUSTFS_AUDIT_WEBHOOK_ENDPOINT_PRIMARY="http://127.0.0.1:3020/webhook" # Webhook audit address
#export RUSTFS_AUDIT_WEBHOOK_QUEUE_DIR_PRIMARY="$current_dir/deploy/logs/audit"
#
#export RUSTFS_AUDIT_WEBHOOK_ENABLE_MASTER="on" # Whether to enable webhook audit
#export RUSTFS_AUDIT_WEBHOOK_ENDPOINT_MASTER="http://127.0.0.1:3020/webhook" # Webhook audit address
#export RUSTFS_AUDIT_WEBHOOK_QUEUE_DIR_MASTER="$current_dir/deploy/logs/audit"
# # notify
# export RUSTFS_NOTIFY_WEBHOOK_ENABLE="on" # Whether to enable webhook notification
# export RUSTFS_NOTIFY_WEBHOOK_ENDPOINT="http://[::]:3020/webhook" # Webhook notification address
# export RUSTFS_NOTIFY_WEBHOOK_QUEUE_DIR="$current_dir/deploy/logs/notify"

# export RUSTFS_NOTIFY_WEBHOOK_ENABLE_PRIMARY="on" # Whether to enable webhook notification
# export RUSTFS_NOTIFY_WEBHOOK_ENDPOINT_PRIMARY="http://[::]:3020/webhook" # Webhook notification address
# export RUSTFS_NOTIFY_WEBHOOK_QUEUE_DIR_PRIMARY="$current_dir/deploy/logs/notify"

# export RUSTFS_NOTIFY_WEBHOOK_ENABLE_MASTER="on" # Whether to enable webhook notification
# export RUSTFS_NOTIFY_WEBHOOK_ENDPOINT_MASTER="http://[::]:3020/webhook" # Webhook notification address
# export RUSTFS_NOTIFY_WEBHOOK_QUEUE_DIR_MASTER="$current_dir/deploy/logs/notify"

# export RUSTFS_AUDIT_WEBHOOK_ENABLE="on" # Whether to enable webhook audit
# export RUSTFS_AUDIT_WEBHOOK_ENDPOINT="http://[::]:3020/webhook" # Webhook audit address
# export RUSTFS_AUDIT_WEBHOOK_QUEUE_DIR="$current_dir/deploy/logs/audit"

# export RUSTFS_AUDIT_WEBHOOK_ENABLE_PRIMARY="on" # Whether to enable webhook audit
# export RUSTFS_AUDIT_WEBHOOK_ENDPOINT_PRIMARY="http://[::]:3020/webhook" # Webhook audit address
# export RUSTFS_AUDIT_WEBHOOK_QUEUE_DIR_PRIMARY="$current_dir/deploy/logs/audit"

# export RUSTFS_AUDIT_WEBHOOK_ENABLE_MASTER="on" # Whether to enable webhook audit
# export RUSTFS_AUDIT_WEBHOOK_ENDPOINT_MASTER="http://[::]:3020/webhook" # Webhook audit address
# export RUSTFS_AUDIT_WEBHOOK_QUEUE_DIR_MASTER="$current_dir/deploy/logs/audit"

# export RUSTFS_POLICY_PLUGIN_URL="http://localhost:8181/v1/data/rustfs/authz/allow" # The URL of the OPA system
# export RUSTFS_POLICY_PLUGIN_AUTH_TOKEN="your-opa-token" # The authentication token for the OPA system is optional
@@ -170,7 +170,7 @@ export RUSTFS_NS_SCANNER_INTERVAL=60 # Object scanning interval in seconds

#export RUSTFS_REGION="us-east-1"

export RUSTFS_ENABLE_SCANNER=false
export RUSTFS_ENABLE_SCANNER=true

export RUSTFS_ENABLE_HEAL=false
