Compare commits

...

19 Commits

Author  SHA1  Message  Date
likewu  e15c619ed5  test  2026-01-10 10:19:11 +08:00
likewu  ecf7a2e344  Merge branch 'fix/lifecycle' of https://github.com/rustfs/rustfs into fix/lifecycle  2026-01-07 12:02:48 +08:00
        # Conflicts:
        #	crates/ecstore/src/bucket/lifecycle/lifecycle.rs
likewu  e393add5ee  lifecycle test  2026-01-07 12:00:48 +08:00
likewu  64c0430b84  lifecycle test  2026-01-07 11:58:35 +08:00
likewu  081616b81b  test  2026-01-06 14:26:09 +08:00
likewu  d9125937b0  Merge branch 'weisd/scan' of https://github.com/rustfs/rustfs into fix/lifecycle  2026-01-04 11:38:30 +08:00
weisd   cb0860753f  add log  2025-12-25 17:18:25 +08:00
weisd   87a73d9c36  merge main  2025-12-25 09:59:25 +08:00
weisd   d30675f376  add env RUSTFS_DATA_USAGE_UPDATE_DIR_CYCLES  2025-12-24 14:09:41 +08:00
weisd   0aad1ed6aa  add ns lock  2025-12-24 11:41:04 +08:00
weisd   e437d42d31  merge main  2025-12-23 09:56:59 +08:00
weisd   008be7d061  merge main  2025-12-23 09:50:51 +08:00
weisd   8f227b2691  fix logger  2025-12-17 11:32:54 +08:00
weisd   c0cdad2192  add DEFAULT_HEAL_OBJECT_SELECT_PROB  2025-12-17 11:03:54 +08:00
weisd   6466cdbc54  refactor datascanner  2025-12-17 10:40:54 +08:00
likewu  2534087551  add python MinIO SDK lifecycle config test  2025-12-10 23:28:05 +08:00
likewu  5f03a1cbe0  add ansible bucket lifecycle config test  2025-12-10 23:27:09 +08:00
likewu  f9af9fe5da  Merge branch 'main' of https://github.com/rustfs/rustfs into fix/lifecycle  2025-12-10 18:58:27 +08:00
likewu  59b8aa14d7  fix  2025-12-05 12:32:48 +08:00
54 changed files with 7992 additions and 196 deletions

.vscode/launch.json (vendored): 19 lines changed

@@ -121,6 +121,25 @@
"rust"
],
},
{
"type": "lldb",
"request": "launch",
"name": "Debug test_lifecycle_transition_basic",
"cargo": {
"args": [
"test",
"-p",
"rustfs-scanner",
"--test",
"lifecycle_integration_test",
"serial_tests::test_lifecycle_transition_basic",
"-j",
"1"
]
},
"args": [],
"cwd": "${workspaceFolder}"
},
{
"name": "Debug executable target/debug/test",
"type": "lldb",

Cargo.lock (generated): 36 lines changed

@@ -7074,6 +7074,7 @@ dependencies = [
"rustfs-rio",
"rustfs-s3select-api",
"rustfs-s3select-query",
"rustfs-scanner",
"rustfs-targets",
"rustfs-utils",
"rustfs-zip",
@@ -7605,6 +7606,41 @@ dependencies = [
"tracing",
]
[[package]]
name = "rustfs-scanner"
version = "0.0.5"
dependencies = [
"anyhow",
"async-trait",
"chrono",
"futures",
"heed",
"http 1.4.0",
"path-clean",
"rand 0.10.0-rc.5",
"rmp-serde",
"rustfs-ahm",
"rustfs-common",
"rustfs-config",
"rustfs-ecstore",
"rustfs-filemeta",
"rustfs-madmin",
"rustfs-utils",
"s3s",
"serde",
"serde_json",
"serial_test",
"tempfile",
"thiserror 2.0.17",
"time",
"tokio",
"tokio-test",
"tokio-util",
"tracing",
"tracing-subscriber",
"uuid",
]
[[package]]
name = "rustfs-signer"
version = "0.0.5"


@@ -34,6 +34,7 @@ members = [
"crates/targets", # Target-specific configurations and utilities
"crates/s3select-api", # S3 Select API interface
"crates/s3select-query", # S3 Select query engine
"crates/scanner", # Scanner for data integrity checks and health monitoring
"crates/signer", # client signer
"crates/checksums", # client checksums
"crates/utils", # Utility functions and helpers
@@ -86,6 +87,7 @@ rustfs-protos = { path = "crates/protos", version = "0.0.5" }
rustfs-rio = { path = "crates/rio", version = "0.0.5" }
rustfs-s3select-api = { path = "crates/s3select-api", version = "0.0.5" }
rustfs-s3select-query = { path = "crates/s3select-query", version = "0.0.5" }
rustfs-scanner = { path = "crates/scanner", version = "0.0.5" }
rustfs-signer = { path = "crates/signer", version = "0.0.5" }
rustfs-targets = { path = "crates/targets", version = "0.0.5" }
rustfs-utils = { path = "crates/utils", version = "0.0.5" }


@@ -183,7 +183,7 @@ impl HealChannelProcessor {
HealType::Object {
bucket: request.bucket.clone(),
object: prefix.clone(),
version_id: None,
version_id: request.object_version_id.clone(),
}
} else {
HealType::Bucket {
@@ -366,6 +366,7 @@ mod tests {
id: "test-id".to_string(),
bucket: "test-bucket".to_string(),
object_prefix: None,
object_version_id: None,
disk: None,
priority: HealChannelPriority::Normal,
scan_mode: None,
@@ -394,6 +395,7 @@ mod tests {
id: "test-id".to_string(),
bucket: "test-bucket".to_string(),
object_prefix: Some("test-object".to_string()),
object_version_id: None,
disk: None,
priority: HealChannelPriority::High,
scan_mode: Some(HealScanMode::Deep),
@@ -425,6 +427,7 @@ mod tests {
id: "test-id".to_string(),
bucket: "test-bucket".to_string(),
object_prefix: None,
object_version_id: None,
disk: Some("pool_0_set_1".to_string()),
priority: HealChannelPriority::Critical,
scan_mode: None,
@@ -453,6 +456,7 @@ mod tests {
id: "test-id".to_string(),
bucket: "test-bucket".to_string(),
object_prefix: None,
object_version_id: None,
disk: Some("invalid-disk-id".to_string()),
priority: HealChannelPriority::Normal,
scan_mode: None,
@@ -488,6 +492,7 @@ mod tests {
id: "test-id".to_string(),
bucket: "test-bucket".to_string(),
object_prefix: None,
object_version_id: None,
disk: None,
priority: channel_priority,
scan_mode: None,
@@ -516,6 +521,7 @@ mod tests {
id: "test-id".to_string(),
bucket: "test-bucket".to_string(),
object_prefix: None,
object_version_id: None,
disk: None,
priority: HealChannelPriority::Normal,
scan_mode: None,
@@ -545,6 +551,7 @@ mod tests {
id: "test-id".to_string(),
bucket: "test-bucket".to_string(),
object_prefix: Some("".to_string()), // Empty prefix should be treated as bucket heal
object_version_id: None,
disk: None,
priority: HealChannelPriority::Normal,
scan_mode: None,
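
Not part of the changeset itself: a minimal sketch of the request-to-heal-type mapping that the hunks above modify, assuming the HealChannelRequest and HealType shapes visible in this diff (the field list of HealType::Bucket is not fully shown here, so it is a guess):

// Sketch only. A non-empty prefix heals a single object, now carrying the
// new optional version id; an empty or missing prefix falls back to a
// whole-bucket heal, matching the test comment above.
fn to_heal_type(request: &HealChannelRequest) -> HealType {
    match request.object_prefix.as_deref() {
        Some(prefix) if !prefix.is_empty() => HealType::Object {
            bucket: request.bucket.clone(),
            object: prefix.to_string(),
            version_id: request.object_version_id.clone(),
        },
        _ => HealType::Bucket {
            bucket: request.bucket.clone(),
        },
    }
}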


@@ -495,6 +495,26 @@ mod serial_tests {
object_name,
} = &elm.1;
println!("cache row:{ver_no} {ver_id} {mod_time} {type_:?} {object_name}");
//eval_inner(&oi.to_lifecycle_opts(), OffsetDateTime::now_utc()).await;
eval_inner(
&lifecycle::ObjectOpts {
name: oi.name.clone(),
user_tags: oi.user_tags.clone(),
version_id: oi.version_id.map(|v| v.to_string()).unwrap_or_default(),
mod_time: oi.mod_time,
size: oi.size as usize,
is_latest: oi.is_latest,
num_versions: oi.num_versions,
delete_marker: oi.delete_marker,
successor_mod_time: oi.successor_mod_time,
restore_ongoing: oi.restore_ongoing,
restore_expires: oi.restore_expires,
transition_status: oi.transitioned_object.status.clone(),
..Default::default()
},
OffsetDateTime::now_utc(),
)
.await;
}
println!("row:{row:?}");
}
@@ -506,3 +526,261 @@ mod serial_tests {
println!("Lifecycle cache test completed");
}
}
async fn eval_inner(&self, obj: &ObjectOpts, now: OffsetDateTime) -> Event {
let mut events = Vec::<Event>::new();
info!(
"eval_inner: object={}, mod_time={:?}, now={:?}, is_latest={}, delete_marker={}",
obj.name, obj.mod_time, now, obj.is_latest, obj.delete_marker
);
if obj.mod_time.expect("err").unix_timestamp() == 0 {
info!("eval_inner: mod_time is 0, returning default event");
return Event::default();
}
if let Some(restore_expires) = obj.restore_expires {
if restore_expires.unix_timestamp() != 0 && now.unix_timestamp() > restore_expires.unix_timestamp() {
let mut action = IlmAction::DeleteRestoredAction;
if !obj.is_latest {
action = IlmAction::DeleteRestoredVersionAction;
}
events.push(Event {
action,
due: Some(now),
rule_id: "".into(),
noncurrent_days: 0,
newer_noncurrent_versions: 0,
storage_class: "".into(),
});
}
}
if let Some(ref lc_rules) = self.filter_rules(obj).await {
for rule in lc_rules.iter() {
if obj.expired_object_deletemarker() {
if let Some(expiration) = rule.expiration.as_ref() {
if let Some(expired_object_delete_marker) = expiration.expired_object_delete_marker {
events.push(Event {
action: IlmAction::DeleteVersionAction,
rule_id: rule.id.clone().expect("err!"),
due: Some(now),
noncurrent_days: 0,
newer_noncurrent_versions: 0,
storage_class: "".into(),
});
break;
}
if let Some(days) = expiration.days {
let expected_expiry = expected_expiry_time(obj.mod_time.unwrap(), days /*, date*/);
if now.unix_timestamp() >= expected_expiry.unix_timestamp() {
events.push(Event {
action: IlmAction::DeleteVersionAction,
rule_id: rule.id.clone().expect("err!"),
due: Some(expected_expiry),
noncurrent_days: 0,
newer_noncurrent_versions: 0,
storage_class: "".into(),
});
break;
}
}
}
}
if obj.is_latest {
if let Some(ref expiration) = rule.expiration {
if let Some(expired_object_delete_marker) = expiration.expired_object_delete_marker {
if obj.delete_marker && expired_object_delete_marker {
let due = expiration.next_due(obj);
if let Some(due) = due {
if now.unix_timestamp() >= due.unix_timestamp() {
events.push(Event {
action: IlmAction::DelMarkerDeleteAllVersionsAction,
rule_id: rule.id.clone().expect("err!"),
due: Some(due),
noncurrent_days: 0,
newer_noncurrent_versions: 0,
storage_class: "".into(),
});
}
}
continue;
}
}
}
}
if !obj.is_latest {
if let Some(ref noncurrent_version_expiration) = rule.noncurrent_version_expiration {
if let Some(newer_noncurrent_versions) = noncurrent_version_expiration.newer_noncurrent_versions {
if newer_noncurrent_versions > 0 {
continue;
}
}
}
}
if !obj.is_latest {
if let Some(ref noncurrent_version_expiration) = rule.noncurrent_version_expiration {
if let Some(noncurrent_days) = noncurrent_version_expiration.noncurrent_days {
if noncurrent_days != 0 {
if let Some(successor_mod_time) = obj.successor_mod_time {
let expected_expiry = expected_expiry_time(successor_mod_time, noncurrent_days);
if now.unix_timestamp() >= expected_expiry.unix_timestamp() {
events.push(Event {
action: IlmAction::DeleteVersionAction,
rule_id: rule.id.clone().expect("err!"),
due: Some(expected_expiry),
noncurrent_days: 0,
newer_noncurrent_versions: 0,
storage_class: "".into(),
});
}
}
}
}
}
}
if !obj.is_latest {
if let Some(ref noncurrent_version_transitions) = rule.noncurrent_version_transitions {
if let Some(ref storage_class) = noncurrent_version_transitions[0].storage_class {
if storage_class.as_str() != "" && !obj.delete_marker && obj.transition_status != TRANSITION_COMPLETE {
let due = rule.noncurrent_version_transitions.as_ref().unwrap()[0].next_due(obj);
if let Some(due0) = due {
if now.unix_timestamp() == 0 || now.unix_timestamp() > due0.unix_timestamp() {
events.push(Event {
action: IlmAction::TransitionVersionAction,
rule_id: rule.id.clone().expect("err!"),
due,
storage_class: rule.noncurrent_version_transitions.as_ref().unwrap()[0]
.storage_class
.clone()
.unwrap()
.as_str()
.to_string(),
..Default::default()
});
}
}
}
}
}
}
info!(
"eval_inner: checking expiration condition - is_latest={}, delete_marker={}, version_id={:?}, condition_met={}",
obj.is_latest,
obj.delete_marker,
obj.version_id,
(obj.is_latest || obj.version_id.is_empty()) && !obj.delete_marker
);
// Allow expiration for latest objects OR non-versioned objects (empty version_id)
if (obj.is_latest || obj.version_id.is_empty()) && !obj.delete_marker {
info!("eval_inner: entering expiration check");
if let Some(ref expiration) = rule.expiration {
if let Some(ref date) = expiration.date {
let date0 = OffsetDateTime::from(date.clone());
if date0.unix_timestamp() != 0 && (now.unix_timestamp() >= date0.unix_timestamp()) {
info!("eval_inner: expiration by date - date0={:?}", date0);
events.push(Event {
action: IlmAction::DeleteAction,
rule_id: rule.id.clone().expect("err!"),
due: Some(date0),
noncurrent_days: 0,
newer_noncurrent_versions: 0,
storage_class: "".into(),
});
}
} else if let Some(days) = expiration.days {
let expected_expiry: OffsetDateTime = expected_expiry_time(obj.mod_time.unwrap(), days);
info!(
"eval_inner: expiration check - days={}, obj_time={:?}, expiry_time={:?}, now={:?}, should_expire={}",
days,
obj.mod_time.expect("err!"),
expected_expiry,
now,
now.unix_timestamp() > expected_expiry.unix_timestamp()
);
if now.unix_timestamp() >= expected_expiry.unix_timestamp() {
info!("eval_inner: object should expire, adding DeleteAction");
let mut event = Event {
action: IlmAction::DeleteAction,
rule_id: rule.id.clone().expect("err!"),
due: Some(expected_expiry),
noncurrent_days: 0,
newer_noncurrent_versions: 0,
storage_class: "".into(),
};
/*if rule.expiration.expect("err!").delete_all.val {
event.action = IlmAction::DeleteAllVersionsAction
}*/
events.push(event);
}
} else {
info!("eval_inner: expiration.days is None");
}
} else {
info!("eval_inner: rule.expiration is None");
}
if obj.transition_status != TRANSITION_COMPLETE {
if let Some(ref transitions) = rule.transitions {
let due = transitions[0].next_due(obj);
if let Some(due0) = due {
if now.unix_timestamp() == 0 || now.unix_timestamp() > due0.unix_timestamp() {
events.push(Event {
action: IlmAction::TransitionAction,
rule_id: rule.id.clone().expect("err!"),
due,
storage_class: transitions[0].storage_class.clone().expect("err!").as_str().to_string(),
noncurrent_days: 0,
newer_noncurrent_versions: 0,
});
}
}
}
}
}
}
}
if events.len() > 0 {
events.sort_by(|a, b| {
if now.unix_timestamp() > a.due.expect("err!").unix_timestamp()
&& now.unix_timestamp() > b.due.expect("err").unix_timestamp()
|| a.due.expect("err").unix_timestamp() == b.due.expect("err").unix_timestamp()
{
match a.action {
IlmAction::DeleteAllVersionsAction
| IlmAction::DelMarkerDeleteAllVersionsAction
| IlmAction::DeleteAction
| IlmAction::DeleteVersionAction => {
return Ordering::Less;
}
_ => (),
}
match b.action {
IlmAction::DeleteAllVersionsAction
| IlmAction::DelMarkerDeleteAllVersionsAction
| IlmAction::DeleteAction
| IlmAction::DeleteVersionAction => {
return Ordering::Greater;
}
_ => (),
}
return Ordering::Less;
}
if a.due.expect("err").unix_timestamp() < b.due.expect("err").unix_timestamp() {
return Ordering::Less;
}
return Ordering::Greater;
});
return events[0].clone();
}
Event::default()
}
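
The comparator at the end of eval_inner prefers delete-class actions whenever both candidate events are already past due (or share a due time), and otherwise picks the event with the earliest due time. A reduced, self-contained model of that tie-break (names here are illustrative, not from the crate):

// Illustrative only: a two-field model of the ordering used above.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
enum Kind { Delete, Transition }

fn pick(a: (Kind, i64), b: (Kind, i64), now: i64) -> (Kind, i64) {
    // Both past due, or tied on due time: a delete-class event wins.
    if (now > a.1 && now > b.1) || a.1 == b.1 {
        if a.0 == Kind::Delete { return a; }
        if b.0 == Kind::Delete { return b; }
        return a;
    }
    // Otherwise the earlier due time wins.
    if a.1 < b.1 { a } else { b }
}

// With both events past due, the delete wins even though it is due later:
// pick((Kind::Transition, 100), (Kind::Delete, 200), 300) == (Kind::Delete, 200)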


@@ -263,6 +263,16 @@ async fn create_test_tier(server: u32) {
region: "".to_string(),
..Default::default()
})
} else if server == 2 {
Some(TierMinIO {
access_key: "minioadmin".to_string(),
secret_key: "minioadmin".to_string(),
bucket: "mblock2".to_string(),
endpoint: "http://m1ddns.pvtool.com:9020".to_string(),
prefix: format!("mypre{}/", uuid::Uuid::new_v4()),
region: "".to_string(),
..Default::default()
})
} else {
Some(TierMinIO {
access_key: "minioadmin".to_string(),
@@ -602,7 +612,7 @@ mod serial_tests {
async fn test_lifecycle_transition_basic() {
let (_disk_paths, ecstore) = setup_test_env().await;
create_test_tier(1).await;
create_test_tier(2).await;
// Create test bucket and object
let suffix = uuid::Uuid::new_v4().simple().to_string();
@@ -610,8 +620,15 @@ mod serial_tests {
let object_name = "test/object.txt"; // Match the lifecycle rule prefix "test/"
let test_data = b"Hello, this is test data for lifecycle expiry!";
//create_test_lock_bucket(&ecstore, bucket_name.as_str()).await;
create_test_bucket(&ecstore, bucket_name.as_str()).await;
create_test_lock_bucket(&ecstore, bucket_name.as_str()).await;
upload_test_object(
&ecstore,
bucket_name.as_str(),
object_name,
b"Hello, this is test data for lifecycle expiry 1111-11111111-1111 !",
)
.await;
//create_test_bucket(&ecstore, bucket_name.as_str()).await;
upload_test_object(&ecstore, bucket_name.as_str(), object_name, test_data).await;
// Verify object exists initially


@@ -212,6 +212,8 @@ pub struct HealChannelRequest {
pub bucket: String,
/// Object prefix (optional)
pub object_prefix: Option<String>,
/// Object version ID (optional)
pub object_version_id: Option<String>,
/// Force start heal
pub force_start: bool,
/// Priority
@@ -346,6 +348,7 @@ pub fn create_heal_request(
id: Uuid::new_v4().to_string(),
bucket,
object_prefix,
object_version_id: None,
force_start,
priority: priority.unwrap_or_default(),
pool_index: None,
@@ -374,6 +377,7 @@ pub fn create_heal_request_with_options(
id: Uuid::new_v4().to_string(),
bucket,
object_prefix,
object_version_id: None,
force_start,
priority: priority.unwrap_or_default(),
pool_index,
@@ -503,6 +507,7 @@ pub async fn send_heal_disk(set_disk_id: String, priority: Option<HealChannelPri
bucket: "".to_string(),
object_prefix: None,
disk: Some(set_disk_id),
object_version_id: None,
force_start: false,
priority: priority.unwrap_or_default(),
pool_index: None,


@@ -21,5 +21,6 @@ pub(crate) mod heal;
pub(crate) mod object;
pub(crate) mod profiler;
pub(crate) mod runtime;
pub(crate) mod scanner;
pub(crate) mod targets;
pub(crate) mod tls;


@@ -0,0 +1,28 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
/// Environment variable name that specifies the data scanner start delay in seconds.
/// - Purpose: Define the delay between data scanner operations.
/// - Unit: seconds (u64).
/// - Valid values: any positive integer.
/// - Semantics: This delay controls how frequently the data scanner checks for and processes data; shorter delays lead to more responsive scanning but may increase system load.
/// - Example: `export RUSTFS_DATA_SCANNER_START_DELAY_SECS=10`
/// - Note: Choose an appropriate delay that balances scanning responsiveness with overall system performance.
pub const ENV_DATA_SCANNER_START_DELAY_SECS: &str = "RUSTFS_DATA_SCANNER_START_DELAY_SECS";
/// Default data scanner start delay in seconds if not specified in the environment variable.
/// - Value: 60 seconds.
/// - Rationale: This default interval provides a reasonable balance between scanning responsiveness and system load for most deployments.
/// - Adjustments: Users may modify this value via the `RUSTFS_DATA_SCANNER_START_DELAY_SECS` environment variable based on their specific scanning requirements and system performance.
pub const DEFAULT_DATA_SCANNER_START_DELAY_SECS: u64 = 60;
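
A minimal sketch of how a caller might resolve the delay from these two constants; the helper itself is hypothetical, only the constants come from this file:

// Hypothetical helper, not from the changeset. Falls back to the default
// when the variable is unset or unparsable.
fn data_scanner_start_delay() -> std::time::Duration {
    let secs = std::env::var(ENV_DATA_SCANNER_START_DELAY_SECS)
        .ok()
        .and_then(|v| v.parse::<u64>().ok())
        .unwrap_or(DEFAULT_DATA_SCANNER_START_DELAY_SECS);
    std::time::Duration::from_secs(secs)
}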


@@ -33,6 +33,8 @@ pub use constants::profiler::*;
#[cfg(feature = "constants")]
pub use constants::runtime::*;
#[cfg(feature = "constants")]
pub use constants::scanner::*;
#[cfg(feature = "constants")]
pub use constants::targets::*;
#[cfg(feature = "constants")]
pub use constants::tls::*;


@@ -953,7 +953,7 @@ impl LifecycleOps for ObjectInfo {
lifecycle::ObjectOpts {
name: self.name.clone(),
user_tags: self.user_tags.clone(),
version_id: self.version_id.map(|v| v.to_string()).unwrap_or_default(),
version_id: self.version_id.clone(),
mod_time: self.mod_time,
size: self.size as usize,
is_latest: self.is_latest,
@@ -1067,7 +1067,7 @@ pub async fn eval_action_from_lifecycle(
event
}
async fn apply_transition_rule(event: &lifecycle::Event, src: &LcEventSrc, oi: &ObjectInfo) -> bool {
pub async fn apply_transition_rule(event: &lifecycle::Event, src: &LcEventSrc, oi: &ObjectInfo) -> bool {
if oi.delete_marker || oi.is_dir {
return false;
}
@@ -1161,7 +1161,7 @@ pub async fn apply_expiry_on_non_transitioned_objects(
true
}
async fn apply_expiry_rule(event: &lifecycle::Event, src: &LcEventSrc, oi: &ObjectInfo) -> bool {
pub async fn apply_expiry_rule(event: &lifecycle::Event, src: &LcEventSrc, oi: &ObjectInfo) -> bool {
let mut expiry_state = GLOBAL_ExpiryState.write().await;
expiry_state.enqueue_by_days(oi, event, src).await;
true


@@ -0,0 +1,192 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use s3s::dto::{
BucketLifecycleConfiguration, ObjectLockConfiguration, ObjectLockEnabled, ObjectLockLegalHoldStatus, ObjectLockRetentionMode,
};
use time::OffsetDateTime;
use tracing::info;
use crate::bucket::lifecycle::lifecycle::{Event, Lifecycle, ObjectOpts};
use crate::bucket::object_lock::ObjectLockStatusExt;
use crate::bucket::object_lock::objectlock::{get_object_legalhold_meta, get_object_retention_meta, utc_now_ntp};
use crate::bucket::replication::ReplicationConfig;
use rustfs_common::metrics::IlmAction;
/// Evaluator - evaluates lifecycle policy on objects for the given lifecycle
/// configuration, lock retention configuration and replication configuration.
pub struct Evaluator {
policy: Arc<BucketLifecycleConfiguration>,
lock_retention: Option<Arc<ObjectLockConfiguration>>,
repl_cfg: Option<Arc<ReplicationConfig>>,
}
impl Evaluator {
/// NewEvaluator - creates a new evaluator with the given lifecycle
pub fn new(policy: Arc<BucketLifecycleConfiguration>) -> Self {
Self {
policy,
lock_retention: None,
repl_cfg: None,
}
}
/// WithLockRetention - sets the lock retention configuration for the evaluator
pub fn with_lock_retention(mut self, lr: Option<Arc<ObjectLockConfiguration>>) -> Self {
self.lock_retention = lr;
self
}
/// WithReplicationConfig - sets the replication configuration for the evaluator
pub fn with_replication_config(mut self, rcfg: Option<Arc<ReplicationConfig>>) -> Self {
self.repl_cfg = rcfg;
self
}
/// IsPendingReplication checks if the object is pending replication.
pub fn is_pending_replication(&self, obj: &ObjectOpts) -> bool {
use crate::bucket::replication::ReplicationConfigurationExt;
if self.repl_cfg.is_none() {
return false;
}
if let Some(rcfg) = &self.repl_cfg {
if rcfg
.config
.as_ref()
.is_some_and(|config| config.has_active_rules(obj.name.as_str(), true))
&& !obj.version_purge_status.is_empty()
{
return true;
}
}
false
}
/// IsObjectLocked checks whether object-lock settings (legal hold or
/// retention) forbid removing the object when lifecycle or bucket quota
/// asks for deletion. (copied over from enforceRetentionForDeletion)
pub fn is_object_locked(&self, obj: &ObjectOpts) -> bool {
if self.lock_retention.as_ref().is_none_or(|v| {
v.object_lock_enabled
.as_ref()
.is_none_or(|v| v.as_str() != ObjectLockEnabled::ENABLED)
}) {
return false;
}
if obj.delete_marker {
return false;
}
let lhold = get_object_legalhold_meta(obj.user_defined.clone());
if lhold
.status
.is_some_and(|v| v.valid() && v.as_str() == ObjectLockLegalHoldStatus::ON)
{
return true;
}
let ret = get_object_retention_meta(obj.user_defined.clone());
if ret
.mode
.is_some_and(|v| matches!(v.as_str(), ObjectLockRetentionMode::COMPLIANCE | ObjectLockRetentionMode::GOVERNANCE))
{
let t = utc_now_ntp();
if let Some(retain_until) = ret.retain_until_date {
if OffsetDateTime::from(retain_until).gt(&t) {
return true;
}
}
}
false
}
/// eval will return a lifecycle event for each object in objs for a given time.
async fn eval_inner(&self, objs: &[ObjectOpts], now: OffsetDateTime) -> Vec<Event> {
let mut events = vec![Event::default(); objs.len()];
let mut newer_noncurrent_versions = 0;
'top_loop: {
for (i, obj) in objs.iter().enumerate() {
let mut event = self.policy.eval_inner(obj, now, newer_noncurrent_versions).await;
match event.action {
IlmAction::DeleteAllVersionsAction | IlmAction::DelMarkerDeleteAllVersionsAction => {
// Skip if bucket has object locking enabled; To prevent the
// possibility of violating an object retention on one of the
// noncurrent versions of this object.
if self.lock_retention.as_ref().is_some_and(|v| {
v.object_lock_enabled
.as_ref()
.is_some_and(|v| v.as_str() == ObjectLockEnabled::ENABLED)
}) {
event = Event::default();
} else {
// No need to evaluate remaining versions' lifecycle
// events after DeleteAllVersionsAction*
events[i] = event;
info!("eval_inner: skipping remaining versions' lifecycle events after DeleteAllVersionsAction*");
break 'top_loop;
}
}
IlmAction::DeleteVersionAction | IlmAction::DeleteRestoredVersionAction => {
// Defensive code, should never happen
if obj.version_id.is_none_or(|v| v.is_nil()) {
event.action = IlmAction::NoneAction;
}
if self.is_object_locked(obj) {
event = Event::default();
}
if self.is_pending_replication(obj) {
event = Event::default();
}
}
_ => {}
}
if !obj.is_latest {
match event.action {
IlmAction::DeleteVersionAction => {
// this noncurrent version will be expired, nothing to add
}
_ => {
// this noncurrent version will be spared
newer_noncurrent_versions += 1;
}
}
}
events[i] = event;
}
}
events
}
/// Eval will return a lifecycle event for each object in objs
pub async fn eval(&self, objs: &[ObjectOpts]) -> Result<Vec<Event>, std::io::Error> {
if objs.is_empty() {
return Ok(vec![]);
}
if objs.len() != objs[0].num_versions {
return Err(std::io::Error::new(
std::io::ErrorKind::InvalidInput,
format!("number of versions mismatch, expected {}, got {}", objs[0].num_versions, objs.len()),
));
}
Ok(self.eval_inner(objs, OffsetDateTime::now_utc()).await)
}
}
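
Putting the builder together, a hedged usage sketch; where lc_config, lock_config, and versions come from is assumed, not shown in this diff:

// Sketch only: evaluate lifecycle events for every version of one object.
let evaluator = Evaluator::new(Arc::clone(&lc_config))
    .with_lock_retention(Some(Arc::clone(&lock_config)))
    .with_replication_config(None);
// `versions` must hold every version of the object: eval() rejects input
// whose length differs from objs[0].num_versions.
let events = evaluator.eval(&versions).await?;
for (version, event) in versions.iter().zip(events.iter()) {
    println!("{:?} -> {:?}", version.version_id, event.action);
}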


@@ -18,19 +18,23 @@
#![allow(unused_must_use)]
#![allow(clippy::all)]
use rustfs_filemeta::{ReplicationStatusType, VersionPurgeStatusType};
use s3s::dto::{
BucketLifecycleConfiguration, ExpirationStatus, LifecycleExpiration, LifecycleRule, NoncurrentVersionTransition,
ObjectLockConfiguration, ObjectLockEnabled, RestoreRequest, Transition,
ObjectLockConfiguration, ObjectLockEnabled, Prefix, RestoreRequest, Transition,
};
use std::cmp::Ordering;
use std::collections::HashMap;
use std::env;
use std::fmt::Display;
use std::sync::Arc;
use time::macros::{datetime, offset};
use time::{self, Duration, OffsetDateTime};
use tracing::info;
use uuid::Uuid;
use crate::bucket::lifecycle::rule::TransitionOps;
use crate::store_api::ObjectInfo;
pub const TRANSITION_COMPLETE: &str = "complete";
pub const TRANSITION_PENDING: &str = "pending";
@@ -131,11 +135,11 @@ impl RuleValidate for LifecycleRule {
pub trait Lifecycle {
async fn has_transition(&self) -> bool;
fn has_expiry(&self) -> bool;
async fn has_active_rules(&self, prefix: &str) -> bool;
fn has_active_rules(&self, prefix: &str) -> bool;
async fn validate(&self, lr: &ObjectLockConfiguration) -> Result<(), std::io::Error>;
async fn filter_rules(&self, obj: &ObjectOpts) -> Option<Vec<LifecycleRule>>;
async fn eval(&self, obj: &ObjectOpts) -> Event;
async fn eval_inner(&self, obj: &ObjectOpts, now: OffsetDateTime) -> Event;
async fn eval_inner(&self, obj: &ObjectOpts, now: OffsetDateTime, newer_noncurrent_versions: usize) -> Event;
//fn set_prediction_headers(&self, w: http.ResponseWriter, obj: ObjectOpts);
async fn noncurrent_versions_expiration_limit(self: Arc<Self>, obj: &ObjectOpts) -> Event;
}
@@ -160,7 +164,7 @@ impl Lifecycle for BucketLifecycleConfiguration {
false
}
async fn has_active_rules(&self, prefix: &str) -> bool {
fn has_active_rules(&self, prefix: &str) -> bool {
if self.rules.len() == 0 {
return false;
}
@@ -169,44 +173,51 @@ impl Lifecycle for BucketLifecycleConfiguration {
continue;
}
let rule_prefix = rule.prefix.as_ref().expect("err!");
let rule_prefix = &rule.prefix.clone().unwrap_or_default();
if prefix.len() > 0 && rule_prefix.len() > 0 && !prefix.starts_with(rule_prefix) && !rule_prefix.starts_with(&prefix)
{
continue;
}
let rule_noncurrent_version_expiration = rule.noncurrent_version_expiration.as_ref().expect("err!");
if rule_noncurrent_version_expiration.noncurrent_days.expect("err!") > 0 {
if let Some(rule_noncurrent_version_expiration) = &rule.noncurrent_version_expiration {
if let Some(noncurrent_days) = rule_noncurrent_version_expiration.noncurrent_days {
if noncurrent_days > 0 {
return true;
}
}
if let Some(newer_noncurrent_versions) = rule_noncurrent_version_expiration.newer_noncurrent_versions {
if newer_noncurrent_versions > 0 {
return true;
}
}
}
if rule.noncurrent_version_transitions.is_some() {
return true;
}
if rule_noncurrent_version_expiration.newer_noncurrent_versions.expect("err!") > 0 {
return true;
if let Some(rule_expiration) = &rule.expiration {
if let Some(date1) = rule_expiration.date.clone() {
if OffsetDateTime::from(date1).unix_timestamp() < OffsetDateTime::now_utc().unix_timestamp() {
return true;
}
}
if rule_expiration.date.is_some() {
return true;
}
if let Some(expired_object_delete_marker) = rule_expiration.expired_object_delete_marker
&& expired_object_delete_marker
{
return true;
}
}
if !rule.noncurrent_version_transitions.is_none() {
return true;
if let Some(rule_transitions) = &rule.transitions {
let rule_transitions_0 = rule_transitions[0].clone();
if let Some(date1) = rule_transitions_0.date {
if OffsetDateTime::from(date1).unix_timestamp() < OffsetDateTime::now_utc().unix_timestamp() {
return true;
}
}
}
let rule_expiration = rule.expiration.as_ref().expect("err!");
if !rule_expiration.date.is_none()
&& OffsetDateTime::from(rule_expiration.date.clone().expect("err!")).unix_timestamp()
< OffsetDateTime::now_utc().unix_timestamp()
{
return true;
}
if !rule_expiration.date.is_none() {
return true;
}
if rule_expiration.expired_object_delete_marker.expect("err!") {
return true;
}
let rule_transitions: &[Transition] = &rule.transitions.as_ref().expect("err!");
let rule_transitions_0 = rule_transitions[0].clone();
if !rule_transitions_0.date.is_none()
&& OffsetDateTime::from(rule_transitions_0.date.expect("err!")).unix_timestamp()
< OffsetDateTime::now_utc().unix_timestamp()
{
return true;
}
if !rule.transitions.is_none() {
if rule.transitions.is_some() {
return true;
}
}
@@ -274,10 +285,10 @@ impl Lifecycle for BucketLifecycleConfiguration {
}
async fn eval(&self, obj: &ObjectOpts) -> Event {
self.eval_inner(obj, OffsetDateTime::now_utc()).await
self.eval_inner(obj, OffsetDateTime::now_utc(), 0).await
}
async fn eval_inner(&self, obj: &ObjectOpts, now: OffsetDateTime) -> Event {
async fn eval_inner(&self, obj: &ObjectOpts, now: OffsetDateTime, newer_noncurrent_versions: usize) -> Event {
let mut events = Vec::<Event>::new();
info!(
"eval_inner: object={}, mod_time={:?}, now={:?}, is_latest={}, delete_marker={}",
@@ -436,10 +447,10 @@ impl Lifecycle for BucketLifecycleConfiguration {
obj.is_latest,
obj.delete_marker,
obj.version_id,
(obj.is_latest || obj.version_id.is_empty()) && !obj.delete_marker
(obj.is_latest || obj.version_id.is_none_or(|v| v.is_nil())) && !obj.delete_marker
);
// Allow expiration for latest objects OR non-versioned objects (empty version_id)
if (obj.is_latest || obj.version_id.is_empty()) && !obj.delete_marker {
if (obj.is_latest || obj.version_id.is_none_or(|v| v.is_nil())) && !obj.delete_marker {
info!("eval_inner: entering expiration check");
if let Some(ref expiration) = rule.expiration {
if let Some(ref date) = expiration.date {
@@ -659,7 +670,7 @@ pub struct ObjectOpts {
pub user_tags: String,
pub mod_time: Option<OffsetDateTime>,
pub size: usize,
pub version_id: String,
pub version_id: Option<Uuid>,
pub is_latest: bool,
pub delete_marker: bool,
pub num_versions: usize,
@@ -669,12 +680,37 @@ pub struct ObjectOpts {
pub restore_expires: Option<OffsetDateTime>,
pub versioned: bool,
pub version_suspended: bool,
pub user_defined: HashMap<String, String>,
pub version_purge_status: VersionPurgeStatusType,
pub replication_status: ReplicationStatusType,
}
impl ObjectOpts {
pub fn expired_object_deletemarker(&self) -> bool {
self.delete_marker && self.num_versions == 1
}
pub fn from_object_info(oi: &ObjectInfo) -> Self {
Self {
name: oi.name.clone(),
user_tags: oi.user_tags.clone(),
mod_time: oi.mod_time,
size: oi.size as usize,
version_id: oi.version_id.clone(),
is_latest: oi.is_latest,
delete_marker: oi.delete_marker,
num_versions: oi.num_versions,
successor_mod_time: oi.successor_mod_time,
transition_status: oi.transitioned_object.status.clone(),
restore_ongoing: oi.restore_ongoing,
restore_expires: oi.restore_expires,
versioned: false,
version_suspended: false,
user_defined: oi.user_defined.clone(),
version_purge_status: oi.version_purge_status.clone(),
replication_status: oi.replication_status.clone(),
}
}
}
#[derive(Debug, Clone)]


@@ -14,6 +14,7 @@
pub mod bucket_lifecycle_audit;
pub mod bucket_lifecycle_ops;
pub mod evaluator;
pub mod lifecycle;
pub mod rule;
pub mod tier_last_day_stats;


@@ -21,6 +21,7 @@
use sha2::{Digest, Sha256};
use std::any::Any;
use std::io::Write;
use uuid::Uuid;
use xxhash_rust::xxh64;
use super::bucket_lifecycle_ops::{ExpiryOp, GLOBAL_ExpiryState, TransitionedObject};
@@ -34,7 +35,7 @@ static XXHASH_SEED: u64 = 0;
struct ObjSweeper {
object: String,
bucket: String,
version_id: String,
version_id: Option<Uuid>,
versioned: bool,
suspended: bool,
transition_status: String,
@@ -54,8 +55,8 @@ impl ObjSweeper {
})
}
pub fn with_version(&mut self, vid: String) -> &Self {
self.version_id = vid;
pub fn with_version(&mut self, vid: Option<Uuid>) -> &Self {
self.version_id = vid.clone();
self
}
@@ -72,8 +73,8 @@ impl ObjSweeper {
version_suspended: self.suspended,
..Default::default()
};
if self.suspended && self.version_id == "" {
opts.version_id = String::from("");
if self.suspended && self.version_id.is_none_or(|v| v.is_nil()) {
opts.version_id = None;
}
opts
}
@@ -94,7 +95,7 @@ impl ObjSweeper {
if !self.versioned || self.suspended {
// 1, 2.a, 2.b
del_tier = true;
} else if self.versioned && self.version_id != "" {
} else if self.versioned && self.version_id.is_some_and(|v| !v.is_nil()) {
// 3.a
del_tier = true;
}


@@ -175,6 +175,13 @@ pub async fn created_at(bucket: &str) -> Result<OffsetDateTime> {
bucket_meta_sys.created_at(bucket).await
}
pub async fn list_bucket_targets(bucket: &str) -> Result<BucketTargets> {
let bucket_meta_sys_lock = get_bucket_metadata_sys()?;
let bucket_meta_sys = bucket_meta_sys_lock.read().await;
bucket_meta_sys.get_bucket_targets_config(bucket).await
}
#[derive(Debug)]
pub struct BucketMetadataSys {
metadata_map: RwLock<HashMap<String, Arc<BucketMetadata>>>,


@@ -15,7 +15,7 @@
pub mod objectlock;
pub mod objectlock_sys;
use s3s::dto::{ObjectLockConfiguration, ObjectLockEnabled};
use s3s::dto::{ObjectLockConfiguration, ObjectLockEnabled, ObjectLockLegalHoldStatus};
pub trait ObjectLockApi {
fn enabled(&self) -> bool;
@@ -28,3 +28,13 @@ impl ObjectLockApi for ObjectLockConfiguration {
.is_some_and(|v| v.as_str() == ObjectLockEnabled::ENABLED)
}
}
pub trait ObjectLockStatusExt {
fn valid(&self) -> bool;
}
impl ObjectLockStatusExt for ObjectLockLegalHoldStatus {
fn valid(&self) -> bool {
matches!(self.as_str(), ObjectLockLegalHoldStatus::ON | ObjectLockLegalHoldStatus::OFF)
}
}


@@ -9,8 +9,11 @@ use std::sync::Arc;
use std::sync::atomic::AtomicI32;
use std::sync::atomic::Ordering;
use crate::bucket::bucket_target_sys::BucketTargetSys;
use crate::bucket::metadata_sys;
use crate::bucket::replication::replication_resyncer::{
BucketReplicationResyncStatus, DeletedObjectReplicationInfo, ReplicationResyncer,
BucketReplicationResyncStatus, DeletedObjectReplicationInfo, ReplicationConfig, ReplicationResyncer,
get_heal_replicate_object_info,
};
use crate::bucket::replication::replication_state::ReplicationStats;
use crate::config::com::read_config;
@@ -26,8 +29,10 @@ use rustfs_filemeta::ReplicationStatusType;
use rustfs_filemeta::ReplicationType;
use rustfs_filemeta::ReplicationWorkerOperation;
use rustfs_filemeta::ResyncDecision;
use rustfs_filemeta::VersionPurgeStatusType;
use rustfs_filemeta::replication_statuses_map;
use rustfs_filemeta::version_purge_statuses_map;
use rustfs_filemeta::{REPLICATE_EXISTING, REPLICATE_HEAL, REPLICATE_HEAL_DELETE};
use rustfs_utils::http::RESERVED_METADATA_PREFIX_LOWER;
use time::OffsetDateTime;
use time::format_description::well_known::Rfc3339;
@@ -1033,3 +1038,152 @@ pub async fn schedule_replication_delete(dv: DeletedObjectReplicationInfo) {
}
}
}
/// QueueReplicationHeal is a wrapper for queue_replication_heal_internal
pub async fn queue_replication_heal(bucket: &str, oi: ObjectInfo, retry_count: u32) {
// ignore modtime zero objects
if oi.mod_time.is_none() || oi.mod_time == Some(OffsetDateTime::UNIX_EPOCH) {
return;
}
let rcfg = match metadata_sys::get_replication_config(bucket).await {
Ok((config, _)) => config,
Err(err) => {
warn!("Failed to get replication config for bucket {}: {}", bucket, err);
return;
}
};
let tgts = match BucketTargetSys::get().list_bucket_targets(bucket).await {
Ok(targets) => Some(targets),
Err(err) => {
warn!("Failed to list bucket targets for bucket {}: {}", bucket, err);
None
}
};
let rcfg_wrapper = ReplicationConfig::new(Some(rcfg), tgts);
queue_replication_heal_internal(bucket, oi, rcfg_wrapper, retry_count).await;
}
/// queue_replication_heal_internal enqueues objects that failed replication OR eligible for resyncing through
/// an ongoing resync operation or via existing objects replication configuration setting.
pub async fn queue_replication_heal_internal(
_bucket: &str,
oi: ObjectInfo,
rcfg: ReplicationConfig,
retry_count: u32,
) -> ReplicateObjectInfo {
let mut roi = ReplicateObjectInfo::default();
// ignore modtime zero objects
if oi.mod_time.is_none() || oi.mod_time == Some(OffsetDateTime::UNIX_EPOCH) {
return roi;
}
if rcfg.config.is_none() || rcfg.remotes.is_none() {
return roi;
}
roi = get_heal_replicate_object_info(&oi, &rcfg).await;
roi.retry_count = retry_count;
if !roi.dsc.replicate_any() {
return roi;
}
// early return if replication already done, otherwise we need to determine if this
// version is an existing object that needs healing.
if roi.replication_status == ReplicationStatusType::Completed
&& roi.version_purge_status.is_empty()
&& !roi.existing_obj_resync.must_resync()
{
return roi;
}
if roi.delete_marker || !roi.version_purge_status.is_empty() {
let (version_id, dm_version_id) = if roi.version_purge_status.is_empty() {
(None, roi.version_id)
} else {
(roi.version_id, None)
};
let dv = DeletedObjectReplicationInfo {
delete_object: crate::store_api::DeletedObject {
object_name: roi.name.clone(),
delete_marker_version_id: dm_version_id,
version_id,
replication_state: roi.replication_state.clone(),
delete_marker_mtime: roi.mod_time,
delete_marker: roi.delete_marker,
..Default::default()
},
bucket: roi.bucket.clone(),
op_type: ReplicationType::Heal,
event_type: REPLICATE_HEAL_DELETE.to_string(),
..Default::default()
};
// heal delete marker replication failure or versioned delete replication failure
if roi.replication_status == ReplicationStatusType::Pending
|| roi.replication_status == ReplicationStatusType::Failed
|| roi.version_purge_status == VersionPurgeStatusType::Failed
|| roi.version_purge_status == VersionPurgeStatusType::Pending
{
if let Some(pool) = GLOBAL_REPLICATION_POOL.get() {
pool.queue_replica_delete_task(dv).await;
}
return roi;
}
// if replication status is Complete on DeleteMarker and existing object resync required
let existing_obj_resync = roi.existing_obj_resync.clone();
if existing_obj_resync.must_resync()
&& (roi.replication_status == ReplicationStatusType::Completed || roi.replication_status.is_empty())
{
queue_replicate_deletes_wrapper(dv, existing_obj_resync).await;
return roi;
}
return roi;
}
if roi.existing_obj_resync.must_resync() {
roi.op_type = ReplicationType::ExistingObject;
}
match roi.replication_status {
ReplicationStatusType::Pending | ReplicationStatusType::Failed => {
roi.event_type = REPLICATE_HEAL.to_string();
if let Some(pool) = GLOBAL_REPLICATION_POOL.get() {
pool.queue_replica_task(roi.clone()).await;
}
return roi;
}
_ => {}
}
if roi.existing_obj_resync.must_resync() {
roi.event_type = REPLICATE_EXISTING.to_string();
if let Some(pool) = GLOBAL_REPLICATION_POOL.get() {
pool.queue_replica_task(roi.clone()).await;
}
}
roi
}
/// Wrapper function for queueing replicate deletes with resync decision
async fn queue_replicate_deletes_wrapper(doi: DeletedObjectReplicationInfo, existing_obj_resync: ResyncDecision) {
for (k, v) in existing_obj_resync.targets.iter() {
if v.replicate {
let mut dv = doi.clone();
dv.reset_id = v.reset_id.clone();
dv.target_arn = k.clone();
if let Some(pool) = GLOBAL_REPLICATION_POOL.get() {
pool.queue_replica_delete_task(dv).await;
}
}
}
}


@@ -744,7 +744,7 @@ impl ReplicationWorkerOperation for DeletedObjectReplicationInfo {
}
}
#[derive(Debug, Clone, Default)]
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct ReplicationConfig {
pub config: Option<ReplicationConfiguration>,
pub remotes: Option<BucketTargets>,


@@ -15,7 +15,8 @@
use crate::disk::{
CheckPartsResp, DeleteOptions, DiskAPI, DiskError, DiskInfo, DiskInfoOptions, DiskLocation, Endpoint, Error,
FileInfoVersions, ReadMultipleReq, ReadMultipleResp, ReadOptions, RenameDataResp, Result, UpdateMetadataOpts, VolumeInfo,
WalkDirOptions, local::LocalDisk,
WalkDirOptions,
local::{LocalDisk, ScanGuard},
};
use bytes::Bytes;
use rustfs_filemeta::{FileInfo, ObjectPartInfo, RawFileInfo};
@@ -259,6 +260,7 @@ impl LocalDiskWrapper {
debug!("health check: performing health check");
if Self::perform_health_check(disk.clone(), &TEST_BUCKET, &TEST_OBJ, &TEST_DATA, true, CHECK_TIMEOUT_DURATION).await.is_err() && health.swap_ok_to_faulty() {
// Health check failed, disk is considered faulty
warn!("health check: failed, disk is considered faulty");
health.increment_waiting(); // Balance the increment from failed operation
@@ -428,7 +430,7 @@ impl LocalDiskWrapper {
{
// Check if disk is faulty
if self.health.is_faulty() {
warn!("disk {} health is faulty, returning error", self.to_string());
warn!("local disk {} health is faulty, returning error", self.to_string());
return Err(DiskError::FaultyDisk);
}
@@ -475,6 +477,15 @@ impl LocalDiskWrapper {
#[async_trait::async_trait]
impl DiskAPI for LocalDiskWrapper {
async fn read_metadata(&self, volume: &str, path: &str) -> Result<Bytes> {
self.track_disk_health(|| async { self.disk.read_metadata(volume, path).await }, Duration::ZERO)
.await
}
fn start_scan(&self) -> ScanGuard {
self.disk.start_scan()
}
fn to_string(&self) -> String {
self.disk.to_string()
}


@@ -89,7 +89,7 @@ pub struct LocalDisk {
pub format_info: RwLock<FormatInfo>,
pub endpoint: Endpoint,
pub disk_info_cache: Arc<Cache<DiskInfo>>,
pub scanning: AtomicU32,
pub scanning: Arc<AtomicU32>,
pub rotational: bool,
pub fstype: String,
pub major: u64,
@@ -215,7 +215,7 @@ impl LocalDisk {
format_path,
format_info: RwLock::new(format_info),
disk_info_cache: Arc::new(cache),
scanning: AtomicU32::new(0),
scanning: Arc::new(AtomicU32::new(0)),
rotational: Default::default(),
fstype: Default::default(),
minor: Default::default(),
@@ -673,6 +673,8 @@ impl LocalDisk {
return Err(DiskError::FileNotFound);
}
debug!("read_raw: file_path: {:?}", file_path.as_ref());
let meta_path = file_path.as_ref().join(Path::new(STORAGE_FORMAT_FILE));
let res = {
@@ -682,6 +684,7 @@ impl LocalDisk {
match self.read_metadata_with_dmtime(meta_path).await {
Ok(res) => Ok(res),
Err(err) => {
warn!("read_raw: error: {:?}", err);
if err == Error::FileNotFound
&& !skip_access_checks(volume_dir.as_ref().to_string_lossy().to_string().as_str())
{
@@ -707,20 +710,6 @@ impl LocalDisk {
Ok((buf, mtime))
}
async fn read_metadata(&self, file_path: impl AsRef<Path>) -> Result<Vec<u8>> {
// Try to use cached file content reading for better performance, with safe fallback
let path = file_path.as_ref().to_path_buf();
// First, try the cache
if let Ok(bytes) = get_global_file_cache().get_file_content(path.clone()).await {
return Ok(bytes.to_vec());
}
// Fallback to direct read if cache fails
let (data, _) = self.read_metadata_with_dmtime(file_path.as_ref()).await?;
Ok(data)
}
async fn read_metadata_with_dmtime(&self, file_path: impl AsRef<Path>) -> Result<(Vec<u8>, Option<OffsetDateTime>)> {
check_path_length(file_path.as_ref().to_string_lossy().as_ref())?;
@@ -882,7 +871,7 @@ impl LocalDisk {
}
// write_all_private with check_path_length
#[tracing::instrument(level = "debug", skip_all)]
#[tracing::instrument(level = "debug", skip(self, buf, sync, skip_parent))]
pub async fn write_all_private(&self, volume: &str, path: &str, buf: Bytes, sync: bool, skip_parent: &Path) -> Result<()> {
let volume_dir = self.get_bucket_path(volume)?;
let file_path = volume_dir.join(Path::new(&path));
@@ -1074,7 +1063,7 @@ impl LocalDisk {
if entry.ends_with(STORAGE_FORMAT_FILE) {
let metadata = self
.read_metadata(self.get_object_path(bucket, format!("{}/{}", &current, &entry).as_str())?)
.read_metadata(bucket, format!("{}/{}", &current, &entry).as_str())
.await?;
let entry = entry.strip_suffix(STORAGE_FORMAT_FILE).unwrap_or_default().to_owned();
@@ -1090,7 +1079,7 @@ impl LocalDisk {
out.write_obj(&MetaCacheEntry {
name: name.clone(),
metadata,
metadata: metadata.to_vec(),
..Default::default()
})
.await?;
@@ -1157,14 +1146,14 @@ impl LocalDisk {
let fname = format!("{}/{}", &meta.name, STORAGE_FORMAT_FILE);
match self.read_metadata(self.get_object_path(&opts.bucket, fname.as_str())?).await {
match self.read_metadata(&opts.bucket, fname.as_str()).await {
Ok(res) => {
if is_dir_obj {
meta.name = meta.name.trim_end_matches(GLOBAL_DIR_SUFFIX_WITH_SLASH).to_owned();
meta.name.push_str(SLASH_SEPARATOR);
}
meta.metadata = res;
meta.metadata = res.to_vec();
out.write_obj(&meta).await?;
@@ -1211,6 +1200,14 @@ impl LocalDisk {
}
}
pub struct ScanGuard(pub Arc<AtomicU32>);
impl Drop for ScanGuard {
fn drop(&mut self) {
self.0.fetch_sub(1, Ordering::Relaxed);
}
}
fn is_root_path(path: impl AsRef<Path>) -> bool {
path.as_ref().components().count() == 1 && path.as_ref().has_root()
}
@@ -1858,19 +1855,20 @@ impl DiskAPI for LocalDisk {
let mut objs_returned = 0;
if opts.base_dir.ends_with(SLASH_SEPARATOR) {
let fpath = self.get_object_path(
&opts.bucket,
path_join_buf(&[
format!("{}{}", opts.base_dir.trim_end_matches(SLASH_SEPARATOR), GLOBAL_DIR_SUFFIX).as_str(),
STORAGE_FORMAT_FILE,
])
.as_str(),
)?;
if let Ok(data) = self.read_metadata(fpath).await {
if let Ok(data) = self
.read_metadata(
&opts.bucket,
path_join_buf(&[
format!("{}{}", opts.base_dir.trim_end_matches(SLASH_SEPARATOR), GLOBAL_DIR_SUFFIX).as_str(),
STORAGE_FORMAT_FILE,
])
.as_str(),
)
.await
{
let meta = MetaCacheEntry {
name: opts.base_dir.clone(),
metadata: data,
metadata: data.to_vec(),
..Default::default()
};
out.write_obj(&meta).await?;
@@ -2449,6 +2447,26 @@ impl DiskAPI for LocalDisk {
Ok(info)
}
#[tracing::instrument(skip(self))]
fn start_scan(&self) -> ScanGuard {
self.scanning.fetch_add(1, Ordering::Relaxed);
ScanGuard(Arc::clone(&self.scanning))
}
async fn read_metadata(&self, volume: &str, path: &str) -> Result<Bytes> {
// Try to use cached file content reading for better performance, with safe fallback
let file_path = self.get_object_path(volume, path)?;
// let file_path = file_path.join(Path::new(STORAGE_FORMAT_FILE));
// First, try the cache
if let Ok(bytes) = get_global_file_cache().get_file_content(file_path.clone()).await {
return Ok(bytes);
}
// Fallback to direct read if cache fails
let (data, _) = self.read_metadata_with_dmtime(file_path).await?;
Ok(data.into())
}
}
async fn get_disk_info(drive_path: PathBuf) -> Result<(rustfs_utils::os::DiskInfo, bool)> {
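
The new start_scan/ScanGuard pair is a small RAII counter: start_scan bumps the scanning gauge and the guard's Drop impl decrements it, so the in-flight count stays correct even on early returns. A hedged sketch of a scanner-side call site (assumed, not shown in this diff):

// Sketch: the guard keeps the scan counter accurate for the whole scope.
async fn scan_volume(disk: &dyn DiskAPI) -> Result<()> {
    let _scan = disk.start_scan(); // scanning += 1
    // ... walk the disk; any early return or `?` still runs Drop ...
    Ok(())
} // _scan dropped here: scanning -= 1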


@@ -32,6 +32,7 @@ pub const STORAGE_FORMAT_FILE: &str = "xl.meta";
pub const STORAGE_FORMAT_FILE_BACKUP: &str = "xl.meta.bkp";
use crate::disk::disk_store::LocalDiskWrapper;
use crate::disk::local::ScanGuard;
use crate::rpc::RemoteDisk;
use bytes::Bytes;
use endpoint::Endpoint;
@@ -395,6 +396,20 @@ impl DiskAPI for Disk {
Disk::Remote(remote_disk) => remote_disk.disk_info(opts).await,
}
}
fn start_scan(&self) -> ScanGuard {
match self {
Disk::Local(local_disk) => local_disk.start_scan(),
Disk::Remote(remote_disk) => remote_disk.start_scan(),
}
}
async fn read_metadata(&self, volume: &str, path: &str) -> Result<Bytes> {
match self {
Disk::Local(local_disk) => local_disk.read_metadata(volume, path).await,
Disk::Remote(remote_disk) => remote_disk.read_metadata(volume, path).await,
}
}
}
pub async fn new_disk(ep: &Endpoint, opt: &DiskOption) -> Result<DiskStore> {
@@ -458,6 +473,7 @@ pub trait DiskAPI: Debug + Send + Sync + 'static {
opts: &ReadOptions,
) -> Result<FileInfo>;
async fn read_xl(&self, volume: &str, path: &str, read_data: bool) -> Result<RawFileInfo>;
async fn read_metadata(&self, volume: &str, path: &str) -> Result<Bytes>;
async fn rename_data(
&self,
src_volume: &str,
@@ -489,6 +505,7 @@ pub trait DiskAPI: Debug + Send + Sync + 'static {
async fn write_all(&self, volume: &str, path: &str, data: Bytes) -> Result<()>;
async fn read_all(&self, volume: &str, path: &str) -> Result<Bytes>;
async fn disk_info(&self, opts: &DiskInfoOptions) -> Result<DiskInfo>;
fn start_scan(&self) -> ScanGuard;
}
#[derive(Debug, Default, Serialize, Deserialize)]


@@ -440,11 +440,11 @@ impl PoolMeta {
}
}
fn path2_bucket_object(name: &str) -> (String, String) {
pub fn path2_bucket_object(name: &str) -> (String, String) {
path2_bucket_object_with_base_path("", name)
}
fn path2_bucket_object_with_base_path(base_path: &str, path: &str) -> (String, String) {
pub fn path2_bucket_object_with_base_path(base_path: &str, path: &str) -> (String, String) {
// Trim the base path and leading slash
let trimmed_path = path
.strip_prefix(base_path)
@@ -452,7 +452,11 @@ fn path2_bucket_object_with_base_path(base_path: &str, path: &str) -> (String, S
.strip_prefix(SLASH_SEPARATOR)
.unwrap_or(path);
// Find the position of the first '/'
let pos = trimmed_path.find(SLASH_SEPARATOR).unwrap_or(trimmed_path.len());
#[cfg(windows)]
let trimmed_path = trimmed_path.replace('\\', "/");
let Some(pos) = trimmed_path.find(SLASH_SEPARATOR) else {
return (trimmed_path.to_string(), "".to_string());
};
// Split into bucket and prefix
let bucket = &trimmed_path[0..pos];
let prefix = &trimmed_path[pos + 1..]; // +1 to skip the '/' character if it exists
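
The now-public helpers split a scanner path into bucket and object parts, returning an empty object when no separator is present. Expected behavior, as an illustration rather than a test from the changeset:

assert_eq!(
    path2_bucket_object("mybucket/dir/obj.txt"),
    ("mybucket".to_string(), "dir/obj.txt".to_string())
);
// No '/' left after trimming: the whole string is the bucket, object is empty.
assert_eq!(
    path2_bucket_object("mybucket"),
    ("mybucket".to_string(), "".to_string())
);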


@@ -12,29 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::{
path::PathBuf,
sync::{Arc, atomic::Ordering},
time::Duration,
};
use bytes::Bytes;
use futures::lock::Mutex;
use http::{HeaderMap, HeaderValue, Method, header::CONTENT_TYPE};
use rustfs_protos::{
node_service_time_out_client,
proto_gen::node_service::{
CheckPartsRequest, DeletePathsRequest, DeleteRequest, DeleteVersionRequest, DeleteVersionsRequest, DeleteVolumeRequest,
DiskInfoRequest, ListDirRequest, ListVolumesRequest, MakeVolumeRequest, MakeVolumesRequest, ReadAllRequest,
ReadMultipleRequest, ReadPartsRequest, ReadVersionRequest, ReadXlRequest, RenameDataRequest, RenameFileRequest,
StatVolumeRequest, UpdateMetadataRequest, VerifyFileRequest, WriteAllRequest, WriteMetadataRequest,
},
};
use rustfs_utils::string::parse_bool_with_default;
use tokio::time;
use tokio_util::sync::CancellationToken;
use tracing::{debug, info, warn};
use crate::disk::{
CheckPartsResp, DeleteOptions, DiskAPI, DiskInfo, DiskInfoOptions, DiskLocation, DiskOption, FileInfoVersions,
ReadMultipleReq, ReadMultipleResp, ReadOptions, RenameDataResp, UpdateMetadataOpts, VolumeInfo, WalkDirOptions,
@@ -42,6 +19,7 @@ use crate::disk::{
CHECK_EVERY, CHECK_TIMEOUT_DURATION, ENV_RUSTFS_DRIVE_ACTIVE_MONITORING, SKIP_IF_SUCCESS_BEFORE, get_max_timeout_duration,
},
endpoint::Endpoint,
local::ScanGuard,
};
use crate::disk::{FileReader, FileWriter};
use crate::disk::{disk_store::DiskHealthTracker, error::DiskError};
@@ -49,11 +27,35 @@ use crate::{
disk::error::{Error, Result},
rpc::build_auth_headers,
};
use bytes::Bytes;
use futures::lock::Mutex;
use http::{HeaderMap, HeaderValue, Method, header::CONTENT_TYPE};
use rustfs_filemeta::{FileInfo, ObjectPartInfo, RawFileInfo};
use rustfs_protos::proto_gen::node_service::RenamePartRequest;
use rustfs_protos::{
node_service_time_out_client,
proto_gen::node_service::{
CheckPartsRequest, DeletePathsRequest, DeleteRequest, DeleteVersionRequest, DeleteVersionsRequest, DeleteVolumeRequest,
DiskInfoRequest, ListDirRequest, ListVolumesRequest, MakeVolumeRequest, MakeVolumesRequest, ReadAllRequest,
ReadMetadataRequest, ReadMultipleRequest, ReadPartsRequest, ReadVersionRequest, ReadXlRequest, RenameDataRequest,
RenameFileRequest, StatVolumeRequest, UpdateMetadataRequest, VerifyFileRequest, WriteAllRequest, WriteMetadataRequest,
},
};
use rustfs_rio::{HttpReader, HttpWriter};
use rustfs_utils::string::parse_bool_with_default;
use std::{
path::PathBuf,
sync::{
Arc,
atomic::{AtomicU32, Ordering},
},
time::Duration,
};
use tokio::time;
use tokio::{io::AsyncWrite, net::TcpStream, time::timeout};
use tokio_util::sync::CancellationToken;
use tonic::Request;
use tracing::{debug, info, warn};
use uuid::Uuid;
#[derive(Debug)]
@@ -63,6 +65,7 @@ pub struct RemoteDisk {
pub url: url::Url,
pub root: PathBuf,
endpoint: Endpoint,
pub scanning: Arc<AtomicU32>,
/// Whether health checking is enabled
health_check: bool,
/// Health tracker for connection monitoring
@@ -91,6 +94,7 @@ impl RemoteDisk {
url: ep.url.clone(),
root,
endpoint: ep.clone(),
scanning: Arc::new(AtomicU32::new(0)),
health_check: opt.health_check && env_health_check,
health: Arc::new(DiskHealthTracker::new()),
cancel_token: CancellationToken::new(),
@@ -227,7 +231,7 @@ impl RemoteDisk {
{
// Check if disk is faulty
if self.health.is_faulty() {
warn!("disk {} health is faulty, returning error", self.to_string());
warn!("remote disk {} health is faulty, returning error", self.to_string());
return Err(DiskError::FaultyDisk);
}
@@ -726,6 +730,25 @@ impl DiskAPI for RemoteDisk {
.await
}
async fn read_metadata(&self, volume: &str, path: &str) -> Result<Bytes> {
let mut client = node_service_time_out_client(&self.addr)
.await
.map_err(|err| Error::other(format!("can not get client, err: {err}")))?;
let request = Request::new(ReadMetadataRequest {
volume: volume.to_string(),
path: path.to_string(),
disk: self.endpoint.to_string(),
});
let response = client.read_metadata(request).await?.into_inner();
if !response.success {
return Err(response.error.unwrap_or_default().into());
}
Ok(response.data)
}
#[tracing::instrument(skip(self))]
async fn update_metadata(&self, volume: &str, path: &str, fi: FileInfo, opts: &UpdateMetadataOpts) -> Result<()> {
info!("update_metadata");
@@ -1331,6 +1354,12 @@ impl DiskAPI for RemoteDisk {
Ok(disk_info)
}
#[tracing::instrument(skip(self))]
fn start_scan(&self) -> ScanGuard {
self.scanning.fetch_add(1, Ordering::Relaxed);
ScanGuard(Arc::clone(&self.scanning))
}
}
#[cfg(test)]


@@ -73,6 +73,7 @@ use rustfs_filemeta::{
FileInfo, FileMeta, FileMetaShallowVersion, MetaCacheEntries, MetaCacheEntry, MetadataResolutionParams, ObjectPartInfo,
RawFileInfo, ReplicationStatusType, VersionPurgeStatusType, file_info_from_raw, merge_file_meta_versions,
};
use rustfs_lock::FastLockGuard;
use rustfs_lock::fast_lock::types::LockResult;
use rustfs_madmin::heal_commands::{HealDriveInfo, HealResultItem};
use rustfs_rio::{EtagResolvable, HashReader, HashReaderMut, TryGetIndex as _, WarpReader};
@@ -1486,20 +1487,8 @@ impl SetDisks {
let object = object.clone();
let version_id = version_id.clone();
tokio::spawn(async move {
if let Some(disk) = disk
&& disk.is_online().await
{
if version_id.is_empty() {
match disk.read_xl(&bucket, &object, read_data).await {
Ok(info) => {
let fi = file_info_from_raw(info, &bucket, &object, read_data).await?;
Ok(fi)
}
Err(err) => Err(err),
}
} else {
disk.read_version(&org_bucket, &bucket, &object, &version_id, &opts).await
}
if let Some(disk) = disk {
disk.read_version(&org_bucket, &bucket, &object, &version_id, &opts).await
} else {
Err(DiskError::DiskNotFound)
}
@@ -2693,7 +2682,7 @@ impl SetDisks {
let (mut parts_metadata, errs) = {
let mut retry_count = 0;
loop {
let (parts, errs) = Self::read_all_fileinfo(&disks, "", bucket, object, version_id, true, true).await?;
let (parts, errs) = Self::read_all_fileinfo(&disks, "", bucket, object, version_id, false, false).await?;
// Check if we have enough valid metadata to proceed
// If we have too many errors, and we haven't exhausted retries, try again
@@ -2720,7 +2709,14 @@ impl SetDisks {
retry_count += 1;
}
};
info!(parts_count = parts_metadata.len(), ?errs, "File info read complete");
info!(
parts_count = parts_metadata.len(),
bucket = bucket,
object = object,
version_id = version_id,
?errs,
"File info read complete"
);
if DiskError::is_all_not_found(&errs) {
warn!(
"heal_object failed, all obj part not found, bucket: {}, obj: {}, version_id: {}",
@@ -4080,6 +4076,14 @@ impl ObjectIO for SetDisks {
#[async_trait::async_trait]
impl StorageAPI for SetDisks {
#[tracing::instrument(skip(self))]
async fn new_ns_lock(&self, bucket: &str, object: &str) -> Result<FastLockGuard> {
self.fast_lock_manager
.acquire_write_lock(bucket, object, self.locker_owner.as_str())
.await
.map_err(|e| Error::other(self.format_lock_error(bucket, object, "write", &e)))
}
#[tracing::instrument(skip(self))]
async fn backend_info(&self) -> rustfs_madmin::BackendInfo {
unimplemented!()
@@ -4848,10 +4852,18 @@ impl StorageAPI for SetDisks {
// Normalize ETags by removing quotes before comparison (PR #592 compatibility)
let transition_etag = rustfs_utils::path::trim_etag(&opts.transition.etag);
let stored_etag = rustfs_utils::path::trim_etag(&get_raw_etag(&fi.metadata));
if opts.mod_time.expect("err").unix_timestamp() != fi.mod_time.as_ref().expect("err").unix_timestamp()
|| transition_etag != stored_etag
{
return Err(to_object_err(Error::other(DiskError::FileNotFound), vec![bucket, object]));
if let Some(mod_time1) = opts.mod_time {
if let Some(mod_time2) = fi.mod_time.as_ref() {
if mod_time1.unix_timestamp() != mod_time2.unix_timestamp()
/*|| transition_etag != stored_etag*/
{
return Err(to_object_err(Error::other(DiskError::FileNotFound), vec![bucket, object]));
}
} else {
return Err(Error::other("mod_time 2 error.".to_string()));
}
} else {
return Err(Error::other("mod_time 1 error.".to_string()));
}
if fi.transition_status == TRANSITION_COMPLETE {
return Ok(());
@@ -6534,6 +6546,10 @@ async fn disks_with_all_parts(
let corrupted = !meta.mod_time.eq(&latest_meta.mod_time) || !meta.data_dir.eq(&latest_meta.data_dir);
if corrupted {
warn!(
"disks_with_all_partsv2: metadata is corrupted, object_name={}, index: {index}",
object_name
);
meta_errs[index] = Some(DiskError::FileCorrupt);
parts_metadata[index] = FileInfo::default();
continue;
@@ -6541,6 +6557,10 @@ async fn disks_with_all_parts(
if erasure_distribution_reliable {
if !meta.is_valid() {
warn!(
"disks_with_all_partsv2: metadata is not valid, object_name={}, index: {index}",
object_name
);
parts_metadata[index] = FileInfo::default();
meta_errs[index] = Some(DiskError::FileCorrupt);
continue;
@@ -6551,6 +6571,10 @@ async fn disks_with_all_parts(
// Erasure distribution is not the same as onlineDisks
// attempt a fix if possible, assuming other entries
// might have the right erasure distribution.
warn!(
"disks_with_all_partsv2: erasure distribution is not the same as onlineDisks, object_name={}, index: {index}",
object_name
);
parts_metadata[index] = FileInfo::default();
meta_errs[index] = Some(DiskError::FileCorrupt);
continue;

View File

@@ -45,6 +45,7 @@ use rustfs_common::{
};
use rustfs_filemeta::FileInfo;
use rustfs_lock::FastLockGuard;
use rustfs_madmin::heal_commands::{HealDriveInfo, HealResultItem};
use rustfs_utils::{crc_hash, path::path_join_buf, sip_hash};
use tokio::sync::RwLock;
@@ -366,6 +367,10 @@ impl ObjectIO for Sets {
#[async_trait::async_trait]
impl StorageAPI for Sets {
#[tracing::instrument(skip(self))]
async fn new_ns_lock(&self, bucket: &str, object: &str) -> Result<FastLockGuard> {
self.disk_set[0].new_ns_lock(bucket, object).await
}
#[tracing::instrument(skip(self))]
async fn backend_info(&self) -> rustfs_madmin::BackendInfo {
unimplemented!()

View File

@@ -58,6 +58,7 @@ use rand::Rng as _;
use rustfs_common::heal_channel::{HealItemType, HealOpts};
use rustfs_common::{GLOBAL_LOCAL_NODE_NAME, GLOBAL_RUSTFS_HOST, GLOBAL_RUSTFS_PORT};
use rustfs_filemeta::FileInfo;
use rustfs_lock::FastLockGuard;
use rustfs_madmin::heal_commands::HealResultItem;
use rustfs_utils::path::{SLASH_SEPARATOR, decode_dir_object, encode_dir_object, path_join_buf};
use s3s::dto::{BucketVersioningStatus, ObjectLockConfiguration, ObjectLockEnabled, VersioningConfiguration};
@@ -1151,6 +1152,10 @@ lazy_static! {
#[async_trait::async_trait]
impl StorageAPI for ECStore {
#[instrument(skip(self))]
async fn new_ns_lock(&self, bucket: &str, object: &str) -> Result<FastLockGuard> {
self.pools[0].new_ns_lock(bucket, object).await
}
#[instrument(skip(self))]
async fn backend_info(&self) -> rustfs_madmin::BackendInfo {
let (standard_sc_parity, rr_sc_parity) = {

View File

@@ -30,6 +30,7 @@ use rustfs_filemeta::{
FileInfo, MetaCacheEntriesSorted, ObjectPartInfo, REPLICATION_RESET, REPLICATION_STATUS, ReplicateDecision, ReplicationState,
ReplicationStatusType, VersionPurgeStatusType, replication_statuses_map, version_purge_statuses_map,
};
use rustfs_lock::FastLockGuard;
use rustfs_madmin::heal_commands::HealResultItem;
use rustfs_rio::Checksum;
use rustfs_rio::{DecompressReader, HashReader, LimitReader, WarpReader};
@@ -1299,6 +1300,7 @@ pub trait ObjectIO: Send + Sync + Debug + 'static {
#[allow(clippy::too_many_arguments)]
pub trait StorageAPI: ObjectIO + Debug {
// NewNSLock TODO:
async fn new_ns_lock(&self, bucket: &str, object: &str) -> Result<FastLockGuard>;
// Shutdown TODO:
// NSScanner TODO:
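
Taken together, `ECStore::new_ns_lock` delegates to its first pool, `Sets` to its first disk set, and `SetDisks` finally acquires a write lock from its `fast_lock_manager`. A minimal caller sketch, using only what this diff shows; the import path and bucket/object names are illustrative:

```rust
use rustfs_ecstore::store_api::StorageAPI; // path is illustrative

async fn with_exclusive_object<S: StorageAPI>(store: &S) -> Result<(), Box<dyn std::error::Error>> {
    // The guard is an RAII write lock; dropping it releases the lock.
    let _guard = store.new_ns_lock("mybucket", "myobject").await?;
    // ... exclusive work on mybucket/myobject ...
    Ok(())
}
```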

View File

@@ -942,6 +942,41 @@ impl FileMeta {
}
}
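/// Splits the version list into regular versions and tier "free" versions.
/// Unless `include_free_versions` is set, free versions are moved out of the
/// main list into `free_versions`, and each of them is stamped with the count
/// of remaining regular versions (`num_versions`).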
pub fn get_file_info_versions(&self, volume: &str, path: &str, include_free_versions: bool) -> Result<FileInfoVersions> {
let mut versions = self.into_file_info_versions(volume, path, true)?;
let mut n = 0;
let mut versions_vec = Vec::new();
for fi in versions.versions.iter() {
if fi.tier_free_version() {
if !include_free_versions {
versions.free_versions.push(fi.clone());
}
} else {
if !include_free_versions {
versions_vec.push(fi.clone());
}
n += 1;
}
}
if !include_free_versions {
versions.versions = versions_vec;
}
for fi in versions.free_versions.iter_mut() {
fi.num_versions = n;
}
Ok(versions)
}
pub fn get_all_file_info_versions(&self, volume: &str, path: &str, all_parts: bool) -> Result<FileInfoVersions> {
self.into_file_info_versions(volume, path, all_parts)
}
pub fn into_file_info_versions(&self, volume: &str, path: &str, all_parts: bool) -> Result<FileInfoVersions> {
let mut versions = Vec::new();
for version in self.versions.iter() {

View File

@@ -709,7 +709,7 @@ pub fn parse_replicate_decision(_bucket: &str, s: &str) -> std::io::Result<Repli
// }
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct ReplicateObjectInfo {
pub name: String,
pub size: i64,

View File

@@ -438,6 +438,24 @@ pub struct DeletePathsResponse {
pub error: ::core::option::Option<Error>,
}
#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
pub struct ReadMetadataRequest {
#[prost(string, tag = "1")]
pub disk: ::prost::alloc::string::String,
#[prost(string, tag = "2")]
pub volume: ::prost::alloc::string::String,
#[prost(string, tag = "3")]
pub path: ::prost::alloc::string::String,
}
#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
pub struct ReadMetadataResponse {
#[prost(bool, tag = "1")]
pub success: bool,
#[prost(message, optional, tag = "2")]
pub error: ::core::option::Option<Error>,
#[prost(bytes = "bytes", tag = "3")]
pub data: ::prost::bytes::Bytes,
}
#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
pub struct UpdateMetadataRequest {
#[prost(string, tag = "1")]
pub disk: ::prost::alloc::string::String,
@@ -1524,6 +1542,21 @@ pub mod node_service_client {
.insert(GrpcMethod::new("node_service.NodeService", "UpdateMetadata"));
self.inner.unary(req, path, codec).await
}
pub async fn read_metadata(
&mut self,
request: impl tonic::IntoRequest<super::ReadMetadataRequest>,
) -> std::result::Result<tonic::Response<super::ReadMetadataResponse>, tonic::Status> {
self.inner
.ready()
.await
.map_err(|e| tonic::Status::unknown(format!("Service was not ready: {}", e.into())))?;
let codec = tonic_prost::ProstCodec::default();
let path = http::uri::PathAndQuery::from_static("/node_service.NodeService/ReadMetadata");
let mut req = request.into_request();
req.extensions_mut()
.insert(GrpcMethod::new("node_service.NodeService", "ReadMetadata"));
self.inner.unary(req, path, codec).await
}
pub async fn write_metadata(
&mut self,
request: impl tonic::IntoRequest<super::WriteMetadataRequest>,
@@ -2403,6 +2436,10 @@ pub mod node_service_server {
&self,
request: tonic::Request<super::UpdateMetadataRequest>,
) -> std::result::Result<tonic::Response<super::UpdateMetadataResponse>, tonic::Status>;
async fn read_metadata(
&self,
request: tonic::Request<super::ReadMetadataRequest>,
) -> std::result::Result<tonic::Response<super::ReadMetadataResponse>, tonic::Status>;
async fn write_metadata(
&self,
request: tonic::Request<super::WriteMetadataRequest>,
@@ -3407,6 +3444,34 @@ pub mod node_service_server {
};
Box::pin(fut)
}
"/node_service.NodeService/ReadMetadata" => {
#[allow(non_camel_case_types)]
struct ReadMetadataSvc<T: NodeService>(pub Arc<T>);
impl<T: NodeService> tonic::server::UnaryService<super::ReadMetadataRequest> for ReadMetadataSvc<T> {
type Response = super::ReadMetadataResponse;
type Future = BoxFuture<tonic::Response<Self::Response>, tonic::Status>;
fn call(&mut self, request: tonic::Request<super::ReadMetadataRequest>) -> Self::Future {
let inner = Arc::clone(&self.0);
let fut = async move { <T as NodeService>::read_metadata(&inner, request).await };
Box::pin(fut)
}
}
let accept_compression_encodings = self.accept_compression_encodings;
let send_compression_encodings = self.send_compression_encodings;
let max_decoding_message_size = self.max_decoding_message_size;
let max_encoding_message_size = self.max_encoding_message_size;
let inner = self.inner.clone();
let fut = async move {
let method = ReadMetadataSvc(inner);
let codec = tonic_prost::ProstCodec::default();
let mut grpc = tonic::server::Grpc::new(codec)
.apply_compression_config(accept_compression_encodings, send_compression_encodings)
.apply_max_message_size_config(max_decoding_message_size, max_encoding_message_size);
let res = grpc.unary(method, req).await;
Ok(res)
};
Box::pin(fut)
}
"/node_service.NodeService/WriteMetadata" => {
#[allow(non_camel_case_types)]
struct WriteMetadataSvc<T: NodeService>(pub Arc<T>);

View File

@@ -313,6 +313,18 @@ message DeletePathsResponse {
optional Error error = 2;
}
message ReadMetadataRequest {
string disk = 1;
string volume = 2;
string path = 3;
}
message ReadMetadataResponse {
bool success = 1;
optional Error error = 2;
bytes data = 3;
}
message UpdateMetadataRequest {
string disk = 1;
string volume = 2;
@@ -786,6 +798,7 @@ service NodeService {
rpc StatVolume(StatVolumeRequest) returns (StatVolumeResponse) {};
rpc DeletePaths(DeletePathsRequest) returns (DeletePathsResponse) {};
rpc UpdateMetadata(UpdateMetadataRequest) returns (UpdateMetadataResponse) {};
rpc ReadMetadata(ReadMetadataRequest) returns (ReadMetadataResponse) {};
rpc WriteMetadata(WriteMetadataRequest) returns (WriteMetadataResponse) {};
rpc ReadVersion(ReadVersionRequest) returns (ReadVersionResponse) {};
rpc ReadXL(ReadXLRequest) returns (ReadXLResponse) {};
@@ -794,6 +807,7 @@ service NodeService {
rpc ReadMultiple(ReadMultipleRequest) returns (ReadMultipleResponse) {};
rpc DeleteVolume(DeleteVolumeRequest) returns (DeleteVolumeResponse) {};
rpc DiskInfo(DiskInfoRequest) returns (DiskInfoResponse) {};
/* -------------------------------lock service-------------------------- */
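
For orientation, a hedged sketch of calling the new RPC through the generated client shown above; the endpoint, disk string, and import paths are placeholders, and `connect` assumes tonic's transport feature (which the other RPCs in this service already rely on):

```rust
// Import paths are illustrative; they depend on where the protos crate
// re-exports the generated module.
use rustfs_protos::node_service::{ReadMetadataRequest, node_service_client::NodeServiceClient};

async fn fetch_raw_metadata() -> Result<bytes::Bytes, Box<dyn std::error::Error>> {
    let mut client = NodeServiceClient::connect("http://node-1:9000").await?;
    let resp = client
        .read_metadata(ReadMetadataRequest {
            disk: "http://node-1:9000/data/disk1".to_string(),
            volume: "mybucket".to_string(),
            path: "myobject/xl.meta".to_string(),
        })
        .await?
        .into_inner();
    if !resp.success {
        return Err(format!("read_metadata failed: {:?}", resp.error).into());
    }
    Ok(resp.data) // raw metadata bytes; RemoteDisk::read_metadata decodes them
}
```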

63 crates/scanner/Cargo.toml Normal file
View File

@@ -0,0 +1,63 @@
# Copyright 2024 RustFS Team
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
[package]
name = "rustfs-scanner"
version.workspace = true
edition.workspace = true
authors = ["RustFS Team"]
license.workspace = true
repository.workspace = true
rust-version.workspace = true
homepage.workspace = true
description = "RustFS Scanner provides scanning capabilities for data integrity checks, health monitoring, and storage analysis."
keywords = ["RustFS", "scanner", "health-monitoring", "data-integrity", "storage-analysis", "Minio"]
categories = ["web-programming", "development-tools", "filesystem"]
documentation = "https://docs.rs/rustfs-scanner/latest/rustfs_scanner/"
[lints]
workspace = true
[dependencies]
rustfs-config = { workspace = true }
rustfs-common = { workspace = true }
rustfs-utils = { workspace = true }
tokio = { workspace = true, features = ["full"] }
tracing = { workspace = true }
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
thiserror = { workspace = true }
uuid = { workspace = true, features = ["v4", "serde"] }
anyhow = { workspace = true }
async-trait = { workspace = true }
futures = { workspace = true }
time = { workspace = true }
chrono = { workspace = true }
path-clean = { workspace = true }
rmp-serde = { workspace = true }
rustfs-filemeta = { workspace = true }
rustfs-madmin = { workspace = true }
tokio-util = { workspace = true }
rustfs-ecstore = { workspace = true }
rustfs-ahm = { workspace = true }
http = { workspace = true }
rand = { workspace = true }
s3s = { workspace = true }
[dev-dependencies]
tokio-test = { workspace = true }
tracing-subscriber = { workspace = true }
tempfile = { workspace = true }
serial_test = { workspace = true }
heed = { workspace = true }

36 crates/scanner/README.md Normal file
View File

@@ -0,0 +1,36 @@
# RustFS Scanner
RustFS Scanner provides scanning capabilities for data integrity checks, health monitoring, and storage analysis.
## Features
- Data integrity scanning
- Health monitoring
- Storage analysis
- Extensible scanning framework
## Usage Example
```rust
use rustfs_scanner::ScannerError;
// TODO: add usage examples (see the sketch below)
```
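Until the public API settles, here is a minimal sketch of handling the crate's error type; it relies only on what the `ScannerError` enum in this crate guarantees:
```rust
use rustfs_scanner::ScannerError;

fn describe(err: &ScannerError) -> String {
    match err {
        ScannerError::Config(msg) => format!("bad config: {msg}"),
        ScannerError::Io(e) => format!("io: {e}"),
        ScannerError::Serialization(e) => format!("serde: {e}"),
        // the enum is #[non_exhaustive]; always keep a fallback arm
        other => format!("scanner: {other}"),
    }
}
```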
## Development
### Build
```bash
cargo build --package rustfs-scanner
```
### Test
```bash
cargo test --package rustfs-scanner
```
## License
Apache License 2.0

View File

@@ -0,0 +1,39 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use rustfs_ecstore::disk::{BUCKET_META_PREFIX, RUSTFS_META_BUCKET};
use rustfs_utils::path::SLASH_SEPARATOR;
use std::sync::LazyLock;
// Data usage constants
pub const DATA_USAGE_ROOT: &str = SLASH_SEPARATOR;
const DATA_USAGE_OBJ_NAME: &str = ".usage.json";
const DATA_USAGE_BLOOM_NAME: &str = ".bloomcycle.bin";
pub const DATA_USAGE_CACHE_NAME: &str = ".usage-cache.bin";
// Data usage paths (computed at runtime)
pub static DATA_USAGE_BUCKET: LazyLock<String> =
LazyLock::new(|| format!("{RUSTFS_META_BUCKET}{SLASH_SEPARATOR}{BUCKET_META_PREFIX}"));
pub static DATA_USAGE_OBJ_NAME_PATH: LazyLock<String> =
LazyLock::new(|| format!("{BUCKET_META_PREFIX}{SLASH_SEPARATOR}{DATA_USAGE_OBJ_NAME}"));
pub static DATA_USAGE_BLOOM_NAME_PATH: LazyLock<String> =
LazyLock::new(|| format!("{BUCKET_META_PREFIX}{SLASH_SEPARATOR}{DATA_USAGE_BLOOM_NAME}"));
pub static BACKGROUND_HEAL_INFO_PATH: LazyLock<String> =
LazyLock::new(|| format!("{BUCKET_META_PREFIX}{SLASH_SEPARATOR}.background-heal.json"));

File diff suppressed because it is too large

View File

@@ -0,0 +1,36 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use thiserror::Error;
/// Scanner-related errors
#[derive(Error, Debug)]
#[non_exhaustive]
pub enum ScannerError {
/// Configuration error
#[error("Configuration error: {0}")]
Config(String),
/// I/O error
#[error("I/O error: {0}")]
Io(#[from] std::io::Error),
/// Serialization error
#[error("Serialization error: {0}")]
Serialization(#[from] serde_json::Error),
/// Generic error
#[error("Scanner error: {0}")]
Other(String),
}
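
The `#[from]` attributes mean the `?` operator converts `std::io::Error` and `serde_json::Error` into `ScannerError` automatically; a small sketch (the file path is a placeholder):

```rust
use rustfs_scanner::ScannerError;

fn load_usage(path: &std::path::Path) -> Result<serde_json::Value, ScannerError> {
    let bytes = std::fs::read(path)?; // io::Error -> ScannerError::Io
    let value = serde_json::from_slice(&bytes)?; // serde_json::Error -> ScannerError::Serialization
    Ok(value)
}
```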

View File

@@ -0,0 +1,886 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::time::{Duration, SystemTime, UNIX_EPOCH};
#[allow(dead_code)]
#[derive(Debug, Default)]
pub struct TimedAction {
count: u64,
acc_time: u64,
min_time: Option<u64>,
max_time: Option<u64>,
bytes: u64,
}
#[allow(dead_code)]
impl TimedAction {
// Avg returns the average time spent on the action.
pub fn avg(&self) -> Option<Duration> {
if self.count == 0 {
return None;
}
Some(Duration::from_nanos(self.acc_time / self.count))
}
// AvgBytes returns the average bytes processed.
pub fn avg_bytes(&self) -> u64 {
if self.count == 0 {
return 0;
}
self.bytes / self.count
}
// Merge other into t.
pub fn merge(&mut self, other: TimedAction) {
self.count += other.count;
self.acc_time += other.acc_time;
self.bytes += other.bytes;
if self.count == 0 {
self.min_time = other.min_time;
}
if let Some(other_min) = other.min_time {
self.min_time = self.min_time.map_or(Some(other_min), |min| Some(min.min(other_min)));
}
self.max_time = self
.max_time
.map_or(other.max_time, |max| Some(max.max(other.max_time.unwrap_or(0))));
}
}
#[allow(dead_code)]
#[derive(Debug)]
enum SizeCategory {
SizeLessThan1KiB = 0,
SizeLessThan1MiB,
SizeLessThan10MiB,
SizeLessThan100MiB,
SizeLessThan1GiB,
SizeGreaterThan1GiB,
// Add new entries here
SizeLastElemMarker,
}
impl std::fmt::Display for SizeCategory {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let s = match *self {
SizeCategory::SizeLessThan1KiB => "SizeLessThan1KiB",
SizeCategory::SizeLessThan1MiB => "SizeLessThan1MiB",
SizeCategory::SizeLessThan10MiB => "SizeLessThan10MiB",
SizeCategory::SizeLessThan100MiB => "SizeLessThan100MiB",
SizeCategory::SizeLessThan1GiB => "SizeLessThan1GiB",
SizeCategory::SizeGreaterThan1GiB => "SizeGreaterThan1GiB",
SizeCategory::SizeLastElemMarker => "SizeLastElemMarker",
};
write!(f, "{s}")
}
}
#[derive(Clone, Debug, Default, Copy)]
pub struct AccElem {
pub total: u64,
pub size: u64,
pub n: u64,
}
impl AccElem {
pub fn add(&mut self, dur: &Duration) {
let dur = dur.as_secs();
self.total = self.total.wrapping_add(dur);
self.n = self.n.wrapping_add(1);
}
pub fn merge(&mut self, b: &AccElem) {
self.n = self.n.wrapping_add(b.n);
self.total = self.total.wrapping_add(b.total);
self.size = self.size.wrapping_add(b.size);
}
pub fn avg(&self) -> Duration {
if self.n >= 1 && self.total > 0 {
return Duration::from_secs(self.total / self.n);
}
Duration::from_secs(0)
}
}
#[derive(Clone, Debug)]
pub struct LastMinuteLatency {
pub totals: Vec<AccElem>,
pub last_sec: u64,
}
impl Default for LastMinuteLatency {
fn default() -> Self {
Self {
totals: vec![AccElem::default(); 60],
last_sec: Default::default(),
}
}
}
impl LastMinuteLatency {
pub fn merge(&mut self, o: &LastMinuteLatency) -> LastMinuteLatency {
let mut merged = LastMinuteLatency::default();
let mut x = o.clone();
if self.last_sec > o.last_sec {
x.forward_to(self.last_sec);
merged.last_sec = self.last_sec;
} else {
self.forward_to(o.last_sec);
merged.last_sec = o.last_sec;
}
for i in 0..merged.totals.len() {
// Sum against the forwarded clone `x`, not `o`, so slots that have aged
// out of the other window are not counted.
merged.totals[i] = AccElem {
total: self.totals[i].total + x.totals[i].total,
n: self.totals[i].n + x.totals[i].n,
size: self.totals[i].size + x.totals[i].size,
}
}
merged
}
pub fn add(&mut self, t: &Duration) {
let sec = SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("Time went backwards")
.as_secs();
self.forward_to(sec);
let win_idx = sec % 60;
self.totals[win_idx as usize].add(t);
self.last_sec = sec;
}
pub fn add_all(&mut self, sec: u64, a: &AccElem) {
self.forward_to(sec);
let win_idx = sec % 60;
self.totals[win_idx as usize].merge(a);
self.last_sec = sec;
}
pub fn get_total(&mut self) -> AccElem {
let mut res = AccElem::default();
let sec = SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("Time went backwards")
.as_secs();
self.forward_to(sec);
for elem in self.totals.iter() {
res.merge(elem);
}
res
}
pub fn forward_to(&mut self, t: u64) {
if self.last_sec >= t {
return;
}
if t - self.last_sec >= 60 {
self.totals = vec![AccElem::default(); 60];
self.last_sec = t;
return;
}
while self.last_sec != t {
let idx = (self.last_sec + 1) % 60;
self.totals[idx as usize] = AccElem::default();
self.last_sec += 1;
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::time::Duration;
#[test]
fn test_acc_elem_default() {
let elem = AccElem::default();
assert_eq!(elem.total, 0);
assert_eq!(elem.size, 0);
assert_eq!(elem.n, 0);
}
#[test]
fn test_acc_elem_add_single_duration() {
let mut elem = AccElem::default();
let duration = Duration::from_secs(5);
elem.add(&duration);
assert_eq!(elem.total, 5);
assert_eq!(elem.n, 1);
assert_eq!(elem.size, 0); // size is not modified by add
}
#[test]
fn test_acc_elem_add_multiple_durations() {
let mut elem = AccElem::default();
elem.add(&Duration::from_secs(3));
elem.add(&Duration::from_secs(7));
elem.add(&Duration::from_secs(2));
assert_eq!(elem.total, 12);
assert_eq!(elem.n, 3);
assert_eq!(elem.size, 0);
}
#[test]
fn test_acc_elem_add_zero_duration() {
let mut elem = AccElem::default();
let duration = Duration::from_secs(0);
elem.add(&duration);
assert_eq!(elem.total, 0);
assert_eq!(elem.n, 1);
}
#[test]
fn test_acc_elem_add_subsecond_duration() {
let mut elem = AccElem::default();
// Duration less than 1 second should be truncated to 0
let duration = Duration::from_millis(500);
elem.add(&duration);
assert_eq!(elem.total, 0); // as_secs() truncates subsecond values
assert_eq!(elem.n, 1);
}
#[test]
fn test_acc_elem_merge_empty_elements() {
let mut elem1 = AccElem::default();
let elem2 = AccElem::default();
elem1.merge(&elem2);
assert_eq!(elem1.total, 0);
assert_eq!(elem1.size, 0);
assert_eq!(elem1.n, 0);
}
#[test]
fn test_acc_elem_merge_with_data() {
let mut elem1 = AccElem {
total: 10,
size: 100,
n: 2,
};
let elem2 = AccElem {
total: 15,
size: 200,
n: 3,
};
elem1.merge(&elem2);
assert_eq!(elem1.total, 25);
assert_eq!(elem1.size, 300);
assert_eq!(elem1.n, 5);
}
#[test]
fn test_acc_elem_merge_one_empty() {
let mut elem1 = AccElem {
total: 10,
size: 100,
n: 2,
};
let elem2 = AccElem::default();
elem1.merge(&elem2);
assert_eq!(elem1.total, 10);
assert_eq!(elem1.size, 100);
assert_eq!(elem1.n, 2);
}
#[test]
fn test_acc_elem_avg_with_data() {
let elem = AccElem {
total: 15,
size: 0,
n: 3,
};
let avg = elem.avg();
assert_eq!(avg, Duration::from_secs(5)); // 15 / 3 = 5
}
#[test]
fn test_acc_elem_avg_zero_count() {
let elem = AccElem {
total: 10,
size: 0,
n: 0,
};
let avg = elem.avg();
assert_eq!(avg, Duration::from_secs(0));
}
#[test]
fn test_acc_elem_avg_zero_total() {
let elem = AccElem { total: 0, size: 0, n: 5 };
let avg = elem.avg();
assert_eq!(avg, Duration::from_secs(0));
}
#[test]
fn test_acc_elem_avg_rounding() {
let elem = AccElem {
total: 10,
size: 0,
n: 3,
};
let avg = elem.avg();
assert_eq!(avg, Duration::from_secs(3)); // 10 / 3 = 3 (integer division)
}
#[test]
fn test_last_minute_latency_default() {
let latency = LastMinuteLatency::default();
assert_eq!(latency.totals.len(), 60);
assert_eq!(latency.last_sec, 0);
// All elements should be default (empty)
for elem in &latency.totals {
assert_eq!(elem.total, 0);
assert_eq!(elem.size, 0);
assert_eq!(elem.n, 0);
}
}
#[test]
fn test_last_minute_latency_forward_to_same_time() {
let mut latency = LastMinuteLatency {
last_sec: 100,
..Default::default()
};
// Add some data to verify it's not cleared
latency.totals[0].total = 10;
latency.totals[0].n = 1;
latency.forward_to(100); // Same time
assert_eq!(latency.last_sec, 100);
assert_eq!(latency.totals[0].total, 10); // Data should remain
assert_eq!(latency.totals[0].n, 1);
}
#[test]
fn test_last_minute_latency_forward_to_past_time() {
let mut latency = LastMinuteLatency {
last_sec: 100,
..Default::default()
};
// Add some data to verify it's not cleared
latency.totals[0].total = 10;
latency.totals[0].n = 1;
latency.forward_to(50); // Past time
assert_eq!(latency.last_sec, 100); // Should not change
assert_eq!(latency.totals[0].total, 10); // Data should remain
assert_eq!(latency.totals[0].n, 1);
}
#[test]
fn test_last_minute_latency_forward_to_large_gap() {
let mut latency = LastMinuteLatency {
last_sec: 100,
..Default::default()
};
// Add some data to verify it's cleared
latency.totals[0].total = 10;
latency.totals[0].n = 1;
latency.forward_to(200); // Gap >= 60 seconds
assert_eq!(latency.last_sec, 200); // last_sec should be updated to target time
// All data should be cleared
for elem in &latency.totals {
assert_eq!(elem.total, 0);
assert_eq!(elem.size, 0);
assert_eq!(elem.n, 0);
}
}
#[test]
fn test_last_minute_latency_forward_to_small_gap() {
let mut latency = LastMinuteLatency {
last_sec: 100,
..Default::default()
};
// Add data at specific indices
latency.totals[41].total = 10; // (100 + 1) % 60 = 41
latency.totals[42].total = 20; // (100 + 2) % 60 = 42
latency.forward_to(102); // Forward by 2 seconds
assert_eq!(latency.last_sec, 102);
// The slots that were advanced should be cleared
assert_eq!(latency.totals[41].total, 0); // Cleared during forward
assert_eq!(latency.totals[42].total, 0); // Cleared during forward
}
#[test]
fn test_last_minute_latency_add_all() {
let mut latency = LastMinuteLatency::default();
let acc_elem = AccElem {
total: 15,
size: 100,
n: 3,
};
latency.add_all(1000, &acc_elem);
assert_eq!(latency.last_sec, 1000);
let idx = 1000 % 60; // Should be 40
assert_eq!(latency.totals[idx as usize].total, 15);
assert_eq!(latency.totals[idx as usize].size, 100);
assert_eq!(latency.totals[idx as usize].n, 3);
}
#[test]
fn test_last_minute_latency_add_all_multiple() {
let mut latency = LastMinuteLatency::default();
let acc_elem1 = AccElem {
total: 10,
size: 50,
n: 2,
};
let acc_elem2 = AccElem {
total: 20,
size: 100,
n: 4,
};
latency.add_all(1000, &acc_elem1);
latency.add_all(1000, &acc_elem2); // Same second
let idx = 1000 % 60;
assert_eq!(latency.totals[idx as usize].total, 30); // 10 + 20
assert_eq!(latency.totals[idx as usize].size, 150); // 50 + 100
assert_eq!(latency.totals[idx as usize].n, 6); // 2 + 4
}
#[test]
fn test_last_minute_latency_merge_same_time() {
let mut latency1 = LastMinuteLatency::default();
let mut latency2 = LastMinuteLatency::default();
latency1.last_sec = 1000;
latency2.last_sec = 1000;
// Add data to both
latency1.totals[0].total = 10;
latency1.totals[0].n = 2;
latency2.totals[0].total = 20;
latency2.totals[0].n = 3;
let merged = latency1.merge(&latency2);
assert_eq!(merged.last_sec, 1000);
assert_eq!(merged.totals[0].total, 30); // 10 + 20
assert_eq!(merged.totals[0].n, 5); // 2 + 3
}
#[test]
fn test_last_minute_latency_merge_different_times() {
let mut latency1 = LastMinuteLatency::default();
let mut latency2 = LastMinuteLatency::default();
latency1.last_sec = 1000;
latency2.last_sec = 1010; // 10 seconds later
// Add data to both
latency1.totals[0].total = 10;
latency2.totals[0].total = 20;
let merged = latency1.merge(&latency2);
assert_eq!(merged.last_sec, 1010); // Should use the later time
assert_eq!(merged.totals[0].total, 30);
}
#[test]
fn test_last_minute_latency_merge_empty() {
let mut latency1 = LastMinuteLatency::default();
let latency2 = LastMinuteLatency::default();
let merged = latency1.merge(&latency2);
assert_eq!(merged.last_sec, 0);
for elem in &merged.totals {
assert_eq!(elem.total, 0);
assert_eq!(elem.size, 0);
assert_eq!(elem.n, 0);
}
}
#[test]
fn test_last_minute_latency_window_wraparound() {
let mut latency = LastMinuteLatency::default();
// Test that indices wrap around correctly
for sec in 0..120 {
// Test for 2 minutes
let acc_elem = AccElem {
total: sec,
size: 0,
n: 1,
};
latency.add_all(sec, &acc_elem);
let expected_idx = sec % 60;
assert_eq!(latency.totals[expected_idx as usize].total, sec);
}
}
#[test]
fn test_last_minute_latency_time_progression() {
let mut latency = LastMinuteLatency::default();
// Add data at time 1000
latency.add_all(
1000,
&AccElem {
total: 10,
size: 0,
n: 1,
},
);
// Forward to time 1030 (30 seconds later)
latency.forward_to(1030);
// Original data should still be there
let idx_1000 = 1000 % 60;
assert_eq!(latency.totals[idx_1000 as usize].total, 10);
// Forward to time 1070 (70 seconds from original, > 60 seconds)
latency.forward_to(1070);
// All data should be cleared due to large gap
for elem in &latency.totals {
assert_eq!(elem.total, 0);
assert_eq!(elem.n, 0);
}
}
#[test]
fn test_last_minute_latency_realistic_scenario() {
let mut latency = LastMinuteLatency::default();
let base_time = 1000u64;
// Add data for exactly 60 seconds to fill the window
for i in 0..60 {
let current_time = base_time + i;
let duration_secs = i % 10 + 1; // Varying durations 1-10 seconds
let acc_elem = AccElem {
total: duration_secs,
size: 1024 * (i % 5 + 1), // Varying sizes
n: 1,
};
latency.add_all(current_time, &acc_elem);
}
// Count non-empty slots after filling the window
let mut non_empty_count = 0;
let mut total_n = 0;
let mut total_sum = 0;
for elem in &latency.totals {
if elem.n > 0 {
non_empty_count += 1;
total_n += elem.n;
total_sum += elem.total;
}
}
// We should have exactly 60 non-empty slots (one for each second in the window)
assert_eq!(non_empty_count, 60);
assert_eq!(total_n, 60); // 60 data points total
assert!(total_sum > 0);
// Test manual total calculation (get_total uses system time which interferes with test)
let mut manual_total = AccElem::default();
for elem in &latency.totals {
manual_total.merge(elem);
}
assert_eq!(manual_total.n, 60);
assert_eq!(manual_total.total, total_sum);
}
#[test]
fn test_acc_elem_clone_and_debug() {
let elem = AccElem {
total: 100,
size: 200,
n: 5,
};
let cloned = elem;
assert_eq!(elem.total, cloned.total);
assert_eq!(elem.size, cloned.size);
assert_eq!(elem.n, cloned.n);
// Test Debug trait
let debug_str = format!("{elem:?}");
assert!(debug_str.contains("100"));
assert!(debug_str.contains("200"));
assert!(debug_str.contains("5"));
}
#[test]
fn test_last_minute_latency_clone() {
let mut latency = LastMinuteLatency {
last_sec: 1000,
..Default::default()
};
latency.totals[0].total = 100;
latency.totals[0].n = 5;
let cloned = latency.clone();
assert_eq!(latency.last_sec, cloned.last_sec);
assert_eq!(latency.totals[0].total, cloned.totals[0].total);
assert_eq!(latency.totals[0].n, cloned.totals[0].n);
}
#[test]
fn test_edge_case_max_values() {
let mut elem = AccElem {
total: u64::MAX - 50,
size: u64::MAX - 50,
n: u64::MAX - 50,
};
let other = AccElem {
total: 100,
size: 100,
n: 100,
};
// This should not panic due to overflow, values will wrap around
elem.merge(&other);
// Values should wrap around due to overflow (wrapping_add behavior)
assert_eq!(elem.total, 49); // (u64::MAX - 50) + 100 wraps to 49
assert_eq!(elem.size, 49);
assert_eq!(elem.n, 49);
}
#[test]
fn test_forward_to_boundary_conditions() {
let mut latency = LastMinuteLatency {
last_sec: 59,
..Default::default()
};
// Add data at the last slot
latency.totals[59].total = 100;
latency.totals[59].n = 1;
// Forward exactly 60 seconds (boundary case)
latency.forward_to(119);
// All data should be cleared
for elem in &latency.totals {
assert_eq!(elem.total, 0);
assert_eq!(elem.n, 0);
}
}
#[test]
fn test_get_total_with_data() {
let mut latency = LastMinuteLatency::default();
// Set a recent timestamp to avoid forward_to clearing data
let current_time = SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("Time went backwards")
.as_secs();
latency.last_sec = current_time;
// Add data to multiple slots
latency.totals[0] = AccElem {
total: 10,
size: 100,
n: 1,
};
latency.totals[1] = AccElem {
total: 20,
size: 200,
n: 2,
};
latency.totals[59] = AccElem {
total: 30,
size: 300,
n: 3,
};
let total = latency.get_total();
assert_eq!(total.total, 60);
assert_eq!(total.size, 600);
assert_eq!(total.n, 6);
}
#[test]
fn test_window_index_calculation() {
// Test that window index calculation works correctly
let _latency = LastMinuteLatency::default();
let acc_elem = AccElem { total: 1, size: 1, n: 1 };
// Test various timestamps
let test_cases = [(0, 0), (1, 1), (59, 59), (60, 0), (61, 1), (119, 59), (120, 0)];
for (timestamp, expected_idx) in test_cases {
let mut test_latency = LastMinuteLatency::default();
test_latency.add_all(timestamp, &acc_elem);
assert_eq!(
test_latency.totals[expected_idx].n, 1,
"Failed for timestamp {timestamp} (expected index {expected_idx})"
);
}
}
#[test]
fn test_concurrent_safety_simulation() {
// Simulate concurrent access patterns
let mut latency = LastMinuteLatency::default();
// Use current time to ensure data doesn't get cleared by get_total
let current_time = SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("Time went backwards")
.as_secs();
// Simulate rapid additions within a 60-second window
for i in 0..1000 {
let acc_elem = AccElem {
total: (i % 10) + 1, // Ensure non-zero values
size: (i % 100) + 1,
n: 1,
};
// Keep all timestamps within the current minute window
latency.add_all(current_time - (i % 60), &acc_elem);
}
let total = latency.get_total();
assert!(total.n > 0, "Total count should be greater than 0");
assert!(total.total > 0, "Total time should be greater than 0");
}
#[test]
fn test_acc_elem_debug_format() {
let elem = AccElem {
total: 123,
size: 456,
n: 789,
};
let debug_str = format!("{elem:?}");
assert!(debug_str.contains("123"));
assert!(debug_str.contains("456"));
assert!(debug_str.contains("789"));
}
#[test]
fn test_large_values() {
let mut elem = AccElem::default();
// Test with large duration values
let large_duration = Duration::from_secs(u64::MAX / 2);
elem.add(&large_duration);
assert_eq!(elem.total, u64::MAX / 2);
assert_eq!(elem.n, 1);
// Test average calculation with large values
let avg = elem.avg();
assert_eq!(avg, Duration::from_secs(u64::MAX / 2));
}
#[test]
fn test_zero_duration_handling() {
let mut elem = AccElem::default();
let zero_duration = Duration::from_secs(0);
elem.add(&zero_duration);
assert_eq!(elem.total, 0);
assert_eq!(elem.n, 1);
assert_eq!(elem.avg(), Duration::from_secs(0));
}
}
const SIZE_LAST_ELEM_MARKER: usize = 10; // Assumed bucket count; keep in sync with the SizeCategory variants above
#[allow(dead_code)]
#[derive(Debug, Default)]
pub struct LastMinuteHistogram {
histogram: Vec<LastMinuteLatency>,
size: u32,
}
impl LastMinuteHistogram {
pub fn merge(&mut self, other: &LastMinuteHistogram) {
for i in 0..self.histogram.len() {
self.histogram[i].merge(&other.histogram[i]);
}
}
pub fn add(&mut self, size: i64, t: Duration) {
let index = size_to_tag(size);
self.histogram[index].add(&t);
}
pub fn get_avg_data(&mut self) -> [AccElem; SIZE_LAST_ELEM_MARKER] {
let mut res = [AccElem::default(); SIZE_LAST_ELEM_MARKER];
for (i, elem) in self.histogram.iter_mut().enumerate() {
res[i] = elem.get_total();
}
res
}
}
fn size_to_tag(size: i64) -> usize {
match size {
_ if size < 1024 => 0, // sizeLessThan1KiB
_ if size < 1024 * 1024 => 1, // sizeLessThan1MiB
_ if size < 10 * 1024 * 1024 => 2, // sizeLessThan10MiB
_ if size < 100 * 1024 * 1024 => 3, // sizeLessThan100MiB
_ if size < 1024 * 1024 * 1024 => 4, // sizeLessThan1GiB
_ => 5, // sizeGreaterThan1GiB
}
}
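
To make the window semantics above concrete, a sketch that could sit beside the tests in this module (slot math as in `add_all`/`forward_to`):

```rust
#[test]
fn window_semantics_sketch() {
    let mut lat = LastMinuteLatency::default();
    lat.add_all(1_000, &AccElem { total: 5, size: 0, n: 1 }); // slot 1000 % 60 == 40
    lat.add_all(1_030, &AccElem { total: 7, size: 0, n: 1 }); // slot 1030 % 60 == 10
    lat.forward_to(1_059); // less than 60s ahead: both samples survive
    assert_eq!(lat.totals[40].n + lat.totals[10].n, 2);
    lat.forward_to(1_120); // 60s or more ahead: the whole window resets
    assert!(lat.totals.iter().all(|e| e.n == 0));
}
```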

34 crates/scanner/src/lib.rs Normal file
View File

@@ -0,0 +1,34 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#![cfg_attr(docsrs, feature(doc_auto_cfg))]
#![warn(
// missing_docs,
rustdoc::missing_crate_level_docs,
unreachable_pub,
rust_2018_idioms
)]
pub mod data_usage;
pub mod data_usage_define;
pub mod error;
pub mod last_minute;
pub mod metrics;
pub mod scanner;
pub mod scanner_folder;
pub mod scanner_io;
pub use data_usage_define::*;
pub use error::ScannerError;
pub use scanner::init_data_scanner;

View File

@@ -0,0 +1,576 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::last_minute::{AccElem, LastMinuteLatency};
use chrono::{DateTime, Utc};
use rustfs_madmin::metrics::ScannerMetrics as M_ScannerMetrics;
use serde::{Deserialize, Serialize};
use std::{
collections::HashMap,
fmt::Display,
pin::Pin,
sync::{
Arc, OnceLock,
atomic::{AtomicU64, Ordering},
},
time::{Duration, SystemTime},
};
use tokio::sync::{Mutex, RwLock};
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum IlmAction {
NoneAction = 0,
DeleteAction,
DeleteVersionAction,
TransitionAction,
TransitionVersionAction,
DeleteRestoredAction,
DeleteRestoredVersionAction,
DeleteAllVersionsAction,
DelMarkerDeleteAllVersionsAction,
ActionCount,
}
impl IlmAction {
pub fn delete_restored(&self) -> bool {
*self == Self::DeleteRestoredAction || *self == Self::DeleteRestoredVersionAction
}
pub fn delete_versioned(&self) -> bool {
*self == Self::DeleteVersionAction || *self == Self::DeleteRestoredVersionAction
}
pub fn delete_all(&self) -> bool {
*self == Self::DeleteAllVersionsAction || *self == Self::DelMarkerDeleteAllVersionsAction
}
pub fn delete(&self) -> bool {
if self.delete_restored() {
return true;
}
*self == Self::DeleteVersionAction
|| *self == Self::DeleteAction
|| *self == Self::DeleteAllVersionsAction
|| *self == Self::DelMarkerDeleteAllVersionsAction
}
}
impl Display for IlmAction {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{self:?}")
}
}
pub static GLOBAL_METRICS: OnceLock<Arc<Metrics>> = OnceLock::new();
pub fn global_metrics() -> &'static Arc<Metrics> {
GLOBAL_METRICS.get_or_init(|| Arc::new(Metrics::new()))
}
#[derive(Clone, Debug, PartialEq, PartialOrd)]
pub enum Metric {
// START Realtime metrics, that only records
// last minute latencies and total operation count.
ReadMetadata = 0,
CheckMissing,
SaveUsage,
ApplyAll,
ApplyVersion,
TierObjSweep,
HealCheck,
Ilm,
CheckReplication,
Yield,
CleanAbandoned,
ApplyNonCurrent,
HealAbandonedVersion,
// START Trace metrics:
StartTrace,
ScanObject, // Scan object. All operations included.
HealAbandonedObject,
// END realtime metrics:
LastRealtime,
// Trace only metrics:
ScanFolder, // Scan a folder on disk, recursively.
ScanCycle, // Full cycle, cluster global.
ScanBucketDrive, // Single bucket on one drive.
CompactFolder, // Folder compacted.
// Must be last:
Last,
}
impl Metric {
/// Convert to string representation for metrics
pub fn as_str(self) -> &'static str {
match self {
Self::ReadMetadata => "read_metadata",
Self::CheckMissing => "check_missing",
Self::SaveUsage => "save_usage",
Self::ApplyAll => "apply_all",
Self::ApplyVersion => "apply_version",
Self::TierObjSweep => "tier_obj_sweep",
Self::HealCheck => "heal_check",
Self::Ilm => "ilm",
Self::CheckReplication => "check_replication",
Self::Yield => "yield",
Self::CleanAbandoned => "clean_abandoned",
Self::ApplyNonCurrent => "apply_non_current",
Self::HealAbandonedVersion => "heal_abandoned_version",
Self::StartTrace => "start_trace",
Self::ScanObject => "scan_object",
Self::HealAbandonedObject => "heal_abandoned_object",
Self::LastRealtime => "last_realtime",
Self::ScanFolder => "scan_folder",
Self::ScanCycle => "scan_cycle",
Self::ScanBucketDrive => "scan_bucket_drive",
Self::CompactFolder => "compact_folder",
Self::Last => "last",
}
}
/// Convert from index back to enum (safe version)
pub fn from_index(index: usize) -> Option<Self> {
if index >= Self::Last as usize {
return None;
}
// Safe conversion using match instead of unsafe transmute
match index {
0 => Some(Self::ReadMetadata),
1 => Some(Self::CheckMissing),
2 => Some(Self::SaveUsage),
3 => Some(Self::ApplyAll),
4 => Some(Self::ApplyVersion),
5 => Some(Self::TierObjSweep),
6 => Some(Self::HealCheck),
7 => Some(Self::Ilm),
8 => Some(Self::CheckReplication),
9 => Some(Self::Yield),
10 => Some(Self::CleanAbandoned),
11 => Some(Self::ApplyNonCurrent),
12 => Some(Self::HealAbandonedVersion),
13 => Some(Self::StartTrace),
14 => Some(Self::ScanObject),
15 => Some(Self::HealAbandonedObject),
16 => Some(Self::LastRealtime),
17 => Some(Self::ScanFolder),
18 => Some(Self::ScanCycle),
19 => Some(Self::ScanBucketDrive),
20 => Some(Self::CompactFolder),
21 => Some(Self::Last),
_ => None,
}
}
}
/// Thread-safe wrapper around LastMinuteLatency behind an async mutex; clones share the same underlying window
#[derive(Default)]
pub struct LockedLastMinuteLatency {
latency: Arc<Mutex<LastMinuteLatency>>,
}
impl Clone for LockedLastMinuteLatency {
fn clone(&self) -> Self {
Self {
latency: Arc::clone(&self.latency),
}
}
}
impl LockedLastMinuteLatency {
pub fn new() -> Self {
Self {
latency: Arc::new(Mutex::new(LastMinuteLatency::default())),
}
}
/// Add a duration measurement
pub async fn add(&self, duration: Duration) {
self.add_size(duration, 0).await;
}
/// Add a duration measurement with size
pub async fn add_size(&self, duration: Duration, size: u64) {
let mut latency = self.latency.lock().await;
let now = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap_or_default()
.as_secs();
let elem = AccElem {
n: 1,
total: duration.as_secs(),
size,
};
latency.add_all(now, &elem);
}
/// Get total accumulated metrics for the last minute
pub async fn total(&self) -> AccElem {
let mut latency = self.latency.lock().await;
latency.get_total()
}
}
/// Current path tracker for monitoring active scan paths
struct CurrentPathTracker {
current_path: Arc<RwLock<String>>,
}
impl CurrentPathTracker {
fn new(initial_path: String) -> Self {
Self {
current_path: Arc::new(RwLock::new(initial_path)),
}
}
async fn update_path(&self, path: String) {
*self.current_path.write().await = path;
}
async fn get_path(&self) -> String {
self.current_path.read().await.clone()
}
}
/// Main scanner metrics structure
pub struct Metrics {
// Operation counters are atomics; latency windows are mutex-guarded.
operations: Vec<AtomicU64>,
latency: Vec<LockedLastMinuteLatency>,
actions: Vec<AtomicU64>,
actions_latency: Vec<LockedLastMinuteLatency>,
// Current paths contains disk -> tracker mappings
current_paths: Arc<RwLock<HashMap<String, Arc<CurrentPathTracker>>>>,
// Cycle information
cycle_info: Arc<RwLock<Option<CurrentCycle>>>,
}
// Cycle bookkeeping, persisted alongside the data-usage bloom file.
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
pub struct CurrentCycle {
pub current: u64,
pub next: u64,
pub cycle_completed: Vec<DateTime<Utc>>,
pub started: DateTime<Utc>,
}
impl CurrentCycle {
pub fn unmarshal(&mut self, buf: &[u8]) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
*self = rmp_serde::from_slice(buf)?;
Ok(())
}
pub fn marshal(&self) -> Result<Vec<u8>, Box<dyn std::error::Error + Send + Sync>> {
Ok(rmp_serde::to_vec(self)?)
}
}
impl Metrics {
pub fn new() -> Self {
let operations = (0..Metric::Last as usize).map(|_| AtomicU64::new(0)).collect();
let latency = (0..Metric::LastRealtime as usize)
.map(|_| LockedLastMinuteLatency::new())
.collect();
Self {
operations,
latency,
actions: (0..IlmAction::ActionCount as usize).map(|_| AtomicU64::new(0)).collect(),
// Build each entry separately: vec![x; n] would clone the inner Arc and
// make every action share a single latency window.
actions_latency: (0..IlmAction::ActionCount as usize)
.map(|_| LockedLastMinuteLatency::new())
.collect(),
current_paths: Arc::new(RwLock::new(HashMap::new())),
cycle_info: Arc::new(RwLock::new(None)),
}
}
/// Log scanner action with custom metadata - compatible with existing usage
pub fn log(metric: Metric) -> impl Fn(&HashMap<String, String>) {
let metric = metric as usize;
let start_time = SystemTime::now();
move |_custom: &HashMap<String, String>| {
let duration = SystemTime::now().duration_since(start_time).unwrap_or_default();
// Update operation count
global_metrics().operations[metric].fetch_add(1, Ordering::Relaxed);
// Update latency for realtime metrics (spawn async task for this)
if (metric) < Metric::LastRealtime as usize {
let metric_index = metric;
tokio::spawn(async move {
global_metrics().latency[metric_index].add(duration).await;
});
}
// Log trace metrics
if metric as u8 > Metric::StartTrace as u8 {
//debug!(metric = metric.as_str(), duration_ms = duration.as_millis(), "Scanner trace metric");
}
}
}
/// Time scanner action with size - returns function that takes size
pub fn time_size(metric: Metric) -> impl Fn(u64) {
let metric = metric as usize;
let start_time = SystemTime::now();
move |size: u64| {
let duration = SystemTime::now().duration_since(start_time).unwrap_or_default();
// Update operation count
global_metrics().operations[metric].fetch_add(1, Ordering::Relaxed);
// Update latency for realtime metrics with size (spawn async task)
if (metric) < Metric::LastRealtime as usize {
let metric_index = metric;
tokio::spawn(async move {
global_metrics().latency[metric_index].add_size(duration, size).await;
});
}
}
}
/// Time a scanner action - returns a closure to call when done
pub fn time(metric: Metric) -> impl Fn() {
let metric = metric as usize;
let start_time = SystemTime::now();
move || {
let duration = SystemTime::now().duration_since(start_time).unwrap_or_default();
// Update operation count
global_metrics().operations[metric].fetch_add(1, Ordering::Relaxed);
// Update latency for realtime metrics (spawn async task)
if (metric) < Metric::LastRealtime as usize {
let metric_index = metric;
tokio::spawn(async move {
global_metrics().latency[metric_index].add(duration).await;
});
}
}
}
/// Time N scanner actions - returns function that takes count, then returns completion function
pub fn time_n(metric: Metric) -> Box<dyn Fn(usize) -> Box<dyn Fn() + Send + Sync> + Send + Sync> {
let metric = metric as usize;
let start_time = SystemTime::now();
Box::new(move |count: usize| {
Box::new(move || {
let duration = SystemTime::now().duration_since(start_time).unwrap_or_default();
// Update operation count
global_metrics().operations[metric].fetch_add(count as u64, Ordering::Relaxed);
// Update latency for realtime metrics (spawn async task)
if (metric) < Metric::LastRealtime as usize {
let metric_index = metric;
tokio::spawn(async move {
global_metrics().latency[metric_index].add(duration).await;
});
}
})
})
}
/// Time ILM action with versions - returns function that takes versions, then returns completion function
pub fn time_ilm(a: IlmAction) -> Box<dyn Fn(u64) -> Box<dyn Fn() + Send + Sync> + Send + Sync> {
let a_clone = a as usize;
if a_clone == IlmAction::NoneAction as usize || a_clone >= IlmAction::ActionCount as usize {
return Box::new(move |_: u64| Box::new(move || {}));
}
let start = SystemTime::now();
Box::new(move |versions: u64| {
Box::new(move || {
let duration = SystemTime::now().duration_since(start).unwrap_or(Duration::from_secs(0));
tokio::spawn(async move {
global_metrics().actions[a_clone].fetch_add(versions, Ordering::Relaxed);
global_metrics().actions_latency[a_clone].add(duration).await;
});
})
})
}
/// Increment time with specific duration
pub async fn inc_time(metric: Metric, duration: Duration) {
let metric = metric as usize;
// Update operation count
global_metrics().operations[metric].fetch_add(1, Ordering::Relaxed);
// Update latency for realtime metrics
if (metric) < Metric::LastRealtime as usize {
global_metrics().latency[metric].add(duration).await;
}
}
/// Get lifetime operation count for a metric
pub fn lifetime(&self, metric: Metric) -> u64 {
let metric = metric as usize;
if (metric) >= Metric::Last as usize {
return 0;
}
self.operations[metric].load(Ordering::Relaxed)
}
/// Get last minute statistics for a metric
pub async fn last_minute(&self, metric: Metric) -> AccElem {
let metric = metric as usize;
if (metric) >= Metric::LastRealtime as usize {
return AccElem::default();
}
self.latency[metric].total().await
}
/// Set current cycle information
pub async fn set_cycle(&self, cycle: Option<CurrentCycle>) {
*self.cycle_info.write().await = cycle;
}
/// Get current cycle information
pub async fn get_cycle(&self) -> Option<CurrentCycle> {
self.cycle_info.read().await.clone()
}
/// Get current active paths
pub async fn get_current_paths(&self) -> Vec<String> {
let mut result = Vec::new();
let paths = self.current_paths.read().await;
for (disk, tracker) in paths.iter() {
let path = tracker.get_path().await;
result.push(format!("{disk}/{path}"));
}
result
}
/// Get number of active drives
pub async fn active_drives(&self) -> usize {
self.current_paths.read().await.len()
}
/// Generate metrics report
pub async fn report(&self) -> M_ScannerMetrics {
let mut metrics = M_ScannerMetrics::default();
// Set cycle information
if let Some(cycle) = self.get_cycle().await {
metrics.current_cycle = cycle.current;
metrics.cycles_completed_at = cycle.cycle_completed;
metrics.current_started = cycle.started;
}
metrics.collected_at = Utc::now();
metrics.active_paths = self.get_current_paths().await;
// Lifetime operations
for i in 0..Metric::Last as usize {
let count = self.operations[i].load(Ordering::Relaxed);
if count > 0 {
if let Some(metric) = Metric::from_index(i) {
metrics.life_time_ops.insert(metric.as_str().to_string(), count);
}
}
}
// Last minute statistics for realtime metrics
for i in 0..Metric::LastRealtime as usize {
let last_min = self.latency[i].total().await;
if last_min.n > 0 {
if let Some(_metric) = Metric::from_index(i) {
// Convert to madmin TimedAction format if needed
// This would require implementing the conversion
}
}
}
metrics
}
}
// Type aliases for compatibility with existing code
pub type UpdateCurrentPathFn = Arc<dyn Fn(&str) -> Pin<Box<dyn std::future::Future<Output = ()> + Send>> + Send + Sync>;
pub type CloseDiskFn = Arc<dyn Fn() -> Pin<Box<dyn std::future::Future<Output = ()> + Send>> + Send + Sync>;
/// Create a current path updater for tracking scan progress
pub fn current_path_updater(disk: &str, initial: &str) -> (UpdateCurrentPathFn, CloseDiskFn) {
let tracker = Arc::new(CurrentPathTracker::new(initial.to_string()));
let disk_name = disk.to_string();
// Store the tracker in global metrics
let tracker_clone = Arc::clone(&tracker);
let disk_clone = disk_name.clone();
tokio::spawn(async move {
global_metrics().current_paths.write().await.insert(disk_clone, tracker_clone);
});
let update_fn = {
let tracker = Arc::clone(&tracker);
Arc::new(move |path: &str| -> Pin<Box<dyn std::future::Future<Output = ()> + Send>> {
let tracker = Arc::clone(&tracker);
let path = path.to_string();
Box::pin(async move {
tracker.update_path(path).await;
})
})
};
let done_fn = {
let disk_name = disk_name.clone();
Arc::new(move || -> Pin<Box<dyn std::future::Future<Output = ()> + Send>> {
let disk_name = disk_name.clone();
Box::pin(async move {
global_metrics().current_paths.write().await.remove(&disk_name);
})
})
};
(update_fn, done_fn)
}
impl Default for Metrics {
fn default() -> Self {
Self::new()
}
}
pub struct CloseDiskGuard(CloseDiskFn);
impl CloseDiskGuard {
pub fn new(close_disk: CloseDiskFn) -> Self {
Self(close_disk)
}
pub async fn close(&self) {
(self.0)().await;
}
}
impl Drop for CloseDiskGuard {
fn drop(&mut self) {
// Drop cannot be async, so we spawn the async cleanup task
// The task will run in the background and complete asynchronously
if let Ok(handle) = tokio::runtime::Handle::try_current() {
let close_fn = self.0.clone();
handle.spawn(async move {
close_fn().await;
});
} else {
// If we're not in a tokio runtime context, we can't spawn
// This is a best-effort cleanup, so we just skip it
}
}
}
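
A sketch of how a scan loop might consume these helpers; the disk name, path, and metric choice are illustrative:

```rust
async fn scan_one_folder() {
    // Register a path tracker for this disk; the guard unregisters it on drop.
    let (update_path, close_disk) = current_path_updater("/data/disk1", "/");
    let _guard = CloseDiskGuard::new(close_disk);

    update_path("mybucket/prefix").await;

    // Start the timer, do the work, then call the closure to record it.
    let done = Metrics::time(Metric::ScanFolder);
    // ... walk the folder ...
    done();
}
```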

View File

@@ -0,0 +1,269 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use crate::data_usage::{BACKGROUND_HEAL_INFO_PATH, DATA_USAGE_BLOOM_NAME_PATH, DATA_USAGE_OBJ_NAME_PATH};
use crate::metrics::CurrentCycle;
use crate::metrics::global_metrics;
use crate::scanner_folder::data_usage_update_dir_cycles;
use crate::scanner_io::ScannerIO;
use crate::{DataUsageInfo, ScannerError};
use chrono::{DateTime, Utc};
use rustfs_common::heal_channel::HealScanMode;
use rustfs_config::{DEFAULT_DATA_SCANNER_START_DELAY_SECS, ENV_DATA_SCANNER_START_DELAY_SECS};
use rustfs_ecstore::StorageAPI as _;
use rustfs_ecstore::config::com::{read_config, save_config};
use rustfs_ecstore::disk::RUSTFS_META_BUCKET;
use rustfs_ecstore::error::Error as EcstoreError;
use rustfs_ecstore::global::is_erasure_sd;
use rustfs_ecstore::store::ECStore;
use serde::{Deserialize, Serialize};
use tokio::sync::mpsc;
use tokio::time::Duration;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, warn};
fn data_scanner_start_delay() -> Duration {
let secs = rustfs_utils::get_env_u64(ENV_DATA_SCANNER_START_DELAY_SECS, DEFAULT_DATA_SCANNER_START_DELAY_SECS);
Duration::from_secs(secs)
}
pub async fn init_data_scanner(ctx: CancellationToken, storeapi: Arc<ECStore>) {
let ctx_clone = ctx.clone();
let storeapi_clone = storeapi.clone();
tokio::spawn(async move {
let sleep_time = Duration::from_secs(rand::random::<u64>() % 5);
tokio::time::sleep(sleep_time).await;
loop {
if ctx_clone.is_cancelled() {
break;
}
if let Err(e) = run_data_scanner(ctx_clone.clone(), storeapi_clone.clone()).await {
error!("Failed to run data scanner: {e}");
}
tokio::time::sleep(data_scanner_start_delay()).await;
}
});
}
fn get_cycle_scan_mode(_current_cycle: u64, _bitrot_start_cycle: u64, _bitrot_start_time: Option<DateTime<Utc>>) -> HealScanMode {
// TODO: from config
HealScanMode::Normal
}
/// Background healing information
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct BackgroundHealInfo {
/// Bitrot scan start time
pub bitrot_start_time: Option<DateTime<Utc>>,
/// Bitrot scan start cycle
pub bitrot_start_cycle: u64,
/// Current scan mode
pub current_scan_mode: HealScanMode,
}
/// Read background healing information from storage
pub async fn read_background_heal_info(storeapi: Arc<ECStore>) -> BackgroundHealInfo {
// Skip for ErasureSD setup
if is_erasure_sd().await {
return BackgroundHealInfo::default();
}
// Get last healing information
match read_config(storeapi, &BACKGROUND_HEAL_INFO_PATH).await {
Ok(buf) => match serde_json::from_slice::<BackgroundHealInfo>(&buf) {
Ok(info) => info,
Err(e) => {
error!("Failed to unmarshal background heal info from {}: {}", &*BACKGROUND_HEAL_INFO_PATH, e);
BackgroundHealInfo::default()
}
},
Err(e) => {
// Only log if it's not a ConfigNotFound error
if e != EcstoreError::ConfigNotFound {
warn!("Failed to read background heal info from {}: {}", &*BACKGROUND_HEAL_INFO_PATH, e);
}
BackgroundHealInfo::default()
}
}
}
/// Save background healing information to storage
pub async fn save_background_heal_info(storeapi: Arc<ECStore>, info: BackgroundHealInfo) {
// Skip for ErasureSD setup
if is_erasure_sd().await {
return;
}
// Serialize to JSON
let data = match serde_json::to_vec(&info) {
Ok(data) => data,
Err(e) => {
error!("Failed to marshal background heal info: {}", e);
return;
}
};
// Save configuration
if let Err(e) = save_config(storeapi, &BACKGROUND_HEAL_INFO_PATH, data).await {
warn!("Failed to save background heal info to {}: {}", &*BACKGROUND_HEAL_INFO_PATH, e);
}
}
pub async fn run_data_scanner(ctx: CancellationToken, storeapi: Arc<ECStore>) -> Result<(), ScannerError> {
// TODO: leader lock
let _guard = match storeapi.new_ns_lock(RUSTFS_META_BUCKET, "leader.lock").await {
Ok(guard) => guard,
Err(e) => {
error!("run_data_scanner: other node is running, failed to acquire leader lock: {e}");
return Ok(());
}
};
let mut cycle_info = CurrentCycle::default();
let buf = read_config(storeapi.clone(), &DATA_USAGE_BLOOM_NAME_PATH)
.await
.unwrap_or_default();
if buf.len() == 8 {
cycle_info.next = u64::from_le_bytes(buf.try_into().unwrap_or_default());
} else if buf.len() > 8 {
cycle_info.next = u64::from_le_bytes(buf[0..8].try_into().unwrap_or_default());
if let Err(e) = cycle_info.unmarshal(&buf[8..]) {
warn!("Failed to unmarshal cycle info: {e}");
}
}
let mut ticker = tokio::time::interval(data_scanner_start_delay());
loop {
tokio::select! {
_ = ctx.cancelled() => {
break;
}
_ = ticker.tick() => {
cycle_info.current = cycle_info.next;
cycle_info.started = Utc::now();
global_metrics().set_cycle(Some(cycle_info.clone())).await;
let background_heal_info = read_background_heal_info(storeapi.clone()).await;
let scan_mode = get_cycle_scan_mode(cycle_info.current, background_heal_info.bitrot_start_cycle, background_heal_info.bitrot_start_time);
if background_heal_info.current_scan_mode != scan_mode {
let mut new_heal_info = background_heal_info.clone();
new_heal_info.current_scan_mode = scan_mode;
if scan_mode == HealScanMode::Deep {
new_heal_info.bitrot_start_cycle = cycle_info.current;
new_heal_info.bitrot_start_time = Some(Utc::now());
}
save_background_heal_info(storeapi.clone(), new_heal_info).await;
}
let (sender, receiver) = tokio::sync::mpsc::channel::<DataUsageInfo>(1);
let storeapi_clone = storeapi.clone();
let ctx_clone = ctx.clone();
tokio::spawn(async move {
store_data_usage_in_backend(ctx_clone, storeapi_clone, receiver).await;
});
if let Err(e) = storeapi.clone().nsscanner(ctx.clone(), sender, cycle_info.current, scan_mode).await {
error!("Failed to scan namespace: {e}");
} else {
info!("Namespace scanned successfully");
cycle_info.next += 1;
cycle_info.current = 0;
cycle_info.cycle_completed.push(Utc::now());
// Keep only the most recent completion timestamps (at most data_usage_update_dir_cycles
// entries); `split_off` at that index would instead empty the list once it filled up.
let keep = data_usage_update_dir_cycles() as usize;
if cycle_info.cycle_completed.len() > keep {
let excess = cycle_info.cycle_completed.len() - keep;
cycle_info.cycle_completed.drain(..excess);
}
global_metrics().set_cycle(Some(cycle_info.clone())).await;
let cycle_info_buf = cycle_info.marshal().unwrap_or_default();
let mut buf = Vec::with_capacity(cycle_info_buf.len() + 8);
buf.extend_from_slice(&cycle_info.next.to_le_bytes());
buf.extend_from_slice(&cycle_info_buf);
if let Err(e) = save_config(storeapi.clone(), &DATA_USAGE_BLOOM_NAME_PATH, buf).await {
error!("Failed to save data usage bloom name to {}: {}", &*DATA_USAGE_BLOOM_NAME_PATH, e);
} else {
info!("Data usage bloom name saved successfully");
}
}
ticker.reset();
}
}
}
global_metrics().set_cycle(None).await;
Ok(())
}
/// Store data usage info in backend. Will store all objects sent on the receiver until closed.
pub async fn store_data_usage_in_backend(
ctx: CancellationToken,
storeapi: Arc<ECStore>,
mut receiver: mpsc::Receiver<DataUsageInfo>,
) {
let mut attempts = 1u32;
while let Some(data_usage_info) = receiver.recv().await {
if ctx.is_cancelled() {
break;
}
debug!("store_data_usage_in_backend: received data usage info: {:?}", &data_usage_info);
// Serialize to JSON
let data = match serde_json::to_vec(&data_usage_info) {
Ok(data) => data,
Err(e) => {
error!("Failed to marshal data usage info: {}", e);
continue;
}
};
// Save a backup every 10th update
if attempts > 10 {
let backup_path = format!("{:?}.bkp", &DATA_USAGE_OBJ_NAME_PATH);
if let Err(e) = save_config(storeapi.clone(), &backup_path, data.clone()).await {
warn!("Failed to save data usage backup to {}: {}", backup_path, e);
}
attempts = 1;
}
// Save main configuration
if let Err(e) = save_config(storeapi.clone(), &DATA_USAGE_OBJ_NAME_PATH, data).await {
error!("Failed to save data usage info to {:?}: {e}", &DATA_USAGE_OBJ_NAME_PATH);
}
attempts += 1;
}
}

File diff suppressed because it is too large

View File

@@ -0,0 +1,631 @@
use crate::scanner_folder::{ScannerItem, scan_data_folder};
use crate::{
DATA_USAGE_CACHE_NAME, DATA_USAGE_ROOT, DataUsageCache, DataUsageCacheInfo, DataUsageEntry, DataUsageEntryInfo,
DataUsageInfo, SizeSummary, TierStats,
};
use futures::future::join_all;
use rand::seq::SliceRandom as _;
use rustfs_common::heal_channel::HealScanMode;
use rustfs_ecstore::bucket::bucket_target_sys::BucketTargetSys;
use rustfs_ecstore::bucket::lifecycle::lifecycle::Lifecycle;
use rustfs_ecstore::bucket::metadata_sys::{get_lifecycle_config, get_object_lock_config, get_replication_config};
use rustfs_ecstore::bucket::replication::{ReplicationConfig, ReplicationConfigurationExt};
use rustfs_ecstore::bucket::versioning::VersioningApi as _;
use rustfs_ecstore::bucket::versioning_sys::BucketVersioningSys;
use rustfs_ecstore::config::storageclass;
use rustfs_ecstore::disk::STORAGE_FORMAT_FILE;
use rustfs_ecstore::disk::{Disk, DiskAPI};
use rustfs_ecstore::error::{Error, StorageError};
use rustfs_ecstore::global::GLOBAL_TierConfigMgr;
use rustfs_ecstore::new_object_layer_fn;
use rustfs_ecstore::set_disk::SetDisks;
use rustfs_ecstore::store_api::{BucketInfo, BucketOptions, ObjectInfo};
use rustfs_ecstore::{StorageAPI, error::Result, store::ECStore};
use rustfs_filemeta::FileMeta;
use rustfs_utils::path::{SLASH_SEPARATOR, path_join_buf};
use s3s::dto::{BucketLifecycleConfiguration, ReplicationConfiguration};
use std::collections::HashMap;
use std::time::SystemTime;
use std::{fmt::Debug, sync::Arc};
use time::OffsetDateTime;
use tokio::sync::{Mutex, mpsc};
use tokio::time::Duration;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, warn};
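/// Entry point for a full namespace scan across all pools and sets, implemented by the object store.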
#[async_trait::async_trait]
pub trait ScannerIO: Send + Sync + Debug + 'static {
async fn nsscanner(
&self,
ctx: CancellationToken,
updates: mpsc::Sender<DataUsageInfo>,
want_cycle: u64,
scan_mode: HealScanMode,
) -> Result<()>;
}
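/// Per-set scanning: walks every bucket on the set's online disks and streams merged DataUsageCache updates.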
#[async_trait::async_trait]
pub trait ScannerIOCache: Send + Sync + Debug + 'static {
async fn nsscanner_cache(
self: Arc<Self>,
ctx: CancellationToken,
buckets: Vec<BucketInfo>,
updates: mpsc::Sender<DataUsageCache>,
want_cycle: u64,
scan_mode: HealScanMode,
) -> Result<()>;
}
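/// Per-disk scanning primitives: scan a single disk into a DataUsageCache and size individual metadata entries.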
#[async_trait::async_trait]
pub trait ScannerIODisk: Send + Sync + Debug + 'static {
async fn nsscanner_disk(
&self,
ctx: CancellationToken,
cache: DataUsageCache,
updates: Option<mpsc::Sender<DataUsageEntry>>,
scan_mode: HealScanMode,
) -> Result<DataUsageCache>;
async fn get_size(&self, item: ScannerItem) -> Result<SizeSummary>;
}
#[async_trait::async_trait]
impl ScannerIO for ECStore {
async fn nsscanner(
&self,
ctx: CancellationToken,
updates: mpsc::Sender<DataUsageInfo>,
want_cycle: u64,
scan_mode: HealScanMode,
) -> Result<()> {
let child_token = ctx.child_token();
let all_buckets = self.list_bucket(&BucketOptions::default()).await?;
if all_buckets.is_empty() {
if let Err(e) = updates.send(DataUsageInfo::default()).await {
error!("Failed to send data usage info: {}", e);
}
return Ok(());
}
let mut total_results = 0;
for pool in self.pools.iter() {
total_results += pool.disk_set.len();
}
let results = vec![DataUsageCache::default(); total_results];
let results_mutex: Arc<Mutex<Vec<DataUsageCache>>> = Arc::new(Mutex::new(results));
let first_err_mutex: Arc<Mutex<Option<Error>>> = Arc::new(Mutex::new(None));
let mut results_index: i32 = -1_i32;
let mut wait_futs = Vec::new();
for pool in self.pools.iter() {
for set in pool.disk_set.iter() {
results_index += 1;
let results_index_clone = results_index as usize;
// Clone the Arc to move it into the spawned task
let set_clone: Arc<SetDisks> = Arc::clone(set);
let child_token_clone = child_token.clone();
let want_cycle_clone = want_cycle;
let scan_mode_clone = scan_mode;
let results_mutex_clone = results_mutex.clone();
let first_err_mutex_clone = first_err_mutex.clone();
let (tx, mut rx) = tokio::sync::mpsc::channel::<DataUsageCache>(1);
// Spawn task to receive and store results
let receiver_fut = tokio::spawn(async move {
while let Some(result) = rx.recv().await {
let mut results = results_mutex_clone.lock().await;
results[results_index_clone] = result;
}
});
wait_futs.push(receiver_fut);
let all_buckets_clone = all_buckets.clone();
// Spawn task to run the scanner
let scanner_fut = tokio::spawn(async move {
if let Err(e) = set_clone
.nsscanner_cache(child_token_clone.clone(), all_buckets_clone, tx, want_cycle_clone, scan_mode_clone)
.await
{
error!("Failed to scan set: {e}");
let _ = first_err_mutex_clone.lock().await.insert(e);
child_token_clone.cancel();
}
});
wait_futs.push(scanner_fut);
}
}
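// Aggregator task: every 30s it merges the per-set caches and forwards a DataUsageInfo
// snapshot; the oneshot below fires after all scanners finish so one final merged
// update is emitted before the task exits.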
let (update_tx, mut update_rx) = tokio::sync::oneshot::channel::<()>();
let all_buckets_clone = all_buckets.iter().map(|b| b.name.clone()).collect::<Vec<String>>();
tokio::spawn(async move {
let mut last_update = SystemTime::now();
let mut ticker = tokio::time::interval(Duration::from_secs(30));
loop {
tokio::select! {
_ = child_token.cancelled() => {
break;
}
res = &mut update_rx => {
if res.is_err() {
break;
}
let results = results_mutex.lock().await;
let mut all_merged = DataUsageCache::default();
for result in results.iter() {
if result.info.last_update.is_none() {
return;
}
all_merged.merge(result);
}
if all_merged.root().is_some() && all_merged.info.last_update.unwrap() > last_update {
if let Err(e) = updates
.send(all_merged.dui(&all_merged.info.name, &all_buckets_clone))
.await {
error!("Failed to send data usage info: {}", e);
}
}
break;
}
_ = ticker.tick() => {
let results = results_mutex.lock().await;
let mut all_merged = DataUsageCache::default();
for result in results.iter() {
if result.info.last_update.is_none() {
return;
}
all_merged.merge(result);
}
if all_merged.root().is_some() && all_merged.info.last_update.unwrap() > last_update {
if let Err(e) = updates
.send(all_merged.dui(&all_merged.info.name, &all_buckets_clone))
.await {
error!("Failed to send data usage info: {}", e);
}
last_update = all_merged.info.last_update.unwrap();
}
}
}
}
});
let _ = join_all(wait_futs).await;
let _ = update_tx.send(());
Ok(())
}
}
#[async_trait::async_trait]
impl ScannerIOCache for SetDisks {
async fn nsscanner_cache(
self: Arc<Self>,
ctx: CancellationToken,
buckets: Vec<BucketInfo>,
updates: mpsc::Sender<DataUsageCache>,
want_cycle: u64,
scan_mode: HealScanMode,
) -> Result<()> {
if buckets.is_empty() {
return Ok(());
}
let (disks, healing) = self.get_online_disks_with_healing(false).await;
if disks.is_empty() {
info!("No online disks available for set");
return Ok(());
}
let mut old_cache = DataUsageCache::default();
old_cache.load(self.clone(), DATA_USAGE_CACHE_NAME).await?;
let mut cache = DataUsageCache {
info: DataUsageCacheInfo {
name: DATA_USAGE_ROOT.to_string(),
next_cycle: old_cache.info.next_cycle,
..Default::default()
},
cache: HashMap::new(),
};
let (bucket_tx, bucket_rx) = mpsc::channel::<BucketInfo>(buckets.len());
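// Queue buckets in two passes: buckets missing from the old cache go first so
// brand-new buckets are scanned before previously-seen ones, whose cached entries
// are carried over below.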
let mut permutes = buckets.clone();
permutes.shuffle(&mut rand::rng());
for bucket in permutes.iter() {
if old_cache.find(&bucket.name).is_none() {
if let Err(e) = bucket_tx.send(bucket.clone()).await {
error!("Failed to send bucket info: {}", e);
}
}
}
for bucket in permutes.iter() {
if let Some(c) = old_cache.find(&bucket.name) {
cache.replace(&bucket.name, DATA_USAGE_ROOT, c.clone());
if let Err(e) = bucket_tx.send(bucket.clone()).await {
error!("Failed to send bucket info: {}", e);
}
}
}
drop(bucket_tx);
let cache_mutex: Arc<Mutex<DataUsageCache>> = Arc::new(Mutex::new(cache));
let (bucket_result_tx, mut bucket_result_rx) = mpsc::channel::<DataUsageEntryInfo>(disks.len());
let cache_mutex_clone = cache_mutex.clone();
let store_clone = self.clone();
let ctx_clone = ctx.clone();
let send_update_fut = tokio::spawn(async move {
let mut ticker = tokio::time::interval(Duration::from_secs(3 + rand::random::<u64>() % 10));
let mut last_update = None;
loop {
tokio::select! {
_ = ctx_clone.cancelled() => {
break;
}
_ = ticker.tick() => {
let cache = cache_mutex_clone.lock().await;
if cache.info.last_update == last_update {
continue;
}
if let Err(e) = cache.save(store_clone.clone(), DATA_USAGE_CACHE_NAME).await {
error!("Failed to save data usage cache: {}", e);
}
if let Err(e) = updates.send(cache.clone()).await {
error!("Failed to send data usage cache: {}", e);
}
last_update = cache.info.last_update;
}
res = bucket_result_rx.recv() => {
if let Some(result) = res {
let mut cache = cache_mutex_clone.lock().await;
cache.replace(&result.name, &result.parent, result.entry);
cache.info.last_update = Some(SystemTime::now());
} else {
let mut cache = cache_mutex_clone.lock().await;
cache.info.next_cycle = want_cycle;
cache.info.last_update = Some(SystemTime::now());
if let Err(e) = cache.save(store_clone.clone(), DATA_USAGE_CACHE_NAME).await {
error!("Failed to save data usage cache: {}", e);
}
if let Err(e) = updates.send(cache.clone()).await {
error!("Failed to send data usage cache: {}", e);
}
return;
}
}
}
}
});
let mut futs = Vec::new();
let bucket_rx_mutex: Arc<Mutex<mpsc::Receiver<BucketInfo>>> = Arc::new(Mutex::new(bucket_rx));
let bucket_result_tx_clone: Arc<Mutex<mpsc::Sender<DataUsageEntryInfo>>> = Arc::new(Mutex::new(bucket_result_tx));
for disk in disks.into_iter() {
let bucket_rx_mutex_clone = bucket_rx_mutex.clone();
let ctx_clone = ctx.clone();
let store_clone_clone = self.clone();
let bucket_result_tx_clone_clone = bucket_result_tx_clone.clone();
futs.push(tokio::spawn(async move {
loop {
// Take the shared receiver lock only while pulling the next bucket; a `while let`
// here would hold the MutexGuard across the whole loop body and serialize the
// per-disk workers.
let next_bucket = {
let mut rx = bucket_rx_mutex_clone.lock().await;
rx.recv().await
};
let Some(bucket) = next_bucket else { break };
if ctx_clone.is_cancelled() {
break;
}
debug!("nsscanner_disk: got bucket: {}", bucket.name);
let cache_name = path_join_buf(&[&bucket.name, DATA_USAGE_CACHE_NAME]);
let mut cache = DataUsageCache::default();
if let Err(e) = cache.load(store_clone_clone.clone(), &cache_name).await {
error!("Failed to load data usage cache: {}", e);
}
if cache.info.name.is_empty() {
cache.info.name = bucket.name.clone();
}
cache.info.skip_healing = healing;
cache.info.next_cycle = want_cycle;
if cache.info.name != bucket.name {
cache.info = DataUsageCacheInfo {
name: bucket.name.clone(),
next_cycle: want_cycle,
..Default::default()
};
}
warn!("nsscanner_disk: cache.info.name: {:?}", cache.info.name);
let (updates_tx, mut updates_rx) = mpsc::channel::<DataUsageEntry>(1);
let ctx_clone_clone = ctx_clone.clone();
let bucket_name_clone = bucket.name.clone();
let bucket_result_tx_clone_clone_clone = bucket_result_tx_clone_clone.clone();
let update_fut = tokio::spawn(async move {
while let Some(result) = updates_rx.recv().await {
if ctx_clone_clone.is_cancelled() {
break;
}
if let Err(e) = bucket_result_tx_clone_clone_clone
.lock()
.await
.send(DataUsageEntryInfo {
name: bucket_name_clone.clone(),
parent: DATA_USAGE_ROOT.to_string(),
entry: result,
})
.await
{
error!("Failed to send data usage entry info: {}", e);
}
}
});
let before = cache.info.last_update;
cache = match disk
.clone()
.nsscanner_disk(ctx_clone.clone(), cache.clone(), Some(updates_tx), scan_mode)
.await
{
Ok(cache) => cache,
Err(e) => {
error!("Failed to scan disk: {}", e);
if let (Some(last_update), Some(before_update)) = (cache.info.last_update, before) {
if last_update > before_update {
if let Err(e) = cache.save(store_clone_clone.clone(), cache_name.as_str()).await {
error!("Failed to save data usage cache: {}", e);
}
}
}
if let Err(e) = update_fut.await {
error!("Failed to update data usage cache: {}", e);
}
continue;
}
};
debug!("nsscanner_disk: got cache: {}", cache.info.name);
if let Err(e) = update_fut.await {
error!("nsscanner_disk: Failed to update data usage cache: {}", e);
}
let root = if let Some(r) = cache.root() {
cache.flatten(&r)
} else {
DataUsageEntry::default()
};
if ctx_clone.is_cancelled() {
break;
}
debug!("nsscanner_disk: sending data usage entry info: {}", cache.info.name);
if let Err(e) = bucket_result_tx_clone_clone
.lock()
.await
.send(DataUsageEntryInfo {
name: cache.info.name.clone(),
parent: DATA_USAGE_ROOT.to_string(),
entry: root,
})
.await
{
error!("nsscanner_disk: Failed to send data usage entry info: {}", e);
}
if let Err(e) = cache.save(store_clone_clone.clone(), &cache_name).await {
error!("nsscanner_disk: Failed to save data usage cache: {}", e);
}
}
}));
}
let _ = join_all(futs).await;
warn!("nsscanner_cache: joining all futures");
drop(bucket_result_tx_clone);
warn!("nsscanner_cache: dropping bucket result tx");
send_update_fut.await?;
warn!("nsscanner_cache: done");
Ok(())
}
}
#[async_trait::async_trait]
impl ScannerIODisk for Disk {
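/// Compute the size summary for a single on-disk metadata entry; anything that is
/// not a STORAGE_FORMAT_FILE path is skipped.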
async fn get_size(&self, mut item: ScannerItem) -> Result<SizeSummary> {
if !item.path.ends_with(&format!("{SLASH_SEPARATOR}{STORAGE_FORMAT_FILE}")) {
return Err(StorageError::other("skip file".to_string()));
}
debug!("get_size: reading metadata for {}/{}", &item.bucket, &item.object_path());
let data = match self.read_metadata(&item.bucket, &item.object_path()).await {
Ok(data) => data,
Err(e) => return Err(StorageError::other(format!("Failed to read metadata: {e}"))),
};
item.transform_meta_dir();
let meta = FileMeta::load(&data)?;
let fivs = match meta.get_file_info_versions(item.bucket.as_str(), item.object_path().as_str(), false) {
Ok(versions) => versions,
Err(e) => {
error!("Failed to get file info versions: {}", e);
return Err(StorageError::other("skip file".to_string()));
}
};
let versioned = BucketVersioningSys::get(&item.bucket)
.await
.map(|v| v.versioned(&item.object_path()))
.unwrap_or(false);
let object_infos = fivs
.versions
.iter()
.map(|v| ObjectInfo::from_file_info(v, item.bucket.as_str(), item.object_path().as_str(), versioned))
.collect::<Vec<ObjectInfo>>();
let mut size_summary = SizeSummary::default();
let tiers = {
let tier_config_mgr = GLOBAL_TierConfigMgr.read().await;
tier_config_mgr.list_tiers()
};
for tier in tiers.iter() {
size_summary.tier_stats.insert(tier.name.clone(), TierStats::default());
}
if !size_summary.tier_stats.is_empty() {
size_summary
.tier_stats
.insert(storageclass::STANDARD.to_string(), TierStats::default());
size_summary
.tier_stats
.insert(storageclass::RRS.to_string(), TierStats::default());
}
let lock_config = match get_object_lock_config(&item.bucket).await {
Ok((cfg, _)) => Some(Arc::new(cfg)),
Err(_) => None,
};
let Some(ecstore) = new_object_layer_fn() else {
error!("ECStore not available");
return Err(StorageError::other("ECStore not available".to_string()));
};
item.apply_actions(ecstore, object_infos, lock_config, &mut size_summary)
.await;
// TODO: enqueueFreeVersion
Ok(size_summary)
}
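/// Scan this disk into a DataUsageCache: attach the bucket's lifecycle and replication
/// configuration, resolve the local disk by location, and delegate to scan_data_folder.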
async fn nsscanner_disk(
&self,
ctx: CancellationToken,
cache: DataUsageCache,
updates: Option<mpsc::Sender<DataUsageEntry>>,
scan_mode: HealScanMode,
) -> Result<DataUsageCache> {
// match self {
// Disk::Local(local_disk) => local_disk.nsscanner_disk(ctx, cache, updates, scan_mode).await,
// Disk::Remote(remote_disk) => remote_disk.nsscanner_disk(ctx, cache, updates, scan_mode).await,
// }
let _guard = self.start_scan();
let mut cache = cache;
let (lifecycle_config, _) = get_lifecycle_config(&cache.info.name)
.await
.unwrap_or((BucketLifecycleConfiguration::default(), OffsetDateTime::now_utc()));
if lifecycle_config.has_active_rules("") {
cache.info.lifecycle = Some(Arc::new(lifecycle_config));
}
let (replication_config, _) = get_replication_config(&cache.info.name).await.unwrap_or((
ReplicationConfiguration {
role: "".to_string(),
rules: vec![],
},
OffsetDateTime::now_utc(),
));
if replication_config.has_active_rules("", true) {
if let Ok(targets) = BucketTargetSys::get().list_bucket_targets(&cache.info.name).await {
cache.info.replication = Some(Arc::new(ReplicationConfig {
config: Some(replication_config),
remotes: Some(targets),
}));
}
}
// TODO: object lock
let Some(ecstore) = new_object_layer_fn() else {
error!("ECStore not available");
return Err(StorageError::other("ECStore not available".to_string()));
};
let disk_location = self.get_disk_location();
let (Some(pool_idx), Some(set_idx)) = (disk_location.pool_idx, disk_location.set_idx) else {
error!("Disk location not available");
return Err(StorageError::other("Disk location not available".to_string()));
};
let disks_result = ecstore.get_disks(pool_idx, set_idx).await?;
let Some(disk_idx) = disk_location.disk_idx else {
error!("Disk index not available");
return Err(StorageError::other("Disk index not available".to_string()));
};
let local_disk = if let Some(Some(local_disk)) = disks_result.get(disk_idx) {
local_disk.clone()
} else {
error!("Local disk not available");
return Err(StorageError::other("Local disk not available".to_string()));
};
let disks = disks_result.into_iter().flatten().collect::<Vec<Arc<Disk>>>();
// Create we_sleep function (always return false for now, can be enhanced later)
let we_sleep: Box<dyn Fn() -> bool + Send + Sync> = Box::new(|| false);
let result = scan_data_folder(ctx, disks, local_disk, cache, updates, scan_mode, we_sleep).await;
match result {
Ok(mut data_usage_info) => {
data_usage_info.info.last_update = Some(SystemTime::now());
Ok(data_usage_info)
}
Err(e) => Err(StorageError::other(format!("Failed to scan data folder: {e}"))),
}
}
}

View File

@@ -0,0 +1,786 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use heed::byteorder::BigEndian;
use heed::types::*;
use heed::{BoxedError, BytesDecode, BytesEncode, Database, DatabaseFlags, Env, EnvOpenOptions};
use rustfs_ecstore::{
disk::endpoint::Endpoint,
endpoints::{EndpointServerPools, Endpoints, PoolEndpoints},
store::ECStore,
store_api::{MakeBucketOptions, ObjectIO, ObjectInfo, ObjectOptions, PutObjReader, StorageAPI},
};
use rustfs_scanner::scanner::local_scan::{self, LocalObjectRecord, LocalScanOutcome};
use serial_test::serial;
use std::{
borrow::Cow,
path::PathBuf,
sync::{Arc, Once, OnceLock},
};
//use heed_traits::Comparator;
use time::OffsetDateTime;
use tokio::fs;
use tokio_util::sync::CancellationToken;
use tracing::{debug, info, warn};
use uuid::Uuid;
static GLOBAL_ENV: OnceLock<(Vec<PathBuf>, Arc<ECStore>)> = OnceLock::new();
static INIT: Once = Once::new();
static _LIFECYCLE_EXPIRY_CURRENT_DAYS: i32 = 1;
static _LIFECYCLE_EXPIRY_NONCURRENT_DAYS: i32 = 1;
static _LIFECYCLE_TRANSITION_CURRENT_DAYS: i32 = 1;
static _LIFECYCLE_TRANSITION_NONCURRENT_DAYS: i32 = 1;
static GLOBAL_LMDB_ENV: OnceLock<Env> = OnceLock::new();
static GLOBAL_LMDB_DB: OnceLock<Database<I64<BigEndian>, LifecycleContentCodec>> = OnceLock::new();
fn init_tracing() {
INIT.call_once(|| {
let _ = tracing_subscriber::fmt::try_init();
});
}
/// Test helper: Create test environment with ECStore
async fn setup_test_env() -> (Vec<PathBuf>, Arc<ECStore>) {
init_tracing();
// Fast path: already initialized, just clone and return
if let Some((paths, ecstore)) = GLOBAL_ENV.get() {
return (paths.clone(), ecstore.clone());
}
// create temp dir as 4 disks with unique base dir
let test_base_dir = format!("/tmp/rustfs_ahm_lifecyclecache_test_{}", uuid::Uuid::new_v4());
let temp_dir = std::path::PathBuf::from(&test_base_dir);
if temp_dir.exists() {
fs::remove_dir_all(&temp_dir).await.ok();
}
fs::create_dir_all(&temp_dir).await.unwrap();
// create 4 disk dirs
let disk_paths = vec![
temp_dir.join("disk1"),
temp_dir.join("disk2"),
temp_dir.join("disk3"),
temp_dir.join("disk4"),
];
for disk_path in &disk_paths {
fs::create_dir_all(disk_path).await.unwrap();
}
// create EndpointServerPools
let mut endpoints = Vec::new();
for (i, disk_path) in disk_paths.iter().enumerate() {
let mut endpoint = Endpoint::try_from(disk_path.to_str().unwrap()).unwrap();
// set correct index
endpoint.set_pool_index(0);
endpoint.set_set_index(0);
endpoint.set_disk_index(i);
endpoints.push(endpoint);
}
let pool_endpoints = PoolEndpoints {
legacy: false,
set_count: 1,
drives_per_set: 4,
endpoints: Endpoints::from(endpoints),
cmd_line: "test".to_string(),
platform: format!("OS: {} | Arch: {}", std::env::consts::OS, std::env::consts::ARCH),
};
let endpoint_pools = EndpointServerPools(vec![pool_endpoints]);
// format disks (only first time)
rustfs_ecstore::store::init_local_disks(endpoint_pools.clone()).await.unwrap();
// create ECStore with dynamic port 0 (let OS assign) or fixed 9002 if free
let port = 9002; // for simplicity
let server_addr: std::net::SocketAddr = format!("127.0.0.1:{port}").parse().unwrap();
let ecstore = ECStore::new(server_addr, endpoint_pools, CancellationToken::new())
.await
.unwrap();
// init bucket metadata system
let buckets_list = ecstore
.list_bucket(&rustfs_ecstore::store_api::BucketOptions {
no_metadata: true,
..Default::default()
})
.await
.unwrap();
let buckets = buckets_list.into_iter().map(|v| v.name).collect();
rustfs_ecstore::bucket::metadata_sys::init_bucket_metadata_sys(ecstore.clone(), buckets).await;
//lmdb env
// User home directory
/*if let Ok(home_dir) = env::var("HOME").or_else(|_| env::var("USERPROFILE")) {
let mut path = PathBuf::from(home_dir);
path.push(format!(".{DEFAULT_LOG_FILENAME}"));
path.push(DEFAULT_LOG_DIR);
if ensure_directory_writable(&path) {
//return path;
}
}*/
let test_lmdb_lifecycle_dir = "/tmp/lmdb_lifecycle".to_string();
let temp_dir = std::path::PathBuf::from(&test_lmdb_lifecycle_dir);
if temp_dir.exists() {
fs::remove_dir_all(&temp_dir).await.ok();
}
fs::create_dir_all(&temp_dir).await.unwrap();
let lmdb_env = unsafe { EnvOpenOptions::new().max_dbs(100).open(&test_lmdb_lifecycle_dir).unwrap() };
let bucket_name = format!("test-lc-cache-{}", "00000");
let mut wtxn = lmdb_env.write_txn().unwrap();
let db = match lmdb_env
.database_options()
.name(&format!("bucket_{bucket_name}"))
.types::<I64<BigEndian>, LifecycleContentCodec>()
.flags(DatabaseFlags::DUP_SORT)
//.dup_sort_comparator::<>()
.create(&mut wtxn)
{
Ok(db) => db,
Err(err) => {
panic!("lmdb error: {err}");
}
};
let _ = wtxn.commit();
let _ = GLOBAL_LMDB_ENV.set(lmdb_env);
let _ = GLOBAL_LMDB_DB.set(db);
// Store in global once lock
let _ = GLOBAL_ENV.set((disk_paths.clone(), ecstore.clone()));
(disk_paths, ecstore)
}
/// Test helper: Create a test bucket
#[allow(dead_code)]
async fn create_test_bucket(ecstore: &Arc<ECStore>, bucket_name: &str) {
(**ecstore)
.make_bucket(bucket_name, &Default::default())
.await
.expect("Failed to create test bucket");
info!("Created test bucket: {}", bucket_name);
}
/// Test helper: Create a test lock bucket
async fn create_test_lock_bucket(ecstore: &Arc<ECStore>, bucket_name: &str) {
(**ecstore)
.make_bucket(
bucket_name,
&MakeBucketOptions {
lock_enabled: true,
versioning_enabled: true,
..Default::default()
},
)
.await
.expect("Failed to create test bucket");
info!("Created test bucket: {}", bucket_name);
}
/// Test helper: Upload test object
async fn upload_test_object(ecstore: &Arc<ECStore>, bucket: &str, object: &str, data: &[u8]) {
let mut reader = PutObjReader::from_vec(data.to_vec());
let object_info = (**ecstore)
.put_object(bucket, object, &mut reader, &ObjectOptions::default())
.await
.expect("Failed to upload test object");
println!("object_info1: {object_info:?}");
info!("Uploaded test object: {}/{} ({} bytes)", bucket, object, object_info.size);
}
/// Test helper: Check if object exists
async fn object_exists(ecstore: &Arc<ECStore>, bucket: &str, object: &str) -> bool {
match (**ecstore).get_object_info(bucket, object, &ObjectOptions::default()).await {
Ok(info) => !info.delete_marker,
Err(_) => false,
}
}
fn ns_to_offset_datetime(ns: i128) -> Option<OffsetDateTime> {
OffsetDateTime::from_unix_timestamp_nanos(ns).ok()
}
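/// Map a local-scan usage record onto the minimal ObjectInfo fields the lifecycle evaluation needs.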
fn convert_record_to_object_info(record: &LocalObjectRecord) -> ObjectInfo {
let usage = &record.usage;
ObjectInfo {
bucket: usage.bucket.clone(),
name: usage.object.clone(),
size: usage.total_size as i64,
delete_marker: !usage.has_live_object && usage.delete_markers_count > 0,
mod_time: usage.last_modified_ns.and_then(ns_to_offset_datetime),
..Default::default()
}
}
#[allow(dead_code)]
fn to_object_info(
bucket: &str,
object: &str,
total_size: i64,
delete_marker: bool,
mod_time: OffsetDateTime,
version_id: &str,
) -> ObjectInfo {
ObjectInfo {
bucket: bucket.to_string(),
name: object.to_string(),
size: total_size,
delete_marker,
mod_time: Some(mod_time),
version_id: Some(Uuid::parse_str(version_id).unwrap()),
..Default::default()
}
}
#[derive(Debug, PartialEq, Eq)]
enum LifecycleType {
ExpiryCurrent,
ExpiryNoncurrent,
TransitionCurrent,
TransitionNoncurrent,
}
#[derive(Debug, PartialEq, Eq)]
pub struct LifecycleContent {
ver_no: u8,
ver_id: String,
mod_time: OffsetDateTime,
type_: LifecycleType,
object_name: String,
}
pub struct LifecycleContentCodec;
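// Encoded layout: 1-byte version, 36-byte version-id string, 8-byte big-endian unix
// timestamp, 1-byte lifecycle type tag, then the object name bytes.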
impl BytesEncode<'_> for LifecycleContentCodec {
type EItem = LifecycleContent;
fn bytes_encode(lcc: &Self::EItem) -> Result<Cow<'_, [u8]>, BoxedError> {
let (ver_no_byte, ver_id_bytes, mod_timestamp_bytes, type_byte, object_name_bytes) = match lcc {
LifecycleContent {
ver_no,
ver_id,
mod_time,
type_: LifecycleType::ExpiryCurrent,
object_name,
} => (
ver_no,
ver_id.clone().into_bytes(),
mod_time.unix_timestamp().to_be_bytes(),
0,
object_name.clone().into_bytes(),
),
LifecycleContent {
ver_no,
ver_id,
mod_time,
type_: LifecycleType::ExpiryNoncurrent,
object_name,
} => (
ver_no,
ver_id.clone().into_bytes(),
mod_time.unix_timestamp().to_be_bytes(),
1,
object_name.clone().into_bytes(),
),
LifecycleContent {
ver_no,
ver_id,
mod_time,
type_: LifecycleType::TransitionCurrent,
object_name,
} => (
ver_no,
ver_id.clone().into_bytes(),
mod_time.unix_timestamp().to_be_bytes(),
2,
object_name.clone().into_bytes(),
),
LifecycleContent {
ver_no,
ver_id,
mod_time,
type_: LifecycleType::TransitionNoncurrent,
object_name,
} => (
ver_no,
ver_id.clone().into_bytes(),
mod_time.unix_timestamp().to_be_bytes(),
3,
object_name.clone().into_bytes(),
),
};
let mut output = Vec::<u8>::new();
output.push(*ver_no_byte);
output.extend_from_slice(&ver_id_bytes);
output.extend_from_slice(&mod_timestamp_bytes);
output.push(type_byte);
output.extend_from_slice(&object_name_bytes);
Ok(Cow::Owned(output))
}
}
impl<'a> BytesDecode<'a> for LifecycleContentCodec {
type DItem = LifecycleContent;
fn bytes_decode(bytes: &'a [u8]) -> Result<Self::DItem, BoxedError> {
use std::mem::size_of;
let ver_no = match bytes.get(..size_of::<u8>()) {
Some(bytes) => bytes.try_into().map(u8::from_be_bytes).unwrap(),
None => return Err("invalid LifecycleContent: cannot extract ver_no".into()),
};
let ver_id = match bytes.get(size_of::<u8>()..(36 + 1)) {
Some(bytes) => std::str::from_utf8(bytes)?.to_string(),
None => return Err("invalid LifecycleContent: cannot extract ver_id".into()),
};
let mod_timestamp = match bytes.get((36 + 1)..(size_of::<i64>() + 36 + 1)) {
Some(bytes) => bytes.try_into().map(i64::from_be_bytes).unwrap(),
None => return Err("invalid LifecycleContent: cannot extract mod_time timestamp".into()),
};
let type_ = match bytes.get(size_of::<i64>() + 36 + 1) {
Some(&0) => LifecycleType::ExpiryCurrent,
Some(&1) => LifecycleType::ExpiryNoncurrent,
Some(&2) => LifecycleType::TransitionCurrent,
Some(&3) => LifecycleType::TransitionNoncurrent,
Some(_) => return Err("invalid LifecycleContent: invalid LifecycleType".into()),
None => return Err("invalid LifecycleContent: cannot extract LifecycleType".into()),
};
let object_name = match bytes.get((size_of::<i64>() + 36 + 1 + 1)..) {
Some(bytes) => std::str::from_utf8(bytes)?.to_string(),
None => return Err("invalid LifecycleContent: cannot extract object_name".into()),
};
Ok(LifecycleContent {
ver_no,
ver_id,
mod_time: OffsetDateTime::from_unix_timestamp(mod_timestamp)?,
type_,
object_name,
})
}
}
mod serial_tests {
use super::*;
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
#[serial]
//#[ignore]
async fn test_lifecycle_cache_build() {
let (_disk_paths, ecstore) = setup_test_env().await;
// Create test bucket and object
let suffix = uuid::Uuid::new_v4().simple().to_string();
let bucket_name = format!("test-lc-cache-{}", &suffix[..8]);
let object_name = "test/object.txt"; // Match the lifecycle rule prefix "test/"
let test_data = b"Hello, this is test data for lifecycle expiry!";
create_test_lock_bucket(&ecstore, bucket_name.as_str()).await;
upload_test_object(&ecstore, bucket_name.as_str(), object_name, test_data).await;
// Verify object exists initially
assert!(object_exists(&ecstore, bucket_name.as_str(), object_name).await);
println!("✅ Object exists before lifecycle processing");
let scan_outcome = match local_scan::scan_and_persist_local_usage(ecstore.clone()).await {
Ok(outcome) => outcome,
Err(err) => {
warn!("Local usage scan failed: {}", err);
LocalScanOutcome::default()
}
};
let bucket_objects_map = &scan_outcome.bucket_objects;
// Bind an empty Vec outside the match: returning `&vec![]` from a match arm would
// borrow a temporary that is dropped at the end of the statement.
let empty = Vec::new();
let records = match bucket_objects_map.get(&bucket_name) {
Some(records) => records,
None => {
debug!("No local snapshot entries found for bucket {}; skipping lifecycle/integrity", bucket_name);
&empty
}
};
if let Some(lmdb_env) = GLOBAL_LMDB_ENV.get() {
if let Some(lmdb) = GLOBAL_LMDB_DB.get() {
let mut wtxn = lmdb_env.write_txn().unwrap();
/*if let Ok((lc_config, _)) = rustfs_ecstore::bucket::metadata_sys::get_lifecycle_config(bucket_name.as_str()).await {
if let Ok(object_info) = ecstore
.get_object_info(bucket_name.as_str(), object_name, &rustfs_ecstore::store_api::ObjectOptions::default())
.await
{
let event = rustfs_ecstore::bucket::lifecycle::bucket_lifecycle_ops::eval_action_from_lifecycle(
&lc_config,
None,
None,
&object_info,
)
.await;
rustfs_ecstore::bucket::lifecycle::bucket_lifecycle_ops::apply_expiry_on_non_transitioned_objects(
ecstore.clone(),
&object_info,
&event,
&rustfs_ecstore::bucket::lifecycle::bucket_lifecycle_audit::LcEventSrc::Scanner,
)
.await;
expired = wait_for_object_absence(&ecstore, bucket_name.as_str(), object_name, Duration::from_secs(2)).await;
}
}*/
for record in records {
if !record.usage.has_live_object {
continue;
}
let object_info = convert_record_to_object_info(record);
println!("object_info2: {object_info:?}");
let mod_time = object_info.mod_time.unwrap_or(OffsetDateTime::now_utc());
let expiry_time = rustfs_ecstore::bucket::lifecycle::lifecycle::expected_expiry_time(mod_time, 1);
let version_id = if let Some(version_id) = object_info.version_id {
version_id.to_string()
} else {
"zzzzzzzz-zzzz-zzzz-zzzz-zzzzzzzzzzzz".to_string()
};
lmdb.put(
&mut wtxn,
&expiry_time.unix_timestamp(),
&LifecycleContent {
ver_no: 0,
ver_id: version_id,
mod_time,
type_: LifecycleType::TransitionNoncurrent,
object_name: object_info.name,
},
)
.unwrap();
}
wtxn.commit().unwrap();
let mut wtxn = lmdb_env.write_txn().unwrap();
let iter = lmdb.iter_mut(&mut wtxn).unwrap();
//let _ = unsafe { iter.del_current().unwrap() };
for row in iter {
if let Ok(ref elm) = row {
let LifecycleContent {
ver_no,
ver_id,
mod_time,
type_,
object_name,
} = &elm.1;
println!("cache row:{ver_no} {ver_id} {mod_time} {type_:?} {object_name}");
//eval_inner(&oi.to_lifecycle_opts(), OffsetDateTime::now_utc()).await;
eval_inner(
&lifecycle::ObjectOpts {
name: oi.name.clone(),
user_tags: oi.user_tags.clone(),
version_id: oi.version_id.map(|v| v.to_string()).unwrap_or_default(),
mod_time: oi.mod_time,
size: oi.size as usize,
is_latest: oi.is_latest,
num_versions: oi.num_versions,
delete_marker: oi.delete_marker,
successor_mod_time: oi.successor_mod_time,
restore_ongoing: oi.restore_ongoing,
restore_expires: oi.restore_expires,
transition_status: oi.transitioned_object.status.clone(),
..Default::default()
},
OffsetDateTime::now_utc(),
)
.await;
}
println!("row:{row:?}");
}
//drop(iter);
wtxn.commit().unwrap();
}
}
println!("Lifecycle cache test completed");
}
}
async fn eval_inner(&self, obj: &ObjectOpts, now: OffsetDateTime) -> Event {
let mut events = Vec::<Event>::new();
info!(
"eval_inner: object={}, mod_time={:?}, now={:?}, is_latest={}, delete_marker={}",
obj.name, obj.mod_time, now, obj.is_latest, obj.delete_marker
);
if obj.mod_time.expect("err").unix_timestamp() == 0 {
info!("eval_inner: mod_time is 0, returning default event");
return Event::default();
}
if let Some(restore_expires) = obj.restore_expires {
if restore_expires.unix_timestamp() != 0 && now.unix_timestamp() > restore_expires.unix_timestamp() {
let mut action = IlmAction::DeleteRestoredAction;
if !obj.is_latest {
action = IlmAction::DeleteRestoredVersionAction;
}
events.push(Event {
action,
due: Some(now),
rule_id: "".into(),
noncurrent_days: 0,
newer_noncurrent_versions: 0,
storage_class: "".into(),
});
}
}
if let Some(ref lc_rules) = lc.filter_rules(obj).await {
for rule in lc_rules.iter() {
if obj.expired_object_deletemarker() {
if let Some(expiration) = rule.expiration.as_ref() {
if expiration.expired_object_delete_marker.unwrap_or(false) {
events.push(Event {
action: IlmAction::DeleteVersionAction,
rule_id: rule.id.clone().expect("err!"),
due: Some(now),
noncurrent_days: 0,
newer_noncurrent_versions: 0,
storage_class: "".into(),
});
break;
}
if let Some(days) = expiration.days {
let expected_expiry = expected_expiry_time(obj.mod_time.unwrap(), days /*, date*/);
if now.unix_timestamp() >= expected_expiry.unix_timestamp() {
events.push(Event {
action: IlmAction::DeleteVersionAction,
rule_id: rule.id.clone().expect("err!"),
due: Some(expected_expiry),
noncurrent_days: 0,
newer_noncurrent_versions: 0,
storage_class: "".into(),
});
break;
}
}
}
}
if obj.is_latest {
if let Some(ref expiration) = rule.expiration {
if let Some(expired_object_delete_marker) = expiration.expired_object_delete_marker {
if obj.delete_marker && expired_object_delete_marker {
let due = expiration.next_due(obj);
if let Some(due) = due {
if now.unix_timestamp() >= due.unix_timestamp() {
events.push(Event {
action: IlmAction::DelMarkerDeleteAllVersionsAction,
rule_id: rule.id.clone().expect("err!"),
due: Some(due),
noncurrent_days: 0,
newer_noncurrent_versions: 0,
storage_class: "".into(),
});
}
}
continue;
}
}
}
}
if !obj.is_latest {
if let Some(ref noncurrent_version_expiration) = rule.noncurrent_version_expiration {
if let Some(newer_noncurrent_versions) = noncurrent_version_expiration.newer_noncurrent_versions {
if newer_noncurrent_versions > 0 {
continue;
}
}
}
}
if !obj.is_latest {
if let Some(ref noncurrent_version_expiration) = rule.noncurrent_version_expiration {
if let Some(noncurrent_days) = noncurrent_version_expiration.noncurrent_days {
if noncurrent_days != 0 {
if let Some(successor_mod_time) = obj.successor_mod_time {
let expected_expiry = expected_expiry_time(successor_mod_time, noncurrent_days);
if now.unix_timestamp() >= expected_expiry.unix_timestamp() {
events.push(Event {
action: IlmAction::DeleteVersionAction,
rule_id: rule.id.clone().expect("err!"),
due: Some(expected_expiry),
noncurrent_days: 0,
newer_noncurrent_versions: 0,
storage_class: "".into(),
});
}
}
}
}
}
}
if !obj.is_latest {
if let Some(ref noncurrent_version_transitions) = rule.noncurrent_version_transitions {
if let Some(ref storage_class) = noncurrent_version_transitions[0].storage_class {
if storage_class.as_str() != "" && !obj.delete_marker && obj.transition_status != TRANSITION_COMPLETE {
let due = noncurrent_version_transitions[0].next_due(obj);
if let Some(due0) = due {
if now.unix_timestamp() == 0 || now.unix_timestamp() > due0.unix_timestamp() {
events.push(Event {
action: IlmAction::TransitionVersionAction,
rule_id: rule.id.clone().expect("err!"),
due,
storage_class: storage_class.as_str().to_string(),
..Default::default()
});
}
}
}
}
}
}
info!(
"eval_inner: checking expiration condition - is_latest={}, delete_marker={}, version_id={:?}, condition_met={}",
obj.is_latest,
obj.delete_marker,
obj.version_id,
(obj.is_latest || obj.version_id.is_empty()) && !obj.delete_marker
);
// Allow expiration for latest objects OR non-versioned objects (empty version_id)
if (obj.is_latest || obj.version_id.is_empty()) && !obj.delete_marker {
info!("eval_inner: entering expiration check");
if let Some(ref expiration) = rule.expiration {
if let Some(ref date) = expiration.date {
let date0 = OffsetDateTime::from(date.clone());
if date0.unix_timestamp() != 0 && (now.unix_timestamp() >= date0.unix_timestamp()) {
info!("eval_inner: expiration by date - date0={:?}", date0);
events.push(Event {
action: IlmAction::DeleteAction,
rule_id: rule.id.clone().expect("err!"),
due: Some(date0),
noncurrent_days: 0,
newer_noncurrent_versions: 0,
storage_class: "".into(),
});
}
} else if let Some(days) = expiration.days {
let expected_expiry: OffsetDateTime = expected_expiry_time(obj.mod_time.unwrap(), days);
info!(
"eval_inner: expiration check - days={}, obj_time={:?}, expiry_time={:?}, now={:?}, should_expire={}",
days,
obj.mod_time.expect("err!"),
expected_expiry,
now,
now.unix_timestamp() > expected_expiry.unix_timestamp()
);
if now.unix_timestamp() >= expected_expiry.unix_timestamp() {
info!("eval_inner: object should expire, adding DeleteAction");
let mut event = Event {
action: IlmAction::DeleteAction,
rule_id: rule.id.clone().expect("err!"),
due: Some(expected_expiry),
noncurrent_days: 0,
newer_noncurrent_versions: 0,
storage_class: "".into(),
};
/*if rule.expiration.expect("err!").delete_all.val {
event.action = IlmAction::DeleteAllVersionsAction
}*/
events.push(event);
}
} else {
info!("eval_inner: expiration.days is None");
}
} else {
info!("eval_inner: rule.expiration is None");
}
if obj.transition_status != TRANSITION_COMPLETE {
if let Some(ref transitions) = rule.transitions {
let due = transitions[0].next_due(obj);
if let Some(due0) = due {
if now.unix_timestamp() == 0 || now.unix_timestamp() > due0.unix_timestamp() {
events.push(Event {
action: IlmAction::TransitionAction,
rule_id: rule.id.clone().expect("err!"),
due,
storage_class: transitions[0].storage_class.clone().expect("err!").as_str().to_string(),
noncurrent_days: 0,
newer_noncurrent_versions: 0,
});
}
}
}
}
}
}
}
if !events.is_empty() {
events.sort_by(|a, b| {
if now.unix_timestamp() > a.due.expect("err!").unix_timestamp()
&& now.unix_timestamp() > b.due.expect("err").unix_timestamp()
|| a.due.expect("err").unix_timestamp() == b.due.expect("err").unix_timestamp()
{
match a.action {
IlmAction::DeleteAllVersionsAction
| IlmAction::DelMarkerDeleteAllVersionsAction
| IlmAction::DeleteAction
| IlmAction::DeleteVersionAction => {
return Ordering::Less;
}
_ => (),
}
match b.action {
IlmAction::DeleteAllVersionsAction
| IlmAction::DelMarkerDeleteAllVersionsAction
| IlmAction::DeleteAction
| IlmAction::DeleteVersionAction => {
return Ordering::Greater;
}
_ => (),
}
return Ordering::Less;
}
if a.due.expect("err").unix_timestamp() < b.due.expect("err").unix_timestamp() {
return Ordering::Less;
}
return Ordering::Greater;
});
return events[0].clone();
}
Event::default()
}

View File

@@ -0,0 +1,444 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use rustfs_ecstore::{
bucket::metadata::BUCKET_LIFECYCLE_CONFIG,
bucket::metadata_sys,
disk::endpoint::Endpoint,
endpoints::{EndpointServerPools, Endpoints, PoolEndpoints},
global::GLOBAL_TierConfigMgr,
store::ECStore,
store_api::{MakeBucketOptions, ObjectIO, ObjectOptions, PutObjReader, StorageAPI},
tier::tier_config::{TierConfig, TierMinIO, TierType},
};
use rustfs_ahm::{heal::storage::ECStoreHealStorage, init_heal_manager};
use rustfs_scanner::scanner::init_data_scanner;
use serial_test::serial;
use std::{
path::PathBuf,
sync::{Arc, Once, OnceLock},
time::Duration,
};
use tokio::fs;
use tokio_util::sync::CancellationToken;
use tracing::info;
static GLOBAL_ENV: OnceLock<(Vec<PathBuf>, Arc<ECStore>)> = OnceLock::new();
static INIT: Once = Once::new();
fn init_tracing() {
INIT.call_once(|| {
let _ = tracing_subscriber::fmt::try_init();
});
}
/// Test helper: Create test environment with ECStore
async fn setup_test_env() -> (Vec<PathBuf>, Arc<ECStore>) {
init_tracing();
// Fast path: already initialized, just clone and return
if let Some((paths, ecstore)) = GLOBAL_ENV.get() {
return (paths.clone(), ecstore.clone());
}
// create temp dir as 4 disks with unique base dir
let test_base_dir = format!("/tmp/rustfs_scanner_lifecycle_test_{}", uuid::Uuid::new_v4());
let temp_dir = std::path::PathBuf::from(&test_base_dir);
if temp_dir.exists() {
fs::remove_dir_all(&temp_dir).await.ok();
}
fs::create_dir_all(&temp_dir).await.unwrap();
// create 4 disk dirs
let disk_paths = vec![
temp_dir.join("disk1"),
temp_dir.join("disk2"),
temp_dir.join("disk3"),
temp_dir.join("disk4"),
];
for disk_path in &disk_paths {
fs::create_dir_all(disk_path).await.unwrap();
}
// create EndpointServerPools
let mut endpoints = Vec::new();
for (i, disk_path) in disk_paths.iter().enumerate() {
let mut endpoint = Endpoint::try_from(disk_path.to_str().unwrap()).unwrap();
// set correct index
endpoint.set_pool_index(0);
endpoint.set_set_index(0);
endpoint.set_disk_index(i);
endpoints.push(endpoint);
}
let pool_endpoints = PoolEndpoints {
legacy: false,
set_count: 1,
drives_per_set: 4,
endpoints: Endpoints::from(endpoints),
cmd_line: "test".to_string(),
platform: format!("OS: {} | Arch: {}", std::env::consts::OS, std::env::consts::ARCH),
};
let endpoint_pools = EndpointServerPools(vec![pool_endpoints]);
// format disks (only first time)
rustfs_ecstore::store::init_local_disks(endpoint_pools.clone()).await.unwrap();
// create ECStore with dynamic port 0 (let OS assign) or fixed 9002 if free
let port = 9002; // for simplicity
let server_addr: std::net::SocketAddr = format!("127.0.0.1:{port}").parse().unwrap();
let ecstore = ECStore::new(server_addr, endpoint_pools, CancellationToken::new())
.await
.unwrap();
// init bucket metadata system
let buckets_list = ecstore
.list_bucket(&rustfs_ecstore::store_api::BucketOptions {
no_metadata: true,
..Default::default()
})
.await
.unwrap();
let buckets = buckets_list.into_iter().map(|v| v.name).collect();
rustfs_ecstore::bucket::metadata_sys::init_bucket_metadata_sys(ecstore.clone(), buckets).await;
// Initialize background expiry workers
rustfs_ecstore::bucket::lifecycle::bucket_lifecycle_ops::init_background_expiry(ecstore.clone()).await;
// Store in global once lock
let _ = GLOBAL_ENV.set((disk_paths.clone(), ecstore.clone()));
(disk_paths, ecstore)
}
/// Test helper: Create a test bucket
async fn create_test_bucket(ecstore: &Arc<ECStore>, bucket_name: &str) {
(**ecstore)
.make_bucket(bucket_name, &Default::default())
.await
.expect("Failed to create test bucket");
info!("Created test bucket: {}", bucket_name);
}
/// Test helper: Create a test lock bucket
async fn create_test_lock_bucket(ecstore: &Arc<ECStore>, bucket_name: &str) {
(**ecstore)
.make_bucket(
bucket_name,
&MakeBucketOptions {
lock_enabled: true,
versioning_enabled: true,
..Default::default()
},
)
.await
.expect("Failed to create test bucket");
info!("Created test bucket: {}", bucket_name);
}
/// Test helper: Upload test object
async fn upload_test_object(ecstore: &Arc<ECStore>, bucket: &str, object: &str, data: &[u8]) {
let mut reader = PutObjReader::from_vec(data.to_vec());
let object_info = (**ecstore)
.put_object(bucket, object, &mut reader, &ObjectOptions::default())
.await
.expect("Failed to upload test object");
info!("Uploaded test object: {}/{} ({} bytes)", bucket, object, object_info.size);
}
/// Test helper: Set bucket lifecycle configuration
async fn set_bucket_lifecycle(bucket_name: &str) -> Result<(), Box<dyn std::error::Error>> {
// Create a simple lifecycle configuration XML with 0 days expiry for immediate testing
let lifecycle_xml = r#"<?xml version="1.0" encoding="UTF-8"?>
<LifecycleConfiguration>
<Rule>
<ID>test-rule</ID>
<Status>Enabled</Status>
<Filter>
<Prefix>test/</Prefix>
</Filter>
<Expiration>
<Days>0</Days>
</Expiration>
</Rule>
</LifecycleConfiguration>"#;
metadata_sys::update(bucket_name, BUCKET_LIFECYCLE_CONFIG, lifecycle_xml.as_bytes().to_vec()).await?;
Ok(())
}
/// Test helper: Set bucket lifecycle configuration with ExpiredObjectDeleteMarker enabled
async fn set_bucket_lifecycle_deletemarker(bucket_name: &str) -> Result<(), Box<dyn std::error::Error>> {
// Create a simple lifecycle configuration XML with 0 days expiry for immediate testing
let lifecycle_xml = r#"<?xml version="1.0" encoding="UTF-8"?>
<LifecycleConfiguration>
<Rule>
<ID>test-rule</ID>
<Status>Enabled</Status>
<Filter>
<Prefix>test/</Prefix>
</Filter>
<Expiration>
<Days>0</Days>
<ExpiredObjectDeleteMarker>true</ExpiredObjectDeleteMarker>
</Expiration>
</Rule>
</LifecycleConfiguration>"#;
metadata_sys::update(bucket_name, BUCKET_LIFECYCLE_CONFIG, lifecycle_xml.as_bytes().to_vec()).await?;
Ok(())
}
#[allow(dead_code)]
async fn set_bucket_lifecycle_transition(bucket_name: &str) -> Result<(), Box<dyn std::error::Error>> {
// Create a lifecycle configuration XML with a 0-day transition for immediate testing
let lifecycle_xml = r#"<?xml version="1.0" encoding="UTF-8"?>
<LifecycleConfiguration>
<Rule>
<ID>test-rule</ID>
<Status>Enabled</Status>
<Filter>
<Prefix>test/</Prefix>
</Filter>
<Transition>
<Days>0</Days>
<StorageClass>COLDTIER44</StorageClass>
</Transition>
</Rule>
<Rule>
<ID>test-rule2</ID>
<Status>Disabled</Status>
<Filter>
<Prefix>test/</Prefix>
</Filter>
<NoncurrentVersionTransition>
<NoncurrentDays>0</NoncurrentDays>
<StorageClass>COLDTIER44</StorageClass>
</NoncurrentVersionTransition>
</Rule>
</LifecycleConfiguration>"#;
metadata_sys::update(bucket_name, BUCKET_LIFECYCLE_CONFIG, lifecycle_xml.as_bytes().to_vec()).await?;
Ok(())
}
/// Test helper: Create a test tier
#[allow(dead_code)]
async fn create_test_tier(server: u32) {
let args = TierConfig {
version: "v1".to_string(),
tier_type: TierType::MinIO,
name: "COLDTIER44".to_string(),
s3: None,
aliyun: None,
tencent: None,
huaweicloud: None,
azure: None,
gcs: None,
r2: None,
rustfs: None,
minio: if server == 1 {
Some(TierMinIO {
access_key: "minioadmin".to_string(),
secret_key: "minioadmin".to_string(),
bucket: "hello".to_string(),
endpoint: "http://39.105.198.204:9000".to_string(),
prefix: format!("mypre{}/", uuid::Uuid::new_v4()),
region: "".to_string(),
..Default::default()
})
} else if server == 2 {
Some(TierMinIO {
access_key: "minioadmin".to_string(),
secret_key: "minioadmin".to_string(),
bucket: "mblock2".to_string(),
endpoint: "http://m1ddns.pvtool.com:9020".to_string(),
prefix: format!("mypre{}/", uuid::Uuid::new_v4()),
region: "".to_string(),
..Default::default()
})
} else {
Some(TierMinIO {
access_key: "minioadmin".to_string(),
secret_key: "minioadmin".to_string(),
bucket: "mblock2".to_string(),
endpoint: "http://127.0.0.1:9020".to_string(),
prefix: format!("mypre{}/", uuid::Uuid::new_v4()),
region: "".to_string(),
..Default::default()
})
},
};
let mut tier_config_mgr = GLOBAL_TierConfigMgr.write().await;
if let Err(err) = tier_config_mgr.add(args, false).await {
println!("tier_config_mgr add failed, e: {err:?}");
panic!("tier add failed. {err}");
}
if let Err(e) = tier_config_mgr.save().await {
println!("tier_config_mgr save failed, e: {e:?}");
panic!("tier save failed");
}
println!("Created test tier: COLDTIER44");
}
/// Test helper: Check if object exists
async fn object_exists(ecstore: &Arc<ECStore>, bucket: &str, object: &str) -> bool {
match (**ecstore).get_object_info(bucket, object, &ObjectOptions::default()).await {
Ok(info) => !info.delete_marker,
Err(_) => false,
}
}
/// Test helper: Check if the object is a delete marker
#[allow(dead_code)]
async fn object_is_delete_marker(ecstore: &Arc<ECStore>, bucket: &str, object: &str) -> bool {
if let Ok(oi) = (**ecstore).get_object_info(bucket, object, &ObjectOptions::default()).await {
println!("oi: {oi:?}");
oi.delete_marker
} else {
println!("object_is_delete_marker is error");
panic!("object_is_delete_marker is error");
}
}
/// Test helper: Check if the object has been transitioned
#[allow(dead_code)]
async fn object_is_transitioned(ecstore: &Arc<ECStore>, bucket: &str, object: &str) -> bool {
if let Ok(oi) = (**ecstore).get_object_info(bucket, object, &ObjectOptions::default()).await {
println!("oi: {oi:?}");
!oi.transitioned_object.status.is_empty()
} else {
println!("object_is_transitioned is error");
panic!("object_is_transitioned is error");
}
}
async fn wait_for_object_absence(ecstore: &Arc<ECStore>, bucket: &str, object: &str, timeout: Duration) -> bool {
let deadline = tokio::time::Instant::now() + timeout;
loop {
if !object_exists(ecstore, bucket, object).await {
return true;
}
if tokio::time::Instant::now() >= deadline {
return false;
}
tokio::time::sleep(Duration::from_millis(200)).await;
}
}
mod serial_tests {
use super::*;
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
#[serial]
//#[ignore]
async fn test_lifecycle_transition_basic() {
let (_disk_paths, ecstore) = setup_test_env().await;
create_test_tier(2).await;
// Create test bucket and object
let suffix = uuid::Uuid::new_v4().simple().to_string();
let bucket_name = format!("test-lc-transition-{}", &suffix[..8]);
let object_name = "test/object.txt"; // Match the lifecycle rule prefix "test/"
let test_data = b"Hello, this is test data for lifecycle expiry!";
create_test_lock_bucket(&ecstore, bucket_name.as_str()).await;
upload_test_object(
&ecstore,
bucket_name.as_str(),
object_name,
b"Hello, this is test data for lifecycle expiry 1111-11111111-1111 !",
)
.await;
//create_test_bucket(&ecstore, bucket_name.as_str()).await;
upload_test_object(&ecstore, bucket_name.as_str(), object_name, test_data).await;
// Verify object exists initially
assert!(object_exists(&ecstore, bucket_name.as_str(), object_name).await);
println!("✅ Object exists before lifecycle processing");
// Set lifecycle configuration with very short expiry (0 days = immediate expiry)
set_bucket_lifecycle_transition(bucket_name.as_str())
.await
.expect("Failed to set lifecycle configuration");
println!("✅ Lifecycle configuration set for bucket: {bucket_name}");
// Verify lifecycle configuration was set
match rustfs_ecstore::bucket::metadata_sys::get(bucket_name.as_str()).await {
Ok(bucket_meta) => {
assert!(bucket_meta.lifecycle_config.is_some());
println!("✅ Bucket metadata retrieved successfully");
}
Err(e) => {
println!("❌ Error retrieving bucket metadata: {e:?}");
}
}
let ctx = CancellationToken::new();
// Start scanner
let heal_storage = Arc::new(ECStoreHealStorage::new(ecstore.clone()));
init_heal_manager(heal_storage, None).await;
init_data_scanner(ctx.clone(), ecstore.clone()).await;
println!("✅ Scanner started");
// Wait for the scanner to apply the transition; tiering to a remote target can take
// several scan cycles, hence the long sleep
tokio::time::sleep(Duration::from_secs(1200)).await;
// Check if object has been expired (deleted)
let check_result = object_is_transitioned(&ecstore, &bucket_name, object_name).await;
println!("Object exists after lifecycle processing: {check_result}");
if check_result {
println!("✅ Object was transitioned by lifecycle processing");
// Let's try to get object info to see its details
match ecstore
.get_object_info(bucket_name.as_str(), object_name, &rustfs_ecstore::store_api::ObjectOptions::default())
.await
{
Ok(obj_info) => {
println!(
"Object info: name={}, size={}, mod_time={:?}",
obj_info.name, obj_info.size, obj_info.mod_time
);
println!("Object info: transitioned_object={:?}", obj_info.transitioned_object);
}
Err(e) => {
println!("Error getting object info: {e:?}");
}
}
} else {
println!("❌ Object was not transitioned by lifecycle processing");
}
assert!(check_result);
println!("✅ Object successfully transitioned");
// Stop scanner
ctx.cancel();
println!("✅ Scanner stopped");
println!("Lifecycle transition basic test completed");
}
}

View File

@@ -60,6 +60,7 @@ rustfs-s3select-query = { workspace = true }
rustfs-targets = { workspace = true }
rustfs-utils = { workspace = true, features = ["full"] }
rustfs-zip = { workspace = true }
rustfs-scanner = { workspace = true }
# Async Runtime and Networking
async-trait = { workspace = true }

View File

@@ -687,7 +687,6 @@ impl Stream for MetricsStream {
type Item = Result<Bytes, StdError>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
info!("MetricsStream poll_next");
let this = Pin::into_inner(self);
this.inner.poll_next_unpin(cx)
}
@@ -749,7 +748,6 @@ impl Operation for MetricsHandler {
let body = Body::from(in_stream);
spawn(async move {
while n > 0 {
info!("loop, n: {n}");
let mut m = RealtimeMetrics::default();
let m_local = collect_local_metrics(types, &opts).await;
m.merge(m_local);
@@ -766,7 +764,6 @@ impl Operation for MetricsHandler {
// todo write resp
match serde_json::to_vec(&m) {
Ok(re) => {
info!("got metrics, send it to client, m: {m:?}");
let _ = tx.send(Ok(Bytes::from(re))).await;
}
Err(e) => {

View File

@@ -34,10 +34,7 @@ use crate::server::{
};
use clap::Parser;
use license::init_license;
use rustfs_ahm::{
Scanner, create_ahm_services_cancel_token, heal::storage::ECStoreHealStorage, init_heal_manager,
scanner::data_scanner::ScannerConfig, shutdown_ahm_services,
};
use rustfs_ahm::{create_ahm_services_cancel_token, heal::storage::ECStoreHealStorage, init_heal_manager, shutdown_ahm_services};
use rustfs_common::{GlobalReadiness, SystemStage, set_global_addr};
use rustfs_ecstore::{
StorageAPI,
@@ -56,6 +53,7 @@ use rustfs_ecstore::{
};
use rustfs_iam::init_iam_sys;
use rustfs_obs::{init_obs, set_global_guard};
use rustfs_scanner::init_data_scanner;
use rustfs_utils::net::parse_and_resolve_address;
use std::io::{Error, Result};
use std::sync::Arc;
@@ -303,23 +301,29 @@ async fn run(opt: config::Opt) -> Result<()> {
// Initialize heal manager and scanner based on environment variables
if enable_heal || enable_scanner {
if enable_heal {
// Initialize heal manager with channel processor
let heal_storage = Arc::new(ECStoreHealStorage::new(store.clone()));
let heal_manager = init_heal_manager(heal_storage, None).await?;
let heal_storage = Arc::new(ECStoreHealStorage::new(store.clone()));
if enable_scanner {
info!(target: "rustfs::main::run","Starting scanner with heal manager...");
let scanner = Scanner::new(Some(ScannerConfig::default()), Some(heal_manager));
scanner.start().await?;
} else {
info!(target: "rustfs::main::run","Scanner disabled, but heal manager is initialized and available");
}
} else if enable_scanner {
info!("Starting scanner without heal manager...");
let scanner = Scanner::new(Some(ScannerConfig::default()), None);
scanner.start().await?;
}
init_heal_manager(heal_storage, None).await?;
init_data_scanner(ctx.clone(), store.clone()).await;
// if enable_heal {
// // Initialize heal manager with channel processor
// let heal_storage = Arc::new(ECStoreHealStorage::new(store.clone()));
// let heal_manager = init_heal_manager(heal_storage, None).await?;
// if enable_scanner {
// info!(target: "rustfs::main::run","Starting scanner with heal manager...");
// let scanner = Scanner::new(Some(ScannerConfig::default()), Some(heal_manager));
// scanner.start().await?;
// } else {
// info!(target: "rustfs::main::run","Scanner disabled, but heal manager is initialized and available");
// }
// } else if enable_scanner {
// info!("Starting scanner without heal manager...");
// let scanner = Scanner::new(Some(ScannerConfig::default()), None);
// scanner.start().await?;
// }
} else {
info!(target: "rustfs::main::run","Both scanner and heal are disabled, skipping AHM service initialization");
}
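
A minimal sketch of exercising the new initialization path, assuming the server honors the RUSTFS_ENABLE_SCANNER and RUSTFS_ENABLE_HEAL switches shown in the environment script later in this diff (the binary name is an assumption):

# Enable both subsystems, then start the server; init_heal_manager and
# init_data_scanner now run whenever either flag is set.
export RUSTFS_ENABLE_SCANNER=true
export RUSTFS_ENABLE_HEAL=true
./rustfs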

View File

@@ -1091,22 +1091,22 @@ impl S3 for FS {
}*/
let Ok(opts) = post_restore_opts(&version_id.unwrap(), &bucket, &object).await else {
return Err(S3Error::with_message(
S3ErrorCode::Custom("ErrEmptyRequestBody".into()),
"post restore object failed",
S3ErrorCode::Custom("ErrPostRestoreOpts".into()),
"restore object failed.",
));
};
let Ok(mut obj_info) = store.get_object_info(&bucket, &object, &opts).await else {
return Err(S3Error::with_message(
S3ErrorCode::Custom("ErrEmptyRequestBody".into()),
"post restore object failed",
S3ErrorCode::Custom("ErrInvalidObjectState".into()),
"restore object failed.",
));
};
if obj_info.transitioned_object.status != lifecycle::TRANSITION_COMPLETE {
return Err(S3Error::with_message(
S3ErrorCode::Custom("ErrEmptyRequestBody".into()),
"post restore object failed",
S3ErrorCode::Custom("ErrInvalidTransitionedState".into()),
"restore object failed.",
));
}
@@ -1117,14 +1117,14 @@ impl S3 for FS {
//api_err = to_api_err(ErrMalformedXML);
//api_err.description = err.to_string();
return Err(S3Error::with_message(
S3ErrorCode::Custom("ErrEmptyRequestBody".into()),
"post restore object failed",
S3ErrorCode::Custom("ErrValidRestoreObject".into()),
"restore object failed",
));
} else {
if obj_info.restore_ongoing && (rreq.type_.is_none() || rreq.type_.as_ref().unwrap().as_str() != "SELECT") {
return Err(S3Error::with_message(
S3ErrorCode::Custom("ErrObjectRestoreAlreadyInProgress".into()),
"post restore object failed",
"restore object failed.",
));
}
if !obj_info.restore_ongoing && obj_info.restore_expires.unwrap().unix_timestamp() != 0 {
@@ -1183,8 +1183,8 @@ impl S3 for FS {
.await
{
return Err(S3Error::with_message(
S3ErrorCode::Custom("ErrInvalidObjectState".into()),
"post restore object failed",
S3ErrorCode::Custom("ErrCopyObject".into()),
"restore object failed",
));
}
if already_restored {
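
These error codes surface on the S3 POST restore path; for reference, a restore of a transitioned object can be triggered with the standard AWS CLI, assuming credentials are configured for the rustfsadmin keys used elsewhere in this diff (bucket and key are placeholders):

# Request a one-day restore of a transitioned object
aws s3api restore-object \
  --endpoint-url http://m1ddns.pvtool.com:9000 \
  --bucket mblock --key example.txt \
  --restore-request '{"Days": 1}'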

View File

@@ -1072,6 +1072,29 @@ impl Node for NodeService {
}))
}
}
async fn read_metadata(&self, request: Request<ReadMetadataRequest>) -> Result<Response<ReadMetadataResponse>, Status> {
let request = request.into_inner();
if let Some(disk) = self.find_disk(&request.disk).await {
match disk.read_metadata(&request.volume, &request.path).await {
Ok(data) => Ok(Response::new(ReadMetadataResponse {
success: true,
data,
error: None,
})),
Err(err) => Ok(Response::new(ReadMetadataResponse {
success: false,
data: Bytes::new(),
error: Some(err.into()),
})),
}
} else {
Ok(Response::new(ReadMetadataResponse {
success: false,
data: Bytes::new(),
                error: Some(DiskError::other("cannot find disk".to_string()).into()),
}))
}
}
async fn update_metadata(&self, request: Request<UpdateMetadataRequest>) -> Result<Response<UpdateMetadataResponse>, Status> {
let request = request.into_inner();

View File

@@ -0,0 +1 @@
ansible-playbook rustfs/tests/lifecycle_ansible_test.yml

View File

@@ -0,0 +1,15 @@
---
- hosts: webservers
remote_user: leafcolor
vars:
ansible_ssh_pass: "123456"
tasks:
- name: Configure S3 bucket lifecycle
community.aws.s3_lifecycle:
endpoint_url: "http://m1ddns.pvtool.com:9000"
access_key: "rustfsadmin"
secret_key: "rustfsadmin"
name: "mblock"
rule_id: "rule1"
state: present
        expiration_days: 1
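
After the playbook runs, the installed rule can be verified out of band; one option, assuming the AWS CLI is configured with the same rustfsadmin credentials:

# Confirm the lifecycle rule on the bucket the playbook targets
aws s3api get-bucket-lifecycle-configuration \
  --endpoint-url http://m1ddns.pvtool.com:9000 \
  --bucket mblock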

View File

@@ -0,0 +1,120 @@
# Copyright 2024 RustFS Team
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from datetime import timedelta
import uuid
from fastapi import UploadFile
from minio import Minio
from minio.commonconfig import Filter
from minio.error import S3Error
from minio.lifecycleconfig import Expiration, LifecycleConfig, Rule
import os
from collections import namedtuple
Settings = namedtuple("Settings", ['oss_endpoint',
'oss_access_key',
'oss_secret_key',
'oss_bucket_name',
'oss_lifecycle_days',
'oss_secure',
'oss_region'])
settings = Settings(
oss_endpoint = "m1ddns.pvtool.com:9000",
oss_access_key = "rustfsadmin",
oss_secret_key = "rustfsadmin",
oss_bucket_name = "mblock99",
oss_lifecycle_days = 1,
oss_secure = False,
oss_region = ""
)
class OSS:
def __init__(self):
self.bucket_name = settings.oss_bucket_name
self.lifecycle_days = settings.oss_lifecycle_days
self.client = Minio(
endpoint=settings.oss_endpoint,
access_key=settings.oss_access_key,
secret_key=settings.oss_secret_key,
secure=settings.oss_secure,
region=settings.oss_region,
)
def new_uuid(self):
return str(uuid.uuid4())
def create_bucket(self):
found = self.client.bucket_exists(self.bucket_name)
if not found:
try:
self.client.make_bucket(self.bucket_name)
self.set_lifecycle_expiration(days=self.lifecycle_days)
print(f"Bucket {self.bucket_name} created successfully")
except S3Error as exc:
if exc.code == "BucketAlreadyOwnedByYou":
print(f"Bucket {self.bucket_name} already owned by you; continuing")
else:
raise
else:
print(f"Bucket {self.bucket_name} already exists")
    def set_lifecycle_expiration(self, days: int = 1, prefix: str = "") -> None:
"""
设置按天自动过期删除MinIO 按天、每天巡检一次;<1天不生效
"""
rule_filter = Filter(prefix=prefix or "")
rule = Rule(
rule_id=f"expire-{prefix or 'all'}-{days}d",
status="Enabled",
rule_filter=rule_filter,
expiration=Expiration(days=int(days)),
)
cfg = LifecycleConfig([rule])
self.client.set_bucket_lifecycle(self.bucket_name, cfg)
def upload_file(self, file: UploadFile):
"""
上传文件到OSS返回文件的UUID
"""
ext = os.path.splitext(file.filename)[1]
        file_uuid = self.new_uuid()
        filename = f'{file_uuid}{ext}'
file.file.seek(0)
self.client.put_object(
self.bucket_name,
filename,
file.file,
length=-1,
part_size=10*1024*1024,
)
return filename
def get_presigned_url(self, filename: str):
"""
获取文件的预签名URL用于下载文件
"""
return self.client.presigned_get_object(
self.bucket_name, filename, expires=timedelta(days=self.lifecycle_days)
)
def get_oss():
"""
获取OSS实例
"""
return OSS()
if __name__ == "__main__":
oss = get_oss()
oss.create_bucket()
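
A sketch of running this check end to end; the script filename is an assumption:

# First run creates the bucket and installs the expiration rule
python lifecycle_sdk_test.py
# Expected output: "Bucket mblock99 created successfully"
# Subsequent runs print: "Bucket mblock99 already exists"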

View File

@@ -79,7 +79,7 @@ export RUSTFS_OBS_LOG_FLUSH_MS=300 # Log flush interval in milliseconds
#tokio runtime
export RUSTFS_RUNTIME_WORKER_THREADS=16
export RUSTFS_RUNTIME_MAX_BLOCKING_THREADS=1024
export RUSTFS_RUNTIME_THREAD_PRINT_ENABLED=true
export RUSTFS_RUNTIME_THREAD_PRINT_ENABLED=false
# shellcheck disable=SC2125
export RUSTFS_RUNTIME_THREAD_STACK_SIZE=1024*1024
export RUSTFS_RUNTIME_THREAD_KEEP_ALIVE=60
@@ -90,30 +90,30 @@ export OTEL_INSTRUMENTATION_VERSION="0.1.1"
export OTEL_INSTRUMENTATION_SCHEMA_URL="https://opentelemetry.io/schemas/1.31.0"
export OTEL_INSTRUMENTATION_ATTRIBUTES="env=production"
## notify
#export RUSTFS_NOTIFY_WEBHOOK_ENABLE="on" # Whether to enable webhook notification
#export RUSTFS_NOTIFY_WEBHOOK_ENDPOINT="http://127.0.0.1:3020/webhook" # Webhook notification address
#export RUSTFS_NOTIFY_WEBHOOK_QUEUE_DIR="$current_dir/deploy/logs/notify"
#
#export RUSTFS_NOTIFY_WEBHOOK_ENABLE_PRIMARY="on" # Whether to enable webhook notification
#export RUSTFS_NOTIFY_WEBHOOK_ENDPOINT_PRIMARY="http://127.0.0.1:3020/webhook" # Webhook notification address
#export RUSTFS_NOTIFY_WEBHOOK_QUEUE_DIR_PRIMARY="$current_dir/deploy/logs/notify"
#
#export RUSTFS_NOTIFY_WEBHOOK_ENABLE_MASTER="on" # Whether to enable webhook notification
#export RUSTFS_NOTIFY_WEBHOOK_ENDPOINT_MASTER="http://127.0.0.1:3020/webhook" # Webhook notification address
#export RUSTFS_NOTIFY_WEBHOOK_QUEUE_DIR_MASTER="$current_dir/deploy/logs/notify"
#
#export RUSTFS_AUDIT_WEBHOOK_ENABLE="on" # Whether to enable webhook audit
#export RUSTFS_AUDIT_WEBHOOK_ENDPOINT="http://127.0.0.1:3020/webhook" # Webhook audit address
#export RUSTFS_AUDIT_WEBHOOK_QUEUE_DIR="$current_dir/deploy/logs/audit"
#
#export RUSTFS_AUDIT_WEBHOOK_ENABLE_PRIMARY="on" # Whether to enable webhook audit
#export RUSTFS_AUDIT_WEBHOOK_ENDPOINT_PRIMARY="http://127.0.0.1:3020/webhook" # Webhook audit address
#export RUSTFS_AUDIT_WEBHOOK_QUEUE_DIR_PRIMARY="$current_dir/deploy/logs/audit"
#
#export RUSTFS_AUDIT_WEBHOOK_ENABLE_MASTER="on" # Whether to enable webhook audit
#export RUSTFS_AUDIT_WEBHOOK_ENDPOINT_MASTER="http://127.0.0.1:3020/webhook" # Webhook audit address
#export RUSTFS_AUDIT_WEBHOOK_QUEUE_DIR_MASTER="$current_dir/deploy/logs/audit"
# # notify
# export RUSTFS_NOTIFY_WEBHOOK_ENABLE="on" # Whether to enable webhook notification
# export RUSTFS_NOTIFY_WEBHOOK_ENDPOINT="http://[::]:3020/webhook" # Webhook notification address
# export RUSTFS_NOTIFY_WEBHOOK_QUEUE_DIR="$current_dir/deploy/logs/notify"
# export RUSTFS_NOTIFY_WEBHOOK_ENABLE_PRIMARY="on" # Whether to enable webhook notification
# export RUSTFS_NOTIFY_WEBHOOK_ENDPOINT_PRIMARY="http://[::]:3020/webhook" # Webhook notification address
# export RUSTFS_NOTIFY_WEBHOOK_QUEUE_DIR_PRIMARY="$current_dir/deploy/logs/notify"
# export RUSTFS_NOTIFY_WEBHOOK_ENABLE_MASTER="on" # Whether to enable webhook notification
# export RUSTFS_NOTIFY_WEBHOOK_ENDPOINT_MASTER="http://[::]:3020/webhook" # Webhook notification address
# export RUSTFS_NOTIFY_WEBHOOK_QUEUE_DIR_MASTER="$current_dir/deploy/logs/notify"
# export RUSTFS_AUDIT_WEBHOOK_ENABLE="on" # Whether to enable webhook audit
# export RUSTFS_AUDIT_WEBHOOK_ENDPOINT="http://[::]:3020/webhook" # Webhook audit address
# export RUSTFS_AUDIT_WEBHOOK_QUEUE_DIR="$current_dir/deploy/logs/audit"
# export RUSTFS_AUDIT_WEBHOOK_ENABLE_PRIMARY="on" # Whether to enable webhook audit
# export RUSTFS_AUDIT_WEBHOOK_ENDPOINT_PRIMARY="http://[::]:3020/webhook" # Webhook audit address
# export RUSTFS_AUDIT_WEBHOOK_QUEUE_DIR_PRIMARY="$current_dir/deploy/logs/audit"
# export RUSTFS_AUDIT_WEBHOOK_ENABLE_MASTER="on" # Whether to enable webhook audit
# export RUSTFS_AUDIT_WEBHOOK_ENDPOINT_MASTER="http://[::]:3020/webhook" # Webhook audit address
# export RUSTFS_AUDIT_WEBHOOK_QUEUE_DIR_MASTER="$current_dir/deploy/logs/audit"
# export RUSTFS_POLICY_PLUGIN_URL="http://localhost:8181/v1/data/rustfs/authz/allow" # The URL of the OPA system
# export RUSTFS_POLICY_PLUGIN_AUTH_TOKEN="your-opa-token" # The authentication token for the OPA system is optional
@@ -170,7 +170,7 @@ export RUSTFS_NS_SCANNER_INTERVAL=60 # Object scanning interval in seconds
#export RUSTFS_REGION="us-east-1"
export RUSTFS_ENABLE_SCANNER=false
export RUSTFS_ENABLE_SCANNER=true
export RUSTFS_ENABLE_HEAL=false