feat: implement list_parts API for S3 multipart upload compatibility (#209)

* feat: add list_parts api
This commit is contained in:
weisd
2025-07-15 16:04:03 +08:00
committed by GitHub
parent 38cdc87e93
commit a9d77a618f
21 changed files with 810 additions and 369 deletions

View File

@@ -0,0 +1,40 @@
FROM alpine:3.18
ENV LANG C.UTF-8
# Install base dependencies
RUN apk add --no-cache \
wget \
git \
curl \
unzip \
gcc \
musl-dev \
pkgconfig \
openssl-dev \
dbus-dev \
wayland-dev \
webkit2gtk-4.1-dev \
build-base \
linux-headers
# install protoc
RUN wget https://github.com/protocolbuffers/protobuf/releases/download/v30.2/protoc-30.2-linux-x86_64.zip \
&& unzip protoc-30.2-linux-x86_64.zip -d protoc3 \
&& mv protoc3/bin/* /usr/local/bin/ && chmod +x /usr/local/bin/protoc \
&& mv protoc3/include/* /usr/local/include/ && rm -rf protoc-30.2-linux-x86_64.zip protoc3
# install flatc
RUN wget https://github.com/google/flatbuffers/releases/download/v24.3.25/Linux.flatc.binary.g++-13.zip \
&& unzip Linux.flatc.binary.g++-13.zip \
&& mv flatc /usr/local/bin/ && chmod +x /usr/local/bin/flatc && rm -rf Linux.flatc.binary.g++-13.zip
# install rust
RUN curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y
# Set PATH for rust
ENV PATH="/root/.cargo/bin:${PATH}"
COPY .docker/cargo.config.toml /root/.cargo/config.toml
WORKDIR /root/rustfs

View File

@@ -18,7 +18,7 @@ use futures::future::join_all;
use rustfs_filemeta::{MetaCacheEntries, MetaCacheEntry, MetacacheReader, is_io_eof};
use std::{future::Future, pin::Pin, sync::Arc};
use tokio::{spawn, sync::broadcast::Receiver as B_Receiver};
use tracing::error;
use tracing::{error, warn};
pub type AgreedFn = Box<dyn Fn(MetaCacheEntry) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + 'static>;
pub type PartialFn =
@@ -118,10 +118,14 @@ pub async fn list_path_raw(mut rx: B_Receiver<bool>, opts: ListPathRawOptions) -
if let Some(disk) = d.clone() {
disk
} else {
warn!("list_path_raw: fallback disk is none");
break;
}
}
None => break,
None => {
warn!("list_path_raw: fallback disk is none2");
break;
}
};
match disk
.as_ref()

View File

@@ -288,6 +288,12 @@ impl From<rmp_serde::encode::Error> for DiskError {
}
}
impl From<rmp_serde::decode::Error> for DiskError {
fn from(e: rmp_serde::decode::Error) -> Self {
DiskError::other(e)
}
}
impl From<rmp::encode::ValueWriteError> for DiskError {
fn from(e: rmp::encode::ValueWriteError) -> Self {
DiskError::other(e)

View File

@@ -57,8 +57,8 @@ use bytes::Bytes;
use path_absolutize::Absolutize;
use rustfs_common::defer;
use rustfs_filemeta::{
Cache, FileInfo, FileInfoOpts, FileMeta, MetaCacheEntry, MetacacheWriter, Opts, RawFileInfo, UpdateFn, get_file_info,
read_xl_meta_no_data,
Cache, FileInfo, FileInfoOpts, FileMeta, MetaCacheEntry, MetacacheWriter, ObjectPartInfo, Opts, RawFileInfo, UpdateFn,
get_file_info, read_xl_meta_no_data,
};
use rustfs_utils::HashAlgorithm;
use rustfs_utils::os::get_info;
@@ -1312,6 +1312,67 @@ impl DiskAPI for LocalDisk {
Ok(resp)
}
#[tracing::instrument(skip(self))]
async fn read_parts(&self, bucket: &str, paths: &[String]) -> Result<Vec<ObjectPartInfo>> {
let volume_dir = self.get_bucket_path(bucket)?;
let mut ret = vec![ObjectPartInfo::default(); paths.len()];
for (i, path_str) in paths.iter().enumerate() {
let path = Path::new(path_str);
let file_name = path.file_name().and_then(|v| v.to_str()).unwrap_or_default();
let num = file_name
.strip_prefix("part.")
.and_then(|v| v.strip_suffix(".meta"))
.and_then(|v| v.parse::<usize>().ok())
.unwrap_or_default();
if let Err(err) = access(
volume_dir
.clone()
.join(path.parent().unwrap_or(Path::new("")).join(format!("part.{num}"))),
)
.await
{
ret[i] = ObjectPartInfo {
number: num,
error: Some(err.to_string()),
..Default::default()
};
continue;
}
let data = match self
.read_all_data(bucket, volume_dir.clone(), volume_dir.clone().join(path))
.await
{
Ok(data) => data,
Err(err) => {
ret[i] = ObjectPartInfo {
number: num,
error: Some(err.to_string()),
..Default::default()
};
continue;
}
};
match ObjectPartInfo::unmarshal(&data) {
Ok(meta) => {
ret[i] = meta;
}
Err(err) => {
ret[i] = ObjectPartInfo {
number: num,
error: Some(err.to_string()),
..Default::default()
};
}
};
}
Ok(ret)
}
#[tracing::instrument(skip(self))]
async fn check_parts(&self, volume: &str, path: &str, fi: &FileInfo) -> Result<CheckPartsResp> {
let volume_dir = self.get_bucket_path(volume)?;

View File

@@ -41,7 +41,7 @@ use endpoint::Endpoint;
use error::DiskError;
use error::{Error, Result};
use local::LocalDisk;
use rustfs_filemeta::{FileInfo, RawFileInfo};
use rustfs_filemeta::{FileInfo, ObjectPartInfo, RawFileInfo};
use rustfs_madmin::info_commands::DiskMetrics;
use serde::{Deserialize, Serialize};
use std::{fmt::Debug, path::PathBuf, sync::Arc};
@@ -331,6 +331,14 @@ impl DiskAPI for Disk {
}
}
#[tracing::instrument(skip(self))]
async fn read_parts(&self, bucket: &str, paths: &[String]) -> Result<Vec<ObjectPartInfo>> {
match self {
Disk::Local(local_disk) => local_disk.read_parts(bucket, paths).await,
Disk::Remote(remote_disk) => remote_disk.read_parts(bucket, paths).await,
}
}
#[tracing::instrument(skip(self))]
async fn rename_part(&self, src_volume: &str, src_path: &str, dst_volume: &str, dst_path: &str, meta: Bytes) -> Result<()> {
match self {
@@ -513,7 +521,7 @@ pub trait DiskAPI: Debug + Send + Sync + 'static {
// CheckParts
async fn check_parts(&self, volume: &str, path: &str, fi: &FileInfo) -> Result<CheckPartsResp>;
// StatInfoFile
// ReadParts
async fn read_parts(&self, bucket: &str, paths: &[String]) -> Result<Vec<ObjectPartInfo>>;
async fn read_multiple(&self, req: ReadMultipleReq) -> Result<Vec<ReadMultipleResp>>;
// CleanAbandonedData
async fn write_all(&self, volume: &str, path: &str, data: Bytes) -> Result<()>;

View File

@@ -22,8 +22,8 @@ use rustfs_protos::{
proto_gen::node_service::{
CheckPartsRequest, DeletePathsRequest, DeleteRequest, DeleteVersionRequest, DeleteVersionsRequest, DeleteVolumeRequest,
DiskInfoRequest, ListDirRequest, ListVolumesRequest, MakeVolumeRequest, MakeVolumesRequest, NsScannerRequest,
ReadAllRequest, ReadMultipleRequest, ReadVersionRequest, ReadXlRequest, RenameDataRequest, RenameFileRequest,
StatVolumeRequest, UpdateMetadataRequest, VerifyFileRequest, WriteAllRequest, WriteMetadataRequest,
ReadAllRequest, ReadMultipleRequest, ReadPartsRequest, ReadVersionRequest, ReadXlRequest, RenameDataRequest,
RenameFileRequest, StatVolumeRequest, UpdateMetadataRequest, VerifyFileRequest, WriteAllRequest, WriteMetadataRequest,
},
};
@@ -44,7 +44,7 @@ use crate::{
heal_commands::{HealScanMode, HealingTracker},
},
};
use rustfs_filemeta::{FileInfo, RawFileInfo};
use rustfs_filemeta::{FileInfo, ObjectPartInfo, RawFileInfo};
use rustfs_protos::proto_gen::node_service::RenamePartRequest;
use rustfs_rio::{HttpReader, HttpWriter};
use tokio::{
@@ -790,6 +790,27 @@ impl DiskAPI for RemoteDisk {
Ok(check_parts_resp)
}
#[tracing::instrument(skip(self))]
async fn read_parts(&self, bucket: &str, paths: &[String]) -> Result<Vec<ObjectPartInfo>> {
let mut client = node_service_time_out_client(&self.addr)
.await
.map_err(|err| Error::other(format!("can not get client, err: {err}")))?;
let request = Request::new(ReadPartsRequest {
disk: self.endpoint.to_string(),
bucket: bucket.to_string(),
paths: paths.to_vec(),
});
let response = client.read_parts(request).await?.into_inner();
if !response.success {
return Err(response.error.unwrap_or_default().into());
}
let read_parts_resp = rmp_serde::from_slice::<Vec<ObjectPartInfo>>(&response.object_part_infos)?;
Ok(read_parts_resp)
}
#[tracing::instrument(skip(self))]
async fn check_parts(&self, volume: &str, path: &str, fi: &FileInfo) -> Result<CheckPartsResp> {
info!("check_parts");

View File

@@ -404,7 +404,42 @@ impl Node for NodeService {
}))
}
}
async fn read_parts(&self, request: Request<ReadPartsRequest>) -> Result<Response<ReadPartsResponse>, Status> {
let request = request.into_inner();
if let Some(disk) = self.find_disk(&request.disk).await {
match disk.read_parts(&request.bucket, &request.paths).await {
Ok(data) => {
let data = match rmp_serde::to_vec(&data) {
Ok(data) => data,
Err(err) => {
return Ok(tonic::Response::new(ReadPartsResponse {
success: false,
object_part_infos: Bytes::new(),
error: Some(DiskError::other(format!("encode data failed: {err}")).into()),
}));
}
};
Ok(tonic::Response::new(ReadPartsResponse {
success: true,
object_part_infos: Bytes::copy_from_slice(&data),
error: None,
}))
}
Err(err) => Ok(tonic::Response::new(ReadPartsResponse {
success: false,
object_part_infos: Bytes::new(),
error: Some(err.into()),
})),
}
} else {
Ok(tonic::Response::new(ReadPartsResponse {
success: false,
object_part_infos: Bytes::new(),
error: Some(DiskError::other("can not find disk".to_string()).into()),
}))
}
}
async fn check_parts(&self, request: Request<CheckPartsRequest>) -> Result<Response<CheckPartsResponse>, Status> {
let request = request.into_inner();
if let Some(disk) = self.find_disk(&request.disk).await {

View File

@@ -24,13 +24,13 @@ use crate::disk::{
};
use crate::erasure_coding;
use crate::erasure_coding::bitrot_verify;
use crate::error::ObjectApiError;
use crate::error::{Error, Result};
use crate::error::{ObjectApiError, is_err_object_not_found};
use crate::global::GLOBAL_MRFState;
use crate::global::{GLOBAL_LocalNodeName, GLOBAL_TierConfigMgr};
use crate::heal::data_usage_cache::DataUsageCache;
use crate::heal::heal_ops::{HealEntryFn, HealSequence};
use crate::store_api::ObjectToDelete;
use crate::store_api::{ListPartsInfo, ObjectToDelete};
use crate::{
bucket::lifecycle::bucket_lifecycle_ops::{gen_transition_objname, get_transitioned_object_reader, put_restore_opts},
cache_value::metacache_set::{ListPathRawOptions, list_path_raw},
@@ -119,6 +119,7 @@ use tracing::{debug, info, warn};
use uuid::Uuid;
pub const DEFAULT_READ_BUFFER_SIZE: usize = 1024 * 1024;
pub const MAX_PARTS_COUNT: usize = 10000;
#[derive(Debug, Clone)]
pub struct SetDisks {
@@ -316,6 +317,9 @@ impl SetDisks {
.filter(|v| v.as_ref().is_some_and(|d| d.is_local()))
.collect()
}
fn default_read_quorum(&self) -> usize {
self.set_drive_count - self.default_parity_count
}
fn default_write_quorum(&self) -> usize {
let mut data_count = self.set_drive_count - self.default_parity_count;
if data_count == self.default_parity_count {
@@ -550,6 +554,183 @@ impl SetDisks {
}
}
async fn read_parts(
disks: &[Option<DiskStore>],
bucket: &str,
part_meta_paths: &[String],
part_numbers: &[usize],
read_quorum: usize,
) -> disk::error::Result<Vec<ObjectPartInfo>> {
let mut futures = Vec::with_capacity(disks.len());
for (i, disk) in disks.iter().enumerate() {
futures.push(async move {
if let Some(disk) = disk {
disk.read_parts(bucket, part_meta_paths).await
} else {
Err(DiskError::DiskNotFound)
}
});
}
let mut errs = Vec::with_capacity(disks.len());
let mut object_parts = Vec::with_capacity(disks.len());
let results = join_all(futures).await;
for result in results {
match result {
Ok(res) => {
errs.push(None);
object_parts.push(res);
}
Err(e) => {
errs.push(Some(e));
object_parts.push(vec![]);
}
}
}
if let Some(err) = reduce_read_quorum_errs(&errs, OBJECT_OP_IGNORED_ERRS, read_quorum) {
return Err(err);
}
let mut ret = vec![ObjectPartInfo::default(); part_meta_paths.len()];
for (part_idx, part_info) in part_meta_paths.iter().enumerate() {
let mut part_meta_quorum = HashMap::new();
let mut part_infos = Vec::new();
for (j, parts) in object_parts.iter().enumerate() {
if parts.len() != part_meta_paths.len() {
*part_meta_quorum.entry(part_info.clone()).or_insert(0) += 1;
continue;
}
if !parts[part_idx].etag.is_empty() {
*part_meta_quorum.entry(parts[part_idx].etag.clone()).or_insert(0) += 1;
part_infos.push(parts[part_idx].clone());
continue;
}
*part_meta_quorum.entry(part_info.clone()).or_insert(0) += 1;
}
let mut max_quorum = 0;
let mut max_etag = None;
let mut max_part_meta = None;
for (etag, quorum) in part_meta_quorum.iter() {
if quorum > &max_quorum {
max_quorum = *quorum;
max_etag = Some(etag);
max_part_meta = Some(etag);
}
}
let mut found = None;
for info in part_infos.iter() {
if let Some(etag) = max_etag
&& info.etag == *etag
{
found = Some(info.clone());
break;
}
if let Some(part_meta) = max_part_meta
&& info.etag.is_empty()
&& part_meta.ends_with(format!("part.{0}.meta", info.number).as_str())
{
found = Some(info.clone());
break;
}
}
if let (Some(found), Some(max_etag)) = (found, max_etag)
&& !found.etag.is_empty()
&& part_meta_quorum.get(max_etag).unwrap_or(&0) >= &read_quorum
{
ret[part_idx] = found;
} else {
ret[part_idx] = ObjectPartInfo {
number: part_numbers[part_idx],
error: Some(format!("part.{} not found", part_numbers[part_idx])),
..Default::default()
};
}
}
Ok(ret)
}
async fn list_parts(disks: &[Option<DiskStore>], part_path: &str, read_quorum: usize) -> disk::error::Result<Vec<usize>> {
let mut futures = Vec::with_capacity(disks.len());
for (i, disk) in disks.iter().enumerate() {
futures.push(async move {
if let Some(disk) = disk {
disk.list_dir(RUSTFS_META_MULTIPART_BUCKET, RUSTFS_META_MULTIPART_BUCKET, part_path, -1)
.await
} else {
Err(DiskError::DiskNotFound)
}
});
}
let mut errs = Vec::with_capacity(disks.len());
let mut object_parts = Vec::with_capacity(disks.len());
let results = join_all(futures).await;
for result in results {
match result {
Ok(res) => {
errs.push(None);
object_parts.push(res);
}
Err(e) => {
errs.push(Some(e));
object_parts.push(vec![]);
}
}
}
if let Some(err) = reduce_read_quorum_errs(&errs, OBJECT_OP_IGNORED_ERRS, read_quorum) {
return Err(err);
}
let mut part_quorum_map: HashMap<usize, usize> = HashMap::new();
for drive_parts in object_parts {
let mut parts_with_meta_count: HashMap<usize, usize> = HashMap::new();
// part files can be either part.N or part.N.meta
for part_path in drive_parts {
if let Some(num_str) = part_path.strip_prefix("part.") {
if let Some(meta_idx) = num_str.find(".meta") {
if let Ok(part_num) = num_str[..meta_idx].parse::<usize>() {
*parts_with_meta_count.entry(part_num).or_insert(0) += 1;
}
} else if let Ok(part_num) = num_str.parse::<usize>() {
*parts_with_meta_count.entry(part_num).or_insert(0) += 1;
}
}
}
// Include only part.N.meta files with corresponding part.N
for (&part_num, &cnt) in &parts_with_meta_count {
if cnt >= 2 {
*part_quorum_map.entry(part_num).or_insert(0) += 1;
}
}
}
let mut part_numbers = Vec::with_capacity(part_quorum_map.len());
for (part_num, count) in part_quorum_map {
if count >= read_quorum {
part_numbers.push(part_num);
}
}
part_numbers.sort();
Ok(part_numbers)
}
#[tracing::instrument(skip(disks, meta))]
async fn rename_part(
disks: &[Option<DiskStore>],
@@ -4884,7 +5065,7 @@ impl StorageAPI for SetDisks {
) -> Result<PartInfo> {
let upload_id_path = Self::get_upload_id_dir(bucket, object, upload_id);
let (mut fi, _) = self.check_upload_id_exists(bucket, object, upload_id, true).await?;
let (fi, _) = self.check_upload_id_exists(bucket, object, upload_id, true).await?;
let write_quorum = fi.write_quorum(self.default_write_quorum());
@@ -5037,9 +5218,9 @@ impl StorageAPI for SetDisks {
// debug!("put_object_part part_info {:?}", part_info);
fi.parts = vec![part_info];
// fi.parts = vec![part_info.clone()];
let fi_buff = fi.marshal_msg()?;
let part_info_buff = part_info.marshal_msg()?;
drop(writers); // drop writers to close all files
@@ -5050,7 +5231,7 @@ impl StorageAPI for SetDisks {
&tmp_part_path,
RUSTFS_META_MULTIPART_BUCKET,
&part_path,
fi_buff.into(),
part_info_buff.into(),
write_quorum,
)
.await?;
@@ -5068,6 +5249,123 @@ impl StorageAPI for SetDisks {
Ok(ret)
}
#[tracing::instrument(skip(self))]
async fn list_object_parts(
&self,
bucket: &str,
object: &str,
upload_id: &str,
part_number_marker: Option<usize>,
mut max_parts: usize,
opts: &ObjectOptions,
) -> Result<ListPartsInfo> {
let (fi, _) = self.check_upload_id_exists(bucket, object, upload_id, false).await?;
let upload_id_path = Self::get_upload_id_dir(bucket, object, upload_id);
if max_parts > MAX_PARTS_COUNT {
max_parts = MAX_PARTS_COUNT;
}
let part_number_marker = part_number_marker.unwrap_or_default();
let mut ret = ListPartsInfo {
bucket: bucket.to_owned(),
object: object.to_owned(),
upload_id: upload_id.to_owned(),
max_parts,
part_number_marker,
user_defined: fi.metadata.clone(),
..Default::default()
};
if max_parts == 0 {
return Ok(ret);
}
let online_disks = self.get_disks_internal().await;
let read_quorum = fi.read_quorum(self.default_read_quorum());
let part_path = format!(
"{}{}",
path_join_buf(&[
&upload_id_path,
fi.data_dir.map(|v| v.to_string()).unwrap_or_default().as_str(),
]),
SLASH_SEPARATOR
);
let mut part_numbers = match Self::list_parts(&online_disks, &part_path, read_quorum).await {
Ok(parts) => parts,
Err(err) => {
if err == DiskError::FileNotFound {
return Ok(ret);
}
return Err(to_object_err(err.into(), vec![bucket, object]));
}
};
if part_numbers.is_empty() {
return Ok(ret);
}
let start_op = part_numbers.iter().find(|&&v| v != 0 && v == part_number_marker);
if part_number_marker > 0 && start_op.is_none() {
return Ok(ret);
}
if let Some(start) = start_op {
if start + 1 > part_numbers.len() {
return Ok(ret);
}
part_numbers = part_numbers[start + 1..].to_vec();
}
let mut parts = Vec::with_capacity(part_numbers.len());
let part_meta_paths = part_numbers
.iter()
.map(|v| format!("{part_path}part.{v}.meta"))
.collect::<Vec<String>>();
let object_parts =
Self::read_parts(&online_disks, RUSTFS_META_MULTIPART_BUCKET, &part_meta_paths, &part_numbers, read_quorum)
.await
.map_err(|e| to_object_err(e.into(), vec![bucket, object, upload_id]))?;
let mut count = max_parts;
for (i, part) in object_parts.iter().enumerate() {
if let Some(err) = &part.error {
warn!("list_object_parts part error: {:?}", &err);
}
parts.push(PartInfo {
etag: Some(part.etag.clone()),
part_num: part.number,
last_mod: part.mod_time,
size: part.size,
actual_size: part.actual_size,
});
count -= 1;
if count == 0 {
break;
}
}
ret.parts = parts;
if object_parts.len() > ret.parts.len() {
ret.is_truncated = true;
ret.next_part_number_marker = ret.parts.last().map(|v| v.part_num).unwrap_or_default();
}
Ok(ret)
}
#[tracing::instrument(skip(self))]
async fn list_multipart_uploads(
&self,
@@ -5143,8 +5441,8 @@ impl StorageAPI for SetDisks {
let splits: Vec<&str> = upload_id.split("x").collect();
if splits.len() == 2 {
if let Ok(unix) = splits[1].parse::<i64>() {
OffsetDateTime::from_unix_timestamp(unix)?
if let Ok(unix) = splits[1].parse::<i128>() {
OffsetDateTime::from_unix_timestamp_nanos(unix)?
} else {
now
}
@@ -5363,49 +5661,31 @@ impl StorageAPI for SetDisks {
let part_path = format!("{}/{}/", upload_id_path, fi.data_dir.unwrap_or(Uuid::nil()));
let files: Vec<String> = uploaded_parts.iter().map(|v| format!("part.{}.meta", v.part_num)).collect();
let part_meta_paths = uploaded_parts
.iter()
.map(|v| format!("{part_path}part.{0}.meta", v.part_num))
.collect::<Vec<String>>();
// readMultipleFiles
let part_numbers = uploaded_parts.iter().map(|v| v.part_num).collect::<Vec<usize>>();
let req = ReadMultipleReq {
bucket: RUSTFS_META_MULTIPART_BUCKET.to_string(),
prefix: part_path,
files,
max_size: 1 << 20,
metadata_only: true,
abort404: true,
max_results: 0,
};
let object_parts =
Self::read_parts(&disks, RUSTFS_META_MULTIPART_BUCKET, &part_meta_paths, &part_numbers, write_quorum).await?;
let part_files_resp = Self::read_multiple_files(&disks, req, write_quorum).await;
if part_files_resp.len() != uploaded_parts.len() {
if object_parts.len() != uploaded_parts.len() {
return Err(Error::other("part result number err"));
}
for (i, res) in part_files_resp.iter().enumerate() {
let part_id = uploaded_parts[i].part_num;
if !res.error.is_empty() || !res.exists {
error!("complete_multipart_upload part_id err {:?}, exists={}", res, res.exists);
return Err(Error::InvalidPart(part_id, bucket.to_owned(), object.to_owned()));
for (i, part) in object_parts.iter().enumerate() {
if let Some(err) = &part.error {
error!("complete_multipart_upload part error: {:?}", &err);
}
let part_fi = FileInfo::unmarshal(&res.data).map_err(|e| {
if uploaded_parts[i].part_num != part.number {
error!(
"complete_multipart_upload FileInfo::unmarshal err {:?}, part_id={}, bucket={}, object={}",
e, part_id, bucket, object
"complete_multipart_upload part_id err part_id != part_num {} != {}",
uploaded_parts[i].part_num, part.number
);
Error::InvalidPart(part_id, bucket.to_owned(), object.to_owned())
})?;
let part = &part_fi.parts[0];
let part_num = part.number;
// debug!("complete part {} file info {:?}", part_num, &part_fi);
// debug!("complete part {} object info {:?}", part_num, &part);
if part_id != part_num {
error!("complete_multipart_upload part_id err part_id != part_num {} != {}", part_id, part_num);
return Err(Error::InvalidPart(part_id, bucket.to_owned(), object.to_owned()));
return Err(Error::InvalidPart(uploaded_parts[i].part_num, bucket.to_owned(), object.to_owned()));
}
fi.add_object_part(

View File

@@ -17,6 +17,7 @@ use std::{collections::HashMap, sync::Arc};
use crate::disk::error_reduce::count_errs;
use crate::error::{Error, Result};
use crate::store_api::ListPartsInfo;
use crate::{
disk::{
DiskAPI, DiskInfo, DiskOption, DiskStore,
@@ -619,6 +620,20 @@ impl StorageAPI for Sets {
Ok((del_objects, del_errs))
}
async fn list_object_parts(
&self,
bucket: &str,
object: &str,
upload_id: &str,
part_number_marker: Option<usize>,
max_parts: usize,
opts: &ObjectOptions,
) -> Result<ListPartsInfo> {
self.get_disks_by_key(object)
.list_object_parts(bucket, object, upload_id, part_number_marker, max_parts, opts)
.await
}
#[tracing::instrument(skip(self))]
async fn list_multipart_uploads(
&self,

View File

@@ -38,7 +38,7 @@ use crate::new_object_layer_fn;
use crate::notification_sys::get_global_notification_sys;
use crate::pools::PoolMeta;
use crate::rebalance::RebalanceMeta;
use crate::store_api::{ListMultipartsInfo, ListObjectVersionsInfo, MultipartInfo, ObjectIO};
use crate::store_api::{ListMultipartsInfo, ListObjectVersionsInfo, ListPartsInfo, MultipartInfo, ObjectIO};
use crate::store_init::{check_disk_fatal_errs, ec_drives_no_config};
use crate::{
bucket::{lifecycle::bucket_lifecycle_ops::TransitionState, metadata::BucketMetadata},
@@ -1810,6 +1810,47 @@ impl StorageAPI for ECStore {
Ok((del_objects, del_errs))
}
#[tracing::instrument(skip(self))]
async fn list_object_parts(
&self,
bucket: &str,
object: &str,
upload_id: &str,
part_number_marker: Option<usize>,
max_parts: usize,
opts: &ObjectOptions,
) -> Result<ListPartsInfo> {
check_list_parts_args(bucket, object, upload_id)?;
// TODO: nslock
if self.single_pool() {
return self.pools[0]
.list_object_parts(bucket, object, upload_id, part_number_marker, max_parts, opts)
.await;
}
for pool in self.pools.iter() {
if self.is_suspended(pool.pool_idx).await {
continue;
}
match pool
.list_object_parts(bucket, object, upload_id, part_number_marker, max_parts, opts)
.await
{
Ok(res) => return Ok(res),
Err(err) => {
if is_err_invalid_upload_id(&err) {
continue;
}
return Err(err);
}
};
}
Err(StorageError::InvalidUploadID(bucket.to_owned(), object.to_owned(), upload_id.to_owned()))
}
#[tracing::instrument(skip(self))]
async fn list_multipart_uploads(
&self,

View File

@@ -548,6 +548,7 @@ impl ObjectInfo {
mod_time: part.mod_time,
checksums: part.checksums.clone(),
number: part.number,
error: part.error.clone(),
})
.collect();
@@ -844,6 +845,48 @@ pub struct ListMultipartsInfo {
// encoding_type: String, // Not supported yet.
}
/// ListPartsInfo - represents list of all parts.
#[derive(Debug, Clone, Default)]
pub struct ListPartsInfo {
/// Name of the bucket.
pub bucket: String,
/// Name of the object.
pub object: String,
/// Upload ID identifying the multipart upload whose parts are being listed.
pub upload_id: String,
/// The class of storage used to store the object.
pub storage_class: String,
/// Part number after which listing begins.
pub part_number_marker: usize,
/// When a list is truncated, this element specifies the last part in the list,
/// as well as the value to use for the part-number-marker request parameter
/// in a subsequent request.
pub next_part_number_marker: usize,
/// Maximum number of parts that were allowed in the response.
pub max_parts: usize,
/// Indicates whether the returned list of parts is truncated.
pub is_truncated: bool,
/// List of all parts.
pub parts: Vec<PartInfo>,
/// Any metadata set during InitMultipartUpload, including encryption headers.
pub user_defined: HashMap<String, String>,
/// ChecksumAlgorithm if set
pub checksum_algorithm: String,
/// ChecksumType if set
pub checksum_type: String,
}
#[derive(Debug, Default, Clone)]
pub struct ObjectToDelete {
pub object_name: String,
@@ -923,10 +966,7 @@ pub trait StorageAPI: ObjectIO {
) -> Result<ListObjectVersionsInfo>;
// Walk TODO:
// GetObjectNInfo ObjectIO
async fn get_object_info(&self, bucket: &str, object: &str, opts: &ObjectOptions) -> Result<ObjectInfo>;
// PutObject ObjectIO
// CopyObject
async fn copy_object(
&self,
src_bucket: &str,
@@ -949,7 +989,6 @@ pub trait StorageAPI: ObjectIO {
// TransitionObject TODO:
// RestoreTransitionedObject TODO:
// ListMultipartUploads
async fn list_multipart_uploads(
&self,
bucket: &str,
@@ -960,7 +999,6 @@ pub trait StorageAPI: ObjectIO {
max_uploads: usize,
) -> Result<ListMultipartsInfo>;
async fn new_multipart_upload(&self, bucket: &str, object: &str, opts: &ObjectOptions) -> Result<MultipartUploadResult>;
// CopyObjectPart
async fn copy_object_part(
&self,
src_bucket: &str,
@@ -984,7 +1022,6 @@ pub trait StorageAPI: ObjectIO {
data: &mut PutObjReader,
opts: &ObjectOptions,
) -> Result<PartInfo>;
// GetMultipartInfo
async fn get_multipart_info(
&self,
bucket: &str,
@@ -992,7 +1029,15 @@ pub trait StorageAPI: ObjectIO {
upload_id: &str,
opts: &ObjectOptions,
) -> Result<MultipartInfo>;
// ListObjectParts
async fn list_object_parts(
&self,
bucket: &str,
object: &str,
upload_id: &str,
part_number_marker: Option<usize>,
max_parts: usize,
opts: &ObjectOptions,
) -> Result<ListPartsInfo>;
async fn abort_multipart_upload(&self, bucket: &str, object: &str, upload_id: &str, opts: &ObjectOptions) -> Result<()>;
async fn complete_multipart_upload(
self: Arc<Self>,
@@ -1002,13 +1047,10 @@ pub trait StorageAPI: ObjectIO {
uploaded_parts: Vec<CompletePart>,
opts: &ObjectOptions,
) -> Result<ObjectInfo>;
// GetDisks
async fn get_disks(&self, pool_idx: usize, set_idx: usize) -> Result<Vec<Option<DiskStore>>>;
// SetDriveCounts
fn set_drive_counts(&self) -> Vec<usize>;
// Health TODO:
// PutObjectMetadata
async fn put_object_metadata(&self, bucket: &str, object: &str, opts: &ObjectOptions) -> Result<ObjectInfo>;
// DecomTieredObject
async fn get_object_tags(&self, bucket: &str, object: &str, opts: &ObjectOptions) -> Result<String>;

View File

@@ -46,6 +46,20 @@ pub struct ObjectPartInfo {
pub index: Option<Bytes>,
// Checksums holds checksums of the part
pub checksums: Option<HashMap<String, String>>,
pub error: Option<String>,
}
impl ObjectPartInfo {
pub fn marshal_msg(&self) -> Result<Vec<u8>> {
let mut buf = Vec::new();
self.serialize(&mut Serializer::new(&mut buf))?;
Ok(buf)
}
pub fn unmarshal(buf: &[u8]) -> Result<Self> {
let t: ObjectPartInfo = rmp_serde::from_slice(buf)?;
Ok(t)
}
}
#[derive(Serialize, Deserialize, Debug, PartialEq, Default, Clone)]
@@ -287,6 +301,7 @@ impl FileInfo {
actual_size,
index,
checksums: None,
error: None,
};
for p in self.parts.iter_mut() {

View File

@@ -1,15 +1 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
pub mod models;

View File

@@ -1,17 +1,3 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// automatically generated by the FlatBuffers compiler, do not modify
// @generated

View File

@@ -1,17 +1,3 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#![allow(unused_imports)]
#![allow(clippy::all)]
pub mod proto_gen;

View File

@@ -1,15 +1 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
pub mod node_service;

View File

@@ -1,17 +1,3 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// This file is @generated by prost-build.
/// --------------------------------------------------------------------
#[derive(Clone, PartialEq, ::prost::Message)]
@@ -184,6 +170,24 @@ pub struct VerifyFileResponse {
pub error: ::core::option::Option<Error>,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ReadPartsRequest {
#[prost(string, tag = "1")]
pub disk: ::prost::alloc::string::String,
#[prost(string, tag = "2")]
pub bucket: ::prost::alloc::string::String,
#[prost(string, repeated, tag = "3")]
pub paths: ::prost::alloc::vec::Vec<::prost::alloc::string::String>,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ReadPartsResponse {
#[prost(bool, tag = "1")]
pub success: bool,
#[prost(bytes = "bytes", tag = "2")]
pub object_part_infos: ::prost::bytes::Bytes,
#[prost(message, optional, tag = "3")]
pub error: ::core::option::Option<Error>,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct CheckPartsRequest {
/// indicate which one in the disks
#[prost(string, tag = "1")]
@@ -1295,6 +1299,21 @@ pub mod node_service_client {
.insert(GrpcMethod::new("node_service.NodeService", "VerifyFile"));
self.inner.unary(req, path, codec).await
}
pub async fn read_parts(
&mut self,
request: impl tonic::IntoRequest<super::ReadPartsRequest>,
) -> std::result::Result<tonic::Response<super::ReadPartsResponse>, tonic::Status> {
self.inner
.ready()
.await
.map_err(|e| tonic::Status::unknown(format!("Service was not ready: {}", e.into())))?;
let codec = tonic::codec::ProstCodec::default();
let path = http::uri::PathAndQuery::from_static("/node_service.NodeService/ReadParts");
let mut req = request.into_request();
req.extensions_mut()
.insert(GrpcMethod::new("node_service.NodeService", "ReadParts"));
self.inner.unary(req, path, codec).await
}
pub async fn check_parts(
&mut self,
request: impl tonic::IntoRequest<super::CheckPartsRequest>,
@@ -2338,6 +2357,10 @@ pub mod node_service_server {
&self,
request: tonic::Request<super::VerifyFileRequest>,
) -> std::result::Result<tonic::Response<super::VerifyFileResponse>, tonic::Status>;
async fn read_parts(
&self,
request: tonic::Request<super::ReadPartsRequest>,
) -> std::result::Result<tonic::Response<super::ReadPartsResponse>, tonic::Status>;
async fn check_parts(
&self,
request: tonic::Request<super::CheckPartsRequest>,
@@ -2972,6 +2995,34 @@ pub mod node_service_server {
};
Box::pin(fut)
}
"/node_service.NodeService/ReadParts" => {
#[allow(non_camel_case_types)]
struct ReadPartsSvc<T: NodeService>(pub Arc<T>);
impl<T: NodeService> tonic::server::UnaryService<super::ReadPartsRequest> for ReadPartsSvc<T> {
type Response = super::ReadPartsResponse;
type Future = BoxFuture<tonic::Response<Self::Response>, tonic::Status>;
fn call(&mut self, request: tonic::Request<super::ReadPartsRequest>) -> Self::Future {
let inner = Arc::clone(&self.0);
let fut = async move { <T as NodeService>::read_parts(&inner, request).await };
Box::pin(fut)
}
}
let accept_compression_encodings = self.accept_compression_encodings;
let send_compression_encodings = self.send_compression_encodings;
let max_decoding_message_size = self.max_decoding_message_size;
let max_encoding_message_size = self.max_encoding_message_size;
let inner = self.inner.clone();
let fut = async move {
let method = ReadPartsSvc(inner);
let codec = tonic::codec::ProstCodec::default();
let mut grpc = tonic::server::Grpc::new(codec)
.apply_compression_config(accept_compression_encodings, send_compression_encodings)
.apply_max_message_size_config(max_decoding_message_size, max_encoding_message_size);
let res = grpc.unary(method, req).await;
Ok(res)
};
Box::pin(fut)
}
"/node_service.NodeService/CheckParts" => {
#[allow(non_camel_case_types)]
struct CheckPartsSvc<T: NodeService>(pub Arc<T>);

View File

@@ -45,7 +45,7 @@ fn main() -> Result<(), AnyError> {
}
// path of proto file
let project_root_dir = env::current_dir()?.join("");
let project_root_dir = env::current_dir()?.join("crates/protos/src");
let proto_dir = project_root_dir.clone();
let proto_files = &["node.proto"];
let proto_out_dir = project_root_dir.join("generated").join("proto_gen");
@@ -268,7 +268,7 @@ fn protobuf_compiler_version() -> Result<Version, String> {
}
fn fmt() {
let output = Command::new("cargo").arg("fmt").arg("-p").arg("protos").status();
let output = Command::new("cargo").arg("fmt").arg("-p").arg("rustfs-protos").status();
match output {
Ok(status) => {

View File

@@ -130,6 +130,18 @@ message VerifyFileResponse {
optional Error error = 3;
}
message ReadPartsRequest {
string disk = 1;
string bucket = 2;
repeated string paths = 3;
}
message ReadPartsResponse {
bool success = 1;
bytes object_part_infos = 2;
optional Error error = 3;
}
message CheckPartsRequest {
string disk = 1; // indicate which one in the disks
string volume = 2;
@@ -768,6 +780,7 @@ service NodeService {
rpc WriteAll(WriteAllRequest) returns (WriteAllResponse) {};
rpc Delete(DeleteRequest) returns (DeleteResponse) {};
rpc VerifyFile(VerifyFileRequest) returns (VerifyFileResponse) {};
rpc ReadParts(ReadPartsRequest) returns (ReadPartsResponse) {};
rpc CheckParts(CheckPartsRequest) returns (CheckPartsResponse) {};
rpc RenamePart(RenamePartRequest) returns (RenamePartResponse) {};
rpc RenameFile(RenameFileRequest) returns (RenameFileResponse) {};

View File

@@ -1,232 +0,0 @@
{
"mode": "online",
"domain": null,
"region": null,
"sqsARN": null,
"deploymentID": null,
"buckets": {
"count": 0,
"error": null
},
"objects": {
"count": 0,
"error": null
},
"versions": {
"count": 0,
"error": null
},
"deletemarkers": {
"count": 0,
"error": null
},
"usage": {
"size": 0,
"error": null
},
"services": {
"kms": null,
"kmsStatus": null,
"ldap": null,
"logger": null,
"audit": null,
"notifications": null
},
"backend": {
"backendType": "Erasure",
"onlineDisks": 5,
"offlineDisks": 0,
"standardSCParity": 2,
"rrSCParity": 1,
"totalSets": [
1
],
"totalDrivesPerSet": [
5
]
},
"servers": [
{
"state": "online",
"endpoint": "127.0.0.1:9000",
"scheme": "",
"uptime": 1736146443,
"version": "",
"commitID": "",
"network": {
"127.0.0.1:9000": "online"
},
"drives": [
{
"endpoint": "/Users/weisd/project/weisd/s3-rustfs/target/volume/test0",
"root_disk": true,
"drive_path": "/Users/weisd/project/weisd/s3-rustfs/target/volume/test0",
"healing": false,
"scanning": true,
"state": "ok",
"uuid": "",
"major": 0,
"minor": 0,
"model": null,
"total_space": 494384795648,
"used_space": 283710812160,
"available_space": 210673983488,
"read_throughput": 0.0,
"write_throughput": 0.0,
"read_latency": 0.0,
"write_latency": 0.0,
"utilization": 57.386637828967736,
"metrics": null,
"heal_info": null,
"used_inodes": 2353357,
"free_inodes": 2057363120,
"local": true,
"pool_index": 0,
"set_index": 0,
"disk_index": 0
},
{
"endpoint": "/Users/weisd/project/weisd/s3-rustfs/target/volume/test1",
"root_disk": true,
"drive_path": "/Users/weisd/project/weisd/s3-rustfs/target/volume/test1",
"healing": false,
"scanning": true,
"state": "ok",
"uuid": "",
"major": 0,
"minor": 0,
"model": null,
"total_space": 494384795648,
"used_space": 283710812160,
"available_space": 210673983488,
"read_throughput": 0.0,
"write_throughput": 0.0,
"read_latency": 0.0,
"write_latency": 0.0,
"utilization": 57.386637828967736,
"metrics": null,
"heal_info": null,
"used_inodes": 2353357,
"free_inodes": 2057363120,
"local": true,
"pool_index": 0,
"set_index": 0,
"disk_index": 1
},
{
"endpoint": "/Users/weisd/project/weisd/s3-rustfs/target/volume/test2",
"root_disk": true,
"drive_path": "/Users/weisd/project/weisd/s3-rustfs/target/volume/test2",
"healing": false,
"scanning": false,
"state": "ok",
"uuid": "",
"major": 0,
"minor": 0,
"model": null,
"total_space": 494384795648,
"used_space": 283710812160,
"available_space": 210673983488,
"read_throughput": 0.0,
"write_throughput": 0.0,
"read_latency": 0.0,
"write_latency": 0.0,
"utilization": 57.386637828967736,
"metrics": null,
"heal_info": null,
"used_inodes": 2353357,
"free_inodes": 2057363120,
"local": true,
"pool_index": 0,
"set_index": 0,
"disk_index": 2
},
{
"endpoint": "/Users/weisd/project/weisd/s3-rustfs/target/volume/test3",
"root_disk": true,
"drive_path": "/Users/weisd/project/weisd/s3-rustfs/target/volume/test3",
"healing": false,
"scanning": false,
"state": "ok",
"uuid": "",
"major": 0,
"minor": 0,
"model": null,
"total_space": 494384795648,
"used_space": 283710812160,
"available_space": 210673983488,
"read_throughput": 0.0,
"write_throughput": 0.0,
"read_latency": 0.0,
"write_latency": 0.0,
"utilization": 57.386637828967736,
"metrics": null,
"heal_info": null,
"used_inodes": 2353357,
"free_inodes": 2057363120,
"local": true,
"pool_index": 0,
"set_index": 0,
"disk_index": 3
},
{
"endpoint": "/Users/weisd/project/weisd/s3-rustfs/target/volume/test4",
"root_disk": true,
"drive_path": "/Users/weisd/project/weisd/s3-rustfs/target/volume/test4",
"healing": false,
"scanning": false,
"state": "ok",
"uuid": "",
"major": 0,
"minor": 0,
"model": null,
"total_space": 494384795648,
"used_space": 283710812160,
"available_space": 210673983488,
"read_throughput": 0.0,
"write_throughput": 0.0,
"read_latency": 0.0,
"write_latency": 0.0,
"utilization": 57.386637828967736,
"metrics": null,
"heal_info": null,
"used_inodes": 2353357,
"free_inodes": 2057363120,
"local": true,
"pool_index": 0,
"set_index": 0,
"disk_index": 4
}
],
"poolNumber": 1,
"poolNumbers": [
1
],
"mem_stats": {
"alloc": 0,
"total_alloc": 0,
"mallocs": 0,
"frees": 0,
"heap_alloc": 0
},
"max_procs": 0,
"num_cpu": 0,
"runtime_version": "",
"rustfs_env_vars": {}
}
],
"pools": {
"0": {
"0": {
"id": 0,
"rawUsage": 1418554060800,
"rawCapacity": 2471923978240,
"usage": 0,
"objectsCount": 0,
"versionsCount": 0,
"deleteMarkersCount": 0,
"healDisks": 0
}
}
}
}

View File

@@ -28,6 +28,7 @@ use chrono::Utc;
use datafusion::arrow::csv::WriterBuilder as CsvWriterBuilder;
use datafusion::arrow::json::WriterBuilder as JsonWriterBuilder;
use datafusion::arrow::json::writer::JsonArray;
use rustfs_ecstore::set_disk::MAX_PARTS_COUNT;
use rustfs_s3select_api::object_store::bytes_stream;
use rustfs_s3select_api::query::Context;
use rustfs_s3select_api::query::Query;
@@ -1489,18 +1490,114 @@ impl S3 for FS {
#[tracing::instrument(level = "debug", skip(self, req))]
async fn list_parts(&self, req: S3Request<ListPartsInput>) -> S3Result<S3Response<ListPartsOutput>> {
let ListPartsInput {
bucket, key, upload_id, ..
bucket,
key,
upload_id,
part_number_marker,
max_parts,
..
} = req.input;
let Some(store) = new_object_layer_fn() else {
return Err(S3Error::with_message(S3ErrorCode::InternalError, "Not init".to_string()));
};
let part_number_marker = part_number_marker.map(|x| x as usize);
let max_parts = max_parts.map(|x| x as usize).unwrap_or(MAX_PARTS_COUNT);
let res = store
.list_object_parts(&bucket, &key, &upload_id, part_number_marker, max_parts, &ObjectOptions::default())
.await
.map_err(ApiError::from)?;
let output = ListPartsOutput {
bucket: Some(bucket),
key: Some(key),
upload_id: Some(upload_id),
bucket: Some(res.bucket),
key: Some(res.object),
upload_id: Some(res.upload_id),
parts: Some(
res.parts
.into_iter()
.map(|p| Part {
e_tag: p.etag,
last_modified: p.last_mod.map(Timestamp::from),
part_number: Some(p.part_num as i32),
size: Some(p.size as i64),
..Default::default()
})
.collect(),
),
..Default::default()
};
Ok(S3Response::new(output))
}
async fn list_multipart_uploads(
&self,
req: S3Request<ListMultipartUploadsInput>,
) -> S3Result<S3Response<ListMultipartUploadsOutput>> {
let ListMultipartUploadsInput {
bucket,
prefix,
delimiter,
key_marker,
upload_id_marker,
max_uploads,
..
} = req.input;
let Some(store) = new_object_layer_fn() else {
return Err(S3Error::with_message(S3ErrorCode::InternalError, "Not init".to_string()));
};
let prefix = prefix.unwrap_or_default();
let max_uploads = max_uploads.map(|x| x as usize).unwrap_or(MAX_PARTS_COUNT);
if let Some(key_marker) = &key_marker {
if !key_marker.starts_with(prefix.as_str()) {
return Err(s3_error!(NotImplemented, "Invalid key marker"));
}
}
let result = store
.list_multipart_uploads(&bucket, &prefix, delimiter, key_marker, upload_id_marker, max_uploads)
.await
.map_err(ApiError::from)?;
let output = ListMultipartUploadsOutput {
bucket: Some(bucket),
prefix: Some(prefix),
delimiter: result.delimiter,
key_marker: result.key_marker,
upload_id_marker: result.upload_id_marker,
max_uploads: Some(result.max_uploads as i32),
is_truncated: Some(result.is_truncated),
uploads: Some(
result
.uploads
.into_iter()
.map(|u| MultipartUpload {
key: Some(u.object),
upload_id: Some(u.upload_id),
initiated: u.initiated.map(Timestamp::from),
..Default::default()
})
.collect(),
),
common_prefixes: Some(
result
.common_prefixes
.into_iter()
.map(|c| CommonPrefix { prefix: Some(c) })
.collect(),
),
..Default::default()
};
Ok(S3Response::new(output))
}
#[tracing::instrument(level = "debug", skip(self, req))]
async fn complete_multipart_upload(
&self,