mirror of
https://github.com/rustfs/rustfs.git
synced 2026-01-17 01:30:33 +00:00
Fix/delete version (#484)
* fix:delete_version * fix:test_lifecycle_expiry_basic --------- Co-authored-by: likewu <likewu@126.com>
This commit is contained in:
@@ -29,8 +29,8 @@ use std::sync::OnceLock;
|
||||
use std::{path::PathBuf, sync::Arc, time::Duration};
|
||||
use tokio::fs;
|
||||
use tokio::sync::RwLock;
|
||||
use tracing::info;
|
||||
use tracing::warn;
|
||||
use tracing::{debug, info};
|
||||
|
||||
static GLOBAL_ENV: OnceLock<(Vec<PathBuf>, Arc<ECStore>)> = OnceLock::new();
|
||||
static INIT: Once = Once::new();
|
||||
@@ -278,7 +278,7 @@ async fn object_exists(ecstore: &Arc<ECStore>, bucket: &str, object: &str) -> bo
|
||||
#[allow(dead_code)]
|
||||
async fn object_is_delete_marker(ecstore: &Arc<ECStore>, bucket: &str, object: &str) -> bool {
|
||||
if let Ok(oi) = (**ecstore).get_object_info(bucket, object, &ObjectOptions::default()).await {
|
||||
println!("oi: {:?}", oi);
|
||||
debug!("oi: {:?}", oi);
|
||||
oi.delete_marker
|
||||
} else {
|
||||
panic!("object_is_delete_marker is error");
|
||||
@@ -358,8 +358,10 @@ async fn test_lifecycle_expiry_basic() {
|
||||
let check_result = object_exists(&ecstore, bucket_name, object_name).await;
|
||||
println!("Object is_delete_marker after lifecycle processing: {check_result}");
|
||||
|
||||
if !check_result {
|
||||
if check_result {
|
||||
println!("❌ Object was not deleted by lifecycle processing");
|
||||
} else {
|
||||
println!("✅ Object was successfully deleted by lifecycle processing");
|
||||
// Let's try to get object info to see its details
|
||||
match ecstore
|
||||
.get_object_info(bucket_name, object_name, &rustfs_ecstore::store_api::ObjectOptions::default())
|
||||
@@ -375,11 +377,9 @@ async fn test_lifecycle_expiry_basic() {
|
||||
println!("Error getting object info: {e:?}");
|
||||
}
|
||||
}
|
||||
} else {
|
||||
println!("✅ Object was successfully deleted by lifecycle processing");
|
||||
}
|
||||
|
||||
assert!(check_result);
|
||||
assert!(!check_result);
|
||||
println!("✅ Object successfully expired");
|
||||
|
||||
// Stop scanner
|
||||
|
||||
@@ -578,45 +578,61 @@ impl FileMeta {
|
||||
}
|
||||
}
|
||||
|
||||
let mut found_index = None;
|
||||
for (i, version) in self.versions.iter().enumerate() {
|
||||
if version.header.version_type != VersionType::Object || version.header.version_id != fi.version_id {
|
||||
continue;
|
||||
}
|
||||
|
||||
let mut ver = self.get_idx(i)?;
|
||||
|
||||
if fi.expire_restored {
|
||||
ver.object.as_mut().unwrap().remove_restore_hdrs();
|
||||
let _ = self.set_idx(i, ver.clone());
|
||||
} else if fi.transition_status == TRANSITION_COMPLETE {
|
||||
ver.object.as_mut().unwrap().set_transition(fi);
|
||||
ver.object.as_mut().unwrap().reset_inline_data();
|
||||
self.set_idx(i, ver.clone())?;
|
||||
return Ok(None);
|
||||
} else {
|
||||
let vers = self.versions[i + 1..].to_vec();
|
||||
self.versions.extend(vers.iter().cloned());
|
||||
let (free_version, to_free) = ver.object.as_ref().unwrap().init_free_version(fi);
|
||||
if to_free {
|
||||
self.add_version_filemata(free_version)?;
|
||||
}
|
||||
if version.header.version_type == VersionType::Object && version.header.version_id == fi.version_id {
|
||||
found_index = Some(i);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
let Some(i) = found_index else {
|
||||
if fi.deleted {
|
||||
self.add_version_filemata(ventry)?;
|
||||
}
|
||||
if self.shared_data_dir_count(ver.object.as_ref().unwrap().version_id, ver.object.as_ref().unwrap().data_dir) > 0 {
|
||||
return Ok(None);
|
||||
}
|
||||
return Ok(ver.object.as_ref().unwrap().data_dir);
|
||||
return Err(Error::FileVersionNotFound);
|
||||
};
|
||||
|
||||
let mut ver = self.get_idx(i)?;
|
||||
|
||||
let Some(obj) = &mut ver.object else {
|
||||
if fi.deleted {
|
||||
self.add_version_filemata(ventry)?;
|
||||
return Ok(None);
|
||||
}
|
||||
return Err(Error::FileVersionNotFound);
|
||||
};
|
||||
|
||||
let obj_version_id = obj.version_id;
|
||||
let obj_data_dir = obj.data_dir;
|
||||
|
||||
if fi.expire_restored {
|
||||
obj.remove_restore_hdrs();
|
||||
self.set_idx(i, ver)?;
|
||||
} else if fi.transition_status == TRANSITION_COMPLETE {
|
||||
obj.set_transition(fi);
|
||||
obj.reset_inline_data();
|
||||
self.set_idx(i, ver)?;
|
||||
} else {
|
||||
self.versions.remove(i);
|
||||
|
||||
let (free_version, to_free) = obj.init_free_version(fi);
|
||||
|
||||
if to_free {
|
||||
self.add_version_filemata(free_version)?;
|
||||
}
|
||||
}
|
||||
|
||||
if fi.deleted {
|
||||
self.add_version_filemata(ventry)?;
|
||||
}
|
||||
|
||||
if self.shared_data_dir_count(obj_version_id, obj_data_dir) > 0 {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
Err(Error::FileVersionNotFound)
|
||||
Ok(obj_data_dir)
|
||||
}
|
||||
|
||||
pub fn into_fileinfo(
|
||||
|
||||
@@ -227,7 +227,7 @@ impl Operation for NotificationTarget {
|
||||
let port = url
|
||||
.port_or_known_default()
|
||||
.ok_or_else(|| s3_error!(InvalidArgument, "endpoint missing port"))?;
|
||||
let addr = format!("{}:{}", host, port);
|
||||
let addr = format!("{host}:{port}");
|
||||
// First, try to parse as SocketAddr (IP:port)
|
||||
if addr.parse::<SocketAddr>().is_err() {
|
||||
// If not an IP:port, try DNS resolution
|
||||
|
||||
@@ -144,9 +144,9 @@ pub async fn start_http_server(
|
||||
for domain in &opt.server_domains {
|
||||
domain_sets.insert(domain.to_string());
|
||||
if let Some((host, _)) = domain.split_once(':') {
|
||||
domain_sets.insert(format!("{}:{}", host, server_port));
|
||||
domain_sets.insert(format!("{host}:{server_port}"));
|
||||
} else {
|
||||
domain_sets.insert(format!("{}:{}", domain, server_port));
|
||||
domain_sets.insert(format!("{domain}:{server_port}"));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user