diff --git a/crates/ecstore/src/set_disk.rs b/crates/ecstore/src/set_disk.rs index dfb66afe..d60df316 100644 --- a/crates/ecstore/src/set_disk.rs +++ b/crates/ecstore/src/set_disk.rs @@ -5272,6 +5272,7 @@ impl StorageAPI for SetDisks { if err == DiskError::DiskNotFound { None } else if err == DiskError::FileNotFound { + warn!("list_multipart_uploads: FileNotFound"); return Ok(ListMultipartsInfo { key_marker: key_marker.to_owned(), max_uploads, diff --git a/crates/ecstore/src/store.rs b/crates/ecstore/src/store.rs index a39cf9e1..70b2e1f0 100644 --- a/crates/ecstore/src/store.rs +++ b/crates/ecstore/src/store.rs @@ -88,6 +88,8 @@ pub struct ECStore { pub pool_meta: RwLock, pub rebalance_meta: RwLock>, pub decommission_cancelers: Vec>, + // mpCache: cache for MultipartUploadResult, key is "bucket/object" + pub mp_cache: Arc>>, } // impl Clone for ECStore { @@ -240,6 +242,7 @@ impl ECStore { pool_meta: RwLock::new(pool_meta), rebalance_meta: RwLock::new(None), decommission_cancelers, + mp_cache: Arc::new(RwLock::new(HashMap::new())), }); // Only set it when the global deployment ID is not yet configured @@ -1763,8 +1766,58 @@ impl StorageAPI for ECStore { ) -> Result { check_list_multipart_args(bucket, prefix, &key_marker, &upload_id_marker, &delimiter)?; + // Return from cache if prefix is empty (list all multipart uploads for the bucket) if prefix.is_empty() { - // TODO: return from cache + let cache = self.mp_cache.read().await; + let mut cached_uploads = Vec::new(); + let bucket_prefix = format!("{}/", bucket); + + for (key, result) in cache.iter() { + if key.starts_with(&bucket_prefix) { + let object = key.strip_prefix(&bucket_prefix).unwrap_or(""); + if let Some(key_marker_val) = &key_marker { + if object < key_marker_val.as_str() { + continue; + } + } + if let Some(upload_id_marker_val) = &upload_id_marker { + if object == key_marker.as_deref().unwrap_or("") && result.upload_id < *upload_id_marker_val { + continue; + } + } + + cached_uploads.push(MultipartInfo { + bucket: bucket.to_owned(), + object: decode_dir_object(object), + upload_id: result.upload_id.clone(), + initiated: None, + user_defined: HashMap::new(), + }); + } + } + + // Sort by object name and upload_id + cached_uploads.sort_by(|a, b| match a.object.cmp(&b.object) { + Ordering::Equal => a.upload_id.cmp(&b.upload_id), + other => other, + }); + + // Apply max_uploads limit + if cached_uploads.len() > max_uploads { + cached_uploads.truncate(max_uploads); + } + + if !cached_uploads.is_empty() { + return Ok(ListMultipartsInfo { + key_marker, + upload_id_marker, + max_uploads, + uploads: cached_uploads, + prefix: prefix.to_owned(), + delimiter: delimiter.to_owned(), + ..Default::default() + }); + } } if self.single_pool() { @@ -1807,8 +1860,15 @@ impl StorageAPI for ECStore { async fn new_multipart_upload(&self, bucket: &str, object: &str, opts: &ObjectOptions) -> Result { check_new_multipart_args(bucket, object)?; + let encoded_object = encode_dir_object(object); + let cache_key = format!("{}/{}", bucket, encoded_object); + if self.single_pool() { - return self.pools[0].new_multipart_upload(bucket, object, opts).await; + let result = self.pools[0].new_multipart_upload(bucket, &encoded_object, opts).await?; + // Cache the result + let mut cache = self.mp_cache.write().await; + cache.insert(cache_key, result.clone()); + return Ok(result); } for (idx, pool) in self.pools.iter().enumerate() { @@ -1816,14 +1876,18 @@ impl StorageAPI for ECStore { continue; } let res = pool - .list_multipart_uploads(bucket, object, None, None, None, MAX_UPLOADS_LIST) + .list_multipart_uploads(bucket, &encoded_object, None, None, None, MAX_UPLOADS_LIST) .await?; if !res.uploads.is_empty() { - return self.pools[idx].new_multipart_upload(bucket, object, opts).await; + let result = self.pools[idx].new_multipart_upload(bucket, &encoded_object, opts).await?; + // Cache the result + let mut cache = self.mp_cache.write().await; + cache.insert(cache_key, result.clone()); + return Ok(result); } } - let idx = self.get_pool_idx(bucket, object, -1).await?; + let idx = self.get_pool_idx(bucket, &encoded_object, -1).await?; if opts.data_movement && idx == opts.src_pool_idx { return Err(StorageError::DataMovementOverwriteErr( bucket.to_owned(), @@ -1832,7 +1896,11 @@ impl StorageAPI for ECStore { )); } - self.pools[idx].new_multipart_upload(bucket, object, opts).await + let result = self.pools[idx].new_multipart_upload(bucket, &encoded_object, opts).await?; + // Cache the result + let mut cache = self.mp_cache.write().await; + cache.insert(cache_key, result.clone()); + Ok(result) } #[instrument(skip(self))] @@ -1983,8 +2051,19 @@ impl StorageAPI for ECStore { // TODO: defer DeleteUploadID + let encoded_object = encode_dir_object(object); + let cache_key = format!("{}/{}", bucket, encoded_object); + if self.single_pool() { - return self.pools[0].abort_multipart_upload(bucket, object, upload_id, opts).await; + let result = self.pools[0] + .abort_multipart_upload(bucket, &encoded_object, upload_id, opts) + .await; + // Remove from cache on success + if result.is_ok() { + let mut cache = self.mp_cache.write().await; + cache.remove(&cache_key); + } + return result; } for pool in self.pools.iter() { @@ -1992,8 +2071,13 @@ impl StorageAPI for ECStore { continue; } - let err = match pool.abort_multipart_upload(bucket, object, upload_id, opts).await { - Ok(_) => return Ok(()), + let err = match pool.abort_multipart_upload(bucket, &encoded_object, upload_id, opts).await { + Ok(_) => { + // Remove from cache on success + let mut cache = self.mp_cache.write().await; + cache.remove(&cache_key); + return Ok(()); + } Err(err) => { // if is_err_invalid_upload_id(&err) { None } else { Some(err) } @@ -2019,11 +2103,20 @@ impl StorageAPI for ECStore { ) -> Result { check_complete_multipart_args(bucket, object, upload_id)?; + let encoded_object = encode_dir_object(object); + let cache_key = format!("{}/{}", bucket, encoded_object); + if self.single_pool() { - return self.pools[0] + let result = self.pools[0] .clone() - .complete_multipart_upload(bucket, object, upload_id, uploaded_parts, opts) + .complete_multipart_upload(bucket, &encoded_object, upload_id, uploaded_parts, opts) .await; + // Remove from cache on success + if result.is_ok() { + let mut cache = self.mp_cache.write().await; + cache.remove(&cache_key); + } + return result; } for pool in self.pools.iter() { @@ -2033,10 +2126,15 @@ impl StorageAPI for ECStore { let pool = pool.clone(); let err = match pool - .complete_multipart_upload(bucket, object, upload_id, uploaded_parts.clone(), opts) + .complete_multipart_upload(bucket, &encoded_object, upload_id, uploaded_parts.clone(), opts) .await { - Ok(res) => return Ok(res), + Ok(res) => { + // Remove from cache on success + let mut cache = self.mp_cache.write().await; + cache.remove(&cache_key); + return Ok(res); + } Err(err) => { // if is_err_invalid_upload_id(&err) { None } else { Some(err) } diff --git a/rustfs/src/storage/ecfs.rs b/rustfs/src/storage/ecfs.rs index 4942308a..82682f09 100644 --- a/rustfs/src/storage/ecfs.rs +++ b/rustfs/src/storage/ecfs.rs @@ -3247,6 +3247,7 @@ impl S3 for FS { Ok(S3Response::new(output)) } + #[instrument(level = "debug", skip(self, req))] async fn list_multipart_uploads( &self, req: S3Request, @@ -3275,6 +3276,11 @@ impl S3 for FS { } } + warn!( + "List multipart uploads with bucket={}, prefix={}, delimiter={:?}, key_marker={:?}, upload_id_marker={:?}, max_uploads={}", + bucket, prefix, delimiter, key_marker, upload_id_marker, max_uploads + ); + let result = store .list_multipart_uploads(&bucket, &prefix, delimiter, key_marker, upload_id_marker, max_uploads) .await diff --git a/scripts/run.sh b/scripts/run.sh index 8e505ff7..aa61f1b7 100755 --- a/scripts/run.sh +++ b/scripts/run.sh @@ -28,8 +28,8 @@ fi current_dir=$(pwd) echo "Current directory: $current_dir" -# mkdir -p ./target/volume/test -mkdir -p ./target/volume/test{1..4} +mkdir -p ./target/volume/test +# mkdir -p ./target/volume/test{1..4} if [ -z "$RUST_LOG" ]; then @@ -41,8 +41,8 @@ fi # export RUSTFS_STORAGE_CLASS_INLINE_BLOCK="512 KB" -export RUSTFS_VOLUMES="./target/volume/test{1...4}" -# export RUSTFS_VOLUMES="./target/volume/test" +# export RUSTFS_VOLUMES="./target/volume/test{1...4}" +export RUSTFS_VOLUMES="./target/volume/test" export RUSTFS_ADDRESS=":9000" export RUSTFS_CONSOLE_ENABLE=true export RUSTFS_CONSOLE_ADDRESS=":9001" @@ -77,7 +77,7 @@ export RUSTFS_OBS_LOG_FLUSH_MS=300 #tokio runtime export RUSTFS_RUNTIME_WORKER_THREADS=16 export RUSTFS_RUNTIME_MAX_BLOCKING_THREADS=1024 -export RUSTFS_RUNTIME_THREAD_PRINT_ENABLED=true +export RUSTFS_RUNTIME_THREAD_PRINT_ENABLED=false # shellcheck disable=SC2125 export RUSTFS_RUNTIME_THREAD_STACK_SIZE=1024*1024 export RUSTFS_RUNTIME_THREAD_KEEP_ALIVE=60