list_bucket done

This commit is contained in:
weisd
2024-07-25 16:57:43 +08:00
parent b53056fe09
commit 449f8633cf
9 changed files with 182 additions and 13 deletions

View File

@@ -1,10 +1 @@
# s3-rustfs
# TODO LIST
## ecstore
- [ ] 删除旧版本文件
- [ ] EC可用读写数量判断
- [ ] 小文件存储到metafile, inlinedata
- [ ] 优化并发执行
# s3-rustfs

30
TODO.md Normal file
View File

@@ -0,0 +1,30 @@
# TODO LIST
## 基础存储
- [ ] 删除旧版本文件
- [ ] EC可用读写数量判断
- [ ] 小文件存储到metafile, inlinedata
- [ ] 错误类型判断
- [ ] 优化并发执行
- [ ] 抽象出metafile存储
- [ ] 代码优化
## 基础功能
- [ ] 桶操作
- [x] 创建
- [ ] 列表
- [ ] 详情
- [ ] 删除
- [ ] 文件操作
- [x] 上传
- [x] 大文件上传
- [x] 下载
- [ ] 删除
## 扩展功能
- [ ] 版本控制
- [ ] 对象锁
- [ ] 修复

View File

@@ -1,6 +1,7 @@
use std::{
fs::Metadata,
io::SeekFrom,
os::unix::ffi::OsStringExt,
path::{Path, PathBuf},
sync::Arc,
};
@@ -638,7 +639,35 @@ impl DiskAPI for LocalDisk {
Err(Error::new(DiskError::VolumeExists))
}
async fn list_volumes(&self) -> Result<Vec<VolumeInfo>> {
let mut entries = fs::read_dir(&self.root).await?;
let mut volumes = Vec::new();
while let Some(entry) = entries.next_entry().await? {
if let Ok(metadata) = entry.metadata().await {
let vec = entry.file_name().into_vec();
if !metadata.is_dir() {
continue;
}
let name = match String::from_utf8(vec) {
Ok(s) => s,
Err(_) => return Err(Error::msg("Not supported utf8 file name on this platform")),
};
let created = match metadata.created() {
Ok(md) => OffsetDateTime::from(md),
Err(_) => return Err(Error::msg("Not supported created on this platform")),
};
volumes.push(VolumeInfo { name, created });
}
}
Ok(volumes)
}
async fn stat_volume(&self, volume: &str) -> Result<VolumeInfo> {
let p = self.get_bucket_path(volume)?;

View File

@@ -36,6 +36,7 @@ pub trait DiskAPI: Debug + Send + Sync + 'static {
) -> Result<RenameDataResp>;
async fn make_volumes(&self, volume: Vec<&str>) -> Result<()>;
async fn list_volumes(&self) -> Result<Vec<VolumeInfo>>;
async fn make_volume(&self, volume: &str) -> Result<()>;
async fn stat_volume(&self, volume: &str) -> Result<VolumeInfo>;

View File

@@ -1,11 +1,12 @@
use anyhow::{Error, Result};
use async_trait::async_trait;
use futures::future::join_all;
use std::{fmt::Debug, sync::Arc};
use std::{collections::HashMap, fmt::Debug, sync::Arc};
use tracing::warn;
use crate::{
disk::DiskStore,
disk_api::DiskError,
disk_api::{DiskError, VolumeInfo},
endpoint::{EndpointServerPools, Node},
store_api::{BucketInfo, BucketOptions, MakeBucketOptions},
};
@@ -15,6 +16,7 @@ type Client = Arc<Box<dyn PeerS3Client>>;
#[async_trait]
pub trait PeerS3Client: Debug + Sync + Send + 'static {
async fn make_bucket(&self, bucket: &str, opts: &MakeBucketOptions) -> Result<()>;
async fn list_bucket(&self, opts: &BucketOptions) -> Result<Vec<BucketInfo>>;
async fn get_bucket_info(&self, bucket: &str, opts: &BucketOptions) -> Result<BucketInfo>;
fn get_pools(&self) -> Vec<usize>;
}
@@ -58,6 +60,51 @@ impl PeerS3Client for S3PeerSys {
fn get_pools(&self) -> Vec<usize> {
unimplemented!()
}
async fn list_bucket(&self, opts: &BucketOptions) -> Result<Vec<BucketInfo>> {
let mut futures = Vec::with_capacity(self.clients.len());
for cli in self.clients.iter() {
futures.push(cli.list_bucket(opts));
}
let mut errors = Vec::with_capacity(self.clients.len());
let mut ress = Vec::with_capacity(self.clients.len());
let results = join_all(futures).await;
for result in results {
match result {
Ok(res) => {
ress.push(Some(res));
errors.push(None);
}
Err(e) => {
ress.push(None);
errors.push(Some(e));
}
}
}
// TODO: reduceWriteQuorumErrs
// for i in 0..self.pools_count {}
let mut uniq_map: HashMap<&String, &BucketInfo> = HashMap::new();
for res in ress.iter() {
if res.is_none() {
continue;
}
let buckets = res.as_ref().unwrap();
for bucket in buckets.iter() {
if !uniq_map.contains_key(&bucket.name) {
uniq_map.insert(&bucket.name, bucket);
}
}
}
let buckets: Vec<BucketInfo> = uniq_map.values().map(|&v| v.clone()).collect();
Ok(buckets)
}
async fn make_bucket(&self, bucket: &str, opts: &MakeBucketOptions) -> Result<()> {
let mut futures = Vec::with_capacity(self.clients.len());
for cli in self.clients.iter() {
@@ -159,6 +206,50 @@ impl PeerS3Client for LocalPeerS3Client {
fn get_pools(&self) -> Vec<usize> {
self.pools.clone()
}
async fn list_bucket(&self, opts: &BucketOptions) -> Result<Vec<BucketInfo>> {
let mut futures = Vec::with_capacity(self.local_disks.len());
for disk in self.local_disks.iter() {
futures.push(disk.list_volumes());
}
let results = join_all(futures).await;
let mut ress = Vec::new();
let mut errs = Vec::new();
for result in results {
match result {
Ok(res) => {
ress.push(res);
errs.push(None);
}
Err(e) => errs.push(Some(e)),
}
}
warn!("list_bucket errs {:?}", &errs);
let mut uniq_map: HashMap<&String, &VolumeInfo> = HashMap::new();
for info_list in ress.iter() {
for info in info_list.iter() {
// TODO: check name valid
if !uniq_map.contains_key(&info.name) {
uniq_map.insert(&info.name, info);
}
}
}
let buckets: Vec<BucketInfo> = uniq_map
.values()
.map(|&v| BucketInfo {
name: v.name.clone(),
created: v.created,
})
.collect();
Ok(buckets)
}
async fn make_bucket(&self, bucket: &str, opts: &MakeBucketOptions) -> Result<()> {
let mut futures = Vec::with_capacity(self.local_disks.len());
for disk in self.local_disks.iter() {
@@ -248,6 +339,9 @@ impl PeerS3Client for RemotePeerS3Client {
fn get_pools(&self) -> Vec<usize> {
unimplemented!()
}
async fn list_bucket(&self, opts: &BucketOptions) -> Result<Vec<BucketInfo>> {
unimplemented!()
}
async fn make_bucket(&self, _bucket: &str, _opts: &MakeBucketOptions) -> Result<()> {
unimplemented!()
}

View File

@@ -122,6 +122,9 @@ impl Sets {
#[async_trait::async_trait]
impl StorageAPI for Sets {
async fn list_bucket(&self, opts: &BucketOptions) -> Result<Vec<BucketInfo>> {
unimplemented!()
}
async fn make_bucket(&self, _bucket: &str, _opts: &MakeBucketOptions) -> Result<()> {
unimplemented!()
}

View File

@@ -114,6 +114,11 @@ impl ECStore {
#[async_trait::async_trait]
impl StorageAPI for ECStore {
async fn list_bucket(&self, opts: &BucketOptions) -> Result<Vec<BucketInfo>> {
let buckets = self.peer_sys.list_bucket(opts).await?;
Ok(buckets)
}
async fn make_bucket(&self, bucket: &str, opts: &MakeBucketOptions) -> Result<()> {
// TODO: check valid bucket name

View File

@@ -426,6 +426,7 @@ pub struct ObjectInfo {
#[async_trait::async_trait]
pub trait StorageAPI {
async fn make_bucket(&self, bucket: &str, opts: &MakeBucketOptions) -> Result<()>;
async fn list_bucket(&self, opts: &BucketOptions) -> Result<Vec<BucketInfo>>;
async fn get_bucket_info(&self, bucket: &str, opts: &BucketOptions) -> Result<BucketInfo>;
async fn get_object_info(&self, bucket: &str, object: &str, opts: &ObjectOptions) -> Result<ObjectInfo>;
async fn get_object_reader(

View File

@@ -227,7 +227,22 @@ impl S3 for FS {
#[tracing::instrument(level = "debug", skip(self))]
async fn list_buckets(&self, _: S3Request<ListBucketsInput>) -> S3Result<S3Response<ListBucketsOutput>> {
let output = ListBucketsOutput { ..Default::default() };
// mc ls
let bucket_infos = try_!(self.store.list_bucket(&BucketOptions {}).await);
let buckets: Vec<Bucket> = bucket_infos
.iter()
.map(|v| Bucket {
creation_date: Some(Timestamp::from(v.created)),
name: Some(v.name.clone()),
})
.collect();
let output = ListBucketsOutput {
buckets: Some(buckets),
..Default::default()
};
Ok(S3Response::new(output))
}