mirror of
https://github.com/rustfs/rustfs.git
synced 2026-01-17 01:30:33 +00:00
Compare commits
1 Commits
1.0.0-alph
...
refactor/l
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
0b80074270 |
@@ -1085,9 +1085,16 @@ impl LocalDisk {
|
||||
*item = "".to_owned();
|
||||
|
||||
if entry.ends_with(STORAGE_FORMAT_FILE) {
|
||||
let metadata = self
|
||||
let metadata = match self
|
||||
.read_metadata(self.get_object_path(bucket, format!("{}/{}", ¤t, &entry).as_str())?)
|
||||
.await?;
|
||||
.await
|
||||
{
|
||||
Ok(res) => res,
|
||||
Err(err) => {
|
||||
warn!("scan dir read_metadata error, continue {:?}", err);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
let entry = entry.strip_suffix(STORAGE_FORMAT_FILE).unwrap_or_default().to_owned();
|
||||
let name = entry.trim_end_matches(SLASH_SEPARATOR);
|
||||
@@ -1189,6 +1196,9 @@ impl LocalDisk {
|
||||
// }
|
||||
}
|
||||
Err(err) => {
|
||||
if err == Error::DiskNotDir {
|
||||
continue;
|
||||
}
|
||||
if err == Error::FileNotFound || err == Error::IsNotRegular {
|
||||
// NOT an object, append to stack (with slash)
|
||||
// If dirObject, but no metadata (which is unexpected) we skip it.
|
||||
@@ -1203,7 +1213,7 @@ impl LocalDisk {
|
||||
};
|
||||
}
|
||||
|
||||
while let Some(dir) = dir_stack.pop() {
|
||||
while let Some(dir) = dir_stack.last() {
|
||||
if opts.limit > 0 && *objs_returned >= opts.limit {
|
||||
return Ok(());
|
||||
}
|
||||
@@ -1215,10 +1225,11 @@ impl LocalDisk {
|
||||
.await?;
|
||||
|
||||
if opts.recursive {
|
||||
if let Err(er) = Box::pin(self.scan_dir(dir, prefix.clone(), opts, out, objs_returned)).await {
|
||||
if let Err(er) = Box::pin(self.scan_dir(dir.clone(), prefix.clone(), opts, out, objs_returned)).await {
|
||||
warn!("scan_dir err {:?}", &er);
|
||||
}
|
||||
}
|
||||
dir_stack.pop();
|
||||
}
|
||||
|
||||
Ok(())
|
||||
|
||||
@@ -2601,6 +2601,8 @@ pub async fn has_space_for(dis: &[Option<DiskInfo>], size: i64) -> Result<bool>
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use crate::bucket::metadata_sys::init_bucket_metadata_sys;
|
||||
|
||||
use super::*;
|
||||
|
||||
// Test validation functions
|
||||
@@ -2788,4 +2790,122 @@ mod tests {
|
||||
assert!(check_put_object_args("", "test-object").is_err());
|
||||
assert!(check_put_object_args("test-bucket", "").is_err());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_ecstore_put_and_list_objects() {
|
||||
use crate::disk::endpoint::Endpoint;
|
||||
use crate::endpoints::{EndpointServerPools, Endpoints, PoolEndpoints};
|
||||
use std::path::PathBuf;
|
||||
use tokio::fs;
|
||||
|
||||
let test_base_dir = format!("/tmp/rustfs_test_put_list_{}", Uuid::new_v4());
|
||||
let temp_dir = PathBuf::from(&test_base_dir);
|
||||
|
||||
if temp_dir.exists() {
|
||||
let _ = fs::remove_dir_all(&temp_dir).await;
|
||||
}
|
||||
fs::create_dir_all(&temp_dir).await.expect("Failed to create test directory");
|
||||
|
||||
let disk_paths = vec![
|
||||
temp_dir.join("disk1"),
|
||||
temp_dir.join("disk2"),
|
||||
temp_dir.join("disk3"),
|
||||
temp_dir.join("disk4"),
|
||||
];
|
||||
|
||||
for disk_path in &disk_paths {
|
||||
fs::create_dir_all(disk_path).await.expect("Failed to create disk directory");
|
||||
}
|
||||
|
||||
let mut endpoints = Vec::new();
|
||||
for (i, disk_path) in disk_paths.iter().enumerate() {
|
||||
let disk_str = disk_path.to_str().expect("Invalid disk path");
|
||||
let mut endpoint = Endpoint::try_from(disk_str).expect("Failed to create endpoint");
|
||||
endpoint.set_pool_index(0);
|
||||
endpoint.set_set_index(0);
|
||||
endpoint.set_disk_index(i);
|
||||
endpoints.push(endpoint);
|
||||
}
|
||||
|
||||
let pool_endpoints = PoolEndpoints {
|
||||
legacy: false,
|
||||
set_count: 1,
|
||||
drives_per_set: 4,
|
||||
endpoints: Endpoints::from(endpoints),
|
||||
cmd_line: "test".to_string(),
|
||||
platform: format!("OS: {} | Arch: {}", std::env::consts::OS, std::env::consts::ARCH),
|
||||
};
|
||||
|
||||
let endpoint_pools = EndpointServerPools(vec![pool_endpoints]);
|
||||
|
||||
init_local_disks(endpoint_pools.clone())
|
||||
.await
|
||||
.expect("Failed to initialize local disks");
|
||||
|
||||
let server_addr: SocketAddr = "127.0.0.1:0".parse().expect("Invalid server address");
|
||||
let ecstore = ECStore::new(server_addr, endpoint_pools, CancellationToken::new())
|
||||
.await
|
||||
.expect("Failed to create ECStore");
|
||||
|
||||
init_bucket_metadata_sys(ecstore.clone(), vec![]).await;
|
||||
|
||||
let bucket_name = "test-bucket";
|
||||
ecstore
|
||||
.make_bucket(bucket_name, &MakeBucketOptions::default())
|
||||
.await
|
||||
.expect("Failed to create bucket");
|
||||
|
||||
let test_objects = vec![
|
||||
("object1.txt", b"Hello, World!".to_vec()),
|
||||
("object2.txt", b"Test data for object 2".to_vec()),
|
||||
("folder/object3.txt", b"Object in folder".to_vec()),
|
||||
("folder/subfolder/object4.txt", b"Nested object".to_vec()),
|
||||
];
|
||||
|
||||
for (object_name, data) in &test_objects {
|
||||
let mut reader = PutObjReader::from_vec(data.clone());
|
||||
let object_info = ecstore
|
||||
.put_object(bucket_name, object_name, &mut reader, &ObjectOptions::default())
|
||||
.await
|
||||
.unwrap_or_else(|e| panic!("Failed to put object {}: {}", object_name, e));
|
||||
|
||||
assert_eq!(object_info.size, data.len() as i64, "Object size mismatch for {}", object_name);
|
||||
assert_eq!(object_info.bucket, bucket_name);
|
||||
}
|
||||
|
||||
let list_result = ecstore
|
||||
.clone()
|
||||
.list_objects_v2(bucket_name, "", None, None, 1000, false, None)
|
||||
.await
|
||||
.expect("Failed to list objects");
|
||||
|
||||
assert_eq!(list_result.objects.len(), test_objects.len(), "Number of objects mismatch");
|
||||
|
||||
let mut object_names: Vec<String> = list_result.objects.iter().map(|o| o.name.clone()).collect();
|
||||
object_names.sort();
|
||||
|
||||
let mut expected_names: Vec<String> = test_objects.iter().map(|(n, _)| n.to_string()).collect();
|
||||
expected_names.sort();
|
||||
|
||||
assert_eq!(object_names, expected_names, "Object names mismatch");
|
||||
|
||||
let prefix_result = ecstore
|
||||
.clone()
|
||||
.list_objects_v2(bucket_name, "folder/", None, None, 1000, false, None)
|
||||
.await
|
||||
.expect("Failed to list objects with prefix");
|
||||
|
||||
assert_eq!(prefix_result.objects.len(), 2, "Should find 2 objects with prefix 'folder/'");
|
||||
assert!(prefix_result.objects.iter().all(|o| o.name.starts_with("folder/")));
|
||||
|
||||
let delimiter_result = ecstore
|
||||
.clone()
|
||||
.list_objects_v2(bucket_name, "", None, Some("/".to_string()), 1000, false, None)
|
||||
.await
|
||||
.expect("Failed to list objects with delimiter");
|
||||
|
||||
assert!(!delimiter_result.prefixes.is_empty() || !delimiter_result.objects.is_empty());
|
||||
|
||||
let _ = fs::remove_dir_all(&temp_dir).await;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -70,7 +70,7 @@ export RUSTFS_OBS_LOG_FLUSH_MS=300
|
||||
#tokio runtime
|
||||
export RUSTFS_RUNTIME_WORKER_THREADS=16
|
||||
export RUSTFS_RUNTIME_MAX_BLOCKING_THREADS=1024
|
||||
export RUSTFS_RUNTIME_THREAD_PRINT_ENABLED=true
|
||||
export RUSTFS_RUNTIME_THREAD_PRINT_ENABLED=false
|
||||
# shellcheck disable=SC2125
|
||||
export RUSTFS_RUNTIME_THREAD_STACK_SIZE=1024*1024
|
||||
export RUSTFS_RUNTIME_THREAD_KEEP_ALIVE=60
|
||||
|
||||
Reference in New Issue
Block a user