diff --git a/Cargo.toml b/Cargo.toml index 8800ceef..2dc9d961 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -38,6 +38,7 @@ members = [ "crates/workers", # Worker thread pools and task scheduling "crates/zip", # ZIP file handling and compression "crates/ahm", + "crates/mcp", # MCP server for S3 operations ] resolver = "2" @@ -87,6 +88,7 @@ rustfs-filemeta = { path = "crates/filemeta", version = "0.0.5" } rustfs-signer = { path = "crates/signer", version = "0.0.5" } rustfs-checksums = { path = "crates/checksums", version = "0.0.5" } rustfs-workers = { path = "crates/workers", version = "0.0.5" } +rustfs-mcp = { path = "crates/mcp", version = "0.0.5" } aes-gcm = { version = "0.10.3", features = ["std"] } arc-swap = "1.7.1" argon2 = { version = "0.5.3", features = ["std"] } diff --git a/crates/mcp/Cargo.toml b/crates/mcp/Cargo.toml new file mode 100644 index 00000000..cb6a759d --- /dev/null +++ b/crates/mcp/Cargo.toml @@ -0,0 +1,70 @@ +# Copyright 2024 RustFS Team +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +[package] +name = "rustfs-mcp" +version.workspace = true +edition.workspace = true +license.workspace = true +repository.workspace = true +rust-version.workspace = true +homepage.workspace = true +description = "RustFS MCP (Model Context Protocol) Server" +keywords = ["mcp", "s3", "aws", "rustfs", "server"] +categories = ["development-tools", "web-programming"] + +[[bin]] +name = "rustfs-mcp" +path = "src/main.rs" + +[dependencies] +# AWS SDK for S3 operations +aws-sdk-s3.workspace = true + +# Async runtime and utilities +tokio = { workspace = true, features = ["io-std", "io-util", "macros", "signal"] } + +# MCP SDK with macros support +rmcp = { version = "0.3.0", features = ["server", "transport-io", "macros"] } + +# Command line argument parsing +clap = { workspace = true, features = ["derive", "env"] } + +# Serialization (still needed for S3 data structures) +serde.workspace = true +serde_json.workspace = true +schemars = "1.0" + +# Error handling +anyhow.workspace = true +thiserror.workspace = true + +# Logging +tracing.workspace = true +tracing-subscriber.workspace = true + +# File handling and MIME type detection +mime_guess = "2.0" +tokio-util = { version = "0.7", features = ["io"] } +futures-util = "0.3" + +# Async trait support for trait abstractions +async-trait = "0.1" + +[dev-dependencies] +# Testing framework and utilities +mockall = "0.13" +tempfile = "3.12" +tokio-test = "0.4" +test-case = "3.3" diff --git a/crates/mcp/README.md b/crates/mcp/README.md new file mode 100644 index 00000000..b3ef9621 --- /dev/null +++ b/crates/mcp/README.md @@ -0,0 +1,184 @@ +[![RustFS](https://rustfs.com/images/rustfs-github.png)](https://rustfs.com) + +# RustFS MCP Server - Model Context Protocol + +

+ High-performance MCP server providing S3-compatible object storage operations for AI/LLM integration +

+ +

+ CI + πŸ“– Documentation + πŸ› Bug Reports + πŸ’¬ Discussions +

+
+---
+
+## πŸ“– Overview
+
+**RustFS MCP Server** is a high-performance [Model Context Protocol (MCP)](https://spec.modelcontextprotocol.io) server that provides AI/LLM tools with seamless access to S3-compatible object storage operations. Built with Rust for maximum performance and safety, it enables AI assistants like Claude Desktop to interact with cloud storage through a standardized protocol.
+
+### What is MCP?
+
+The Model Context Protocol is an open standard that enables secure, controlled connections between AI applications and external systems. This server acts as a bridge between AI tools and S3-compatible storage services, providing structured access to file operations while maintaining security and observability.
+
+## ✨ Features
+
+### Supported S3 Operations
+
+- **List Buckets**: List all accessible S3 buckets
+- **List Objects**: Browse bucket contents with optional prefix filtering
+- **Upload Files**: Upload local files with automatic MIME type detection and cache control
+- **Get Objects**: Retrieve objects from S3 storage with read or download modes
+
+## πŸ”§ Installation
+
+### Prerequisites
+
+- Rust 1.70+ (for building from source)
+- AWS credentials configured (via environment variables, AWS CLI, or IAM roles)
+- Access to an S3-compatible storage service
+
+### Build from Source
+
+```bash
+# Clone the repository
+git clone https://github.com/rustfs/rustfs.git
+cd rustfs
+
+# Build the MCP server
+cargo build --release -p rustfs-mcp
+
+# The binary will be available at
+./target/release/rustfs-mcp
+```
+
+## βš™οΈ Configuration
+
+### Environment Variables
+
+```bash
+# AWS Credentials (required)
+export AWS_ACCESS_KEY_ID=your_access_key
+export AWS_SECRET_ACCESS_KEY=your_secret_key
+export AWS_REGION=us-east-1 # Optional, defaults to us-east-1
+
+# Optional: Custom S3 endpoint (for MinIO, etc.)
+export AWS_ENDPOINT_URL=http://localhost:9000
+
+# Logging level (optional)
+export RUST_LOG=info
+```
+
+### Command Line Options
+
+```bash
+rustfs-mcp --help
+```
+
+The server supports several command-line options for customizing its behavior:
+
+- `--access-key-id`: AWS Access Key ID for S3 authentication
+- `--secret-access-key`: AWS Secret Access Key for S3 authentication
+- `--region`: AWS region to use for S3 operations (default: us-east-1)
+- `--endpoint-url`: Custom S3 endpoint URL (for MinIO, LocalStack, etc.)
+- `--log-level`: Log level configuration (default: rustfs_mcp_server=info)
+
+## πŸš€ Usage
+
+### Starting the Server
+
+```bash
+# Start the MCP server
+rustfs-mcp
+
+# Or with custom options
+rustfs-mcp --log-level debug --region us-west-2
+```
+
+### Integration with Chat Clients
+
+#### Option 1: Using Command-Line Arguments
+
+```json
+{
+  "mcpServers": {
+    "rustfs-mcp": {
+      "command": "/path/to/rustfs-mcp",
+      "args": [
+        "--access-key-id", "your_access_key",
+        "--secret-access-key", "your_secret_key",
+        "--region", "us-west-2",
+        "--log-level", "info"
+      ]
+    }
+  }
+}
+```
+
+#### Option 2: Using Environment Variables
+
+```json
+{
+  "mcpServers": {
+    "rustfs-mcp": {
+      "command": "/path/to/rustfs-mcp",
+      "env": {
+        "AWS_ACCESS_KEY_ID": "your_access_key",
+        "AWS_SECRET_ACCESS_KEY": "your_secret_key",
+        "AWS_REGION": "us-east-1"
+      }
+    }
+  }
+}
+```
+
+## πŸ› οΈ Available Tools
+
+The MCP server exposes the following tools that AI assistants can use:
+
+### `list_buckets`
+List all S3 buckets accessible with the configured credentials.
+
+**Parameters:** None
+
+### `list_objects`
+List objects in an S3 bucket with optional prefix filtering.
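+
+For example, an MCP client could call this tool with arguments like the following (bucket name and prefix are illustrative):
+
+```json
+{
+  "bucket_name": "my-bucket",
+  "prefix": "photos/2024/"
+}
+```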
+ +**Parameters:** +- `bucket_name` (string): Name of the S3 bucket +- `prefix` (string, optional): Prefix to filter objects + +### `upload_file` +Upload a local file to S3 with automatic MIME type detection. + +**Parameters:** +- `local_file_path` (string): Path to the local file +- `bucket_name` (string): Target S3 bucket +- `object_key` (string): S3 object key (destination path) +- `content_type` (string, optional): Content type (auto-detected if not provided) +- `storage_class` (string, optional): S3 storage class +- `cache_control` (string, optional): Cache control header + +### `get_object` +Retrieve an object from S3 with two operation modes: read content directly or download to a file. + +**Parameters:** +- `bucket_name` (string): Source S3 bucket +- `object_key` (string): S3 object key +- `version_id` (string, optional): Version ID for versioned objects +- `mode` (string, optional): Operation mode - "read" (default) returns content directly, "download" saves to local file +- `local_path` (string, optional): Local file path (required when mode is "download") +- `max_content_size` (number, optional): Maximum content size in bytes for read mode (default: 1MB) + +## Architecture + +The MCP server is built with a modular architecture: + +``` +rustfs-mcp/ +β”œβ”€β”€ src/ +β”‚ β”œβ”€β”€ main.rs # Entry point, CLI parsing, and server initialization +β”‚ β”œβ”€β”€ server.rs # MCP server implementation and tool handlers +β”‚ β”œβ”€β”€ s3_client.rs # S3 client wrapper with async operations +β”‚ β”œβ”€β”€ config.rs # Configuration management and CLI options +β”‚ └── lib.rs # Library exports and public API +└── Cargo.toml # Dependencies, metadata, and binary configuration +``` diff --git a/crates/mcp/src/config.rs b/crates/mcp/src/config.rs new file mode 100644 index 00000000..8ea795ff --- /dev/null +++ b/crates/mcp/src/config.rs @@ -0,0 +1,224 @@ +// Copyright 2024 RustFS Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use anyhow::Result; +use clap::Parser; +use tracing::info; + +/// Configuration for RustFS MCP Server +#[derive(Parser, Debug, Clone)] +#[command( + name = "rustfs-mcp-server", + about = "RustFS MCP (Model Context Protocol) Server for S3 operations", + version, + long_about = r#" +RustFS MCP Server - Model Context Protocol server for S3 operations + +This server provides S3 operations through the Model Context Protocol (MCP), +allowing AI assistants to interact with S3-compatible storage systems. + +ENVIRONMENT VARIABLES: + All command-line options can also be set via environment variables. + Command-line arguments take precedence over environment variables. 
+ +EXAMPLES: + # Using command-line arguments + rustfs-mcp-server --access-key-id your_key --secret-access-key your_secret + + # Using environment variables + export AWS_ACCESS_KEY_ID=your_key + export AWS_SECRET_ACCESS_KEY=your_secret + rustfs-mcp-server + + # Mixed usage (command-line overrides environment) + export AWS_REGION=us-east-1 + rustfs-mcp-server --access-key-id mykey --secret-access-key mysecret --endpoint-url http://localhost:9000 +"# +)] +pub struct Config { + /// AWS Access Key ID + #[arg( + long = "access-key-id", + env = "AWS_ACCESS_KEY_ID", + help = "AWS Access Key ID for S3 authentication" + )] + pub access_key_id: Option, + + /// AWS Secret Access Key + #[arg( + long = "secret-access-key", + env = "AWS_SECRET_ACCESS_KEY", + help = "AWS Secret Access Key for S3 authentication" + )] + pub secret_access_key: Option, + + /// AWS Region + #[arg( + long = "region", + env = "AWS_REGION", + default_value = "us-east-1", + help = "AWS region to use for S3 operations" + )] + pub region: String, + + /// Custom S3 endpoint URL + #[arg( + long = "endpoint-url", + env = "AWS_ENDPOINT_URL", + help = "Custom S3 endpoint URL (for MinIO, LocalStack, etc.)" + )] + pub endpoint_url: Option, + + /// Log level + #[arg( + long = "log-level", + env = "RUST_LOG", + default_value = "rustfs_mcp_server=info", + help = "Log level configuration" + )] + pub log_level: String, + + /// Force path-style addressing + #[arg( + long = "force-path-style", + help = "Force path-style S3 addressing (automatically enabled for custom endpoints)" + )] + pub force_path_style: bool, +} + +impl Config { + pub fn new() -> Self { + Config::parse() + } + + pub fn validate(&self) -> Result<()> { + if self.access_key_id.is_none() { + anyhow::bail!("AWS Access Key ID is required. Set via --access-key-id or AWS_ACCESS_KEY_ID environment variable"); + } + + if self.secret_access_key.is_none() { + anyhow::bail!( + "AWS Secret Access Key is required. 
Set via --secret-access-key or AWS_SECRET_ACCESS_KEY environment variable" + ); + } + + Ok(()) + } + + pub fn access_key_id(&self) -> &str { + self.access_key_id.as_ref().expect("Access key ID should be validated") + } + + pub fn secret_access_key(&self) -> &str { + self.secret_access_key + .as_ref() + .expect("Secret access key should be validated") + } + + pub fn log_configuration(&self) { + let access_key_display = self + .access_key_id + .as_ref() + .map(|key| { + if key.len() > 8 { + format!("{}...{}", &key[..4], &key[key.len() - 4..]) + } else { + "*".repeat(key.len()) + } + }) + .unwrap_or_else(|| "Not set".to_string()); + + let endpoint_display = self + .endpoint_url + .as_ref() + .map(|url| format!("Custom endpoint: {url}")) + .unwrap_or_else(|| "Default AWS endpoints".to_string()); + + info!("Configuration:"); + info!(" AWS Region: {}", self.region); + info!(" AWS Access Key ID: {}", access_key_display); + info!(" AWS Secret Access Key: [HIDDEN]"); + info!(" S3 Endpoint: {}", endpoint_display); + info!(" Force Path Style: {}", self.force_path_style); + info!(" Log Level: {}", self.log_level); + } +} + +impl Default for Config { + fn default() -> Self { + Config { + access_key_id: None, + secret_access_key: None, + region: "us-east-1".to_string(), + endpoint_url: None, + log_level: "rustfs_mcp_server=info".to_string(), + force_path_style: false, + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_config_validation_success() { + let config = Config { + access_key_id: Some("test_key".to_string()), + secret_access_key: Some("test_secret".to_string()), + ..Config::default() + }; + + assert!(config.validate().is_ok()); + assert_eq!(config.access_key_id(), "test_key"); + assert_eq!(config.secret_access_key(), "test_secret"); + } + + #[test] + fn test_config_validation_missing_access_key() { + let config = Config { + access_key_id: None, + secret_access_key: Some("test_secret".to_string()), + ..Config::default() + }; + + let result = config.validate(); + assert!(result.is_err()); + assert!(result.unwrap_err().to_string().contains("Access Key ID")); + } + + #[test] + fn test_config_validation_missing_secret_key() { + let config = Config { + access_key_id: Some("test_key".to_string()), + secret_access_key: None, + ..Config::default() + }; + + let result = config.validate(); + assert!(result.is_err()); + assert!(result.unwrap_err().to_string().contains("Secret Access Key")); + } + + #[test] + fn test_config_default() { + let config = Config::default(); + assert_eq!(config.region, "us-east-1"); + assert_eq!(config.log_level, "rustfs_mcp_server=info"); + assert!(!config.force_path_style); + assert!(config.access_key_id.is_none()); + assert!(config.secret_access_key.is_none()); + assert!(config.endpoint_url.is_none()); + } +} diff --git a/crates/mcp/src/lib.rs b/crates/mcp/src/lib.rs new file mode 100644 index 00000000..44862285 --- /dev/null +++ b/crates/mcp/src/lib.rs @@ -0,0 +1,97 @@ +// Copyright 2024 RustFS Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +pub mod config; +pub mod s3_client; +pub mod server; + +pub use config::Config; +pub use s3_client::{BucketInfo, S3Client}; +pub use server::RustfsMcpServer; + +use anyhow::{Context, Result}; +use rmcp::ServiceExt; +use tokio::io::{stdin, stdout}; +use tracing::info; + +/// Run the MCP server with the provided configuration +pub async fn run_server_with_config(config: Config) -> Result<()> { + info!("Starting RustFS MCP Server with provided configuration"); + + config.validate().context("Configuration validation failed")?; + + let server = RustfsMcpServer::new(config).await?; + + info!("Running MCP server with stdio transport"); + + // Run the server with stdio + server + .serve((stdin(), stdout())) + .await + .context("Failed to serve MCP server")? + .waiting() + .await + .context("Error while waiting for server shutdown")?; + + Ok(()) +} + +/// Run the MCP server with default configuration (from environment variables) +pub async fn run_server() -> Result<()> { + info!("Starting RustFS MCP Server with default configuration"); + + let config = Config::default(); + run_server_with_config(config).await +} + +/// Validate environment configuration (legacy function for backward compatibility) +pub fn validate_environment() -> Result<()> { + use std::env; + + if env::var("AWS_ACCESS_KEY_ID").is_err() { + anyhow::bail!("AWS_ACCESS_KEY_ID environment variable is required"); + } + + if env::var("AWS_SECRET_ACCESS_KEY").is_err() { + anyhow::bail!("AWS_SECRET_ACCESS_KEY environment variable is required"); + } + + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_config_creation() { + let config = Config { + access_key_id: Some("test_key".to_string()), + secret_access_key: Some("test_secret".to_string()), + ..Config::default() + }; + + assert!(config.validate().is_ok()); + assert_eq!(config.access_key_id(), "test_key"); + assert_eq!(config.secret_access_key(), "test_secret"); + } + + #[tokio::test] + async fn test_run_server_with_invalid_config() { + let config = Config::default(); + + let result = run_server_with_config(config).await; + assert!(result.is_err()); + } +} diff --git a/crates/mcp/src/main.rs b/crates/mcp/src/main.rs new file mode 100644 index 00000000..73e638ff --- /dev/null +++ b/crates/mcp/src/main.rs @@ -0,0 +1,104 @@ +// Copyright 2024 RustFS Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
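+
+//! Binary entry point for the RustFS MCP server: parses configuration from
+//! CLI flags and environment variables, validates credentials, and serves
+//! the Model Context Protocol over stdio. Logs go to stderr, since stdout
+//! is reserved for the MCP protocol stream.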
+ +use anyhow::{Context, Result}; +use clap::Parser; +use rmcp::ServiceExt; +use rustfs_mcp::{Config, RustfsMcpServer}; +use std::env; +use tokio::io::{stdin, stdout}; +use tracing::{Level, error, info}; +use tracing_subscriber::{EnvFilter, FmtSubscriber}; + +#[tokio::main] +async fn main() -> Result<()> { + let config = Config::parse(); + + init_tracing(&config)?; + + info!("Starting RustFS MCP Server v{}", env!("CARGO_PKG_VERSION")); + + if let Err(e) = config.validate() { + error!("Configuration validation failed: {}", e); + print_usage_help(); + std::process::exit(1); + } + + config.log_configuration(); + + if let Err(e) = run_server(config).await { + error!("Server error: {}", e); + std::process::exit(1); + } + + info!("RustFS MCP Server shutdown complete"); + Ok(()) +} + +async fn run_server(config: Config) -> Result<()> { + info!("Initializing RustFS MCP Server"); + + let server = RustfsMcpServer::new(config).await?; + + info!("Starting MCP server with stdio transport"); + + server + .serve((stdin(), stdout())) + .await + .context("Failed to serve MCP server")? + .waiting() + .await + .context("Error while waiting for server shutdown")?; + + Ok(()) +} + +fn init_tracing(config: &Config) -> Result<()> { + let filter = EnvFilter::try_from_default_env() + .or_else(|_| EnvFilter::try_new(&config.log_level)) + .context("Failed to create log filter")?; + + let subscriber = FmtSubscriber::builder() + .with_max_level(Level::TRACE) + .with_env_filter(filter) + .with_target(false) + .with_thread_ids(false) + .with_thread_names(false) + .with_writer(std::io::stderr) // Force logs to stderr to avoid interfering with MCP protocol on stdout + .finish(); + + tracing::subscriber::set_global_default(subscriber).context("Failed to set global tracing subscriber")?; + + Ok(()) +} + +fn print_usage_help() { + eprintln!(); + eprintln!("RustFS MCP Server - Model Context Protocol server for S3 operations"); + eprintln!(); + eprintln!("For more help, run: rustfs-mcp --help"); + eprintln!(); + eprintln!("QUICK START:"); + eprintln!(" # Using command-line arguments"); + eprintln!(" rustfs-mcp --access-key-id YOUR_KEY --secret-access-key YOUR_SECRET"); + eprintln!(); + eprintln!(" # Using environment variables"); + eprintln!(" export AWS_ACCESS_KEY_ID=YOUR_KEY"); + eprintln!(" export AWS_SECRET_ACCESS_KEY=YOUR_SECRET"); + eprintln!(" rustfs-mcp"); + eprintln!(); + eprintln!(" # For local development with RustFS"); + eprintln!(" rustfs-mcp --access-key-id minioadmin --secret-access-key minioadmin --endpoint-url http://localhost:9000"); + eprintln!(); +} diff --git a/crates/mcp/src/s3_client.rs b/crates/mcp/src/s3_client.rs new file mode 100644 index 00000000..c35dc66b --- /dev/null +++ b/crates/mcp/src/s3_client.rs @@ -0,0 +1,796 @@ +// Copyright 2024 RustFS Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
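+
+//! Async wrapper around the AWS S3 SDK used by the MCP tools: typed option
+//! and result structs for listing buckets and objects, uploading files with
+//! automatic MIME type detection, and reading or downloading objects with
+//! size limits and lightweight file-type detection.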
+ +use anyhow::{Context, Result}; +use aws_sdk_s3::config::{Credentials, Region}; +use aws_sdk_s3::primitives::ByteStream; +use aws_sdk_s3::{Client, Config as S3Config}; +use serde::{Deserialize, Serialize}; +use std::path::Path; +use tokio::io::AsyncWriteExt; +use tracing::{debug, info}; + +use crate::config::Config; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct BucketInfo { + pub name: String, + pub creation_date: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ObjectInfo { + pub key: String, + pub size: Option, + pub last_modified: Option, + pub etag: Option, + pub storage_class: Option, +} + +#[derive(Debug, Clone, Default)] +pub struct ListObjectsOptions { + pub prefix: Option, + pub delimiter: Option, + pub max_keys: Option, + pub continuation_token: Option, + pub start_after: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ListObjectsResult { + pub objects: Vec, + pub common_prefixes: Vec, + pub is_truncated: bool, + pub next_continuation_token: Option, + pub max_keys: Option, + pub key_count: i32, +} + +#[derive(Debug, Clone, Default)] +pub struct UploadFileOptions { + pub content_type: Option, + pub metadata: Option>, + pub storage_class: Option, + pub server_side_encryption: Option, + pub cache_control: Option, + pub content_disposition: Option, + pub content_encoding: Option, + pub content_language: Option, +} + +#[derive(Debug, Clone, Default)] +pub struct GetObjectOptions { + pub version_id: Option, + pub range: Option, + pub if_modified_since: Option, + pub if_unmodified_since: Option, + pub max_content_size: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum DetectedFileType { + Text, + NonText(String), // mime type for non-text files +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct GetObjectResult { + pub bucket: String, + pub key: String, + pub content_type: String, + pub content_length: u64, + pub last_modified: Option, + pub etag: Option, + pub version_id: Option, + pub detected_type: DetectedFileType, + pub content: Option>, // Raw content bytes + pub text_content: Option, // UTF-8 decoded content for text files +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct UploadResult { + pub bucket: String, + pub key: String, + pub etag: String, + pub location: String, + pub version_id: Option, + pub file_size: u64, + pub content_type: String, + pub upload_id: Option, +} +#[derive(Debug, Clone)] +pub struct S3Client { + client: Client, +} + +impl S3Client { + pub async fn new(config: &Config) -> Result { + info!("Initializing S3 client from configuration"); + + let access_key = config.access_key_id(); + let secret_key = config.secret_access_key(); + + debug!("Using AWS region: {}", config.region); + if let Some(ref endpoint) = config.endpoint_url { + debug!("Using custom endpoint: {}", endpoint); + } + + let credentials = Credentials::new(access_key, secret_key, None, None, "rustfs-mcp-server"); + + let mut config_builder = S3Config::builder() + .credentials_provider(credentials) + .region(Region::new(config.region.clone())) + .behavior_version(aws_sdk_s3::config::BehaviorVersion::latest()); + + // Set force path style if custom endpoint or explicitly requested + let should_force_path_style = config.endpoint_url.is_some() || config.force_path_style; + if should_force_path_style { + config_builder = config_builder.force_path_style(true); + } + + if let Some(endpoint) = &config.endpoint_url { + config_builder = config_builder.endpoint_url(endpoint); + } + + let 
s3_config = config_builder.build(); + let client = Client::from_conf(s3_config); + + info!("S3 client initialized successfully"); + + Ok(Self { client }) + } + + pub async fn list_buckets(&self) -> Result> { + debug!("Listing S3 buckets"); + + let response = self.client.list_buckets().send().await.context("Failed to list S3 buckets")?; + + let buckets: Vec = response + .buckets() + .iter() + .map(|bucket| { + let name = bucket.name().unwrap_or("unknown").to_string(); + let creation_date = bucket + .creation_date() + .map(|dt| dt.fmt(aws_sdk_s3::primitives::DateTimeFormat::DateTime).unwrap()); + + BucketInfo { name, creation_date } + }) + .collect(); + + debug!("Found {} buckets", buckets.len()); + Ok(buckets) + } + + pub async fn list_objects_v2(&self, bucket_name: &str, options: ListObjectsOptions) -> Result { + debug!("Listing objects in bucket '{}' with options: {:?}", bucket_name, options); + + let mut request = self.client.list_objects_v2().bucket(bucket_name); + + if let Some(prefix) = options.prefix { + request = request.prefix(prefix); + } + + if let Some(delimiter) = options.delimiter { + request = request.delimiter(delimiter); + } + + if let Some(max_keys) = options.max_keys { + request = request.max_keys(max_keys); + } + + if let Some(continuation_token) = options.continuation_token { + request = request.continuation_token(continuation_token); + } + + if let Some(start_after) = options.start_after { + request = request.start_after(start_after); + } + + let response = request + .send() + .await + .context(format!("Failed to list objects in bucket '{bucket_name}'"))?; + + let objects: Vec = response + .contents() + .iter() + .map(|obj| { + let key = obj.key().unwrap_or("unknown").to_string(); + let size = obj.size(); + let last_modified = obj + .last_modified() + .map(|dt| dt.fmt(aws_sdk_s3::primitives::DateTimeFormat::DateTime).unwrap()); + let etag = obj.e_tag().map(|e| e.to_string()); + let storage_class = obj.storage_class().map(|sc| sc.as_str().to_string()); + + ObjectInfo { + key, + size, + last_modified, + etag, + storage_class, + } + }) + .collect(); + + let common_prefixes: Vec = response + .common_prefixes() + .iter() + .filter_map(|cp| cp.prefix()) + .map(|p| p.to_string()) + .collect(); + + let result = ListObjectsResult { + objects, + common_prefixes, + is_truncated: response.is_truncated().unwrap_or(false), + next_continuation_token: response.next_continuation_token().map(|t| t.to_string()), + max_keys: response.max_keys(), + key_count: response.key_count().unwrap_or(0), + }; + + debug!( + "Found {} objects and {} common prefixes in bucket '{}'", + result.objects.len(), + result.common_prefixes.len(), + bucket_name + ); + + Ok(result) + } + + pub async fn upload_file( + &self, + local_path: &str, + bucket_name: &str, + object_key: &str, + options: UploadFileOptions, + ) -> Result { + info!("Starting file upload: '{}' -> s3://{}/{}", local_path, bucket_name, object_key); + + let path = Path::new(local_path); + let canonical_path = path + .canonicalize() + .context(format!("Failed to resolve file path: {local_path}"))?; + + if !canonical_path.exists() { + anyhow::bail!("File does not exist: {}", local_path); + } + + if !canonical_path.is_file() { + anyhow::bail!("Path is not a file: {}", local_path); + } + + let metadata = tokio::fs::metadata(&canonical_path) + .await + .context(format!("Failed to read file metadata: {local_path}"))?; + + let file_size = metadata.len(); + debug!("File size: {file_size} bytes"); + + let content_type = 
options.content_type.unwrap_or_else(|| { + let detected = mime_guess::from_path(&canonical_path).first_or_octet_stream().to_string(); + debug!("Auto-detected content type: {detected}"); + detected + }); + + let file_content = tokio::fs::read(&canonical_path) + .await + .context(format!("Failed to read file content: {local_path}"))?; + + let byte_stream = ByteStream::from(file_content); + + let mut request = self + .client + .put_object() + .bucket(bucket_name) + .key(object_key) + .body(byte_stream) + .content_type(&content_type) + .content_length(file_size as i64); + + if let Some(storage_class) = &options.storage_class { + request = request.storage_class(storage_class.as_str().into()); + } + + if let Some(cache_control) = &options.cache_control { + request = request.cache_control(cache_control); + } + + if let Some(content_disposition) = &options.content_disposition { + request = request.content_disposition(content_disposition); + } + + if let Some(content_encoding) = &options.content_encoding { + request = request.content_encoding(content_encoding); + } + + if let Some(content_language) = &options.content_language { + request = request.content_language(content_language); + } + + if let Some(sse) = &options.server_side_encryption { + request = request.server_side_encryption(sse.as_str().into()); + } + + if let Some(metadata_map) = &options.metadata { + for (key, value) in metadata_map { + request = request.metadata(key, value); + } + } + + debug!("Executing S3 put_object request"); + let response = request + .send() + .await + .context(format!("Failed to upload file to s3://{bucket_name}/{object_key}"))?; + + let etag = response.e_tag().unwrap_or("unknown").to_string(); + let version_id = response.version_id().map(|v| v.to_string()); + + let location = format!("s3://{bucket_name}/{object_key}"); + + let upload_result = UploadResult { + bucket: bucket_name.to_string(), + key: object_key.to_string(), + etag, + location, + version_id, + file_size, + content_type, + upload_id: None, + }; + + info!( + "File upload completed successfully: {} bytes uploaded to s3://{}/{}", + file_size, bucket_name, object_key + ); + + Ok(upload_result) + } + + pub async fn get_object(&self, bucket_name: &str, object_key: &str, options: GetObjectOptions) -> Result { + info!("Getting object: s3://{}/{}", bucket_name, object_key); + + let mut request = self.client.get_object().bucket(bucket_name).key(object_key); + + if let Some(version_id) = &options.version_id { + request = request.version_id(version_id); + } + + if let Some(range) = &options.range { + request = request.range(range); + } + + if let Some(if_modified_since) = &options.if_modified_since { + request = request.if_modified_since( + aws_sdk_s3::primitives::DateTime::from_str(if_modified_since, aws_sdk_s3::primitives::DateTimeFormat::DateTime) + .context("Failed to parse if_modified_since date")?, + ); + } + + debug!("Executing S3 get_object request"); + let response = request + .send() + .await + .context(format!("Failed to get object from s3://{bucket_name}/{object_key}"))?; + + let content_type = response.content_type().unwrap_or("application/octet-stream").to_string(); + let content_length = response.content_length().unwrap_or(0) as u64; + let last_modified = response + .last_modified() + .map(|dt| dt.fmt(aws_sdk_s3::primitives::DateTimeFormat::DateTime).unwrap()); + let etag = response.e_tag().map(|e| e.to_string()); + let version_id = response.version_id().map(|v| v.to_string()); + + let max_size = options.max_content_size.unwrap_or(10 * 1024 * 
1024); + let mut content = Vec::new(); + let mut byte_stream = response.body; + let mut total_read = 0; + + while let Some(bytes_result) = byte_stream.try_next().await.context("Failed to read object content")? { + if total_read + bytes_result.len() > max_size { + anyhow::bail!("Object size exceeds maximum allowed size of {} bytes", max_size); + } + content.extend_from_slice(&bytes_result); + total_read += bytes_result.len(); + } + + debug!("Read {} bytes from object", content.len()); + + let detected_type = Self::detect_file_type(Some(&content_type), &content); + debug!("Detected file type: {detected_type:?}"); + + let text_content = match &detected_type { + DetectedFileType::Text => match std::str::from_utf8(&content) { + Ok(text) => Some(text.to_string()), + Err(_) => { + debug!("Failed to decode content as UTF-8, treating as binary"); + None + } + }, + _ => None, + }; + + let result = GetObjectResult { + bucket: bucket_name.to_string(), + key: object_key.to_string(), + content_type, + content_length, + last_modified, + etag, + version_id, + detected_type, + content: Some(content), + text_content, + }; + + info!( + "Object retrieved successfully: {} bytes from s3://{}/{}", + result.content_length, bucket_name, object_key + ); + + Ok(result) + } + + fn detect_file_type(content_type: Option<&str>, content_bytes: &[u8]) -> DetectedFileType { + if let Some(ct) = content_type { + let ct_lower = ct.to_lowercase(); + + if ct_lower.starts_with("text/") + || ct_lower == "application/json" + || ct_lower == "application/xml" + || ct_lower == "application/yaml" + || ct_lower == "application/javascript" + || ct_lower == "application/x-yaml" + || ct_lower == "application/x-sh" + || ct_lower == "application/x-shellscript" + || ct_lower.contains("script") + || ct_lower.contains("xml") + || ct_lower.contains("json") + { + return DetectedFileType::Text; + } + + return DetectedFileType::NonText(ct.to_string()); + } + + if content_bytes.len() >= 4 { + match &content_bytes[0..4] { + // PNG: 89 50 4E 47 + [0x89, 0x50, 0x4E, 0x47] => return DetectedFileType::NonText("image/png".to_string()), + // JPEG: FF D8 FF + [0xFF, 0xD8, 0xFF, _] => return DetectedFileType::NonText("image/jpeg".to_string()), + // GIF: 47 49 46 38 + [0x47, 0x49, 0x46, 0x38] => return DetectedFileType::NonText("image/gif".to_string()), + // BMP: 42 4D + [0x42, 0x4D, _, _] => return DetectedFileType::NonText("image/bmp".to_string()), + // RIFF container (WebP/WAV) + [0x52, 0x49, 0x46, 0x46] if content_bytes.len() >= 12 => { + if &content_bytes[8..12] == b"WEBP" { + return DetectedFileType::NonText("image/webp".to_string()); + } else if &content_bytes[8..12] == b"WAVE" { + return DetectedFileType::NonText("audio/wav".to_string()); + } + return DetectedFileType::NonText("application/octet-stream".to_string()); + } + _ => {} + } + } + + // 3. 
Check if content is valid UTF-8 text as fallback + if std::str::from_utf8(content_bytes).is_ok() { + // Additional heuristics for text detection + let non_printable_count = content_bytes + .iter() + .filter(|&&b| b < 0x20 && b != 0x09 && b != 0x0A && b != 0x0D) // Control chars except tab, LF, CR + .count(); + let total_chars = content_bytes.len(); + + // If less than 5% are non-printable control characters, consider it text + if total_chars > 0 && (non_printable_count as f64 / total_chars as f64) < 0.05 { + return DetectedFileType::Text; + } + } + + // Default to non-text binary + DetectedFileType::NonText("application/octet-stream".to_string()) + } + + pub async fn download_object_to_file( + &self, + bucket_name: &str, + object_key: &str, + local_path: &str, + options: GetObjectOptions, + ) -> Result<(u64, String)> { + info!("Downloading object: s3://{}/{} -> {}", bucket_name, object_key, local_path); + + let mut request = self.client.get_object().bucket(bucket_name).key(object_key); + + if let Some(version_id) = &options.version_id { + request = request.version_id(version_id); + } + + if let Some(range) = &options.range { + request = request.range(range); + } + + if let Some(if_modified_since) = &options.if_modified_since { + request = request.if_modified_since( + aws_sdk_s3::primitives::DateTime::from_str(if_modified_since, aws_sdk_s3::primitives::DateTimeFormat::DateTime) + .context("Failed to parse if_modified_since date")?, + ); + } + + debug!("Executing S3 get_object request for download"); + let response = request + .send() + .await + .context(format!("Failed to get object from s3://{bucket_name}/{object_key}"))?; + + let local_file_path = Path::new(local_path); + + if let Some(parent) = local_file_path.parent() { + tokio::fs::create_dir_all(parent) + .await + .context(format!("Failed to create parent directories for {local_path}"))?; + } + + let mut file = tokio::fs::File::create(local_file_path) + .await + .context(format!("Failed to create local file: {local_path}"))?; + + let mut byte_stream = response.body; + let mut total_bytes = 0u64; + + while let Some(bytes_result) = byte_stream.try_next().await.context("Failed to read object content")? 
{ + file.write_all(&bytes_result) + .await + .context(format!("Failed to write to local file: {local_path}"))?; + total_bytes += bytes_result.len() as u64; + } + + file.flush().await.context("Failed to flush file to disk")?; + + let absolute_path = local_file_path + .canonicalize() + .unwrap_or_else(|_| local_file_path.to_path_buf()) + .to_string_lossy() + .to_string(); + + info!( + "Object downloaded successfully: {} bytes from s3://{}/{} to {}", + total_bytes, bucket_name, object_key, absolute_path + ); + + Ok((total_bytes, absolute_path)) + } + + pub async fn health_check(&self) -> Result<()> { + debug!("Performing S3 health check"); + + self.client.list_buckets().send().await.context("S3 health check failed")?; + + debug!("S3 health check passed"); + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + #[ignore] // Requires AWS credentials + async fn test_s3_client_creation() { + let config = Config { + access_key_id: Some("test_key".to_string()), + secret_access_key: Some("test_secret".to_string()), + region: "us-east-1".to_string(), + ..Config::default() + }; + + let result = S3Client::new(&config).await; + assert!(result.is_ok()); + } + + #[test] + fn test_bucket_info_serialization() { + let bucket = BucketInfo { + name: "test-bucket".to_string(), + creation_date: Some("2024-01-01T00:00:00Z".to_string()), + }; + + let json = serde_json::to_string(&bucket).unwrap(); + let deserialized: BucketInfo = serde_json::from_str(&json).unwrap(); + + assert_eq!(bucket.name, deserialized.name); + assert_eq!(bucket.creation_date, deserialized.creation_date); + } + + #[test] + fn test_detect_file_type_text_content_type() { + let test_cases = vec![ + ("text/plain", "Hello world"), + ("text/html", ""), + ("application/json", r#"{"key": "value"}"#), + ("application/xml", ""), + ("application/yaml", "key: value"), + ("application/javascript", "console.log('hello');"), + ]; + + for (content_type, content) in test_cases { + let result = S3Client::detect_file_type(Some(content_type), content.as_bytes()); + match result { + DetectedFileType::Text => {} + _ => panic!("Expected Text for content type {content_type}"), + } + } + } + + #[test] + fn test_detect_file_type_non_text_content_type() { + // Test various non-text content types + let test_cases = vec![ + ("image/png", "image/png"), + ("image/jpeg", "image/jpeg"), + ("audio/mp3", "audio/mp3"), + ("video/mp4", "video/mp4"), + ("application/pdf", "application/pdf"), + ]; + + for (content_type, expected_mime) in test_cases { + let result = S3Client::detect_file_type(Some(content_type), b"some content"); + match result { + DetectedFileType::NonText(mime_type) => { + assert_eq!(mime_type, expected_mime); + } + _ => panic!("Expected NonText for content type {content_type}"), + } + } + } + + #[test] + fn test_detect_file_type_magic_bytes_simplified() { + // Test magic bytes detection (now all return NonText) + let test_cases = vec![ + // PNG magic bytes: 89 50 4E 47 + (vec![0x89, 0x50, 0x4E, 0x47, 0x0D, 0x0A, 0x1A, 0x0A], "image/png"), + // JPEG magic bytes: FF D8 FF + (vec![0xFF, 0xD8, 0xFF, 0xE0], "image/jpeg"), + // GIF magic bytes: 47 49 46 38 + (vec![0x47, 0x49, 0x46, 0x38, 0x37, 0x61], "image/gif"), + ]; + + for (content, expected_mime) in test_cases { + let result = S3Client::detect_file_type(None, &content); + match result { + DetectedFileType::NonText(mime_type) => { + assert_eq!(mime_type, expected_mime); + } + _ => panic!("Expected NonText for magic bytes: {content:?}"), + } + } + } + + #[test] + fn 
test_detect_file_type_webp_magic_bytes() { + // WebP has more complex magic bytes: RIFF....WEBP + let mut webp_content = vec![0x52, 0x49, 0x46, 0x46]; // RIFF + webp_content.extend_from_slice(&[0x00, 0x00, 0x00, 0x00]); // Size (4 bytes) + webp_content.extend_from_slice(b"WEBP"); // WEBP signature + + let result = S3Client::detect_file_type(None, &webp_content); + match result { + DetectedFileType::NonText(mime_type) => { + assert_eq!(mime_type, "image/webp"); + } + _ => panic!("Expected WebP NonText detection"), + } + } + + #[test] + fn test_detect_file_type_wav_magic_bytes() { + // WAV has magic bytes: RIFF....WAVE + let mut wav_content = vec![0x52, 0x49, 0x46, 0x46]; // RIFF + wav_content.extend_from_slice(&[0x00, 0x00, 0x00, 0x00]); // Size (4 bytes) + wav_content.extend_from_slice(b"WAVE"); // WAVE signature + + let result = S3Client::detect_file_type(None, &wav_content); + match result { + DetectedFileType::NonText(mime_type) => { + assert_eq!(mime_type, "audio/wav"); + } + _ => panic!("Expected WAV NonText detection"), + } + } + + #[test] + fn test_detect_file_type_utf8_text() { + // Test UTF-8 text detection + let utf8_content = "Hello, δΈ–η•Œ! 🌍".as_bytes(); + let result = S3Client::detect_file_type(None, utf8_content); + match result { + DetectedFileType::Text => {} + _ => panic!("Expected Text for UTF-8 content"), + } + + // Test ASCII text + let ascii_content = b"Hello, world! This is ASCII text."; + let result = S3Client::detect_file_type(None, ascii_content); + match result { + DetectedFileType::Text => {} + _ => panic!("Expected Text for ASCII content"), + } + } + + #[test] + fn test_detect_file_type_binary() { + // Test binary content that should not be detected as text + let binary_content = vec![0x00, 0x01, 0x02, 0x03, 0xFF, 0xFE, 0xFD, 0xFC]; + let result = S3Client::detect_file_type(None, &binary_content); + match result { + DetectedFileType::NonText(mime_type) => { + assert_eq!(mime_type, "application/octet-stream"); + } + _ => panic!("Expected NonText for binary content"), + } + } + + #[test] + fn test_detect_file_type_priority() { + // Content-Type should take priority over magic bytes + let png_magic_bytes = vec![0x89, 0x50, 0x4E, 0x47, 0x0D, 0x0A, 0x1A, 0x0A]; + + // Even with PNG magic bytes, text content-type should win + let result = S3Client::detect_file_type(Some("text/plain"), &png_magic_bytes); + match result { + DetectedFileType::Text => {} + _ => panic!("Expected Text due to content-type priority"), + } + } + + #[test] + fn test_get_object_options_default() { + let options = GetObjectOptions::default(); + assert!(options.version_id.is_none()); + assert!(options.range.is_none()); + assert!(options.if_modified_since.is_none()); + assert!(options.if_unmodified_since.is_none()); + assert!(options.max_content_size.is_none()); + } + + #[test] + fn test_detected_file_type_serialization() { + let test_cases = vec![ + DetectedFileType::Text, + DetectedFileType::NonText("image/png".to_string()), + DetectedFileType::NonText("audio/mpeg".to_string()), + DetectedFileType::NonText("application/octet-stream".to_string()), + ]; + + for file_type in test_cases { + let json = serde_json::to_string(&file_type).unwrap(); + let deserialized: DetectedFileType = serde_json::from_str(&json).unwrap(); + + match (&file_type, &deserialized) { + (DetectedFileType::Text, DetectedFileType::Text) => {} + (DetectedFileType::NonText(a), DetectedFileType::NonText(b)) => assert_eq!(a, b), + _ => panic!("Serialization/deserialization mismatch"), + } + } + } +} diff --git 
a/crates/mcp/src/server.rs b/crates/mcp/src/server.rs new file mode 100644 index 00000000..02bfec06 --- /dev/null +++ b/crates/mcp/src/server.rs @@ -0,0 +1,670 @@ +// Copyright 2024 RustFS Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use anyhow::Result; +use rmcp::{ + ErrorData, RoleServer, ServerHandler, + handler::server::{router::tool::ToolRouter, tool::Parameters}, + model::{Implementation, ProtocolVersion, ServerCapabilities, ServerInfo, ToolsCapability}, + service::{NotificationContext, RequestContext}, + tool, tool_handler, tool_router, +}; +use schemars::JsonSchema; +use serde::{Deserialize, Serialize}; +use tracing::{debug, error, info}; + +use crate::config::Config; +use crate::s3_client::{DetectedFileType, GetObjectOptions, ListObjectsOptions, S3Client, UploadFileOptions}; + +#[derive(Serialize, Deserialize, JsonSchema)] +pub struct ListObjectsRequest { + pub bucket_name: String, + #[serde(default)] + #[schemars(description = "Optional prefix to filter objects")] + pub prefix: Option, +} + +#[derive(Serialize, Deserialize, JsonSchema)] +pub struct UploadFileRequest { + #[schemars(description = "Path to the local file to upload")] + pub local_file_path: String, + #[schemars(description = "Name of the S3 bucket to upload to")] + pub bucket_name: String, + #[schemars(description = "S3 object key (path/filename in the bucket)")] + pub object_key: String, + #[serde(default)] + #[schemars(description = "Optional content type (auto-detected if not specified)")] + pub content_type: Option, + #[serde(default)] + #[schemars(description = "Optional storage class (STANDARD, REDUCED_REDUNDANCY, etc.)")] + pub storage_class: Option, + #[serde(default)] + #[schemars(description = "Optional cache control header")] + pub cache_control: Option, +} + +#[derive(Serialize, Deserialize, JsonSchema)] +pub struct GetObjectRequest { + #[schemars(description = "Name of the S3 bucket")] + pub bucket_name: String, + #[schemars(description = "S3 object key (path/filename in the bucket)")] + pub object_key: String, + #[serde(default)] + #[schemars(description = "Optional version ID for versioned objects")] + pub version_id: Option, + #[serde(default = "default_operation_mode")] + #[schemars(description = "Operation mode: read (return content) or download (save to local file)")] + pub mode: GetObjectMode, + #[serde(default)] + #[schemars(description = "Local file path for download mode (required when mode is download)")] + pub local_path: Option, + #[serde(default = "default_max_content_size")] + #[schemars(description = "Maximum content size to read in bytes for read mode (default: 1MB)")] + pub max_content_size: usize, +} + +#[derive(Serialize, Deserialize, JsonSchema, Debug, Clone, PartialEq)] +pub enum GetObjectMode { + #[serde(rename = "read")] + Read, + #[serde(rename = "download")] + Download, +} + +fn default_operation_mode() -> GetObjectMode { + GetObjectMode::Read +} +fn default_max_content_size() -> usize { + 1024 * 1024 +} + +#[derive(Debug, Clone)] +pub struct RustfsMcpServer { 
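+    /// Shared S3 client wrapper that performs all storage operations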
+ s3_client: S3Client, + _config: Config, + tool_router: ToolRouter, +} + +#[tool_router(router = tool_router)] +impl RustfsMcpServer { + pub async fn new(config: Config) -> Result { + info!("Creating RustFS MCP Server"); + + let s3_client = S3Client::new(&config).await?; + + Ok(Self { + s3_client, + _config: config, + tool_router: Self::tool_router(), + }) + } + + #[tool(description = "List all S3 buckets accessible with the configured credentials")] + pub async fn list_buckets(&self) -> String { + info!("Executing list_buckets tool"); + + match self.s3_client.list_buckets().await { + Ok(buckets) => { + debug!("Successfully retrieved {} buckets", buckets.len()); + + if buckets.is_empty() { + return "No S3 buckets found. The AWS credentials may not have access to any buckets, or no buckets exist in this account.".to_string(); + } + + let mut result_text = format!("Found {} S3 bucket(s):\n\n", buckets.len()); + + for (index, bucket) in buckets.iter().enumerate() { + result_text.push_str(&format!("{}. **{}**", index + 1, bucket.name)); + + if let Some(ref creation_date) = bucket.creation_date { + result_text.push_str(&format!("\n - Created: {creation_date}")); + } + result_text.push_str("\n\n"); + } + + result_text.push_str("---\n"); + result_text.push_str(&format!("Total buckets: {}\n", buckets.len())); + result_text.push_str("Note: Only buckets accessible with the current AWS credentials are shown."); + + info!("list_buckets tool executed successfully"); + result_text + } + Err(e) => { + error!("Failed to list buckets: {:?}", e); + + format!( + "Failed to list S3 buckets: {e}\n\nPossible causes:\n\ + β€’ AWS credentials are not set or invalid\n\ + β€’ Network connectivity issues\n\ + β€’ AWS region is not set correctly\n\ + β€’ Insufficient permissions to list buckets\n\ + β€’ Custom endpoint is misconfigured\n\n\ + Please verify your AWS configuration and try again." + ) + } + } + } + + #[tool(description = "List objects in a specific S3 bucket with optional prefix filtering")] + pub async fn list_objects(&self, Parameters(req): Parameters) -> String { + info!("Executing list_objects tool for bucket: {}", req.bucket_name); + + let options = ListObjectsOptions { + prefix: req.prefix.clone(), + delimiter: None, + max_keys: Some(1000), + ..ListObjectsOptions::default() + }; + + match self.s3_client.list_objects_v2(&req.bucket_name, options).await { + Ok(result) => { + debug!( + "Successfully retrieved {} objects and {} common prefixes from bucket '{}'", + result.objects.len(), + result.common_prefixes.len(), + req.bucket_name + ); + + if result.objects.is_empty() && result.common_prefixes.is_empty() { + let prefix_msg = req.prefix.as_ref().map(|p| format!(" with prefix '{p}'")).unwrap_or_default(); + return format!( + "No objects found in bucket '{}'{prefix_msg}. The bucket may be empty or the prefix may not match any objects.", + req.bucket_name + ); + } + + let mut result_text = format!("Found {} object(s) in bucket **{}**", result.key_count, req.bucket_name); + + if let Some(ref p) = req.prefix { + result_text.push_str(&format!(" with prefix '{p}'")); + } + result_text.push_str(":\n\n"); + + if !result.common_prefixes.is_empty() { + result_text.push_str("**Directories:**\n"); + for (index, prefix) in result.common_prefixes.iter().enumerate() { + result_text.push_str(&format!("{}. 
πŸ“ {prefix}\n", index + 1)); + } + result_text.push('\n'); + } + + if !result.objects.is_empty() { + result_text.push_str("**Objects:**\n"); + for (index, obj) in result.objects.iter().enumerate() { + result_text.push_str(&format!("{}. **{}**\n", index + 1, obj.key)); + + if let Some(size) = obj.size { + result_text.push_str(&format!(" - Size: {size} bytes\n")); + } + + if let Some(ref last_modified) = obj.last_modified { + result_text.push_str(&format!(" - Last Modified: {last_modified}\n")); + } + + if let Some(ref etag) = obj.etag { + result_text.push_str(&format!(" - ETag: {etag}\n")); + } + + if let Some(ref storage_class) = obj.storage_class { + result_text.push_str(&format!(" - Storage Class: {storage_class}\n")); + } + + result_text.push('\n'); + } + } + + if result.is_truncated { + result_text.push_str("**Note:** Results are truncated. "); + if let Some(ref token) = result.next_continuation_token { + result_text.push_str(&format!("Use continuation token '{token}' to get more results.\n")); + } + result_text.push('\n'); + } + + result_text.push_str("---\n"); + result_text.push_str(&format!( + "Total: {} object(s), {} directory/ies", + result.objects.len(), + result.common_prefixes.len() + )); + + if let Some(max_keys) = result.max_keys { + result_text.push_str(&format!(", Max keys: {max_keys}")); + } + + info!("list_objects tool executed successfully for bucket '{}'", req.bucket_name); + result_text + } + Err(e) => { + error!("Failed to list objects in bucket '{}': {:?}", req.bucket_name, e); + + format!( + "Failed to list objects in S3 bucket '{}': {}\n\nPossible causes:\n\ + β€’ Bucket does not exist or is not accessible\n\ + β€’ AWS credentials lack permissions to list objects in this bucket\n\ + β€’ Network connectivity issues\n\ + β€’ Custom endpoint is misconfigured\n\ + β€’ Bucket name contains invalid characters\n\n\ + Please verify the bucket name, your AWS configuration, and permissions.", + req.bucket_name, e + ) + } + } + } + + #[tool( + description = "Get/download an object from an S3 bucket - supports read mode for text files and download mode for all files" + )] + pub async fn get_object(&self, Parameters(req): Parameters) -> String { + info!( + "Executing get_object tool: s3://{}/{} (mode: {:?})", + req.bucket_name, req.object_key, req.mode + ); + + match req.mode { + GetObjectMode::Read => self.handle_read_mode(req).await, + GetObjectMode::Download => self.handle_download_mode(req).await, + } + } + + async fn handle_read_mode(&self, req: GetObjectRequest) -> String { + let options = GetObjectOptions { + version_id: req.version_id.clone(), + max_content_size: Some(req.max_content_size), + ..GetObjectOptions::default() + }; + + match self.s3_client.get_object(&req.bucket_name, &req.object_key, options).await { + Ok(result) => { + debug!( + "Successfully retrieved object s3://{}/{} ({} bytes)", + req.bucket_name, req.object_key, result.content_length + ); + + match result.detected_type { + DetectedFileType::Text => { + if let Some(ref text_content) = result.text_content { + format!( + "βœ… **Text file content retrieved!**\n\n\ + **S3 Location:** s3://{}/{}\n\ + **File Size:** {} bytes\n\ + **Content Type:** {}\n\n\ + **Content:**\n```\n{}\n```", + result.bucket, result.key, result.content_length, result.content_type, text_content + ) + } else { + format!( + "⚠️ **Text file detected but content could not be decoded!**\n\n\ + **S3 Location:** s3://{}/{}\n\ + **File Size:** {} bytes\n\ + **Content Type:** {}\n\n\ + **Note:** Could not decode file as UTF-8 text. 
\ + Try using download mode instead.", + result.bucket, result.key, result.content_length, result.content_type + ) + } + } + DetectedFileType::NonText(ref mime_type) => { + let file_category = if mime_type.starts_with("image/") { + "Image" + } else if mime_type.starts_with("audio/") { + "Audio" + } else if mime_type.starts_with("video/") { + "Video" + } else { + "Binary" + }; + + format!( + "⚠️ **Non-text file detected!**\n\n\ + **S3 Location:** s3://{}/{}\n\ + **File Type:** {} ({})\n\ + **File Size:** {} bytes ({:.2} MB)\n\n\ + **Note:** This file type cannot be displayed as text.\n\ + Please use download mode to save it to a local file:\n\n\ + ```json\n{{\n \"mode\": \"download\",\n \"local_path\": \"/path/to/save/file\"\n}}\n```", + result.bucket, + result.key, + file_category, + mime_type, + result.content_length, + result.content_length as f64 / 1_048_576.0 + ) + } + } + } + Err(e) => { + error!("Failed to read object s3://{}/{}: {:?}", req.bucket_name, req.object_key, e); + self.format_error_message(&req, e) + } + } + } + + async fn handle_download_mode(&self, req: GetObjectRequest) -> String { + let local_path = match req.local_path { + Some(ref path) => path, + None => { + return "❌ **Error:** local_path is required when using download mode.\n\n\ + **Example:**\n```json\n{\n \"mode\": \"download\",\n \"local_path\": \"/path/to/save/file.ext\"\n}\n```" + .to_string(); + } + }; + + let options = GetObjectOptions { + version_id: req.version_id.clone(), + ..GetObjectOptions::default() + }; + + match self + .s3_client + .download_object_to_file(&req.bucket_name, &req.object_key, local_path, options) + .await + { + Ok((bytes_downloaded, absolute_path)) => { + info!( + "Successfully downloaded object s3://{}/{} to {} ({} bytes)", + req.bucket_name, req.object_key, absolute_path, bytes_downloaded + ); + + format!( + "βœ… **File downloaded successfully!**\n\n\ + **S3 Location:** s3://{}/{}\n\ + **Local Path (requested):** {}\n\ + **Absolute Path:** {}\n\ + **File Size:** {} bytes ({:.2} MB)\n\n\ + **✨ File saved successfully!** You can now access it at:\n\ + `{}`", + req.bucket_name, + req.object_key, + local_path, + absolute_path, + bytes_downloaded, + bytes_downloaded as f64 / 1_048_576.0, + absolute_path + ) + } + Err(e) => { + error!( + "Failed to download object s3://{}/{} to {}: {:?}", + req.bucket_name, req.object_key, local_path, e + ); + + format!( + "❌ **Failed to download file from S3**\n\n\ + **S3 Location:** s3://{}/{}\n\ + **Local Path:** {}\n\ + **Error:** {}\n\n\ + **Possible causes:**\n\ + β€’ Object does not exist in the specified bucket\n\ + β€’ AWS credentials lack permissions to read this object\n\ + β€’ Cannot write to the specified local path\n\ + β€’ Insufficient disk space\n\ + β€’ Network connectivity issues\n\n\ + **Troubleshooting steps:**\n\ + 1. Verify the object exists using list_objects\n\ + 2. Check your AWS credentials and permissions\n\ + 3. Ensure the local directory exists and is writable\n\ + 4. 
+    async fn handle_download_mode(&self, req: GetObjectRequest) -> String {
+        let local_path = match req.local_path {
+            Some(ref path) => path,
+            None => {
+                return "❌ **Error:** local_path is required when using download mode.\n\n\
+                    **Example:**\n```json\n{\n  \"mode\": \"download\",\n  \"local_path\": \"/path/to/save/file.ext\"\n}\n```"
+                    .to_string();
+            }
+        };
+
+        let options = GetObjectOptions {
+            version_id: req.version_id.clone(),
+            ..GetObjectOptions::default()
+        };
+
+        match self
+            .s3_client
+            .download_object_to_file(&req.bucket_name, &req.object_key, local_path, options)
+            .await
+        {
+            Ok((bytes_downloaded, absolute_path)) => {
+                info!(
+                    "Successfully downloaded object s3://{}/{} to {} ({} bytes)",
+                    req.bucket_name, req.object_key, absolute_path, bytes_downloaded
+                );
+
+                format!(
+                    "✅ **File downloaded successfully!**\n\n\
+                    **S3 Location:** s3://{}/{}\n\
+                    **Local Path (requested):** {}\n\
+                    **Absolute Path:** {}\n\
+                    **File Size:** {} bytes ({:.2} MB)\n\n\
+                    **✨ You can now access it at:**\n\
+                    `{}`",
+                    req.bucket_name,
+                    req.object_key,
+                    local_path,
+                    absolute_path,
+                    bytes_downloaded,
+                    bytes_downloaded as f64 / 1_048_576.0,
+                    absolute_path
+                )
+            }
+            Err(e) => {
+                error!(
+                    "Failed to download object s3://{}/{} to {}: {:?}",
+                    req.bucket_name, req.object_key, local_path, e
+                );
+
+                format!(
+                    "❌ **Failed to download file from S3**\n\n\
+                    **S3 Location:** s3://{}/{}\n\
+                    **Local Path:** {}\n\
+                    **Error:** {}\n\n\
+                    **Possible causes:**\n\
+                    • Object does not exist in the specified bucket\n\
+                    • AWS credentials lack permissions to read this object\n\
+                    • Cannot write to the specified local path\n\
+                    • Insufficient disk space\n\
+                    • Network connectivity issues\n\n\
+                    **Troubleshooting steps:**\n\
+                    1. Verify the object exists using list_objects\n\
+                    2. Check your AWS credentials and permissions\n\
+                    3. Ensure the local directory exists and is writable\n\
+                    4. Check available disk space",
+                    req.bucket_name, req.object_key, local_path, e
+                )
+            }
+        }
+    }
+
+    fn format_error_message(&self, req: &GetObjectRequest, error: anyhow::Error) -> String {
+        format!(
+            "❌ **Failed to get object from S3 bucket '{}'**\n\n\
+            **Object Key:** {}\n\
+            **Mode:** {:?}\n\
+            **Error:** {}\n\n\
+            **Possible causes:**\n\
+            • Object does not exist in the specified bucket\n\
+            • AWS credentials lack permissions to read this object\n\
+            • Network connectivity issues\n\
+            • Object key contains invalid characters\n\
+            • Bucket does not exist or is not accessible\n\
+            • Object is in a different AWS region\n\
+            • Version ID is invalid (for versioned objects)\n\n\
+            **Troubleshooting steps:**\n\
+            1. Verify the object exists using list_objects\n\
+            2. Check your AWS credentials and permissions\n\
+            3. Ensure the bucket name and object key are correct\n\
+            4. Try with a different object to test connectivity\n\
+            5. Check if the bucket has versioning enabled",
+            req.bucket_name, req.object_key, req.mode, error
+        )
+    }
+
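+    /// Upload handler: forwards any caller-supplied content type, storage
+    /// class, and cache-control settings; when no content type is given,
+    /// detection presumably falls back to the S3 client layer (mime_guess
+    /// is among this crate's dependencies).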
+    #[tool(description = "Upload a local file to an S3 bucket")]
+    pub async fn upload_file(&self, Parameters(req): Parameters<UploadFileRequest>) -> String {
+        info!(
+            "Executing upload_file tool: '{}' -> s3://{}/{}",
+            req.local_file_path, req.bucket_name, req.object_key
+        );
+
+        let options = UploadFileOptions {
+            content_type: req.content_type.clone(),
+            storage_class: req.storage_class.clone(),
+            cache_control: req.cache_control.clone(),
+            ..UploadFileOptions::default()
+        };
+
+        match self
+            .s3_client
+            .upload_file(&req.local_file_path, &req.bucket_name, &req.object_key, options)
+            .await
+        {
+            Ok(result) => {
+                debug!(
+                    "Successfully uploaded file '{}' to s3://{}/{} ({} bytes)",
+                    req.local_file_path, req.bucket_name, req.object_key, result.file_size
+                );
+
+                let mut result_text = format!(
+                    "✅ **File uploaded successfully!**\n\n\
+                    **Local File:** {}\n\
+                    **S3 Location:** s3://{}/{}\n\
+                    **File Size:** {} bytes ({:.2} MB)\n\
+                    **Content Type:** {}\n\
+                    **ETag:** {}\n",
+                    req.local_file_path,
+                    result.bucket,
+                    result.key,
+                    result.file_size,
+                    result.file_size as f64 / 1_048_576.0,
+                    result.content_type,
+                    result.etag
+                );
+
+                if let Some(ref version_id) = result.version_id {
+                    result_text.push_str(&format!("**Version ID:** {version_id}\n"));
+                }
+
+                result_text.push_str("\n---\n");
+                result_text.push_str("**Upload Summary:**\n");
+                result_text.push_str(&format!("• Source: {}\n", req.local_file_path));
+                result_text.push_str(&format!("• Destination: {}\n", result.location));
+                result_text.push_str(&format!("• Size: {} bytes\n", result.file_size));
+                result_text.push_str(&format!("• Type: {}\n", result.content_type));
+
+                if result.file_size > 5 * 1024 * 1024 {
+                    result_text.push_str("\n💡 **Note:** Large file uploaded successfully. \
+                    Consider using multipart upload for files larger than 100MB for better performance and reliability.");
+                }
+
+                info!(
+                    "upload_file tool executed successfully: {} bytes uploaded to s3://{}/{}",
+                    result.file_size, req.bucket_name, req.object_key
+                );
+                result_text
+            }
+            Err(e) => {
+                error!(
+                    "Failed to upload file '{}' to s3://{}/{}: {:?}",
+                    req.local_file_path, req.bucket_name, req.object_key, e
+                );
+
+                format!(
+                    "❌ **Failed to upload file '{}' to S3 bucket '{}'**\n\n\
+                    **Error:** {}\n\n\
+                    **Possible causes:**\n\
+                    • Local file does not exist or is not readable\n\
+                    • AWS credentials lack permissions to upload to this bucket\n\
+                    • S3 bucket does not exist or is not accessible\n\
+                    • Network connectivity issues\n\
+                    • File path contains invalid characters or is too long\n\
+                    • Insufficient disk space or memory\n\
+                    • Custom endpoint is misconfigured\n\
+                    • File is locked by another process\n\n\
+                    **Troubleshooting steps:**\n\
+                    1. Verify the local file exists and is readable\n\
+                    2. Check your AWS credentials and permissions\n\
+                    3. Ensure the bucket name is correct and accessible\n\
+                    4. Try with a smaller file to test connectivity\n\
+                    5. Check the file path for special characters\n\n\
+                    **File:** {}\n\
+                    **Bucket:** {}\n\
+                    **Object Key:** {}",
+                    req.local_file_path, req.bucket_name, e, req.local_file_path, req.bucket_name, req.object_key
+                )
+            }
+        }
+    }
+}
+
+#[tool_handler(router = self.tool_router)]
+impl ServerHandler for RustfsMcpServer {
+    fn get_info(&self) -> ServerInfo {
+        ServerInfo {
+            protocol_version: ProtocolVersion::V_2024_11_05,
+            capabilities: ServerCapabilities {
+                tools: Some(ToolsCapability {
+                    // The tool set is fixed for the lifetime of the process,
+                    // so clients are told not to expect list-change events.
+                    list_changed: Some(false),
+                }),
+                ..Default::default()
+            },
+            instructions: Some("RustFS MCP Server providing S3 operations through Model Context Protocol".into()),
+            server_info: Implementation {
+                name: "rustfs-mcp-server".into(),
+                version: env!("CARGO_PKG_VERSION").into(),
+            },
+        }
+    }
+
+    async fn ping(&self, _ctx: RequestContext<RoleServer>) -> Result<(), ErrorData> {
+        info!("Received ping request");
+        Ok(())
+    }
+
+    async fn on_initialized(&self, _ctx: NotificationContext<RoleServer>) {
+        info!("Client initialized successfully");
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[tokio::test]
+    async fn test_server_creation() {
+        let config = Config {
+            access_key_id: Some("test_key".to_string()),
+            secret_access_key: Some("test_secret".to_string()),
+            ..Config::default()
+        };
+
+        // Smoke test: construction may succeed or fail depending on the
+        // environment, but it must complete (i.e. not panic) either way.
+        let _result = RustfsMcpServer::new(config).await;
+    }
+
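+    // Exercises download mode through the same serde path as the defaults
+    // test below; the JSON field names mirror the struct fields and the
+    // /tmp path is purely illustrative.
+    #[test]
+    fn test_get_object_request_download_mode_deser() {
+        let json = r#"{
+            "bucket_name": "test-bucket",
+            "object_key": "test-key",
+            "mode": "download",
+            "local_path": "/tmp/example-object"
+        }"#;
+
+        let request: GetObjectRequest = serde_json::from_str(json).unwrap();
+        assert_eq!(request.mode, GetObjectMode::Download);
+        assert_eq!(request.local_path.as_deref(), Some("/tmp/example-object"));
+        // Unspecified fields still fall back to their declared defaults.
+        assert_eq!(request.max_content_size, default_max_content_size());
+    }
+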
+    #[test]
+    fn test_get_object_request_defaults() {
+        let request = GetObjectRequest {
+            bucket_name: "test-bucket".to_string(),
+            object_key: "test-key".to_string(),
+            version_id: None,
+            mode: default_operation_mode(),
+            local_path: None,
+            max_content_size: default_max_content_size(),
+        };
+
+        assert_eq!(request.bucket_name, "test-bucket");
+        assert_eq!(request.object_key, "test-key");
+        assert!(request.version_id.is_none());
+        assert_eq!(request.mode, GetObjectMode::Read);
+        assert!(request.local_path.is_none());
+        assert_eq!(request.max_content_size, 1024 * 1024);
+    }
+
+    #[test]
+    fn test_get_object_request_serialization() {
+        let request = GetObjectRequest {
+            bucket_name: "test-bucket".to_string(),
+            object_key: "test-key".to_string(),
+            version_id: Some("version123".to_string()),
+            mode: GetObjectMode::Download,
+            local_path: Some("/path/to/file".to_string()),
+            max_content_size: 2048,
+        };
+
+        let json = serde_json::to_string(&request).unwrap();
+        let deserialized: GetObjectRequest = serde_json::from_str(&json).unwrap();
+
+        assert_eq!(request.bucket_name, deserialized.bucket_name);
+        assert_eq!(request.object_key, deserialized.object_key);
+        assert_eq!(request.version_id, deserialized.version_id);
+        assert_eq!(request.mode, deserialized.mode);
+        assert_eq!(request.local_path, deserialized.local_path);
+        assert_eq!(request.max_content_size, deserialized.max_content_size);
+    }
+
+    #[test]
+    fn test_get_object_request_serde_with_defaults() {
+        let json = r#"{
+            "bucket_name": "test-bucket",
+            "object_key": "test-key"
+        }"#;
+
+        let request: GetObjectRequest = serde_json::from_str(json).unwrap();
+        assert_eq!(request.bucket_name, "test-bucket");
+        assert_eq!(request.object_key, "test-key");
+        assert!(request.version_id.is_none());
+        assert_eq!(request.mode, GetObjectMode::Read);
+        assert!(request.local_path.is_none());
+        assert_eq!(request.max_content_size, 1024 * 1024);
+    }
+
+    #[test]
+    fn test_default_functions() {
+        assert_eq!(default_operation_mode(), GetObjectMode::Read);
+        assert_eq!(default_max_content_size(), 1024 * 1024);
+    }
+
+    #[test]
+    fn test_get_object_mode_serialization() {
+        let read_mode = GetObjectMode::Read;
+        let download_mode = GetObjectMode::Download;
+
+        let read_json = serde_json::to_string(&read_mode).unwrap();
+        let download_json = serde_json::to_string(&download_mode).unwrap();
+
+        assert_eq!(read_json, r#""read""#);
+        assert_eq!(download_json, r#""download""#);
+
+        let read_mode_deser: GetObjectMode = serde_json::from_str(r#""read""#).unwrap();
+        let download_mode_deser: GetObjectMode = serde_json::from_str(r#""download""#).unwrap();
+
+        assert_eq!(read_mode_deser, GetObjectMode::Read);
+        assert_eq!(download_mode_deser, GetObjectMode::Download);
+    }
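+
+    // Request-level serialization sketch: checks that mode serializes to the
+    // same lowercase strings asserted above, assuming field names serialize
+    // as declared (which the round-trip test already suggests).
+    #[test]
+    fn test_get_object_request_serializes_mode_as_string() {
+        let request = GetObjectRequest {
+            bucket_name: "test-bucket".to_string(),
+            object_key: "test-key".to_string(),
+            version_id: None,
+            mode: default_operation_mode(),
+            local_path: None,
+            max_content_size: default_max_content_size(),
+        };
+
+        let value = serde_json::to_value(&request).unwrap();
+        assert_eq!(value["mode"], "read");
+        assert_eq!(value["bucket_name"], "test-bucket");
+    }
+}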