feat: add mcp integration (#300)

* add list_buckets mcp server

* add list_objects mcp

* add upload object mcp

* add get object mcp

* add list_buckets mcp server

* fix: resolve clippy warnings in rustfs-mcp-server

* fix: rename mcp package

* fix

* fix: remove useless comment

* feat: add mcp doc
wangsl
2025-07-30 14:25:01 +08:00
committed by GitHub
parent d56cee26db
commit 4f1770d3fe
8 changed files with 2147 additions and 0 deletions

Cargo.toml

@@ -38,6 +38,7 @@ members = [
"crates/workers", # Worker thread pools and task scheduling
"crates/zip", # ZIP file handling and compression
"crates/ahm",
"crates/mcp", # MCP server for S3 operations
]
resolver = "2"
@@ -87,6 +88,7 @@ rustfs-filemeta = { path = "crates/filemeta", version = "0.0.5" }
rustfs-signer = { path = "crates/signer", version = "0.0.5" }
rustfs-checksums = { path = "crates/checksums", version = "0.0.5" }
rustfs-workers = { path = "crates/workers", version = "0.0.5" }
rustfs-mcp = { path = "crates/mcp", version = "0.0.5" }
aes-gcm = { version = "0.10.3", features = ["std"] }
arc-swap = "1.7.1"
argon2 = { version = "0.5.3", features = ["std"] }

crates/mcp/Cargo.toml Normal file

@@ -0,0 +1,70 @@
# Copyright 2024 RustFS Team
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
[package]
name = "rustfs-mcp"
version.workspace = true
edition.workspace = true
license.workspace = true
repository.workspace = true
rust-version.workspace = true
homepage.workspace = true
description = "RustFS MCP (Model Context Protocol) Server"
keywords = ["mcp", "s3", "aws", "rustfs", "server"]
categories = ["development-tools", "web-programming"]
[[bin]]
name = "rustfs-mcp"
path = "src/main.rs"
[dependencies]
# AWS SDK for S3 operations
aws-sdk-s3.workspace = true
# Async runtime and utilities
tokio = { workspace = true, features = ["io-std", "io-util", "macros", "signal"] }
# MCP SDK with macros support
rmcp = { version = "0.3.0", features = ["server", "transport-io", "macros"] }
# Command line argument parsing
clap = { workspace = true, features = ["derive", "env"] }
# Serialization (still needed for S3 data structures)
serde.workspace = true
serde_json.workspace = true
schemars = "1.0"
# Error handling
anyhow.workspace = true
thiserror.workspace = true
# Logging
tracing.workspace = true
tracing-subscriber.workspace = true
# File handling and MIME type detection
mime_guess = "2.0"
tokio-util = { version = "0.7", features = ["io"] }
futures-util = "0.3"
# Async trait support for trait abstractions
async-trait = "0.1"
[dev-dependencies]
# Testing framework and utilities
mockall = "0.13"
tempfile = "3.12"
tokio-test = "0.4"
test-case = "3.3"

crates/mcp/README.md Normal file

@@ -0,0 +1,184 @@
[![RustFS](https://rustfs.com/images/rustfs-github.png)](https://rustfs.com)
# RustFS MCP Server - Model Context Protocol
<p align="center">
<strong>High-performance MCP server providing S3-compatible object storage operations for AI/LLM integration</strong>
</p>
<p align="center">
<a href="https://github.com/rustfs/rustfs/actions/workflows/ci.yml"><img alt="CI" src="https://github.com/rustfs/rustfs/actions/workflows/ci.yml/badge.svg" /></a>
<a href="https://docs.rustfs.com/en/">📖 Documentation</a>
<a href="https://github.com/rustfs/rustfs/issues">🐛 Bug Reports</a>
<a href="https://github.com/rustfs/rustfs/discussions">💬 Discussions</a>
</p>
---
## 📖 Overview
**RustFS MCP Server** is a high-performance [Model Context Protocol (MCP)](https://spec.modelcontextprotocol.org) server that provides AI/LLM tools with seamless access to S3-compatible object storage operations. Built with Rust for maximum performance and safety, it enables AI assistants like Claude Desktop to interact with cloud storage through a standardized protocol.
### What is MCP?
The Model Context Protocol is an open standard that enables secure, controlled connections between AI applications and external systems. This server acts as a bridge between AI tools and S3-compatible storage services, providing structured access to file operations while maintaining security and observability.
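Concretely, this server speaks MCP over stdio: requests arrive on stdin and responses leave on stdout, which is why all logging is routed to stderr. A minimal sketch of the serving loop, mirroring what `src/lib.rs` does with `rmcp` (not a drop-in snippet):

```rust
use anyhow::{Context, Result};
use rmcp::ServiceExt;
use rustfs_mcp::{Config, RustfsMcpServer};
use tokio::io::{stdin, stdout};

// Sketch of the stdio serving loop from src/lib.rs.
async fn serve_over_stdio(config: Config) -> Result<()> {
    config.validate()?;
    let server = RustfsMcpServer::new(config).await?;
    server
        .serve((stdin(), stdout())) // stdin/stdout carry the MCP protocol
        .await
        .context("Failed to serve MCP server")?
        .waiting() // blocks until the client disconnects
        .await
        .context("Error while waiting for server shutdown")?;
    Ok(())
}
```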
## ✨ Features
### Supported S3 Operations
- **List Buckets**: List all accessible S3 buckets
- **List Objects**: Browse bucket contents with optional prefix filtering
- **Upload Files**: Upload local files with automatic MIME type detection and cache control
- **Get Objects**: Retrieve objects from S3 storage with read or download modes
## 🔧 Installation
### Prerequisites
- Rust 1.70+ (for building from source)
- AWS credentials configured (via environment variables, AWS CLI, or IAM roles)
- Access to S3-compatible storage service
### Build from Source
```bash
# Clone the repository
git clone https://github.com/rustfs/rustfs.git
cd rustfs
# Build the MCP server
cargo build --release -p rustfs-mcp
# The binary will be available at
./target/release/rustfs-mcp
```
## ⚙️ Configuration
### Environment Variables
```bash
# AWS Credentials (required)
export AWS_ACCESS_KEY_ID=your_access_key
export AWS_SECRET_ACCESS_KEY=your_secret_key
export AWS_REGION=us-east-1 # Optional, defaults to us-east-1
# Optional: Custom S3 endpoint (for MinIO, etc.)
export AWS_ENDPOINT_URL=http://localhost:9000
# Logging level (optional)
export RUST_LOG=info
```
### Command Line Options
```bash
rustfs-mcp --help
```
The server supports various command-line options for customizing behavior:
- `--access-key-id`: AWS Access Key ID for S3 authentication
- `--secret-access-key`: AWS Secret Access Key for S3 authentication
- `--region`: AWS region to use for S3 operations (default: us-east-1)
- `--endpoint-url`: Custom S3 endpoint URL (for MinIO, LocalStack, etc.)
- `--log-level`: Log level configuration (default: rustfs_mcp_server=info)
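Under the hood these flags are declared with clap's `env` support in `src/config.rs`, so every option can also come from the environment variable listed earlier, with the command-line value taking precedence. A trimmed sketch of that mapping (two fields shown; the full struct is in this commit):

```rust
use clap::Parser;

// Trimmed sketch of the clap mapping in src/config.rs: each flag doubles as
// an environment variable, and explicit flags win over the environment.
#[derive(Parser, Debug, Clone)]
pub struct Config {
    #[arg(long = "access-key-id", env = "AWS_ACCESS_KEY_ID")]
    pub access_key_id: Option<String>,

    #[arg(long = "region", env = "AWS_REGION", default_value = "us-east-1")]
    pub region: String,
}
```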
## 🚀 Usage
### Starting the Server
```bash
# Start the MCP server
rustfs-mcp
# Or with custom options
rustfs-mcp --log-level debug --region us-west-2
```
### Integration with Chat Clients
#### Option 1: Using Command Line Arguments
```json
{
"mcpServers": {
"rustfs-mcp": {
"command": "/path/to/rustfs-mcp",
"args": [
"--access-key-id", "your_access_key",
"--secret-access-key", "your_secret_key",
"--region", "us-west-2",
"--log-level", "info"
]
}
}
}
```
#### Option 2: Using Environment Variables
```json
{
"mcpServers": {
"rustfs-mcp": {
"command": "/path/to/rustfs-mcp",
"env": {
"AWS_ACCESS_KEY_ID": "your_access_key",
"AWS_SECRET_ACCESS_KEY": "your_secret_key",
"AWS_REGION": "us-east-1"
}
}
}
}
```
## 🛠️ Available Tools
The MCP server exposes the following tools that AI assistants can use:
### `list_buckets`
List all S3 buckets accessible with the configured credentials.
**Parameters:** None
### `list_objects`
List objects in an S3 bucket with optional prefix filtering.
**Parameters:**
- `bucket_name` (string): Name of the S3 bucket
- `prefix` (string, optional): Prefix to filter objects
### `upload_file`
Upload a local file to S3 with automatic MIME type detection.
**Parameters:**
- `local_file_path` (string): Path to the local file
- `bucket_name` (string): Target S3 bucket
- `object_key` (string): S3 object key (destination path)
- `content_type` (string, optional): Content type (auto-detected if not provided)
- `storage_class` (string, optional): S3 storage class
- `cache_control` (string, optional): Cache control header
### `get_object`
Retrieve an object from S3 with two operation modes: read content directly or download to a file.
**Parameters:**
- `bucket_name` (string): Source S3 bucket
- `object_key` (string): S3 object key
- `version_id` (string, optional): Version ID for versioned objects
- `mode` (string, optional): Operation mode - "read" (default) returns content directly, "download" saves to local file
- `local_path` (string, optional): Local file path (required when mode is "download")
- `max_content_size` (number, optional): Maximum content size in bytes for read mode (default: 1MB)
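Tool arguments arrive as JSON and deserialize into the request structs defined in `src/server.rs`. A sketch for `get_object`, showing the serde defaults kicking in (the bucket and key names are placeholders):

```rust
use rustfs_mcp::server::{GetObjectMode, GetObjectRequest};

#[test]
fn get_object_defaults() {
    // Placeholder bucket/key; omitted fields take their serde defaults.
    let args = serde_json::json!({
        "bucket_name": "my-bucket",
        "object_key": "docs/readme.txt"
    });
    let req: GetObjectRequest = serde_json::from_value(args).unwrap();
    assert_eq!(req.mode, GetObjectMode::Read);     // default: read mode
    assert_eq!(req.max_content_size, 1024 * 1024); // default: 1 MB cap
}
```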
## Architecture
The MCP server is built with a modular architecture:
```
rustfs-mcp/
├── src/
│ ├── main.rs # Entry point, CLI parsing, and server initialization
│ ├── server.rs # MCP server implementation and tool handlers
│ ├── s3_client.rs # S3 client wrapper with async operations
│ ├── config.rs # Configuration management and CLI options
│ └── lib.rs # Library exports and public API
└── Cargo.toml # Dependencies, metadata, and binary configuration
```

crates/mcp/src/config.rs Normal file

@@ -0,0 +1,224 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use anyhow::Result;
use clap::Parser;
use tracing::info;
/// Configuration for RustFS MCP Server
#[derive(Parser, Debug, Clone)]
#[command(
name = "rustfs-mcp-server",
about = "RustFS MCP (Model Context Protocol) Server for S3 operations",
version,
long_about = r#"
RustFS MCP Server - Model Context Protocol server for S3 operations
This server provides S3 operations through the Model Context Protocol (MCP),
allowing AI assistants to interact with S3-compatible storage systems.
ENVIRONMENT VARIABLES:
All command-line options can also be set via environment variables.
Command-line arguments take precedence over environment variables.
EXAMPLES:
# Using command-line arguments
rustfs-mcp --access-key-id your_key --secret-access-key your_secret
# Using environment variables
export AWS_ACCESS_KEY_ID=your_key
export AWS_SECRET_ACCESS_KEY=your_secret
rustfs-mcp
# Mixed usage (command-line overrides environment)
export AWS_REGION=us-east-1
rustfs-mcp --access-key-id mykey --secret-access-key mysecret --endpoint-url http://localhost:9000
"#
)]
pub struct Config {
/// AWS Access Key ID
#[arg(
long = "access-key-id",
env = "AWS_ACCESS_KEY_ID",
help = "AWS Access Key ID for S3 authentication"
)]
pub access_key_id: Option<String>,
/// AWS Secret Access Key
#[arg(
long = "secret-access-key",
env = "AWS_SECRET_ACCESS_KEY",
help = "AWS Secret Access Key for S3 authentication"
)]
pub secret_access_key: Option<String>,
/// AWS Region
#[arg(
long = "region",
env = "AWS_REGION",
default_value = "us-east-1",
help = "AWS region to use for S3 operations"
)]
pub region: String,
/// Custom S3 endpoint URL
#[arg(
long = "endpoint-url",
env = "AWS_ENDPOINT_URL",
help = "Custom S3 endpoint URL (for MinIO, LocalStack, etc.)"
)]
pub endpoint_url: Option<String>,
/// Log level
#[arg(
long = "log-level",
env = "RUST_LOG",
default_value = "rustfs_mcp_server=info",
help = "Log level configuration"
)]
pub log_level: String,
/// Force path-style addressing
#[arg(
long = "force-path-style",
help = "Force path-style S3 addressing (automatically enabled for custom endpoints)"
)]
pub force_path_style: bool,
}
impl Config {
pub fn new() -> Self {
Config::parse()
}
pub fn validate(&self) -> Result<()> {
if self.access_key_id.is_none() {
anyhow::bail!("AWS Access Key ID is required. Set via --access-key-id or AWS_ACCESS_KEY_ID environment variable");
}
if self.secret_access_key.is_none() {
anyhow::bail!(
"AWS Secret Access Key is required. Set via --secret-access-key or AWS_SECRET_ACCESS_KEY environment variable"
);
}
Ok(())
}
pub fn access_key_id(&self) -> &str {
self.access_key_id.as_ref().expect("Access key ID should be validated")
}
pub fn secret_access_key(&self) -> &str {
self.secret_access_key
.as_ref()
.expect("Secret access key should be validated")
}
pub fn log_configuration(&self) {
let access_key_display = self
.access_key_id
.as_ref()
.map(|key| {
if key.len() > 8 {
format!("{}...{}", &key[..4], &key[key.len() - 4..])
} else {
"*".repeat(key.len())
}
})
.unwrap_or_else(|| "Not set".to_string());
let endpoint_display = self
.endpoint_url
.as_ref()
.map(|url| format!("Custom endpoint: {url}"))
.unwrap_or_else(|| "Default AWS endpoints".to_string());
info!("Configuration:");
info!(" AWS Region: {}", self.region);
info!(" AWS Access Key ID: {}", access_key_display);
info!(" AWS Secret Access Key: [HIDDEN]");
info!(" S3 Endpoint: {}", endpoint_display);
info!(" Force Path Style: {}", self.force_path_style);
info!(" Log Level: {}", self.log_level);
}
}
impl Default for Config {
fn default() -> Self {
Config {
access_key_id: None,
secret_access_key: None,
region: "us-east-1".to_string(),
endpoint_url: None,
log_level: "rustfs_mcp_server=info".to_string(),
force_path_style: false,
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_config_validation_success() {
let config = Config {
access_key_id: Some("test_key".to_string()),
secret_access_key: Some("test_secret".to_string()),
..Config::default()
};
assert!(config.validate().is_ok());
assert_eq!(config.access_key_id(), "test_key");
assert_eq!(config.secret_access_key(), "test_secret");
}
#[test]
fn test_config_validation_missing_access_key() {
let config = Config {
access_key_id: None,
secret_access_key: Some("test_secret".to_string()),
..Config::default()
};
let result = config.validate();
assert!(result.is_err());
assert!(result.unwrap_err().to_string().contains("Access Key ID"));
}
#[test]
fn test_config_validation_missing_secret_key() {
let config = Config {
access_key_id: Some("test_key".to_string()),
secret_access_key: None,
..Config::default()
};
let result = config.validate();
assert!(result.is_err());
assert!(result.unwrap_err().to_string().contains("Secret Access Key"));
}
#[test]
fn test_config_default() {
let config = Config::default();
assert_eq!(config.region, "us-east-1");
assert_eq!(config.log_level, "rustfs_mcp_server=info");
assert!(!config.force_path_style);
assert!(config.access_key_id.is_none());
assert!(config.secret_access_key.is_none());
assert!(config.endpoint_url.is_none());
}
}

crates/mcp/src/lib.rs Normal file

@@ -0,0 +1,97 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
pub mod config;
pub mod s3_client;
pub mod server;
pub use config::Config;
pub use s3_client::{BucketInfo, S3Client};
pub use server::RustfsMcpServer;
use anyhow::{Context, Result};
use rmcp::ServiceExt;
use tokio::io::{stdin, stdout};
use tracing::info;
/// Run the MCP server with the provided configuration
pub async fn run_server_with_config(config: Config) -> Result<()> {
info!("Starting RustFS MCP Server with provided configuration");
config.validate().context("Configuration validation failed")?;
let server = RustfsMcpServer::new(config).await?;
info!("Running MCP server with stdio transport");
// Run the server with stdio
server
.serve((stdin(), stdout()))
.await
.context("Failed to serve MCP server")?
.waiting()
.await
.context("Error while waiting for server shutdown")?;
Ok(())
}
/// Run the MCP server with default configuration (from environment variables)
pub async fn run_server() -> Result<()> {
info!("Starting RustFS MCP Server with default configuration");
let config = Config::default();
run_server_with_config(config).await
}
/// Validate environment configuration (legacy function for backward compatibility)
pub fn validate_environment() -> Result<()> {
use std::env;
if env::var("AWS_ACCESS_KEY_ID").is_err() {
anyhow::bail!("AWS_ACCESS_KEY_ID environment variable is required");
}
if env::var("AWS_SECRET_ACCESS_KEY").is_err() {
anyhow::bail!("AWS_SECRET_ACCESS_KEY environment variable is required");
}
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_config_creation() {
let config = Config {
access_key_id: Some("test_key".to_string()),
secret_access_key: Some("test_secret".to_string()),
..Config::default()
};
assert!(config.validate().is_ok());
assert_eq!(config.access_key_id(), "test_key");
assert_eq!(config.secret_access_key(), "test_secret");
}
#[tokio::test]
async fn test_run_server_with_invalid_config() {
let config = Config::default();
let result = run_server_with_config(config).await;
assert!(result.is_err());
}
}
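The exports above are enough to embed the server in another binary. A minimal sketch, assuming a tokio runtime and a local RustFS/MinIO instance at a placeholder endpoint with placeholder credentials:

```rust
use anyhow::Result;
use rustfs_mcp::{Config, run_server_with_config};

#[tokio::main]
async fn main() -> Result<()> {
    // Hand-built configuration instead of CLI parsing; endpoint and
    // credentials are placeholders for a local development setup.
    let config = Config {
        access_key_id: Some("minioadmin".into()),
        secret_access_key: Some("minioadmin".into()),
        endpoint_url: Some("http://localhost:9000".into()),
        ..Config::default()
    };
    run_server_with_config(config).await
}
```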

crates/mcp/src/main.rs Normal file

@@ -0,0 +1,104 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use anyhow::{Context, Result};
use clap::Parser;
use rmcp::ServiceExt;
use rustfs_mcp::{Config, RustfsMcpServer};
use tokio::io::{stdin, stdout};
use tracing::{Level, error, info};
use tracing_subscriber::{EnvFilter, FmtSubscriber};
#[tokio::main]
async fn main() -> Result<()> {
let config = Config::parse();
init_tracing(&config)?;
info!("Starting RustFS MCP Server v{}", env!("CARGO_PKG_VERSION"));
if let Err(e) = config.validate() {
error!("Configuration validation failed: {}", e);
print_usage_help();
std::process::exit(1);
}
config.log_configuration();
if let Err(e) = run_server(config).await {
error!("Server error: {}", e);
std::process::exit(1);
}
info!("RustFS MCP Server shutdown complete");
Ok(())
}
async fn run_server(config: Config) -> Result<()> {
info!("Initializing RustFS MCP Server");
let server = RustfsMcpServer::new(config).await?;
info!("Starting MCP server with stdio transport");
server
.serve((stdin(), stdout()))
.await
.context("Failed to serve MCP server")?
.waiting()
.await
.context("Error while waiting for server shutdown")?;
Ok(())
}
fn init_tracing(config: &Config) -> Result<()> {
let filter = EnvFilter::try_from_default_env()
.or_else(|_| EnvFilter::try_new(&config.log_level))
.context("Failed to create log filter")?;
let subscriber = FmtSubscriber::builder()
.with_max_level(Level::TRACE)
.with_env_filter(filter)
.with_target(false)
.with_thread_ids(false)
.with_thread_names(false)
.with_writer(std::io::stderr) // Force logs to stderr to avoid interfering with MCP protocol on stdout
.finish();
tracing::subscriber::set_global_default(subscriber).context("Failed to set global tracing subscriber")?;
Ok(())
}
fn print_usage_help() {
eprintln!();
eprintln!("RustFS MCP Server - Model Context Protocol server for S3 operations");
eprintln!();
eprintln!("For more help, run: rustfs-mcp --help");
eprintln!();
eprintln!("QUICK START:");
eprintln!(" # Using command-line arguments");
eprintln!(" rustfs-mcp --access-key-id YOUR_KEY --secret-access-key YOUR_SECRET");
eprintln!();
eprintln!(" # Using environment variables");
eprintln!(" export AWS_ACCESS_KEY_ID=YOUR_KEY");
eprintln!(" export AWS_SECRET_ACCESS_KEY=YOUR_SECRET");
eprintln!(" rustfs-mcp");
eprintln!();
eprintln!(" # For local development with RustFS");
eprintln!(" rustfs-mcp --access-key-id minioadmin --secret-access-key minioadmin --endpoint-url http://localhost:9000");
eprintln!();
}

crates/mcp/src/s3_client.rs Normal file

@@ -0,0 +1,796 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use anyhow::{Context, Result};
use aws_sdk_s3::config::{Credentials, Region};
use aws_sdk_s3::primitives::ByteStream;
use aws_sdk_s3::{Client, Config as S3Config};
use serde::{Deserialize, Serialize};
use std::path::Path;
use tokio::io::AsyncWriteExt;
use tracing::{debug, info};
use crate::config::Config;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BucketInfo {
pub name: String,
pub creation_date: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ObjectInfo {
pub key: String,
pub size: Option<i64>,
pub last_modified: Option<String>,
pub etag: Option<String>,
pub storage_class: Option<String>,
}
#[derive(Debug, Clone, Default)]
pub struct ListObjectsOptions {
pub prefix: Option<String>,
pub delimiter: Option<String>,
pub max_keys: Option<i32>,
pub continuation_token: Option<String>,
pub start_after: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ListObjectsResult {
pub objects: Vec<ObjectInfo>,
pub common_prefixes: Vec<String>,
pub is_truncated: bool,
pub next_continuation_token: Option<String>,
pub max_keys: Option<i32>,
pub key_count: i32,
}
#[derive(Debug, Clone, Default)]
pub struct UploadFileOptions {
pub content_type: Option<String>,
pub metadata: Option<std::collections::HashMap<String, String>>,
pub storage_class: Option<String>,
pub server_side_encryption: Option<String>,
pub cache_control: Option<String>,
pub content_disposition: Option<String>,
pub content_encoding: Option<String>,
pub content_language: Option<String>,
}
#[derive(Debug, Clone, Default)]
pub struct GetObjectOptions {
pub version_id: Option<String>,
pub range: Option<String>,
pub if_modified_since: Option<String>,
pub if_unmodified_since: Option<String>,
pub max_content_size: Option<usize>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum DetectedFileType {
Text,
NonText(String), // mime type for non-text files
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GetObjectResult {
pub bucket: String,
pub key: String,
pub content_type: String,
pub content_length: u64,
pub last_modified: Option<String>,
pub etag: Option<String>,
pub version_id: Option<String>,
pub detected_type: DetectedFileType,
pub content: Option<Vec<u8>>, // Raw content bytes
pub text_content: Option<String>, // UTF-8 decoded content for text files
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct UploadResult {
pub bucket: String,
pub key: String,
pub etag: String,
pub location: String,
pub version_id: Option<String>,
pub file_size: u64,
pub content_type: String,
pub upload_id: Option<String>,
}
#[derive(Debug, Clone)]
pub struct S3Client {
client: Client,
}
impl S3Client {
pub async fn new(config: &Config) -> Result<Self> {
info!("Initializing S3 client from configuration");
let access_key = config.access_key_id();
let secret_key = config.secret_access_key();
debug!("Using AWS region: {}", config.region);
if let Some(ref endpoint) = config.endpoint_url {
debug!("Using custom endpoint: {}", endpoint);
}
let credentials = Credentials::new(access_key, secret_key, None, None, "rustfs-mcp-server");
let mut config_builder = S3Config::builder()
.credentials_provider(credentials)
.region(Region::new(config.region.clone()))
.behavior_version(aws_sdk_s3::config::BehaviorVersion::latest());
// Set force path style if custom endpoint or explicitly requested
let should_force_path_style = config.endpoint_url.is_some() || config.force_path_style;
if should_force_path_style {
config_builder = config_builder.force_path_style(true);
}
if let Some(endpoint) = &config.endpoint_url {
config_builder = config_builder.endpoint_url(endpoint);
}
let s3_config = config_builder.build();
let client = Client::from_conf(s3_config);
info!("S3 client initialized successfully");
Ok(Self { client })
}
pub async fn list_buckets(&self) -> Result<Vec<BucketInfo>> {
debug!("Listing S3 buckets");
let response = self.client.list_buckets().send().await.context("Failed to list S3 buckets")?;
let buckets: Vec<BucketInfo> = response
.buckets()
.iter()
.map(|bucket| {
let name = bucket.name().unwrap_or("unknown").to_string();
let creation_date = bucket
.creation_date()
.map(|dt| dt.fmt(aws_sdk_s3::primitives::DateTimeFormat::DateTime).unwrap());
BucketInfo { name, creation_date }
})
.collect();
debug!("Found {} buckets", buckets.len());
Ok(buckets)
}
pub async fn list_objects_v2(&self, bucket_name: &str, options: ListObjectsOptions) -> Result<ListObjectsResult> {
debug!("Listing objects in bucket '{}' with options: {:?}", bucket_name, options);
let mut request = self.client.list_objects_v2().bucket(bucket_name);
if let Some(prefix) = options.prefix {
request = request.prefix(prefix);
}
if let Some(delimiter) = options.delimiter {
request = request.delimiter(delimiter);
}
if let Some(max_keys) = options.max_keys {
request = request.max_keys(max_keys);
}
if let Some(continuation_token) = options.continuation_token {
request = request.continuation_token(continuation_token);
}
if let Some(start_after) = options.start_after {
request = request.start_after(start_after);
}
let response = request
.send()
.await
.context(format!("Failed to list objects in bucket '{bucket_name}'"))?;
let objects: Vec<ObjectInfo> = response
.contents()
.iter()
.map(|obj| {
let key = obj.key().unwrap_or("unknown").to_string();
let size = obj.size();
let last_modified = obj
.last_modified()
.map(|dt| dt.fmt(aws_sdk_s3::primitives::DateTimeFormat::DateTime).unwrap());
let etag = obj.e_tag().map(|e| e.to_string());
let storage_class = obj.storage_class().map(|sc| sc.as_str().to_string());
ObjectInfo {
key,
size,
last_modified,
etag,
storage_class,
}
})
.collect();
let common_prefixes: Vec<String> = response
.common_prefixes()
.iter()
.filter_map(|cp| cp.prefix())
.map(|p| p.to_string())
.collect();
let result = ListObjectsResult {
objects,
common_prefixes,
is_truncated: response.is_truncated().unwrap_or(false),
next_continuation_token: response.next_continuation_token().map(|t| t.to_string()),
max_keys: response.max_keys(),
key_count: response.key_count().unwrap_or(0),
};
debug!(
"Found {} objects and {} common prefixes in bucket '{}'",
result.objects.len(),
result.common_prefixes.len(),
bucket_name
);
Ok(result)
}
pub async fn upload_file(
&self,
local_path: &str,
bucket_name: &str,
object_key: &str,
options: UploadFileOptions,
) -> Result<UploadResult> {
info!("Starting file upload: '{}' -> s3://{}/{}", local_path, bucket_name, object_key);
let path = Path::new(local_path);
let canonical_path = path
.canonicalize()
.context(format!("Failed to resolve file path: {local_path}"))?;
if !canonical_path.exists() {
anyhow::bail!("File does not exist: {}", local_path);
}
if !canonical_path.is_file() {
anyhow::bail!("Path is not a file: {}", local_path);
}
let metadata = tokio::fs::metadata(&canonical_path)
.await
.context(format!("Failed to read file metadata: {local_path}"))?;
let file_size = metadata.len();
debug!("File size: {file_size} bytes");
let content_type = options.content_type.unwrap_or_else(|| {
let detected = mime_guess::from_path(&canonical_path).first_or_octet_stream().to_string();
debug!("Auto-detected content type: {detected}");
detected
});
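// Note: the whole file is buffered in memory before the upload; a streaming
// or multipart upload would be preferable for very large files.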
let file_content = tokio::fs::read(&canonical_path)
.await
.context(format!("Failed to read file content: {local_path}"))?;
let byte_stream = ByteStream::from(file_content);
let mut request = self
.client
.put_object()
.bucket(bucket_name)
.key(object_key)
.body(byte_stream)
.content_type(&content_type)
.content_length(file_size as i64);
if let Some(storage_class) = &options.storage_class {
request = request.storage_class(storage_class.as_str().into());
}
if let Some(cache_control) = &options.cache_control {
request = request.cache_control(cache_control);
}
if let Some(content_disposition) = &options.content_disposition {
request = request.content_disposition(content_disposition);
}
if let Some(content_encoding) = &options.content_encoding {
request = request.content_encoding(content_encoding);
}
if let Some(content_language) = &options.content_language {
request = request.content_language(content_language);
}
if let Some(sse) = &options.server_side_encryption {
request = request.server_side_encryption(sse.as_str().into());
}
if let Some(metadata_map) = &options.metadata {
for (key, value) in metadata_map {
request = request.metadata(key, value);
}
}
debug!("Executing S3 put_object request");
let response = request
.send()
.await
.context(format!("Failed to upload file to s3://{bucket_name}/{object_key}"))?;
let etag = response.e_tag().unwrap_or("unknown").to_string();
let version_id = response.version_id().map(|v| v.to_string());
let location = format!("s3://{bucket_name}/{object_key}");
let upload_result = UploadResult {
bucket: bucket_name.to_string(),
key: object_key.to_string(),
etag,
location,
version_id,
file_size,
content_type,
upload_id: None,
};
info!(
"File upload completed successfully: {} bytes uploaded to s3://{}/{}",
file_size, bucket_name, object_key
);
Ok(upload_result)
}
pub async fn get_object(&self, bucket_name: &str, object_key: &str, options: GetObjectOptions) -> Result<GetObjectResult> {
info!("Getting object: s3://{}/{}", bucket_name, object_key);
let mut request = self.client.get_object().bucket(bucket_name).key(object_key);
if let Some(version_id) = &options.version_id {
request = request.version_id(version_id);
}
if let Some(range) = &options.range {
request = request.range(range);
}
if let Some(if_modified_since) = &options.if_modified_since {
request = request.if_modified_since(
aws_sdk_s3::primitives::DateTime::from_str(if_modified_since, aws_sdk_s3::primitives::DateTimeFormat::DateTime)
.context("Failed to parse if_modified_since date")?,
);
}
debug!("Executing S3 get_object request");
let response = request
.send()
.await
.context(format!("Failed to get object from s3://{bucket_name}/{object_key}"))?;
let content_type = response.content_type().unwrap_or("application/octet-stream").to_string();
let content_length = response.content_length().unwrap_or(0) as u64;
let last_modified = response
.last_modified()
.map(|dt| dt.fmt(aws_sdk_s3::primitives::DateTimeFormat::DateTime).unwrap());
let etag = response.e_tag().map(|e| e.to_string());
let version_id = response.version_id().map(|v| v.to_string());
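// Hard cap of 10 MiB when the caller does not set max_content_size
// (the MCP tool layer defaults to a tighter 1 MiB for read mode).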
let max_size = options.max_content_size.unwrap_or(10 * 1024 * 1024);
let mut content = Vec::new();
let mut byte_stream = response.body;
let mut total_read = 0;
while let Some(bytes_result) = byte_stream.try_next().await.context("Failed to read object content")? {
if total_read + bytes_result.len() > max_size {
anyhow::bail!("Object size exceeds maximum allowed size of {} bytes", max_size);
}
content.extend_from_slice(&bytes_result);
total_read += bytes_result.len();
}
debug!("Read {} bytes from object", content.len());
let detected_type = Self::detect_file_type(Some(&content_type), &content);
debug!("Detected file type: {detected_type:?}");
let text_content = match &detected_type {
DetectedFileType::Text => match std::str::from_utf8(&content) {
Ok(text) => Some(text.to_string()),
Err(_) => {
debug!("Failed to decode content as UTF-8, treating as binary");
None
}
},
_ => None,
};
let result = GetObjectResult {
bucket: bucket_name.to_string(),
key: object_key.to_string(),
content_type,
content_length,
last_modified,
etag,
version_id,
detected_type,
content: Some(content),
text_content,
};
info!(
"Object retrieved successfully: {} bytes from s3://{}/{}",
result.content_length, bucket_name, object_key
);
Ok(result)
}
fn detect_file_type(content_type: Option<&str>, content_bytes: &[u8]) -> DetectedFileType {
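// 1. Trust an explicit Content-Type header when one is present.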
if let Some(ct) = content_type {
let ct_lower = ct.to_lowercase();
if ct_lower.starts_with("text/")
|| ct_lower == "application/json"
|| ct_lower == "application/xml"
|| ct_lower == "application/yaml"
|| ct_lower == "application/javascript"
|| ct_lower == "application/x-yaml"
|| ct_lower == "application/x-sh"
|| ct_lower == "application/x-shellscript"
|| ct_lower.contains("script")
|| ct_lower.contains("xml")
|| ct_lower.contains("json")
{
return DetectedFileType::Text;
}
return DetectedFileType::NonText(ct.to_string());
}
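// 2. No Content-Type: sniff well-known magic bytes for common binary formats.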
if content_bytes.len() >= 4 {
match &content_bytes[0..4] {
// PNG: 89 50 4E 47
[0x89, 0x50, 0x4E, 0x47] => return DetectedFileType::NonText("image/png".to_string()),
// JPEG: FF D8 FF
[0xFF, 0xD8, 0xFF, _] => return DetectedFileType::NonText("image/jpeg".to_string()),
// GIF: 47 49 46 38
[0x47, 0x49, 0x46, 0x38] => return DetectedFileType::NonText("image/gif".to_string()),
// BMP: 42 4D
[0x42, 0x4D, _, _] => return DetectedFileType::NonText("image/bmp".to_string()),
// RIFF container (WebP/WAV)
[0x52, 0x49, 0x46, 0x46] if content_bytes.len() >= 12 => {
if &content_bytes[8..12] == b"WEBP" {
return DetectedFileType::NonText("image/webp".to_string());
} else if &content_bytes[8..12] == b"WAVE" {
return DetectedFileType::NonText("audio/wav".to_string());
}
return DetectedFileType::NonText("application/octet-stream".to_string());
}
_ => {}
}
}
// 3. Check if content is valid UTF-8 text as fallback
if std::str::from_utf8(content_bytes).is_ok() {
// Additional heuristics for text detection
let non_printable_count = content_bytes
.iter()
.filter(|&&b| b < 0x20 && b != 0x09 && b != 0x0A && b != 0x0D) // Control chars except tab, LF, CR
.count();
let total_chars = content_bytes.len();
// If less than 5% are non-printable control characters, consider it text
if total_chars > 0 && (non_printable_count as f64 / total_chars as f64) < 0.05 {
return DetectedFileType::Text;
}
}
// Default to non-text binary
DetectedFileType::NonText("application/octet-stream".to_string())
}
pub async fn download_object_to_file(
&self,
bucket_name: &str,
object_key: &str,
local_path: &str,
options: GetObjectOptions,
) -> Result<(u64, String)> {
info!("Downloading object: s3://{}/{} -> {}", bucket_name, object_key, local_path);
let mut request = self.client.get_object().bucket(bucket_name).key(object_key);
if let Some(version_id) = &options.version_id {
request = request.version_id(version_id);
}
if let Some(range) = &options.range {
request = request.range(range);
}
if let Some(if_modified_since) = &options.if_modified_since {
request = request.if_modified_since(
aws_sdk_s3::primitives::DateTime::from_str(if_modified_since, aws_sdk_s3::primitives::DateTimeFormat::DateTime)
.context("Failed to parse if_modified_since date")?,
);
}
debug!("Executing S3 get_object request for download");
let response = request
.send()
.await
.context(format!("Failed to get object from s3://{bucket_name}/{object_key}"))?;
let local_file_path = Path::new(local_path);
if let Some(parent) = local_file_path.parent() {
tokio::fs::create_dir_all(parent)
.await
.context(format!("Failed to create parent directories for {local_path}"))?;
}
let mut file = tokio::fs::File::create(local_file_path)
.await
.context(format!("Failed to create local file: {local_path}"))?;
let mut byte_stream = response.body;
let mut total_bytes = 0u64;
while let Some(bytes_result) = byte_stream.try_next().await.context("Failed to read object content")? {
file.write_all(&bytes_result)
.await
.context(format!("Failed to write to local file: {local_path}"))?;
total_bytes += bytes_result.len() as u64;
}
file.flush().await.context("Failed to flush file to disk")?;
let absolute_path = local_file_path
.canonicalize()
.unwrap_or_else(|_| local_file_path.to_path_buf())
.to_string_lossy()
.to_string();
info!(
"Object downloaded successfully: {} bytes from s3://{}/{} to {}",
total_bytes, bucket_name, object_key, absolute_path
);
Ok((total_bytes, absolute_path))
}
pub async fn health_check(&self) -> Result<()> {
debug!("Performing S3 health check");
self.client.list_buckets().send().await.context("S3 health check failed")?;
debug!("S3 health check passed");
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
#[ignore] // Requires AWS credentials
async fn test_s3_client_creation() {
let config = Config {
access_key_id: Some("test_key".to_string()),
secret_access_key: Some("test_secret".to_string()),
region: "us-east-1".to_string(),
..Config::default()
};
let result = S3Client::new(&config).await;
assert!(result.is_ok());
}
#[test]
fn test_bucket_info_serialization() {
let bucket = BucketInfo {
name: "test-bucket".to_string(),
creation_date: Some("2024-01-01T00:00:00Z".to_string()),
};
let json = serde_json::to_string(&bucket).unwrap();
let deserialized: BucketInfo = serde_json::from_str(&json).unwrap();
assert_eq!(bucket.name, deserialized.name);
assert_eq!(bucket.creation_date, deserialized.creation_date);
}
#[test]
fn test_detect_file_type_text_content_type() {
let test_cases = vec![
("text/plain", "Hello world"),
("text/html", "<html></html>"),
("application/json", r#"{"key": "value"}"#),
("application/xml", "<xml></xml>"),
("application/yaml", "key: value"),
("application/javascript", "console.log('hello');"),
];
for (content_type, content) in test_cases {
let result = S3Client::detect_file_type(Some(content_type), content.as_bytes());
match result {
DetectedFileType::Text => {}
_ => panic!("Expected Text for content type {content_type}"),
}
}
}
#[test]
fn test_detect_file_type_non_text_content_type() {
// Test various non-text content types
let test_cases = vec![
("image/png", "image/png"),
("image/jpeg", "image/jpeg"),
("audio/mp3", "audio/mp3"),
("video/mp4", "video/mp4"),
("application/pdf", "application/pdf"),
];
for (content_type, expected_mime) in test_cases {
let result = S3Client::detect_file_type(Some(content_type), b"some content");
match result {
DetectedFileType::NonText(mime_type) => {
assert_eq!(mime_type, expected_mime);
}
_ => panic!("Expected NonText for content type {content_type}"),
}
}
}
#[test]
fn test_detect_file_type_magic_bytes_simplified() {
// Test magic bytes detection (now all return NonText)
let test_cases = vec![
// PNG magic bytes: 89 50 4E 47
(vec![0x89, 0x50, 0x4E, 0x47, 0x0D, 0x0A, 0x1A, 0x0A], "image/png"),
// JPEG magic bytes: FF D8 FF
(vec![0xFF, 0xD8, 0xFF, 0xE0], "image/jpeg"),
// GIF magic bytes: 47 49 46 38
(vec![0x47, 0x49, 0x46, 0x38, 0x37, 0x61], "image/gif"),
];
for (content, expected_mime) in test_cases {
let result = S3Client::detect_file_type(None, &content);
match result {
DetectedFileType::NonText(mime_type) => {
assert_eq!(mime_type, expected_mime);
}
_ => panic!("Expected NonText for magic bytes: {content:?}"),
}
}
}
#[test]
fn test_detect_file_type_webp_magic_bytes() {
// WebP has more complex magic bytes: RIFF....WEBP
let mut webp_content = vec![0x52, 0x49, 0x46, 0x46]; // RIFF
webp_content.extend_from_slice(&[0x00, 0x00, 0x00, 0x00]); // Size (4 bytes)
webp_content.extend_from_slice(b"WEBP"); // WEBP signature
let result = S3Client::detect_file_type(None, &webp_content);
match result {
DetectedFileType::NonText(mime_type) => {
assert_eq!(mime_type, "image/webp");
}
_ => panic!("Expected WebP NonText detection"),
}
}
#[test]
fn test_detect_file_type_wav_magic_bytes() {
// WAV has magic bytes: RIFF....WAVE
let mut wav_content = vec![0x52, 0x49, 0x46, 0x46]; // RIFF
wav_content.extend_from_slice(&[0x00, 0x00, 0x00, 0x00]); // Size (4 bytes)
wav_content.extend_from_slice(b"WAVE"); // WAVE signature
let result = S3Client::detect_file_type(None, &wav_content);
match result {
DetectedFileType::NonText(mime_type) => {
assert_eq!(mime_type, "audio/wav");
}
_ => panic!("Expected WAV NonText detection"),
}
}
#[test]
fn test_detect_file_type_utf8_text() {
// Test UTF-8 text detection
let utf8_content = "Hello, 世界! 🌍".as_bytes();
let result = S3Client::detect_file_type(None, utf8_content);
match result {
DetectedFileType::Text => {}
_ => panic!("Expected Text for UTF-8 content"),
}
// Test ASCII text
let ascii_content = b"Hello, world! This is ASCII text.";
let result = S3Client::detect_file_type(None, ascii_content);
match result {
DetectedFileType::Text => {}
_ => panic!("Expected Text for ASCII content"),
}
}
#[test]
fn test_detect_file_type_binary() {
// Test binary content that should not be detected as text
let binary_content = vec![0x00, 0x01, 0x02, 0x03, 0xFF, 0xFE, 0xFD, 0xFC];
let result = S3Client::detect_file_type(None, &binary_content);
match result {
DetectedFileType::NonText(mime_type) => {
assert_eq!(mime_type, "application/octet-stream");
}
_ => panic!("Expected NonText for binary content"),
}
}
#[test]
fn test_detect_file_type_priority() {
// Content-Type should take priority over magic bytes
let png_magic_bytes = vec![0x89, 0x50, 0x4E, 0x47, 0x0D, 0x0A, 0x1A, 0x0A];
// Even with PNG magic bytes, text content-type should win
let result = S3Client::detect_file_type(Some("text/plain"), &png_magic_bytes);
match result {
DetectedFileType::Text => {}
_ => panic!("Expected Text due to content-type priority"),
}
}
#[test]
fn test_get_object_options_default() {
let options = GetObjectOptions::default();
assert!(options.version_id.is_none());
assert!(options.range.is_none());
assert!(options.if_modified_since.is_none());
assert!(options.if_unmodified_since.is_none());
assert!(options.max_content_size.is_none());
}
#[test]
fn test_detected_file_type_serialization() {
let test_cases = vec![
DetectedFileType::Text,
DetectedFileType::NonText("image/png".to_string()),
DetectedFileType::NonText("audio/mpeg".to_string()),
DetectedFileType::NonText("application/octet-stream".to_string()),
];
for file_type in test_cases {
let json = serde_json::to_string(&file_type).unwrap();
let deserialized: DetectedFileType = serde_json::from_str(&json).unwrap();
match (&file_type, &deserialized) {
(DetectedFileType::Text, DetectedFileType::Text) => {}
(DetectedFileType::NonText(a), DetectedFileType::NonText(b)) => assert_eq!(a, b),
_ => panic!("Serialization/deserialization mismatch"),
}
}
}
}
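Taken together, a typical call sequence against this wrapper looks like the following sketch (assumes valid credentials in `Config`; bucket contents are whatever the account exposes):

```rust
use anyhow::Result;
use rustfs_mcp::s3_client::ListObjectsOptions;
use rustfs_mcp::{Config, S3Client};

// Sketch: probe connectivity, then list the first few objects per bucket.
async fn demo(config: &Config) -> Result<()> {
    let s3 = S3Client::new(config).await?;
    s3.health_check().await?; // cheap list_buckets round-trip
    for bucket in s3.list_buckets().await? {
        let opts = ListObjectsOptions {
            max_keys: Some(10),
            ..Default::default()
        };
        let listing = s3.list_objects_v2(&bucket.name, opts).await?;
        println!("{}: {} object(s)", bucket.name, listing.key_count);
    }
    Ok(())
}
```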

crates/mcp/src/server.rs Normal file

@@ -0,0 +1,670 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use anyhow::Result;
use rmcp::{
ErrorData, RoleServer, ServerHandler,
handler::server::{router::tool::ToolRouter, tool::Parameters},
model::{Implementation, ProtocolVersion, ServerCapabilities, ServerInfo, ToolsCapability},
service::{NotificationContext, RequestContext},
tool, tool_handler, tool_router,
};
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use tracing::{debug, error, info};
use crate::config::Config;
use crate::s3_client::{DetectedFileType, GetObjectOptions, ListObjectsOptions, S3Client, UploadFileOptions};
#[derive(Serialize, Deserialize, JsonSchema)]
pub struct ListObjectsRequest {
pub bucket_name: String,
#[serde(default)]
#[schemars(description = "Optional prefix to filter objects")]
pub prefix: Option<String>,
}
#[derive(Serialize, Deserialize, JsonSchema)]
pub struct UploadFileRequest {
#[schemars(description = "Path to the local file to upload")]
pub local_file_path: String,
#[schemars(description = "Name of the S3 bucket to upload to")]
pub bucket_name: String,
#[schemars(description = "S3 object key (path/filename in the bucket)")]
pub object_key: String,
#[serde(default)]
#[schemars(description = "Optional content type (auto-detected if not specified)")]
pub content_type: Option<String>,
#[serde(default)]
#[schemars(description = "Optional storage class (STANDARD, REDUCED_REDUNDANCY, etc.)")]
pub storage_class: Option<String>,
#[serde(default)]
#[schemars(description = "Optional cache control header")]
pub cache_control: Option<String>,
}
#[derive(Serialize, Deserialize, JsonSchema)]
pub struct GetObjectRequest {
#[schemars(description = "Name of the S3 bucket")]
pub bucket_name: String,
#[schemars(description = "S3 object key (path/filename in the bucket)")]
pub object_key: String,
#[serde(default)]
#[schemars(description = "Optional version ID for versioned objects")]
pub version_id: Option<String>,
#[serde(default = "default_operation_mode")]
#[schemars(description = "Operation mode: read (return content) or download (save to local file)")]
pub mode: GetObjectMode,
#[serde(default)]
#[schemars(description = "Local file path for download mode (required when mode is download)")]
pub local_path: Option<String>,
#[serde(default = "default_max_content_size")]
#[schemars(description = "Maximum content size to read in bytes for read mode (default: 1MB)")]
pub max_content_size: usize,
}
#[derive(Serialize, Deserialize, JsonSchema, Debug, Clone, PartialEq)]
pub enum GetObjectMode {
#[serde(rename = "read")]
Read,
#[serde(rename = "download")]
Download,
}
fn default_operation_mode() -> GetObjectMode {
GetObjectMode::Read
}
fn default_max_content_size() -> usize {
1024 * 1024
}
#[derive(Debug, Clone)]
pub struct RustfsMcpServer {
s3_client: S3Client,
_config: Config,
tool_router: ToolRouter<Self>,
}
#[tool_router(router = tool_router)]
impl RustfsMcpServer {
pub async fn new(config: Config) -> Result<Self> {
info!("Creating RustFS MCP Server");
let s3_client = S3Client::new(&config).await?;
Ok(Self {
s3_client,
_config: config,
tool_router: Self::tool_router(),
})
}
#[tool(description = "List all S3 buckets accessible with the configured credentials")]
pub async fn list_buckets(&self) -> String {
info!("Executing list_buckets tool");
match self.s3_client.list_buckets().await {
Ok(buckets) => {
debug!("Successfully retrieved {} buckets", buckets.len());
if buckets.is_empty() {
return "No S3 buckets found. The AWS credentials may not have access to any buckets, or no buckets exist in this account.".to_string();
}
let mut result_text = format!("Found {} S3 bucket(s):\n\n", buckets.len());
for (index, bucket) in buckets.iter().enumerate() {
result_text.push_str(&format!("{}. **{}**", index + 1, bucket.name));
if let Some(ref creation_date) = bucket.creation_date {
result_text.push_str(&format!("\n - Created: {creation_date}"));
}
result_text.push_str("\n\n");
}
result_text.push_str("---\n");
result_text.push_str(&format!("Total buckets: {}\n", buckets.len()));
result_text.push_str("Note: Only buckets accessible with the current AWS credentials are shown.");
info!("list_buckets tool executed successfully");
result_text
}
Err(e) => {
error!("Failed to list buckets: {:?}", e);
format!(
"Failed to list S3 buckets: {e}\n\nPossible causes:\n\
• AWS credentials are not set or invalid\n\
• Network connectivity issues\n\
• AWS region is not set correctly\n\
• Insufficient permissions to list buckets\n\
• Custom endpoint is misconfigured\n\n\
Please verify your AWS configuration and try again."
)
}
}
}
#[tool(description = "List objects in a specific S3 bucket with optional prefix filtering")]
pub async fn list_objects(&self, Parameters(req): Parameters<ListObjectsRequest>) -> String {
info!("Executing list_objects tool for bucket: {}", req.bucket_name);
let options = ListObjectsOptions {
prefix: req.prefix.clone(),
delimiter: None,
max_keys: Some(1000),
..ListObjectsOptions::default()
};
match self.s3_client.list_objects_v2(&req.bucket_name, options).await {
Ok(result) => {
debug!(
"Successfully retrieved {} objects and {} common prefixes from bucket '{}'",
result.objects.len(),
result.common_prefixes.len(),
req.bucket_name
);
if result.objects.is_empty() && result.common_prefixes.is_empty() {
let prefix_msg = req.prefix.as_ref().map(|p| format!(" with prefix '{p}'")).unwrap_or_default();
return format!(
"No objects found in bucket '{}'{prefix_msg}. The bucket may be empty or the prefix may not match any objects.",
req.bucket_name
);
}
let mut result_text = format!("Found {} object(s) in bucket **{}**", result.key_count, req.bucket_name);
if let Some(ref p) = req.prefix {
result_text.push_str(&format!(" with prefix '{p}'"));
}
result_text.push_str(":\n\n");
if !result.common_prefixes.is_empty() {
result_text.push_str("**Directories:**\n");
for (index, prefix) in result.common_prefixes.iter().enumerate() {
result_text.push_str(&format!("{}. 📁 {prefix}\n", index + 1));
}
result_text.push('\n');
}
if !result.objects.is_empty() {
result_text.push_str("**Objects:**\n");
for (index, obj) in result.objects.iter().enumerate() {
result_text.push_str(&format!("{}. **{}**\n", index + 1, obj.key));
if let Some(size) = obj.size {
result_text.push_str(&format!(" - Size: {size} bytes\n"));
}
if let Some(ref last_modified) = obj.last_modified {
result_text.push_str(&format!(" - Last Modified: {last_modified}\n"));
}
if let Some(ref etag) = obj.etag {
result_text.push_str(&format!(" - ETag: {etag}\n"));
}
if let Some(ref storage_class) = obj.storage_class {
result_text.push_str(&format!(" - Storage Class: {storage_class}\n"));
}
result_text.push('\n');
}
}
if result.is_truncated {
result_text.push_str("**Note:** Results are truncated. ");
if let Some(ref token) = result.next_continuation_token {
result_text.push_str(&format!("Use continuation token '{token}' to get more results.\n"));
}
result_text.push('\n');
}
result_text.push_str("---\n");
result_text.push_str(&format!(
"Total: {} object(s), {} directory/ies",
result.objects.len(),
result.common_prefixes.len()
));
if let Some(max_keys) = result.max_keys {
result_text.push_str(&format!(", Max keys: {max_keys}"));
}
info!("list_objects tool executed successfully for bucket '{}'", req.bucket_name);
result_text
}
Err(e) => {
error!("Failed to list objects in bucket '{}': {:?}", req.bucket_name, e);
format!(
"Failed to list objects in S3 bucket '{}': {}\n\nPossible causes:\n\
• Bucket does not exist or is not accessible\n\
• AWS credentials lack permissions to list objects in this bucket\n\
• Network connectivity issues\n\
• Custom endpoint is misconfigured\n\
• Bucket name contains invalid characters\n\n\
Please verify the bucket name, your AWS configuration, and permissions.",
req.bucket_name, e
)
}
}
}
#[tool(
description = "Get/download an object from an S3 bucket - supports read mode for text files and download mode for all files"
)]
pub async fn get_object(&self, Parameters(req): Parameters<GetObjectRequest>) -> String {
info!(
"Executing get_object tool: s3://{}/{} (mode: {:?})",
req.bucket_name, req.object_key, req.mode
);
match req.mode {
GetObjectMode::Read => self.handle_read_mode(req).await,
GetObjectMode::Download => self.handle_download_mode(req).await,
}
}
async fn handle_read_mode(&self, req: GetObjectRequest) -> String {
let options = GetObjectOptions {
version_id: req.version_id.clone(),
max_content_size: Some(req.max_content_size),
..GetObjectOptions::default()
};
match self.s3_client.get_object(&req.bucket_name, &req.object_key, options).await {
Ok(result) => {
debug!(
"Successfully retrieved object s3://{}/{} ({} bytes)",
req.bucket_name, req.object_key, result.content_length
);
match result.detected_type {
DetectedFileType::Text => {
if let Some(ref text_content) = result.text_content {
format!(
"✅ **Text file content retrieved!**\n\n\
**S3 Location:** s3://{}/{}\n\
**File Size:** {} bytes\n\
**Content Type:** {}\n\n\
**Content:**\n```\n{}\n```",
result.bucket, result.key, result.content_length, result.content_type, text_content
)
} else {
format!(
"⚠️ **Text file detected but content could not be decoded!**\n\n\
**S3 Location:** s3://{}/{}\n\
**File Size:** {} bytes\n\
**Content Type:** {}\n\n\
**Note:** Could not decode file as UTF-8 text. \
Try using download mode instead.",
result.bucket, result.key, result.content_length, result.content_type
)
}
}
DetectedFileType::NonText(ref mime_type) => {
let file_category = if mime_type.starts_with("image/") {
"Image"
} else if mime_type.starts_with("audio/") {
"Audio"
} else if mime_type.starts_with("video/") {
"Video"
} else {
"Binary"
};
format!(
"⚠️ **Non-text file detected!**\n\n\
**S3 Location:** s3://{}/{}\n\
**File Type:** {} ({})\n\
**File Size:** {} bytes ({:.2} MB)\n\n\
**Note:** This file type cannot be displayed as text.\n\
Please use download mode to save it to a local file:\n\n\
```json\n{{\n \"mode\": \"download\",\n \"local_path\": \"/path/to/save/file\"\n}}\n```",
result.bucket,
result.key,
file_category,
mime_type,
result.content_length,
result.content_length as f64 / 1_048_576.0
)
}
}
}
Err(e) => {
error!("Failed to read object s3://{}/{}: {:?}", req.bucket_name, req.object_key, e);
self.format_error_message(&req, e)
}
}
}
async fn handle_download_mode(&self, req: GetObjectRequest) -> String {
let local_path = match req.local_path {
Some(ref path) => path,
None => {
return "❌ **Error:** local_path is required when using download mode.\n\n\
**Example:**\n```json\n{\n \"mode\": \"download\",\n \"local_path\": \"/path/to/save/file.ext\"\n}\n```"
.to_string();
}
};
let options = GetObjectOptions {
version_id: req.version_id.clone(),
..GetObjectOptions::default()
};
match self
.s3_client
.download_object_to_file(&req.bucket_name, &req.object_key, local_path, options)
.await
{
Ok((bytes_downloaded, absolute_path)) => {
info!(
"Successfully downloaded object s3://{}/{} to {} ({} bytes)",
req.bucket_name, req.object_key, absolute_path, bytes_downloaded
);
format!(
"✅ **File downloaded successfully!**\n\n\
**S3 Location:** s3://{}/{}\n\
**Local Path (requested):** {}\n\
**Absolute Path:** {}\n\
**File Size:** {} bytes ({:.2} MB)\n\n\
**✨ File saved successfully!** You can now access it at:\n\
`{}`",
req.bucket_name,
req.object_key,
local_path,
absolute_path,
bytes_downloaded,
bytes_downloaded as f64 / 1_048_576.0,
absolute_path
)
}
Err(e) => {
error!(
"Failed to download object s3://{}/{} to {}: {:?}",
req.bucket_name, req.object_key, local_path, e
);
format!(
"❌ **Failed to download file from S3**\n\n\
**S3 Location:** s3://{}/{}\n\
**Local Path:** {}\n\
**Error:** {}\n\n\
**Possible causes:**\n\
• Object does not exist in the specified bucket\n\
• AWS credentials lack permissions to read this object\n\
• Cannot write to the specified local path\n\
• Insufficient disk space\n\
• Network connectivity issues\n\n\
**Troubleshooting steps:**\n\
1. Verify the object exists using list_objects\n\
2. Check your AWS credentials and permissions\n\
3. Ensure the local directory exists and is writable\n\
4. Check available disk space",
req.bucket_name, req.object_key, local_path, e
)
}
}
}
fn format_error_message(&self, req: &GetObjectRequest, error: anyhow::Error) -> String {
format!(
"❌ **Failed to get object from S3 bucket '{}'**\n\n\
**Object Key:** {}\n\
**Mode:** {:?}\n\
**Error:** {}\n\n\
**Possible causes:**\n\
• Object does not exist in the specified bucket\n\
• AWS credentials lack permissions to read this object\n\
• Network connectivity issues\n\
• Object key contains invalid characters\n\
• Bucket does not exist or is not accessible\n\
• Object is in a different AWS region\n\
• Version ID is invalid (for versioned objects)\n\n\
**Troubleshooting steps:**\n\
1. Verify the object exists using list_objects\n\
2. Check your AWS credentials and permissions\n\
3. Ensure the bucket name and object key are correct\n\
4. Try with a different object to test connectivity\n\
5. Check if the bucket has versioning enabled",
req.bucket_name, req.object_key, req.mode, error
)
}
#[tool(description = "Upload a local file to an S3 bucket")]
pub async fn upload_file(&self, Parameters(req): Parameters<UploadFileRequest>) -> String {
info!(
"Executing upload_file tool: '{}' -> s3://{}/{}",
req.local_file_path, req.bucket_name, req.object_key
);
let options = UploadFileOptions {
content_type: req.content_type.clone(),
storage_class: req.storage_class.clone(),
cache_control: req.cache_control.clone(),
..UploadFileOptions::default()
};
match self
.s3_client
.upload_file(&req.local_file_path, &req.bucket_name, &req.object_key, options)
.await
{
Ok(result) => {
debug!(
"Successfully uploaded file '{}' to s3://{}/{} ({} bytes)",
req.local_file_path, req.bucket_name, req.object_key, result.file_size
);
let mut result_text = format!(
"✅ **File uploaded successfully!**\n\n\
**Local File:** {}\n\
**S3 Location:** s3://{}/{}\n\
**File Size:** {} bytes ({:.2} MB)\n\
**Content Type:** {}\n\
**ETag:** {}\n",
req.local_file_path,
result.bucket,
result.key,
result.file_size,
result.file_size as f64 / 1_048_576.0,
result.content_type,
result.etag
);
if let Some(ref version_id) = result.version_id {
result_text.push_str(&format!("**Version ID:** {version_id}\n"));
}
result_text.push_str("\n---\n");
result_text.push_str("**Upload Summary:**\n");
result_text.push_str(&format!("• Source: {}\n", req.local_file_path));
result_text.push_str(&format!("• Destination: {}\n", result.location));
result_text.push_str(&format!("• Size: {} bytes\n", result.file_size));
result_text.push_str(&format!("• Type: {}\n", result.content_type));
if result.file_size > 5 * 1024 * 1024 {
result_text.push_str("\n💡 **Note:** Large file uploaded successfully. Consider using multipart upload for files larger than 100MB for better performance and reliability.");
}
info!(
"upload_file tool executed successfully: {} bytes uploaded to s3://{}/{}",
result.file_size, req.bucket_name, req.object_key
);
result_text
}
Err(e) => {
error!(
"Failed to upload file '{}' to s3://{}/{}: {:?}",
req.local_file_path, req.bucket_name, req.object_key, e
);
format!(
"❌ **Failed to upload file '{}' to S3 bucket '{}'**\n\n\
**Error:** {}\n\n\
**Possible causes:**\n\
• Local file does not exist or is not readable\n\
• AWS credentials lack permissions to upload to this bucket\n\
• S3 bucket does not exist or is not accessible\n\
• Network connectivity issues\n\
• File path contains invalid characters or is too long\n\
• Insufficient disk space or memory\n\
• Custom endpoint is misconfigured\n\
• File is locked by another process\n\n\
**Troubleshooting steps:**\n\
1. Verify the local file exists and is readable\n\
2. Check your AWS credentials and permissions\n\
3. Ensure the bucket name is correct and accessible\n\
4. Try with a smaller file to test connectivity\n\
5. Check the file path for special characters\n\n\
**File:** {}\n\
**Bucket:** {}\n\
**Object Key:** {}",
req.local_file_path, req.bucket_name, e, req.local_file_path, req.bucket_name, req.object_key
)
}
}
}
}
#[tool_handler(router = self.tool_router)]
impl ServerHandler for RustfsMcpServer {
fn get_info(&self) -> ServerInfo {
ServerInfo {
protocol_version: ProtocolVersion::V_2024_11_05,
capabilities: ServerCapabilities {
tools: Some(ToolsCapability {
list_changed: Some(false),
}),
..Default::default()
},
instructions: Some("RustFS MCP Server providing S3 operations through Model Context Protocol".into()),
server_info: Implementation {
name: "rustfs-mcp-server".into(),
version: env!("CARGO_PKG_VERSION").into(),
},
}
}
async fn ping(&self, _ctx: RequestContext<RoleServer>) -> Result<(), ErrorData> {
info!("Received ping request");
Ok(())
}
async fn on_initialized(&self, _ctx: NotificationContext<RoleServer>) {
info!("Client initialized successfully");
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_server_creation() {
let config = Config {
access_key_id: Some("test_key".to_string()),
secret_access_key: Some("test_secret".to_string()),
..Config::default()
};
let result = RustfsMcpServer::new(config).await;
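// S3Client::new builds the client locally and makes no network calls, so
// this mainly guards against panics during construction.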
assert!(result.is_err() || result.is_ok());
}
#[test]
fn test_get_object_request_defaults() {
let request = GetObjectRequest {
bucket_name: "test-bucket".to_string(),
object_key: "test-key".to_string(),
version_id: None,
mode: default_operation_mode(),
local_path: None,
max_content_size: default_max_content_size(),
};
assert_eq!(request.bucket_name, "test-bucket");
assert_eq!(request.object_key, "test-key");
assert!(request.version_id.is_none());
assert_eq!(request.mode, GetObjectMode::Read);
assert!(request.local_path.is_none());
assert_eq!(request.max_content_size, 1024 * 1024);
}
#[test]
fn test_get_object_request_serialization() {
let request = GetObjectRequest {
bucket_name: "test-bucket".to_string(),
object_key: "test-key".to_string(),
version_id: Some("version123".to_string()),
mode: GetObjectMode::Download,
local_path: Some("/path/to/file".to_string()),
max_content_size: 2048,
};
let json = serde_json::to_string(&request).unwrap();
let deserialized: GetObjectRequest = serde_json::from_str(&json).unwrap();
assert_eq!(request.bucket_name, deserialized.bucket_name);
assert_eq!(request.object_key, deserialized.object_key);
assert_eq!(request.version_id, deserialized.version_id);
assert_eq!(request.mode, deserialized.mode);
assert_eq!(request.local_path, deserialized.local_path);
assert_eq!(request.max_content_size, deserialized.max_content_size);
}
#[test]
fn test_get_object_request_serde_with_defaults() {
let json = r#"{
"bucket_name": "test-bucket",
"object_key": "test-key"
}"#;
let request: GetObjectRequest = serde_json::from_str(json).unwrap();
assert_eq!(request.bucket_name, "test-bucket");
assert_eq!(request.object_key, "test-key");
assert!(request.version_id.is_none());
assert_eq!(request.mode, GetObjectMode::Read);
assert!(request.local_path.is_none());
assert_eq!(request.max_content_size, 1024 * 1024);
}
#[test]
fn test_default_functions() {
assert_eq!(default_operation_mode(), GetObjectMode::Read);
assert_eq!(default_max_content_size(), 1024 * 1024);
}
#[test]
fn test_get_object_mode_serialization() {
let read_mode = GetObjectMode::Read;
let download_mode = GetObjectMode::Download;
let read_json = serde_json::to_string(&read_mode).unwrap();
let download_json = serde_json::to_string(&download_mode).unwrap();
assert_eq!(read_json, r#""read""#);
assert_eq!(download_json, r#""download""#);
let read_mode_deser: GetObjectMode = serde_json::from_str(r#""read""#).unwrap();
let download_mode_deser: GetObjectMode = serde_json::from_str(r#""download""#).unwrap();
assert_eq!(read_mode_deser, GetObjectMode::Read);
assert_eq!(download_mode_deser, GetObjectMode::Download);
}
}