diff --git a/.docker/alpine/Dockerfile.protoc b/.docker/alpine/Dockerfile.protoc new file mode 100644 index 00000000..cd1f8581 --- /dev/null +++ b/.docker/alpine/Dockerfile.protoc @@ -0,0 +1,40 @@ +FROM alpine:3.18 + +ENV LANG C.UTF-8 + +# Install base dependencies +RUN apk add --no-cache \ + wget \ + git \ + curl \ + unzip \ + gcc \ + musl-dev \ + pkgconfig \ + openssl-dev \ + dbus-dev \ + wayland-dev \ + webkit2gtk-4.1-dev \ + build-base \ + linux-headers + +# install protoc +RUN wget https://github.com/protocolbuffers/protobuf/releases/download/v30.2/protoc-30.2-linux-x86_64.zip \ + && unzip protoc-30.2-linux-x86_64.zip -d protoc3 \ + && mv protoc3/bin/* /usr/local/bin/ && chmod +x /usr/local/bin/protoc \ + && mv protoc3/include/* /usr/local/include/ && rm -rf protoc-30.2-linux-x86_64.zip protoc3 + +# install flatc +RUN wget https://github.com/google/flatbuffers/releases/download/v24.3.25/Linux.flatc.binary.g++-13.zip \ + && unzip Linux.flatc.binary.g++-13.zip \ + && mv flatc /usr/local/bin/ && chmod +x /usr/local/bin/flatc && rm -rf Linux.flatc.binary.g++-13.zip + +# install rust +RUN curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y + +# Set PATH for rust +ENV PATH="/root/.cargo/bin:${PATH}" + +COPY .docker/cargo.config.toml /root/.cargo/config.toml + +WORKDIR /root/rustfs \ No newline at end of file diff --git a/crates/ecstore/src/cache_value/metacache_set.rs b/crates/ecstore/src/cache_value/metacache_set.rs index 996590b6..47154b6a 100644 --- a/crates/ecstore/src/cache_value/metacache_set.rs +++ b/crates/ecstore/src/cache_value/metacache_set.rs @@ -18,7 +18,7 @@ use futures::future::join_all; use rustfs_filemeta::{MetaCacheEntries, MetaCacheEntry, MetacacheReader, is_io_eof}; use std::{future::Future, pin::Pin, sync::Arc}; use tokio::{spawn, sync::broadcast::Receiver as B_Receiver}; -use tracing::error; +use tracing::{error, warn}; pub type AgreedFn = Box Pin + Send>> + Send + 'static>; pub type PartialFn = @@ -118,10 +118,14 @@ pub async fn list_path_raw(mut rx: B_Receiver, opts: ListPathRawOptions) - if let Some(disk) = d.clone() { disk } else { + warn!("list_path_raw: fallback disk is none"); break; } } - None => break, + None => { + warn!("list_path_raw: fallback disk is none2"); + break; + } }; match disk .as_ref() diff --git a/crates/ecstore/src/disk/error.rs b/crates/ecstore/src/disk/error.rs index 73718379..b84bcad3 100644 --- a/crates/ecstore/src/disk/error.rs +++ b/crates/ecstore/src/disk/error.rs @@ -288,6 +288,12 @@ impl From for DiskError { } } +impl From for DiskError { + fn from(e: rmp_serde::decode::Error) -> Self { + DiskError::other(e) + } +} + impl From for DiskError { fn from(e: rmp::encode::ValueWriteError) -> Self { DiskError::other(e) diff --git a/crates/ecstore/src/disk/local.rs b/crates/ecstore/src/disk/local.rs index eeff61ba..5918c572 100644 --- a/crates/ecstore/src/disk/local.rs +++ b/crates/ecstore/src/disk/local.rs @@ -57,8 +57,8 @@ use bytes::Bytes; use path_absolutize::Absolutize; use rustfs_common::defer; use rustfs_filemeta::{ - Cache, FileInfo, FileInfoOpts, FileMeta, MetaCacheEntry, MetacacheWriter, Opts, RawFileInfo, UpdateFn, get_file_info, - read_xl_meta_no_data, + Cache, FileInfo, FileInfoOpts, FileMeta, MetaCacheEntry, MetacacheWriter, ObjectPartInfo, Opts, RawFileInfo, UpdateFn, + get_file_info, read_xl_meta_no_data, }; use rustfs_utils::HashAlgorithm; use rustfs_utils::os::get_info; @@ -1312,6 +1312,67 @@ impl DiskAPI for LocalDisk { Ok(resp) } + #[tracing::instrument(skip(self))] + async fn read_parts(&self, bucket: &str, paths: &[String]) -> Result> { + let volume_dir = self.get_bucket_path(bucket)?; + + let mut ret = vec![ObjectPartInfo::default(); paths.len()]; + + for (i, path_str) in paths.iter().enumerate() { + let path = Path::new(path_str); + let file_name = path.file_name().and_then(|v| v.to_str()).unwrap_or_default(); + let num = file_name + .strip_prefix("part.") + .and_then(|v| v.strip_suffix(".meta")) + .and_then(|v| v.parse::().ok()) + .unwrap_or_default(); + + if let Err(err) = access( + volume_dir + .clone() + .join(path.parent().unwrap_or(Path::new("")).join(format!("part.{num}"))), + ) + .await + { + ret[i] = ObjectPartInfo { + number: num, + error: Some(err.to_string()), + ..Default::default() + }; + continue; + } + + let data = match self + .read_all_data(bucket, volume_dir.clone(), volume_dir.clone().join(path)) + .await + { + Ok(data) => data, + Err(err) => { + ret[i] = ObjectPartInfo { + number: num, + error: Some(err.to_string()), + ..Default::default() + }; + continue; + } + }; + + match ObjectPartInfo::unmarshal(&data) { + Ok(meta) => { + ret[i] = meta; + } + Err(err) => { + ret[i] = ObjectPartInfo { + number: num, + error: Some(err.to_string()), + ..Default::default() + }; + } + }; + } + + Ok(ret) + } #[tracing::instrument(skip(self))] async fn check_parts(&self, volume: &str, path: &str, fi: &FileInfo) -> Result { let volume_dir = self.get_bucket_path(volume)?; diff --git a/crates/ecstore/src/disk/mod.rs b/crates/ecstore/src/disk/mod.rs index 617389ec..1f918ee3 100644 --- a/crates/ecstore/src/disk/mod.rs +++ b/crates/ecstore/src/disk/mod.rs @@ -41,7 +41,7 @@ use endpoint::Endpoint; use error::DiskError; use error::{Error, Result}; use local::LocalDisk; -use rustfs_filemeta::{FileInfo, RawFileInfo}; +use rustfs_filemeta::{FileInfo, ObjectPartInfo, RawFileInfo}; use rustfs_madmin::info_commands::DiskMetrics; use serde::{Deserialize, Serialize}; use std::{fmt::Debug, path::PathBuf, sync::Arc}; @@ -331,6 +331,14 @@ impl DiskAPI for Disk { } } + #[tracing::instrument(skip(self))] + async fn read_parts(&self, bucket: &str, paths: &[String]) -> Result> { + match self { + Disk::Local(local_disk) => local_disk.read_parts(bucket, paths).await, + Disk::Remote(remote_disk) => remote_disk.read_parts(bucket, paths).await, + } + } + #[tracing::instrument(skip(self))] async fn rename_part(&self, src_volume: &str, src_path: &str, dst_volume: &str, dst_path: &str, meta: Bytes) -> Result<()> { match self { @@ -513,7 +521,7 @@ pub trait DiskAPI: Debug + Send + Sync + 'static { // CheckParts async fn check_parts(&self, volume: &str, path: &str, fi: &FileInfo) -> Result; // StatInfoFile - // ReadParts + async fn read_parts(&self, bucket: &str, paths: &[String]) -> Result>; async fn read_multiple(&self, req: ReadMultipleReq) -> Result>; // CleanAbandonedData async fn write_all(&self, volume: &str, path: &str, data: Bytes) -> Result<()>; diff --git a/crates/ecstore/src/rpc/remote_disk.rs b/crates/ecstore/src/rpc/remote_disk.rs index 161c3a76..e45c9e63 100644 --- a/crates/ecstore/src/rpc/remote_disk.rs +++ b/crates/ecstore/src/rpc/remote_disk.rs @@ -22,8 +22,8 @@ use rustfs_protos::{ proto_gen::node_service::{ CheckPartsRequest, DeletePathsRequest, DeleteRequest, DeleteVersionRequest, DeleteVersionsRequest, DeleteVolumeRequest, DiskInfoRequest, ListDirRequest, ListVolumesRequest, MakeVolumeRequest, MakeVolumesRequest, NsScannerRequest, - ReadAllRequest, ReadMultipleRequest, ReadVersionRequest, ReadXlRequest, RenameDataRequest, RenameFileRequest, - StatVolumeRequest, UpdateMetadataRequest, VerifyFileRequest, WriteAllRequest, WriteMetadataRequest, + ReadAllRequest, ReadMultipleRequest, ReadPartsRequest, ReadVersionRequest, ReadXlRequest, RenameDataRequest, + RenameFileRequest, StatVolumeRequest, UpdateMetadataRequest, VerifyFileRequest, WriteAllRequest, WriteMetadataRequest, }, }; @@ -44,7 +44,7 @@ use crate::{ heal_commands::{HealScanMode, HealingTracker}, }, }; -use rustfs_filemeta::{FileInfo, RawFileInfo}; +use rustfs_filemeta::{FileInfo, ObjectPartInfo, RawFileInfo}; use rustfs_protos::proto_gen::node_service::RenamePartRequest; use rustfs_rio::{HttpReader, HttpWriter}; use tokio::{ @@ -790,6 +790,27 @@ impl DiskAPI for RemoteDisk { Ok(check_parts_resp) } + #[tracing::instrument(skip(self))] + async fn read_parts(&self, bucket: &str, paths: &[String]) -> Result> { + let mut client = node_service_time_out_client(&self.addr) + .await + .map_err(|err| Error::other(format!("can not get client, err: {err}")))?; + let request = Request::new(ReadPartsRequest { + disk: self.endpoint.to_string(), + bucket: bucket.to_string(), + paths: paths.to_vec(), + }); + + let response = client.read_parts(request).await?.into_inner(); + if !response.success { + return Err(response.error.unwrap_or_default().into()); + } + + let read_parts_resp = rmp_serde::from_slice::>(&response.object_part_infos)?; + + Ok(read_parts_resp) + } + #[tracing::instrument(skip(self))] async fn check_parts(&self, volume: &str, path: &str, fi: &FileInfo) -> Result { info!("check_parts"); diff --git a/crates/ecstore/src/rpc/tonic_service.rs b/crates/ecstore/src/rpc/tonic_service.rs index faaa3726..93a50e41 100644 --- a/crates/ecstore/src/rpc/tonic_service.rs +++ b/crates/ecstore/src/rpc/tonic_service.rs @@ -404,7 +404,42 @@ impl Node for NodeService { })) } } + async fn read_parts(&self, request: Request) -> Result, Status> { + let request = request.into_inner(); + if let Some(disk) = self.find_disk(&request.disk).await { + match disk.read_parts(&request.bucket, &request.paths).await { + Ok(data) => { + let data = match rmp_serde::to_vec(&data) { + Ok(data) => data, + Err(err) => { + return Ok(tonic::Response::new(ReadPartsResponse { + success: false, + object_part_infos: Bytes::new(), + error: Some(DiskError::other(format!("encode data failed: {err}")).into()), + })); + } + }; + Ok(tonic::Response::new(ReadPartsResponse { + success: true, + object_part_infos: Bytes::copy_from_slice(&data), + error: None, + })) + } + Err(err) => Ok(tonic::Response::new(ReadPartsResponse { + success: false, + object_part_infos: Bytes::new(), + error: Some(err.into()), + })), + } + } else { + Ok(tonic::Response::new(ReadPartsResponse { + success: false, + object_part_infos: Bytes::new(), + error: Some(DiskError::other("can not find disk".to_string()).into()), + })) + } + } async fn check_parts(&self, request: Request) -> Result, Status> { let request = request.into_inner(); if let Some(disk) = self.find_disk(&request.disk).await { diff --git a/crates/ecstore/src/set_disk.rs b/crates/ecstore/src/set_disk.rs index 60a022a1..8b78555c 100644 --- a/crates/ecstore/src/set_disk.rs +++ b/crates/ecstore/src/set_disk.rs @@ -24,13 +24,13 @@ use crate::disk::{ }; use crate::erasure_coding; use crate::erasure_coding::bitrot_verify; -use crate::error::ObjectApiError; use crate::error::{Error, Result}; +use crate::error::{ObjectApiError, is_err_object_not_found}; use crate::global::GLOBAL_MRFState; use crate::global::{GLOBAL_LocalNodeName, GLOBAL_TierConfigMgr}; use crate::heal::data_usage_cache::DataUsageCache; use crate::heal::heal_ops::{HealEntryFn, HealSequence}; -use crate::store_api::ObjectToDelete; +use crate::store_api::{ListPartsInfo, ObjectToDelete}; use crate::{ bucket::lifecycle::bucket_lifecycle_ops::{gen_transition_objname, get_transitioned_object_reader, put_restore_opts}, cache_value::metacache_set::{ListPathRawOptions, list_path_raw}, @@ -119,6 +119,7 @@ use tracing::{debug, info, warn}; use uuid::Uuid; pub const DEFAULT_READ_BUFFER_SIZE: usize = 1024 * 1024; +pub const MAX_PARTS_COUNT: usize = 10000; #[derive(Debug, Clone)] pub struct SetDisks { @@ -316,6 +317,9 @@ impl SetDisks { .filter(|v| v.as_ref().is_some_and(|d| d.is_local())) .collect() } + fn default_read_quorum(&self) -> usize { + self.set_drive_count - self.default_parity_count + } fn default_write_quorum(&self) -> usize { let mut data_count = self.set_drive_count - self.default_parity_count; if data_count == self.default_parity_count { @@ -550,6 +554,183 @@ impl SetDisks { } } + async fn read_parts( + disks: &[Option], + bucket: &str, + part_meta_paths: &[String], + part_numbers: &[usize], + read_quorum: usize, + ) -> disk::error::Result> { + let mut futures = Vec::with_capacity(disks.len()); + for (i, disk) in disks.iter().enumerate() { + futures.push(async move { + if let Some(disk) = disk { + disk.read_parts(bucket, part_meta_paths).await + } else { + Err(DiskError::DiskNotFound) + } + }); + } + + let mut errs = Vec::with_capacity(disks.len()); + let mut object_parts = Vec::with_capacity(disks.len()); + + let results = join_all(futures).await; + for result in results { + match result { + Ok(res) => { + errs.push(None); + object_parts.push(res); + } + Err(e) => { + errs.push(Some(e)); + object_parts.push(vec![]); + } + } + } + + if let Some(err) = reduce_read_quorum_errs(&errs, OBJECT_OP_IGNORED_ERRS, read_quorum) { + return Err(err); + } + + let mut ret = vec![ObjectPartInfo::default(); part_meta_paths.len()]; + + for (part_idx, part_info) in part_meta_paths.iter().enumerate() { + let mut part_meta_quorum = HashMap::new(); + let mut part_infos = Vec::new(); + for (j, parts) in object_parts.iter().enumerate() { + if parts.len() != part_meta_paths.len() { + *part_meta_quorum.entry(part_info.clone()).or_insert(0) += 1; + continue; + } + + if !parts[part_idx].etag.is_empty() { + *part_meta_quorum.entry(parts[part_idx].etag.clone()).or_insert(0) += 1; + part_infos.push(parts[part_idx].clone()); + continue; + } + + *part_meta_quorum.entry(part_info.clone()).or_insert(0) += 1; + } + + let mut max_quorum = 0; + let mut max_etag = None; + let mut max_part_meta = None; + for (etag, quorum) in part_meta_quorum.iter() { + if quorum > &max_quorum { + max_quorum = *quorum; + max_etag = Some(etag); + max_part_meta = Some(etag); + } + } + + let mut found = None; + for info in part_infos.iter() { + if let Some(etag) = max_etag + && info.etag == *etag + { + found = Some(info.clone()); + break; + } + + if let Some(part_meta) = max_part_meta + && info.etag.is_empty() + && part_meta.ends_with(format!("part.{0}.meta", info.number).as_str()) + { + found = Some(info.clone()); + break; + } + } + + if let (Some(found), Some(max_etag)) = (found, max_etag) + && !found.etag.is_empty() + && part_meta_quorum.get(max_etag).unwrap_or(&0) >= &read_quorum + { + ret[part_idx] = found; + } else { + ret[part_idx] = ObjectPartInfo { + number: part_numbers[part_idx], + error: Some(format!("part.{} not found", part_numbers[part_idx])), + ..Default::default() + }; + } + } + + Ok(ret) + } + + async fn list_parts(disks: &[Option], part_path: &str, read_quorum: usize) -> disk::error::Result> { + let mut futures = Vec::with_capacity(disks.len()); + for (i, disk) in disks.iter().enumerate() { + futures.push(async move { + if let Some(disk) = disk { + disk.list_dir(RUSTFS_META_MULTIPART_BUCKET, RUSTFS_META_MULTIPART_BUCKET, part_path, -1) + .await + } else { + Err(DiskError::DiskNotFound) + } + }); + } + + let mut errs = Vec::with_capacity(disks.len()); + let mut object_parts = Vec::with_capacity(disks.len()); + + let results = join_all(futures).await; + for result in results { + match result { + Ok(res) => { + errs.push(None); + object_parts.push(res); + } + Err(e) => { + errs.push(Some(e)); + object_parts.push(vec![]); + } + } + } + + if let Some(err) = reduce_read_quorum_errs(&errs, OBJECT_OP_IGNORED_ERRS, read_quorum) { + return Err(err); + } + + let mut part_quorum_map: HashMap = HashMap::new(); + + for drive_parts in object_parts { + let mut parts_with_meta_count: HashMap = HashMap::new(); + + // part files can be either part.N or part.N.meta + for part_path in drive_parts { + if let Some(num_str) = part_path.strip_prefix("part.") { + if let Some(meta_idx) = num_str.find(".meta") { + if let Ok(part_num) = num_str[..meta_idx].parse::() { + *parts_with_meta_count.entry(part_num).or_insert(0) += 1; + } + } else if let Ok(part_num) = num_str.parse::() { + *parts_with_meta_count.entry(part_num).or_insert(0) += 1; + } + } + } + + // Include only part.N.meta files with corresponding part.N + for (&part_num, &cnt) in &parts_with_meta_count { + if cnt >= 2 { + *part_quorum_map.entry(part_num).or_insert(0) += 1; + } + } + } + + let mut part_numbers = Vec::with_capacity(part_quorum_map.len()); + for (part_num, count) in part_quorum_map { + if count >= read_quorum { + part_numbers.push(part_num); + } + } + + part_numbers.sort(); + + Ok(part_numbers) + } + #[tracing::instrument(skip(disks, meta))] async fn rename_part( disks: &[Option], @@ -4884,7 +5065,7 @@ impl StorageAPI for SetDisks { ) -> Result { let upload_id_path = Self::get_upload_id_dir(bucket, object, upload_id); - let (mut fi, _) = self.check_upload_id_exists(bucket, object, upload_id, true).await?; + let (fi, _) = self.check_upload_id_exists(bucket, object, upload_id, true).await?; let write_quorum = fi.write_quorum(self.default_write_quorum()); @@ -5037,9 +5218,9 @@ impl StorageAPI for SetDisks { // debug!("put_object_part part_info {:?}", part_info); - fi.parts = vec![part_info]; + // fi.parts = vec![part_info.clone()]; - let fi_buff = fi.marshal_msg()?; + let part_info_buff = part_info.marshal_msg()?; drop(writers); // drop writers to close all files @@ -5050,7 +5231,7 @@ impl StorageAPI for SetDisks { &tmp_part_path, RUSTFS_META_MULTIPART_BUCKET, &part_path, - fi_buff.into(), + part_info_buff.into(), write_quorum, ) .await?; @@ -5068,6 +5249,123 @@ impl StorageAPI for SetDisks { Ok(ret) } + #[tracing::instrument(skip(self))] + async fn list_object_parts( + &self, + bucket: &str, + object: &str, + upload_id: &str, + part_number_marker: Option, + mut max_parts: usize, + opts: &ObjectOptions, + ) -> Result { + let (fi, _) = self.check_upload_id_exists(bucket, object, upload_id, false).await?; + + let upload_id_path = Self::get_upload_id_dir(bucket, object, upload_id); + + if max_parts > MAX_PARTS_COUNT { + max_parts = MAX_PARTS_COUNT; + } + + let part_number_marker = part_number_marker.unwrap_or_default(); + + let mut ret = ListPartsInfo { + bucket: bucket.to_owned(), + object: object.to_owned(), + upload_id: upload_id.to_owned(), + max_parts, + part_number_marker, + user_defined: fi.metadata.clone(), + ..Default::default() + }; + + if max_parts == 0 { + return Ok(ret); + } + + let online_disks = self.get_disks_internal().await; + + let read_quorum = fi.read_quorum(self.default_read_quorum()); + + let part_path = format!( + "{}{}", + path_join_buf(&[ + &upload_id_path, + fi.data_dir.map(|v| v.to_string()).unwrap_or_default().as_str(), + ]), + SLASH_SEPARATOR + ); + + let mut part_numbers = match Self::list_parts(&online_disks, &part_path, read_quorum).await { + Ok(parts) => parts, + Err(err) => { + if err == DiskError::FileNotFound { + return Ok(ret); + } + + return Err(to_object_err(err.into(), vec![bucket, object])); + } + }; + + if part_numbers.is_empty() { + return Ok(ret); + } + let start_op = part_numbers.iter().find(|&&v| v != 0 && v == part_number_marker); + if part_number_marker > 0 && start_op.is_none() { + return Ok(ret); + } + + if let Some(start) = start_op { + if start + 1 > part_numbers.len() { + return Ok(ret); + } + + part_numbers = part_numbers[start + 1..].to_vec(); + } + + let mut parts = Vec::with_capacity(part_numbers.len()); + + let part_meta_paths = part_numbers + .iter() + .map(|v| format!("{part_path}part.{v}.meta")) + .collect::>(); + + let object_parts = + Self::read_parts(&online_disks, RUSTFS_META_MULTIPART_BUCKET, &part_meta_paths, &part_numbers, read_quorum) + .await + .map_err(|e| to_object_err(e.into(), vec![bucket, object, upload_id]))?; + + let mut count = max_parts; + + for (i, part) in object_parts.iter().enumerate() { + if let Some(err) = &part.error { + warn!("list_object_parts part error: {:?}", &err); + } + + parts.push(PartInfo { + etag: Some(part.etag.clone()), + part_num: part.number, + last_mod: part.mod_time, + size: part.size, + actual_size: part.actual_size, + }); + + count -= 1; + if count == 0 { + break; + } + } + + ret.parts = parts; + + if object_parts.len() > ret.parts.len() { + ret.is_truncated = true; + ret.next_part_number_marker = ret.parts.last().map(|v| v.part_num).unwrap_or_default(); + } + + Ok(ret) + } + #[tracing::instrument(skip(self))] async fn list_multipart_uploads( &self, @@ -5143,8 +5441,8 @@ impl StorageAPI for SetDisks { let splits: Vec<&str> = upload_id.split("x").collect(); if splits.len() == 2 { - if let Ok(unix) = splits[1].parse::() { - OffsetDateTime::from_unix_timestamp(unix)? + if let Ok(unix) = splits[1].parse::() { + OffsetDateTime::from_unix_timestamp_nanos(unix)? } else { now } @@ -5363,49 +5661,31 @@ impl StorageAPI for SetDisks { let part_path = format!("{}/{}/", upload_id_path, fi.data_dir.unwrap_or(Uuid::nil())); - let files: Vec = uploaded_parts.iter().map(|v| format!("part.{}.meta", v.part_num)).collect(); + let part_meta_paths = uploaded_parts + .iter() + .map(|v| format!("{part_path}part.{0}.meta", v.part_num)) + .collect::>(); - // readMultipleFiles + let part_numbers = uploaded_parts.iter().map(|v| v.part_num).collect::>(); - let req = ReadMultipleReq { - bucket: RUSTFS_META_MULTIPART_BUCKET.to_string(), - prefix: part_path, - files, - max_size: 1 << 20, - metadata_only: true, - abort404: true, - max_results: 0, - }; + let object_parts = + Self::read_parts(&disks, RUSTFS_META_MULTIPART_BUCKET, &part_meta_paths, &part_numbers, write_quorum).await?; - let part_files_resp = Self::read_multiple_files(&disks, req, write_quorum).await; - - if part_files_resp.len() != uploaded_parts.len() { + if object_parts.len() != uploaded_parts.len() { return Err(Error::other("part result number err")); } - for (i, res) in part_files_resp.iter().enumerate() { - let part_id = uploaded_parts[i].part_num; - if !res.error.is_empty() || !res.exists { - error!("complete_multipart_upload part_id err {:?}, exists={}", res, res.exists); - return Err(Error::InvalidPart(part_id, bucket.to_owned(), object.to_owned())); + for (i, part) in object_parts.iter().enumerate() { + if let Some(err) = &part.error { + error!("complete_multipart_upload part error: {:?}", &err); } - let part_fi = FileInfo::unmarshal(&res.data).map_err(|e| { + if uploaded_parts[i].part_num != part.number { error!( - "complete_multipart_upload FileInfo::unmarshal err {:?}, part_id={}, bucket={}, object={}", - e, part_id, bucket, object + "complete_multipart_upload part_id err part_id != part_num {} != {}", + uploaded_parts[i].part_num, part.number ); - Error::InvalidPart(part_id, bucket.to_owned(), object.to_owned()) - })?; - let part = &part_fi.parts[0]; - let part_num = part.number; - - // debug!("complete part {} file info {:?}", part_num, &part_fi); - // debug!("complete part {} object info {:?}", part_num, &part); - - if part_id != part_num { - error!("complete_multipart_upload part_id err part_id != part_num {} != {}", part_id, part_num); - return Err(Error::InvalidPart(part_id, bucket.to_owned(), object.to_owned())); + return Err(Error::InvalidPart(uploaded_parts[i].part_num, bucket.to_owned(), object.to_owned())); } fi.add_object_part( diff --git a/crates/ecstore/src/sets.rs b/crates/ecstore/src/sets.rs index 28109afc..28d2a75d 100644 --- a/crates/ecstore/src/sets.rs +++ b/crates/ecstore/src/sets.rs @@ -17,6 +17,7 @@ use std::{collections::HashMap, sync::Arc}; use crate::disk::error_reduce::count_errs; use crate::error::{Error, Result}; +use crate::store_api::ListPartsInfo; use crate::{ disk::{ DiskAPI, DiskInfo, DiskOption, DiskStore, @@ -619,6 +620,20 @@ impl StorageAPI for Sets { Ok((del_objects, del_errs)) } + async fn list_object_parts( + &self, + bucket: &str, + object: &str, + upload_id: &str, + part_number_marker: Option, + max_parts: usize, + opts: &ObjectOptions, + ) -> Result { + self.get_disks_by_key(object) + .list_object_parts(bucket, object, upload_id, part_number_marker, max_parts, opts) + .await + } + #[tracing::instrument(skip(self))] async fn list_multipart_uploads( &self, diff --git a/crates/ecstore/src/store.rs b/crates/ecstore/src/store.rs index 9b06a4f9..811304d0 100644 --- a/crates/ecstore/src/store.rs +++ b/crates/ecstore/src/store.rs @@ -38,7 +38,7 @@ use crate::new_object_layer_fn; use crate::notification_sys::get_global_notification_sys; use crate::pools::PoolMeta; use crate::rebalance::RebalanceMeta; -use crate::store_api::{ListMultipartsInfo, ListObjectVersionsInfo, MultipartInfo, ObjectIO}; +use crate::store_api::{ListMultipartsInfo, ListObjectVersionsInfo, ListPartsInfo, MultipartInfo, ObjectIO}; use crate::store_init::{check_disk_fatal_errs, ec_drives_no_config}; use crate::{ bucket::{lifecycle::bucket_lifecycle_ops::TransitionState, metadata::BucketMetadata}, @@ -1810,6 +1810,47 @@ impl StorageAPI for ECStore { Ok((del_objects, del_errs)) } + #[tracing::instrument(skip(self))] + async fn list_object_parts( + &self, + bucket: &str, + object: &str, + upload_id: &str, + part_number_marker: Option, + max_parts: usize, + opts: &ObjectOptions, + ) -> Result { + check_list_parts_args(bucket, object, upload_id)?; + + // TODO: nslock + + if self.single_pool() { + return self.pools[0] + .list_object_parts(bucket, object, upload_id, part_number_marker, max_parts, opts) + .await; + } + + for pool in self.pools.iter() { + if self.is_suspended(pool.pool_idx).await { + continue; + } + match pool + .list_object_parts(bucket, object, upload_id, part_number_marker, max_parts, opts) + .await + { + Ok(res) => return Ok(res), + Err(err) => { + if is_err_invalid_upload_id(&err) { + continue; + } + return Err(err); + } + }; + } + + Err(StorageError::InvalidUploadID(bucket.to_owned(), object.to_owned(), upload_id.to_owned())) + } + #[tracing::instrument(skip(self))] async fn list_multipart_uploads( &self, diff --git a/crates/ecstore/src/store_api.rs b/crates/ecstore/src/store_api.rs index a5f2add9..87d8bf9c 100644 --- a/crates/ecstore/src/store_api.rs +++ b/crates/ecstore/src/store_api.rs @@ -548,6 +548,7 @@ impl ObjectInfo { mod_time: part.mod_time, checksums: part.checksums.clone(), number: part.number, + error: part.error.clone(), }) .collect(); @@ -844,6 +845,48 @@ pub struct ListMultipartsInfo { // encoding_type: String, // Not supported yet. } +/// ListPartsInfo - represents list of all parts. +#[derive(Debug, Clone, Default)] +pub struct ListPartsInfo { + /// Name of the bucket. + pub bucket: String, + + /// Name of the object. + pub object: String, + + /// Upload ID identifying the multipart upload whose parts are being listed. + pub upload_id: String, + + /// The class of storage used to store the object. + pub storage_class: String, + + /// Part number after which listing begins. + pub part_number_marker: usize, + + /// When a list is truncated, this element specifies the last part in the list, + /// as well as the value to use for the part-number-marker request parameter + /// in a subsequent request. + pub next_part_number_marker: usize, + + /// Maximum number of parts that were allowed in the response. + pub max_parts: usize, + + /// Indicates whether the returned list of parts is truncated. + pub is_truncated: bool, + + /// List of all parts. + pub parts: Vec, + + /// Any metadata set during InitMultipartUpload, including encryption headers. + pub user_defined: HashMap, + + /// ChecksumAlgorithm if set + pub checksum_algorithm: String, + + /// ChecksumType if set + pub checksum_type: String, +} + #[derive(Debug, Default, Clone)] pub struct ObjectToDelete { pub object_name: String, @@ -923,10 +966,7 @@ pub trait StorageAPI: ObjectIO { ) -> Result; // Walk TODO: - // GetObjectNInfo ObjectIO async fn get_object_info(&self, bucket: &str, object: &str, opts: &ObjectOptions) -> Result; - // PutObject ObjectIO - // CopyObject async fn copy_object( &self, src_bucket: &str, @@ -949,7 +989,6 @@ pub trait StorageAPI: ObjectIO { // TransitionObject TODO: // RestoreTransitionedObject TODO: - // ListMultipartUploads async fn list_multipart_uploads( &self, bucket: &str, @@ -960,7 +999,6 @@ pub trait StorageAPI: ObjectIO { max_uploads: usize, ) -> Result; async fn new_multipart_upload(&self, bucket: &str, object: &str, opts: &ObjectOptions) -> Result; - // CopyObjectPart async fn copy_object_part( &self, src_bucket: &str, @@ -984,7 +1022,6 @@ pub trait StorageAPI: ObjectIO { data: &mut PutObjReader, opts: &ObjectOptions, ) -> Result; - // GetMultipartInfo async fn get_multipart_info( &self, bucket: &str, @@ -992,7 +1029,15 @@ pub trait StorageAPI: ObjectIO { upload_id: &str, opts: &ObjectOptions, ) -> Result; - // ListObjectParts + async fn list_object_parts( + &self, + bucket: &str, + object: &str, + upload_id: &str, + part_number_marker: Option, + max_parts: usize, + opts: &ObjectOptions, + ) -> Result; async fn abort_multipart_upload(&self, bucket: &str, object: &str, upload_id: &str, opts: &ObjectOptions) -> Result<()>; async fn complete_multipart_upload( self: Arc, @@ -1002,13 +1047,10 @@ pub trait StorageAPI: ObjectIO { uploaded_parts: Vec, opts: &ObjectOptions, ) -> Result; - // GetDisks async fn get_disks(&self, pool_idx: usize, set_idx: usize) -> Result>>; - // SetDriveCounts fn set_drive_counts(&self) -> Vec; // Health TODO: - // PutObjectMetadata async fn put_object_metadata(&self, bucket: &str, object: &str, opts: &ObjectOptions) -> Result; // DecomTieredObject async fn get_object_tags(&self, bucket: &str, object: &str, opts: &ObjectOptions) -> Result; diff --git a/crates/filemeta/src/fileinfo.rs b/crates/filemeta/src/fileinfo.rs index 9c9f0fea..d2aea4cf 100644 --- a/crates/filemeta/src/fileinfo.rs +++ b/crates/filemeta/src/fileinfo.rs @@ -46,6 +46,20 @@ pub struct ObjectPartInfo { pub index: Option, // Checksums holds checksums of the part pub checksums: Option>, + pub error: Option, +} + +impl ObjectPartInfo { + pub fn marshal_msg(&self) -> Result> { + let mut buf = Vec::new(); + self.serialize(&mut Serializer::new(&mut buf))?; + Ok(buf) + } + + pub fn unmarshal(buf: &[u8]) -> Result { + let t: ObjectPartInfo = rmp_serde::from_slice(buf)?; + Ok(t) + } } #[derive(Serialize, Deserialize, Debug, PartialEq, Default, Clone)] @@ -287,6 +301,7 @@ impl FileInfo { actual_size, index, checksums: None, + error: None, }; for p in self.parts.iter_mut() { diff --git a/crates/protos/src/generated/flatbuffers_generated/mod.rs b/crates/protos/src/generated/flatbuffers_generated/mod.rs index ee15f5c7..c446ac88 100644 --- a/crates/protos/src/generated/flatbuffers_generated/mod.rs +++ b/crates/protos/src/generated/flatbuffers_generated/mod.rs @@ -1,15 +1 @@ -// Copyright 2024 RustFS Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - pub mod models; diff --git a/crates/protos/src/generated/flatbuffers_generated/models.rs b/crates/protos/src/generated/flatbuffers_generated/models.rs index 9d1d3d7c..d55f1a98 100644 --- a/crates/protos/src/generated/flatbuffers_generated/models.rs +++ b/crates/protos/src/generated/flatbuffers_generated/models.rs @@ -1,17 +1,3 @@ -// Copyright 2024 RustFS Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - // automatically generated by the FlatBuffers compiler, do not modify // @generated diff --git a/crates/protos/src/generated/mod.rs b/crates/protos/src/generated/mod.rs index 9866676a..4ab5a438 100644 --- a/crates/protos/src/generated/mod.rs +++ b/crates/protos/src/generated/mod.rs @@ -1,17 +1,3 @@ -// Copyright 2024 RustFS Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - #![allow(unused_imports)] #![allow(clippy::all)] pub mod proto_gen; diff --git a/crates/protos/src/generated/proto_gen/mod.rs b/crates/protos/src/generated/proto_gen/mod.rs index fd0a9626..35d3fe1b 100644 --- a/crates/protos/src/generated/proto_gen/mod.rs +++ b/crates/protos/src/generated/proto_gen/mod.rs @@ -1,15 +1 @@ -// Copyright 2024 RustFS Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - pub mod node_service; diff --git a/crates/protos/src/generated/proto_gen/node_service.rs b/crates/protos/src/generated/proto_gen/node_service.rs index f7e54d2e..44f13379 100644 --- a/crates/protos/src/generated/proto_gen/node_service.rs +++ b/crates/protos/src/generated/proto_gen/node_service.rs @@ -1,17 +1,3 @@ -// Copyright 2024 RustFS Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - // This file is @generated by prost-build. /// -------------------------------------------------------------------- #[derive(Clone, PartialEq, ::prost::Message)] @@ -184,6 +170,24 @@ pub struct VerifyFileResponse { pub error: ::core::option::Option, } #[derive(Clone, PartialEq, ::prost::Message)] +pub struct ReadPartsRequest { + #[prost(string, tag = "1")] + pub disk: ::prost::alloc::string::String, + #[prost(string, tag = "2")] + pub bucket: ::prost::alloc::string::String, + #[prost(string, repeated, tag = "3")] + pub paths: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct ReadPartsResponse { + #[prost(bool, tag = "1")] + pub success: bool, + #[prost(bytes = "bytes", tag = "2")] + pub object_part_infos: ::prost::bytes::Bytes, + #[prost(message, optional, tag = "3")] + pub error: ::core::option::Option, +} +#[derive(Clone, PartialEq, ::prost::Message)] pub struct CheckPartsRequest { /// indicate which one in the disks #[prost(string, tag = "1")] @@ -1295,6 +1299,21 @@ pub mod node_service_client { .insert(GrpcMethod::new("node_service.NodeService", "VerifyFile")); self.inner.unary(req, path, codec).await } + pub async fn read_parts( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result, tonic::Status> { + self.inner + .ready() + .await + .map_err(|e| tonic::Status::unknown(format!("Service was not ready: {}", e.into())))?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static("/node_service.NodeService/ReadParts"); + let mut req = request.into_request(); + req.extensions_mut() + .insert(GrpcMethod::new("node_service.NodeService", "ReadParts")); + self.inner.unary(req, path, codec).await + } pub async fn check_parts( &mut self, request: impl tonic::IntoRequest, @@ -2338,6 +2357,10 @@ pub mod node_service_server { &self, request: tonic::Request, ) -> std::result::Result, tonic::Status>; + async fn read_parts( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status>; async fn check_parts( &self, request: tonic::Request, @@ -2972,6 +2995,34 @@ pub mod node_service_server { }; Box::pin(fut) } + "/node_service.NodeService/ReadParts" => { + #[allow(non_camel_case_types)] + struct ReadPartsSvc(pub Arc); + impl tonic::server::UnaryService for ReadPartsSvc { + type Response = super::ReadPartsResponse; + type Future = BoxFuture, tonic::Status>; + fn call(&mut self, request: tonic::Request) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { ::read_parts(&inner, request).await }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let method = ReadPartsSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config(accept_compression_encodings, send_compression_encodings) + .apply_max_message_size_config(max_decoding_message_size, max_encoding_message_size); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } "/node_service.NodeService/CheckParts" => { #[allow(non_camel_case_types)] struct CheckPartsSvc(pub Arc); diff --git a/crates/protos/src/main.rs b/crates/protos/src/main.rs index 5e48c4d5..889f2862 100644 --- a/crates/protos/src/main.rs +++ b/crates/protos/src/main.rs @@ -45,7 +45,7 @@ fn main() -> Result<(), AnyError> { } // path of proto file - let project_root_dir = env::current_dir()?.join(""); + let project_root_dir = env::current_dir()?.join("crates/protos/src"); let proto_dir = project_root_dir.clone(); let proto_files = &["node.proto"]; let proto_out_dir = project_root_dir.join("generated").join("proto_gen"); @@ -268,7 +268,7 @@ fn protobuf_compiler_version() -> Result { } fn fmt() { - let output = Command::new("cargo").arg("fmt").arg("-p").arg("protos").status(); + let output = Command::new("cargo").arg("fmt").arg("-p").arg("rustfs-protos").status(); match output { Ok(status) => { diff --git a/crates/protos/src/node.proto b/crates/protos/src/node.proto index 7e61b331..b375f37b 100644 --- a/crates/protos/src/node.proto +++ b/crates/protos/src/node.proto @@ -130,6 +130,18 @@ message VerifyFileResponse { optional Error error = 3; } +message ReadPartsRequest { + string disk = 1; + string bucket = 2; + repeated string paths = 3; +} + +message ReadPartsResponse { + bool success = 1; + bytes object_part_infos = 2; + optional Error error = 3; +} + message CheckPartsRequest { string disk = 1; // indicate which one in the disks string volume = 2; @@ -768,6 +780,7 @@ service NodeService { rpc WriteAll(WriteAllRequest) returns (WriteAllResponse) {}; rpc Delete(DeleteRequest) returns (DeleteResponse) {}; rpc VerifyFile(VerifyFileRequest) returns (VerifyFileResponse) {}; + rpc ReadParts(ReadPartsRequest) returns (ReadPartsResponse) {}; rpc CheckParts(CheckPartsRequest) returns (CheckPartsResponse) {}; rpc RenamePart(RenamePartRequest) returns (RenamePartResponse) {}; rpc RenameFile(RenameFileRequest) returns (RenameFileResponse) {}; diff --git a/rustfs/src/admin/test.json b/rustfs/src/admin/test.json deleted file mode 100644 index 4becb978..00000000 --- a/rustfs/src/admin/test.json +++ /dev/null @@ -1,232 +0,0 @@ -{ - "mode": "online", - "domain": null, - "region": null, - "sqsARN": null, - "deploymentID": null, - "buckets": { - "count": 0, - "error": null - }, - "objects": { - "count": 0, - "error": null - }, - "versions": { - "count": 0, - "error": null - }, - "deletemarkers": { - "count": 0, - "error": null - }, - "usage": { - "size": 0, - "error": null - }, - "services": { - "kms": null, - "kmsStatus": null, - "ldap": null, - "logger": null, - "audit": null, - "notifications": null - }, - "backend": { - "backendType": "Erasure", - "onlineDisks": 5, - "offlineDisks": 0, - "standardSCParity": 2, - "rrSCParity": 1, - "totalSets": [ - 1 - ], - "totalDrivesPerSet": [ - 5 - ] - }, - "servers": [ - { - "state": "online", - "endpoint": "127.0.0.1:9000", - "scheme": "", - "uptime": 1736146443, - "version": "", - "commitID": "", - "network": { - "127.0.0.1:9000": "online" - }, - "drives": [ - { - "endpoint": "/Users/weisd/project/weisd/s3-rustfs/target/volume/test0", - "root_disk": true, - "drive_path": "/Users/weisd/project/weisd/s3-rustfs/target/volume/test0", - "healing": false, - "scanning": true, - "state": "ok", - "uuid": "", - "major": 0, - "minor": 0, - "model": null, - "total_space": 494384795648, - "used_space": 283710812160, - "available_space": 210673983488, - "read_throughput": 0.0, - "write_throughput": 0.0, - "read_latency": 0.0, - "write_latency": 0.0, - "utilization": 57.386637828967736, - "metrics": null, - "heal_info": null, - "used_inodes": 2353357, - "free_inodes": 2057363120, - "local": true, - "pool_index": 0, - "set_index": 0, - "disk_index": 0 - }, - { - "endpoint": "/Users/weisd/project/weisd/s3-rustfs/target/volume/test1", - "root_disk": true, - "drive_path": "/Users/weisd/project/weisd/s3-rustfs/target/volume/test1", - "healing": false, - "scanning": true, - "state": "ok", - "uuid": "", - "major": 0, - "minor": 0, - "model": null, - "total_space": 494384795648, - "used_space": 283710812160, - "available_space": 210673983488, - "read_throughput": 0.0, - "write_throughput": 0.0, - "read_latency": 0.0, - "write_latency": 0.0, - "utilization": 57.386637828967736, - "metrics": null, - "heal_info": null, - "used_inodes": 2353357, - "free_inodes": 2057363120, - "local": true, - "pool_index": 0, - "set_index": 0, - "disk_index": 1 - }, - { - "endpoint": "/Users/weisd/project/weisd/s3-rustfs/target/volume/test2", - "root_disk": true, - "drive_path": "/Users/weisd/project/weisd/s3-rustfs/target/volume/test2", - "healing": false, - "scanning": false, - "state": "ok", - "uuid": "", - "major": 0, - "minor": 0, - "model": null, - "total_space": 494384795648, - "used_space": 283710812160, - "available_space": 210673983488, - "read_throughput": 0.0, - "write_throughput": 0.0, - "read_latency": 0.0, - "write_latency": 0.0, - "utilization": 57.386637828967736, - "metrics": null, - "heal_info": null, - "used_inodes": 2353357, - "free_inodes": 2057363120, - "local": true, - "pool_index": 0, - "set_index": 0, - "disk_index": 2 - }, - { - "endpoint": "/Users/weisd/project/weisd/s3-rustfs/target/volume/test3", - "root_disk": true, - "drive_path": "/Users/weisd/project/weisd/s3-rustfs/target/volume/test3", - "healing": false, - "scanning": false, - "state": "ok", - "uuid": "", - "major": 0, - "minor": 0, - "model": null, - "total_space": 494384795648, - "used_space": 283710812160, - "available_space": 210673983488, - "read_throughput": 0.0, - "write_throughput": 0.0, - "read_latency": 0.0, - "write_latency": 0.0, - "utilization": 57.386637828967736, - "metrics": null, - "heal_info": null, - "used_inodes": 2353357, - "free_inodes": 2057363120, - "local": true, - "pool_index": 0, - "set_index": 0, - "disk_index": 3 - }, - { - "endpoint": "/Users/weisd/project/weisd/s3-rustfs/target/volume/test4", - "root_disk": true, - "drive_path": "/Users/weisd/project/weisd/s3-rustfs/target/volume/test4", - "healing": false, - "scanning": false, - "state": "ok", - "uuid": "", - "major": 0, - "minor": 0, - "model": null, - "total_space": 494384795648, - "used_space": 283710812160, - "available_space": 210673983488, - "read_throughput": 0.0, - "write_throughput": 0.0, - "read_latency": 0.0, - "write_latency": 0.0, - "utilization": 57.386637828967736, - "metrics": null, - "heal_info": null, - "used_inodes": 2353357, - "free_inodes": 2057363120, - "local": true, - "pool_index": 0, - "set_index": 0, - "disk_index": 4 - } - ], - "poolNumber": 1, - "poolNumbers": [ - 1 - ], - "mem_stats": { - "alloc": 0, - "total_alloc": 0, - "mallocs": 0, - "frees": 0, - "heap_alloc": 0 - }, - "max_procs": 0, - "num_cpu": 0, - "runtime_version": "", - "rustfs_env_vars": {} - } - ], - "pools": { - "0": { - "0": { - "id": 0, - "rawUsage": 1418554060800, - "rawCapacity": 2471923978240, - "usage": 0, - "objectsCount": 0, - "versionsCount": 0, - "deleteMarkersCount": 0, - "healDisks": 0 - } - } - } -} \ No newline at end of file diff --git a/rustfs/src/storage/ecfs.rs b/rustfs/src/storage/ecfs.rs index b59e1e96..0534bc11 100644 --- a/rustfs/src/storage/ecfs.rs +++ b/rustfs/src/storage/ecfs.rs @@ -28,6 +28,7 @@ use chrono::Utc; use datafusion::arrow::csv::WriterBuilder as CsvWriterBuilder; use datafusion::arrow::json::WriterBuilder as JsonWriterBuilder; use datafusion::arrow::json::writer::JsonArray; +use rustfs_ecstore::set_disk::MAX_PARTS_COUNT; use rustfs_s3select_api::object_store::bytes_stream; use rustfs_s3select_api::query::Context; use rustfs_s3select_api::query::Query; @@ -1489,18 +1490,114 @@ impl S3 for FS { #[tracing::instrument(level = "debug", skip(self, req))] async fn list_parts(&self, req: S3Request) -> S3Result> { let ListPartsInput { - bucket, key, upload_id, .. + bucket, + key, + upload_id, + part_number_marker, + max_parts, + .. } = req.input; + let Some(store) = new_object_layer_fn() else { + return Err(S3Error::with_message(S3ErrorCode::InternalError, "Not init".to_string())); + }; + + let part_number_marker = part_number_marker.map(|x| x as usize); + let max_parts = max_parts.map(|x| x as usize).unwrap_or(MAX_PARTS_COUNT); + + let res = store + .list_object_parts(&bucket, &key, &upload_id, part_number_marker, max_parts, &ObjectOptions::default()) + .await + .map_err(ApiError::from)?; + let output = ListPartsOutput { - bucket: Some(bucket), - key: Some(key), - upload_id: Some(upload_id), + bucket: Some(res.bucket), + key: Some(res.object), + upload_id: Some(res.upload_id), + parts: Some( + res.parts + .into_iter() + .map(|p| Part { + e_tag: p.etag, + last_modified: p.last_mod.map(Timestamp::from), + part_number: Some(p.part_num as i32), + size: Some(p.size as i64), + ..Default::default() + }) + .collect(), + ), ..Default::default() }; Ok(S3Response::new(output)) } + async fn list_multipart_uploads( + &self, + req: S3Request, + ) -> S3Result> { + let ListMultipartUploadsInput { + bucket, + prefix, + delimiter, + key_marker, + upload_id_marker, + max_uploads, + .. + } = req.input; + + let Some(store) = new_object_layer_fn() else { + return Err(S3Error::with_message(S3ErrorCode::InternalError, "Not init".to_string())); + }; + + let prefix = prefix.unwrap_or_default(); + + let max_uploads = max_uploads.map(|x| x as usize).unwrap_or(MAX_PARTS_COUNT); + + if let Some(key_marker) = &key_marker { + if !key_marker.starts_with(prefix.as_str()) { + return Err(s3_error!(NotImplemented, "Invalid key marker")); + } + } + + let result = store + .list_multipart_uploads(&bucket, &prefix, delimiter, key_marker, upload_id_marker, max_uploads) + .await + .map_err(ApiError::from)?; + + let output = ListMultipartUploadsOutput { + bucket: Some(bucket), + prefix: Some(prefix), + delimiter: result.delimiter, + key_marker: result.key_marker, + upload_id_marker: result.upload_id_marker, + max_uploads: Some(result.max_uploads as i32), + is_truncated: Some(result.is_truncated), + uploads: Some( + result + .uploads + .into_iter() + .map(|u| MultipartUpload { + key: Some(u.object), + upload_id: Some(u.upload_id), + initiated: u.initiated.map(Timestamp::from), + + ..Default::default() + }) + .collect(), + ), + common_prefixes: Some( + result + .common_prefixes + .into_iter() + .map(|c| CommonPrefix { prefix: Some(c) }) + .collect(), + ), + ..Default::default() + }; + + Ok(S3Response::new(output)) + } + #[tracing::instrument(level = "debug", skip(self, req))] async fn complete_multipart_upload( &self,