fix: scandir object (#733)

* fix: scandir object count

* fix: base64 list continuation_token
weisd
2025-10-28 15:02:43 +08:00
committed by GitHub
parent 2aca1f77af
commit 40660e7b80
5 changed files with 87 additions and 65 deletions

View File

@@ -108,7 +108,7 @@ pub async fn list_path_raw(rx: CancellationToken, opts: ListPathRawOptions) -> d
}
if cancel_rx_clone.is_cancelled() {
// warn!("list_path_raw: cancel_rx_clone.try_recv().await.is_ok()");
// warn!("list_path_raw: cancel_rx_clone.is_cancelled()");
return Ok(());
}
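
For context on the corrected comment above: is_cancelled is the non-blocking check on tokio_util's CancellationToken (the stale comment referenced a try_recv that is no longer used). A minimal sketch of the pattern, assuming the tokio and tokio-util crates:

use tokio_util::sync::CancellationToken;

#[tokio::main]
async fn main() {
    let token = CancellationToken::new();
    let child = token.child_token(); // child tokens observe the parent's cancellation
    token.cancel();
    // Non-blocking poll, as in list_path_raw's loop; use `child.cancelled().await`
    // to suspend until cancellation instead.
    assert!(child.is_cancelled());
}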

View File

@@ -984,7 +984,8 @@ impl LocalDisk {
#[async_recursion::async_recursion]
async fn scan_dir<W>(
&self,
current: &mut String,
mut current: String,
mut prefix: String,
opts: &WalkDirOptions,
out: &mut MetacacheWriter<W>,
objs_returned: &mut i32,
@@ -1022,14 +1023,16 @@ impl LocalDisk {
return Ok(());
}
let mut entries = match self.list_dir("", &opts.bucket, current, -1).await {
// TODO: add lock
let mut entries = match self.list_dir("", &opts.bucket, &current, -1).await {
Ok(res) => res,
Err(e) => {
if e != DiskError::VolumeNotFound && e != Error::FileNotFound {
debug!("scan list_dir {}, err {:?}", &current, &e);
error!("scan list_dir {}, err {:?}", &current, &e);
}
if opts.report_notfound && e == Error::FileNotFound && current == &opts.base_dir {
if opts.report_notfound && e == Error::FileNotFound && current == opts.base_dir {
return Err(DiskError::FileNotFound);
}
@@ -1041,8 +1044,7 @@ impl LocalDisk {
return Ok(());
}
let s = SLASH_SEPARATOR.chars().next().unwrap_or_default();
*current = current.trim_matches(s).to_owned();
current = current.trim_matches('/').to_owned();
let bucket = opts.bucket.as_str();
@@ -1056,11 +1058,9 @@ impl LocalDisk {
return Ok(());
}
// check prefix
if let Some(filter_prefix) = &opts.filter_prefix {
if !entry.starts_with(filter_prefix) {
*item = "".to_owned();
continue;
}
if !prefix.is_empty() && !entry.starts_with(prefix.as_str()) {
*item = "".to_owned();
continue;
}
if let Some(forward) = &forward {
@@ -1085,46 +1085,48 @@ impl LocalDisk {
*item = "".to_owned();
if entry.ends_with(STORAGE_FORMAT_FILE) {
//
let metadata = self
.read_metadata(self.get_object_path(bucket, format!("{}/{}", &current, &entry).as_str())?)
.await?;
// Use strip_suffix so the suffix is removed only once
let entry = entry.strip_suffix(STORAGE_FORMAT_FILE).unwrap_or_default().to_owned();
let name = entry.trim_end_matches(SLASH_SEPARATOR);
let name = decode_dir_object(format!("{}/{}", &current, &name).as_str());
// if opts.limit > 0
// && let Ok(meta) = FileMeta::load(&metadata)
// && !meta.all_hidden(true)
// {
*objs_returned += 1;
// }
out.write_obj(&MetaCacheEntry {
name: name.clone(),
metadata,
..Default::default()
})
.await?;
*objs_returned += 1;
// warn!("scan list_dir {}, write_obj done, name: {:?}", &current, &name);
return Ok(());
}
}
entries.sort();
let mut entries = entries.as_slice();
if let Some(forward) = &forward {
for (i, entry) in entries.iter().enumerate() {
if entry >= forward || forward.starts_with(entry.as_str()) {
entries = &entries[i..];
entries.drain(..i);
break;
}
}
}
let mut dir_stack: Vec<String> = Vec::with_capacity(5);
prefix = "".to_owned();
for entry in entries.iter() {
if opts.limit > 0 && *objs_returned >= opts.limit {
// warn!("scan list_dir {}, limit reached 2", &current);
return Ok(());
}
@@ -1132,7 +1134,7 @@ impl LocalDisk {
continue;
}
let name = path_join_buf(&[current, entry]);
let name = path_join_buf(&[current.as_str(), entry.as_str()]);
if !dir_stack.is_empty() {
if let Some(pop) = dir_stack.last().cloned() {
@@ -1144,9 +1146,7 @@ impl LocalDisk {
.await?;
if opts.recursive {
let mut opts = opts.clone();
opts.filter_prefix = None;
if let Err(er) = Box::pin(self.scan_dir(&mut pop.clone(), &opts, out, objs_returned)).await {
if let Err(er) = Box::pin(self.scan_dir(pop, prefix.clone(), opts, out, objs_returned)).await {
error!("scan_dir err {:?}", er);
}
}
@@ -1181,7 +1181,12 @@ impl LocalDisk {
meta.metadata = res;
out.write_obj(&meta).await?;
// if let Ok(meta) = FileMeta::load(&meta.metadata)
// && !meta.all_hidden(true)
// {
*objs_returned += 1;
// }
}
Err(err) => {
if err == Error::FileNotFound || err == Error::IsNotRegular {
@@ -1200,7 +1205,6 @@ impl LocalDisk {
while let Some(dir) = dir_stack.pop() {
if opts.limit > 0 && *objs_returned >= opts.limit {
// warn!("scan list_dir {}, limit reached 3", &current);
return Ok(());
}
@@ -1209,19 +1213,14 @@ impl LocalDisk {
..Default::default()
})
.await?;
*objs_returned += 1;
if opts.recursive {
let mut dir = dir;
let mut opts = opts.clone();
opts.filter_prefix = None;
if let Err(er) = Box::pin(self.scan_dir(&mut dir, &opts, out, objs_returned)).await {
if let Err(er) = Box::pin(self.scan_dir(dir, prefix.clone(), opts, out, objs_returned)).await {
warn!("scan_dir err {:?}", &er);
}
}
}
// warn!("scan list_dir {}, done", &current);
Ok(())
}
}
@@ -1884,8 +1883,14 @@ impl DiskAPI for LocalDisk {
}
}
let mut current = opts.base_dir.clone();
self.scan_dir(&mut current, &opts, &mut out, &mut objs_returned).await?;
self.scan_dir(
opts.base_dir.clone(),
opts.filter_prefix.clone().unwrap_or_default(),
&opts,
&mut out,
&mut objs_returned,
)
.await?;
Ok(())
}
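
The signature change above (owned current: String plus a new prefix: String, instead of &mut String and opts.filter_prefix) removes the mutable borrow each recursive call had to thread through. A self-contained, synchronous sketch of the same calling convention; std::fs stands in for the async list_dir, and all names here are illustrative, not the crate's API:

use std::fs;

fn walk(current: String, prefix: String, out: &mut Vec<String>) {
    let Ok(entries) = fs::read_dir(&current) else { return };
    let mut names: Vec<String> = entries
        .filter_map(|e| e.ok().map(|e| e.file_name().to_string_lossy().into_owned()))
        .collect();
    names.sort();
    for name in names {
        // The prefix filter applies only at the level that issued it;
        // recursive calls pass an empty prefix (cf. `prefix = "".to_owned()`).
        if !prefix.is_empty() && !name.starts_with(&prefix) {
            continue;
        }
        let child = format!("{}/{}", current.trim_end_matches('/'), name);
        out.push(child.clone());
        walk(child, String::new(), out); // owned Strings: no &mut aliasing across calls
    }
}

fn main() {
    let mut out = Vec::new();
    walk(".".to_owned(), "src".to_owned(), &mut out);
    println!("{out:?}");
}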

View File

@@ -27,7 +27,7 @@
//!
//! ## Example
//!
//! ```rust
//! ```ignore
//! use rustfs_ecstore::erasure_coding::Erasure;
//!
//! let erasure = Erasure::new(4, 2, 1024); // 4 data shards, 2 parity shards, 1KB block size
@@ -83,7 +83,6 @@ impl ReedSolomonEncoder {
return Ok(());
}
// Encode using SIMD
let simd_result = self.encode_with_simd(&mut shards_vec);
match simd_result {
@@ -176,7 +175,6 @@ impl ReedSolomonEncoder {
.find_map(|s| s.as_ref().map(|v| v.len()))
.ok_or_else(|| io::Error::other("No valid shards found for reconstruction"))?;
// Get or create the decoder
let mut decoder = {
let mut cache_guard = self
.decoder_cache
@@ -185,21 +183,17 @@ impl ReedSolomonEncoder {
match cache_guard.take() {
Some(mut cached_decoder) => {
// Reset the existing decoder via its reset method
if let Err(e) = cached_decoder.reset(self.data_shards, self.parity_shards, shard_len) {
warn!("Failed to reset SIMD decoder: {:?}, creating new one", e);
// If reset fails, create a new decoder
reed_solomon_simd::ReedSolomonDecoder::new(self.data_shards, self.parity_shards, shard_len)
.map_err(|e| io::Error::other(format!("Failed to create SIMD decoder: {e:?}")))?
} else {
cached_decoder
}
}
None => {
// First use: create a new decoder
reed_solomon_simd::ReedSolomonDecoder::new(self.data_shards, self.parity_shards, shard_len)
.map_err(|e| io::Error::other(format!("Failed to create SIMD decoder: {e:?}")))?
}
None => reed_solomon_simd::ReedSolomonDecoder::new(self.data_shards, self.parity_shards, shard_len)
.map_err(|e| io::Error::other(format!("Failed to create SIMD decoder: {e:?}")))?,
}
};
@@ -235,8 +229,7 @@ impl ReedSolomonEncoder {
}
}
// Put the decoder back into the cache; after result is dropped the decoder resets automatically and can be reused
drop(result); // Explicitly drop result to ensure the decoder is reset
drop(result);
*self
.decoder_cache
@@ -262,7 +255,7 @@ impl ReedSolomonEncoder {
/// - `_buf`: Internal buffer for block operations.
///
/// # Example
/// ```
/// ```ignore
/// use rustfs_ecstore::erasure_coding::Erasure;
/// let erasure = Erasure::new(4, 2, 8);
/// let data = b"hello world";
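
The reset-or-recreate flow in this file caches one ReedSolomonDecoder and tries reset before paying for a fresh allocation. A condensed sketch of that pattern, using only the calls visible in the hunk (reset, new) against an assumed Mutex<Option<...>> cache:

use std::io;
use std::sync::Mutex;

fn take_decoder(
    cache: &Mutex<Option<reed_solomon_simd::ReedSolomonDecoder>>,
    data_shards: usize,
    parity_shards: usize,
    shard_len: usize,
) -> io::Result<reed_solomon_simd::ReedSolomonDecoder> {
    let fresh = || {
        reed_solomon_simd::ReedSolomonDecoder::new(data_shards, parity_shards, shard_len)
            .map_err(|e| io::Error::other(format!("Failed to create SIMD decoder: {e:?}")))
    };
    let mut guard = cache.lock().map_err(|_| io::Error::other("decoder cache poisoned"))?;
    match guard.take() {
        // Reuse the cached decoder when reset succeeds; otherwise fall back to a new one.
        Some(mut d) => match d.reset(data_shards, parity_shards, shard_len) {
            Ok(()) => Ok(d),
            Err(_) => fresh(),
        },
        None => fresh(),
    }
}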

View File

@@ -1056,6 +1056,10 @@ async fn merge_entry_channels(
out_channel: Sender<MetaCacheEntry>,
read_quorum: usize,
) -> Result<()> {
if in_channels.is_empty() {
return Ok(());
}
let mut in_channels = in_channels;
if in_channels.len() == 1 {
loop {
@@ -1092,18 +1096,18 @@ async fn merge_entry_channels(
return Ok(());
}
let mut best: Option<MetaCacheEntry> = None;
let mut best = top[0].clone();
let mut best_idx = 0;
to_merge.clear();
// FIXME: top is moved when select_from is called
let vtop = top.clone();
// let vtop = top.clone();
for (i, other) in vtop.iter().enumerate() {
if let Some(other_entry) = other {
// let vtop = top.as_slice();
for other_idx in 1..top.len() {
if let Some(other_entry) = &top[other_idx] {
if let Some(best_entry) = &best {
let other_idx = i;
// println!("get other_entry {:?}", other_entry.name);
if path::clean(&best_entry.name) == path::clean(&other_entry.name) {
@@ -1130,21 +1134,20 @@ async fn merge_entry_channels(
best_idx = other_idx;
continue;
}
} else if best_entry.name > other_entry.name {
}
if best_entry.name > other_entry.name {
to_merge.clear();
best = Some(other_entry.clone());
best_idx = i;
best_idx = other_idx;
}
} else {
best = Some(other_entry.clone());
best_idx = i;
best_idx = other_idx;
}
}
}
// println!("get best_entry {} {:?}", &best_idx, &best.clone().unwrap_or_default().name);
// TODO:
if !to_merge.is_empty() {
if let Some(entry) = &best {
let mut versions = Vec::with_capacity(to_merge.len() + 1);
@@ -1156,9 +1159,9 @@ async fn merge_entry_channels(
}
for &idx in to_merge.iter() {
let has_entry = { top.get(idx).cloned() };
let has_entry = top[idx].clone();
if let Some(Some(entry)) = has_entry {
if let Some(entry) = has_entry {
let xl2 = match entry.clone().xl_meta() {
Ok(res) => res,
Err(_) => {
@@ -1204,9 +1207,9 @@ async fn merge_entry_channels(
out_channel.send(best_entry.clone()).await.map_err(Error::other)?;
last = best_entry.name.clone();
}
top[best_idx] = None; // Replace entry we just sent
select_from(&mut in_channels, best_idx, &mut top, &mut n_done).await?;
}
select_from(&mut in_channels, best_idx, &mut top, &mut n_done).await?;
}
}
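
The rewritten selection loop seeds best from top[0] and walks the remaining heads by index, avoiding the earlier clone of the whole top vector. A standalone sketch of that head-selection step over plain String keys (stand-ins for MetaCacheEntry names; the merge of collected indices is elided):

fn pick_best(top: &[Option<String>]) -> (usize, Vec<usize>) {
    let mut best = top[0].clone();
    let mut best_idx = 0;
    let mut to_merge = Vec::new();
    for other_idx in 1..top.len() {
        let Some(other) = &top[other_idx] else { continue };
        match &best {
            // Same key: remember the index so its versions can be merged.
            Some(b) if b == other => to_merge.push(other_idx),
            // Strictly smaller key wins; pending merges no longer apply.
            Some(b) if b.as_str() > other.as_str() => {
                to_merge.clear();
                best = Some(other.clone());
                best_idx = other_idx;
            }
            Some(_) => {}
            None => {
                best = Some(other.clone());
                best_idx = other_idx;
            }
        }
    }
    (best_idx, to_merge)
}

fn main() {
    let top = vec![Some("b".into()), Some("a".into()), None, Some("a".into())];
    assert_eq!(pick_best(&top), (1, vec![3]));
}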

View File

@@ -2172,9 +2172,25 @@ impl S3 for FS {
};
let delimiter = delimiter.filter(|v| !v.is_empty());
let continuation_token = continuation_token.filter(|v| !v.is_empty());
let start_after = start_after.filter(|v| !v.is_empty());
let continuation_token = continuation_token.filter(|v| !v.is_empty());
// Save the original encoded continuation_token for response
let encoded_continuation_token = continuation_token.clone();
// Decode continuation_token from base64 for internal use
let continuation_token = continuation_token
.map(|token| {
base64_simd::STANDARD
.decode_to_vec(token.as_bytes())
.map_err(|_| s3_error!(InvalidArgument, "Invalid continuation token"))
.and_then(|bytes| {
String::from_utf8(bytes).map_err(|_| s3_error!(InvalidArgument, "Invalid continuation token"))
})
})
.transpose()?;
let store = get_validated_store(&bucket).await?;
let object_infos = store
@@ -2223,12 +2239,17 @@ impl S3 for FS {
.map(|v| CommonPrefix { prefix: Some(v) })
.collect();
// Encode next_continuation_token to base64
let next_continuation_token = object_infos
.next_continuation_token
.map(|token| base64_simd::STANDARD.encode_to_string(token.as_bytes()));
let output = ListObjectsV2Output {
is_truncated: Some(object_infos.is_truncated),
continuation_token: object_infos.continuation_token,
next_continuation_token: object_infos.next_continuation_token,
continuation_token: encoded_continuation_token,
next_continuation_token,
key_count: Some(key_count),
max_keys: Some(key_count),
max_keys: Some(max_keys),
contents: Some(objects),
delimiter,
name: Some(bucket),
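
Round-trip sketch for the token handling above, using the same base64_simd calls as the handler (decode_to_vec, encode_to_string); the error type here is simplified from the s3_error! responses:

fn decode_token(token: &str) -> Result<String, &'static str> {
    let bytes = base64_simd::STANDARD
        .decode_to_vec(token.as_bytes())
        .map_err(|_| "Invalid continuation token")?;
    String::from_utf8(bytes).map_err(|_| "Invalid continuation token")
}

fn encode_token(marker: &str) -> String {
    base64_simd::STANDARD.encode_to_string(marker.as_bytes())
}

fn main() {
    let next = encode_token("photos/2025/img_0001.jpg");
    // The encoded form goes back to the client unchanged; the decoded form
    // is the internal start-after marker for the next page of results.
    assert_eq!(decode_token(&next).unwrap(), "photos/2025/img_0001.jpg");
}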