Merge branch 'upload'

This commit is contained in:
weisd
2024-09-10 15:29:55 +08:00
12 changed files with 647 additions and 118 deletions

Cargo.lock generated
View File

@@ -1168,6 +1168,7 @@ dependencies = [
"tracing-error",
"tracing-subscriber",
"transform-stream",
"uuid",
]
[[package]]

View File

@@ -4,7 +4,7 @@ members = ["rustfs", "ecstore"]
[workspace.package]
edition = "2021"
license = "MIT OR Apache-2.0"
license = "Apache-2.0"
repository = "https://github.com/rustfs/rustfs"
rust-version = "1.75"

View File

@@ -29,7 +29,7 @@
- [x] Complete upload CompleteMultipartUpload
- [x] Abort upload AbortMultipartUpload
- [x] Download GetObject
- [ ] Delete DeleteObjects
- [x] Delete DeleteObjects
- [ ] Versioning
- [ ] Object lock
- [ ] Copy CopyObject

View File

@@ -1,7 +1,7 @@
use super::{endpoint::Endpoint, error::DiskError, format::FormatV3};
use super::{
DeleteOptions, DiskAPI, FileReader, FileWriter, MetaCacheEntry, ReadMultipleReq, ReadMultipleResp, ReadOptions,
RenameDataResp, VolumeInfo, WalkDirOptions,
DeleteOptions, DiskAPI, FileInfoVersions, FileReader, FileWriter, MetaCacheEntry, ReadMultipleReq, ReadMultipleResp,
ReadOptions, RenameDataResp, VolumeInfo, WalkDirOptions,
};
use crate::disk::STORAGE_FORMAT_FILE;
use crate::{
@@ -138,8 +138,34 @@ impl LocalDisk {
Ok(())
}
pub async fn move_to_trash(&self, delete_path: &PathBuf, _recursive: bool, _immediate_purge: bool) -> Result<()> {
let trash_path = self.get_object_path(super::RUSTFS_META_TMP_DELETED_BUCKET, Uuid::new_v4().to_string().as_str())?;
// TODO: empty the trash
if let Err(err) = fs::rename(&delete_path, &trash_path).await {
match err.kind() {
ErrorKind::NotFound => (),
_ => {
warn!("delete_file rename {:?} err {:?}", &delete_path, &err);
return Err(Error::from(err));
}
}
}
// FIXME: just empty the trash for now; add the real checks when there is time
let _ = fs::remove_dir_all(&trash_path).await;
// TODO: handle immediate_purge
Ok(())
}
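
A minimal, self-contained sketch of the rename-to-trash pattern this hunk introduces (the paths and trash location are illustrative, not the commit's own):

```rust
use std::io::ErrorKind;
use std::path::Path;
use tokio::fs;

// Move a path into a trash directory, tolerating an already-missing source,
// then purge the trash immediately (mirroring the FIXME above).
async fn move_to_trash(delete_path: &Path, trash_path: &Path) -> std::io::Result<()> {
    if let Err(err) = fs::rename(delete_path, trash_path).await {
        if err.kind() != ErrorKind::NotFound {
            return Err(err);
        }
    }
    // Errors from the purge are deliberately ignored.
    let _ = fs::remove_dir_all(trash_path).await;
    Ok(())
}
```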
// #[tracing::instrument(skip(self))]
pub async fn delete_file(&self, base_path: &PathBuf, delete_path: &PathBuf, recursive: bool, _immediate: bool) -> Result<()> {
pub async fn delete_file(
&self,
base_path: &PathBuf,
delete_path: &PathBuf,
recursive: bool,
immediate_purge: bool,
) -> Result<()> {
debug!("delete_file {:?}\n base_path:{:?}", &delete_path, &base_path);
if is_root_path(base_path) || is_root_path(delete_path) {
@@ -153,29 +179,7 @@ impl LocalDisk {
}
if recursive {
let trash_path = self.get_object_path(super::RUSTFS_META_TMP_DELETED_BUCKET, Uuid::new_v4().to_string().as_str())?;
if let Some(dir_path) = trash_path.parent() {
fs::create_dir_all(dir_path).await?;
}
debug!("delete_file ranme to trash {:?} to {:?}", &delete_path, &trash_path);
// TODO: 清空回收站
if let Err(err) = fs::rename(&delete_path, &trash_path).await {
match err.kind() {
ErrorKind::NotFound => (),
_ => {
warn!("delete_file rename {:?} err {:?}", &delete_path, &err);
return Err(Error::from(err));
}
}
}
// FIXME: just empty the trash for now; add the real checks when there is time
let _ = fs::remove_dir_all(&trash_path).await;
// TODO: handle immediate_purge
self.move_to_trash(delete_path, recursive, immediate_purge).await?;
} else {
if delete_path.is_dir() {
if let Err(err) = fs::remove_dir(&delete_path).await {
@@ -253,6 +257,52 @@ impl LocalDisk {
Ok((data, modtime))
}
async fn delete_versions_internal(&self, volume: &str, path: &str, fis: &Vec<FileInfo>) -> Result<()> {
let volume_dir = self.get_bucket_path(volume)?;
let xlpath = self.get_object_path(volume, format!("{}/{}", path, super::STORAGE_FORMAT_FILE).as_str())?;
let (data, _) = match self.read_all_data(volume, volume_dir.as_path(), &xlpath).await {
Ok(res) => res,
Err(_err) => {
// TODO: return the error unless it is a not-found
(Vec::new(), OffsetDateTime::UNIX_EPOCH)
}
};
if data.is_empty() {
return Err(Error::new(DiskError::FileNotFound));
}
let mut fm = FileMeta::default();
fm.unmarshal_msg(&data)?;
for fi in fis {
let data_dir = fm.delete_version(fi)?;
if data_dir.is_some() {
let dir_path = self.get_object_path(volume, format!("{}/{}", path, data_dir.unwrap().to_string()).as_str())?;
self.move_to_trash(&dir_path, true, false).await?;
}
}
// No versions left; delete xl.meta
if fm.versions.is_empty() {
self.delete_file(&volume_dir, &xlpath, true, false).await?;
return Ok(());
}
// Update xl.meta
let buf = fm.marshal_msg()?;
self.write_all(volume, format!("{}/{}", path, super::STORAGE_FORMAT_FILE).as_str(), buf)
.await?;
Ok(())
}
}
fn is_root_path(path: impl AsRef<Path>) -> bool {
@@ -600,7 +650,7 @@ impl DiskAPI for LocalDisk {
let (src_data_path, dst_data_path) = {
let mut data_dir = String::new();
if !fi.is_remote() {
data_dir = utils::path::retain_slash(fi.data_dir.to_string().as_str());
data_dir = utils::path::retain_slash(fi.data_dir.unwrap_or(Uuid::nil()).to_string().as_str());
}
if !data_dir.is_empty() {
@@ -639,10 +689,9 @@ impl DiskAPI for LocalDisk {
let old_data_dir = meta
.find_version(fi.version_id)
.map(|(_, version)| {
version.get_data_dir().filter(|data_dir| {
warn!("get data dir {}", &data_dir);
meta.shard_data_dir_count(&fi.version_id, data_dir) == 0
})
version
.get_data_dir()
.filter(|data_dir| meta.shard_data_dir_count(&fi.version_id, &Some(data_dir.clone())) == 0)
})
.unwrap_or_default();
@@ -815,6 +864,27 @@ impl DiskAPI for LocalDisk {
Ok(RawFileInfo { buf })
}
async fn delete_versions(
&self,
volume: &str,
versions: Vec<FileInfoVersions>,
_opts: DeleteOptions,
) -> Result<Vec<Option<Error>>> {
let mut errs = Vec::with_capacity(versions.len());
for _ in 0..versions.len() {
errs.push(None);
}
for (i, ver) in versions.iter().enumerate() {
if let Err(e) = self.delete_versions_internal(volume, ver.name.as_str(), &ver.versions).await {
errs[i] = Some(e);
} else {
errs[i] = None;
}
}
Ok(errs)
}
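
The handwritten push loops that pre-fill the error vectors (here and in the delete_objects implementations below) avoid `vec![None; n]`, which would require the error type to be Clone. A Clone-free sketch of the same initialization (hypothetical helper, not part of the commit):

```rust
// Build a Vec<Option<E>> of n Nones without requiring E: Clone.
fn none_vec<E>(n: usize) -> Vec<Option<E>> {
    (0..n).map(|_| None).collect()
}
```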
async fn read_multiple(&self, req: ReadMultipleReq) -> Result<Vec<ReadMultipleResp>> {
let mut results = Vec::new();
let mut found = 0;

View File

@@ -13,7 +13,7 @@ const STORAGE_FORMAT_FILE: &str = "xl.meta";
use crate::{
erasure::ReadAt,
error::Result,
error::{Error, Result},
file_meta::FileMeta,
store_api::{FileInfo, RawFileInfo},
};
@@ -79,9 +79,31 @@ pub trait DiskAPI: Debug + Send + Sync + 'static {
opts: &ReadOptions,
) -> Result<FileInfo>;
async fn read_xl(&self, volume: &str, path: &str, read_data: bool) -> Result<RawFileInfo>;
async fn delete_versions(
&self,
volume: &str,
versions: Vec<FileInfoVersions>,
opts: DeleteOptions,
) -> Result<Vec<Option<Error>>>;
async fn read_multiple(&self, req: ReadMultipleReq) -> Result<Vec<ReadMultipleResp>>;
}
#[derive(Debug, Default, Clone)]
pub struct FileInfoVersions {
// Name of the volume.
pub volume: String,
// Name of the file.
pub name: String,
// Represents the latest mod time of the
// latest version.
pub latest_mod_time: Option<OffsetDateTime>,
pub versions: Vec<FileInfo>,
pub free_versions: Vec<FileInfo>,
}
#[derive(Debug, Default, Clone)]
pub struct WalkDirOptions {
// Bucket to scanner

View File

@@ -1,3 +1,6 @@
use std::collections::HashMap;
use futures::future::join_all;
use http::HeaderMap;
use uuid::Uuid;
@@ -7,11 +10,11 @@ use crate::{
DiskStore,
},
endpoints::PoolEndpoints,
error::Result,
error::{Error, Result},
set_disk::SetDisks,
store_api::{
BucketInfo, BucketOptions, CompletePart, GetObjectReader, HTTPRangeSpec, ListObjectsV2Info, MakeBucketOptions,
MultipartUploadResult, ObjectInfo, ObjectOptions, PartInfo, PutObjReader, StorageAPI,
BucketInfo, BucketOptions, CompletePart, DeletedObject, GetObjectReader, HTTPRangeSpec, ListObjectsV2Info,
MakeBucketOptions, MultipartUploadResult, ObjectInfo, ObjectOptions, ObjectToDelete, PartInfo, PutObjReader, StorageAPI,
},
utils::hash,
};
@@ -110,6 +113,22 @@ impl Sets {
// ) -> Vec<Option<Error>> {
// unimplemented!()
// }
async fn delete_prefix(&self, bucket: &str, object: &str) -> Result<()> {
let mut futures = Vec::new();
let opt = ObjectOptions {
delete_prefix: true,
..Default::default()
};
for set in self.disk_set.iter() {
futures.push(set.delete_object(bucket, object, opt.clone()));
}
let _results = join_all(futures).await;
Ok(())
}
}
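
delete_prefix fans the same call out to every erasure set and awaits them together. A self-contained sketch of that fan-out shape, with hypothetical generic names:

```rust
use futures::future::join_all;
use std::future::Future;

// Issue `call` against every target concurrently and await all results.
// join_all returns results in input order, which this commit relies on.
async fn fan_out<T, F, Fut, R>(targets: &[T], call: F) -> Vec<R>
where
    T: Clone,
    F: Fn(T) -> Fut,
    Fut: Future<Output = R>,
{
    join_all(targets.iter().cloned().map(call)).await
}
```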
// #[derive(Debug)]
@@ -122,6 +141,12 @@ impl Sets {
// pub default_parity_count: usize,
// }
struct DelObj {
// set_idx: usize,
orig_idx: usize,
obj: ObjectToDelete,
}
#[async_trait::async_trait]
impl StorageAPI for Sets {
async fn list_bucket(&self, _opts: &BucketOptions) -> Result<Vec<BucketInfo>> {
@@ -134,7 +159,77 @@ impl StorageAPI for Sets {
async fn get_bucket_info(&self, _bucket: &str, _opts: &BucketOptions) -> Result<BucketInfo> {
unimplemented!()
}
async fn delete_objects(
&self,
bucket: &str,
objects: Vec<ObjectToDelete>,
opts: ObjectOptions,
) -> Result<(Vec<DeletedObject>, Vec<Option<Error>>)> {
// Pre-fill the default return values
let mut del_objects = vec![DeletedObject::default(); objects.len()];
let mut del_errs = Vec::with_capacity(objects.len());
for _ in 0..objects.len() {
del_errs.push(None)
}
let mut set_obj_map = HashMap::new();
// hash key
let mut i = 0;
for obj in objects.iter() {
let idx = self.get_hashed_set_index(obj.object_name.as_str());
if !set_obj_map.contains_key(&idx) {
set_obj_map.insert(
idx,
vec![DelObj {
// set_idx: idx,
orig_idx: i,
obj: obj.clone(),
}],
);
} else {
if let Some(val) = set_obj_map.get_mut(&idx) {
val.push(DelObj {
// set_idx: idx,
orig_idx: i,
obj: obj.clone(),
});
}
}
i += 1;
}
// TODO: make this concurrent
for (k, v) in set_obj_map {
let disks = self.get_disks(k);
let objs: Vec<ObjectToDelete> = v.iter().map(|v| v.obj.clone()).collect();
let (dobjects, errs) = disks.delete_objects(bucket, objs, opts.clone()).await?;
let mut i = 0;
for err in errs {
let obj = v.get(i).unwrap();
del_errs[obj.orig_idx] = err;
del_objects[obj.orig_idx] = dobjects.get(i).unwrap().clone();
i += 1;
}
}
Ok((del_objects, del_errs))
}
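
The contains_key/insert/get_mut dance that builds set_obj_map, plus the manual `i` counter, can be expressed with HashMap::entry and enumerate. A self-contained sketch (`Del` is a stand-in for the commit's private DelObj):

```rust
use std::collections::HashMap;

struct Del {
    orig_idx: usize,
    name: String,
}

// Group object names by their erasure-set index, remembering each object's
// original position so results can be written back in request order.
fn group_by_set(names: &[String], set_of: impl Fn(&str) -> usize) -> HashMap<usize, Vec<Del>> {
    let mut map: HashMap<usize, Vec<Del>> = HashMap::new();
    for (orig_idx, name) in names.iter().enumerate() {
        map.entry(set_of(name)).or_default().push(Del { orig_idx, name: name.clone() });
    }
    map
}
```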
async fn delete_object(&self, bucket: &str, object: &str, opts: ObjectOptions) -> Result<ObjectInfo> {
if opts.delete_prefix {
self.delete_prefix(bucket, object).await?;
return Ok(ObjectInfo::default());
}
self.get_disks_by_key(object).delete_object(bucket, object, opts).await
}
async fn list_objects_v2(
&self,
_bucket: &str,

View File

@@ -7,8 +7,9 @@ use crate::{
peer::{PeerS3Client, S3PeerSys},
sets::Sets,
store_api::{
BucketInfo, BucketOptions, CompletePart, GetObjectReader, HTTPRangeSpec, ListObjectsInfo, ListObjectsV2Info,
MakeBucketOptions, MultipartUploadResult, ObjectInfo, ObjectOptions, PartInfo, PutObjReader, StorageAPI,
BucketInfo, BucketOptions, CompletePart, DeletedObject, GetObjectReader, HTTPRangeSpec, ListObjectsInfo,
ListObjectsV2Info, MakeBucketOptions, MultipartUploadResult, ObjectInfo, ObjectOptions, ObjectToDelete, PartInfo,
PutObjReader, StorageAPI,
},
store_init, utils,
};
@@ -16,6 +17,7 @@ use futures::future::join_all;
use http::HeaderMap;
use s3s::{dto::StreamingBlob, Body};
use std::collections::{HashMap, HashSet};
use time::OffsetDateTime;
use tracing::{debug, warn};
use uuid::Uuid;
@@ -227,6 +229,77 @@ impl ECStore {
Ok(())
}
async fn delete_prefix(&self, _bucket: &str, _object: &str) -> Result<()> {
unimplemented!()
}
async fn get_pool_info_existing_with_opts(
&self,
bucket: &str,
object: &str,
opts: &ObjectOptions,
) -> Result<(PoolObjInfo, Vec<Error>)> {
let mut futures = Vec::new();
for pool in self.pools.iter() {
futures.push(pool.get_object_info(bucket, object, opts));
}
let results = join_all(futures).await;
let mut ress = Vec::new();
let mut i = 0;
// join_all returns results in the same order as the inputs
for res in results {
let index = i;
match res {
Ok(r) => {
ress.push(PoolObjInfo {
index,
object_info: r,
err: None,
});
}
Err(e) => {
ress.push(PoolObjInfo {
index,
err: Some(e),
..Default::default()
});
}
}
i += 1;
}
ress.sort_by(|a, b| {
let at = a.object_info.mod_time.unwrap_or(OffsetDateTime::UNIX_EPOCH);
let bt = b.object_info.mod_time.unwrap_or(OffsetDateTime::UNIX_EPOCH);
at.cmp(&bt)
});
for res in ress {
// check
if res.err.is_none() {
// TODO: let errs = self.poolsWithObject()
return Ok((res, Vec::new()));
}
}
let ret = PoolObjInfo::default();
Ok((ret, Vec::new()))
}
}
#[derive(Debug, Default)]
pub struct PoolObjInfo {
pub index: usize,
pub object_info: ObjectInfo,
pub err: Option<Error>,
}
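
get_pool_info_existing_with_opts collects one result per pool and sorts them by mod_time before picking the first error-free entry. A self-contained sketch of that ordering step, with a stand-in for PoolObjInfo:

```rust
use time::OffsetDateTime;

struct PoolEntry {
    index: usize,
    mod_time: Option<OffsetDateTime>, // stand-in for object_info.mod_time
}

// Sort pool results as the commit's sort_by closure does: mod_time ascending,
// with a missing mod_time treated as the UNIX epoch.
fn sort_pool_entries(entries: &mut [PoolEntry]) {
    entries.sort_by_key(|e| e.mod_time.unwrap_or(OffsetDateTime::UNIX_EPOCH));
}
```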
#[derive(Debug, Default)]
@@ -301,7 +374,149 @@ impl StorageAPI for ECStore {
Ok(info)
}
async fn delete_objects(
&self,
bucket: &str,
objects: Vec<ObjectToDelete>,
opts: ObjectOptions,
) -> Result<(Vec<DeletedObject>, Vec<Option<Error>>)> {
// encode object name
let objects: Vec<ObjectToDelete> = objects
.iter()
.map(|v| {
let mut v = v.clone();
v.object_name = utils::path::encode_dir_object(v.object_name.as_str());
v
})
.collect();
// Pre-fill the default return values
let mut del_objects = vec![DeletedObject::default(); objects.len()];
let mut del_errs = Vec::with_capacity(objects.len());
for _ in 0..objects.len() {
del_errs.push(None)
}
// TODO: limit the number of concurrent requests
let opt = ObjectOptions::default();
// Fetch the PoolObjInfo for every object
let mut futures = Vec::new();
for obj in objects.iter() {
futures.push(self.get_pool_info_existing_with_opts(bucket, &obj.object_name, &opt));
}
let results = join_all(futures).await;
// Record which objects land in which pool (pool_idx -> object indices)
let mut pool_index_objects = HashMap::new();
let mut i = 0;
for res in results {
match res {
Ok((pinfo, _)) => {
if pinfo.object_info.delete_marker && opts.version_id.is_empty() {
del_objects[i] = DeletedObject {
delete_marker: pinfo.object_info.delete_marker,
delete_marker_version_id: pinfo.object_info.version_id.map(|v| v.to_string()),
object_name: utils::path::decode_dir_object(&pinfo.object_info.name),
delete_marker_mtime: pinfo.object_info.mod_time,
..Default::default()
};
}
if !pool_index_objects.contains_key(&pinfo.index) {
pool_index_objects.insert(pinfo.index, vec![i]);
} else {
// let mut vals = pool_index_objects.
if let Some(val) = pool_index_objects.get_mut(&pinfo.index) {
val.push(i);
}
}
}
Err(e) => {
// TODO: check for not-found
del_errs[i] = Some(e)
}
}
i += 1;
}
if !pool_index_objects.is_empty() {
for sets in self.pools.iter() {
// Object indices that belong to this pool's index
let vals = pool_index_objects.get(&sets.pool_idx);
if vals.is_none() {
continue;
}
let obj_idxs = vals.unwrap();
// Look up the corresponding objects; in theory never None
let objs: Vec<ObjectToDelete> = obj_idxs
.iter()
.filter_map(|&idx| {
if let Some(obj) = objects.get(idx) {
Some(obj.clone())
} else {
None
}
})
.collect();
if objs.is_empty() {
continue;
}
let (pdel_objs, perrs) = sets.delete_objects(bucket, objs, opts.clone()).await?;
// perrs is expected to come back in the same order as obj_idxs
let mut i = 0;
for err in perrs {
let obj_idx = obj_idxs[i];
if err.is_some() {
del_errs[obj_idx] = err;
}
let mut dobj = pdel_objs.get(i).unwrap().clone();
dobj.object_name = utils::path::decode_dir_object(&dobj.object_name);
del_objects[obj_idx] = dobj;
i += 1;
}
}
}
Ok((del_objects, del_errs))
}
async fn delete_object(&self, bucket: &str, object: &str, opts: ObjectOptions) -> Result<ObjectInfo> {
if opts.delete_prefix {
self.delete_prefix(bucket, &object).await?;
return Ok(ObjectInfo::default());
}
let object = utils::path::encode_dir_object(object);
let object = object.as_str();
// Find which pool the object lives in
let (mut pinfo, errs) = self.get_pool_info_existing_with_opts(bucket, object, &opts).await?;
if pinfo.object_info.delete_marker && opts.version_id.is_empty() {
pinfo.object_info.name = utils::path::decode_dir_object(object);
return Ok(pinfo.object_info);
}
if !errs.is_empty() {
// TODO: deleteObjectFromAllPools
}
let mut obj = self.pools[pinfo.index].delete_object(bucket, object, opts.clone()).await?;
obj.name = utils::path::decode_dir_object(object);
Ok(obj)
}
async fn list_objects_v2(
&self,
bucket: &str,

View File

@@ -10,15 +10,15 @@ pub const ERASURE_ALGORITHM: &str = "rs-vandermonde";
pub const BLOCK_SIZE_V2: usize = 1048576; // 1M
// #[derive(Debug, Clone)]
#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
#[derive(Serialize, Deserialize, Debug, PartialEq, Clone, Default)]
pub struct FileInfo {
pub name: String,
pub volume: String,
pub version_id: Uuid,
pub version_id: Option<Uuid>,
pub erasure: ErasureInfo,
pub deleted: bool,
// DataDir of the file
pub data_dir: Uuid,
pub data_dir: Option<Uuid>,
pub mod_time: Option<OffsetDateTime>,
pub size: usize,
pub data: Option<Vec<u8>>,
@@ -27,7 +27,66 @@ pub struct FileInfo {
pub is_latest: bool,
}
// impl Default for FileInfo {
// fn default() -> Self {
// Self {
// version_id: Default::default(),
// erasure: Default::default(),
// deleted: Default::default(),
// data_dir: Default::default(),
// mod_time: None,
// size: Default::default(),
// data: Default::default(),
// fresh: Default::default(),
// name: Default::default(),
// volume: Default::default(),
// parts: Default::default(),
// is_latest: Default::default(),
// }
// }
// }
impl FileInfo {
pub fn new(object: &str, data_blocks: usize, parity_blocks: usize) -> Self {
let indexs = {
let cardinality = data_blocks + parity_blocks;
let mut nums = vec![0; cardinality];
let key_crc = crc32fast::hash(object.as_bytes());
let start = key_crc as usize % cardinality;
for i in 1..=cardinality {
nums[i - 1] = 1 + ((start + i) % cardinality);
}
nums
};
Self {
erasure: ErasureInfo {
algorithm: String::from(ERASURE_ALGORITHM),
data_blocks,
parity_blocks,
block_size: BLOCK_SIZE_V2,
distribution: indexs,
..Default::default()
},
..Default::default()
}
}
pub fn is_valid(&self) -> bool {
if self.deleted {
return true;
}
let data_blocks = self.erasure.data_blocks;
let parity_blocks = self.erasure.parity_blocks;
(data_blocks >= parity_blocks)
&& (data_blocks > 0)
&& (self.erasure.index > 0
&& self.erasure.index <= data_blocks + parity_blocks
&& self.erasure.distribution.len() == (data_blocks + parity_blocks))
}
pub fn is_remote(&self) -> bool {
// TODO: when lifecycle
false
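
FileInfo::new derives the erasure shard distribution from a CRC32 of the object name; the rotation guarantees every shard index in 1..=cardinality appears exactly once. A standalone check of that property (function names are illustrative, not the commit's):

```rust
// Recompute the distribution exactly as FileInfo::new does and verify that
// it is a permutation of 1..=cardinality.
fn distribution(object: &str, cardinality: usize) -> Vec<usize> {
    let key_crc = crc32fast::hash(object.as_bytes());
    let start = key_crc as usize % cardinality;
    (1..=cardinality).map(|i| 1 + ((start + i) % cardinality)).collect()
}

fn main() {
    let d = distribution("bucket/object", 6);
    let mut sorted = d.clone();
    sorted.sort_unstable();
    assert_eq!(sorted, (1..=6).collect::<Vec<usize>>());
    println!("distribution: {:?}", d);
}
```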
@@ -86,7 +145,7 @@ impl FileInfo {
parity_blocks: self.erasure.parity_blocks,
data_blocks: self.erasure.data_blocks,
version_id: self.version_id,
deleted: self.deleted,
delete_marker: self.deleted,
mod_time: self.mod_time,
size: self.size,
parts: self.parts.clone(),
@@ -113,68 +172,6 @@ impl FileInfo {
}
}
impl Default for FileInfo {
fn default() -> Self {
Self {
version_id: Uuid::nil(),
erasure: Default::default(),
deleted: Default::default(),
data_dir: Uuid::nil(),
mod_time: None,
size: Default::default(),
data: Default::default(),
fresh: Default::default(),
name: Default::default(),
volume: Default::default(),
parts: Default::default(),
is_latest: Default::default(),
}
}
}
impl FileInfo {
pub fn new(object: &str, data_blocks: usize, parity_blocks: usize) -> Self {
let indexs = {
let cardinality = data_blocks + parity_blocks;
let mut nums = vec![0; cardinality];
let key_crc = crc32fast::hash(object.as_bytes());
let start = key_crc as usize % cardinality;
for i in 1..=cardinality {
nums[i - 1] = 1 + ((start + i) % cardinality);
}
nums
};
Self {
erasure: ErasureInfo {
algorithm: String::from(ERASURE_ALGORITHM),
data_blocks,
parity_blocks,
block_size: BLOCK_SIZE_V2,
distribution: indexs,
..Default::default()
},
..Default::default()
}
}
pub fn is_valid(&self) -> bool {
if self.deleted {
return true;
}
let data_blocks = self.erasure.data_blocks;
let parity_blocks = self.erasure.parity_blocks;
(data_blocks >= parity_blocks)
&& (data_blocks > 0)
&& (self.erasure.index > 0
&& self.erasure.index <= data_blocks + parity_blocks
&& self.erasure.distribution.len() == (data_blocks + parity_blocks))
}
}
#[derive(Serialize, Deserialize, Debug, PartialEq, Clone, Default)]
pub struct ObjectPartInfo {
// pub etag: Option<String>,
@@ -358,12 +355,15 @@ impl HTTPRangeSpec {
}
}
#[derive(Debug, Default)]
#[derive(Debug, Default, Clone)]
pub struct ObjectOptions {
// Use the maximum parity (N/2), used when saving server configuration files
pub max_parity: bool,
pub mod_time: Option<OffsetDateTime>,
pub part_number: usize,
pub delete_prefix: bool,
pub version_id: String,
}
// impl Default for ObjectOptions {
@@ -413,13 +413,13 @@ impl From<s3s::dto::CompletedPart> for CompletePart {
pub struct ObjectInfo {
pub bucket: String,
pub name: String,
pub mod_time: Option<OffsetDateTime>,
pub size: usize,
pub is_dir: bool,
pub parity_blocks: usize,
pub data_blocks: usize,
pub version_id: Uuid,
pub deleted: bool,
pub mod_time: Option<OffsetDateTime>,
pub size: usize,
pub version_id: Option<Uuid>,
pub delete_marker: bool,
pub parts: Vec<ObjectPartInfo>,
pub is_latest: bool,
}
@@ -468,13 +468,36 @@ pub struct ListObjectsV2Info {
pub prefixes: Vec<String>,
}
#[derive(Debug, Default, Clone)]
pub struct ObjectToDelete {
pub object_name: String,
pub version_id: Option<Uuid>,
}
#[derive(Debug, Default, Clone)]
pub struct DeletedObject {
pub delete_marker: bool,
pub delete_marker_version_id: Option<String>,
pub object_name: String,
pub version_id: Option<String>,
// MTime of DeleteMarker on source that needs to be propagated to replica
pub delete_marker_mtime: Option<OffsetDateTime>,
// to support delete marker replication
// pub replication_state: ReplicationState,
}
#[async_trait::async_trait]
pub trait StorageAPI {
async fn make_bucket(&self, bucket: &str, opts: &MakeBucketOptions) -> Result<()>;
async fn delete_bucket(&self, bucket: &str) -> Result<()>;
async fn list_bucket(&self, opts: &BucketOptions) -> Result<Vec<BucketInfo>>;
async fn get_bucket_info(&self, bucket: &str, opts: &BucketOptions) -> Result<BucketInfo>;
async fn delete_object(&self, bucket: &str, object: &str, opts: ObjectOptions) -> Result<ObjectInfo>;
async fn delete_objects(
&self,
bucket: &str,
objects: Vec<ObjectToDelete>,
opts: ObjectOptions,
) -> Result<(Vec<DeletedObject>, Vec<Option<Error>>)>;
async fn list_objects_v2(
&self,
bucket: &str,

BIN
rustfs-inner.zip Normal file

Binary file not shown.

View File

@@ -23,7 +23,7 @@ http.workspace = true
bytes.workspace = true
futures.workspace = true
futures-util.workspace = true
uuid = { version = "1.8.0", features = ["v4", "fast-rng", "serde"] }
ecstore = { path = "../ecstore" }
s3s = "0.10.0"
clap = { version = "4.5.7", features = ["derive"] }

View File

@@ -31,10 +31,13 @@ fn setup_tracing() {
}
fn main() -> Result<()> {
// Parse command-line arguments
let opt = config::Opt::parse();
// Set up tracing
setup_tracing();
// Run with the parsed options
run(opt)
}
@@ -42,7 +45,9 @@ fn main() -> Result<()> {
async fn run(opt: config::Opt) -> Result<()> {
debug!("opt: {:?}", &opt);
// Listen address and port come from the options
let listener = TcpListener::bind(opt.address.clone()).await?;
// Resolve the actual local address
let local_addr: SocketAddr = listener.local_addr()?;
// let mut domain_name = {
@@ -62,9 +67,11 @@ async fn run(opt: config::Opt) -> Result<()> {
// };
// Setup S3 service
// The S3 service is implemented with the s3s crate
let service = {
let mut b = S3ServiceBuilder::new(storage::ecfs::FS::new(opt.address.clone(), opt.volumes.clone()).await?);
// Set the access key and secret key
// (some values are read from the config file)
let mut access_key = String::from_str(config::DEFAULT_ACCESS_KEY).unwrap();
let mut secret_key = String::from_str(config::DEFAULT_SECRET_KEY).unwrap();
@@ -73,7 +80,7 @@ async fn run(opt: config::Opt) -> Result<()> {
access_key = ak;
secret_key = sk;
}
// Log the credentials in use
info!("authentication is enabled {}, {}", &access_key, &secret_key);
b.set_auth(SimpleAuth::from_single(access_key, secret_key));

View File

@@ -6,6 +6,7 @@ use ecstore::store_api::HTTPRangeSpec;
use ecstore::store_api::MakeBucketOptions;
use ecstore::store_api::MultipartUploadResult;
use ecstore::store_api::ObjectOptions;
use ecstore::store_api::ObjectToDelete;
use ecstore::store_api::PutObjReader;
use ecstore::store_api::StorageAPI;
use futures::pin_mut;
@@ -21,6 +22,7 @@ use s3s::{S3Request, S3Response};
use std::fmt::Debug;
use std::str::FromStr;
use transform_stream::AsyncTryStream;
use uuid::Uuid;
use ecstore::error::Result;
use ecstore::store::ECStore;
@@ -91,17 +93,111 @@ impl S3 for FS {
#[tracing::instrument(level = "debug", skip(self, req))]
async fn delete_object(&self, req: S3Request<DeleteObjectInput>) -> S3Result<S3Response<DeleteObjectOutput>> {
let _input = req.input;
let DeleteObjectInput {
bucket, key, version_id, ..
} = req.input;
let output = DeleteObjectOutput::default();
let version_id = version_id
.as_ref()
.map(|v| match Uuid::parse_str(v) {
Ok(id) => Some(id),
Err(_) => None,
})
.unwrap_or_default();
let dobj = ObjectToDelete {
object_name: key,
version_id,
};
let objects: Vec<ObjectToDelete> = vec![dobj];
let (dobjs, _errs) = try_!(self.store.delete_objects(&bucket, objects, ObjectOptions::default()).await);
// TODO: let errors;
let (delete_marker, version_id) = {
if let Some((a, b)) = dobjs
.iter()
.map(|v| {
let delete_marker = {
if v.delete_marker {
Some(true)
} else {
None
}
};
let version_id = v.version_id.clone();
(delete_marker, version_id)
})
.next()
{
(a, b)
} else {
(None, None)
}
};
let output = DeleteObjectOutput {
delete_marker,
version_id,
..Default::default()
};
Ok(S3Response::new(output))
}
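
The map/match used to parse the optional version id silently drops an unparsable UUID. An equivalent, shorter form (helper name is illustrative):

```rust
use uuid::Uuid;

// Equivalent to the handler's map + match + unwrap_or_default chain:
// an invalid version id is treated as no version id at all.
fn parse_version_id(version_id: Option<&str>) -> Option<Uuid> {
    version_id.and_then(|v| Uuid::parse_str(v).ok())
}
```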
#[tracing::instrument(level = "debug", skip(self, req))]
async fn delete_objects(&self, req: S3Request<DeleteObjectsInput>) -> S3Result<S3Response<DeleteObjectsOutput>> {
let _input = req.input;
// info!("delete_objects args {:?}", req.input);
let output = DeleteObjectsOutput { ..Default::default() };
let DeleteObjectsInput { bucket, delete, .. } = req.input;
let objects: Vec<ObjectToDelete> = delete
.objects
.iter()
.map(|v| {
let version_id = v
.version_id
.as_ref()
.map(|v| match Uuid::parse_str(v) {
Ok(id) => Some(id),
Err(_) => None,
})
.unwrap_or_default();
ObjectToDelete {
object_name: v.key.clone(),
version_id: version_id,
}
})
.collect();
let (dobjs, _errs) = try_!(self.store.delete_objects(&bucket, objects, ObjectOptions::default()).await);
// info!("delete_objects res {:?} {:?}", &dobjs, errs);
let deleted = dobjs
.iter()
.map(|v| DeletedObject {
delete_marker: {
if v.delete_marker {
Some(true)
} else {
None
}
},
delete_marker_version_id: v.delete_marker_version_id.clone(),
key: Some(v.object_name.clone()),
version_id: v.version_id.clone(),
})
.collect();
// TODO: let errors;
let output = DeleteObjectsOutput {
deleted: Some(deleted),
// errors,
..Default::default()
};
Ok(S3Response::new(output))
}
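
Both handlers turn the boolean delete_marker into an Option<bool>, emitting Some(true) or None; bool::then_some expresses the same mapping (a sketch, not part of the commit):

```rust
// Some(true) when the object was a delete marker, None otherwise.
fn delete_marker_field(delete_marker: bool) -> Option<bool> {
    delete_marker.then_some(true)
}
```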