refactor: improve object listing and version handling

- Refactor find_version_index to accept Uuid directly instead of string
- Split from_meta_cache_entries_sorted into separate methods for versions and infos
- Add support for after_version_id filtering in version listing
- Fix S3 error code for replication configuration not found
- Improve error handling and logging levels
- Clean up import statements and remove debug code
Author: weisd
Date: 2025-06-24 11:10:23 +08:00
Parent: 364c3098b2
Commit: 7e17d7d729
10 changed files with 141 additions and 45 deletions

View File

@@ -461,13 +461,7 @@ pub struct FileInfoVersions {
 }

 impl FileInfoVersions {
-    pub fn find_version_index(&self, v: &str) -> Option<usize> {
-        if v.is_empty() {
-            return None;
-        }
-
-        let vid = Uuid::parse_str(v).unwrap_or_default();
+    pub fn find_version_index(&self, vid: Uuid) -> Option<usize> {
         self.versions.iter().position(|v| v.version_id == Some(vid))
     }
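
Not part of the commit, but useful context: with the `Uuid` signature, parsing moves to the call site, so a malformed version string can be rejected explicitly instead of silently collapsing to `Uuid::default()` the way the old `unwrap_or_default()` did. A minimal caller-side sketch, assuming the `FileInfoVersions` type above:

```rust
use uuid::Uuid;

// Hypothetical call site (not from this commit): parse first, then look up.
// A bad version string now yields None up front rather than matching any
// version that happens to carry the nil UUID.
fn lookup_version(file_infos: &FileInfoVersions, version_str: &str) -> Option<usize> {
    let vid = Uuid::parse_str(version_str).ok()?;
    file_infos.find_version_index(vid)
}
```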

View File

@@ -2006,11 +2006,7 @@ impl MetaObject {
         if *status == TRANSITION_COMPLETE.as_bytes().to_vec() {
             let vid = Uuid::parse_str(&fi.tier_free_version_id());
             if let Err(err) = vid {
-                panic!(
-                    "Invalid Tier Object delete marker versionId {} {}",
-                    fi.tier_free_version_id(),
-                    err
-                );
+                panic!("Invalid Tier Object delete marker versionId {} {}", fi.tier_free_version_id(), err);
             }
             let vid = vid.unwrap();
             let mut free_entry = FileMetaVersion {

View File

@@ -1,19 +1,19 @@
 use crate::OtelConfig;
-use flexi_logger::{style, Age, Cleanup, Criterion, DeferredNow, FileSpec, LogSpecification, Naming, Record, WriteMode};
+use flexi_logger::{Age, Cleanup, Criterion, DeferredNow, FileSpec, LogSpecification, Naming, Record, WriteMode, style};
 use nu_ansi_term::Color;
 use opentelemetry::trace::TracerProvider;
-use opentelemetry::{global, KeyValue};
+use opentelemetry::{KeyValue, global};
 use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge;
 use opentelemetry_otlp::WithExportConfig;
 use opentelemetry_sdk::logs::SdkLoggerProvider;
 use opentelemetry_sdk::{
+    Resource,
     metrics::{MeterProviderBuilder, PeriodicReader, SdkMeterProvider},
     trace::{RandomIdGenerator, Sampler, SdkTracerProvider},
-    Resource,
 };
 use opentelemetry_semantic_conventions::{
-    attribute::{DEPLOYMENT_ENVIRONMENT_NAME, NETWORK_LOCAL_ADDRESS, SERVICE_VERSION as OTEL_SERVICE_VERSION},
     SCHEMA_URL,
+    attribute::{DEPLOYMENT_ENVIRONMENT_NAME, NETWORK_LOCAL_ADDRESS, SERVICE_VERSION as OTEL_SERVICE_VERSION},
 };
 use rustfs_config::{
     APP_NAME, DEFAULT_LOG_DIR, DEFAULT_LOG_KEEP_FILES, DEFAULT_LOG_LEVEL, ENVIRONMENT, METER_INTERVAL, SAMPLE_RATIO,
@@ -28,7 +28,7 @@ use tracing_error::ErrorLayer;
 use tracing_opentelemetry::{MetricsLayer, OpenTelemetryLayer};
 use tracing_subscriber::fmt::format::FmtSpan;
 use tracing_subscriber::fmt::time::LocalTime;
-use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilter, Layer};
+use tracing_subscriber::{EnvFilter, Layer, layer::SubscriberExt, util::SubscriberInitExt};

 /// A guard object that manages the lifecycle of OpenTelemetry components.
 ///

View File

@@ -40,14 +40,12 @@ pub fn get_info(p: impl AsRef<Path>) -> std::io::Result<DiskInfo> {
     let free = unsafe { *lp_total_number_of_free_bytes.QuadPart() };

     if free > total {
-        return Err(Error::other(
-            format!(
-                "detected free space ({}) > total drive space ({}), fs corruption at ({}). please run 'fsck'",
-                free,
-                total,
-                p.as_ref().display()
-            ),
-        ));
+        return Err(Error::other(format!(
+            "detected free space ({}) > total drive space ({}), fs corruption at ({}). please run 'fsck'",
+            free,
+            total,
+            p.as_ref().display()
+        )));
     }

     let mut lp_sectors_per_cluster: DWORD = 0;

View File

@@ -65,4 +65,4 @@ impl Stream for RetryTimer {

 pub fn new_retry_timer(_max_retry: i32, _base_sleep: Duration, _max_sleep: Duration, _jitter: f64) -> Vec<i32> {
     todo!();
-}
\ No newline at end of file
+}

View File

@@ -198,7 +198,8 @@ impl Lifecycle for BucketLifecycleConfiguration {
             }

             let rule_prefix = rule.prefix.as_ref().expect("err!");
-            if prefix.len() > 0 && rule_prefix.len() > 0 && !prefix.starts_with(rule_prefix) && !rule_prefix.starts_with(&prefix) {
+            if prefix.len() > 0 && rule_prefix.len() > 0 && !prefix.starts_with(rule_prefix) && !rule_prefix.starts_with(&prefix)
+            {
                 continue;
             }
@@ -425,7 +426,8 @@ impl Lifecycle for BucketLifecycleConfiguration {
             if !obj.is_latest {
                 if let Some(ref noncurrent_version_transitions) = rule.noncurrent_version_transitions {
                     if let Some(ref storage_class) = noncurrent_version_transitions[0].storage_class {
-                        if storage_class.as_str() != "" && !obj.delete_marker && obj.transition_status != TRANSITION_COMPLETE {
+                        if storage_class.as_str() != "" && !obj.delete_marker && obj.transition_status != TRANSITION_COMPLETE
+                        {
                             let due = rule.noncurrent_version_transitions.as_ref().unwrap()[0].next_due(obj);
                             if due.is_some()
                                 && (now.unix_timestamp() == 0 || now.unix_timestamp() > due.unwrap().unix_timestamp())
@@ -452,7 +454,9 @@ impl Lifecycle for BucketLifecycleConfiguration {
             if let Some(ref expiration) = rule.expiration {
                 if let Some(ref date) = expiration.date {
                     let date0 = OffsetDateTime::from(date.clone());
-                    if date0.unix_timestamp() != 0 && (now.unix_timestamp() == 0 || now.unix_timestamp() > date0.unix_timestamp()) {
+                    if date0.unix_timestamp() != 0
+                        && (now.unix_timestamp() == 0 || now.unix_timestamp() > date0.unix_timestamp())
+                    {
                         events.push(Event {
                             action: IlmAction::DeleteAction,
                             rule_id: rule.id.clone().expect("err!"),
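
For context, the condition rewrapped in the first hunk above is a prefix-overlap test: a rule is skipped only when the scan prefix and the rule prefix diverge entirely, i.e. neither extends the other. A small illustrative sketch with hypothetical values:

```rust
// Mirrors the condition in the diff: empty prefixes always overlap, and
// otherwise one prefix must be an extension of the other for the rule to apply.
fn prefixes_overlap(scan: &str, rule: &str) -> bool {
    scan.is_empty() || rule.is_empty() || scan.starts_with(rule) || rule.starts_with(scan)
}

fn main() {
    assert!(prefixes_overlap("photos/2024/", "photos/")); // rule covers the scan prefix
    assert!(!prefixes_overlap("logs/", "photos/")); // disjoint, rule skipped
}
```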

View File

@@ -4690,8 +4690,7 @@ impl StorageAPI for SetDisks {
                     continue;
                 }
             }

-            let _ = self.add_partial(bucket, object, opts.version_id.as_ref().expect("err"))
-                .await;
+            let _ = self.add_partial(bucket, object, opts.version_id.as_ref().expect("err")).await;

             break;
         }

View File

@@ -567,7 +567,92 @@ impl ObjectInfo {
         }
     }

-    pub async fn from_meta_cache_entries_sorted(
+    pub async fn from_meta_cache_entries_sorted_versions(
+        entries: &MetaCacheEntriesSorted,
+        bucket: &str,
+        prefix: &str,
+        delimiter: Option<String>,
+        after_version_id: Option<Uuid>,
+    ) -> Vec<ObjectInfo> {
+        let vcfg = get_versioning_config(bucket).await.ok();
+
+        let mut objects = Vec::with_capacity(entries.entries().len());
+        let mut prev_prefix = "";
+
+        for entry in entries.entries() {
+            if entry.is_object() {
+                if let Some(delimiter) = &delimiter {
+                    if let Some(idx) = entry.name.trim_start_matches(prefix).find(delimiter) {
+                        let idx = prefix.len() + idx + delimiter.len();
+                        if let Some(curr_prefix) = entry.name.get(0..idx) {
+                            if curr_prefix == prev_prefix {
+                                continue;
+                            }
+                            prev_prefix = curr_prefix;
+
+                            objects.push(ObjectInfo {
+                                is_dir: true,
+                                bucket: bucket.to_owned(),
+                                name: curr_prefix.to_owned(),
+                                ..Default::default()
+                            });
+                        }
+                        continue;
+                    }
+                }
+
+                let file_infos = match entry.file_info_versions(bucket) {
+                    Ok(res) => res,
+                    Err(err) => {
+                        warn!("file_info_versions err {:?}", err);
+                        continue;
+                    }
+                };
+
+                let versions = if let Some(vid) = after_version_id {
+                    if let Some(idx) = file_infos.find_version_index(vid) {
+                        &file_infos.versions[idx + 1..]
+                    } else {
+                        &file_infos.versions
+                    }
+                } else {
+                    &file_infos.versions
+                };
+
+                for fi in versions.iter() {
+                    // TODO:VersionPurgeStatus
+                    let versioned = vcfg.clone().map(|v| v.0.versioned(&entry.name)).unwrap_or_default();
+                    objects.push(ObjectInfo::from_file_info(fi, bucket, &entry.name, versioned));
+                }
+
+                continue;
+            }
+
+            if entry.is_dir() {
+                if let Some(delimiter) = &delimiter {
+                    if let Some(idx) = entry.name.trim_start_matches(prefix).find(delimiter) {
+                        let idx = prefix.len() + idx + delimiter.len();
+                        if let Some(curr_prefix) = entry.name.get(0..idx) {
+                            if curr_prefix == prev_prefix {
+                                continue;
+                            }
+                            prev_prefix = curr_prefix;
+
+                            objects.push(ObjectInfo {
+                                is_dir: true,
+                                bucket: bucket.to_owned(),
+                                name: curr_prefix.to_owned(),
+                                ..Default::default()
+                            });
+                        }
+                    }
+                }
+            }
+        }
+
+        objects
+    }
+
+    pub async fn from_meta_cache_entries_sorted_infos(
         entries: &MetaCacheEntriesSorted,
         bucket: &str,
         prefix: &str,
@@ -599,11 +684,18 @@ impl ObjectInfo {
                 }
             }

-            if let Ok(fi) = entry.to_fileinfo(bucket) {
-                // TODO:VersionPurgeStatus
-                let versioned = vcfg.clone().map(|v| v.0.versioned(&entry.name)).unwrap_or_default();
-                objects.push(ObjectInfo::from_file_info(&fi, bucket, &entry.name, versioned));
-            }
+            let fi = match entry.to_fileinfo(bucket) {
+                Ok(res) => res,
+                Err(err) => {
+                    warn!("file_info_versions err {:?}", err);
+                    continue;
+                }
+            };
+
+            // TODO:VersionPurgeStatus
+            let versioned = vcfg.clone().map(|v| v.0.versioned(&entry.name)).unwrap_or_default();
+            objects.push(ObjectInfo::from_file_info(&fi, bucket, &entry.name, versioned));
+
+            continue;
         }
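
The `after_version_id` branch above reads as: a version marker means "resume strictly after the matching entry in this object's version list", and a marker that matches nothing degrades to returning every version. A condensed restatement of just that selection, assuming the `FileInfoVersions`/`FileInfo` types from this commit (not a verbatim excerpt):

```rust
use uuid::Uuid;

// The same selection written as a single match over the optional marker.
fn select_versions(file_infos: &FileInfoVersions, after_version_id: Option<Uuid>) -> &[FileInfo] {
    match after_version_id.and_then(|vid| file_infos.find_version_index(vid)) {
        Some(idx) => &file_infos.versions[idx + 1..], // resume just past the marker
        None => &file_infos.versions,                 // no marker, or marker not found
    }
}
```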

View File

@@ -23,7 +23,7 @@ use std::collections::HashMap;
 use std::sync::Arc;
 use tokio::sync::broadcast::{self, Receiver as B_Receiver};
 use tokio::sync::mpsc::{self, Receiver, Sender};
-use tracing::{error, warn};
+use tracing::{error, info};
 use uuid::Uuid;

 const MAX_OBJECT_LIST: i32 = 1000;
@@ -246,8 +246,6 @@ impl ECStore {
             ..Default::default()
         };

-        // warn!("list_objects_generic opts {:?}", &opts);
-        // use get
         if !opts.prefix.is_empty() && opts.limit == 1 && opts.marker.is_none() {
             match self
@@ -295,7 +293,7 @@ impl ECStore {
         // contextCanceled

-        let mut get_objects = ObjectInfo::from_meta_cache_entries_sorted(
+        let mut get_objects = ObjectInfo::from_meta_cache_entries_sorted_infos(
             &list_result.entries.unwrap_or_default(),
             bucket,
             prefix,
@@ -364,10 +362,15 @@
         max_keys: i32,
     ) -> Result<ListObjectVersionsInfo> {
         if marker.is_none() && version_marker.is_some() {
-            warn!("inner_list_object_versions: marker is none and version_marker is some");
             return Err(StorageError::NotImplemented);
         }

+        let version_marker = if let Some(marker) = version_marker {
+            Some(Uuid::parse_str(&marker)?)
+        } else {
+            None
+        };
+
         // if marker set, limit +1
         let opts = ListPathOptions {
             bucket: bucket.to_owned(),
@@ -399,11 +402,12 @@
             result.forward_past(opts.marker);
         }

-        let mut get_objects = ObjectInfo::from_meta_cache_entries_sorted(
+        let mut get_objects = ObjectInfo::from_meta_cache_entries_sorted_versions(
             &list_result.entries.unwrap_or_default(),
             bucket,
             prefix,
             delimiter.clone(),
+            version_marker,
         )
         .await;
@@ -1068,7 +1072,7 @@ async fn merge_entry_channels(
             }
         },
         _ = rx.recv()=>{
-            warn!("merge_entry_channels rx.recv() cancel");
+            info!("merge_entry_channels rx.recv() cancel");
             return Ok(())
         },
     }
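
A side note on the marker parsing added in `inner_list_object_versions`: the same Option-to-Result flip can be written with `map` + `transpose`. A hypothetical equivalent helper (not in the commit), returning the uuid crate's error type:

```rust
use uuid::Uuid;

// Option<String> -> Result<Option<Uuid>, uuid::Error>: None stays None,
// Some(s) is parsed, and a parse failure propagates instead of defaulting.
fn parse_version_marker(marker: Option<String>) -> Result<Option<Uuid>, uuid::Error> {
    marker.map(|m| Uuid::parse_str(&m)).transpose()
}
```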

View File

@@ -1931,13 +1931,22 @@ impl S3 for FS {
         let rcfg = match metadata_sys::get_replication_config(&bucket).await {
             Ok((cfg, _created)) => Some(cfg),
             Err(err) => {
+                if err == StorageError::ConfigNotFound {
+                    return Err(S3Error::with_message(
+                        S3ErrorCode::ReplicationConfigurationNotFoundError,
+                        "replication not found".to_string(),
+                    ));
+                }
                 error!("get_replication_config err {:?}", err);
                 return Err(ApiError::from(err).into());
             }
         };

         if rcfg.is_none() {
-            return Err(S3Error::with_message(S3ErrorCode::NoSuchBucket, "replication not found".to_string()));
+            return Err(S3Error::with_message(
+                S3ErrorCode::ReplicationConfigurationNotFoundError,
+                "replication not found".to_string(),
+            ));
         }

         // Ok(S3Response::new(GetBucketReplicationOutput {
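
A hedged summary of the new error mapping as a hypothetical helper (not in the commit; identifiers taken from the diff): AWS S3 reports `ReplicationConfigurationNotFoundError` when `GetBucketReplication` is called on a bucket with no replication configuration, which is the behaviour this change aligns with, where the old code answered `NoSuchBucket` even though the bucket exists.

```rust
// Hypothetical consolidation of the two error paths above: a missing
// replication config maps to S3's dedicated error code; anything else
// goes through the generic ApiError conversion, as in the handler.
fn replication_config_error(err: StorageError) -> S3Error {
    if err == StorageError::ConfigNotFound {
        S3Error::with_message(
            S3ErrorCode::ReplicationConfigurationNotFoundError,
            "replication not found".to_string(),
        )
    } else {
        ApiError::from(err).into()
    }
}
```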