mirror of
https://github.com/rustfs/rustfs.git
synced 2026-01-17 09:40:32 +00:00
Compare commits
3 Commits
1.0.0-alph
...
1.0.0-alph
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
10c949af62 | ||
|
|
4a3325276d | ||
|
|
c5f6c66f72 |
1
Cargo.lock
generated
1
Cargo.lock
generated
@@ -8205,6 +8205,7 @@ name = "rustfs-ecstore"
|
||||
version = "0.0.5"
|
||||
dependencies = [
|
||||
"async-channel",
|
||||
"async-recursion",
|
||||
"async-trait",
|
||||
"aws-sdk-s3",
|
||||
"base64 0.22.1",
|
||||
|
||||
@@ -100,6 +100,7 @@ rustfs-rio.workspace = true
|
||||
rustfs-signer.workspace = true
|
||||
rustfs-checksums.workspace = true
|
||||
futures-util.workspace = true
|
||||
async-recursion.workspace = true
|
||||
|
||||
[target.'cfg(not(windows))'.dependencies]
|
||||
nix = { workspace = true }
|
||||
|
||||
@@ -440,6 +440,7 @@ impl LocalDisk {
|
||||
}
|
||||
|
||||
#[tracing::instrument(level = "debug", skip(self))]
|
||||
#[async_recursion::async_recursion]
|
||||
pub async fn delete_file(
|
||||
&self,
|
||||
base_path: &PathBuf,
|
||||
@@ -803,13 +804,17 @@ impl LocalDisk {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn scan_dir<W: AsyncWrite + Unpin>(
|
||||
#[async_recursion::async_recursion]
|
||||
async fn scan_dir<W>(
|
||||
&self,
|
||||
current: &mut String,
|
||||
opts: &WalkDirOptions,
|
||||
out: &mut MetacacheWriter<W>,
|
||||
objs_returned: &mut i32,
|
||||
) -> Result<()> {
|
||||
) -> Result<()>
|
||||
where
|
||||
W: AsyncWrite + Unpin + Send,
|
||||
{
|
||||
let forward = {
|
||||
opts.forward_to.as_ref().filter(|v| v.starts_with(&*current)).map(|v| {
|
||||
let forward = v.trim_start_matches(&*current);
|
||||
|
||||
@@ -177,15 +177,17 @@ impl S3PeerSys {
|
||||
let pools = cli.get_pools();
|
||||
let idx = i;
|
||||
if pools.unwrap_or_default().contains(&idx) {
|
||||
per_pool_errs.push(errors[j].as_ref());
|
||||
per_pool_errs.push(errors[j].clone());
|
||||
}
|
||||
|
||||
// TODO: reduceWriteQuorumErrs
|
||||
if let Some(pool_err) =
|
||||
reduce_write_quorum_errs(&per_pool_errs, BUCKET_OP_IGNORED_ERRS, (per_pool_errs.len() / 2) + 1)
|
||||
{
|
||||
return Err(pool_err);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TODO:
|
||||
|
||||
Ok(())
|
||||
}
|
||||
pub async fn list_bucket(&self, opts: &BucketOptions) -> Result<Vec<BucketInfo>> {
|
||||
@@ -387,7 +389,6 @@ impl PeerS3Client for LocalPeerS3Client {
|
||||
if opts.force_create && matches!(e, Error::VolumeExists) {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
Err(e)
|
||||
}
|
||||
}
|
||||
@@ -405,7 +406,9 @@ impl PeerS3Client for LocalPeerS3Client {
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: reduceWriteQuorumErrs
|
||||
if let Some(err) = reduce_write_quorum_errs(&errs, BUCKET_OP_IGNORED_ERRS, (local_disks.len() / 2) + 1) {
|
||||
return Err(err);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -1221,7 +1221,7 @@ impl StorageAPI for ECStore {
|
||||
}
|
||||
|
||||
if let Err(err) = self.peer_sys.make_bucket(bucket, opts).await {
|
||||
let err = err.into();
|
||||
let err = to_object_err(err.into(), vec![bucket]);
|
||||
if !is_err_bucket_exists(&err) {
|
||||
let _ = self
|
||||
.delete_bucket(
|
||||
@@ -1234,7 +1234,6 @@ impl StorageAPI for ECStore {
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
return Err(err);
|
||||
};
|
||||
|
||||
|
||||
@@ -231,6 +231,20 @@ Retrieve an object from S3 with two operation modes: read content directly or do
|
||||
- `local_path` (string, optional): Local file path (required when mode is "download")
|
||||
- `max_content_size` (number, optional): Maximum content size in bytes for read mode (default: 1MB)
|
||||
|
||||
### `create_bucket`
|
||||
|
||||
Create a new S3 bucket with the specified name.
|
||||
|
||||
**Parameters:**
|
||||
|
||||
- `bucket_name` (string): Source S3 bucket.
|
||||
|
||||
### `delete_bucket`
|
||||
|
||||
Delete the specified S3 bucket. If the bucket is not empty, the deletion will fail. You should delete all objects and objects inside them before calling this method.**WARNING: This operation will permanently delete the bucket and all objects within it!**
|
||||
|
||||
- `bucket_name` (string): Source S3 bucket.
|
||||
|
||||
## Architecture
|
||||
|
||||
The MCP server is built with a modular architecture:
|
||||
|
||||
@@ -151,6 +151,36 @@ impl S3Client {
|
||||
Ok(Self { client })
|
||||
}
|
||||
|
||||
pub async fn create_bucket(&self, bucket_name: &str) -> Result<BucketInfo> {
|
||||
info!("Creating S3 bucket: {}", bucket_name);
|
||||
|
||||
self.client
|
||||
.create_bucket()
|
||||
.bucket(bucket_name)
|
||||
.send()
|
||||
.await
|
||||
.context(format!("Failed to create S3 bucket: {bucket_name}"))?;
|
||||
|
||||
info!("Bucket '{}' created successfully", bucket_name);
|
||||
Ok(BucketInfo {
|
||||
name: bucket_name.to_string(),
|
||||
creation_date: None, // Creation date not returned by create_bucket
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn delete_bucket(&self, bucket_name: &str) -> Result<()> {
|
||||
info!("Deleting S3 bucket: {}", bucket_name);
|
||||
self.client
|
||||
.delete_bucket()
|
||||
.bucket(bucket_name)
|
||||
.send()
|
||||
.await
|
||||
.context(format!("Failed to delete S3 bucket: {bucket_name}"))?;
|
||||
|
||||
info!("Bucket '{}' deleted successfully", bucket_name);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn list_buckets(&self) -> Result<Vec<BucketInfo>> {
|
||||
debug!("Listing S3 buckets");
|
||||
|
||||
|
||||
@@ -54,6 +54,18 @@ pub struct UploadFileRequest {
|
||||
pub cache_control: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, JsonSchema)]
|
||||
pub struct CreateBucketReqeust {
|
||||
#[schemars(description = "Name of the S3 bucket to create")]
|
||||
pub bucket_name: String,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, JsonSchema)]
|
||||
pub struct DeleteBucketReqeust {
|
||||
#[schemars(description = "Name of the S3 bucket to delete")]
|
||||
pub bucket_name: String,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, JsonSchema)]
|
||||
pub struct GetObjectRequest {
|
||||
#[schemars(description = "Name of the S3 bucket")]
|
||||
@@ -110,6 +122,53 @@ impl RustfsMcpServer {
|
||||
})
|
||||
}
|
||||
|
||||
#[tool(description = "Create a new S3 bucket with the specified name")]
|
||||
pub async fn create_bucket(&self, Parameters(req): Parameters<CreateBucketReqeust>) -> String {
|
||||
info!("Executing create_bucket tool for bucket: {}", req.bucket_name);
|
||||
|
||||
match self.s3_client.create_bucket(&req.bucket_name).await {
|
||||
Ok(_) => {
|
||||
format!("Successfully created bucket: {}", req.bucket_name)
|
||||
}
|
||||
Err(e) => {
|
||||
format!("Failed to create bucket '{}': {:?}", req.bucket_name, e)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[tool(description = "Delete an existing S3 bucket with the specified name")]
|
||||
pub async fn delete_bucket(&self, Parameters(req): Parameters<DeleteBucketReqeust>) -> String {
|
||||
info!("Executing delete_bucket tool for bucket: {}", req.bucket_name);
|
||||
|
||||
// check if bucket is empty, if not, can not delete bucket directly.
|
||||
let object_result = match self
|
||||
.s3_client
|
||||
.list_objects_v2(&req.bucket_name, ListObjectsOptions::default())
|
||||
.await
|
||||
{
|
||||
Ok(result) => result,
|
||||
Err(e) => {
|
||||
error!("Failed to list objects in bucket '{}': {:?}", req.bucket_name, e);
|
||||
return format!("Failed to list objects in bucket '{}': {:?}", req.bucket_name, e);
|
||||
}
|
||||
};
|
||||
|
||||
if !object_result.objects.is_empty() {
|
||||
error!("Bucket '{}' is not empty", req.bucket_name);
|
||||
return format!("Failed to delete bucket '{}': bucket is not empty", req.bucket_name);
|
||||
}
|
||||
|
||||
// delete the bucket.
|
||||
match self.s3_client.delete_bucket(&req.bucket_name).await {
|
||||
Ok(_) => {
|
||||
format!("Successfully deleted bucket: {}", req.bucket_name)
|
||||
}
|
||||
Err(e) => {
|
||||
format!("Failed to delete bucket '{}': {:?}", req.bucket_name, e)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[tool(description = "List all S3 buckets accessible with the configured credentials")]
|
||||
pub async fn list_buckets(&self) -> String {
|
||||
info!("Executing list_buckets tool");
|
||||
@@ -667,4 +726,20 @@ mod tests {
|
||||
assert_eq!(read_mode_deser, GetObjectMode::Read);
|
||||
assert_eq!(download_mode_deser, GetObjectMode::Download);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_bucket_creation() {
|
||||
let request = CreateBucketReqeust {
|
||||
bucket_name: "test-bucket".to_string(),
|
||||
};
|
||||
assert_eq!(request.bucket_name, "test-bucket");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_bucket_deletion() {
|
||||
let request = DeleteBucketReqeust {
|
||||
bucket_name: "test-bucket".to_string(),
|
||||
};
|
||||
assert_eq!(request.bucket_name, "test-bucket");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -311,7 +311,7 @@ impl S3 for FS {
|
||||
.make_bucket(
|
||||
&bucket,
|
||||
&MakeBucketOptions {
|
||||
force_create: true,
|
||||
force_create: false, // TODO: force support
|
||||
lock_enabled: object_lock_enabled_for_bucket.is_some_and(|v| v),
|
||||
..Default::default()
|
||||
},
|
||||
|
||||
Reference in New Issue
Block a user