marker done, needs tests

weisd
2024-12-24 17:15:20 +08:00
parent 2e0ee441cf
commit ff23706b10
11 changed files with 545 additions and 295 deletions
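
Note: the thread running through this commit is that listing markers and delimiters (marker, continuation_token, start_after, forward_to, filter_prefix, upload_id_marker, delimiter, ...) move from empty-String sentinels to Option<String>, so "not set" is distinguishable from "set but empty". A minimal standalone sketch of the normalization idiom the S3 handlers below apply at the boundary (names are illustrative, not the crate's API):

// Hedged sketch: map the empty-string sentinel coming off the wire to None,
// so every layer below can match on Option instead of testing is_empty().
fn normalize(param: Option<String>) -> Option<String> {
    param.filter(|v| !v.is_empty())
}

fn main() {
    assert_eq!(normalize(Some(String::new())), None);
    assert_eq!(normalize(None), None);
    assert_eq!(normalize(Some("photos/".to_owned())), Some("photos/".to_owned()));
}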

View File

@@ -27,8 +27,8 @@ pub struct ListPathRawOptions {
pub bucket: String,
pub path: String,
pub recursice: bool,
pub filter_prefix: String,
pub forward_to: String,
pub filter_prefix: Option<String>,
pub forward_to: Option<String>,
pub min_disks: usize,
pub report_not_found: bool,
pub per_disk_limit: i32,

View File

@@ -729,16 +729,29 @@ impl LocalDisk {
objs_returned: &mut i32,
) -> Result<()> {
let forward = {
if !opts.forward_to.is_empty() && opts.forward_to.starts_with(&*current) {
let forward = opts.forward_to.trim_start_matches(&*current);
opts.forward_to.as_ref().filter(|v| v.starts_with(&*current)).map(|v| {
let forward = v.trim_start_matches(&*current);
if let Some(idx) = forward.find('/') {
&forward[..idx]
forward[..idx].to_owned()
} else {
forward
forward.to_owned()
}
} else {
""
}
})
// if let Some(forward_to) = &opts.forward_to {
// } else {
// None
// }
// if !opts.forward_to.is_empty() && opts.forward_to.starts_with(&*current) {
// let forward = opts.forward_to.trim_start_matches(&*current);
// if let Some(idx) = forward.find('/') {
// &forward[..idx]
// } else {
// forward
// }
// } else {
// ""
// }
};
if opts.limit > 0 && *objs_returned >= opts.limit {
@@ -781,14 +794,18 @@ impl LocalDisk {
return Ok(());
}
// check prefix
if !opts.filter_prefix.is_empty() && !entry.starts_with(&opts.filter_prefix) {
*item = "".to_owned();
continue;
if let Some(filter_prefix) = &opts.filter_prefix {
if !entry.starts_with(filter_prefix) {
*item = "".to_owned();
continue;
}
}
if !forward.is_empty() && entry.as_str() < forward {
*item = "".to_owned();
continue;
if let Some(forward) = &forward {
if &entry < forward {
*item = "".to_owned();
continue;
}
}
if entry.ends_with(SLASH_SEPARATOR) {
@@ -828,9 +845,9 @@ impl LocalDisk {
entries.sort();
let mut entries = entries.as_slice();
if !forward.is_empty() {
if let Some(forward) = &forward {
for (i, entry) in entries.iter().enumerate() {
if entry.as_str() >= forward || forward.starts_with(entry.as_str()) {
if entry >= forward || forward.starts_with(entry.as_str()) {
entries = &entries[i..];
break;
}
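
The forward computation at the top of this file reduces forward_to to the single child component to skip to inside the directory currently being scanned; the loop just above then fast-forwards the sorted entry slice to that component. A standalone sketch of the reduction, assuming current always carries a trailing slash (the function name is made up):

// Hedged sketch of the `forward` binding above: keep only the next path
// component of forward_to relative to the directory being scanned.
fn forward_component(forward_to: Option<&str>, current: &str) -> Option<String> {
    forward_to.filter(|v| v.starts_with(current)).map(|v| {
        let rest = v.trim_start_matches(current);
        match rest.find('/') {
            Some(idx) => rest[..idx].to_owned(),
            None => rest.to_owned(),
        }
    })
}

fn main() {
    // Resuming at "a/b/c/d.txt" while scanning "a/b/" forwards to component "c".
    assert_eq!(forward_component(Some("a/b/c/d.txt"), "a/b/"), Some("c".to_owned()));
    // A target outside the current directory yields no forward hint.
    assert_eq!(forward_component(Some("x/y"), "a/b/"), None);
}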

View File

@@ -25,6 +25,7 @@ use crate::{
heal_commands::{HealScanMode, HealingTracker},
},
store_api::{FileInfo, ObjectInfo, RawFileInfo},
utils::path::SLASH_SEPARATOR,
};
use endpoint::Endpoint;
use error::DiskError;
@@ -566,8 +567,6 @@ impl FileInfoVersions {
let vid = Uuid::parse_str(v).unwrap_or(Uuid::nil());
for ver in self.versions.iter() {}
self.versions.iter().position(|v| v.version_id == Some(vid))
}
}
@@ -586,10 +585,10 @@ pub struct WalkDirOptions {
// FilterPrefix will only return results with given prefix within folder.
// Should never contain a slash.
pub filter_prefix: String,
pub filter_prefix: Option<String>,
// ForwardTo will forward to the given object path.
pub forward_to: String,
pub forward_to: Option<String>,
// Limit the number of returned objects if > 0.
pub limit: i32,
@@ -639,10 +638,29 @@ impl MetaCacheEntry {
pub fn is_dir(&self) -> bool {
self.metadata.is_empty() && self.name.ends_with('/')
}
pub fn is_in_dir(&self, dir: &str, separator: &str) -> bool {
if dir.is_empty() {
let idx = self.name.find(separator);
return idx.is_none() || idx.unwrap() == self.name.len() - separator.len();
}
let ext = self.name.trim_start_matches(dir);
if ext.len() != self.name.len() {
let idx = ext.find(separator);
return idx.is_none() || idx.unwrap() == ext.len() - separator.len();
}
false
}
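
is_in_dir decides whether an entry is a direct child of dir: after stripping the directory prefix, at most one trailing separator may remain. A standalone re-model for illustration:

// Hedged re-model of MetaCacheEntry::is_in_dir over plain &str.
fn is_in_dir(name: &str, dir: &str, separator: &str) -> bool {
    if dir.is_empty() {
        let idx = name.find(separator);
        return idx.is_none() || idx.unwrap() == name.len() - separator.len();
    }
    let ext = name.trim_start_matches(dir);
    if ext.len() != name.len() {
        let idx = ext.find(separator);
        return idx.is_none() || idx.unwrap() == ext.len() - separator.len();
    }
    false
}

fn main() {
    assert!(is_in_dir("a/b/obj", "a/b/", "/"));      // direct child object
    assert!(is_in_dir("a/b/sub/", "a/b/", "/"));     // direct subdirectory
    assert!(!is_in_dir("a/b/sub/obj", "a/b/", "/")); // grandchild, filtered out
    assert!(is_in_dir("top", "", "/"));              // root-level object
}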
pub fn is_object(&self) -> bool {
!self.metadata.is_empty()
}
pub fn is_object_dir(&self) -> bool {
!self.metadata.is_empty() && self.name.ends_with(SLASH_SEPARATOR)
}
pub fn is_latest_deletemarker(&mut self) -> bool {
if let Some(cached) = &self.cached {
if cached.versions.is_empty() {
@@ -838,7 +856,7 @@ impl MetaCacheEntry {
}
}
#[derive(Debug)]
#[derive(Debug, Default)]
pub struct MetaCacheEntries(pub Vec<Option<MetaCacheEntry>>);
impl MetaCacheEntries {
@@ -936,26 +954,50 @@ impl MetaCacheEntries {
}
}
#[derive(Debug)]
#[derive(Debug, Default)]
pub struct MetaCacheEntriesSortedResult {
pub entries: Option<MetaCacheEntriesSorted>,
pub err: Option<Error>,
}
// impl MetaCacheEntriesSortedResult {
// pub fn entriy_list(&self) -> Vec<&MetaCacheEntry> {
// if let Some(entries) = &self.entries {
// entries.entries()
// } else {
// Vec::new()
// }
// }
// }
#[derive(Debug, Default)]
pub struct MetaCacheEntriesSorted {
pub o: MetaCacheEntries,
// pub list_id: String,
// pub reuse: bool,
// pub lastSkippedEntry: String,
pub list_id: Option<String>,
pub reuse: bool,
pub last_skipped_entry: Option<String>,
}
impl MetaCacheEntriesSorted {
pub fn entries(&self) -> Vec<MetaCacheEntry> {
let entries: Vec<MetaCacheEntry> = self.o.0.iter().flatten().cloned().collect();
pub fn entries(&self) -> Vec<&MetaCacheEntry> {
let entries: Vec<&MetaCacheEntry> = self.o.0.iter().flatten().collect();
entries
}
pub async fn file_infos(&self, bucket: &str, prefix: &str, delimiter: &str) -> Vec<ObjectInfo> {
pub fn forward_past(&mut self, marker: Option<String>) {
if let Some(val) = marker {
// TODO: reuse
if let Some(idx) = self.o.0.iter().flatten().position(|v| v.name > val) {
self.o.0 = self.o.0.split_off(idx);
}
}
}
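
forward_past drops everything at or before the resume marker, keeping the sorted tail. A standalone sketch over plain names (illustrative):

// Hedged sketch mirroring forward_past over a sorted Vec<String>.
fn forward_past(entries: &mut Vec<String>, marker: Option<String>) {
    if let Some(val) = marker {
        if let Some(idx) = entries.iter().position(|v| v > &val) {
            let tail = entries.split_off(idx);
            *entries = tail;
        }
    }
}

fn main() {
    let mut names = vec!["a".to_owned(), "b".to_owned(), "c".to_owned()];
    forward_past(&mut names, Some("a".to_owned()));
    assert_eq!(names, vec!["b".to_owned(), "c".to_owned()]);
}

Note that when every entry sorts at or before the marker, position returns None and the list is left untouched; the marker filter inside gather_results (later in this commit) is what keeps that case from leaking stale entries.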
pub async fn file_infos(&self, bucket: &str, prefix: &str, delimiter: Option<String>) -> Vec<ObjectInfo> {
let vcfg = get_versioning_config(bucket).await.ok();
let mut objects = Vec::with_capacity(self.o.as_ref().len());
let mut prev_prefix = "";
for entry in self.o.as_ref().iter().flatten() {
if entry.is_object() {
if !delimiter.is_empty() {
if let Some(delimiter) = &delimiter {
if let Some(idx) = entry.name.trim_start_matches(prefix).find(delimiter) {
let idx = prefix.len() + idx + delimiter.len();
if let Some(curr_prefix) = entry.name.get(0..idx) {
@@ -985,25 +1027,23 @@ impl MetaCacheEntriesSorted {
}
if entry.is_dir() {
if delimiter.is_empty() {
continue;
}
if let Some(delimiter) = &delimiter {
if let Some(idx) = entry.name.trim_start_matches(prefix).find(delimiter) {
let idx = prefix.len() + idx + delimiter.len();
if let Some(curr_prefix) = entry.name.get(0..idx) {
if curr_prefix == prev_prefix {
continue;
}
if let Some(idx) = entry.name.trim_start_matches(prefix).find(delimiter) {
let idx = prefix.len() + idx + delimiter.len();
if let Some(curr_prefix) = entry.name.get(0..idx) {
if curr_prefix == prev_prefix {
continue;
prev_prefix = curr_prefix;
objects.push(ObjectInfo {
is_dir: true,
bucket: bucket.to_owned(),
name: curr_prefix.to_owned(),
..Default::default()
});
}
prev_prefix = curr_prefix;
objects.push(ObjectInfo {
is_dir: true,
bucket: bucket.to_owned(),
name: curr_prefix.to_owned(),
..Default::default()
});
}
}
}
@@ -1012,14 +1052,20 @@ impl MetaCacheEntriesSorted {
objects
}
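
The delimiter branches above roll object keys up into common prefixes: every key sharing prefix + <component> + delimiter collapses into a single prefix entry, deduplicated against prev_prefix. The core index arithmetic, as a standalone sketch:

// Hedged sketch of the common-prefix rollup used in file_infos above.
fn common_prefix(name: &str, prefix: &str, delimiter: &str) -> Option<String> {
    let idx = name.trim_start_matches(prefix).find(delimiter)?;
    let end = prefix.len() + idx + delimiter.len();
    name.get(0..end).map(|s| s.to_owned())
}

fn main() {
    // "photos/2024/a.jpg" listed under prefix "photos/" with delimiter "/"
    // rolls up into the common prefix "photos/2024/".
    assert_eq!(common_prefix("photos/2024/a.jpg", "photos/", "/"), Some("photos/2024/".to_owned()));
    // A key directly under the prefix has no delimiter left and stays an object.
    assert_eq!(common_prefix("photos/a.jpg", "photos/", "/"), None);
}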
pub async fn file_info_versions(&self, bucket: &str, prefix: &str, delimiter: &str, after_v: &str) -> Vec<ObjectInfo> {
pub async fn file_info_versions(
&self,
bucket: &str,
prefix: &str,
delimiter: Option<String>,
after_v: Option<String>,
) -> Vec<ObjectInfo> {
let vcfg = get_versioning_config(bucket).await.ok();
let mut objects = Vec::with_capacity(self.o.as_ref().len());
let mut prev_prefix = "";
let mut after_v = after_v;
for entry in self.o.as_ref().iter().flatten() {
if entry.is_object() {
if !delimiter.is_empty() {
if let Some(delimiter) = &delimiter {
if let Some(idx) = entry.name.trim_start_matches(prefix).find(delimiter) {
let idx = prefix.len() + idx + delimiter.len();
if let Some(curr_prefix) = entry.name.get(0..idx) {
@@ -1049,13 +1095,13 @@ impl MetaCacheEntriesSorted {
};
let fi_versions = 'c: {
if !after_v.is_empty() {
if let Some(idx) = fiv.find_version_index(after_v) {
after_v = "";
if let Some(after_val) = &after_v {
if let Some(idx) = fiv.find_version_index(after_val) {
after_v = None;
break 'c fiv.versions.split_off(idx + 1);
}
after_v = "";
after_v = None;
break 'c fiv.versions;
} else {
break 'c fiv.versions;
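
The after_v handling above resumes within a single object's version list: everything up to and including the marked version is dropped, and the marker is consumed either way so subsequent objects list all their versions. A standalone sketch (names are illustrative):

// Hedged sketch of the version-marker skip over plain version-id strings.
fn skip_past_version<'a>(mut versions: Vec<&'a str>, after_v: &mut Option<String>) -> Vec<&'a str> {
    let Some(after_val) = after_v.take() else {
        return versions;
    };
    // The marker is consumed whether or not it matches, mirroring the loop above.
    if let Some(idx) = versions.iter().position(|v| *v == after_val) {
        return versions.split_off(idx + 1);
    }
    versions
}

fn main() {
    let mut marker = Some("v2".to_owned());
    assert_eq!(skip_past_version(vec!["v1", "v2", "v3"], &mut marker), vec!["v3"]);
    assert_eq!(marker, None);
}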
@@ -1073,25 +1119,23 @@ impl MetaCacheEntriesSorted {
}
if entry.is_dir() {
if delimiter.is_empty() {
continue;
}
if let Some(delimiter) = &delimiter {
if let Some(idx) = entry.name.trim_start_matches(prefix).find(delimiter) {
let idx = prefix.len() + idx + delimiter.len();
if let Some(curr_prefix) = entry.name.get(0..idx) {
if curr_prefix == prev_prefix {
continue;
}
if let Some(idx) = entry.name.trim_start_matches(prefix).find(delimiter) {
let idx = prefix.len() + idx + delimiter.len();
if let Some(curr_prefix) = entry.name.get(0..idx) {
if curr_prefix == prev_prefix {
continue;
prev_prefix = curr_prefix;
objects.push(ObjectInfo {
is_dir: true,
bucket: bucket.to_owned(),
name: curr_prefix.to_owned(),
..Default::default()
});
}
prev_prefix = curr_prefix;
objects.push(ObjectInfo {
is_dir: true,
bucket: bucket.to_owned(),
name: curr_prefix.to_owned(),
..Default::default()
});
}
}
}

View File

@@ -1907,9 +1907,15 @@ impl SetDisks {
fallback_disks: fallback_disks.to_vec(),
bucket: bucket.to_string(),
path: path.to_string(),
filter_prefix: filter_prefix.to_string(),
filter_prefix: {
if filter_prefix.is_empty() {
None
} else {
Some(filter_prefix.to_string())
}
},
recursice: true,
forward_to: "".to_string(),
forward_to: None,
min_disks: 1,
report_not_found: false,
per_disk_limit: 0,
@@ -3067,10 +3073,10 @@ impl SetDisks {
continue;
}
let mut forward_to = "".to_string();
let mut forward_to = None;
let b = tracker.read().await.get_bucket().await;
if b == *bucket {
forward_to = tracker.read().await.get_object().await;
forward_to = Some(tracker.read().await.get_object().await);
}
if !b.is_empty() {
@@ -3835,11 +3841,11 @@ impl StorageAPI for SetDisks {
self: Arc<Self>,
_bucket: &str,
_prefix: &str,
_continuation_token: &str,
_delimiter: &str,
_continuation_token: Option<String>,
_delimiter: Option<String>,
_max_keys: i32,
_fetch_owner: bool,
_start_after: &str,
_start_after: Option<String>,
) -> Result<ListObjectsV2Info> {
unimplemented!()
}
@@ -3847,9 +3853,9 @@ impl StorageAPI for SetDisks {
self: Arc<Self>,
_bucket: &str,
_prefix: &str,
_marker: &str,
_version_marker: &str,
_delimiter: &str,
_marker: Option<String>,
_version_marker: Option<String>,
_delimiter: Option<String>,
_max_keys: i32,
) -> Result<ListObjectVersionsInfo> {
unimplemented!()
@@ -4113,9 +4119,9 @@ impl StorageAPI for SetDisks {
&self,
bucket: &str,
object: &str,
key_marker: &str,
upload_id_marker: &str,
delimiter: &str,
key_marker: Option<String>,
upload_id_marker: Option<String>,
delimiter: Option<String>,
max_uploads: usize,
) -> Result<ListMultipartsInfo> {
let disks = {
@@ -4209,14 +4215,14 @@ impl StorageAPI for SetDisks {
uploads.sort_by(|a, b| a.initiated.cmp(&b.initiated));
let mut upload_idx = 0;
if !upload_id_marker.is_empty() {
if let Some(upload_id_marker) = &upload_id_marker {
while upload_idx < uploads.len() {
if uploads[upload_idx].upload_id != upload_id_marker {
if &uploads[upload_idx].upload_id != upload_id_marker {
upload_idx += 1;
continue;
}
if uploads[upload_idx].upload_id == upload_id_marker {
if &uploads[upload_idx].upload_id == upload_id_marker {
upload_idx += 1;
break;
}
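
This scan implements upload-id-marker resumption: uploads are sorted by initiation time, and listing restarts immediately after the upload whose id equals the marker. A standalone sketch:

// Hedged sketch of the resume scan over plain upload ids.
fn resume_index(upload_ids: &[&str], marker: Option<&str>) -> usize {
    let mut idx = 0;
    if let Some(marker) = marker {
        while idx < upload_ids.len() {
            if upload_ids[idx] != marker {
                idx += 1;
                continue;
            }
            idx += 1; // step past the marker itself
            break;
        }
    }
    idx
}

fn main() {
    let ids = ["u1", "u2", "u3"];
    assert_eq!(resume_index(&ids, Some("u2")), 2); // resume at "u3"
    assert_eq!(resume_index(&ids, None), 0);       // no marker: list from the start
}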
@@ -4226,10 +4232,10 @@ impl StorageAPI for SetDisks {
}
let mut ret_uploads = Vec::new();
let mut next_upload_id_marker = String::new();
let mut next_upload_id_marker = None;
while upload_idx < uploads.len() {
ret_uploads.push(uploads[upload_idx].clone());
next_upload_id_marker = uploads[upload_idx].upload_id.clone();
next_upload_id_marker = Some(uploads[upload_idx].upload_id.clone());
upload_idx += 1;
if ret_uploads.len() > max_uploads {
@@ -4240,7 +4246,7 @@ impl StorageAPI for SetDisks {
let is_truncated = ret_uploads.len() < uploads.len();
if !is_truncated {
next_upload_id_marker = "".to_owned();
next_upload_id_marker = None;
}
Ok(ListMultipartsInfo {

View File

@@ -454,11 +454,11 @@ impl StorageAPI for Sets {
self: Arc<Self>,
_bucket: &str,
_prefix: &str,
_continuation_token: &str,
_delimiter: &str,
_continuation_token: Option<String>,
_delimiter: Option<String>,
_max_keys: i32,
_fetch_owner: bool,
_start_after: &str,
_start_after: Option<String>,
) -> Result<ListObjectsV2Info> {
unimplemented!()
}
@@ -466,9 +466,9 @@ impl StorageAPI for Sets {
self: Arc<Self>,
_bucket: &str,
_prefix: &str,
_marker: &str,
_version_marker: &str,
_delimiter: &str,
_marker: Option<String>,
_version_marker: Option<String>,
_delimiter: Option<String>,
_max_keys: i32,
) -> Result<ListObjectVersionsInfo> {
unimplemented!()
@@ -527,9 +527,9 @@ impl StorageAPI for Sets {
&self,
bucket: &str,
prefix: &str,
key_marker: &str,
upload_id_marker: &str,
delimiter: &str,
key_marker: Option<String>,
upload_id_marker: Option<String>,
delimiter: Option<String>,
max_uploads: usize,
) -> Result<ListMultipartsInfo> {
self.get_disks_by_key(prefix)

View File

@@ -23,7 +23,6 @@ use crate::store_err::{
to_object_err, StorageError,
};
use crate::store_init::ec_drives_no_config;
use crate::store_list_objects::{max_keys_plus_one, ListPathOptions};
use crate::utils::crypto::base64_decode;
use crate::utils::path::{decode_dir_object, encode_dir_object, SLASH_SEPARATOR};
use crate::utils::xml;
@@ -1519,11 +1518,11 @@ impl StorageAPI for ECStore {
self: Arc<Self>,
bucket: &str,
prefix: &str,
continuation_token: &str,
delimiter: &str,
continuation_token: Option<String>,
delimiter: Option<String>,
max_keys: i32,
fetch_owner: bool,
start_after: &str,
start_after: Option<String>,
) -> Result<ListObjectsV2Info> {
self.inner_list_objects_v2(bucket, prefix, continuation_token, delimiter, max_keys, fetch_owner, start_after)
.await
@@ -1532,9 +1531,9 @@ impl StorageAPI for ECStore {
self: Arc<Self>,
bucket: &str,
prefix: &str,
marker: &str,
version_marker: &str,
delimiter: &str,
marker: Option<String>,
version_marker: Option<String>,
delimiter: Option<String>,
max_keys: i32,
) -> Result<ListObjectVersionsInfo> {
self.inner_list_object_versions(bucket, prefix, marker, version_marker, delimiter, max_keys)
@@ -1672,12 +1671,12 @@ impl StorageAPI for ECStore {
&self,
bucket: &str,
prefix: &str,
key_marker: &str,
upload_id_marker: &str,
delimiter: &str,
key_marker: Option<String>,
upload_id_marker: Option<String>,
delimiter: Option<String>,
max_uploads: usize,
) -> Result<ListMultipartsInfo> {
check_list_multipart_args(bucket, prefix, key_marker, upload_id_marker, delimiter)?;
check_list_multipart_args(bucket, prefix, &key_marker, &upload_id_marker, &delimiter)?;
if prefix.is_empty() {
// TODO: return from cache
@@ -1693,14 +1692,21 @@ impl StorageAPI for ECStore {
for pool in self.pools.iter() {
let res = pool
.list_multipart_uploads(bucket, prefix, key_marker, upload_id_marker, delimiter, max_uploads)
.list_multipart_uploads(
bucket,
prefix,
key_marker.clone(),
upload_id_marker.clone(),
delimiter.clone(),
max_uploads,
)
.await?;
uploads.extend(res.uploads);
}
Ok(ListMultipartsInfo {
key_marker: key_marker.to_owned(),
upload_id_marker: upload_id_marker.to_owned(),
key_marker,
upload_id_marker,
max_uploads,
uploads,
prefix: prefix.to_owned(),
@@ -1718,7 +1724,7 @@ impl StorageAPI for ECStore {
for (idx, pool) in self.pools.iter().enumerate() {
// // TODO: IsSuspended
let res = pool
.list_multipart_uploads(bucket, object, "", "", "", MAX_UPLOADS_LIST)
.list_multipart_uploads(bucket, object, None, None, None, MAX_UPLOADS_LIST)
.await?;
if !res.uploads.is_empty() {
@@ -2203,7 +2209,7 @@ fn check_bucket_and_object_names(bucket: &str, object: &str) -> Result<()> {
Ok(())
}
pub fn check_list_objs_args(bucket: &str, prefix: &str, _marker: &str) -> Result<()> {
pub fn check_list_objs_args(bucket: &str, prefix: &str, _marker: &Option<String>) -> Result<()> {
if !is_meta_bucketname(bucket) && check_valid_bucket_name_strict(bucket).is_err() {
return Err(Error::new(StorageError::BucketNameInvalid(bucket.to_string())));
}
@@ -2218,18 +2224,20 @@ pub fn check_list_objs_args(bucket: &str, prefix: &str, _marker: &str) -> Result
fn check_list_multipart_args(
bucket: &str,
prefix: &str,
key_marker: &str,
upload_id_marker: &str,
_delimiter: &str,
key_marker: &Option<String>,
upload_id_marker: &Option<String>,
_delimiter: &Option<String>,
) -> Result<()> {
check_list_objs_args(bucket, prefix, key_marker)?;
if !upload_id_marker.is_empty() {
if key_marker.ends_with('/') {
return Err(Error::new(StorageError::InvalidUploadIDKeyCombination(
upload_id_marker.to_string(),
key_marker.to_string(),
)));
if let Some(upload_id_marker) = upload_id_marker {
if let Some(key_marker) = key_marker {
if key_marker.ends_with('/') {
return Err(Error::new(StorageError::InvalidUploadIDKeyCombination(
upload_id_marker.to_string(),
key_marker.to_string(),
)));
}
}
if let Err(_e) = base64_decode(upload_id_marker.as_bytes()) {

View File

@@ -688,7 +688,7 @@ pub struct ListObjectsInfo {
// When response is truncated (the IsTruncated element value in the response
// is true), you can use the key name in this field as marker in the subsequent
// request to get next set of objects.
pub next_marker: String,
pub next_marker: Option<String>,
// List of objects info for this request.
pub objects: Vec<ObjectInfo>,
@@ -711,8 +711,8 @@ pub struct ListObjectsV2Info {
//
// NOTE: This element is returned only if you have delimiter request parameter
// specified.
pub continuation_token: String,
pub next_continuation_token: String,
pub continuation_token: Option<String>,
pub next_continuation_token: Option<String>,
// List of objects info for this request.
pub objects: Vec<ObjectInfo>,
@@ -744,20 +744,20 @@ pub struct MultipartInfo {
pub struct ListMultipartsInfo {
// Together with upload-id-marker, this parameter specifies the multipart upload
// after which listing should begin.
pub key_marker: String,
pub key_marker: Option<String>,
// Together with key-marker, specifies the multipart upload after which listing
// should begin. If key-marker is not specified, the upload-id-marker parameter
// is ignored.
pub upload_id_marker: String,
pub upload_id_marker: Option<String>,
// When a list is truncated, this element specifies the value that should be
// used for the key-marker request parameter in a subsequent request.
pub next_key_marker: String,
pub next_key_marker: Option<String>,
// When a list is truncated, this element specifies the value that should be
// used for the upload-id-marker request parameter in a subsequent request.
pub next_upload_id_marker: String,
pub next_upload_id_marker: Option<String>,
// Maximum number of multipart uploads that could have been included in the
// response.
@@ -778,7 +778,7 @@ pub struct ListMultipartsInfo {
// A character used to truncate the object prefixes.
// NOTE: only supported delimiter is '/'.
pub delimiter: String,
pub delimiter: Option<String>,
// CommonPrefixes contains all (if there are any) keys between Prefix and the
// next occurrence of the string specified by delimiter.
@@ -846,20 +846,20 @@ pub trait StorageAPI: ObjectIO {
self: Arc<Self>,
bucket: &str,
prefix: &str,
continuation_token: &str,
delimiter: &str,
continuation_token: Option<String>,
delimiter: Option<String>,
max_keys: i32,
fetch_owner: bool,
start_after: &str,
start_after: Option<String>,
) -> Result<ListObjectsV2Info>;
// ListObjectVersions TODO: FIXME:
async fn list_object_versions(
self: Arc<Self>,
bucket: &str,
prefix: &str,
marker: &str,
version_marker: &str,
delimiter: &str,
marker: Option<String>,
version_marker: Option<String>,
delimiter: Option<String>,
max_keys: i32,
) -> Result<ListObjectVersionsInfo>;
// Walk TODO:
@@ -884,9 +884,9 @@ pub trait StorageAPI: ObjectIO {
&self,
bucket: &str,
prefix: &str,
key_marker: &str,
upload_id_marker: &str,
delimiter: &str,
key_marker: Option<String>,
upload_id_marker: Option<String>,
delimiter: Option<String>,
max_uploads: usize,
) -> Result<ListMultipartsInfo>;
async fn new_multipart_upload(&self, bucket: &str, object: &str, opts: &ObjectOptions) -> Result<MultipartUploadResult>;

View File

@@ -1,13 +1,16 @@
use crate::cache_value::metacache_set::{list_path_raw, ListPathRawOptions};
use crate::disk::error::{is_all_not_found, is_all_volume_not_found, is_err_eof, DiskError};
use crate::disk::{DiskInfo, DiskStore, MetaCacheEntries, MetaCacheEntriesSorted, MetaCacheEntry, MetadataResolutionParams};
use crate::disk::{
DiskInfo, DiskStore, MetaCacheEntries, MetaCacheEntriesSorted, MetaCacheEntriesSortedResult, MetaCacheEntry,
MetadataResolutionParams,
};
use crate::error::{Error, Result};
use crate::file_meta::merge_file_meta_versions;
use crate::peer::is_reserved_or_invalid_bucket;
use crate::set_disk::SetDisks;
use crate::store::check_list_objs_args;
use crate::store_api::{ListObjectVersionsInfo, ListObjectsInfo, ObjectInfo, ObjectOptions};
use crate::store_err::{is_err_bucket_exists, is_err_bucket_not_found, StorageError};
use crate::store_err::{is_err_bucket_not_found, to_object_err, StorageError};
use crate::utils::path::{self, base_dir_from_prefix, SLASH_SEPARATOR};
use crate::StorageAPI;
use crate::{store::ECStore, store_api::ListObjectsV2Info};
@@ -20,6 +23,7 @@ use std::sync::Arc;
use tokio::sync::broadcast::{self, Receiver as B_Receiver};
use tokio::sync::mpsc::{self, Receiver, Sender};
use tracing::error;
use uuid::Uuid;
const MAX_OBJECT_LIST: i32 = 1000;
// const MAX_DELETE_LIST: i32 = 1000;
@@ -41,7 +45,7 @@ pub fn max_keys_plus_one(max_keys: i32, add_one: bool) -> i32 {
#[derive(Debug, Default, Clone)]
pub struct ListPathOptions {
pub id: String,
pub id: Option<String>,
// Bucket of the listing.
pub bucket: String,
@@ -56,11 +60,11 @@ pub struct ListPathOptions {
// FilterPrefix will return only results with this prefix when scanning.
// Should never contain a slash.
// Prefix should still be set.
pub filter_prefix: String,
pub filter_prefix: Option<String>,
// Marker to resume listing.
// The response will be the first entry >= this object name.
pub marker: String,
pub marker: Option<String>,
// Limit the number of results.
pub limit: i32,
@@ -77,7 +81,7 @@ pub struct ListPathOptions {
pub recursive: bool,
// Separator to use.
pub separator: String,
pub separator: Option<String>,
// Create indicates that the lister should not attempt to load an existing cache.
pub create: bool,
@@ -94,8 +98,13 @@ pub struct ListPathOptions {
pub versioned: bool,
pub stop_disk_at_limit: bool,
pub pool_idx: Option<usize>,
pub set_idx: Option<usize>,
}
const MARKER_TAG_VERSION: &str = "v1";
impl ListPathOptions {
pub fn set_filter(&mut self) {
if METACACHE_SHARE_PREFIX {
@@ -106,10 +115,79 @@ impl ListPathOptions {
}
let s = SLASH_SEPARATOR.chars().next().unwrap_or_default();
self.filter_prefix = self.prefix.trim_start_matches(&self.base_dir).trim_matches(s).to_owned();
self.filter_prefix = {
let fp = self.prefix.trim_start_matches(&self.base_dir).trim_matches(s);
if self.filter_prefix.contains(s) {
self.filter_prefix = "".to_owned();
if fp.contains(s) || fp.is_empty() {
None
} else {
Some(fp.to_owned())
}
}
}
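
set_filter derives the per-scan filter prefix: the single path component of prefix below base_dir, or None when the remainder is empty or spans more than one component. A standalone sketch, hard-coding '/' as the separator:

// Hedged sketch of set_filter's reduction (standalone, not the crate's API).
fn filter_prefix(prefix: &str, base_dir: &str) -> Option<String> {
    let fp = prefix.trim_start_matches(base_dir).trim_matches('/');
    if fp.contains('/') || fp.is_empty() {
        None
    } else {
        Some(fp.to_owned())
    }
}

fn main() {
    assert_eq!(filter_prefix("a/b/obj", "a/b/"), Some("obj".to_owned()));
    assert_eq!(filter_prefix("a/b/", "a/b/"), None); // nothing below base_dir
}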
pub fn parse_marker(&mut self) {
if let Some(marker) = &self.marker {
let s = marker.clone();
if !s.contains(format!("[rustfs_cache:{}", MARKER_TAG_VERSION).as_str()) {
return;
}
if let (Some(start_idx), Some(end_idx)) = (s.find("["), s.find("]")) {
self.marker = Some(s[0..start_idx].to_owned());
let tags: Vec<_> = s[start_idx..end_idx].trim_matches(['[', ']']).split(",").collect();
for &tag in tags.iter() {
let kv: Vec<_> = tag.split(":").collect();
if kv.len() != 2 {
continue;
}
match kv[0] {
"rustfs_cache" => {
if kv[1] != MARKER_TAG_VERSION {
continue;
}
}
"id" => self.id = Some(kv[1].to_owned()),
"return" => {
self.id = Some(Uuid::new_v4().to_string());
self.create = true;
}
"p" => match kv[1].parse::<usize>() {
Ok(res) => self.pool_idx = Some(res),
Err(_) => {
self.id = Some(Uuid::new_v4().to_string());
self.create = true;
continue;
}
},
"s" => match kv[1].parse::<usize>() {
Ok(res) => self.set_idx = Some(res),
Err(_) => {
self.id = Some(Uuid::new_v4().to_string());
self.create = true;
continue;
}
},
_ => (),
}
}
}
}
}
pub fn encode_marker(&mut self, marker: &str) -> String {
if let Some(id) = &self.id {
format!(
"{}[rustfs_cache:{},id:{},p:{},s:{}]",
marker,
MARKER_TAG_VERSION,
id.to_owned(),
self.pool_idx.unwrap_or_default(),
self.set_idx.unwrap_or_default(),
)
} else {
format!("{}[rustfs_cache:{},return:]", marker, MARKER_TAG_VERSION)
}
}
}
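
The marker tag format these two methods share is name[rustfs_cache:v1,id:...,p:...,s:...]: the client-visible continuation marker carries the cache listing id plus pool/set indices, and a return: tag instead of an id asks the next request to start a fresh cache. A minimal sketch of the round trip (object name, id, and indices are made up):

fn main() {
    // Encode the way encode_marker does for an existing cache id.
    let encoded = format!(
        "{}[rustfs_cache:{},id:{},p:{},s:{}]",
        "photos/1001.jpg", "v1", "5e0f0d85-8a9c-4cbb-9b3a-2f0e4d9c1a77", 0, 2
    );
    // Recover the plain object name and the tags the way parse_marker does.
    let start = encoded.find('[').unwrap();
    let end = encoded.find(']').unwrap();
    assert_eq!(&encoded[..start], "photos/1001.jpg");
    let tags: Vec<&str> = encoded[start + 1..end].split(',').collect();
    assert_eq!(tags, ["rustfs_cache:v1", "id:5e0f0d85-8a9c-4cbb-9b3a-2f0e4d9c1a77", "p:0", "s:2"]);
}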
@@ -124,24 +202,24 @@ impl ECStore {
self: Arc<Self>,
bucket: &str,
prefix: &str,
continuation_token: &str,
delimiter: &str,
continuation_token: Option<String>,
delimiter: Option<String>,
max_keys: i32,
_fetch_owner: bool,
start_after: &str,
start_after: Option<String>,
) -> Result<ListObjectsV2Info> {
let marker = {
if continuation_token.is_empty() {
if continuation_token.is_none() {
start_after
} else {
continuation_token
continuation_token.clone()
}
};
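
ListObjectsV2 resumes from the continuation token when one is present and falls back to start-after otherwise; with Option this is a plain is_none test instead of a comparison against "". As a tiny standalone sketch:

// Hedged sketch of the marker selection above.
fn v2_marker(continuation_token: Option<String>, start_after: Option<String>) -> Option<String> {
    if continuation_token.is_none() {
        start_after
    } else {
        continuation_token
    }
}

fn main() {
    assert_eq!(v2_marker(Some("tok".to_owned()), Some("sa".to_owned())), Some("tok".to_owned()));
    assert_eq!(v2_marker(None, Some("sa".to_owned())), Some("sa".to_owned()));
}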
let loi = self.list_objects_generic(bucket, prefix, marker, delimiter, max_keys).await?;
Ok(ListObjectsV2Info {
is_truncated: loi.is_truncated,
continuation_token: continuation_token.to_owned(),
continuation_token,
next_continuation_token: loi.next_marker,
objects: loi.objects,
prefixes: loi.prefixes,
@@ -152,23 +230,23 @@ impl ECStore {
self: Arc<Self>,
bucket: &str,
prefix: &str,
marker: &str,
delimiter: &str,
marker: Option<String>,
delimiter: Option<String>,
max_keys: i32,
) -> Result<ListObjectsInfo> {
let opts = ListPathOptions {
bucket: bucket.to_owned(),
prefix: prefix.to_owned(),
separator: delimiter.to_owned(),
limit: max_keys_plus_one(max_keys, !marker.is_empty()),
marker: marker.to_owned(),
separator: delimiter.clone(),
limit: max_keys_plus_one(max_keys, marker.is_some()),
marker,
incl_deleted: false,
ask_disks: "strict".to_owned(), //TODO: from config
..Default::default()
};
// use get
if !opts.prefix.is_empty() && opts.limit == 1 && opts.marker.is_empty() {
if !opts.prefix.is_empty() && opts.limit == 1 && opts.marker.is_none() {
match self
.get_object_info(
&opts.bucket,
@@ -194,31 +272,46 @@ impl ECStore {
};
};
let mut err_eof = false;
let has_merged = match self.list_path(&opts).await {
Ok(res) => Some(res),
Err(err) => {
if !is_err_eof(&err) {
return Err(err);
}
let mut list_result = match self.list_path(&opts).await {
Ok(res) => res,
Err(err) => MetaCacheEntriesSortedResult {
err: Some(err),
..Default::default()
},
};
err_eof = true;
None
if let Some(err) = &list_result.err {
if !is_err_eof(err) {
return Err(to_object_err(list_result.err.unwrap(), vec![bucket, prefix]));
}
};
}
let mut get_objects = if let Some(merged) = has_merged {
merged.file_infos(bucket, prefix, delimiter).await
} else {
Vec::new()
};
if let Some(result) = list_result.entries.as_mut() {
result.forward_past(opts.marker);
}
// contextCanceled
let mut get_objects = list_result
.entries
.unwrap_or_default()
.file_infos(bucket, prefix, delimiter.clone())
.await;
let is_truncated = {
if max_keys > 0 && get_objects.len() > max_keys as usize {
get_objects.truncate(max_keys as usize);
true
} else {
!err_eof && !get_objects.is_empty()
list_result.err.is_none() && !get_objects.is_empty()
}
};
let next_marker = {
if is_truncated {
get_objects.last().map(|last| last.name.clone())
} else {
None
}
};
@@ -226,35 +319,31 @@ impl ECStore {
let mut objects = Vec::with_capacity(get_objects.len());
for obj in get_objects.into_iter() {
if obj.is_dir && obj.mod_time.is_none() && !delimiter.is_empty() {
let mut found = false;
if delimiter != SLASH_SEPARATOR {
for p in prefixes.iter() {
if found {
break;
if let Some(delimiter) = &delimiter {
if obj.is_dir && obj.mod_time.is_none() {
let mut found = false;
if delimiter != SLASH_SEPARATOR {
for p in prefixes.iter() {
if found {
break;
}
found = p == &obj.name;
}
found = p == &obj.name;
}
}
if !found {
prefixes.push(obj.name.clone());
if !found {
prefixes.push(obj.name.clone());
}
} else {
objects.push(obj);
}
} else {
objects.push(obj);
}
}
let next_marker = {
if is_truncated {
objects.last().map(|last| last.name.clone())
} else {
None
}
};
Ok(ListObjectsInfo {
is_truncated,
next_marker: next_marker.unwrap_or_default(),
next_marker,
objects,
prefixes,
})
@@ -264,80 +353,64 @@ impl ECStore {
self: Arc<Self>,
bucket: &str,
prefix: &str,
marker: &str,
version_marker: &str,
delimiter: &str,
marker: Option<String>,
version_marker: Option<String>,
delimiter: Option<String>,
max_keys: i32,
) -> Result<ListObjectVersionsInfo> {
if marker.is_empty() && !version_marker.is_empty() {
if marker.is_none() && version_marker.is_some() {
return Err(Error::new(StorageError::NotImplemented));
}
// if marker set, limit +1
let opts = ListPathOptions {
bucket: bucket.to_owned(),
prefix: prefix.to_owned(),
separator: delimiter.to_owned(),
limit: max_keys_plus_one(max_keys, !marker.is_empty()),
marker: marker.to_owned(),
separator: delimiter.clone(),
limit: max_keys_plus_one(max_keys, marker.is_some()),
marker,
incl_deleted: true,
ask_disks: "strict".to_owned(),
versioned: true,
..Default::default()
};
let mut err_eof = false;
let has_merged = match self.list_path(&opts).await {
Ok(res) => Some(res),
Err(err) => {
if !is_err_eof(&err) {
return Err(err);
}
let mut list_result = match self.list_path(&opts).await {
Ok(res) => res,
Err(err) => MetaCacheEntriesSortedResult {
err: Some(err),
..Default::default()
},
};
err_eof = true;
None
if let Some(err) = &list_result.err {
if !is_err_eof(err) {
return Err(to_object_err(list_result.err.unwrap(), vec![bucket, prefix]));
}
};
}
let mut get_objects = if let Some(merged) = has_merged {
merged.file_info_versions(bucket, prefix, delimiter, version_marker).await
} else {
Vec::new()
};
if let Some(result) = list_result.entries.as_mut() {
result.forward_past(opts.marker);
}
let mut get_objects = list_result
.entries
.unwrap_or_default()
.file_info_versions(bucket, prefix, delimiter.clone(), version_marker)
.await;
let is_truncated = {
if max_keys > 0 && get_objects.len() > max_keys as usize {
get_objects.truncate(max_keys as usize);
true
} else {
!err_eof && !get_objects.is_empty()
list_result.err.is_none() && !get_objects.is_empty()
}
};
let mut prefixes: Vec<String> = Vec::new();
let mut objects = Vec::with_capacity(get_objects.len());
for obj in get_objects.into_iter() {
if obj.is_dir && obj.mod_time.is_none() && !delimiter.is_empty() {
let mut found = false;
if delimiter != SLASH_SEPARATOR {
for p in prefixes.iter() {
if found {
break;
}
found = p == &obj.name;
}
}
if !found {
prefixes.push(obj.name.clone());
}
} else {
objects.push(obj);
}
}
let (next_marker, next_version_idmarker) = {
if is_truncated {
objects
get_objects
.last()
.map(|last| (Some(last.name.clone()), last.version_id.map(|v| v.to_string())))
.unwrap_or_default()
@@ -346,6 +419,32 @@ impl ECStore {
}
};
let mut prefixes: Vec<String> = Vec::new();
let mut objects = Vec::with_capacity(get_objects.len());
for obj in get_objects.into_iter() {
if let Some(delimiter) = &delimiter {
if obj.is_dir && obj.mod_time.is_none() {
let mut found = false;
if delimiter != SLASH_SEPARATOR {
for p in prefixes.iter() {
if found {
break;
}
found = p == &obj.name;
}
}
if !found {
prefixes.push(obj.name.clone());
}
} else {
objects.push(obj);
}
} else {
objects.push(obj);
}
}
Ok(ListObjectVersionsInfo {
is_truncated,
next_marker,
@@ -355,19 +454,21 @@ impl ECStore {
})
}
pub async fn list_path(self: Arc<Self>, o: &ListPathOptions) -> Result<MetaCacheEntriesSorted> {
pub async fn list_path(self: Arc<Self>, o: &ListPathOptions) -> Result<MetaCacheEntriesSortedResult> {
// warn!("list_path opt {:?}", &o);
check_list_objs_args(&o.bucket, &o.prefix, &o.marker)?;
// if opts.prefix.ends_with(SLASH_SEPARATOR) {
// return Err(Error::msg("eof"));
// }
let mut o = o.clone();
if o.marker < o.prefix {
o.marker = "".to_owned();
}
o.marker = o.marker.filter(|v| v >= &o.prefix);
if !o.marker.is_empty() && !o.prefix.is_empty() && !o.marker.starts_with(&o.prefix) {
return Err(Error::new(std::io::Error::from(ErrorKind::UnexpectedEof)));
if let Some(marker) = &o.marker {
if !o.prefix.is_empty() && !marker.starts_with(&o.prefix) {
return Err(Error::new(std::io::Error::from(ErrorKind::UnexpectedEof)));
}
}
if o.limit == 0 {
@@ -378,16 +479,18 @@ impl ECStore {
return Err(Error::new(std::io::Error::from(ErrorKind::UnexpectedEof)));
}
o.include_directories = o.separator == SLASH_SEPARATOR;
let slash_separator = Some(SLASH_SEPARATOR.to_owned());
if (o.separator == SLASH_SEPARATOR || o.separator.is_empty()) && !o.recursive {
o.recursive = o.separator != SLASH_SEPARATOR;
o.separator = SLASH_SEPARATOR.to_owned();
o.include_directories = o.separator == slash_separator;
if (o.separator == slash_separator || o.separator.is_none()) && !o.recursive {
o.recursive = o.separator != slash_separator;
o.separator = slash_separator;
} else {
o.recursive = true
}
// TODO: parseMarker
o.parse_marker();
if o.base_dir.is_empty() {
o.base_dir = base_dir_from_prefix(&o.prefix);
@@ -421,10 +524,16 @@ impl ECStore {
let cancel_rx2 = cancel_rx.resubscribe();
let (result_tx, mut result_rx) = mpsc::channel(1);
let err_tx2 = err_tx.clone();
let opts = o.clone();
let job2 = tokio::spawn(async move {
if let Err(err) = gather_results(cancel_rx2, opts, recv, result_tx).await {
error!("gather_results err {:?}", err);
let _ = err_tx2.send(err);
}
});
let job2 = tokio::spawn(gather_results(cancel_rx2, o, recv, result_tx));
let result = {
let mut result = {
// receiver result
tokio::select! {
res = err_rx.recv() =>{
@@ -432,11 +541,12 @@ impl ECStore {
match res{
Ok(o) => {
error!("list_path err_rx.recv() ok {:?}", &o);
Err(o)
MetaCacheEntriesSortedResult{ entries: None, err: Some(o) }
},
Err(err) => {
error!("list_path err_rx.recv() err {:?}", &err);
Err(Error::new(err))
MetaCacheEntriesSortedResult{ entries: None, err: Some(Error::new(err)) }
},
}
},
@@ -452,7 +562,28 @@ impl ECStore {
// wait spawns exit
join_all(vec![job1, job2]).await;
result
if result.err.is_some() {
return Ok(result);
}
if let Some(entries) = result.entries.as_mut() {
entries.reuse = true;
let truncated = !entries.entries().is_empty() || result.err.is_none();
entries.o.0.truncate(o.limit as usize);
if !o.transient && truncated {
entries.list_id = if let Some(id) = o.id {
Some(id)
} else {
Some(Uuid::new_v4().to_string())
};
}
if !truncated {
result.err = Some(Error::new(std::io::Error::from(ErrorKind::UnexpectedEof)));
}
}
Ok(result)
}
// read everything
@@ -488,6 +619,10 @@ impl ECStore {
// let merge_res = merge_entry_channels(rx, inputs, sender.clone(), 1).await;
// TODO: cancelList
// let merge_res = merge_entry_channels(rx, inputs, sender.clone(), 1).await;
let results = join_all(futures).await;
let mut all_at_eof = true;
@@ -538,17 +673,15 @@ async fn gather_results(
_rx: B_Receiver<bool>,
opts: ListPathOptions,
recv: Receiver<MetaCacheEntry>,
results_tx: Sender<Result<MetaCacheEntriesSorted>>,
) {
results_tx: Sender<MetaCacheEntriesSortedResult>,
) -> Result<()> {
let mut returned = false;
let mut results = MetaCacheEntriesSorted {
o: MetaCacheEntries(Vec::new()),
};
let mut sender = Some(results_tx);
let mut recv = recv;
// let mut entrys = Vec::new();
let mut entrys = Vec::new();
while let Some(mut entry) = recv.recv().await {
// warn!("gather_entrys entry {}", &entry.name);
if returned {
continue;
}
@@ -562,21 +695,62 @@ async fn gather_results(
continue;
}
if !opts.marker.is_empty() && entry.name < opts.marker {
if let Some(marker) = &opts.marker {
if &entry.name < marker {
continue;
}
}
if !entry.name.starts_with(&opts.prefix) {
continue;
}
// TODO: other
if opts.limit > 0 && results.o.0.len() >= opts.limit as usize {
returned = true;
if let Some(separator) = &opts.separator {
if !opts.recursive && !entry.is_in_dir(&opts.prefix, separator) {
continue;
}
}
if !opts.incl_deleted && entry.is_object() && entry.is_latest_deletemarker() && !entry.is_object_dir() {
continue;
}
results.o.0.push(Some(entry));
// TODO: Lifecycle
if opts.limit > 0 && entrys.len() >= opts.limit as usize {
if let Some(tx) = sender {
tx.send(MetaCacheEntriesSortedResult {
entries: Some(MetaCacheEntriesSorted {
o: MetaCacheEntries(entrys.clone()),
..Default::default()
}),
err: None,
})
.await?;
returned = true;
sender = None;
}
continue;
}
entrys.push(Some(entry));
// entrys.push(entry);
}
results_tx.send(Ok(results)).await;
// finish not full, return eof
if let Some(tx) = sender {
tx.send(MetaCacheEntriesSortedResult {
entries: Some(MetaCacheEntriesSorted {
o: MetaCacheEntries(entrys.clone()),
..Default::default()
}),
err: Some(Error::new(std::io::Error::new(ErrorKind::UnexpectedEof, "Unexpected EOF"))),
})
.await?;
}
Ok(())
}
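
The send policy of this loop: a full page is delivered exactly once when limit is reached, and a stream that drains first delivers the partial page together with UnexpectedEof, which list_path maps to "listing finished, not truncated". A synchronous sketch of just that policy (the real function is channel-driven):

// Hedged sketch of gather_results' batching over a plain iterator.
use std::io::{Error, ErrorKind};

fn gather(names: impl Iterator<Item = String>, limit: usize) -> (Vec<String>, Option<Error>) {
    let mut batch = Vec::new();
    for name in names {
        if limit > 0 && batch.len() >= limit {
            return (batch, None); // full page: no error, caller may continue listing
        }
        batch.push(name);
    }
    // Source drained before the page filled: signal EOF alongside the partial page.
    (batch, Some(Error::new(ErrorKind::UnexpectedEof, "Unexpected EOF")))
}

fn main() {
    let (page, err) = gather((0..5).map(|i| format!("obj-{i}")), 3);
    assert_eq!(page.len(), 3);
    assert!(err.is_none());

    let (page, err) = gather((0..2).map(|i| format!("obj-{i}")), 3);
    assert_eq!(page.len(), 2);
    assert_eq!(err.unwrap().kind(), ErrorKind::UnexpectedEof);
}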
async fn select_from(
@@ -1024,7 +1198,7 @@ mod test {
let (_, rx) = broadcast::channel(1);
let bucket = "dada".to_owned();
let forward_to = "".to_owned();
let forward_to = None;
let disks = vec![Some(disk)];
let fallback_disks = Vec::new();

View File

@@ -55,7 +55,7 @@ impl ObjectStore {
let items = self
.object_api
.clone()
.list_objects_v2(Self::BUCKET_NAME.into(), &prefix.clone(), "", "", 0, false, "")
.list_objects_v2(Self::BUCKET_NAME, &prefix.clone(), None, None, 0, false, None)
.await;
match items {

View File

@@ -10,7 +10,7 @@ use ecstore::{
bucket::{metadata::load_bucket_metadata, metadata_sys},
disk::{
DeleteOptions, DiskAPI, DiskInfoOptions, DiskStore, FileInfoVersions, ReadMultipleReq, ReadOptions, Reader,
UpdateMetadataOpts, WalkDirOptions,
UpdateMetadataOpts,
},
erasure::Writer,
heal::{

View File

@@ -460,6 +460,7 @@ impl S3 for FS {
#[tracing::instrument(level = "debug", skip(self, req))]
async fn list_objects_v2(&self, req: S3Request<ListObjectsV2Input>) -> S3Result<S3Response<ListObjectsV2Output>> {
// warn!("list_objects_v2 req {:?}", &req.input);
let ListObjectsV2Input {
bucket,
continuation_token,
@@ -472,9 +473,12 @@ impl S3 for FS {
} = req.input;
let prefix = prefix.unwrap_or_default();
let delimiter = delimiter.unwrap_or_default();
let max_keys = max_keys.unwrap_or(1000);
let delimiter = delimiter.filter(|v| !v.is_empty());
let continuation_token = continuation_token.filter(|v| !v.is_empty());
let start_after = start_after.filter(|v| !v.is_empty());
let Some(store) = new_object_layer_fn() else {
return Err(S3Error::with_message(S3ErrorCode::InternalError, "Not init".to_string()));
};
@@ -483,16 +487,16 @@ impl S3 for FS {
.list_objects_v2(
&bucket,
&prefix,
&continuation_token.unwrap_or_default(),
&delimiter,
continuation_token,
delimiter.clone(),
max_keys,
fetch_owner.unwrap_or_default(),
&start_after.unwrap_or_default(),
start_after,
)
.await
.map_err(to_s3_error)?;
// warn!("object_infos {:?}", object_infos);
// warn!("object_infos objects {:?}", object_infos.objects);
let objects: Vec<Object> = object_infos
.objects
@@ -526,10 +530,13 @@ impl S3 for FS {
.collect();
let output = ListObjectsV2Output {
is_truncated: Some(object_infos.is_truncated),
continuation_token: object_infos.continuation_token,
next_continuation_token: object_infos.next_continuation_token,
key_count: Some(key_count),
max_keys: Some(max_keys),
contents: Some(objects),
delimiter: Some(delimiter),
delimiter,
name: Some(bucket),
prefix: Some(prefix),
common_prefixes: Some(common_prefixes),
@@ -555,22 +562,18 @@ impl S3 for FS {
} = req.input;
let prefix = prefix.unwrap_or_default();
let delimiter = delimiter.unwrap_or_default();
let max_keys = max_keys.unwrap_or(1000);
let key_marker = key_marker.filter(|v| !v.is_empty());
let version_id_marker = version_id_marker.filter(|v| !v.is_empty());
let delimiter = delimiter.filter(|v| !v.is_empty());
let Some(store) = new_object_layer_fn() else {
return Err(S3Error::with_message(S3ErrorCode::InternalError, "Not init".to_string()));
};
let object_infos = store
.list_object_versions(
&bucket,
&prefix,
&key_marker.unwrap_or_default(),
&version_id_marker.unwrap_or_default(),
&delimiter,
max_keys,
)
.list_object_versions(&bucket, &prefix, key_marker, version_id_marker, delimiter.clone(), max_keys)
.await
.map_err(to_s3_error)?;
@@ -579,7 +582,7 @@ impl S3 for FS {
.iter()
.filter(|v| !v.name.is_empty())
.map(|v| {
let obj = ObjectVersion {
ObjectVersion {
key: Some(v.name.to_owned()),
last_modified: v.mod_time.map(Timestamp::from),
size: Some(v.size as i64),
@@ -587,9 +590,7 @@ impl S3 for FS {
is_latest: Some(v.is_latest),
e_tag: v.etag.clone(),
..Default::default() // TODO: another fields
};
obj
}
})
.collect();
@@ -604,7 +605,7 @@ impl S3 for FS {
let output = ListObjectVersionsOutput {
// is_truncated: Some(object_infos.is_truncated),
max_keys: Some(max_keys),
delimiter: Some(delimiter),
delimiter,
name: Some(bucket),
prefix: Some(prefix),
common_prefixes: Some(common_prefixes),