From 449f8633cf68cf59c3aed97ee07ec782dfa34ed0 Mon Sep 17 00:00:00 2001 From: weisd Date: Thu, 25 Jul 2024 16:57:43 +0800 Subject: [PATCH] list_bucket done --- README.md | 11 +---- TODO.md | 30 ++++++++++++ ecstore/src/disk.rs | 29 +++++++++++ ecstore/src/disk_api.rs | 1 + ecstore/src/peer.rs | 98 +++++++++++++++++++++++++++++++++++++- ecstore/src/sets.rs | 3 ++ ecstore/src/store.rs | 5 ++ ecstore/src/store_api.rs | 1 + rustfs/src/storage/ecfs.rs | 17 ++++++- 9 files changed, 182 insertions(+), 13 deletions(-) create mode 100644 TODO.md diff --git a/README.md b/README.md index e1fe0475..3bfa6b7a 100644 --- a/README.md +++ b/README.md @@ -1,10 +1 @@ -# s3-rustfs - -# TODO LIST - -## ecstore - - - [ ] 删除旧版本文件 - - [ ] EC可用读写数量判断 - - [ ] 小文件存储到metafile, inlinedata - - [ ] 优化并发执行 \ No newline at end of file +# s3-rustfs \ No newline at end of file diff --git a/TODO.md b/TODO.md new file mode 100644 index 00000000..857e82e5 --- /dev/null +++ b/TODO.md @@ -0,0 +1,30 @@ +# TODO LIST + +## 基础存储 + + - [ ] 删除旧版本文件 + - [ ] EC可用读写数量判断 + - [ ] 小文件存储到metafile, inlinedata + - [ ] 错误类型判断 + - [ ] 优化并发执行 + - [ ] 抽象出metafile存储 + - [ ] 代码优化 + +## 基础功能 + + - [ ] 桶操作 + - [x] 创建 + - [ ] 列表 + - [ ] 详情 + - [ ] 删除 +- [ ] 文件操作 + - [x] 上传 + - [x] 大文件上传 + - [x] 下载 + - [ ] 删除 + +## 扩展功能 + +- [ ] 版本控制 +- [ ] 对象锁 +- [ ] 修复 \ No newline at end of file diff --git a/ecstore/src/disk.rs b/ecstore/src/disk.rs index 0235ab29..ad3ea699 100644 --- a/ecstore/src/disk.rs +++ b/ecstore/src/disk.rs @@ -1,6 +1,7 @@ use std::{ fs::Metadata, io::SeekFrom, + os::unix::ffi::OsStringExt, path::{Path, PathBuf}, sync::Arc, }; @@ -638,7 +639,35 @@ impl DiskAPI for LocalDisk { Err(Error::new(DiskError::VolumeExists)) } + async fn list_volumes(&self) -> Result> { + let mut entries = fs::read_dir(&self.root).await?; + let mut volumes = Vec::new(); + + while let Some(entry) = entries.next_entry().await? { + if let Ok(metadata) = entry.metadata().await { + let vec = entry.file_name().into_vec(); + + if !metadata.is_dir() { + continue; + } + + let name = match String::from_utf8(vec) { + Ok(s) => s, + Err(_) => return Err(Error::msg("Not supported utf8 file name on this platform")), + }; + + let created = match metadata.created() { + Ok(md) => OffsetDateTime::from(md), + Err(_) => return Err(Error::msg("Not supported created on this platform")), + }; + + volumes.push(VolumeInfo { name, created }); + } + } + + Ok(volumes) + } async fn stat_volume(&self, volume: &str) -> Result { let p = self.get_bucket_path(volume)?; diff --git a/ecstore/src/disk_api.rs b/ecstore/src/disk_api.rs index a2678d64..8f2c2316 100644 --- a/ecstore/src/disk_api.rs +++ b/ecstore/src/disk_api.rs @@ -36,6 +36,7 @@ pub trait DiskAPI: Debug + Send + Sync + 'static { ) -> Result; async fn make_volumes(&self, volume: Vec<&str>) -> Result<()>; + async fn list_volumes(&self) -> Result>; async fn make_volume(&self, volume: &str) -> Result<()>; async fn stat_volume(&self, volume: &str) -> Result; diff --git a/ecstore/src/peer.rs b/ecstore/src/peer.rs index 40d0f47f..5638436d 100644 --- a/ecstore/src/peer.rs +++ b/ecstore/src/peer.rs @@ -1,11 +1,12 @@ use anyhow::{Error, Result}; use async_trait::async_trait; use futures::future::join_all; -use std::{fmt::Debug, sync::Arc}; +use std::{collections::HashMap, fmt::Debug, sync::Arc}; +use tracing::warn; use crate::{ disk::DiskStore, - disk_api::DiskError, + disk_api::{DiskError, VolumeInfo}, endpoint::{EndpointServerPools, Node}, store_api::{BucketInfo, BucketOptions, MakeBucketOptions}, }; @@ -15,6 +16,7 @@ type Client = Arc>; #[async_trait] pub trait PeerS3Client: Debug + Sync + Send + 'static { async fn make_bucket(&self, bucket: &str, opts: &MakeBucketOptions) -> Result<()>; + async fn list_bucket(&self, opts: &BucketOptions) -> Result>; async fn get_bucket_info(&self, bucket: &str, opts: &BucketOptions) -> Result; fn get_pools(&self) -> Vec; } @@ -58,6 +60,51 @@ impl PeerS3Client for S3PeerSys { fn get_pools(&self) -> Vec { unimplemented!() } + async fn list_bucket(&self, opts: &BucketOptions) -> Result> { + let mut futures = Vec::with_capacity(self.clients.len()); + for cli in self.clients.iter() { + futures.push(cli.list_bucket(opts)); + } + + let mut errors = Vec::with_capacity(self.clients.len()); + let mut ress = Vec::with_capacity(self.clients.len()); + + let results = join_all(futures).await; + for result in results { + match result { + Ok(res) => { + ress.push(Some(res)); + errors.push(None); + } + Err(e) => { + ress.push(None); + errors.push(Some(e)); + } + } + } + // TODO: reduceWriteQuorumErrs + // for i in 0..self.pools_count {} + + let mut uniq_map: HashMap<&String, &BucketInfo> = HashMap::new(); + + for res in ress.iter() { + if res.is_none() { + continue; + } + + let buckets = res.as_ref().unwrap(); + + for bucket in buckets.iter() { + if !uniq_map.contains_key(&bucket.name) { + uniq_map.insert(&bucket.name, bucket); + } + } + } + + let buckets: Vec = uniq_map.values().map(|&v| v.clone()).collect(); + + Ok(buckets) + } async fn make_bucket(&self, bucket: &str, opts: &MakeBucketOptions) -> Result<()> { let mut futures = Vec::with_capacity(self.clients.len()); for cli in self.clients.iter() { @@ -159,6 +206,50 @@ impl PeerS3Client for LocalPeerS3Client { fn get_pools(&self) -> Vec { self.pools.clone() } + async fn list_bucket(&self, opts: &BucketOptions) -> Result> { + let mut futures = Vec::with_capacity(self.local_disks.len()); + for disk in self.local_disks.iter() { + futures.push(disk.list_volumes()); + } + + let results = join_all(futures).await; + + let mut ress = Vec::new(); + let mut errs = Vec::new(); + + for result in results { + match result { + Ok(res) => { + ress.push(res); + errs.push(None); + } + Err(e) => errs.push(Some(e)), + } + } + + warn!("list_bucket errs {:?}", &errs); + + let mut uniq_map: HashMap<&String, &VolumeInfo> = HashMap::new(); + + for info_list in ress.iter() { + for info in info_list.iter() { + // TODO: check name valid + if !uniq_map.contains_key(&info.name) { + uniq_map.insert(&info.name, info); + } + } + } + + let buckets: Vec = uniq_map + .values() + .map(|&v| BucketInfo { + name: v.name.clone(), + created: v.created, + }) + .collect(); + + Ok(buckets) + } async fn make_bucket(&self, bucket: &str, opts: &MakeBucketOptions) -> Result<()> { let mut futures = Vec::with_capacity(self.local_disks.len()); for disk in self.local_disks.iter() { @@ -248,6 +339,9 @@ impl PeerS3Client for RemotePeerS3Client { fn get_pools(&self) -> Vec { unimplemented!() } + async fn list_bucket(&self, opts: &BucketOptions) -> Result> { + unimplemented!() + } async fn make_bucket(&self, _bucket: &str, _opts: &MakeBucketOptions) -> Result<()> { unimplemented!() } diff --git a/ecstore/src/sets.rs b/ecstore/src/sets.rs index 60ca10f2..baea12ae 100644 --- a/ecstore/src/sets.rs +++ b/ecstore/src/sets.rs @@ -122,6 +122,9 @@ impl Sets { #[async_trait::async_trait] impl StorageAPI for Sets { + async fn list_bucket(&self, opts: &BucketOptions) -> Result> { + unimplemented!() + } async fn make_bucket(&self, _bucket: &str, _opts: &MakeBucketOptions) -> Result<()> { unimplemented!() } diff --git a/ecstore/src/store.rs b/ecstore/src/store.rs index 0eec6455..51955ad7 100644 --- a/ecstore/src/store.rs +++ b/ecstore/src/store.rs @@ -114,6 +114,11 @@ impl ECStore { #[async_trait::async_trait] impl StorageAPI for ECStore { + async fn list_bucket(&self, opts: &BucketOptions) -> Result> { + let buckets = self.peer_sys.list_bucket(opts).await?; + + Ok(buckets) + } async fn make_bucket(&self, bucket: &str, opts: &MakeBucketOptions) -> Result<()> { // TODO: check valid bucket name diff --git a/ecstore/src/store_api.rs b/ecstore/src/store_api.rs index c2c46f3c..0e8ec934 100644 --- a/ecstore/src/store_api.rs +++ b/ecstore/src/store_api.rs @@ -426,6 +426,7 @@ pub struct ObjectInfo { #[async_trait::async_trait] pub trait StorageAPI { async fn make_bucket(&self, bucket: &str, opts: &MakeBucketOptions) -> Result<()>; + async fn list_bucket(&self, opts: &BucketOptions) -> Result>; async fn get_bucket_info(&self, bucket: &str, opts: &BucketOptions) -> Result; async fn get_object_info(&self, bucket: &str, object: &str, opts: &ObjectOptions) -> Result; async fn get_object_reader( diff --git a/rustfs/src/storage/ecfs.rs b/rustfs/src/storage/ecfs.rs index 780b3cf2..6712973f 100644 --- a/rustfs/src/storage/ecfs.rs +++ b/rustfs/src/storage/ecfs.rs @@ -227,7 +227,22 @@ impl S3 for FS { #[tracing::instrument(level = "debug", skip(self))] async fn list_buckets(&self, _: S3Request) -> S3Result> { - let output = ListBucketsOutput { ..Default::default() }; + // mc ls + + let bucket_infos = try_!(self.store.list_bucket(&BucketOptions {}).await); + + let buckets: Vec = bucket_infos + .iter() + .map(|v| Bucket { + creation_date: Some(Timestamp::from(v.created)), + name: Some(v.name.clone()), + }) + .collect(); + + let output = ListBucketsOutput { + buckets: Some(buckets), + ..Default::default() + }; Ok(S3Response::new(output)) }