From 40660e7b80c115000027fac02d6f635c3c2d1581 Mon Sep 17 00:00:00 2001 From: weisd Date: Tue, 28 Oct 2025 15:02:43 +0800 Subject: [PATCH] fix: scandir object (#733) * fix: scandir object count * fix: base64 list continuation_token --- .../ecstore/src/cache_value/metacache_set.rs | 2 +- crates/ecstore/src/disk/local.rs | 67 ++++++++++--------- crates/ecstore/src/erasure_coding/erasure.rs | 19 ++---- crates/ecstore/src/store_list_objects.rs | 35 +++++----- rustfs/src/storage/ecfs.rs | 29 ++++++-- 5 files changed, 87 insertions(+), 65 deletions(-) diff --git a/crates/ecstore/src/cache_value/metacache_set.rs b/crates/ecstore/src/cache_value/metacache_set.rs index eef250eb..a02a376b 100644 --- a/crates/ecstore/src/cache_value/metacache_set.rs +++ b/crates/ecstore/src/cache_value/metacache_set.rs @@ -108,7 +108,7 @@ pub async fn list_path_raw(rx: CancellationToken, opts: ListPathRawOptions) -> d } if cancel_rx_clone.is_cancelled() { - // warn!("list_path_raw: cancel_rx_clone.try_recv().await.is_ok()"); + // warn!("list_path_raw: cancel_rx_clone.is_cancelled()"); return Ok(()); } diff --git a/crates/ecstore/src/disk/local.rs b/crates/ecstore/src/disk/local.rs index 453036fa..cccc1e26 100644 --- a/crates/ecstore/src/disk/local.rs +++ b/crates/ecstore/src/disk/local.rs @@ -984,7 +984,8 @@ impl LocalDisk { #[async_recursion::async_recursion] async fn scan_dir( &self, - current: &mut String, + mut current: String, + mut prefix: String, opts: &WalkDirOptions, out: &mut MetacacheWriter, objs_returned: &mut i32, @@ -1022,14 +1023,16 @@ impl LocalDisk { return Ok(()); } - let mut entries = match self.list_dir("", &opts.bucket, current, -1).await { + // TODO: add lock + + let mut entries = match self.list_dir("", &opts.bucket, ¤t, -1).await { Ok(res) => res, Err(e) => { if e != DiskError::VolumeNotFound && e != Error::FileNotFound { - debug!("scan list_dir {}, err {:?}", ¤t, &e); + error!("scan list_dir {}, err {:?}", ¤t, &e); } - if opts.report_notfound && e == Error::FileNotFound && current == &opts.base_dir { + if opts.report_notfound && e == Error::FileNotFound && current == opts.base_dir { return Err(DiskError::FileNotFound); } @@ -1041,8 +1044,7 @@ impl LocalDisk { return Ok(()); } - let s = SLASH_SEPARATOR.chars().next().unwrap_or_default(); - *current = current.trim_matches(s).to_owned(); + current = current.trim_matches('/').to_owned(); let bucket = opts.bucket.as_str(); @@ -1056,11 +1058,9 @@ impl LocalDisk { return Ok(()); } // check prefix - if let Some(filter_prefix) = &opts.filter_prefix { - if !entry.starts_with(filter_prefix) { - *item = "".to_owned(); - continue; - } + if !prefix.is_empty() && !entry.starts_with(prefix.as_str()) { + *item = "".to_owned(); + continue; } if let Some(forward) = &forward { @@ -1085,46 +1085,48 @@ impl LocalDisk { *item = "".to_owned(); if entry.ends_with(STORAGE_FORMAT_FILE) { - // let metadata = self .read_metadata(self.get_object_path(bucket, format!("{}/{}", ¤t, &entry).as_str())?) .await?; - // 用 strip_suffix 只删除一次 let entry = entry.strip_suffix(STORAGE_FORMAT_FILE).unwrap_or_default().to_owned(); let name = entry.trim_end_matches(SLASH_SEPARATOR); let name = decode_dir_object(format!("{}/{}", ¤t, &name).as_str()); + // if opts.limit > 0 + // && let Ok(meta) = FileMeta::load(&metadata) + // && !meta.all_hidden(true) + // { + *objs_returned += 1; + // } + out.write_obj(&MetaCacheEntry { name: name.clone(), metadata, ..Default::default() }) .await?; - *objs_returned += 1; - // warn!("scan list_dir {}, write_obj done, name: {:?}", ¤t, &name); return Ok(()); } } entries.sort(); - let mut entries = entries.as_slice(); if let Some(forward) = &forward { for (i, entry) in entries.iter().enumerate() { if entry >= forward || forward.starts_with(entry.as_str()) { - entries = &entries[i..]; + entries.drain(..i); break; } } } let mut dir_stack: Vec = Vec::with_capacity(5); + prefix = "".to_owned(); for entry in entries.iter() { if opts.limit > 0 && *objs_returned >= opts.limit { - // warn!("scan list_dir {}, limit reached 2", ¤t); return Ok(()); } @@ -1132,7 +1134,7 @@ impl LocalDisk { continue; } - let name = path_join_buf(&[current, entry]); + let name = path_join_buf(&[current.as_str(), entry.as_str()]); if !dir_stack.is_empty() { if let Some(pop) = dir_stack.last().cloned() { @@ -1144,9 +1146,7 @@ impl LocalDisk { .await?; if opts.recursive { - let mut opts = opts.clone(); - opts.filter_prefix = None; - if let Err(er) = Box::pin(self.scan_dir(&mut pop.clone(), &opts, out, objs_returned)).await { + if let Err(er) = Box::pin(self.scan_dir(pop, prefix.clone(), opts, out, objs_returned)).await { error!("scan_dir err {:?}", er); } } @@ -1181,7 +1181,12 @@ impl LocalDisk { meta.metadata = res; out.write_obj(&meta).await?; + + // if let Ok(meta) = FileMeta::load(&meta.metadata) + // && !meta.all_hidden(true) + // { *objs_returned += 1; + // } } Err(err) => { if err == Error::FileNotFound || err == Error::IsNotRegular { @@ -1200,7 +1205,6 @@ impl LocalDisk { while let Some(dir) = dir_stack.pop() { if opts.limit > 0 && *objs_returned >= opts.limit { - // warn!("scan list_dir {}, limit reached 3", ¤t); return Ok(()); } @@ -1209,19 +1213,14 @@ impl LocalDisk { ..Default::default() }) .await?; - *objs_returned += 1; if opts.recursive { - let mut dir = dir; - let mut opts = opts.clone(); - opts.filter_prefix = None; - if let Err(er) = Box::pin(self.scan_dir(&mut dir, &opts, out, objs_returned)).await { + if let Err(er) = Box::pin(self.scan_dir(dir, prefix.clone(), opts, out, objs_returned)).await { warn!("scan_dir err {:?}", &er); } } } - // warn!("scan list_dir {}, done", ¤t); Ok(()) } } @@ -1884,8 +1883,14 @@ impl DiskAPI for LocalDisk { } } - let mut current = opts.base_dir.clone(); - self.scan_dir(&mut current, &opts, &mut out, &mut objs_returned).await?; + self.scan_dir( + opts.base_dir.clone(), + opts.filter_prefix.clone().unwrap_or_default(), + &opts, + &mut out, + &mut objs_returned, + ) + .await?; Ok(()) } diff --git a/crates/ecstore/src/erasure_coding/erasure.rs b/crates/ecstore/src/erasure_coding/erasure.rs index ad6829d5..37533815 100644 --- a/crates/ecstore/src/erasure_coding/erasure.rs +++ b/crates/ecstore/src/erasure_coding/erasure.rs @@ -27,7 +27,7 @@ //! //! ## Example //! -//! ```rust +//! ```ignore //! use rustfs_ecstore::erasure_coding::Erasure; //! //! let erasure = Erasure::new(4, 2, 1024); // 4 data shards, 2 parity shards, 1KB block size @@ -83,7 +83,6 @@ impl ReedSolomonEncoder { return Ok(()); } - // 使用 SIMD 进行编码 let simd_result = self.encode_with_simd(&mut shards_vec); match simd_result { @@ -176,7 +175,6 @@ impl ReedSolomonEncoder { .find_map(|s| s.as_ref().map(|v| v.len())) .ok_or_else(|| io::Error::other("No valid shards found for reconstruction"))?; - // 获取或创建decoder let mut decoder = { let mut cache_guard = self .decoder_cache @@ -185,21 +183,17 @@ impl ReedSolomonEncoder { match cache_guard.take() { Some(mut cached_decoder) => { - // 使用reset方法重置现有decoder if let Err(e) = cached_decoder.reset(self.data_shards, self.parity_shards, shard_len) { warn!("Failed to reset SIMD decoder: {:?}, creating new one", e); - // 如果reset失败,创建新的decoder + reed_solomon_simd::ReedSolomonDecoder::new(self.data_shards, self.parity_shards, shard_len) .map_err(|e| io::Error::other(format!("Failed to create SIMD decoder: {e:?}")))? } else { cached_decoder } } - None => { - // 第一次使用,创建新decoder - reed_solomon_simd::ReedSolomonDecoder::new(self.data_shards, self.parity_shards, shard_len) - .map_err(|e| io::Error::other(format!("Failed to create SIMD decoder: {e:?}")))? - } + None => reed_solomon_simd::ReedSolomonDecoder::new(self.data_shards, self.parity_shards, shard_len) + .map_err(|e| io::Error::other(format!("Failed to create SIMD decoder: {e:?}")))?, } }; @@ -235,8 +229,7 @@ impl ReedSolomonEncoder { } } - // 将decoder放回缓存(在result被drop后decoder自动重置,可以重用) - drop(result); // 显式drop result,确保decoder被重置 + drop(result); *self .decoder_cache @@ -262,7 +255,7 @@ impl ReedSolomonEncoder { /// - `_buf`: Internal buffer for block operations. /// /// # Example -/// ``` +/// ```ignore /// use rustfs_ecstore::erasure_coding::Erasure; /// let erasure = Erasure::new(4, 2, 8); /// let data = b"hello world"; diff --git a/crates/ecstore/src/store_list_objects.rs b/crates/ecstore/src/store_list_objects.rs index 5dc1c4a8..3c370eb2 100644 --- a/crates/ecstore/src/store_list_objects.rs +++ b/crates/ecstore/src/store_list_objects.rs @@ -1056,6 +1056,10 @@ async fn merge_entry_channels( out_channel: Sender, read_quorum: usize, ) -> Result<()> { + if in_channels.is_empty() { + return Ok(()); + } + let mut in_channels = in_channels; if in_channels.len() == 1 { loop { @@ -1092,18 +1096,18 @@ async fn merge_entry_channels( return Ok(()); } - let mut best: Option = None; + let mut best = top[0].clone(); let mut best_idx = 0; to_merge.clear(); // FIXME: top move when select_from call - let vtop = top.clone(); + // let vtop = top.clone(); - for (i, other) in vtop.iter().enumerate() { - if let Some(other_entry) = other { + // let vtop = top.as_slice(); + + for other_idx in 1..top.len() { + if let Some(other_entry) = &top[other_idx] { if let Some(best_entry) = &best { - let other_idx = i; - // println!("get other_entry {:?}", other_entry.name); if path::clean(&best_entry.name) == path::clean(&other_entry.name) { @@ -1130,21 +1134,20 @@ async fn merge_entry_channels( best_idx = other_idx; continue; } - } else if best_entry.name > other_entry.name { + } + + if best_entry.name > other_entry.name { to_merge.clear(); best = Some(other_entry.clone()); - best_idx = i; + best_idx = other_idx; } } else { best = Some(other_entry.clone()); - best_idx = i; + best_idx = other_idx; } } } - // println!("get best_entry {} {:?}", &best_idx, &best.clone().unwrap_or_default().name); - - // TODO: if !to_merge.is_empty() { if let Some(entry) = &best { let mut versions = Vec::with_capacity(to_merge.len() + 1); @@ -1156,9 +1159,9 @@ async fn merge_entry_channels( } for &idx in to_merge.iter() { - let has_entry = { top.get(idx).cloned() }; + let has_entry = top[idx].clone(); - if let Some(Some(entry)) = has_entry { + if let Some(entry) = has_entry { let xl2 = match entry.clone().xl_meta() { Ok(res) => res, Err(_) => { @@ -1204,9 +1207,9 @@ async fn merge_entry_channels( out_channel.send(best_entry.clone()).await.map_err(Error::other)?; last = best_entry.name.clone(); } - top[best_idx] = None; // Replace entry we just sent - select_from(&mut in_channels, best_idx, &mut top, &mut n_done).await?; } + + select_from(&mut in_channels, best_idx, &mut top, &mut n_done).await?; } } diff --git a/rustfs/src/storage/ecfs.rs b/rustfs/src/storage/ecfs.rs index fb9a2cc6..0e18e3c1 100644 --- a/rustfs/src/storage/ecfs.rs +++ b/rustfs/src/storage/ecfs.rs @@ -2172,9 +2172,25 @@ impl S3 for FS { }; let delimiter = delimiter.filter(|v| !v.is_empty()); - let continuation_token = continuation_token.filter(|v| !v.is_empty()); let start_after = start_after.filter(|v| !v.is_empty()); + let continuation_token = continuation_token.filter(|v| !v.is_empty()); + + // Save the original encoded continuation_token for response + let encoded_continuation_token = continuation_token.clone(); + + // Decode continuation_token from base64 for internal use + let continuation_token = continuation_token + .map(|token| { + base64_simd::STANDARD + .decode_to_vec(token.as_bytes()) + .map_err(|_| s3_error!(InvalidArgument, "Invalid continuation token")) + .and_then(|bytes| { + String::from_utf8(bytes).map_err(|_| s3_error!(InvalidArgument, "Invalid continuation token")) + }) + }) + .transpose()?; + let store = get_validated_store(&bucket).await?; let object_infos = store @@ -2223,12 +2239,17 @@ impl S3 for FS { .map(|v| CommonPrefix { prefix: Some(v) }) .collect(); + // Encode next_continuation_token to base64 + let next_continuation_token = object_infos + .next_continuation_token + .map(|token| base64_simd::STANDARD.encode_to_string(token.as_bytes())); + let output = ListObjectsV2Output { is_truncated: Some(object_infos.is_truncated), - continuation_token: object_infos.continuation_token, - next_continuation_token: object_infos.next_continuation_token, + continuation_token: encoded_continuation_token, + next_continuation_token, key_count: Some(key_count), - max_keys: Some(key_count), + max_keys: Some(max_keys), contents: Some(objects), delimiter, name: Some(bucket),