Mirror of https://github.com/rustfs/rustfs.git, synced 2026-01-17 01:30:33 +00:00

Compare commits: 1.0.0-alph ... multipartc (1 commit)
| Author | SHA1 | Date |
|---|---|---|
| | d2a83505fa | |
@@ -5272,6 +5272,7 @@ impl StorageAPI for SetDisks {
             if err == DiskError::DiskNotFound {
                 None
             } else if err == DiskError::FileNotFound {
+                warn!("list_multipart_uploads: FileNotFound");
                 return Ok(ListMultipartsInfo {
                     key_marker: key_marker.to_owned(),
                     max_uploads,

@@ -88,6 +88,8 @@ pub struct ECStore {
     pub pool_meta: RwLock<PoolMeta>,
     pub rebalance_meta: RwLock<Option<RebalanceMeta>>,
     pub decommission_cancelers: Vec<Option<usize>>,
+    // mpCache: cache for MultipartUploadResult, key is "bucket/object"
+    pub mp_cache: Arc<RwLock<HashMap<String, MultipartUploadResult>>>,
 }

 // impl Clone for ECStore {

@@ -240,6 +242,7 @@ impl ECStore {
             pool_meta: RwLock::new(pool_meta),
             rebalance_meta: RwLock::new(None),
             decommission_cancelers,
+            mp_cache: Arc::new(RwLock::new(HashMap::new())),
         });

         // Only set it when the global deployment ID is not yet configured

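For orientation, here is a minimal, self-contained sketch of how a cache shaped like the new `mp_cache` field behaves, assuming tokio's `RwLock` and a hypothetical, stripped-down `MultipartUploadResult` that carries only `upload_id`; the `bucket/object` key format follows the `mpCache` comment in the hunk above.

```rust
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;

// Hypothetical, simplified stand-in for the crate's MultipartUploadResult.
#[derive(Clone, Debug)]
struct MultipartUploadResult {
    upload_id: String,
}

// Requires tokio with the "full" (or rt + macros + sync) features.
#[tokio::main]
async fn main() {
    // Cache keyed by "bucket/object", as described by the mpCache comment.
    let mp_cache: Arc<RwLock<HashMap<String, MultipartUploadResult>>> =
        Arc::new(RwLock::new(HashMap::new()));

    let cache_key = format!("{}/{}", "my-bucket", "videos/movie.mp4");

    // Write path: take the write lock and insert under the composed key.
    {
        let mut cache = mp_cache.write().await;
        cache.insert(
            cache_key.clone(),
            MultipartUploadResult { upload_id: "upload-123".to_string() },
        );
    }

    // Read path: take the read lock and look up by the same key.
    let cache = mp_cache.read().await;
    if let Some(entry) = cache.get(&cache_key) {
        println!("cached upload_id = {}", entry.upload_id);
    }
}
```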
@@ -1763,8 +1766,58 @@ impl StorageAPI for ECStore {
     ) -> Result<ListMultipartsInfo> {
         check_list_multipart_args(bucket, prefix, &key_marker, &upload_id_marker, &delimiter)?;

+        // Return from cache if prefix is empty (list all multipart uploads for the bucket)
         if prefix.is_empty() {
-            // TODO: return from cache
+            let cache = self.mp_cache.read().await;
+            let mut cached_uploads = Vec::new();
+            let bucket_prefix = format!("{}/", bucket);
+
+            for (key, result) in cache.iter() {
+                if key.starts_with(&bucket_prefix) {
+                    let object = key.strip_prefix(&bucket_prefix).unwrap_or("");
+                    if let Some(key_marker_val) = &key_marker {
+                        if object < key_marker_val.as_str() {
+                            continue;
+                        }
+                    }
+                    if let Some(upload_id_marker_val) = &upload_id_marker {
+                        if object == key_marker.as_deref().unwrap_or("") && result.upload_id < *upload_id_marker_val {
+                            continue;
+                        }
+                    }
+
+                    cached_uploads.push(MultipartInfo {
+                        bucket: bucket.to_owned(),
+                        object: decode_dir_object(object),
+                        upload_id: result.upload_id.clone(),
+                        initiated: None,
+                        user_defined: HashMap::new(),
+                    });
+                }
+            }
+
+            // Sort by object name and upload_id
+            cached_uploads.sort_by(|a, b| match a.object.cmp(&b.object) {
+                Ordering::Equal => a.upload_id.cmp(&b.upload_id),
+                other => other,
+            });
+
+            // Apply max_uploads limit
+            if cached_uploads.len() > max_uploads {
+                cached_uploads.truncate(max_uploads);
+            }
+
+            if !cached_uploads.is_empty() {
+                return Ok(ListMultipartsInfo {
+                    key_marker,
+                    upload_id_marker,
+                    max_uploads,
+                    uploads: cached_uploads,
+                    prefix: prefix.to_owned(),
+                    delimiter: delimiter.to_owned(),
+                    ..Default::default()
+                });
+            }
         }

         if self.single_pool() {

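To make the cached branch easier to follow, here is a rough, standalone illustration of its marker filtering, sorting, and `max_uploads` truncation applied to plain `(object, upload_id)` pairs. The `page_uploads` helper and its simplified marker handling (the upload-id check nested under `key_marker`) are assumptions for illustration, not the crate's API.

```rust
use std::cmp::Ordering;

/// Filter, sort, and truncate cached (object, upload_id) pairs roughly the way
/// the cached branch of list_multipart_uploads does (simplified sketch).
fn page_uploads(
    mut uploads: Vec<(String, String)>, // (object, upload_id)
    key_marker: Option<&str>,
    upload_id_marker: Option<&str>,
    max_uploads: usize,
) -> Vec<(String, String)> {
    uploads.retain(|(object, upload_id)| {
        if let Some(km) = key_marker {
            // Skip objects lexically before the key marker.
            if object.as_str() < km {
                return false;
            }
            // For the marker object itself, also skip upload ids before the upload-id marker.
            if let Some(um) = upload_id_marker {
                if object.as_str() == km && upload_id.as_str() < um {
                    return false;
                }
            }
        }
        true
    });

    // Sort by object name, then by upload_id.
    uploads.sort_by(|a, b| match a.0.cmp(&b.0) {
        Ordering::Equal => a.1.cmp(&b.1),
        other => other,
    });

    // Apply the max_uploads limit.
    uploads.truncate(max_uploads);
    uploads
}

fn main() {
    let cached = vec![
        ("b.txt".to_string(), "u2".to_string()),
        ("a.txt".to_string(), "u1".to_string()),
        ("b.txt".to_string(), "u1".to_string()),
    ];
    let page = page_uploads(cached, Some("b.txt"), Some("u2"), 10);
    assert_eq!(page, vec![("b.txt".to_string(), "u2".to_string())]);
    println!("{page:?}");
}
```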
@@ -1807,8 +1860,15 @@ impl StorageAPI for ECStore {
     async fn new_multipart_upload(&self, bucket: &str, object: &str, opts: &ObjectOptions) -> Result<MultipartUploadResult> {
         check_new_multipart_args(bucket, object)?;

+        let encoded_object = encode_dir_object(object);
+        let cache_key = format!("{}/{}", bucket, encoded_object);
+
         if self.single_pool() {
-            return self.pools[0].new_multipart_upload(bucket, object, opts).await;
+            let result = self.pools[0].new_multipart_upload(bucket, &encoded_object, opts).await?;
+            // Cache the result
+            let mut cache = self.mp_cache.write().await;
+            cache.insert(cache_key, result.clone());
+            return Ok(result);
         }

         for (idx, pool) in self.pools.iter().enumerate() {

@@ -1816,14 +1876,18 @@ impl StorageAPI for ECStore {
                 continue;
             }
             let res = pool
-                .list_multipart_uploads(bucket, object, None, None, None, MAX_UPLOADS_LIST)
+                .list_multipart_uploads(bucket, &encoded_object, None, None, None, MAX_UPLOADS_LIST)
                 .await?;

             if !res.uploads.is_empty() {
-                return self.pools[idx].new_multipart_upload(bucket, object, opts).await;
+                let result = self.pools[idx].new_multipart_upload(bucket, &encoded_object, opts).await?;
+                // Cache the result
+                let mut cache = self.mp_cache.write().await;
+                cache.insert(cache_key, result.clone());
+                return Ok(result);
             }
         }
-        let idx = self.get_pool_idx(bucket, object, -1).await?;
+        let idx = self.get_pool_idx(bucket, &encoded_object, -1).await?;
         if opts.data_movement && idx == opts.src_pool_idx {
             return Err(StorageError::DataMovementOverwriteErr(
                 bucket.to_owned(),

@@ -1832,7 +1896,11 @@ impl StorageAPI for ECStore {
             ));
         }

-        self.pools[idx].new_multipart_upload(bucket, object, opts).await
+        let result = self.pools[idx].new_multipart_upload(bucket, &encoded_object, opts).await?;
+        // Cache the result
+        let mut cache = self.mp_cache.write().await;
+        cache.insert(cache_key, result.clone());
+        Ok(result)
     }

     #[instrument(skip(self))]

@@ -1983,8 +2051,19 @@ impl StorageAPI for ECStore {

         // TODO: defer DeleteUploadID

+        let encoded_object = encode_dir_object(object);
+        let cache_key = format!("{}/{}", bucket, encoded_object);
+
         if self.single_pool() {
-            return self.pools[0].abort_multipart_upload(bucket, object, upload_id, opts).await;
+            let result = self.pools[0]
+                .abort_multipart_upload(bucket, &encoded_object, upload_id, opts)
+                .await;
+            // Remove from cache on success
+            if result.is_ok() {
+                let mut cache = self.mp_cache.write().await;
+                cache.remove(&cache_key);
+            }
+            return result;
         }

         for pool in self.pools.iter() {

@@ -1992,8 +2071,13 @@ impl StorageAPI for ECStore {
                 continue;
             }

-            let err = match pool.abort_multipart_upload(bucket, object, upload_id, opts).await {
-                Ok(_) => return Ok(()),
+            let err = match pool.abort_multipart_upload(bucket, &encoded_object, upload_id, opts).await {
+                Ok(_) => {
+                    // Remove from cache on success
+                    let mut cache = self.mp_cache.write().await;
+                    cache.remove(&cache_key);
+                    return Ok(());
+                }
                 Err(err) => {
                     //
                     if is_err_invalid_upload_id(&err) { None } else { Some(err) }

@@ -2019,11 +2103,20 @@ impl StorageAPI for ECStore {
     ) -> Result<ObjectInfo> {
         check_complete_multipart_args(bucket, object, upload_id)?;

+        let encoded_object = encode_dir_object(object);
+        let cache_key = format!("{}/{}", bucket, encoded_object);
+
         if self.single_pool() {
-            return self.pools[0]
+            let result = self.pools[0]
                 .clone()
-                .complete_multipart_upload(bucket, object, upload_id, uploaded_parts, opts)
+                .complete_multipart_upload(bucket, &encoded_object, upload_id, uploaded_parts, opts)
                 .await;
+            // Remove from cache on success
+            if result.is_ok() {
+                let mut cache = self.mp_cache.write().await;
+                cache.remove(&cache_key);
+            }
+            return result;
         }

         for pool in self.pools.iter() {

@@ -2033,10 +2126,15 @@ impl StorageAPI for ECStore {

             let pool = pool.clone();
             let err = match pool
-                .complete_multipart_upload(bucket, object, upload_id, uploaded_parts.clone(), opts)
+                .complete_multipart_upload(bucket, &encoded_object, upload_id, uploaded_parts.clone(), opts)
                 .await
             {
-                Ok(res) => return Ok(res),
+                Ok(res) => {
+                    // Remove from cache on success
+                    let mut cache = self.mp_cache.write().await;
+                    cache.remove(&cache_key);
+                    return Ok(res);
+                }
                 Err(err) => {
                     //
                     if is_err_invalid_upload_id(&err) { None } else { Some(err) }

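The write-path hunks above all follow one lifecycle: insert into `mp_cache` when a multipart upload is created, and remove the entry once the upload is aborted or completed. Below is a condensed, standalone sketch of that lifecycle; the helper names and the simplified cache (mapping the `bucket/object` key straight to an upload id) are illustrative assumptions, not the crate's types.

```rust
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;

// Simplified cache: key is "bucket/object", value is the upload id (assumption).
type MpCache = Arc<RwLock<HashMap<String, String>>>;

// Called when a multipart upload is created: cache the fresh upload id.
async fn on_new_multipart(cache: &MpCache, bucket: &str, object: &str, upload_id: &str) {
    let cache_key = format!("{}/{}", bucket, object);
    cache.write().await.insert(cache_key, upload_id.to_string());
}

// Called after abort/complete succeeds: drop the entry so stale uploads are not listed.
async fn on_upload_finished(cache: &MpCache, bucket: &str, object: &str) {
    let cache_key = format!("{}/{}", bucket, object);
    cache.write().await.remove(&cache_key);
}

#[tokio::main]
async fn main() {
    let cache: MpCache = Arc::new(RwLock::new(HashMap::new()));

    on_new_multipart(&cache, "my-bucket", "big.iso", "upload-123").await;
    assert_eq!(cache.read().await.len(), 1);

    on_upload_finished(&cache, "my-bucket", "big.iso").await;
    assert!(cache.read().await.is_empty());
}
```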
@@ -3247,6 +3247,7 @@ impl S3 for FS {
         Ok(S3Response::new(output))
     }

+    #[instrument(level = "debug", skip(self, req))]
     async fn list_multipart_uploads(
         &self,
         req: S3Request<ListMultipartUploadsInput>,

@@ -3275,6 +3276,11 @@
             }
         }

+        warn!(
+            "List multipart uploads with bucket={}, prefix={}, delimiter={:?}, key_marker={:?}, upload_id_marker={:?}, max_uploads={}",
+            bucket, prefix, delimiter, key_marker, upload_id_marker, max_uploads
+        );
+
         let result = store
             .list_multipart_uploads(&bucket, &prefix, delimiter, key_marker, upload_id_marker, max_uploads)
             .await

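The handler hunks above only add diagnostics: a debug-level `#[instrument]` span and a `warn!` event echoing the request parameters. A small standalone sketch of that `tracing` pattern, using hypothetical function arguments and assuming the `tracing` and `tracing-subscriber` crates:

```rust
use tracing::{instrument, warn};

// Open a debug-level span per call, skipping the (potentially large) body argument.
#[instrument(level = "debug", skip(body))]
async fn list_multipart_uploads(bucket: String, prefix: String, max_uploads: usize, body: Vec<u8>) {
    warn!(
        "List multipart uploads with bucket={}, prefix={}, max_uploads={}, body_len={}",
        bucket,
        prefix,
        max_uploads,
        body.len()
    );
}

#[tokio::main]
async fn main() {
    // Print spans/events to the console; the real server configures its subscriber elsewhere.
    tracing_subscriber::fmt().with_max_level(tracing::Level::DEBUG).init();
    list_multipart_uploads("my-bucket".into(), "".into(), 1000, vec![]).await;
}
```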
@@ -28,8 +28,8 @@ fi
 current_dir=$(pwd)
 echo "Current directory: $current_dir"

-# mkdir -p ./target/volume/test
-mkdir -p ./target/volume/test{1..4}
+mkdir -p ./target/volume/test
+# mkdir -p ./target/volume/test{1..4}


 if [ -z "$RUST_LOG" ]; then

@@ -41,8 +41,8 @@ fi

 # export RUSTFS_STORAGE_CLASS_INLINE_BLOCK="512 KB"

-export RUSTFS_VOLUMES="./target/volume/test{1...4}"
-# export RUSTFS_VOLUMES="./target/volume/test"
+# export RUSTFS_VOLUMES="./target/volume/test{1...4}"
+export RUSTFS_VOLUMES="./target/volume/test"
 export RUSTFS_ADDRESS=":9000"
 export RUSTFS_CONSOLE_ENABLE=true
 export RUSTFS_CONSOLE_ADDRESS=":9001"

@@ -77,7 +77,7 @@ export RUSTFS_OBS_LOG_FLUSH_MS=300
 #tokio runtime
 export RUSTFS_RUNTIME_WORKER_THREADS=16
 export RUSTFS_RUNTIME_MAX_BLOCKING_THREADS=1024
-export RUSTFS_RUNTIME_THREAD_PRINT_ENABLED=true
+export RUSTFS_RUNTIME_THREAD_PRINT_ENABLED=false
 # shellcheck disable=SC2125
 export RUSTFS_RUNTIME_THREAD_STACK_SIZE=1024*1024
 export RUSTFS_RUNTIME_THREAD_KEEP_ALIVE=60