Compare commits

..

3 Commits

Author SHA1 Message Date
weisd
10c949af62 fix:make bucket exists (#428) 2025-08-19 16:14:59 +08:00
reigadegr
4a3325276d fix(ecstore): add async-recursion to resolve nightly trait solver reg… (#415)
* fix(ecstore): add async-recursion to resolve nightly trait solver regression

The newest nightly compiler switched to the new trait solver, which
currently rejects async recursive functions that were previously accepted.
This causes the following compilation failures:

- `LocalDisk::delete_file()`
- `LocalDisk::scan_dir()`

Add `async-recursion` as a workspace dependency and annotate both functions with `#[async_recursion]` so that the crate compiles cleanly with the latest nightly and will continue to build once the new solver lands in stable.

Signed-off-by: reigadegr <2722688642@qq.com>

* fix: resolve duplicate bound error in scan_dir function

Replaced inline trait bounds with where clause to avoid duplication caused by macro expansion.

Signed-off-by: reigadegr <2722688642@qq.com>

---------

Signed-off-by: reigadegr <2722688642@qq.com>
Co-authored-by: 安正超 <anzhengchao@gmail.com>
2025-08-18 20:58:05 +08:00
majinghe
c5f6c66f72 feat: extend rustfs mcp with bucket creation and deletion (#416)
* feat: extend rustfs mcp with bucket creation and deletion

* update file to fix pipeline error

* change variable name to fix pipeline error
2025-08-18 09:06:55 +08:00
9 changed files with 139 additions and 11 deletions

1
Cargo.lock generated
View File

@@ -8205,6 +8205,7 @@ name = "rustfs-ecstore"
version = "0.0.5"
dependencies = [
"async-channel",
"async-recursion",
"async-trait",
"aws-sdk-s3",
"base64 0.22.1",

View File

@@ -100,6 +100,7 @@ rustfs-rio.workspace = true
rustfs-signer.workspace = true
rustfs-checksums.workspace = true
futures-util.workspace = true
async-recursion.workspace = true
[target.'cfg(not(windows))'.dependencies]
nix = { workspace = true }

View File

@@ -440,6 +440,7 @@ impl LocalDisk {
}
#[tracing::instrument(level = "debug", skip(self))]
#[async_recursion::async_recursion]
pub async fn delete_file(
&self,
base_path: &PathBuf,
@@ -803,13 +804,17 @@ impl LocalDisk {
Ok(())
}
async fn scan_dir<W: AsyncWrite + Unpin>(
#[async_recursion::async_recursion]
async fn scan_dir<W>(
&self,
current: &mut String,
opts: &WalkDirOptions,
out: &mut MetacacheWriter<W>,
objs_returned: &mut i32,
) -> Result<()> {
) -> Result<()>
where
W: AsyncWrite + Unpin + Send,
{
let forward = {
opts.forward_to.as_ref().filter(|v| v.starts_with(&*current)).map(|v| {
let forward = v.trim_start_matches(&*current);

View File

@@ -177,15 +177,17 @@ impl S3PeerSys {
let pools = cli.get_pools();
let idx = i;
if pools.unwrap_or_default().contains(&idx) {
per_pool_errs.push(errors[j].as_ref());
per_pool_errs.push(errors[j].clone());
}
// TODO: reduceWriteQuorumErrs
if let Some(pool_err) =
reduce_write_quorum_errs(&per_pool_errs, BUCKET_OP_IGNORED_ERRS, (per_pool_errs.len() / 2) + 1)
{
return Err(pool_err);
}
}
}
// TODO:
Ok(())
}
pub async fn list_bucket(&self, opts: &BucketOptions) -> Result<Vec<BucketInfo>> {
@@ -387,7 +389,6 @@ impl PeerS3Client for LocalPeerS3Client {
if opts.force_create && matches!(e, Error::VolumeExists) {
return Ok(());
}
Err(e)
}
}
@@ -405,7 +406,9 @@ impl PeerS3Client for LocalPeerS3Client {
}
}
// TODO: reduceWriteQuorumErrs
if let Some(err) = reduce_write_quorum_errs(&errs, BUCKET_OP_IGNORED_ERRS, (local_disks.len() / 2) + 1) {
return Err(err);
}
Ok(())
}

View File

@@ -1221,7 +1221,7 @@ impl StorageAPI for ECStore {
}
if let Err(err) = self.peer_sys.make_bucket(bucket, opts).await {
let err = err.into();
let err = to_object_err(err.into(), vec![bucket]);
if !is_err_bucket_exists(&err) {
let _ = self
.delete_bucket(
@@ -1234,7 +1234,6 @@ impl StorageAPI for ECStore {
)
.await;
}
return Err(err);
};

View File

@@ -231,6 +231,20 @@ Retrieve an object from S3 with two operation modes: read content directly or do
- `local_path` (string, optional): Local file path (required when mode is "download")
- `max_content_size` (number, optional): Maximum content size in bytes for read mode (default: 1MB)
### `create_bucket`
Create a new S3 bucket with the specified name.
**Parameters:**
- `bucket_name` (string): Name of the S3 bucket to create.
### `delete_bucket`
Delete the specified S3 bucket. The deletion will fail if the bucket is not empty; delete all objects inside the bucket before calling this method. **WARNING: This operation permanently deletes the bucket!**
**Parameters:**
- `bucket_name` (string): Name of the S3 bucket to delete.
## Architecture
The MCP server is built with a modular architecture:

View File

@@ -151,6 +151,36 @@ impl S3Client {
Ok(Self { client })
}
/// Create a new S3 bucket with the given name.
///
/// Returns a `BucketInfo` for the freshly created bucket. The S3
/// `CreateBucket` response does not carry a creation timestamp, so
/// `creation_date` is left as `None`.
///
/// # Errors
/// Returns an error (with the bucket name attached as context) when the
/// underlying S3 `CreateBucket` call fails.
pub async fn create_bucket(&self, bucket_name: &str) -> Result<BucketInfo> {
    info!("Creating S3 bucket: {}", bucket_name);
    self.client
        .create_bucket()
        .bucket(bucket_name)
        .send()
        .await
        // with_context: build the message lazily, only on the error path
        // (plain `.context(format!(...))` allocates even on success).
        .with_context(|| format!("Failed to create S3 bucket: {bucket_name}"))?;
    info!("Bucket '{}' created successfully", bucket_name);
    Ok(BucketInfo {
        name: bucket_name.to_string(),
        creation_date: None, // Creation date not returned by create_bucket
    })
}
/// Delete the named S3 bucket.
///
/// S3 rejects deletion of non-empty buckets; callers are expected to have
/// emptied the bucket first (the MCP tool layer enforces this).
///
/// # Errors
/// Returns an error (with the bucket name attached as context) when the
/// underlying S3 `DeleteBucket` call fails.
pub async fn delete_bucket(&self, bucket_name: &str) -> Result<()> {
    info!("Deleting S3 bucket: {}", bucket_name);
    self.client
        .delete_bucket()
        .bucket(bucket_name)
        .send()
        .await
        // with_context: build the message lazily, only on the error path
        // (plain `.context(format!(...))` allocates even on success).
        .with_context(|| format!("Failed to delete S3 bucket: {bucket_name}"))?;
    info!("Bucket '{}' deleted successfully", bucket_name);
    Ok(())
}
pub async fn list_buckets(&self) -> Result<Vec<BucketInfo>> {
debug!("Listing S3 buckets");

View File

@@ -54,6 +54,18 @@ pub struct UploadFileRequest {
pub cache_control: Option<String>,
}
/// Request payload for the `create_bucket` MCP tool.
// NOTE(review): "Reqeust" is a typo for "Request". Renaming would break
// every reference to this public type (tool handlers, tests), so it is
// left as-is here; fix in a coordinated rename.
#[derive(Serialize, Deserialize, JsonSchema)]
pub struct CreateBucketReqeust {
    #[schemars(description = "Name of the S3 bucket to create")]
    pub bucket_name: String,
}
/// Request payload for the `delete_bucket` MCP tool.
// NOTE(review): "Reqeust" is a typo for "Request"; renaming is a breaking
// change for code referencing this public type, so it is left as-is here.
#[derive(Serialize, Deserialize, JsonSchema)]
pub struct DeleteBucketReqeust {
    #[schemars(description = "Name of the S3 bucket to delete")]
    pub bucket_name: String,
}
#[derive(Serialize, Deserialize, JsonSchema)]
pub struct GetObjectRequest {
#[schemars(description = "Name of the S3 bucket")]
@@ -110,6 +122,53 @@ impl RustfsMcpServer {
})
}
/// MCP tool entry point: create an S3 bucket and report the outcome as a
/// human-readable string (success or a formatted error).
#[tool(description = "Create a new S3 bucket with the specified name")]
pub async fn create_bucket(&self, Parameters(req): Parameters<CreateBucketReqeust>) -> String {
    info!("Executing create_bucket tool for bucket: {}", req.bucket_name);
    let bucket = &req.bucket_name;
    match self.s3_client.create_bucket(bucket).await {
        Ok(_) => format!("Successfully created bucket: {}", bucket),
        Err(e) => format!("Failed to create bucket '{}': {:?}", bucket, e),
    }
}
#[tool(description = "Delete an existing S3 bucket with the specified name")]
pub async fn delete_bucket(&self, Parameters(req): Parameters<DeleteBucketReqeust>) -> String {
info!("Executing delete_bucket tool for bucket: {}", req.bucket_name);
// check if bucket is empty, if not, can not delete bucket directly.
let object_result = match self
.s3_client
.list_objects_v2(&req.bucket_name, ListObjectsOptions::default())
.await
{
Ok(result) => result,
Err(e) => {
error!("Failed to list objects in bucket '{}': {:?}", req.bucket_name, e);
return format!("Failed to list objects in bucket '{}': {:?}", req.bucket_name, e);
}
};
if !object_result.objects.is_empty() {
error!("Bucket '{}' is not empty", req.bucket_name);
return format!("Failed to delete bucket '{}': bucket is not empty", req.bucket_name);
}
// delete the bucket.
match self.s3_client.delete_bucket(&req.bucket_name).await {
Ok(_) => {
format!("Successfully deleted bucket: {}", req.bucket_name)
}
Err(e) => {
format!("Failed to delete bucket '{}': {:?}", req.bucket_name, e)
}
}
}
#[tool(description = "List all S3 buckets accessible with the configured credentials")]
pub async fn list_buckets(&self) -> String {
info!("Executing list_buckets tool");
@@ -667,4 +726,20 @@ mod tests {
assert_eq!(read_mode_deser, GetObjectMode::Read);
assert_eq!(download_mode_deser, GetObjectMode::Download);
}
#[test]
fn test_bucket_creation() {
    // The request struct should carry the bucket name through unchanged.
    let req = CreateBucketReqeust {
        bucket_name: String::from("test-bucket"),
    };
    assert_eq!(req.bucket_name, "test-bucket");
}
#[test]
fn test_bucket_deletion() {
    // The request struct should carry the bucket name through unchanged.
    let req = DeleteBucketReqeust {
        bucket_name: String::from("test-bucket"),
    };
    assert_eq!(req.bucket_name, "test-bucket");
}
}

View File

@@ -311,7 +311,7 @@ impl S3 for FS {
.make_bucket(
&bucket,
&MakeBucketOptions {
force_create: true,
force_create: false, // TODO: force support
lock_enabled: object_lock_enabled_for_bucket.is_some_and(|v| v),
..Default::default()
},