From ff23706b1066314b7161464f5ecb331e45696bec Mon Sep 17 00:00:00 2001
From: weisd
Date: Tue, 24 Dec 2024 17:15:20 +0800
Subject: [PATCH] marker done, need test

---
 ecstore/src/cache_value/metacache_set.rs |   4 +-
 ecstore/src/disk/local.rs                |  47 ++-
 ecstore/src/disk/mod.rs                  | 150 +++++---
 ecstore/src/set_disk.rs                  |  44 ++-
 ecstore/src/sets.rs                      |  18 +-
 ecstore/src/store.rs                     |  58 +--
 ecstore/src/store_api.rs                 |  34 +-
 ecstore/src/store_list_objects.rs        | 440 ++++++++++++++++-------
 iam/src/store/object.rs                  |   2 +-
 rustfs/src/grpc.rs                       |   2 +-
 rustfs/src/storage/ecfs.rs               |  41 +--
 11 files changed, 545 insertions(+), 295 deletions(-)

diff --git a/ecstore/src/cache_value/metacache_set.rs b/ecstore/src/cache_value/metacache_set.rs
index 6409bd78..5e1ff84d 100644
--- a/ecstore/src/cache_value/metacache_set.rs
+++ b/ecstore/src/cache_value/metacache_set.rs
@@ -27,8 +27,8 @@ pub struct ListPathRawOptions {
     pub bucket: String,
     pub path: String,
     pub recursice: bool,
-    pub filter_prefix: String,
-    pub forward_to: String,
+    pub filter_prefix: Option<String>,
+    pub forward_to: Option<String>,
     pub min_disks: usize,
     pub report_not_found: bool,
     pub per_disk_limit: i32,
diff --git a/ecstore/src/disk/local.rs b/ecstore/src/disk/local.rs
index 0b6da5af..d4082592 100644
--- a/ecstore/src/disk/local.rs
+++ b/ecstore/src/disk/local.rs
@@ -729,16 +729,29 @@ impl LocalDisk {
         objs_returned: &mut i32,
     ) -> Result<()> {
         let forward = {
-            if !opts.forward_to.is_empty() && opts.forward_to.starts_with(&*current) {
-                let forward = opts.forward_to.trim_start_matches(&*current);
+            opts.forward_to.as_ref().filter(|v| v.starts_with(&*current)).map(|v| {
+                let forward = v.trim_start_matches(&*current);
                 if let Some(idx) = forward.find('/') {
-                    &forward[..idx]
+                    forward[..idx].to_owned()
                 } else {
-                    forward
+                    forward.to_owned()
                 }
-            } else {
-                ""
-            }
+            })
+            // if let Some(forward_to) = &opts.forward_to {
+
+            // } else {
+            //     None
+            // }
+            // if !opts.forward_to.is_empty() && opts.forward_to.starts_with(&*current) {
+            //     let forward = opts.forward_to.trim_start_matches(&*current);
+            //     if let Some(idx) = forward.find('/') {
+            //         &forward[..idx]
+            //     } else {
+            //         forward
+            //     }
+            // } else {
+            //     ""
+            // }
         };

         if opts.limit > 0 && *objs_returned >= opts.limit {
@@ -781,14 +794,18 @@ impl LocalDisk {
                     return Ok(());
                 }
                 // check prefix
-                if !opts.filter_prefix.is_empty() && !entry.starts_with(&opts.filter_prefix) {
-                    *item = "".to_owned();
-                    continue;
+                if let Some(filter_prefix) = &opts.filter_prefix {
+                    if !entry.starts_with(filter_prefix) {
+                        *item = "".to_owned();
+                        continue;
+                    }
                 }

-                if !forward.is_empty() && entry.as_str() < forward {
-                    *item = "".to_owned();
-                    continue;
+                if let Some(forward) = &forward {
+                    if &entry < forward {
+                        *item = "".to_owned();
+                        continue;
+                    }
                 }

                 if entry.ends_with(SLASH_SEPARATOR) {
@@ -828,9 +845,9 @@ impl LocalDisk {
         entries.sort();

         let mut entries = entries.as_slice();
-        if !forward.is_empty() {
+        if let Some(forward) = &forward {
             for (i, entry) in entries.iter().enumerate() {
-                if entry.as_str() >= forward || forward.starts_with(entry.as_str()) {
+                if entry >= forward || forward.starts_with(entry.as_str()) {
                     entries = &entries[i..];
                     break;
                 }
diff --git a/ecstore/src/disk/mod.rs b/ecstore/src/disk/mod.rs
index 254a2c9f..dc6333b8 100644
--- a/ecstore/src/disk/mod.rs
+++ b/ecstore/src/disk/mod.rs
@@ -25,6 +25,7 @@ use crate::{
         heal_commands::{HealScanMode, HealingTracker},
     },
     store_api::{FileInfo, ObjectInfo, RawFileInfo},
+    utils::path::SLASH_SEPARATOR,
 };
 use endpoint::Endpoint;
 use error::DiskError;
@@ -566,8 +567,6 @@ impl FileInfoVersions {
         let vid = Uuid::parse_str(v).unwrap_or(Uuid::nil());

-        for ver in self.versions.iter() {}
-
         self.versions.iter().position(|v| v.version_id == Some(vid))
     }
 }
@@ -586,10 +585,10 @@ pub struct WalkDirOptions {

     // FilterPrefix will only return results with given prefix within folder.
     // Should never contain a slash.
-    pub filter_prefix: String,
+    pub filter_prefix: Option<String>,

     // ForwardTo will forward to the given object path.
-    pub forward_to: String,
+    pub forward_to: Option<String>,

     // Limit the number of returned objects if > 0.
     pub limit: i32,
@@ -639,10 +638,29 @@ impl MetaCacheEntry {
     pub fn is_dir(&self) -> bool {
         self.metadata.is_empty() && self.name.ends_with('/')
     }
+    pub fn is_in_dir(&self, dir: &str, separator: &str) -> bool {
+        if dir.is_empty() {
+            let idx = self.name.find(separator);
+            return idx.is_none() || idx.unwrap() == self.name.len() - separator.len();
+        }
+
+        let ext = self.name.trim_start_matches(dir);
+
+        if ext.len() != self.name.len() {
+            let idx = ext.find(separator);
+            return idx.is_none() || idx.unwrap() == ext.len() - separator.len();
+        }
+
+        false
+    }
     pub fn is_object(&self) -> bool {
         !self.metadata.is_empty()
     }
+    pub fn is_object_dir(&self) -> bool {
+        !self.metadata.is_empty() && self.name.ends_with(SLASH_SEPARATOR)
+    }
+
     pub fn is_latest_deletemarker(&mut self) -> bool {
         if let Some(cached) = &self.cached {
             if cached.versions.is_empty() {
@@ -838,7 +856,7 @@ impl MetaCacheEntry {
     }
 }

-#[derive(Debug)]
+#[derive(Debug, Default)]
 pub struct MetaCacheEntries(pub Vec<Option<MetaCacheEntry>>);

 impl MetaCacheEntries {
@@ -936,26 +954,50 @@ impl MetaCacheEntries {
     }
 }

-#[derive(Debug)]
+#[derive(Debug, Default)]
+pub struct MetaCacheEntriesSortedResult {
+    pub entries: Option<MetaCacheEntriesSorted>,
+    pub err: Option<Error>,
+}
+
+// impl MetaCacheEntriesSortedResult {
+//     pub fn entriy_list(&self) -> Vec<&MetaCacheEntry> {
+//         if let Some(entries) = &self.entries {
+//             entries.entries()
+//         } else {
+//             Vec::new()
+//         }
+//     }
+// }
+
+#[derive(Debug, Default)]
 pub struct MetaCacheEntriesSorted {
     pub o: MetaCacheEntries,
-    // pub list_id: String,
-    // pub reuse: bool,
-    // pub lastSkippedEntry: String,
+    pub list_id: Option<String>,
+    pub reuse: bool,
+    pub last_skipped_entry: Option<String>,
 }

 impl MetaCacheEntriesSorted {
-    pub fn entries(&self) -> Vec<MetaCacheEntry> {
-        let entries: Vec<MetaCacheEntry> = self.o.0.iter().flatten().cloned().collect();
+    pub fn entries(&self) -> Vec<&MetaCacheEntry> {
+        let entries: Vec<&MetaCacheEntry> = self.o.0.iter().flatten().collect();
         entries
     }

-    pub async fn file_infos(&self, bucket: &str, prefix: &str, delimiter: &str) -> Vec<ObjectInfo> {
+    pub fn forward_past(&mut self, marker: Option<String>) {
+        if let Some(val) = marker {
+            // TODO: reuse
+            if let Some(idx) = self.o.0.iter().flatten().position(|v| v.name > val) {
+                self.o.0 = self.o.0.split_off(idx);
+            }
+        }
+    }
+    pub async fn file_infos(&self, bucket: &str, prefix: &str, delimiter: Option<String>) -> Vec<ObjectInfo> {
         let vcfg = get_versioning_config(bucket).await.ok();
         let mut objects = Vec::with_capacity(self.o.as_ref().len());
         let mut prev_prefix = "";
         for entry in self.o.as_ref().iter().flatten() {
             if entry.is_object() {
-                if !delimiter.is_empty() {
+                if let Some(delimiter) = &delimiter {
                     if let Some(idx) = entry.name.trim_start_matches(prefix).find(delimiter) {
                         let idx = prefix.len() + idx + delimiter.len();
                         if let Some(curr_prefix) = entry.name.get(0..idx) {
@@ -985,25 +1027,23 @@ impl MetaCacheEntriesSorted {
             }

             if entry.is_dir() {
-                if delimiter.is_empty() {
-                    continue;
-                }
-
-                if let Some(idx) = entry.name.trim_start_matches(prefix).find(delimiter) {
-                    let idx = prefix.len() + idx + delimiter.len();
-                    if let Some(curr_prefix) = entry.name.get(0..idx) {
-                        if curr_prefix == prev_prefix {
-                            continue;
-                        }
-
-                        prev_prefix = curr_prefix;
-
-                        objects.push(ObjectInfo {
-                            is_dir: true,
-                            bucket: bucket.to_owned(),
-                            name: curr_prefix.to_owned(),
-                            ..Default::default()
-                        });
-                    }
-                }
+                if let Some(delimiter) = &delimiter {
+                    if let Some(idx) = entry.name.trim_start_matches(prefix).find(delimiter) {
+                        let idx = prefix.len() + idx + delimiter.len();
+                        if let Some(curr_prefix) = entry.name.get(0..idx) {
+                            if curr_prefix == prev_prefix {
+                                continue;
+                            }
+
+                            prev_prefix = curr_prefix;
+
+                            objects.push(ObjectInfo {
+                                is_dir: true,
+                                bucket: bucket.to_owned(),
+                                name: curr_prefix.to_owned(),
+                                ..Default::default()
+                            });
+                        }
+                    }
+                }
             }
@@ -1012,14 +1052,20 @@ impl MetaCacheEntriesSorted {
         }

         objects
     }

-    pub async fn file_info_versions(&self, bucket: &str, prefix: &str, delimiter: &str, after_v: &str) -> Vec<ObjectInfo> {
+    pub async fn file_info_versions(
+        &self,
+        bucket: &str,
+        prefix: &str,
+        delimiter: Option<String>,
+        after_v: Option<String>,
+    ) -> Vec<ObjectInfo> {
         let vcfg = get_versioning_config(bucket).await.ok();
         let mut objects = Vec::with_capacity(self.o.as_ref().len());
         let mut prev_prefix = "";
         let mut after_v = after_v;
         for entry in self.o.as_ref().iter().flatten() {
             if entry.is_object() {
-                if !delimiter.is_empty() {
+                if let Some(delimiter) = &delimiter {
                     if let Some(idx) = entry.name.trim_start_matches(prefix).find(delimiter) {
                         let idx = prefix.len() + idx + delimiter.len();
                         if let Some(curr_prefix) = entry.name.get(0..idx) {
@@ -1049,13 +1095,13 @@ impl MetaCacheEntriesSorted {
                 };

                 let fi_versions = 'c: {
-                    if !after_v.is_empty() {
-                        if let Some(idx) = fiv.find_version_index(after_v) {
-                            after_v = "";
+                    if let Some(after_val) = &after_v {
+                        if let Some(idx) = fiv.find_version_index(after_val) {
+                            after_v = None;
                             break 'c fiv.versions.split_off(idx + 1);
                         }

-                        after_v = "";
+                        after_v = None;
                         break 'c fiv.versions;
                     } else {
                         break 'c fiv.versions;
@@ -1073,25 +1119,23 @@ impl MetaCacheEntriesSorted {
             }

             if entry.is_dir() {
-                if delimiter.is_empty() {
-                    continue;
-                }
-
-                if let Some(idx) = entry.name.trim_start_matches(prefix).find(delimiter) {
-                    let idx = prefix.len() + idx + delimiter.len();
-                    if let Some(curr_prefix) = entry.name.get(0..idx) {
-                        if curr_prefix == prev_prefix {
-                            continue;
-                        }
-
-                        prev_prefix = curr_prefix;
-
-                        objects.push(ObjectInfo {
-                            is_dir: true,
-                            bucket: bucket.to_owned(),
-                            name: curr_prefix.to_owned(),
-                            ..Default::default()
-                        });
-                    }
-                }
+                if let Some(delimiter) = &delimiter {
+                    if let Some(idx) = entry.name.trim_start_matches(prefix).find(delimiter) {
+                        let idx = prefix.len() + idx + delimiter.len();
+                        if let Some(curr_prefix) = entry.name.get(0..idx) {
+                            if curr_prefix == prev_prefix {
+                                continue;
+                            }
+
+                            prev_prefix = curr_prefix;
+
+                            objects.push(ObjectInfo {
+                                is_dir: true,
+                                bucket: bucket.to_owned(),
+                                name: curr_prefix.to_owned(),
+                                ..Default::default()
+                            });
+                        }
+                    }
+                }
             }
diff --git a/ecstore/src/set_disk.rs b/ecstore/src/set_disk.rs
index 59c77eaf..6883736b 100644
--- a/ecstore/src/set_disk.rs
+++ b/ecstore/src/set_disk.rs
@@ -1907,9 +1907,15 @@ impl SetDisks {
             fallback_disks: fallback_disks.to_vec(),
             bucket: bucket.to_string(),
             path: path.to_string(),
-            filter_prefix: filter_prefix.to_string(),
+            filter_prefix: {
+                if filter_prefix.is_empty() {
+                    None
+                } else {
+                    Some(filter_prefix.to_string())
+                }
+            },
             recursice: true,
-            forward_to: "".to_string(),
+            forward_to: None,
             min_disks: 1,
             report_not_found: false,
             per_disk_limit: 0,
@@ -3067,10 +3073,10 @@ impl SetDisks {
                 continue;
             }

-            let mut forward_to = "".to_string();
+            let mut forward_to = None;
             let b = tracker.read().await.get_bucket().await;
             if b == *bucket {
-                forward_to = tracker.read().await.get_object().await;
+                forward_to = Some(tracker.read().await.get_object().await);
             }

             if !b.is_empty() {
@@ -3835,11 +3841,11 @@ impl StorageAPI for SetDisks {
         self: Arc<Self>,
         _bucket: &str,
         _prefix: &str,
-        _continuation_token: &str,
-        _delimiter: &str,
+        _continuation_token: Option<String>,
+        _delimiter: Option<String>,
         _max_keys: i32,
         _fetch_owner: bool,
-        _start_after: &str,
+        _start_after: Option<String>,
     ) -> Result<ListObjectsV2Info> {
         unimplemented!()
     }
@@ -3847,9 +3853,9 @@ impl StorageAPI for SetDisks {
         self: Arc<Self>,
         _bucket: &str,
         _prefix: &str,
-        _marker: &str,
-        _version_marker: &str,
-        _delimiter: &str,
+        _marker: Option<String>,
+        _version_marker: Option<String>,
+        _delimiter: Option<String>,
         _max_keys: i32,
     ) -> Result<ListObjectVersionsInfo> {
         unimplemented!()
     }
@@ -4113,9 +4119,9 @@ impl StorageAPI for SetDisks {
         &self,
         bucket: &str,
         object: &str,
-        key_marker: &str,
-        upload_id_marker: &str,
-        delimiter: &str,
+        key_marker: Option<String>,
+        upload_id_marker: Option<String>,
+        delimiter: Option<String>,
         max_uploads: usize,
     ) -> Result<ListMultipartsInfo> {
         let disks = {
@@ -4209,14 +4215,14 @@ impl StorageAPI for SetDisks {
         uploads.sort_by(|a, b| a.initiated.cmp(&b.initiated));

         let mut upload_idx = 0;
-        if !upload_id_marker.is_empty() {
+        if let Some(upload_id_marker) = &upload_id_marker {
             while upload_idx < uploads.len() {
-                if uploads[upload_idx].upload_id != upload_id_marker {
+                if &uploads[upload_idx].upload_id != upload_id_marker {
                     upload_idx += 1;
                     continue;
                 }

-                if uploads[upload_idx].upload_id == upload_id_marker {
+                if &uploads[upload_idx].upload_id == upload_id_marker {
                     upload_idx += 1;
                     break;
                 }
@@ -4226,10 +4232,10 @@ impl StorageAPI for SetDisks {
         }

         let mut ret_uploads = Vec::new();
-        let mut next_upload_id_marker = String::new();
+        let mut next_upload_id_marker = None;
         while upload_idx < uploads.len() {
             ret_uploads.push(uploads[upload_idx].clone());
-            next_upload_id_marker = uploads[upload_idx].upload_id.clone();
+            next_upload_id_marker = Some(uploads[upload_idx].upload_id.clone());
             upload_idx += 1;

             if ret_uploads.len() > max_uploads {
@@ -4240,7 +4246,7 @@ impl StorageAPI for SetDisks {
         let is_truncated = ret_uploads.len() < uploads.len();

         if !is_truncated {
-            next_upload_id_marker = "".to_owned();
+            next_upload_id_marker = None;
         }

         Ok(ListMultipartsInfo {
diff --git a/ecstore/src/sets.rs b/ecstore/src/sets.rs
index 8955ef4c..f15b6c91 100644
--- a/ecstore/src/sets.rs
+++ b/ecstore/src/sets.rs
@@ -454,11 +454,11 @@ impl StorageAPI for Sets {
         self: Arc<Self>,
         _bucket: &str,
         _prefix: &str,
-        _continuation_token: &str,
-        _delimiter: &str,
+        _continuation_token: Option<String>,
+        _delimiter: Option<String>,
         _max_keys: i32,
         _fetch_owner: bool,
-        _start_after: &str,
+        _start_after: Option<String>,
     ) -> Result<ListObjectsV2Info> {
         unimplemented!()
     }
@@ -466,9 +466,9 @@ impl StorageAPI for Sets {
         self: Arc<Self>,
         _bucket: &str,
         _prefix: &str,
-        _marker: &str,
-        _version_marker: &str,
-        _delimiter: &str,
+        _marker: Option<String>,
+        _version_marker: Option<String>,
+        _delimiter: Option<String>,
         _max_keys: i32,
     ) -> Result<ListObjectVersionsInfo> {
         unimplemented!()
     }
@@ -527,9 +527,9 @@ impl StorageAPI for Sets {
         &self,
         bucket: &str,
         prefix: &str,
-        key_marker: &str,
-        upload_id_marker: &str,
-        delimiter: &str,
+        key_marker: Option<String>,
+        upload_id_marker: Option<String>,
+        delimiter: Option<String>,
         max_uploads: usize,
     ) -> Result<ListMultipartsInfo> {
         self.get_disks_by_key(prefix)
diff --git a/ecstore/src/store.rs b/ecstore/src/store.rs
index 1d84e39c..c9d52e6f 100644
--- a/ecstore/src/store.rs
+++ b/ecstore/src/store.rs
@@ -23,7 +23,6 @@ use crate::store_err::{
     to_object_err, StorageError,
 };
 use crate::store_init::ec_drives_no_config;
-use crate::store_list_objects::{max_keys_plus_one, ListPathOptions};
 use crate::utils::crypto::base64_decode;
 use crate::utils::path::{decode_dir_object, encode_dir_object, SLASH_SEPARATOR};
 use crate::utils::xml;
@@ -1519,11 +1518,11 @@ impl StorageAPI for ECStore {
         self: Arc<Self>,
         bucket: &str,
         prefix: &str,
-        continuation_token: &str,
-        delimiter: &str,
+        continuation_token: Option<String>,
+        delimiter: Option<String>,
         max_keys: i32,
         fetch_owner: bool,
-        start_after: &str,
+        start_after: Option<String>,
     ) -> Result<ListObjectsV2Info> {
         self.inner_list_objects_v2(bucket, prefix, continuation_token, delimiter, max_keys, fetch_owner, start_after)
             .await
@@ -1532,9 +1531,9 @@ impl StorageAPI for ECStore {
         self: Arc<Self>,
         bucket: &str,
         prefix: &str,
-        marker: &str,
-        version_marker: &str,
-        delimiter: &str,
+        marker: Option<String>,
+        version_marker: Option<String>,
+        delimiter: Option<String>,
         max_keys: i32,
     ) -> Result<ListObjectVersionsInfo> {
         self.inner_list_object_versions(bucket, prefix, marker, version_marker, delimiter, max_keys)
             .await
@@ -1672,12 +1671,12 @@ impl StorageAPI for ECStore {
         &self,
         bucket: &str,
         prefix: &str,
-        key_marker: &str,
-        upload_id_marker: &str,
-        delimiter: &str,
+        key_marker: Option<String>,
+        upload_id_marker: Option<String>,
+        delimiter: Option<String>,
         max_uploads: usize,
     ) -> Result<ListMultipartsInfo> {
-        check_list_multipart_args(bucket, prefix, key_marker, upload_id_marker, delimiter)?;
+        check_list_multipart_args(bucket, prefix, &key_marker, &upload_id_marker, &delimiter)?;

         if prefix.is_empty() {
             // TODO: return from cache
@@ -1693,14 +1692,21 @@ impl StorageAPI for ECStore {

         for pool in self.pools.iter() {
             let res = pool
-                .list_multipart_uploads(bucket, prefix, key_marker, upload_id_marker, delimiter, max_uploads)
+                .list_multipart_uploads(
+                    bucket,
+                    prefix,
+                    key_marker.clone(),
+                    upload_id_marker.clone(),
+                    delimiter.clone(),
+                    max_uploads,
+                )
                 .await?;

             uploads.extend(res.uploads);
         }

         Ok(ListMultipartsInfo {
-            key_marker: key_marker.to_owned(),
-            upload_id_marker: upload_id_marker.to_owned(),
+            key_marker,
+            upload_id_marker,
             max_uploads,
             uploads,
             prefix: prefix.to_owned(),
@@ -1718,7 +1724,7 @@ impl StorageAPI for ECStore {
         for (idx, pool) in self.pools.iter().enumerate() {
             // // TODO: IsSuspended
             let res = pool
-                .list_multipart_uploads(bucket, object, "", "", "", MAX_UPLOADS_LIST)
+                .list_multipart_uploads(bucket, object, None, None, None, MAX_UPLOADS_LIST)
                 .await?;

             if !res.uploads.is_empty() {
@@ -2203,7 +2209,7 @@ fn check_bucket_and_object_names(bucket: &str, object: &str) -> Result<()> {
     Ok(())
 }

-pub fn check_list_objs_args(bucket: &str, prefix: &str, _marker: &str) -> Result<()> {
+pub fn check_list_objs_args(bucket: &str, prefix: &str, _marker: &Option<String>) -> Result<()> {
     if !is_meta_bucketname(bucket) && check_valid_bucket_name_strict(bucket).is_err() {
         return Err(Error::new(StorageError::BucketNameInvalid(bucket.to_string())));
     }
@@ -2218,18 +2224,20 @@ pub fn check_list_objs_args(bucket: &str, prefix: &str, _marker: &Option<String
 fn check_list_multipart_args(
     bucket: &str,
     prefix: &str,
-    key_marker: &str,
-    upload_id_marker: &str,
-    _delimiter: &str,
+    key_marker: &Option<String>,
+    upload_id_marker: &Option<String>,
+    _delimiter: &Option<String>,
 ) -> Result<()> {
     check_list_objs_args(bucket, prefix, key_marker)?;

-    if !upload_id_marker.is_empty() {
-        if key_marker.ends_with('/') {
-            return Err(Error::new(StorageError::InvalidUploadIDKeyCombination(
-                upload_id_marker.to_string(),
-                key_marker.to_string(),
-            )));
+    if let Some(upload_id_marker) = upload_id_marker {
+        if let Some(key_marker) = key_marker {
+            if key_marker.ends_with('/') {
+                return Err(Error::new(StorageError::InvalidUploadIDKeyCombination(
+                    upload_id_marker.to_string(),
+                    key_marker.to_string(),
+                )));
+            }
         }

         if let Err(_e) = base64_decode(upload_id_marker.as_bytes()) {
diff --git a/ecstore/src/store_api.rs b/ecstore/src/store_api.rs
index 1cbcc487..cd1eb6b7 100644
--- a/ecstore/src/store_api.rs
+++ b/ecstore/src/store_api.rs
@@ -688,7 +688,7 @@ pub struct ListObjectsInfo {
     // When response is truncated (the IsTruncated element value in the response
     // is true), you can use the key name in this field as marker in the subsequent
     // request to get next set of objects.
-    pub next_marker: String,
+    pub next_marker: Option<String>,

     // List of objects info for this request.
     pub objects: Vec<ObjectInfo>,
@@ -711,8 +711,8 @@ pub struct ListObjectsV2Info {
     //
     // NOTE: This element is returned only if you have delimiter request parameter
     // specified.
-    pub continuation_token: String,
-    pub next_continuation_token: String,
+    pub continuation_token: Option<String>,
+    pub next_continuation_token: Option<String>,

     // List of objects info for this request.
     pub objects: Vec<ObjectInfo>,
@@ -744,20 +744,20 @@ pub struct MultipartInfo {
 pub struct ListMultipartsInfo {
     // Together with upload-id-marker, this parameter specifies the multipart upload
     // after which listing should begin.
-    pub key_marker: String,
+    pub key_marker: Option<String>,

     // Together with key-marker, specifies the multipart upload after which listing
     // should begin. If key-marker is not specified, the upload-id-marker parameter
     // is ignored.
-    pub upload_id_marker: String,
+    pub upload_id_marker: Option<String>,

     // When a list is truncated, this element specifies the value that should be
     // used for the key-marker request parameter in a subsequent request.
-    pub next_key_marker: String,
+    pub next_key_marker: Option<String>,

     // When a list is truncated, this element specifies the value that should be
     // used for the upload-id-marker request parameter in a subsequent request.
-    pub next_upload_id_marker: String,
+    pub next_upload_id_marker: Option<String>,

     // Maximum number of multipart uploads that could have been included in the
     // response.
@@ -778,7 +778,7 @@ pub struct ListMultipartsInfo {

     // A character used to truncate the object prefixes.
     // NOTE: only supported delimiter is '/'.
-    pub delimiter: String,
+    pub delimiter: Option<String>,

     // CommonPrefixes contains all (if there are any) keys between Prefix and the
     // next occurrence of the string specified by delimiter.
@@ -846,20 +846,20 @@ pub trait StorageAPI: ObjectIO {
         self: Arc<Self>,
         bucket: &str,
         prefix: &str,
-        continuation_token: &str,
-        delimiter: &str,
+        continuation_token: Option<String>,
+        delimiter: Option<String>,
         max_keys: i32,
         fetch_owner: bool,
-        start_after: &str,
+        start_after: Option<String>,
     ) -> Result<ListObjectsV2Info>;
     // ListObjectVersions TODO: FIXME:
     async fn list_object_versions(
         self: Arc<Self>,
         bucket: &str,
         prefix: &str,
-        marker: &str,
-        version_marker: &str,
-        delimiter: &str,
+        marker: Option<String>,
+        version_marker: Option<String>,
+        delimiter: Option<String>,
         max_keys: i32,
     ) -> Result<ListObjectVersionsInfo>;
     // Walk TODO:
@@ -884,9 +884,9 @@ pub trait StorageAPI: ObjectIO {
         &self,
         bucket: &str,
         prefix: &str,
-        key_marker: &str,
-        upload_id_marker: &str,
-        delimiter: &str,
+        key_marker: Option<String>,
+        upload_id_marker: Option<String>,
+        delimiter: Option<String>,
         max_uploads: usize,
     ) -> Result<ListMultipartsInfo>;
     async fn new_multipart_upload(&self, bucket: &str, object: &str, opts: &ObjectOptions) -> Result;
diff --git a/ecstore/src/store_list_objects.rs b/ecstore/src/store_list_objects.rs
index 95c2dd9d..f4b55c33 100644
--- a/ecstore/src/store_list_objects.rs
+++ b/ecstore/src/store_list_objects.rs
@@ -1,13 +1,16 @@
 use crate::cache_value::metacache_set::{list_path_raw, ListPathRawOptions};
 use crate::disk::error::{is_all_not_found, is_all_volume_not_found, is_err_eof, DiskError};
-use crate::disk::{DiskInfo, DiskStore, MetaCacheEntries, MetaCacheEntriesSorted, MetaCacheEntry, MetadataResolutionParams};
+use crate::disk::{
+    DiskInfo, DiskStore, MetaCacheEntries, MetaCacheEntriesSorted, MetaCacheEntriesSortedResult, MetaCacheEntry,
+    MetadataResolutionParams,
+};
 use crate::error::{Error, Result};
 use crate::file_meta::merge_file_meta_versions;
 use crate::peer::is_reserved_or_invalid_bucket;
 use crate::set_disk::SetDisks;
 use crate::store::check_list_objs_args;
 use crate::store_api::{ListObjectVersionsInfo, ListObjectsInfo, ObjectInfo, ObjectOptions};
-use crate::store_err::{is_err_bucket_exists, is_err_bucket_not_found, StorageError};
+use crate::store_err::{is_err_bucket_not_found, to_object_err, StorageError};
 use crate::utils::path::{self, base_dir_from_prefix, SLASH_SEPARATOR};
 use crate::StorageAPI;
 use crate::{store::ECStore, store_api::ListObjectsV2Info};
@@ -20,6 +23,7 @@
 use std::sync::Arc;
 use tokio::sync::broadcast::{self, Receiver as B_Receiver};
 use tokio::sync::mpsc::{self, Receiver, Sender};
 use tracing::error;
+use uuid::Uuid;

 const MAX_OBJECT_LIST: i32 = 1000;
 // const MAX_DELETE_LIST: i32 = 1000;
@@ -41,7 +45,7 @@ pub fn max_keys_plus_one(max_keys: i32, add_one: bool) -> i32 {

 #[derive(Debug, Default, Clone)]
 pub struct ListPathOptions {
-    pub id: String,
+    pub id: Option<String>,

     // Bucket of the listing.
     pub bucket: String,
@@ -56,11 +60,11 @@ pub struct ListPathOptions {
     // FilterPrefix will return only results with this prefix when scanning.
     // Should never contain a slash.
     // Prefix should still be set.
-    pub filter_prefix: String,
+    pub filter_prefix: Option<String>,

     // Marker to resume listing.
     // The response will be the first entry >= this object name.
-    pub marker: String,
+    pub marker: Option<String>,

     // Limit the number of results.
     pub limit: i32,
@@ -77,7 +81,7 @@ pub struct ListPathOptions {
     pub recursive: bool,

     // Separator to use.
-    pub separator: String,
+    pub separator: Option<String>,

     // Create indicates that the lister should not attempt to load an existing cache.
     pub create: bool,
@@ -94,8 +98,13 @@ pub struct ListPathOptions {
     pub versioned: bool,

     pub stop_disk_at_limit: bool,
+
+    pub pool_idx: Option<usize>,
+    pub set_idx: Option<usize>,
 }

+const MARKER_TAG_VERSION: &str = "v1";
+
 impl ListPathOptions {
     pub fn set_filter(&mut self) {
         if METACACHE_SHARE_PREFIX {
@@ -106,10 +115,79 @@ impl ListPathOptions {
         }

         let s = SLASH_SEPARATOR.chars().next().unwrap_or_default();
-        self.filter_prefix = self.prefix.trim_start_matches(&self.base_dir).trim_matches(s).to_owned();
+        self.filter_prefix = {
+            let fp = self.prefix.trim_start_matches(&self.base_dir).trim_matches(s);

-        if self.filter_prefix.contains(s) {
-            self.filter_prefix = "".to_owned();
+            if fp.contains(s) || fp.is_empty() {
+                None
+            } else {
+                Some(fp.to_owned())
+            }
         }
     }

+    pub fn parse_marker(&mut self) {
+        if let Some(marker) = &self.marker {
+            let s = marker.clone();
+            if !s.contains(format!("[rustfs_cache:{}", MARKER_TAG_VERSION).as_str()) {
+                return;
+            }
+
+            if let (Some(start_idx), Some(end_idx)) = (s.find("["), s.find("]")) {
+                self.marker = Some(s[0..start_idx].to_owned());
+                let tags: Vec<_> = s[start_idx..end_idx].trim_matches(['[', ']']).split(",").collect();
+
+                for &tag in tags.iter() {
+                    let kv: Vec<_> = tag.split(":").collect();
+                    if kv.len() != 2 {
+                        continue;
+                    }
+
+                    match kv[0] {
+                        "rustfs_cache" => {
+                            if kv[1] != MARKER_TAG_VERSION {
+                                continue;
+                            }
+                        }
+                        "id" => self.id = Some(kv[1].to_owned()),
+                        "return" => {
+                            self.id = Some(Uuid::new_v4().to_string());
+                            self.create = true;
+                        }
+                        "p" => match kv[1].parse::<usize>() {
+                            Ok(res) => self.pool_idx = Some(res),
+                            Err(_) => {
+                                self.id = Some(Uuid::new_v4().to_string());
+                                self.create = true;
+                                continue;
+                            }
+                        },
+                        "s" => match kv[1].parse::<usize>() {
+                            Ok(res) => self.set_idx = Some(res),
+                            Err(_) => {
+                                self.id = Some(Uuid::new_v4().to_string());
+                                self.create = true;
+                                continue;
+                            }
+                        },
+                        _ => (),
+                    }
+                }
+            }
+        }
+    }
+    pub fn encode_marker(&mut self, marker: &str) -> String {
+        if let Some(id) = &self.id {
+            format!(
+                "{}[rustfs_cache:{},id:{},p:{},s:{}]",
+                marker,
+                MARKER_TAG_VERSION,
+                id.to_owned(),
+                self.pool_idx.unwrap_or_default(),
+                self.set_idx.unwrap_or_default(),
+            )
+        } else {
+            format!("{}[rustfs_cache:{},return:]", marker, MARKER_TAG_VERSION)
+        }
+    }
 }
@@ -124,24 +202,24 @@ impl ECStore {
         self: Arc<Self>,
         bucket: &str,
         prefix: &str,
-        continuation_token: &str,
-        delimiter: &str,
+        continuation_token: Option<String>,
+        delimiter: Option<String>,
         max_keys: i32,
         _fetch_owner: bool,
-        start_after: &str,
+        start_after: Option<String>,
     ) -> Result<ListObjectsV2Info> {
         let marker = {
-            if continuation_token.is_empty() {
+            if continuation_token.is_none() {
                 start_after
             } else {
-                continuation_token
+                continuation_token.clone()
             }
         };

         let loi = self.list_objects_generic(bucket, prefix, marker, delimiter, max_keys).await?;

         Ok(ListObjectsV2Info {
             is_truncated: loi.is_truncated,
-            continuation_token: continuation_token.to_owned(),
+            continuation_token,
             next_continuation_token: loi.next_marker,
             objects: loi.objects,
             prefixes: loi.prefixes,
@@ -152,23 +230,23 @@ impl ECStore {
     pub async fn list_objects_generic(
         self: Arc<Self>,
         bucket: &str,
         prefix: &str,
-        marker: &str,
-        delimiter: &str,
+        marker: Option<String>,
+        delimiter: Option<String>,
         max_keys: i32,
     ) -> Result<ListObjectsInfo> {
         let opts = ListPathOptions {
             bucket: bucket.to_owned(),
             prefix: prefix.to_owned(),
-            separator: delimiter.to_owned(),
-            limit: max_keys_plus_one(max_keys, !marker.is_empty()),
-            marker: marker.to_owned(),
+            separator: delimiter.clone(),
+            limit: max_keys_plus_one(max_keys, marker.is_some()),
+            marker,
             incl_deleted: false,
             ask_disks: "strict".to_owned(), //TODO: from config
             ..Default::default()
         };

         // use get
-        if !opts.prefix.is_empty() && opts.limit == 1 && opts.marker.is_empty() {
+        if !opts.prefix.is_empty() && opts.limit == 1 && opts.marker.is_none() {
             match self
                 .get_object_info(
                     &opts.bucket,
@@ -194,31 +272,46 @@ impl ECStore {
             };
         };

-        let mut err_eof = false;
-        let has_merged = match self.list_path(&opts).await {
-            Ok(res) => Some(res),
-            Err(err) => {
-                if !is_err_eof(&err) {
-                    return Err(err);
-                }
-
-                err_eof = true;
-                None
-            }
-        };
-
-        let mut get_objects = if let Some(merged) = has_merged {
-            merged.file_infos(bucket, prefix, delimiter).await
-        } else {
-            Vec::new()
-        };
+        let mut list_result = match self.list_path(&opts).await {
+            Ok(res) => res,
+            Err(err) => MetaCacheEntriesSortedResult {
+                err: Some(err),
+                ..Default::default()
+            },
+        };
+
+        if let Some(err) = &list_result.err {
+            if !is_err_eof(err) {
+                return Err(to_object_err(list_result.err.unwrap(), vec![bucket, prefix]));
+            }
+        }
+
+        if let Some(result) = list_result.entries.as_mut() {
+            result.forward_past(opts.marker);
+        }
+
+        // contextCanceled
+
+        let mut get_objects = list_result
+            .entries
+            .unwrap_or_default()
+            .file_infos(bucket, prefix, delimiter.clone())
+            .await;

         let is_truncated = {
             if max_keys > 0 && get_objects.len() > max_keys as usize {
                 get_objects.truncate(max_keys as usize);
                 true
             } else {
-                !err_eof && !get_objects.is_empty()
+                list_result.err.is_none() && !get_objects.is_empty()
             }
         };

+        let next_marker = {
+            if is_truncated {
+                get_objects.last().map(|last| last.name.clone())
+            } else {
+                None
+            }
+        };
+
@@ -226,35 +319,31 @@ impl ECStore {
         let mut prefixes: Vec<String> = Vec::new();

         let mut objects = Vec::with_capacity(get_objects.len());
         for obj in get_objects.into_iter() {
-            if obj.is_dir && obj.mod_time.is_none() && !delimiter.is_empty() {
-                let mut found = false;
-                if delimiter != SLASH_SEPARATOR {
-                    for p in prefixes.iter() {
-                        if found {
-                            break;
-                        }
-                        found = p == &obj.name;
-                    }
-                }
-                if !found {
-                    prefixes.push(obj.name.clone());
+            if let Some(delimiter) = &delimiter {
+                if obj.is_dir && obj.mod_time.is_none() {
+                    let mut found = false;
+                    if delimiter != SLASH_SEPARATOR {
+                        for p in prefixes.iter() {
+                            if found {
+                                break;
+                            }
+                            found = p == &obj.name;
+                        }
+                    }
+                    if !found {
+                        prefixes.push(obj.name.clone());
+                    }
+                } else {
+                    objects.push(obj);
                 }
             } else {
                 objects.push(obj);
             }
         }

-        let next_marker = {
-            if is_truncated {
-                objects.last().map(|last| last.name.clone())
-            } else {
-                None
-            }
-        };
-
         Ok(ListObjectsInfo {
             is_truncated,
-            next_marker: next_marker.unwrap_or_default(),
+            next_marker,
             objects,
             prefixes,
         })
     }
@@ -264,80 +353,64 @@ impl ECStore {
     pub async fn inner_list_object_versions(
         self: Arc<Self>,
         bucket: &str,
         prefix: &str,
-        marker: &str,
-        version_marker: &str,
-        delimiter: &str,
+        marker: Option<String>,
+        version_marker: Option<String>,
+        delimiter: Option<String>,
         max_keys: i32,
     ) -> Result<ListObjectVersionsInfo> {
-        if marker.is_empty() && !version_marker.is_empty() {
+        if marker.is_none() && version_marker.is_some() {
             return Err(Error::new(StorageError::NotImplemented));
         }

+        // if marker set, limit +1
         let opts = ListPathOptions {
             bucket: bucket.to_owned(),
             prefix: prefix.to_owned(),
-            separator: delimiter.to_owned(),
-            limit: max_keys_plus_one(max_keys, !marker.is_empty()),
-            marker: marker.to_owned(),
+            separator: delimiter.clone(),
+            limit: max_keys_plus_one(max_keys, marker.is_some()),
+            marker,
             incl_deleted: true,
             ask_disks: "strict".to_owned(),
             versioned: true,
             ..Default::default()
         };

-        let mut err_eof = false;
-        let has_merged = match self.list_path(&opts).await {
-            Ok(res) => Some(res),
-            Err(err) => {
-                if !is_err_eof(&err) {
-                    return Err(err);
-                }
+        let mut list_result = match self.list_path(&opts).await {
+            Ok(res) => res,
+            Err(err) => MetaCacheEntriesSortedResult {
+                err: Some(err),
+                ..Default::default()
+            },
+        };

-                err_eof = true;
-                None
+        if let Some(err) = &list_result.err {
+            if !is_err_eof(err) {
+                return Err(to_object_err(list_result.err.unwrap(), vec![bucket, prefix]));
             }
-        };
+        }

-        let mut get_objects = if let Some(merged) = has_merged {
-            merged.file_info_versions(bucket, prefix, delimiter, version_marker).await
-        } else {
-            Vec::new()
-        };
+        if let Some(result) = list_result.entries.as_mut() {
+            result.forward_past(opts.marker);
+        }
+
+        let mut get_objects = list_result
+            .entries
+            .unwrap_or_default()
+            .file_info_versions(bucket, prefix, delimiter.clone(), version_marker)
+            .await;

         let is_truncated = {
             if max_keys > 0 && get_objects.len() > max_keys as usize {
                 get_objects.truncate(max_keys as usize);
                 true
             } else {
-                !err_eof && !get_objects.is_empty()
+                list_result.err.is_none() && !get_objects.is_empty()
             }
         };

-        let mut prefixes: Vec<String> = Vec::new();
-
-        let mut objects = Vec::with_capacity(get_objects.len());
-        for obj in get_objects.into_iter() {
-            if obj.is_dir && obj.mod_time.is_none() && !delimiter.is_empty() {
-                let mut found = false;
-                if delimiter != SLASH_SEPARATOR {
-                    for p in prefixes.iter() {
-                        if found {
-                            break;
-                        }
-                        found = p == &obj.name;
-                    }
-                }
-                if !found {
-                    prefixes.push(obj.name.clone());
-                }
-            } else {
-                objects.push(obj);
-            }
-        }
-
         let (next_marker, next_version_idmarker) = {
             if is_truncated {
-                objects
+                get_objects
                     .last()
                     .map(|last| (Some(last.name.clone()), last.version_id.map(|v| v.to_string())))
                     .unwrap_or_default()
@@ -346,6 +419,32 @@ impl ECStore {
             } else {
                 (None, None)
             }
         };

+        let mut prefixes: Vec<String> = Vec::new();
+
+        let mut objects = Vec::with_capacity(get_objects.len());
+        for obj in get_objects.into_iter() {
+            if let Some(delimiter) = &delimiter {
+                if obj.is_dir && obj.mod_time.is_none() {
+                    let mut found = false;
+                    if delimiter != SLASH_SEPARATOR {
+                        for p in prefixes.iter() {
+                            if found {
+                                break;
+                            }
+                            found = p == &obj.name;
+                        }
+                    }
+                    if !found {
+                        prefixes.push(obj.name.clone());
+                    }
+                } else {
+                    objects.push(obj);
+                }
+            } else {
+                objects.push(obj);
+            }
+        }
+
         Ok(ListObjectVersionsInfo {
             is_truncated,
             next_marker,
             next_version_idmarker,
             objects,
             prefixes,
         })
     }
@@ -355,19 +454,21 @@ impl ECStore {
-    pub async fn list_path(self: Arc<Self>, o: &ListPathOptions) -> Result<MetaCacheEntriesSorted> {
+    pub async fn list_path(self: Arc<Self>, o: &ListPathOptions) -> Result<MetaCacheEntriesSortedResult> {
+        // warn!("list_path opt {:?}", &o);
+
         check_list_objs_args(&o.bucket, &o.prefix, &o.marker)?;

         // if opts.prefix.ends_with(SLASH_SEPARATOR) {
         //     return Err(Error::msg("eof"));
         // }

         let mut o = o.clone();
-        if o.marker < o.prefix {
-            o.marker = "".to_owned();
-        }
+        o.marker = o.marker.filter(|v| v >= &o.prefix);

-        if !o.marker.is_empty() && !o.prefix.is_empty() && !o.marker.starts_with(&o.prefix) {
-            return Err(Error::new(std::io::Error::from(ErrorKind::UnexpectedEof)));
+        if let Some(marker) = &o.marker {
+            if !o.prefix.is_empty() && !marker.starts_with(&o.prefix) {
+                return Err(Error::new(std::io::Error::from(ErrorKind::UnexpectedEof)));
+            }
         }

         if o.limit == 0 {
@@ -378,16 +479,18 @@ impl ECStore {
             return Err(Error::new(std::io::Error::from(ErrorKind::UnexpectedEof)));
         }

-        o.include_directories = o.separator == SLASH_SEPARATOR;
+        let slash_separator = Some(SLASH_SEPARATOR.to_owned());

-        if (o.separator == SLASH_SEPARATOR || o.separator.is_empty()) && !o.recursive {
-            o.recursive = o.separator != SLASH_SEPARATOR;
-            o.separator = SLASH_SEPARATOR.to_owned();
+        o.include_directories = o.separator == slash_separator;
+
+        if (o.separator == slash_separator || o.separator.is_none()) && !o.recursive {
+            o.recursive = o.separator != slash_separator;
+            o.separator = slash_separator;
         } else {
             o.recursive = true
         }

-        // TODO: parseMarker
+        o.parse_marker();

         if o.base_dir.is_empty() {
             o.base_dir = base_dir_from_prefix(&o.prefix);
         }
@@ -421,10 +524,16 @@ impl ECStore {
         let cancel_rx2 = cancel_rx.resubscribe();
         let (result_tx, mut result_rx) = mpsc::channel(1);

-        let job2 = tokio::spawn(gather_results(cancel_rx2, o, recv, result_tx));
+        let err_tx2 = err_tx.clone();
+        let opts = o.clone();
+        let job2 = tokio::spawn(async move {
+            if let Err(err) = gather_results(cancel_rx2, opts, recv, result_tx).await {
+                error!("gather_results err {:?}", err);
+                let _ = err_tx2.send(err);
+            }
+        });

-        let result = {
+        let mut result = {
             // receive the result
             tokio::select! {
                 res = err_rx.recv() =>{
@@ -432,11 +541,12 @@ impl ECStore {
                     match res{
                         Ok(o) => {
                             error!("list_path err_rx.recv() ok {:?}", &o);
-                            Err(o)
+                            MetaCacheEntriesSortedResult{ entries: None, err: Some(o) }
                         },
                         Err(err) => {
                             error!("list_path err_rx.recv() err {:?}", &err);
-                            Err(Error::new(err))
+
+                            MetaCacheEntriesSortedResult{ entries: None, err: Some(Error::new(err)) }
                         },
                     }
                 },
@@ -452,7 +562,28 @@ impl ECStore {

         // wait spawns exit
         join_all(vec![job1, job2]).await;

-        result
+        if result.err.is_some() {
+            return Ok(result);
+        }
+
+        if let Some(entries) = result.entries.as_mut() {
+            entries.reuse = true;
+            let truncated = !entries.entries().is_empty() || result.err.is_none();
+            entries.o.0.truncate(o.limit as usize);
+            if !o.transient && truncated {
+                entries.list_id = if let Some(id) = o.id {
+                    Some(id)
+                } else {
+                    Some(Uuid::new_v4().to_string())
+                }
+            }
+
+            if !truncated {
+                result.err = Some(Error::new(std::io::Error::from(ErrorKind::UnexpectedEof)));
+            }
+        }
+
+        Ok(result)
     }

     // read all entries
@@ -488,6 +619,10 @@ impl ECStore {

         // let merge_res = merge_entry_channels(rx, inputs, sender.clone(), 1).await;

+        // TODO: cancelList
+
+        // let merge_res = merge_entry_channels(rx, inputs, sender.clone(), 1).await;
+
         let results = join_all(futures).await;

         let mut all_at_eof = true;
@@ -538,17 +673,15 @@
 async fn gather_results(
     _rx: B_Receiver,
     opts: ListPathOptions,
     recv: Receiver<MetaCacheEntry>,
-    results_tx: Sender<Result<MetaCacheEntriesSorted>>,
-) {
+    results_tx: Sender<MetaCacheEntriesSortedResult>,
+) -> Result<()> {
     let mut returned = false;
-    let mut results = MetaCacheEntriesSorted {
-        o: MetaCacheEntries(Vec::new()),
-    };
+
+    let mut sender = Some(results_tx);

     let mut recv = recv;
-    // let mut entrys = Vec::new();
+    let mut entrys = Vec::new();
     while let Some(mut entry) = recv.recv().await {
-        // warn!("gather_entrys entry {}", &entry.name);
         if returned {
             continue;
         }
@@ -562,21 +695,62 @@ async fn gather_results(
             continue;
         }

-        if !opts.marker.is_empty() && entry.name < opts.marker {
+        if let Some(marker) = &opts.marker {
+            if &entry.name < marker {
+                continue;
+            }
+        }
+
+        if !entry.name.starts_with(&opts.prefix) {
             continue;
         }

-        // TODO: other
-        if opts.limit > 0 && results.o.0.len() >= opts.limit as usize {
-            returned = true;
+        if let Some(separator) = &opts.separator {
+            if !opts.recursive && !entry.is_in_dir(&opts.prefix, separator) {
+                continue;
+            }
+        }
+
+        if !opts.incl_deleted && entry.is_object() && entry.is_latest_deletemarker() && entry.is_object_dir() {
             continue;
         }

-        results.o.0.push(Some(entry));
+        // TODO: Lifecycle
+
+        if opts.limit > 0 && entrys.len() >= opts.limit as usize {
+            if let Some(tx) = sender {
+                tx.send(MetaCacheEntriesSortedResult {
+                    entries: Some(MetaCacheEntriesSorted {
+                        o: MetaCacheEntries(entrys.clone()),
+                        ..Default::default()
+                    }),
+                    err: None,
+                })
+                .await?;
+
+                returned = true;
+                sender = None;
+            }
+            continue;
+        }
+
+        entrys.push(Some(entry));
+        // entrys.push(entry);
     }

-    results_tx.send(Ok(results)).await;
+    // finished without reaching the limit: send what we have and signal EOF
+    if let Some(tx) = sender {
+        tx.send(MetaCacheEntriesSortedResult {
+            entries: Some(MetaCacheEntriesSorted {
+                o: MetaCacheEntries(entrys.clone()),
+                ..Default::default()
+            }),
+            err: Some(Error::new(std::io::Error::new(ErrorKind::UnexpectedEof, "Unexpected EOF"))),
+        })
+        .await?;
+    }
+
+    Ok(())
 }

 async fn select_from(
@@ -1024,7 +1198,7 @@ mod test {
         let (_, rx) = broadcast::channel(1);

         let bucket = "dada".to_owned();
-        let forward_to = "".to_owned();
+        let forward_to = None;

         let disks = vec![Some(disk)];
         let fallback_disks = Vec::new();
diff --git a/iam/src/store/object.rs b/iam/src/store/object.rs
index a3ea78b9..cda7db11 100644
--- a/iam/src/store/object.rs
+++ b/iam/src/store/object.rs
@@ -55,7 +55,7 @@ impl ObjectStore {
         let items = self
             .object_api
             .clone()
-            .list_objects_v2(Self::BUCKET_NAME.into(), &prefix.clone(), "", "", 0, false, "")
+            .list_objects_v2(Self::BUCKET_NAME, &prefix.clone(), None, None, 0, false, None)
             .await;

         match items {
diff --git a/rustfs/src/grpc.rs b/rustfs/src/grpc.rs
index f2c1df3c..999d80df 100644
--- a/rustfs/src/grpc.rs
+++ b/rustfs/src/grpc.rs
@@ -10,7 +10,7 @@ use ecstore::{
     bucket::{metadata::load_bucket_metadata, metadata_sys},
     disk::{
         DeleteOptions, DiskAPI, DiskInfoOptions, DiskStore, FileInfoVersions, ReadMultipleReq, ReadOptions, Reader,
-        UpdateMetadataOpts, WalkDirOptions,
+        UpdateMetadataOpts,
     },
     erasure::Writer,
     heal::{
diff --git a/rustfs/src/storage/ecfs.rs b/rustfs/src/storage/ecfs.rs
index da9eb186..4251391b 100644
--- a/rustfs/src/storage/ecfs.rs
+++ b/rustfs/src/storage/ecfs.rs
@@ -460,6 +460,7 @@ impl S3 for FS {
     #[tracing::instrument(level = "debug", skip(self, req))]
     async fn list_objects_v2(&self, req: S3Request<ListObjectsV2Input>) -> S3Result<S3Response<ListObjectsV2Output>> {
+        // warn!("list_objects_v2 req {:?}", &req.input);
         let ListObjectsV2Input {
             bucket,
             continuation_token,
             delimiter,
@@ -472,9 +473,12 @@ impl S3 for FS {
         } = req.input;

         let prefix = prefix.unwrap_or_default();
-        let delimiter = delimiter.unwrap_or_default();
         let max_keys = max_keys.unwrap_or(1000);

+        let delimiter = delimiter.filter(|v| !v.is_empty());
+        let continuation_token = continuation_token.filter(|v| !v.is_empty());
+        let start_after = start_after.filter(|v| !v.is_empty());
+
         let Some(store) = new_object_layer_fn() else {
             return Err(S3Error::with_message(S3ErrorCode::InternalError, "Not init".to_string()));
         };
@@ -483,16 +487,16 @@ impl S3 for FS {
         let object_infos = store
             .list_objects_v2(
                 &bucket,
                 &prefix,
-                &continuation_token.unwrap_or_default(),
-                &delimiter,
+                continuation_token,
+                delimiter.clone(),
                 max_keys,
                 fetch_owner.unwrap_or_default(),
-                &start_after.unwrap_or_default(),
+                start_after,
             )
             .await
             .map_err(to_s3_error)?;

-        // warn!("object_infos {:?}", object_infos);
+        // warn!("object_infos objects {:?}", object_infos.objects);

         let objects: Vec<Object> = object_infos
             .objects
@@ -526,10 +530,13 @@ impl S3 for FS {
             .collect();

         let output = ListObjectsV2Output {
+            is_truncated: Some(object_infos.is_truncated),
+            continuation_token: object_infos.continuation_token,
+            next_continuation_token: object_infos.next_continuation_token,
             key_count: Some(key_count),
             max_keys: Some(key_count),
             contents: Some(objects),
-            delimiter: Some(delimiter),
+            delimiter,
             name: Some(bucket),
             prefix: Some(prefix),
             common_prefixes: Some(common_prefixes),
@@ -555,22 +562,18 @@ impl S3 for FS {
         } = req.input;

         let prefix = prefix.unwrap_or_default();
-        let delimiter = delimiter.unwrap_or_default();
         let max_keys = max_keys.unwrap_or(1000);

+        let key_marker = key_marker.filter(|v| !v.is_empty());
+        let version_id_marker = version_id_marker.filter(|v| !v.is_empty());
+        let delimiter = delimiter.filter(|v| !v.is_empty());
+
         let Some(store) = new_object_layer_fn() else {
             return Err(S3Error::with_message(S3ErrorCode::InternalError, "Not init".to_string()));
         };

         let object_infos = store
-            .list_object_versions(
-                &bucket,
-                &prefix,
-                &key_marker.unwrap_or_default(),
-                &version_id_marker.unwrap_or_default(),
-                &delimiter,
-                max_keys,
-            )
+            .list_object_versions(&bucket, &prefix, key_marker, version_id_marker, delimiter.clone(), max_keys)
             .await
             .map_err(to_s3_error)?;

         let versions: Vec<ObjectVersion> = object_infos
             .objects
             .iter()
             .filter(|v| !v.name.is_empty())
             .map(|v| {
-                let obj = ObjectVersion {
+                ObjectVersion {
                     key: Some(v.name.to_owned()),
                     last_modified: v.mod_time.map(Timestamp::from),
                     size: Some(v.size as i64),
                     version_id: v.version_id.map(|v| v.to_string()),
                     is_latest: Some(v.is_latest),
                     e_tag: v.etag.clone(),
                     ..Default::default() // TODO: another fields
-                };
-
-                obj
+                }
             })
             .collect();

@@ -604,7 +605,7 @@ impl S3 for FS {
         let output = ListObjectVersionsOutput {
             // is_truncated: Some(object_infos.is_truncated),
             max_keys: Some(key_count),
-            delimiter: Some(delimiter),
+            delimiter,
             name: Some(bucket),
             prefix: Some(prefix),
             common_prefixes: Some(common_prefixes),