diff --git a/crates/filemeta/src/fileinfo.rs b/crates/filemeta/src/fileinfo.rs
index edf3c1eb..22e78f06 100644
--- a/crates/filemeta/src/fileinfo.rs
+++ b/crates/filemeta/src/fileinfo.rs
@@ -461,13 +461,7 @@ pub struct FileInfoVersions {
 }
 
 impl FileInfoVersions {
-    pub fn find_version_index(&self, v: &str) -> Option<usize> {
-        if v.is_empty() {
-            return None;
-        }
-
-        let vid = Uuid::parse_str(v).unwrap_or_default();
-
+    pub fn find_version_index(&self, vid: Uuid) -> Option<usize> {
         self.versions.iter().position(|v| v.version_id == Some(vid))
     }
 
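With `find_version_index` now taking a parsed `Uuid`, the empty-string check and the parse-failure handling move to the call sites. A minimal caller-side sketch, assuming only the `uuid` crate (the `parse_version_id` helper is illustrative, not part of this patch):

```rust
use uuid::Uuid;

// Illustrative helper: reject empty or malformed version-id strings instead of
// silently falling back to the nil UUID as the old implementation did.
fn parse_version_id(v: &str) -> Option<Uuid> {
    if v.is_empty() {
        return None;
    }
    Uuid::parse_str(v).ok()
}
```

A lookup then reads `parse_version_id(v).and_then(|vid| fi.find_version_index(vid))` rather than passing the raw string through.
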
diff --git a/crates/filemeta/src/filemeta.rs b/crates/filemeta/src/filemeta.rs
index 56f5447b..6c25eba6 100644
--- a/crates/filemeta/src/filemeta.rs
+++ b/crates/filemeta/src/filemeta.rs
@@ -2006,11 +2006,7 @@ impl MetaObject {
         if *status == TRANSITION_COMPLETE.as_bytes().to_vec() {
             let vid = Uuid::parse_str(&fi.tier_free_version_id());
             if let Err(err) = vid {
-                panic!(
-                    "Invalid Tier Object delete marker versionId {} {}",
-                    fi.tier_free_version_id(),
-                    err
-                );
+                panic!("Invalid Tier Object delete marker versionId {} {}", fi.tier_free_version_id(), err);
             }
             let vid = vid.unwrap();
             let mut free_entry = FileMetaVersion {
diff --git a/crates/obs/src/telemetry.rs b/crates/obs/src/telemetry.rs
index fdbc430d..81269c94 100644
--- a/crates/obs/src/telemetry.rs
+++ b/crates/obs/src/telemetry.rs
@@ -1,19 +1,19 @@
 use crate::OtelConfig;
-use flexi_logger::{style, Age, Cleanup, Criterion, DeferredNow, FileSpec, LogSpecification, Naming, Record, WriteMode};
+use flexi_logger::{Age, Cleanup, Criterion, DeferredNow, FileSpec, LogSpecification, Naming, Record, WriteMode, style};
 use nu_ansi_term::Color;
 use opentelemetry::trace::TracerProvider;
-use opentelemetry::{global, KeyValue};
+use opentelemetry::{KeyValue, global};
 use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge;
 use opentelemetry_otlp::WithExportConfig;
 use opentelemetry_sdk::logs::SdkLoggerProvider;
 use opentelemetry_sdk::{
+    Resource,
     metrics::{MeterProviderBuilder, PeriodicReader, SdkMeterProvider},
     trace::{RandomIdGenerator, Sampler, SdkTracerProvider},
-    Resource,
 };
 use opentelemetry_semantic_conventions::{
-    attribute::{DEPLOYMENT_ENVIRONMENT_NAME, NETWORK_LOCAL_ADDRESS, SERVICE_VERSION as OTEL_SERVICE_VERSION},
     SCHEMA_URL,
+    attribute::{DEPLOYMENT_ENVIRONMENT_NAME, NETWORK_LOCAL_ADDRESS, SERVICE_VERSION as OTEL_SERVICE_VERSION},
 };
 use rustfs_config::{
     APP_NAME, DEFAULT_LOG_DIR, DEFAULT_LOG_KEEP_FILES, DEFAULT_LOG_LEVEL, ENVIRONMENT, METER_INTERVAL, SAMPLE_RATIO,
@@ -28,7 +28,7 @@ use tracing_error::ErrorLayer;
 use tracing_opentelemetry::{MetricsLayer, OpenTelemetryLayer};
 use tracing_subscriber::fmt::format::FmtSpan;
 use tracing_subscriber::fmt::time::LocalTime;
-use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilter, Layer};
+use tracing_subscriber::{EnvFilter, Layer, layer::SubscriberExt, util::SubscriberInitExt};
 
 /// A guard object that manages the lifecycle of OpenTelemetry components.
 ///
diff --git a/crates/utils/src/os/windows.rs b/crates/utils/src/os/windows.rs
index 35c043d7..40116506 100644
--- a/crates/utils/src/os/windows.rs
+++ b/crates/utils/src/os/windows.rs
@@ -40,14 +40,12 @@ pub fn get_info(p: impl AsRef<Path>) -> std::io::Result<DiskInfo> {
     let free = unsafe { *lp_total_number_of_free_bytes.QuadPart() };
 
     if free > total {
-        return Err(Error::other(
-            format!(
-                "detected free space ({}) > total drive space ({}), fs corruption at ({}). please run 'fsck'",
-                free,
-                total,
-                p.as_ref().display()
-            ),
-        ));
+        return Err(Error::other(format!(
+            "detected free space ({}) > total drive space ({}), fs corruption at ({}). please run 'fsck'",
+            free,
+            total,
+            p.as_ref().display()
+        )));
     }
 
     let mut lp_sectors_per_cluster: DWORD = 0;
diff --git a/crates/utils/src/retry.rs b/crates/utils/src/retry.rs
index 3a0b85fb..d78b754b 100644
--- a/crates/utils/src/retry.rs
+++ b/crates/utils/src/retry.rs
@@ -65,4 +65,4 @@ impl Stream for RetryTimer {
 
 pub fn new_retry_timer(_max_retry: i32, _base_sleep: Duration, _max_sleep: Duration, _jitter: f64) -> Vec {
     todo!();
-}
\ No newline at end of file
+}
diff --git a/ecstore/src/bucket/lifecycle/lifecycle.rs b/ecstore/src/bucket/lifecycle/lifecycle.rs
index f0cd8ded..d4e3ba93 100644
--- a/ecstore/src/bucket/lifecycle/lifecycle.rs
+++ b/ecstore/src/bucket/lifecycle/lifecycle.rs
@@ -198,7 +198,8 @@ impl Lifecycle for BucketLifecycleConfiguration {
         }
 
         let rule_prefix = rule.prefix.as_ref().expect("err!");
-        if prefix.len() > 0 && rule_prefix.len() > 0 && !prefix.starts_with(rule_prefix) && !rule_prefix.starts_with(&prefix) {
+        if prefix.len() > 0 && rule_prefix.len() > 0 && !prefix.starts_with(rule_prefix) && !rule_prefix.starts_with(&prefix)
+        {
             continue;
         }
@@ -425,7 +426,8 @@ impl Lifecycle for BucketLifecycleConfiguration {
         if !obj.is_latest {
             if let Some(ref noncurrent_version_transitions) = rule.noncurrent_version_transitions {
                 if let Some(ref storage_class) = noncurrent_version_transitions[0].storage_class {
-                    if storage_class.as_str() != "" && !obj.delete_marker && obj.transition_status != TRANSITION_COMPLETE {
+                    if storage_class.as_str() != "" && !obj.delete_marker && obj.transition_status != TRANSITION_COMPLETE
+                    {
                         let due = rule.noncurrent_version_transitions.as_ref().unwrap()[0].next_due(obj);
                         if due.is_some()
                             && (now.unix_timestamp() == 0 || now.unix_timestamp() > due.unwrap().unix_timestamp())
@@ -452,7 +454,9 @@ impl Lifecycle for BucketLifecycleConfiguration {
         if let Some(ref expiration) = rule.expiration {
             if let Some(ref date) = expiration.date {
                 let date0 = OffsetDateTime::from(date.clone());
-                if date0.unix_timestamp() != 0 && (now.unix_timestamp() == 0 || now.unix_timestamp() > date0.unix_timestamp()) {
+                if date0.unix_timestamp() != 0
+                    && (now.unix_timestamp() == 0 || now.unix_timestamp() > date0.unix_timestamp())
+                {
                     events.push(Event {
                         action: IlmAction::DeleteAction,
                         rule_id: rule.id.clone().expect("err!"),
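The condition reformatted in the first lifecycle.rs hunk is the rule-selection guard: a rule is skipped only when both the query prefix and the rule prefix are non-empty and neither is a prefix of the other. Isolated as a standalone predicate (a sketch; `prefixes_overlap` is not a name used in the codebase):

```rust
// A rule applies when either prefix is empty or one prefix extends the other.
fn prefixes_overlap(prefix: &str, rule_prefix: &str) -> bool {
    prefix.is_empty()
        || rule_prefix.is_empty()
        || prefix.starts_with(rule_prefix)
        || rule_prefix.starts_with(prefix)
}

fn main() {
    assert!(prefixes_overlap("logs/2024/", "logs/"));
    assert!(prefixes_overlap("", "logs/"));
    assert!(!prefixes_overlap("logs/", "images/"));
}
```
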
diff --git a/ecstore/src/set_disk.rs b/ecstore/src/set_disk.rs
index 5c4a82ca..305b52fb 100644
--- a/ecstore/src/set_disk.rs
+++ b/ecstore/src/set_disk.rs
@@ -4690,8 +4690,7 @@ impl StorageAPI for SetDisks {
                     continue;
                 }
             }
-            let _ = self.add_partial(bucket, object, opts.version_id.as_ref().expect("err"))
-                .await;
+            let _ = self.add_partial(bucket, object, opts.version_id.as_ref().expect("err")).await;
 
             break;
         }
diff --git a/ecstore/src/store_api.rs b/ecstore/src/store_api.rs
index d68fca70..ebd4706f 100644
--- a/ecstore/src/store_api.rs
+++ b/ecstore/src/store_api.rs
@@ -567,7 +567,92 @@ impl ObjectInfo {
         }
     }
 
-    pub async fn from_meta_cache_entries_sorted(
+    pub async fn from_meta_cache_entries_sorted_versions(
+        entries: &MetaCacheEntriesSorted,
+        bucket: &str,
+        prefix: &str,
+        delimiter: Option<String>,
+        after_version_id: Option<Uuid>,
+    ) -> Vec<ObjectInfo> {
+        let vcfg = get_versioning_config(bucket).await.ok();
+        let mut objects = Vec::with_capacity(entries.entries().len());
+        let mut prev_prefix = "";
+        for entry in entries.entries() {
+            if entry.is_object() {
+                if let Some(delimiter) = &delimiter {
+                    if let Some(idx) = entry.name.trim_start_matches(prefix).find(delimiter) {
+                        let idx = prefix.len() + idx + delimiter.len();
+                        if let Some(curr_prefix) = entry.name.get(0..idx) {
+                            if curr_prefix == prev_prefix {
+                                continue;
+                            }
+
+                            prev_prefix = curr_prefix;
+
+                            objects.push(ObjectInfo {
+                                is_dir: true,
+                                bucket: bucket.to_owned(),
+                                name: curr_prefix.to_owned(),
+                                ..Default::default()
+                            });
+                        }
+                        continue;
+                    }
+                }
+
+                let file_infos = match entry.file_info_versions(bucket) {
+                    Ok(res) => res,
+                    Err(err) => {
+                        warn!("file_info_versions err {:?}", err);
+                        continue;
+                    }
+                };
+
+                let versions = if let Some(vid) = after_version_id {
+                    if let Some(idx) = file_infos.find_version_index(vid) {
+                        &file_infos.versions[idx + 1..]
+                    } else {
+                        &file_infos.versions
+                    }
+                } else {
+                    &file_infos.versions
+                };
+
+                for fi in versions.iter() {
+                    // TODO:VersionPurgeStatus
+                    let versioned = vcfg.clone().map(|v| v.0.versioned(&entry.name)).unwrap_or_default();
+                    objects.push(ObjectInfo::from_file_info(fi, bucket, &entry.name, versioned));
+                }
+                continue;
+            }
+
+            if entry.is_dir() {
+                if let Some(delimiter) = &delimiter {
+                    if let Some(idx) = entry.name.trim_start_matches(prefix).find(delimiter) {
+                        let idx = prefix.len() + idx + delimiter.len();
+                        if let Some(curr_prefix) = entry.name.get(0..idx) {
+                            if curr_prefix == prev_prefix {
+                                continue;
+                            }
+
+                            prev_prefix = curr_prefix;
+
+                            objects.push(ObjectInfo {
+                                is_dir: true,
+                                bucket: bucket.to_owned(),
+                                name: curr_prefix.to_owned(),
+                                ..Default::default()
+                            });
+                        }
+                    }
+                }
+            }
+        }
+
+        objects
+    }
+
+    pub async fn from_meta_cache_entries_sorted_infos(
         entries: &MetaCacheEntriesSorted,
         bucket: &str,
         prefix: &str,
@@ -599,11 +684,18 @@ impl ObjectInfo {
                 }
             }
 
-            if let Ok(fi) = entry.to_fileinfo(bucket) {
-                // TODO:VersionPurgeStatus
-                let versioned = vcfg.clone().map(|v| v.0.versioned(&entry.name)).unwrap_or_default();
-                objects.push(ObjectInfo::from_file_info(&fi, bucket, &entry.name, versioned));
-            }
+            let fi = match entry.to_fileinfo(bucket) {
+                Ok(res) => res,
+                Err(err) => {
+                    warn!("to_fileinfo err {:?}", err);
+                    continue;
+                }
+            };
+
+            // TODO:VersionPurgeStatus
+            let versioned = vcfg.clone().map(|v| v.0.versioned(&entry.name)).unwrap_or_default();
+            objects.push(ObjectInfo::from_file_info(&fi, bucket, &entry.name, versioned));
+            continue;
         }
 
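In `from_meta_cache_entries_sorted_versions`, the `after_version_id` marker makes listing resume strictly after the matching version, and an unknown marker falls back to the full version list. The same slicing rule, reduced to a runnable sketch with a stand-in `Version` type (none of these names come from the patch):

```rust
#[derive(Debug, PartialEq)]
struct Version {
    id: u32,
}

// Resume after the marker if it is present in the list; otherwise return everything.
fn versions_after(versions: &[Version], marker: Option<u32>) -> &[Version] {
    match marker.and_then(|m| versions.iter().position(|v| v.id == m)) {
        Some(idx) => &versions[idx + 1..],
        None => versions,
    }
}

fn main() {
    let vs = vec![Version { id: 1 }, Version { id: 2 }, Version { id: 3 }];
    assert_eq!(versions_after(&vs, Some(2)), &vs[2..]); // picks up at id 3
    assert_eq!(versions_after(&vs, Some(9)), &vs[..]); // unknown marker: full list
    assert_eq!(versions_after(&vs, None), &vs[..]); // no marker: full list
}
```
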
diff --git a/ecstore/src/store_list_objects.rs b/ecstore/src/store_list_objects.rs
index 4aed6cab..757bd8d8 100644
--- a/ecstore/src/store_list_objects.rs
+++ b/ecstore/src/store_list_objects.rs
@@ -23,7 +23,7 @@ use std::collections::HashMap;
 use std::sync::Arc;
 use tokio::sync::broadcast::{self, Receiver as B_Receiver};
 use tokio::sync::mpsc::{self, Receiver, Sender};
-use tracing::{error, warn};
+use tracing::{error, info};
 use uuid::Uuid;
 
 const MAX_OBJECT_LIST: i32 = 1000;
@@ -246,8 +246,6 @@ impl ECStore {
             ..Default::default()
         };
 
-        // warn!("list_objects_generic opts {:?}", &opts);
-
         // use get
         if !opts.prefix.is_empty() && opts.limit == 1 && opts.marker.is_none() {
             match self
@@ -295,7 +293,7 @@ impl ECStore {
 
         // contextCanceled
 
-        let mut get_objects = ObjectInfo::from_meta_cache_entries_sorted(
+        let mut get_objects = ObjectInfo::from_meta_cache_entries_sorted_infos(
             &list_result.entries.unwrap_or_default(),
             bucket,
             prefix,
@@ -364,10 +362,15 @@ impl ECStore {
        max_keys: i32,
     ) -> Result<ListObjectVersionsInfo> {
         if marker.is_none() && version_marker.is_some() {
-            warn!("inner_list_object_versions: marker is none and version_marker is some");
             return Err(StorageError::NotImplemented);
         }
 
+        let version_marker = if let Some(marker) = version_marker {
+            Some(Uuid::parse_str(&marker)?)
+        } else {
+            None
+        };
+
         // if marker set, limit +1
         let opts = ListPathOptions {
             bucket: bucket.to_owned(),
@@ -399,11 +402,12 @@ impl ECStore {
             result.forward_past(opts.marker);
         }
 
-        let mut get_objects = ObjectInfo::from_meta_cache_entries_sorted(
+        let mut get_objects = ObjectInfo::from_meta_cache_entries_sorted_versions(
             &list_result.entries.unwrap_or_default(),
             bucket,
             prefix,
             delimiter.clone(),
+            version_marker,
         )
         .await;
 
@@ -1068,7 +1072,7 @@ async fn merge_entry_channels(
                 }
             },
             _ = rx.recv()=>{
-                warn!("merge_entry_channels rx.recv() cancel");
+                info!("merge_entry_channels rx.recv() cancel");
                 return Ok(())
             },
         }
diff --git a/rustfs/src/storage/ecfs.rs b/rustfs/src/storage/ecfs.rs
index 777e02d7..d308fea4 100644
--- a/rustfs/src/storage/ecfs.rs
+++ b/rustfs/src/storage/ecfs.rs
@@ -1931,13 +1931,22 @@ impl S3 for FS {
         let rcfg = match metadata_sys::get_replication_config(&bucket).await {
             Ok((cfg, _created)) => Some(cfg),
             Err(err) => {
+                if err == StorageError::ConfigNotFound {
+                    return Err(S3Error::with_message(
+                        S3ErrorCode::ReplicationConfigurationNotFoundError,
+                        "replication not found".to_string(),
+                    ));
+                }
                 error!("get_replication_config err {:?}", err);
                 return Err(ApiError::from(err).into());
             }
         };
 
         if rcfg.is_none() {
-            return Err(S3Error::with_message(S3ErrorCode::NoSuchBucket, "replication not found".to_string()));
+            return Err(S3Error::with_message(
+                S3ErrorCode::ReplicationConfigurationNotFoundError,
+                "replication not found".to_string(),
+            ));
         }
 
         // Ok(S3Response::new(GetBucketReplicationOutput {
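One note on the `version_marker` handling added in store_list_objects.rs above: the `if let`/`else` pair is the explicit form of an `Option::map` plus `transpose`, and the `?` means a malformed marker now surfaces as an error (presumably via a `From<uuid::Error>` conversion on the storage error type). A standalone equivalent, assuming only the `uuid` crate (the `parse_marker` name is illustrative):

```rust
use uuid::Uuid;

// Parse an optional version marker, propagating a parse failure to the caller
// rather than treating a malformed marker as "no marker".
fn parse_marker(version_marker: Option<String>) -> Result<Option<Uuid>, uuid::Error> {
    version_marker.map(|m| Uuid::parse_str(&m)).transpose()
}

fn main() {
    assert!(parse_marker(None).unwrap().is_none());
    assert!(parse_marker(Some("not-a-uuid".into())).is_err());
}
```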