Compare commits

...

1 Commits

Author SHA1 Message Date
weisd
0b80074270 todo 2025-11-04 17:12:58 +08:00
3 changed files with 136 additions and 5 deletions

View File

@@ -1085,9 +1085,16 @@ impl LocalDisk {
*item = "".to_owned();
if entry.ends_with(STORAGE_FORMAT_FILE) {
let metadata = self
let metadata = match self
.read_metadata(self.get_object_path(bucket, format!("{}/{}", &current, &entry).as_str())?)
.await?;
.await
{
Ok(res) => res,
Err(err) => {
warn!("scan dir read_metadata error, continue {:?}", err);
continue;
}
};
let entry = entry.strip_suffix(STORAGE_FORMAT_FILE).unwrap_or_default().to_owned();
let name = entry.trim_end_matches(SLASH_SEPARATOR);
@@ -1189,6 +1196,9 @@ impl LocalDisk {
// }
}
Err(err) => {
if err == Error::DiskNotDir {
continue;
}
if err == Error::FileNotFound || err == Error::IsNotRegular {
// NOT an object, append to stack (with slash)
// If dirObject, but no metadata (which is unexpected) we skip it.
@@ -1203,7 +1213,7 @@ impl LocalDisk {
};
}
while let Some(dir) = dir_stack.pop() {
while let Some(dir) = dir_stack.last() {
if opts.limit > 0 && *objs_returned >= opts.limit {
return Ok(());
}
@@ -1215,10 +1225,11 @@ impl LocalDisk {
.await?;
if opts.recursive {
if let Err(er) = Box::pin(self.scan_dir(dir, prefix.clone(), opts, out, objs_returned)).await {
if let Err(er) = Box::pin(self.scan_dir(dir.clone(), prefix.clone(), opts, out, objs_returned)).await {
warn!("scan_dir err {:?}", &er);
}
}
dir_stack.pop();
}
Ok(())

View File

@@ -2601,6 +2601,8 @@ pub async fn has_space_for(dis: &[Option<DiskInfo>], size: i64) -> Result<bool>
#[cfg(test)]
mod tests {
use crate::bucket::metadata_sys::init_bucket_metadata_sys;
use super::*;
// Test validation functions
@@ -2788,4 +2790,122 @@ mod tests {
assert!(check_put_object_args("", "test-object").is_err());
assert!(check_put_object_args("test-bucket", "").is_err());
}
#[tokio::test]
async fn test_ecstore_put_and_list_objects() {
use crate::disk::endpoint::Endpoint;
use crate::endpoints::{EndpointServerPools, Endpoints, PoolEndpoints};
use std::path::PathBuf;
use tokio::fs;
let test_base_dir = format!("/tmp/rustfs_test_put_list_{}", Uuid::new_v4());
let temp_dir = PathBuf::from(&test_base_dir);
if temp_dir.exists() {
let _ = fs::remove_dir_all(&temp_dir).await;
}
fs::create_dir_all(&temp_dir).await.expect("Failed to create test directory");
let disk_paths = vec![
temp_dir.join("disk1"),
temp_dir.join("disk2"),
temp_dir.join("disk3"),
temp_dir.join("disk4"),
];
for disk_path in &disk_paths {
fs::create_dir_all(disk_path).await.expect("Failed to create disk directory");
}
let mut endpoints = Vec::new();
for (i, disk_path) in disk_paths.iter().enumerate() {
let disk_str = disk_path.to_str().expect("Invalid disk path");
let mut endpoint = Endpoint::try_from(disk_str).expect("Failed to create endpoint");
endpoint.set_pool_index(0);
endpoint.set_set_index(0);
endpoint.set_disk_index(i);
endpoints.push(endpoint);
}
let pool_endpoints = PoolEndpoints {
legacy: false,
set_count: 1,
drives_per_set: 4,
endpoints: Endpoints::from(endpoints),
cmd_line: "test".to_string(),
platform: format!("OS: {} | Arch: {}", std::env::consts::OS, std::env::consts::ARCH),
};
let endpoint_pools = EndpointServerPools(vec![pool_endpoints]);
init_local_disks(endpoint_pools.clone())
.await
.expect("Failed to initialize local disks");
let server_addr: SocketAddr = "127.0.0.1:0".parse().expect("Invalid server address");
let ecstore = ECStore::new(server_addr, endpoint_pools, CancellationToken::new())
.await
.expect("Failed to create ECStore");
init_bucket_metadata_sys(ecstore.clone(), vec![]).await;
let bucket_name = "test-bucket";
ecstore
.make_bucket(bucket_name, &MakeBucketOptions::default())
.await
.expect("Failed to create bucket");
let test_objects = vec![
("object1.txt", b"Hello, World!".to_vec()),
("object2.txt", b"Test data for object 2".to_vec()),
("folder/object3.txt", b"Object in folder".to_vec()),
("folder/subfolder/object4.txt", b"Nested object".to_vec()),
];
for (object_name, data) in &test_objects {
let mut reader = PutObjReader::from_vec(data.clone());
let object_info = ecstore
.put_object(bucket_name, object_name, &mut reader, &ObjectOptions::default())
.await
.unwrap_or_else(|e| panic!("Failed to put object {}: {}", object_name, e));
assert_eq!(object_info.size, data.len() as i64, "Object size mismatch for {}", object_name);
assert_eq!(object_info.bucket, bucket_name);
}
let list_result = ecstore
.clone()
.list_objects_v2(bucket_name, "", None, None, 1000, false, None)
.await
.expect("Failed to list objects");
assert_eq!(list_result.objects.len(), test_objects.len(), "Number of objects mismatch");
let mut object_names: Vec<String> = list_result.objects.iter().map(|o| o.name.clone()).collect();
object_names.sort();
let mut expected_names: Vec<String> = test_objects.iter().map(|(n, _)| n.to_string()).collect();
expected_names.sort();
assert_eq!(object_names, expected_names, "Object names mismatch");
let prefix_result = ecstore
.clone()
.list_objects_v2(bucket_name, "folder/", None, None, 1000, false, None)
.await
.expect("Failed to list objects with prefix");
assert_eq!(prefix_result.objects.len(), 2, "Should find 2 objects with prefix 'folder/'");
assert!(prefix_result.objects.iter().all(|o| o.name.starts_with("folder/")));
let delimiter_result = ecstore
.clone()
.list_objects_v2(bucket_name, "", None, Some("/".to_string()), 1000, false, None)
.await
.expect("Failed to list objects with delimiter");
assert!(!delimiter_result.prefixes.is_empty() || !delimiter_result.objects.is_empty());
let _ = fs::remove_dir_all(&temp_dir).await;
}
}

View File

@@ -70,7 +70,7 @@ export RUSTFS_OBS_LOG_FLUSH_MS=300
#tokio runtime
export RUSTFS_RUNTIME_WORKER_THREADS=16
export RUSTFS_RUNTIME_MAX_BLOCKING_THREADS=1024
export RUSTFS_RUNTIME_THREAD_PRINT_ENABLED=true
export RUSTFS_RUNTIME_THREAD_PRINT_ENABLED=false
# shellcheck disable=SC2125
export RUSTFS_RUNTIME_THREAD_STACK_SIZE=1024*1024
export RUSTFS_RUNTIME_THREAD_KEEP_ALIVE=60