Compare commits


1 Commit

Author SHA1 Message Date
weisd  d2a83505fa  feat: mutipart cache  2025-11-17 14:14:57 +08:00
4 changed files with 123 additions and 18 deletions

View File

@@ -5272,6 +5272,7 @@ impl StorageAPI for SetDisks {
if err == DiskError::DiskNotFound {
None
} else if err == DiskError::FileNotFound {
warn!("list_multipart_uploads: FileNotFound");
return Ok(ListMultipartsInfo {
key_marker: key_marker.to_owned(),
max_uploads,

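In the hunk above, FileNotFound is downgraded to an empty ListMultipartsInfo: a bucket with no in-flight uploads has no multipart metadata directory yet, so the listing should succeed with zero results rather than fail. A self-contained sketch of just that mapping (the enum and struct are reduced stand-ins for rustfs's types, not the commit's code):

// Stand-ins for rustfs's DiskError / ListMultipartsInfo, for illustration only.
#[derive(Debug, PartialEq)]
enum DiskError {
    FileNotFound,
    DiskNotFound,
}

#[derive(Debug)]
struct ListMultipartsInfo {
    key_marker: String,
    max_uploads: usize,
}

// Mirrors the hunk's FileNotFound arm: a missing multipart directory means
// "no uploads in flight", not a hard error.
fn empty_list_on_missing(err: DiskError, key_marker: &str, max_uploads: usize) -> Result<ListMultipartsInfo, DiskError> {
    if err == DiskError::FileNotFound {
        return Ok(ListMultipartsInfo { key_marker: key_marker.to_owned(), max_uploads });
    }
    Err(err)
}

fn main() {
    assert!(empty_list_on_missing(DiskError::FileNotFound, "", 1000).is_ok());
    assert!(empty_list_on_missing(DiskError::DiskNotFound, "", 1000).is_err());
}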
View File

@@ -88,6 +88,8 @@ pub struct ECStore {
pub pool_meta: RwLock<PoolMeta>,
pub rebalance_meta: RwLock<Option<RebalanceMeta>>,
pub decommission_cancelers: Vec<Option<usize>>,
// mp_cache: caches the MultipartUploadResult of each in-flight upload, keyed by "bucket/object"
pub mp_cache: Arc<RwLock<HashMap<String, MultipartUploadResult>>>,
}
// impl Clone for ECStore {
@@ -240,6 +242,7 @@ impl ECStore {
pool_meta: RwLock::new(pool_meta),
rebalance_meta: RwLock::new(None),
decommission_cancelers,
mp_cache: Arc::new(RwLock::new(HashMap::new())),
});
// Only set it when the global deployment ID is not yet configured
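The two hunks above introduce the cache itself: a process-local map from "bucket/encoded_object" to the MultipartUploadResult of an in-flight upload, shared via Arc and guarded by an async RwLock. A self-contained sketch of how the later hunks drive it (MultipartUploadResult is reduced to a stand-in here):

use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;

// Stand-in for rustfs's MultipartUploadResult; only upload_id matters here.
#[derive(Clone, Debug)]
struct MultipartUploadResult {
    upload_id: String,
}

type MpCache = Arc<RwLock<HashMap<String, MultipartUploadResult>>>;

#[tokio::main]
async fn main() {
    let mp_cache: MpCache = Arc::new(RwLock::new(HashMap::new()));

    // new_multipart_upload path: record the upload under "bucket/encoded_object".
    mp_cache.write().await.insert(
        "mybucket/myobject".to_owned(),
        MultipartUploadResult { upload_id: "upload-123".to_owned() },
    );

    // list_multipart_uploads path: scan keys under the bucket prefix.
    let uploads: Vec<String> = mp_cache
        .read()
        .await
        .iter()
        .filter(|(k, _)| k.starts_with("mybucket/"))
        .map(|(_, v)| v.upload_id.clone())
        .collect();
    assert_eq!(uploads, ["upload-123"]);

    // abort/complete path: evict the entry once the upload is finished.
    mp_cache.write().await.remove("mybucket/myobject");
}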
@@ -1763,8 +1766,58 @@ impl StorageAPI for ECStore {
) -> Result<ListMultipartsInfo> {
check_list_multipart_args(bucket, prefix, &key_marker, &upload_id_marker, &delimiter)?;
// Serve directly from the in-memory cache when prefix is empty (list all multipart uploads for the bucket)
if prefix.is_empty() {
let cache = self.mp_cache.read().await;
let cache = self.mp_cache.read().await;
let mut cached_uploads = Vec::new();
let bucket_prefix = format!("{}/", bucket);
for (key, result) in cache.iter() {
if let Some(object) = key.strip_prefix(&bucket_prefix) {
// S3 pagination: keys strictly after key_marker are included; the key equal
// to key_marker only contributes upload ids after upload_id_marker.
if let Some(key_marker_val) = &key_marker {
match object.cmp(key_marker_val.as_str()) {
Ordering::Less => continue,
Ordering::Equal => match &upload_id_marker {
Some(upload_id_marker_val) if result.upload_id > *upload_id_marker_val => {}
_ => continue,
},
Ordering::Greater => {}
}
}
cached_uploads.push(MultipartInfo {
bucket: bucket.to_owned(),
object: decode_dir_object(object),
upload_id: result.upload_id.clone(),
initiated: None,
user_defined: HashMap::new(),
});
}
}
// Sort by object name and upload_id
cached_uploads.sort_by(|a, b| match a.object.cmp(&b.object) {
Ordering::Equal => a.upload_id.cmp(&b.upload_id),
other => other,
});
// Apply the max_uploads limit, flagging truncation so clients can paginate
let is_truncated = cached_uploads.len() > max_uploads;
if is_truncated {
cached_uploads.truncate(max_uploads);
}
if !cached_uploads.is_empty() {
return Ok(ListMultipartsInfo {
key_marker,
upload_id_marker,
max_uploads,
is_truncated,
uploads: cached_uploads,
prefix: prefix.to_owned(),
delimiter: delimiter.to_owned(),
..Default::default()
});
}
}
if self.single_pool() {
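The cache branch above only serves the empty-prefix case and falls through to the pool scan when nothing matches. Its marker handling follows S3 ListMultipartUploads pagination: keys strictly after key_marker are included, and the key equal to key_marker contributes only upload ids after upload_id_marker. The predicate in isolation (function name and shape are illustrative):

use std::cmp::Ordering;

// Should an upload (object, upload_id) appear after the
// (key_marker, upload_id_marker) position? Mirrors the cache scan above.
fn after_markers(object: &str, upload_id: &str, key_marker: Option<&str>, upload_id_marker: Option<&str>) -> bool {
    match key_marker {
        None => true,
        Some(km) => match object.cmp(km) {
            Ordering::Greater => true,
            Ordering::Less => false,
            // Same key as the marker: only upload ids after upload_id_marker qualify.
            Ordering::Equal => upload_id_marker.map_or(false, |um| upload_id > um),
        },
    }
}

fn main() {
    assert!(after_markers("b", "u1", Some("a"), None));
    assert!(!after_markers("a", "u1", Some("a"), None));
    assert!(after_markers("a", "u2", Some("a"), Some("u1")));
    assert!(!after_markers("a", "u1", Some("a"), Some("u1")));
}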
@@ -1807,8 +1860,15 @@ impl StorageAPI for ECStore {
async fn new_multipart_upload(&self, bucket: &str, object: &str, opts: &ObjectOptions) -> Result<MultipartUploadResult> {
check_new_multipart_args(bucket, object)?;
let encoded_object = encode_dir_object(object);
let cache_key = format!("{}/{}", bucket, encoded_object);
if self.single_pool() {
return self.pools[0].new_multipart_upload(bucket, object, opts).await;
let result = self.pools[0].new_multipart_upload(bucket, &encoded_object, opts).await?;
// Cache the result
let mut cache = self.mp_cache.write().await;
cache.insert(cache_key, result.clone());
return Ok(result);
}
for (idx, pool) in self.pools.iter().enumerate() {
@@ -1816,14 +1876,18 @@ impl StorageAPI for ECStore {
continue;
}
let res = pool
.list_multipart_uploads(bucket, object, None, None, None, MAX_UPLOADS_LIST)
.list_multipart_uploads(bucket, &encoded_object, None, None, None, MAX_UPLOADS_LIST)
.await?;
if !res.uploads.is_empty() {
return self.pools[idx].new_multipart_upload(bucket, object, opts).await;
let result = self.pools[idx].new_multipart_upload(bucket, &encoded_object, opts).await?;
// Cache the result
let mut cache = self.mp_cache.write().await;
cache.insert(cache_key, result.clone());
return Ok(result);
}
}
let idx = self.get_pool_idx(bucket, object, -1).await?;
let idx = self.get_pool_idx(bucket, &encoded_object, -1).await?;
if opts.data_movement && idx == opts.src_pool_idx {
return Err(StorageError::DataMovementOverwriteErr(
bucket.to_owned(),
@@ -1832,7 +1896,11 @@ impl StorageAPI for ECStore {
));
}
self.pools[idx].new_multipart_upload(bucket, object, opts).await
let result = self.pools[idx].new_multipart_upload(bucket, &encoded_object, opts).await?;
// Cache the result
let mut cache = self.mp_cache.write().await;
cache.insert(cache_key, result.clone());
Ok(result)
}
#[instrument(skip(self))]
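All three return paths of new_multipart_upload above build the cache key as format!("{}/{}", bucket, encoded_object), so directory-style objects are stored under the same encoded name the pools see; the listing hunk decodes them on the way out with decode_dir_object. A runnable sketch of that round trip; the stubs and the "__XLDIR__" suffix are assumptions borrowed from MinIO's convention, not taken from this commit:

// Stand-ins for rustfs's encode_dir_object / decode_dir_object; the exact
// directory suffix is assumed here, not read from the commit.
const DIR_SUFFIX: &str = "__XLDIR__";

fn encode_dir_object(object: &str) -> String {
    match object.strip_suffix('/') {
        Some(stem) => format!("{stem}{DIR_SUFFIX}"),
        None => object.to_owned(),
    }
}

fn decode_dir_object(object: &str) -> String {
    match object.strip_suffix(DIR_SUFFIX) {
        Some(stem) => format!("{stem}/"),
        None => object.to_owned(),
    }
}

fn main() {
    let bucket = "mybucket";
    let object = "photos/2025/";
    let cache_key = format!("{}/{}", bucket, encode_dir_object(object));
    assert_eq!(cache_key, "mybucket/photos/2025__XLDIR__");
    assert_eq!(decode_dir_object("photos/2025__XLDIR__"), object);
}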
@@ -1983,8 +2051,19 @@ impl StorageAPI for ECStore {
// TODO: defer DeleteUploadID
let encoded_object = encode_dir_object(object);
let cache_key = format!("{}/{}", bucket, encoded_object);
if self.single_pool() {
return self.pools[0].abort_multipart_upload(bucket, object, upload_id, opts).await;
let result = self.pools[0]
.abort_multipart_upload(bucket, &encoded_object, upload_id, opts)
.await;
// Remove from cache on success
if result.is_ok() {
let mut cache = self.mp_cache.write().await;
cache.remove(&cache_key);
}
return result;
}
for pool in self.pools.iter() {
@@ -1992,8 +2071,13 @@ impl StorageAPI for ECStore {
continue;
}
let err = match pool.abort_multipart_upload(bucket, object, upload_id, opts).await {
Ok(_) => return Ok(()),
let err = match pool.abort_multipart_upload(bucket, &encoded_object, upload_id, opts).await {
Ok(_) => {
// Remove from cache on success
let mut cache = self.mp_cache.write().await;
cache.remove(&cache_key);
return Ok(());
}
Err(err) => {
//
if is_err_invalid_upload_id(&err) { None } else { Some(err) }
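Both abort paths evict the cache entry only when the pool call succeeds, so a failed abort leaves the upload visible to the cached listing. The rule in isolation (types are illustrative):

use std::collections::HashMap;

// Invalidate the cache entry only when the underlying operation succeeded.
fn invalidate_on_success<E>(cache: &mut HashMap<String, String>, key: &str, result: Result<(), E>) -> Result<(), E> {
    if result.is_ok() {
        cache.remove(key);
    }
    result
}

fn main() {
    let mut cache = HashMap::from([("mybucket/a.bin".to_owned(), "upload-123".to_owned())]);
    let ok: Result<(), ()> = Ok(());
    invalidate_on_success(&mut cache, "mybucket/a.bin", ok).unwrap();
    assert!(cache.is_empty());
}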
@@ -2019,11 +2103,20 @@ impl StorageAPI for ECStore {
) -> Result<ObjectInfo> {
check_complete_multipart_args(bucket, object, upload_id)?;
let encoded_object = encode_dir_object(object);
let cache_key = format!("{}/{}", bucket, encoded_object);
if self.single_pool() {
return self.pools[0]
let result = self.pools[0]
.clone()
.complete_multipart_upload(bucket, object, upload_id, uploaded_parts, opts)
.complete_multipart_upload(bucket, &encoded_object, upload_id, uploaded_parts, opts)
.await;
// Remove from cache on success
if result.is_ok() {
let mut cache = self.mp_cache.write().await;
cache.remove(&cache_key);
}
return result;
}
for pool in self.pools.iter() {
@@ -2033,10 +2126,15 @@ impl StorageAPI for ECStore {
let pool = pool.clone();
let err = match pool
.complete_multipart_upload(bucket, object, upload_id, uploaded_parts.clone(), opts)
.complete_multipart_upload(bucket, &encoded_object, upload_id, uploaded_parts.clone(), opts)
.await
{
Ok(res) => return Ok(res),
Ok(res) => {
// Remove from cache on success
let mut cache = self.mp_cache.write().await;
cache.remove(&cache_key);
return Ok(res);
}
Err(err) => {
//
if is_err_invalid_upload_id(&err) { None } else { Some(err) }
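Taken together, the hooks give each upload a simple lifecycle in mp_cache: inserted by new_multipart_upload, served by list_multipart_uploads while in flight, evicted when abort_multipart_upload or complete_multipart_upload succeeds. A condensed walk-through with a plain HashMap standing in for the locked cache:

use std::collections::HashMap;

fn main() {
    // Key: "bucket/encoded_object" -> upload_id, as in the hunks above.
    let mut mp_cache: HashMap<String, String> = HashMap::new();

    // new_multipart_upload: register two in-flight uploads.
    mp_cache.insert("mybucket/a.bin".to_owned(), "upload-1".to_owned());
    mp_cache.insert("mybucket/b.bin".to_owned(), "upload-2".to_owned());

    // list_multipart_uploads("mybucket", ""): both are visible.
    let in_flight = mp_cache.keys().filter(|k| k.starts_with("mybucket/")).count();
    assert_eq!(in_flight, 2);

    // complete_multipart_upload succeeds for a.bin: its entry is evicted.
    mp_cache.remove("mybucket/a.bin");
    assert_eq!(mp_cache.len(), 1);
}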

View File

@@ -3247,6 +3247,7 @@ impl S3 for FS {
Ok(S3Response::new(output))
}
#[instrument(level = "debug", skip(self, req))]
async fn list_multipart_uploads(
&self,
req: S3Request<ListMultipartUploadsInput>,
@@ -3275,6 +3276,11 @@ impl S3 for FS {
}
}
warn!(
"List multipart uploads with bucket={}, prefix={}, delimiter={:?}, key_marker={:?}, upload_id_marker={:?}, max_uploads={}",
bucket, prefix, delimiter, key_marker, upload_id_marker, max_uploads
);
let result = store
.list_multipart_uploads(&bucket, &prefix, delimiter, key_marker, upload_id_marker, max_uploads)
.await
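The handler change above adds request logging around the same store call, which on the patched server can now be answered from mp_cache when the prefix is empty. A client-side sketch that exercises this path against the local endpoint configured in the script below, assuming the aws-sdk-s3 crate (a recent version, where list accessors return slices) and credentials in the environment:

use aws_sdk_s3::Client;

#[tokio::main]
async fn main() -> Result<(), aws_sdk_s3::Error> {
    // Points at the local RustFS from the run script (RUSTFS_ADDRESS=":9000").
    let config = aws_config::defaults(aws_config::BehaviorVersion::latest())
        .endpoint_url("http://localhost:9000")
        .load()
        .await;
    let client = Client::new(&config);

    // Empty prefix: the patched server may answer this from mp_cache.
    let resp = client
        .list_multipart_uploads()
        .bucket("mybucket")
        .max_uploads(100)
        .send()
        .await?;
    for u in resp.uploads() {
        println!("key={:?} upload_id={:?}", u.key(), u.upload_id());
    }
    Ok(())
}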

View File

@@ -28,8 +28,8 @@ fi
current_dir=$(pwd)
echo "Current directory: $current_dir"
# mkdir -p ./target/volume/test
mkdir -p ./target/volume/test{1..4}
mkdir -p ./target/volume/test
# mkdir -p ./target/volume/test{1..4}
if [ -z "$RUST_LOG" ]; then
@@ -41,8 +41,8 @@ fi
# export RUSTFS_STORAGE_CLASS_INLINE_BLOCK="512 KB"
export RUSTFS_VOLUMES="./target/volume/test{1...4}"
# export RUSTFS_VOLUMES="./target/volume/test"
# export RUSTFS_VOLUMES="./target/volume/test{1...4}"
export RUSTFS_VOLUMES="./target/volume/test"
export RUSTFS_ADDRESS=":9000"
export RUSTFS_CONSOLE_ENABLE=true
export RUSTFS_CONSOLE_ADDRESS=":9001"
@@ -77,7 +77,7 @@ export RUSTFS_OBS_LOG_FLUSH_MS=300
#tokio runtime
export RUSTFS_RUNTIME_WORKER_THREADS=16
export RUSTFS_RUNTIME_MAX_BLOCKING_THREADS=1024
export RUSTFS_RUNTIME_THREAD_PRINT_ENABLED=true
export RUSTFS_RUNTIME_THREAD_PRINT_ENABLED=false
# shellcheck disable=SC2125
export RUSTFS_RUNTIME_THREAD_STACK_SIZE=1024*1024
export RUSTFS_RUNTIME_THREAD_KEEP_ALIVE=60
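One subtlety in this script: bash brace expansion uses two dots (test{1..4} in the mkdir line), while the RUSTFS_VOLUMES value uses a three-dot ellipsis pattern (test{1...4}) that is passed through literally for the server to expand, MinIO-style. A rough illustration of that expansion; rustfs's actual parser is more general (multiple ranges, padding, etc.):

// Illustrative expansion of a MinIO-style "{lo...hi}" ellipsis pattern.
fn expand_ellipsis(pattern: &str) -> Vec<String> {
    let Some(open) = pattern.find('{') else {
        return vec![pattern.to_owned()];
    };
    let close = pattern.find('}').expect("unclosed brace");
    let (prefix, rest) = pattern.split_at(open);
    let body = &rest[1..close - open];
    let suffix = &pattern[close + 1..];
    let (lo, hi) = body.split_once("...").expect("expected `lo...hi`");
    let (lo, hi): (u32, u32) = (lo.parse().unwrap(), hi.parse().unwrap());
    (lo..=hi).map(|i| format!("{prefix}{i}{suffix}")).collect()
}

fn main() {
    assert_eq!(
        expand_ellipsis("./target/volume/test{1...4}"),
        [
            "./target/volume/test1",
            "./target/volume/test2",
            "./target/volume/test3",
            "./target/volume/test4"
        ]
    );
}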