From e145586b65b63390232b5c37636de70bb64facff Mon Sep 17 00:00:00 2001 From: weisd Date: Thu, 19 Jun 2025 17:06:54 +0800 Subject: [PATCH] refactor: reorganize RPC modules into unified structure --- Cargo.lock | 5 + crates/rio/src/http_reader.rs | 1 + crates/utils/Cargo.toml | 8 +- crates/utils/src/net.rs | 26 +- crates/utils/src/string.rs | 38 ++ ecstore/Cargo.toml | 2 + ecstore/src/cmd/bucket_replication.rs | 2 +- ecstore/src/cmd/bucket_targets.rs | 8 +- ecstore/src/disk/mod.rs | 3 +- ecstore/src/erasure_coding/decode.rs | 2 +- ecstore/src/global.rs | 50 ++- ecstore/src/heal/data_scanner.rs | 2 +- ecstore/src/lib.rs | 5 +- ecstore/src/notification_sys.rs | 2 +- ecstore/src/rpc/http_auth.rs | 375 ++++++++++++++++++ ecstore/src/rpc/mod.rs | 11 + ecstore/src/{ => rpc}/peer_rest_client.rs | 0 .../src/{peer.rs => rpc/peer_s3_client.rs} | 59 +-- .../{disk/remote.rs => rpc/remote_disk.rs} | 91 ++--- .../src/rpc/tonic_service.rs | 5 +- ecstore/src/store.rs | 2 +- ecstore/src/store_list_objects.rs | 2 +- ecstore/src/store_utils.rs | 60 +++ iam/src/lib.rs | 34 -- iam/src/manager.rs | 11 +- iam/src/store/object.rs | 2 +- iam/src/sys.rs | 4 +- rustfs/src/admin/handlers.rs | 14 +- rustfs/src/admin/handlers/group.rs | 6 +- rustfs/src/admin/handlers/policys.rs | 4 +- rustfs/src/admin/handlers/service_account.rs | 8 +- rustfs/src/admin/handlers/trace.rs | 2 +- rustfs/src/admin/handlers/user.rs | 14 +- rustfs/src/admin/router.rs | 79 +--- rustfs/src/admin/rpc.rs | 2 +- rustfs/src/auth.rs | 5 +- rustfs/src/main.rs | 6 +- rustfs/src/storage/ecfs.rs | 26 +- 38 files changed, 655 insertions(+), 321 deletions(-) create mode 100644 ecstore/src/rpc/http_auth.rs create mode 100644 ecstore/src/rpc/mod.rs rename ecstore/src/{ => rpc}/peer_rest_client.rs (100%) rename ecstore/src/{peer.rs => rpc/peer_s3_client.rs} (93%) rename ecstore/src/{disk/remote.rs => rpc/remote_disk.rs} (95%) rename rustfs/src/grpc.rs => ecstore/src/rpc/tonic_service.rs (99%) diff --git a/Cargo.lock b/Cargo.lock index ca279015..11bcb955 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3631,6 +3631,7 @@ dependencies = [ "criterion", "flatbuffers 25.2.10", "futures", + "futures-util", "glob", "hex-simd", "highway", @@ -3664,6 +3665,7 @@ dependencies = [ "s3s", "serde", "serde_json", + "serde_urlencoded", "sha2 0.10.9", "shadow-rs", "siphasher 1.0.1", @@ -8494,8 +8496,10 @@ dependencies = [ "base64-simd", "blake3", "brotli 8.0.1", + "bytes", "crc32fast", "flate2", + "futures", "hex-simd", "highway", "lazy_static", @@ -8517,6 +8521,7 @@ dependencies = [ "tempfile", "tokio", "tracing", + "transform-stream", "url", "winapi", "zstd", diff --git a/crates/rio/src/http_reader.rs b/crates/rio/src/http_reader.rs index 62c39c1c..e42c94f9 100644 --- a/crates/rio/src/http_reader.rs +++ b/crates/rio/src/http_reader.rs @@ -40,6 +40,7 @@ pin_project! { url:String, method: Method, headers: HeaderMap, + #[pin] inner: StreamReader>+Send+Sync>>, Bytes>, } } diff --git a/crates/utils/Cargo.toml b/crates/utils/Cargo.toml index e3f8b831..216a11cf 100644 --- a/crates/utils/Cargo.toml +++ b/crates/utils/Cargo.toml @@ -34,6 +34,10 @@ brotli = { workspace = true , optional = true} zstd = { workspace = true , optional = true} snap = { workspace = true , optional = true} lz4 = { workspace = true , optional = true} +rand = { workspace = true, optional = true } +futures= { workspace = true, optional = true } +transform-stream= { workspace = true, optional = true } +bytes= { workspace = true, optional = true } [dev-dependencies] tempfile = { workspace = true } @@ -49,11 +53,11 @@ workspace = true default = ["ip"] # features that are enabled by default ip = ["dep:local-ip-address"] # ip characteristics and their dependencies tls = ["dep:rustls", "dep:rustls-pemfile", "dep:rustls-pki-types"] # tls characteristics and their dependencies -net = ["ip","dep:url", "dep:netif", "dep:lazy_static"] # empty network features +net = ["ip","dep:url", "dep:netif", "dep:lazy_static", "dep:futures", "dep:transform-stream", "dep:bytes"] # empty network features io = ["dep:tokio"] path = [] compress =["dep:flate2","dep:brotli","dep:snap","dep:lz4","dep:zstd"] -string = ["dep:regex","dep:lazy_static"] +string = ["dep:regex","dep:lazy_static","dep:rand"] crypto = ["dep:base64-simd","dep:hex-simd"] hash = ["dep:highway", "dep:md-5", "dep:sha2", "dep:blake3", "dep:serde", "dep:siphasher"] os = ["dep:nix", "dep:tempfile", "winapi"] # operating system utilities diff --git a/crates/utils/src/net.rs b/crates/utils/src/net.rs index 79944ff2..81bb9f9d 100644 --- a/crates/utils/src/net.rs +++ b/crates/utils/src/net.rs @@ -1,10 +1,13 @@ +use bytes::Bytes; +use futures::pin_mut; +use futures::{Stream, StreamExt}; use lazy_static::lazy_static; use std::{ collections::HashSet, fmt::Display, net::{IpAddr, Ipv6Addr, SocketAddr, TcpListener, ToSocketAddrs}, }; - +use transform_stream::AsyncTryStream; use url::Host; lazy_static! { @@ -167,6 +170,27 @@ pub fn parse_and_resolve_address(addr_str: &str) -> std::io::Result Ok(resolved_addr) } +#[allow(dead_code)] +pub fn bytes_stream(stream: S, content_length: usize) -> impl Stream> + Send + 'static +where + S: Stream> + Send + 'static, + E: Send + 'static, +{ + AsyncTryStream::::new(|mut y| async move { + pin_mut!(stream); + let mut remaining: usize = content_length; + while let Some(result) = stream.next().await { + let mut bytes = result?; + if bytes.len() > remaining { + bytes.truncate(remaining); + } + remaining -= bytes.len(); + y.yield_ok(bytes).await; + } + Ok(()) + }) +} + #[cfg(test)] mod test { use std::net::{Ipv4Addr, Ipv6Addr}; diff --git a/crates/utils/src/string.rs b/crates/utils/src/string.rs index 096287e9..a3420572 100644 --- a/crates/utils/src/string.rs +++ b/crates/utils/src/string.rs @@ -1,4 +1,5 @@ use lazy_static::*; +use rand::{Rng, RngCore}; use regex::Regex; use std::io::{Error, Result}; @@ -306,6 +307,43 @@ pub fn parse_ellipses_range(pattern: &str) -> Result> { Ok(ret) } +pub fn gen_access_key(length: usize) -> Result { + const ALPHA_NUMERIC_TABLE: [char; 36] = [ + '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', + 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z', + ]; + + if length < 3 { + return Err(Error::other("access key length is too short")); + } + + let mut result = String::with_capacity(length); + let mut rng = rand::rng(); + + for _ in 0..length { + result.push(ALPHA_NUMERIC_TABLE[rng.random_range(0..ALPHA_NUMERIC_TABLE.len())]); + } + + Ok(result) +} + +pub fn gen_secret_key(length: usize) -> Result { + use base64_simd::URL_SAFE_NO_PAD; + + if length < 8 { + return Err(Error::other("secret key length is too short")); + } + let mut rng = rand::rng(); + + let mut key = vec![0u8; URL_SAFE_NO_PAD.estimated_decoded_length(length)]; + rng.fill_bytes(&mut key); + + let encoded = URL_SAFE_NO_PAD.encode_to_string(&key); + let key_str = encoded.replace("/", "+"); + + Ok(key_str) +} + #[cfg(test)] mod tests { use super::*; diff --git a/ecstore/Cargo.toml b/ecstore/Cargo.toml index e19ef229..a1b7288c 100644 --- a/ecstore/Cargo.toml +++ b/ecstore/Cargo.toml @@ -82,6 +82,8 @@ shadow-rs.workspace = true rustfs-filemeta.workspace = true rustfs-utils ={workspace = true, features=["full"]} rustfs-rio.workspace = true +futures-util.workspace = true +serde_urlencoded.workspace = true [target.'cfg(not(windows))'.dependencies] nix = { workspace = true } diff --git a/ecstore/src/cmd/bucket_replication.rs b/ecstore/src/cmd/bucket_replication.rs index 455f38cc..14a36f6a 100644 --- a/ecstore/src/cmd/bucket_replication.rs +++ b/ecstore/src/cmd/bucket_replication.rs @@ -6,7 +6,7 @@ use crate::bucket::metadata_sys::get_replication_config; use crate::bucket::versioning_sys::BucketVersioningSys; use crate::error::Error; use crate::new_object_layer_fn; -use crate::peer::RemotePeerS3Client; +use crate::rpc::RemotePeerS3Client; use crate::store; use crate::store_api::ObjectIO; use crate::store_api::ObjectInfo; diff --git a/ecstore/src/cmd/bucket_targets.rs b/ecstore/src/cmd/bucket_targets.rs index b343a487..621cd2cf 100644 --- a/ecstore/src/cmd/bucket_targets.rs +++ b/ecstore/src/cmd/bucket_targets.rs @@ -4,11 +4,11 @@ use crate::{ StorageAPI, bucket::{metadata_sys, target::BucketTarget}, endpoints::Node, - peer::{PeerS3Client, RemotePeerS3Client}, + rpc::{PeerS3Client, RemotePeerS3Client}, }; use crate::{ bucket::{self, target::BucketTargets}, - new_object_layer_fn, peer, store_api, + new_object_layer_fn, store_api, }; //use tokio::sync::RwLock; use aws_sdk_s3::Client as S3Client; @@ -24,7 +24,7 @@ use tokio::sync::RwLock; pub struct TClient { pub s3cli: S3Client, - pub remote_peer_client: peer::RemotePeerS3Client, + pub remote_peer_client: RemotePeerS3Client, pub arn: String, } impl TClient { @@ -444,7 +444,7 @@ impl BucketTargetSys { grid_host: "".to_string(), }; - let cli = peer::RemotePeerS3Client::new(Some(node), None); + let cli = RemotePeerS3Client::new(Some(node), None); match cli .get_bucket_info(&tgt.target_bucket, &store_api::BucketOptions::default()) diff --git a/ecstore/src/disk/mod.rs b/ecstore/src/disk/mod.rs index ee395e29..8d446865 100644 --- a/ecstore/src/disk/mod.rs +++ b/ecstore/src/disk/mod.rs @@ -6,7 +6,6 @@ pub mod format; pub mod fs; pub mod local; pub mod os; -pub mod remote; pub const RUSTFS_META_BUCKET: &str = ".rustfs.sys"; pub const RUSTFS_META_MULTIPART_BUCKET: &str = ".rustfs.sys/multipart"; @@ -22,13 +21,13 @@ use crate::heal::{ data_usage_cache::{DataUsageCache, DataUsageEntry}, heal_commands::{HealScanMode, HealingTracker}, }; +use crate::rpc::RemoteDisk; use bytes::Bytes; use endpoint::Endpoint; use error::DiskError; use error::{Error, Result}; use local::LocalDisk; use madmin::info_commands::DiskMetrics; -use remote::RemoteDisk; use rustfs_filemeta::{FileInfo, RawFileInfo}; use serde::{Deserialize, Serialize}; use std::{fmt::Debug, path::PathBuf, sync::Arc}; diff --git a/ecstore/src/erasure_coding/decode.rs b/ecstore/src/erasure_coding/decode.rs index 5c2d6e23..ae00edbd 100644 --- a/ecstore/src/erasure_coding/decode.rs +++ b/ecstore/src/erasure_coding/decode.rs @@ -102,7 +102,7 @@ where } } Err(e) => { - error!("Error reading shard {}: {}", i, e); + // error!("Error reading shard {}: {}", i, e); errs[i] = Some(e); } } diff --git a/ecstore/src/global.rs b/ecstore/src/global.rs index c60c6c59..7f8011bd 100644 --- a/ecstore/src/global.rs +++ b/ecstore/src/global.rs @@ -1,12 +1,3 @@ -use lazy_static::lazy_static; -use std::{ - collections::HashMap, - sync::{Arc, OnceLock}, - time::SystemTime, -}; -use tokio::sync::{OnceCell, RwLock}; -use uuid::Uuid; - use crate::heal::mrf::MRFState; use crate::{ disk::DiskStore, @@ -14,6 +5,15 @@ use crate::{ heal::{background_heal_ops::HealRoutine, heal_ops::AllHealState}, store::ECStore, }; +use lazy_static::lazy_static; +use policy::auth::Credentials; +use std::{ + collections::HashMap, + sync::{Arc, OnceLock}, + time::SystemTime, +}; +use tokio::sync::{OnceCell, RwLock}; +use uuid::Uuid; pub const DISK_ASSUME_UNKNOWN_SIZE: u64 = 1 << 30; pub const DISK_MIN_INODES: u64 = 1000; @@ -39,6 +39,38 @@ lazy_static! { pub static ref GLOBAL_BOOT_TIME: OnceCell = OnceCell::new(); } +static GLOBAL_ACTIVE_CRED: OnceLock = OnceLock::new(); + +pub fn init_global_action_cred(ak: Option, sk: Option) { + let ak = { + if let Some(k) = ak { + k + } else { + rustfs_utils::string::gen_access_key(20).unwrap_or_default() + } + }; + + let sk = { + if let Some(k) = sk { + k + } else { + rustfs_utils::string::gen_secret_key(32).unwrap_or_default() + } + }; + + GLOBAL_ACTIVE_CRED + .set(Credentials { + access_key: ak, + secret_key: sk, + ..Default::default() + }) + .unwrap(); +} + +pub fn get_global_action_cred() -> Option { + GLOBAL_ACTIVE_CRED.get().cloned() +} + /// Get the global rustfs port pub fn global_rustfs_port() -> u16 { if let Some(p) = GLOBAL_RUSTFS_PORT.get() { diff --git a/ecstore/src/heal/data_scanner.rs b/ecstore/src/heal/data_scanner.rs index 65eb1960..680334ea 100644 --- a/ecstore/src/heal/data_scanner.rs +++ b/ecstore/src/heal/data_scanner.rs @@ -41,8 +41,8 @@ use crate::{ heal_ops::{BG_HEALING_UUID, HealSource}, }, new_object_layer_fn, - peer::is_reserved_or_invalid_bucket, store::ECStore, + store_utils::is_reserved_or_invalid_bucket, }; use crate::{disk::DiskAPI, store_api::ObjectInfo}; use crate::{ diff --git a/ecstore/src/lib.rs b/ecstore/src/lib.rs index f60330ad..d5cef9d9 100644 --- a/ecstore/src/lib.rs +++ b/ecstore/src/lib.rs @@ -15,17 +15,16 @@ pub mod global; pub mod heal; pub mod metrics_realtime; pub mod notification_sys; -pub mod peer; -pub mod peer_rest_client; pub mod pools; pub mod rebalance; +pub mod rpc; pub mod set_disk; mod sets; pub mod store; pub mod store_api; mod store_init; pub mod store_list_objects; -mod store_utils; +pub mod store_utils; pub use global::new_object_layer_fn; pub use global::set_global_endpoints; diff --git a/ecstore/src/notification_sys.rs b/ecstore/src/notification_sys.rs index 232de8ab..054c66cb 100644 --- a/ecstore/src/notification_sys.rs +++ b/ecstore/src/notification_sys.rs @@ -2,7 +2,7 @@ use crate::StorageAPI; use crate::admin_server_info::get_commit_id; use crate::error::{Error, Result}; use crate::global::{GLOBAL_BOOT_TIME, get_global_endpoints}; -use crate::peer_rest_client::PeerRestClient; +use crate::rpc::PeerRestClient; use crate::{endpoints::EndpointServerPools, new_object_layer_fn}; use futures::future::join_all; use lazy_static::lazy_static; diff --git a/ecstore/src/rpc/http_auth.rs b/ecstore/src/rpc/http_auth.rs new file mode 100644 index 00000000..1268932d --- /dev/null +++ b/ecstore/src/rpc/http_auth.rs @@ -0,0 +1,375 @@ +use crate::global::get_global_action_cred; +use base64::Engine as _; +use base64::engine::general_purpose; +use hmac::{Hmac, Mac}; +use http::HeaderMap; +use http::HeaderValue; +use http::Method; +use http::Uri; +use sha2::Sha256; +use time::OffsetDateTime; +use tracing::error; + +type HmacSha256 = Hmac; + +const SIGNATURE_HEADER: &str = "x-rustfs-signature"; +const TIMESTAMP_HEADER: &str = "x-rustfs-timestamp"; +const SIGNATURE_VALID_DURATION: i64 = 300; // 5 minutes + +/// Get the shared secret for HMAC signing +fn get_shared_secret() -> String { + if let Some(cred) = get_global_action_cred() { + cred.secret_key + } else { + // Fallback to environment variable if global credentials are not available + std::env::var("RUSTFS_RPC_SECRET").unwrap_or_else(|_| "rustfs-default-secret".to_string()) + } +} + +/// Generate HMAC-SHA256 signature for the given data +fn generate_signature(secret: &str, url: &str, method: &Method, timestamp: i64) -> String { + let uri: Uri = url.parse().expect("Invalid URL"); + + let path_and_query = uri.path_and_query().unwrap(); + + let url = path_and_query.to_string(); + + let data = format!("{}|{}|{}", url, method, timestamp); + let mut mac = HmacSha256::new_from_slice(secret.as_bytes()).expect("HMAC can take key of any size"); + mac.update(data.as_bytes()); + let result = mac.finalize(); + general_purpose::STANDARD.encode(result.into_bytes()) +} + +/// Build headers with authentication signature +pub fn build_auth_headers(url: &str, method: &Method, headers: &mut HeaderMap) { + let secret = get_shared_secret(); + let timestamp = OffsetDateTime::now_utc().unix_timestamp(); + + let signature = generate_signature(&secret, url, method, timestamp); + + headers.insert(SIGNATURE_HEADER, HeaderValue::from_str(&signature).unwrap()); + headers.insert(TIMESTAMP_HEADER, HeaderValue::from_str(×tamp.to_string()).unwrap()); +} + +/// Verify the request signature for RPC requests +pub fn verify_rpc_signature(url: &str, method: &Method, headers: &HeaderMap) -> std::io::Result<()> { + let secret = get_shared_secret(); + + // Get signature from header + let signature = headers + .get(SIGNATURE_HEADER) + .and_then(|v| v.to_str().ok()) + .ok_or_else(|| std::io::Error::other("Missing signature header"))?; + + // Get timestamp from header + let timestamp_str = headers + .get(TIMESTAMP_HEADER) + .and_then(|v| v.to_str().ok()) + .ok_or_else(|| std::io::Error::other("Missing timestamp header"))?; + + let timestamp: i64 = timestamp_str + .parse() + .map_err(|_| std::io::Error::other("Invalid timestamp format"))?; + + // Check timestamp validity (prevent replay attacks) + let current_time = OffsetDateTime::now_utc().unix_timestamp(); + + if current_time.saturating_sub(timestamp) > SIGNATURE_VALID_DURATION { + return Err(std::io::Error::other("Request timestamp expired")); + } + + // Generate expected signature + + let expected_signature = generate_signature(&secret, url, method, timestamp); + + // Compare signatures + if signature != expected_signature { + error!( + "verify_rpc_signature: Invalid signature: secret {}, url {}, method {}, timestamp {}, signature {}, expected_signature {}", + secret, url, method, timestamp, signature, expected_signature + ); + + return Err(std::io::Error::other("Invalid signature")); + } + + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::*; + use http::{HeaderMap, Method}; + use time::OffsetDateTime; + + #[test] + fn test_get_shared_secret() { + let secret = get_shared_secret(); + assert!(!secret.is_empty(), "Secret should not be empty"); + + let url = "http://node1:7000/rustfs/rpc/read_file_stream?disk=http%3A%2F%2Fnode1%3A7000%2Fdata%2Frustfs3&volume=.rustfs.sys&path=pool.bin%2Fdd0fd773-a962-4265-b543-783ce83953e9%2Fpart.1&offset=0&length=44"; + let method = Method::GET; + let mut headers = HeaderMap::new(); + + build_auth_headers(url, &method, &mut headers); + + let url = "/rustfs/rpc/read_file_stream?disk=http%3A%2F%2Fnode1%3A7000%2Fdata%2Frustfs3&volume=.rustfs.sys&path=pool.bin%2Fdd0fd773-a962-4265-b543-783ce83953e9%2Fpart.1&offset=0&length=44"; + + let result = verify_rpc_signature(url, &method, &headers); + assert!(result.is_ok(), "Valid signature should pass verification"); + } + + #[test] + fn test_generate_signature_deterministic() { + let secret = "test-secret"; + let url = "http://example.com/api/test"; + let method = Method::GET; + let timestamp = 1640995200; // Fixed timestamp + + let signature1 = generate_signature(secret, url, &method, timestamp); + let signature2 = generate_signature(secret, url, &method, timestamp); + + assert_eq!(signature1, signature2, "Same inputs should produce same signature"); + assert!(!signature1.is_empty(), "Signature should not be empty"); + } + + #[test] + fn test_generate_signature_different_inputs() { + let secret = "test-secret"; + let url = "http://example.com/api/test"; + let method = Method::GET; + let timestamp = 1640995200; + + let signature1 = generate_signature(secret, url, &method, timestamp); + let signature2 = generate_signature(secret, "http://different.com/api/test2", &method, timestamp); + let signature3 = generate_signature(secret, url, &Method::POST, timestamp); + let signature4 = generate_signature(secret, url, &method, timestamp + 1); + + assert_ne!(signature1, signature2, "Different URLs should produce different signatures"); + assert_ne!(signature1, signature3, "Different methods should produce different signatures"); + assert_ne!(signature1, signature4, "Different timestamps should produce different signatures"); + } + + #[test] + fn test_build_auth_headers() { + let url = "http://example.com/api/test"; + let method = Method::POST; + let mut headers = HeaderMap::new(); + + build_auth_headers(url, &method, &mut headers); + + // Verify headers are present + assert!(headers.contains_key(SIGNATURE_HEADER), "Should contain signature header"); + assert!(headers.contains_key(TIMESTAMP_HEADER), "Should contain timestamp header"); + + // Verify header values are not empty + let signature = headers.get(SIGNATURE_HEADER).unwrap().to_str().unwrap(); + let timestamp_str = headers.get(TIMESTAMP_HEADER).unwrap().to_str().unwrap(); + + assert!(!signature.is_empty(), "Signature should not be empty"); + assert!(!timestamp_str.is_empty(), "Timestamp should not be empty"); + + // Verify timestamp is a valid integer + let timestamp: i64 = timestamp_str.parse().expect("Timestamp should be valid integer"); + let current_time = OffsetDateTime::now_utc().unix_timestamp(); + + // Should be within a reasonable range (within 1 second of current time) + assert!((current_time - timestamp).abs() <= 1, "Timestamp should be close to current time"); + } + + #[test] + fn test_verify_rpc_signature_success() { + let url = "http://example.com/api/test"; + let method = Method::GET; + let mut headers = HeaderMap::new(); + + // Build headers with valid signature + build_auth_headers(url, &method, &mut headers); + + // Verify should succeed + let result = verify_rpc_signature(url, &method, &headers); + assert!(result.is_ok(), "Valid signature should pass verification"); + } + + #[test] + fn test_verify_rpc_signature_invalid_signature() { + let url = "http://example.com/api/test"; + let method = Method::GET; + let mut headers = HeaderMap::new(); + + // Build headers with valid signature first + build_auth_headers(url, &method, &mut headers); + + // Tamper with the signature + headers.insert(SIGNATURE_HEADER, HeaderValue::from_str("invalid-signature").unwrap()); + + // Verify should fail + let result = verify_rpc_signature(url, &method, &headers); + assert!(result.is_err(), "Invalid signature should fail verification"); + + let error = result.unwrap_err(); + assert_eq!(error.to_string(), "Invalid signature"); + } + + #[test] + fn test_verify_rpc_signature_expired_timestamp() { + let url = "http://example.com/api/test"; + let method = Method::GET; + let mut headers = HeaderMap::new(); + + // Set expired timestamp (older than SIGNATURE_VALID_DURATION) + let expired_timestamp = OffsetDateTime::now_utc().unix_timestamp() - SIGNATURE_VALID_DURATION - 10; + let secret = get_shared_secret(); + let signature = generate_signature(&secret, url, &method, expired_timestamp); + + headers.insert(SIGNATURE_HEADER, HeaderValue::from_str(&signature).unwrap()); + headers.insert(TIMESTAMP_HEADER, HeaderValue::from_str(&expired_timestamp.to_string()).unwrap()); + + // Verify should fail due to expired timestamp + let result = verify_rpc_signature(url, &method, &headers); + assert!(result.is_err(), "Expired timestamp should fail verification"); + + let error = result.unwrap_err(); + assert_eq!(error.to_string(), "Request timestamp expired"); + } + + #[test] + fn test_verify_rpc_signature_missing_signature_header() { + let url = "http://example.com/api/test"; + let method = Method::GET; + let mut headers = HeaderMap::new(); + + // Add only timestamp header, missing signature + let timestamp = OffsetDateTime::now_utc().unix_timestamp(); + headers.insert(TIMESTAMP_HEADER, HeaderValue::from_str(×tamp.to_string()).unwrap()); + + // Verify should fail + let result = verify_rpc_signature(url, &method, &headers); + assert!(result.is_err(), "Missing signature header should fail verification"); + + let error = result.unwrap_err(); + assert_eq!(error.to_string(), "Missing signature header"); + } + + #[test] + fn test_verify_rpc_signature_missing_timestamp_header() { + let url = "http://example.com/api/test"; + let method = Method::GET; + let mut headers = HeaderMap::new(); + + // Add only signature header, missing timestamp + headers.insert(SIGNATURE_HEADER, HeaderValue::from_str("some-signature").unwrap()); + + // Verify should fail + let result = verify_rpc_signature(url, &method, &headers); + assert!(result.is_err(), "Missing timestamp header should fail verification"); + + let error = result.unwrap_err(); + assert_eq!(error.to_string(), "Missing timestamp header"); + } + + #[test] + fn test_verify_rpc_signature_invalid_timestamp_format() { + let url = "http://example.com/api/test"; + let method = Method::GET; + let mut headers = HeaderMap::new(); + + headers.insert(SIGNATURE_HEADER, HeaderValue::from_str("some-signature").unwrap()); + headers.insert(TIMESTAMP_HEADER, HeaderValue::from_str("invalid-timestamp").unwrap()); + + // Verify should fail + let result = verify_rpc_signature(url, &method, &headers); + assert!(result.is_err(), "Invalid timestamp format should fail verification"); + + let error = result.unwrap_err(); + assert_eq!(error.to_string(), "Invalid timestamp format"); + } + + #[test] + fn test_verify_rpc_signature_url_mismatch() { + let original_url = "http://example.com/api/test"; + let different_url = "http://example.com/api/different"; + let method = Method::GET; + let mut headers = HeaderMap::new(); + + // Build headers for one URL + build_auth_headers(original_url, &method, &mut headers); + + // Try to verify with a different URL + let result = verify_rpc_signature(different_url, &method, &headers); + assert!(result.is_err(), "URL mismatch should fail verification"); + + let error = result.unwrap_err(); + assert_eq!(error.to_string(), "Invalid signature"); + } + + #[test] + fn test_verify_rpc_signature_method_mismatch() { + let url = "http://example.com/api/test"; + let original_method = Method::GET; + let different_method = Method::POST; + let mut headers = HeaderMap::new(); + + // Build headers for one method + build_auth_headers(url, &original_method, &mut headers); + + // Try to verify with a different method + let result = verify_rpc_signature(url, &different_method, &headers); + assert!(result.is_err(), "Method mismatch should fail verification"); + + let error = result.unwrap_err(); + assert_eq!(error.to_string(), "Invalid signature"); + } + + #[test] + fn test_signature_valid_duration_boundary() { + let url = "http://example.com/api/test"; + let method = Method::GET; + let secret = get_shared_secret(); + + let mut headers = HeaderMap::new(); + let current_time = OffsetDateTime::now_utc().unix_timestamp(); + // Test timestamp just within valid duration + let valid_timestamp = current_time - SIGNATURE_VALID_DURATION + 1; + + let signature = generate_signature(&secret, url, &method, valid_timestamp); + + headers.insert(SIGNATURE_HEADER, HeaderValue::from_str(&signature).unwrap()); + headers.insert(TIMESTAMP_HEADER, HeaderValue::from_str(&valid_timestamp.to_string()).unwrap()); + + let result = verify_rpc_signature(url, &method, &headers); + assert!(result.is_ok(), "Timestamp within valid duration should pass"); + + // Test timestamp just outside valid duration + let mut headers = HeaderMap::new(); + let invalid_timestamp = current_time - SIGNATURE_VALID_DURATION - 15; + let signature = generate_signature(&secret, url, &method, invalid_timestamp); + + headers.insert(SIGNATURE_HEADER, HeaderValue::from_str(&signature).unwrap()); + headers.insert(TIMESTAMP_HEADER, HeaderValue::from_str(&invalid_timestamp.to_string()).unwrap()); + + let result = verify_rpc_signature(url, &method, &headers); + assert!(result.is_err(), "Timestamp outside valid duration should fail"); + } + + #[test] + fn test_round_trip_authentication() { + let test_cases = vec![ + ("http://example.com/api/test", Method::GET), + ("https://api.rustfs.com/v1/bucket", Method::POST), + ("http://localhost:9000/admin/info", Method::PUT), + ("https://storage.example.com/path/to/object?query=param", Method::DELETE), + ]; + + for (url, method) in test_cases { + let mut headers = HeaderMap::new(); + + // Build authentication headers + build_auth_headers(url, &method, &mut headers); + + // Verify the signature should succeed + let result = verify_rpc_signature(url, &method, &headers); + assert!(result.is_ok(), "Round-trip test failed for {} {}", method, url); + } + } +} diff --git a/ecstore/src/rpc/mod.rs b/ecstore/src/rpc/mod.rs new file mode 100644 index 00000000..e95032ab --- /dev/null +++ b/ecstore/src/rpc/mod.rs @@ -0,0 +1,11 @@ +mod http_auth; +mod peer_rest_client; +mod peer_s3_client; +mod remote_disk; +mod tonic_service; + +pub use http_auth::{build_auth_headers, verify_rpc_signature}; +pub use peer_rest_client::PeerRestClient; +pub use peer_s3_client::{LocalPeerS3Client, PeerS3Client, RemotePeerS3Client, S3PeerSys}; +pub use remote_disk::RemoteDisk; +pub use tonic_service::make_server; diff --git a/ecstore/src/peer_rest_client.rs b/ecstore/src/rpc/peer_rest_client.rs similarity index 100% rename from ecstore/src/peer_rest_client.rs rename to ecstore/src/rpc/peer_rest_client.rs diff --git a/ecstore/src/peer.rs b/ecstore/src/rpc/peer_s3_client.rs similarity index 93% rename from ecstore/src/peer.rs rename to ecstore/src/rpc/peer_s3_client.rs index 4ffba975..37992dbe 100644 --- a/ecstore/src/peer.rs +++ b/ecstore/src/rpc/peer_s3_client.rs @@ -8,6 +8,7 @@ use crate::heal::heal_commands::{ }; use crate::heal::heal_ops::RUSTFS_RESERVED_BUCKET; use crate::store::all_local_disk; +use crate::store_utils::is_reserved_or_invalid_bucket; use crate::{ disk::{self, VolumeInfo}, endpoints::{EndpointServerPools, Node}, @@ -20,7 +21,6 @@ use protos::node_service_time_out_client; use protos::proto_gen::node_service::{ DeleteBucketRequest, GetBucketInfoRequest, HealBucketRequest, ListBucketRequest, MakeBucketRequest, }; -use regex::Regex; use std::{collections::HashMap, fmt::Debug, sync::Arc}; use tokio::sync::RwLock; use tonic::Request; @@ -622,63 +622,6 @@ impl PeerS3Client for RemotePeerS3Client { } } -// 检查桶名是否有效 -fn check_bucket_name(bucket_name: &str, strict: bool) -> Result<()> { - if bucket_name.trim().is_empty() { - return Err(Error::other("Bucket name cannot be empty")); - } - if bucket_name.len() < 3 { - return Err(Error::other("Bucket name cannot be shorter than 3 characters")); - } - if bucket_name.len() > 63 { - return Err(Error::other("Bucket name cannot be longer than 63 characters")); - } - - let ip_address_regex = Regex::new(r"^(\d+\.){3}\d+$").unwrap(); - if ip_address_regex.is_match(bucket_name) { - return Err(Error::other("Bucket name cannot be an IP address")); - } - - let valid_bucket_name_regex = if strict { - Regex::new(r"^[a-z0-9][a-z0-9\.\-]{1,61}[a-z0-9]$").unwrap() - } else { - Regex::new(r"^[A-Za-z0-9][A-Za-z0-9\.\-_:]{1,61}[A-Za-z0-9]$").unwrap() - }; - - if !valid_bucket_name_regex.is_match(bucket_name) { - return Err(Error::other("Bucket name contains invalid characters")); - } - - // 检查包含 "..", ".-", "-." - if bucket_name.contains("..") || bucket_name.contains(".-") || bucket_name.contains("-.") { - return Err(Error::other("Bucket name contains invalid characters")); - } - - Ok(()) -} - -// 检查是否为 元数据桶 -fn is_meta_bucket(bucket_name: &str) -> bool { - bucket_name == disk::RUSTFS_META_BUCKET -} - -// 检查是否为 保留桶 -fn is_reserved_bucket(bucket_name: &str) -> bool { - bucket_name == "rustfs" -} - -// 检查桶名是否为保留名或无效名 -pub fn is_reserved_or_invalid_bucket(bucket_entry: &str, strict: bool) -> bool { - if bucket_entry.is_empty() { - return true; - } - - let bucket_entry = bucket_entry.trim_end_matches('/'); - let result = check_bucket_name(bucket_entry, strict).is_err(); - - result || is_meta_bucket(bucket_entry) || is_reserved_bucket(bucket_entry) -} - pub async fn heal_bucket_local(bucket: &str, opts: &HealOpts) -> Result { let disks = clone_drives().await; let before_state = Arc::new(RwLock::new(vec![String::new(); disks.len()])); diff --git a/ecstore/src/disk/remote.rs b/ecstore/src/rpc/remote_disk.rs similarity index 95% rename from ecstore/src/disk/remote.rs rename to ecstore/src/rpc/remote_disk.rs index dae2d3a8..d8376309 100644 --- a/ecstore/src/disk/remote.rs +++ b/ecstore/src/rpc/remote_disk.rs @@ -13,13 +13,25 @@ use protos::{ }, }; +use crate::disk::{ + CheckPartsResp, DeleteOptions, DiskAPI, DiskInfo, DiskInfoOptions, DiskLocation, DiskOption, FileInfoVersions, + ReadMultipleReq, ReadMultipleResp, ReadOptions, RenameDataResp, UpdateMetadataOpts, VolumeInfo, WalkDirOptions, + endpoint::Endpoint, +}; +use crate::{ + disk::error::{Error, Result}, + rpc::build_auth_headers, +}; +use crate::{ + disk::{FileReader, FileWriter}, + heal::{ + data_scanner::ShouldSleepFn, + data_usage_cache::{DataUsageCache, DataUsageEntry}, + heal_commands::{HealScanMode, HealingTracker}, + }, +}; use rustfs_filemeta::{FileInfo, RawFileInfo}; use rustfs_rio::{HttpReader, HttpWriter}; - -use base64::{Engine as _, engine::general_purpose}; -use hmac::{Hmac, Mac}; -use sha2::Sha256; -use std::time::{SystemTime, UNIX_EPOCH}; use tokio::{ io::AsyncWrite, sync::mpsc::{self, Sender}, @@ -29,26 +41,8 @@ use tonic::Request; use tracing::info; use uuid::Uuid; -use super::error::{Error, Result}; -use super::{ - CheckPartsResp, DeleteOptions, DiskAPI, DiskInfo, DiskInfoOptions, DiskLocation, DiskOption, FileInfoVersions, - ReadMultipleReq, ReadMultipleResp, ReadOptions, RenameDataResp, UpdateMetadataOpts, VolumeInfo, WalkDirOptions, - endpoint::Endpoint, -}; - -use crate::{ - disk::{FileReader, FileWriter}, - heal::{ - data_scanner::ShouldSleepFn, - data_usage_cache::{DataUsageCache, DataUsageEntry}, - heal_commands::{HealScanMode, HealingTracker}, - }, -}; - use protos::proto_gen::node_service::RenamePartRequest; -type HmacSha256 = Hmac; - #[derive(Debug)] pub struct RemoteDisk { pub id: Mutex>, @@ -75,35 +69,6 @@ impl RemoteDisk { endpoint: ep.clone(), }) } - - /// Get the shared secret for HMAC signing - fn get_shared_secret() -> String { - std::env::var("RUSTFS_RPC_SECRET").unwrap_or_else(|_| "rustfs-default-secret".to_string()) - } - - /// Generate HMAC-SHA256 signature for the given data - fn generate_signature(secret: &str, url: &str, method: &str, timestamp: u64) -> String { - let data = format!("{}|{}|{}", url, method, timestamp); - let mut mac = HmacSha256::new_from_slice(secret.as_bytes()).expect("HMAC can take key of any size"); - mac.update(data.as_bytes()); - let result = mac.finalize(); - general_purpose::STANDARD.encode(result.into_bytes()) - } - - /// Build headers with authentication signature - fn build_auth_headers(&self, url: &str, method: &Method, base_headers: Option) -> HeaderMap { - let mut headers = base_headers.unwrap_or_default(); - - let secret = Self::get_shared_secret(); - let timestamp = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs(); - - let signature = Self::generate_signature(&secret, url, method.as_str(), timestamp); - - headers.insert("x-rustfs-signature", HeaderValue::from_str(&signature).unwrap()); - headers.insert("x-rustfs-timestamp", HeaderValue::from_str(×tamp.to_string()).unwrap()); - - headers - } } // TODO: all api need to handle errors @@ -614,9 +579,9 @@ impl DiskAPI for RemoteDisk { let opts = serde_json::to_vec(&opts)?; - let mut base_headers = HeaderMap::new(); - base_headers.insert(CONTENT_TYPE, HeaderValue::from_static("application/json")); - let headers = self.build_auth_headers(&url, &Method::GET, Some(base_headers)); + let mut headers = HeaderMap::new(); + headers.insert(CONTENT_TYPE, HeaderValue::from_static("application/json")); + build_auth_headers(&url, &Method::GET, &mut headers); let mut reader = HttpReader::new(url, Method::GET, headers, Some(opts)).await?; @@ -639,7 +604,9 @@ impl DiskAPI for RemoteDisk { 0 ); - let headers = self.build_auth_headers(&url, &Method::GET, None); + let mut headers = HeaderMap::new(); + headers.insert(CONTENT_TYPE, HeaderValue::from_static("application/json")); + build_auth_headers(&url, &Method::GET, &mut headers); Ok(Box::new(HttpReader::new(url, Method::GET, headers, None).await?)) } @@ -663,7 +630,9 @@ impl DiskAPI for RemoteDisk { length ); - let headers = self.build_auth_headers(&url, &Method::GET, None); + let mut headers = HeaderMap::new(); + headers.insert(CONTENT_TYPE, HeaderValue::from_static("application/json")); + build_auth_headers(&url, &Method::GET, &mut headers); Ok(Box::new(HttpReader::new(url, Method::GET, headers, None).await?)) } @@ -681,7 +650,9 @@ impl DiskAPI for RemoteDisk { 0 ); - let headers = self.build_auth_headers(&url, &Method::PUT, None); + let mut headers = HeaderMap::new(); + headers.insert(CONTENT_TYPE, HeaderValue::from_static("application/json")); + build_auth_headers(&url, &Method::PUT, &mut headers); Ok(Box::new(HttpWriter::new(url, Method::PUT, headers).await?)) } @@ -705,7 +676,9 @@ impl DiskAPI for RemoteDisk { file_size ); - let headers = self.build_auth_headers(&url, &Method::PUT, None); + let mut headers = HeaderMap::new(); + headers.insert(CONTENT_TYPE, HeaderValue::from_static("application/json")); + build_auth_headers(&url, &Method::PUT, &mut headers); Ok(Box::new(HttpWriter::new(url, Method::PUT, headers).await?)) } diff --git a/rustfs/src/grpc.rs b/ecstore/src/rpc/tonic_service.rs similarity index 99% rename from rustfs/src/grpc.rs rename to ecstore/src/rpc/tonic_service.rs index 95ca12c2..d7a47aa0 100644 --- a/rustfs/src/grpc.rs +++ b/ecstore/src/rpc/tonic_service.rs @@ -1,7 +1,7 @@ use std::{collections::HashMap, io::Cursor, pin::Pin}; // use common::error::Error as EcsError; -use ecstore::{ +use crate::{ admin_server_info::get_local_server_property, bucket::{metadata::load_bucket_metadata, metadata_sys}, disk::{ @@ -14,7 +14,7 @@ use ecstore::{ }, metrics_realtime::{CollectMetricsOpts, MetricType, collect_local_metrics}, new_object_layer_fn, - peer::{LocalPeerS3Client, PeerS3Client}, + rpc::{LocalPeerS3Client, PeerS3Client}, store::{all_local_disk_path, find_local_disk}, store_api::{BucketOptions, DeleteBucketOptions, MakeBucketOptions, StorageAPI}, }; @@ -226,7 +226,6 @@ impl Node for NodeService { } }; - println!("bucket info {}", bucket_info.clone()); Ok(tonic::Response::new(GetBucketInfoResponse { success: true, bucket_info, diff --git a/ecstore/src/store.rs b/ecstore/src/store.rs index 29aaf398..402a751d 100644 --- a/ecstore/src/store.rs +++ b/ecstore/src/store.rs @@ -29,7 +29,7 @@ use crate::{ bucket::metadata::BucketMetadata, disk::{BUCKET_META_PREFIX, DiskOption, DiskStore, RUSTFS_META_BUCKET, new_disk}, endpoints::EndpointServerPools, - peer::S3PeerSys, + rpc::S3PeerSys, sets::Sets, store_api::{ BucketInfo, BucketOptions, CompletePart, DeleteBucketOptions, DeletedObject, GetObjectReader, HTTPRangeSpec, diff --git a/ecstore/src/store_list_objects.rs b/ecstore/src/store_list_objects.rs index c08bb8b2..4aed6cab 100644 --- a/ecstore/src/store_list_objects.rs +++ b/ecstore/src/store_list_objects.rs @@ -7,10 +7,10 @@ use crate::disk::{DiskInfo, DiskStore}; use crate::error::{ Error, Result, StorageError, is_all_not_found, is_all_volume_not_found, is_err_bucket_not_found, to_object_err, }; -use crate::peer::is_reserved_or_invalid_bucket; use crate::set_disk::SetDisks; use crate::store::check_list_objs_args; use crate::store_api::{ListObjectVersionsInfo, ListObjectsInfo, ObjectInfo, ObjectOptions}; +use crate::store_utils::is_reserved_or_invalid_bucket; use crate::{store::ECStore, store_api::ListObjectsV2Info}; use futures::future::join_all; use rand::seq::SliceRandom; diff --git a/ecstore/src/store_utils.rs b/ecstore/src/store_utils.rs index 06edaacb..5ba5b1f4 100644 --- a/ecstore/src/store_utils.rs +++ b/ecstore/src/store_utils.rs @@ -1,7 +1,10 @@ use crate::config::storageclass::STANDARD; +use crate::disk::RUSTFS_META_BUCKET; +use regex::Regex; use rustfs_filemeta::headers::AMZ_OBJECT_TAGGING; use rustfs_filemeta::headers::AMZ_STORAGE_CLASS; use std::collections::HashMap; +use std::io::{Error, Result}; pub fn clean_metadata(metadata: &mut HashMap) { remove_standard_storage_class(metadata); @@ -19,3 +22,60 @@ pub fn clean_metadata_keys(metadata: &mut HashMap, key_names: &[ metadata.remove(key.to_owned()); } } + +// 检查是否为 元数据桶 +fn is_meta_bucket(bucket_name: &str) -> bool { + bucket_name == RUSTFS_META_BUCKET +} + +// 检查是否为 保留桶 +fn is_reserved_bucket(bucket_name: &str) -> bool { + bucket_name == "rustfs" +} + +// 检查桶名是否为保留名或无效名 +pub fn is_reserved_or_invalid_bucket(bucket_entry: &str, strict: bool) -> bool { + if bucket_entry.is_empty() { + return true; + } + + let bucket_entry = bucket_entry.trim_end_matches('/'); + let result = check_bucket_name(bucket_entry, strict).is_err(); + + result || is_meta_bucket(bucket_entry) || is_reserved_bucket(bucket_entry) +} + +// 检查桶名是否有效 +fn check_bucket_name(bucket_name: &str, strict: bool) -> Result<()> { + if bucket_name.trim().is_empty() { + return Err(Error::other("Bucket name cannot be empty")); + } + if bucket_name.len() < 3 { + return Err(Error::other("Bucket name cannot be shorter than 3 characters")); + } + if bucket_name.len() > 63 { + return Err(Error::other("Bucket name cannot be longer than 63 characters")); + } + + let ip_address_regex = Regex::new(r"^(\d+\.){3}\d+$").unwrap(); + if ip_address_regex.is_match(bucket_name) { + return Err(Error::other("Bucket name cannot be an IP address")); + } + + let valid_bucket_name_regex = if strict { + Regex::new(r"^[a-z0-9][a-z0-9\.\-]{1,61}[a-z0-9]$").unwrap() + } else { + Regex::new(r"^[A-Za-z0-9][A-Za-z0-9\.\-_:]{1,61}[A-Za-z0-9]$").unwrap() + }; + + if !valid_bucket_name_regex.is_match(bucket_name) { + return Err(Error::other("Bucket name contains invalid characters")); + } + + // 检查包含 "..", ".-", "-." + if bucket_name.contains("..") || bucket_name.contains(".-") || bucket_name.contains("-.") { + return Err(Error::other("Bucket name contains invalid characters")); + } + + Ok(()) +} diff --git a/iam/src/lib.rs b/iam/src/lib.rs index 3aa1259e..8f3fd6b0 100644 --- a/iam/src/lib.rs +++ b/iam/src/lib.rs @@ -1,7 +1,6 @@ use crate::error::{Error, Result}; use ecstore::store::ECStore; use manager::IamCache; -use policy::auth::Credentials; use std::sync::{Arc, OnceLock}; use store::object::ObjectStore; use sys::IamSys; @@ -17,39 +16,6 @@ pub mod sys; static IAM_SYS: OnceLock>> = OnceLock::new(); -static GLOBAL_ACTIVE_CRED: OnceLock = OnceLock::new(); - -pub fn init_global_action_cred(ak: Option, sk: Option) -> Result<()> { - let ak = { - if let Some(k) = ak { - k - } else { - utils::gen_access_key(20).unwrap_or_default() - } - }; - - let sk = { - if let Some(k) = sk { - k - } else { - utils::gen_secret_key(32).unwrap_or_default() - } - }; - - GLOBAL_ACTIVE_CRED - .set(Credentials { - access_key: ak, - secret_key: sk, - ..Default::default() - }) - .unwrap(); - Ok(()) -} - -pub fn get_global_action_cred() -> Option { - GLOBAL_ACTIVE_CRED.get().cloned() -} - #[instrument(skip(ecstore))] pub async fn init_iam_sys(ecstore: Arc) -> Result<()> { debug!("init iam system"); diff --git a/iam/src/manager.rs b/iam/src/manager.rs index a556aff2..a31125cf 100644 --- a/iam/src/manager.rs +++ b/iam/src/manager.rs @@ -2,14 +2,13 @@ use crate::error::{Error, Result, is_err_config_not_found}; use crate::{ cache::{Cache, CacheEntity}, error::{Error as IamError, is_err_no_such_group, is_err_no_such_policy, is_err_no_such_user}, - get_global_action_cred, store::{GroupInfo, MappedPolicy, Store, UserType, object::IAM_CONFIG_PREFIX}, sys::{ MAX_SVCSESSION_POLICY_SIZE, SESSION_POLICY_NAME, SESSION_POLICY_NAME_EXTRACTED, STATUS_DISABLED, STATUS_ENABLED, UpdateServiceAccountOpts, }, }; -// use ecstore::utils::crypto::base64_encode; +use ecstore::global::get_global_action_cred; use madmin::{AccountStatus, AddOrUpdateUserReq, GroupDesc}; use policy::{ arn::ARN, @@ -39,8 +38,8 @@ use tokio::{ mpsc::{Receiver, Sender}, }, }; -use tracing::error; use tracing::warn; +use tracing::{error, info}; const IAM_FORMAT_FILE: &str = "format.json"; const IAM_FORMAT_VERSION_1: i32 = 1; @@ -108,18 +107,18 @@ where loop { select! { _ = ticker.tick() => { - warn!("iam load ticker"); + info!("iam load ticker"); if let Err(err) =s.clone().load().await{ error!("iam load err {:?}", err); } }, i = reciver.recv() => { - warn!("iam load reciver"); + info!("iam load reciver"); match i { Some(t) => { let last = s.last_timestamp.load(Ordering::Relaxed); if last <= t { - warn!("iam load reciver load"); + info!("iam load reciver load"); if let Err(err) =s.clone().load().await{ error!("iam load err {:?}", err); } diff --git a/iam/src/store/object.rs b/iam/src/store/object.rs index 6e616eb5..525c146b 100644 --- a/iam/src/store/object.rs +++ b/iam/src/store/object.rs @@ -3,7 +3,6 @@ use crate::error::{Error, Result, is_err_config_not_found}; use crate::{ cache::{Cache, CacheEntity}, error::{is_err_no_such_policy, is_err_no_such_user}, - get_global_action_cred, manager::{extract_jwt_claims, get_default_policyes}, }; use ecstore::{ @@ -11,6 +10,7 @@ use ecstore::{ RUSTFS_CONFIG_PREFIX, com::{delete_config, read_config, read_config_with_metadata, save_config}, }, + global::get_global_action_cred, store::ECStore, store_api::{ObjectInfo, ObjectOptions}, store_list_objects::{ObjectInfoOrErr, WalkOptions}, diff --git a/iam/src/sys.rs b/iam/src/sys.rs index 6840212b..1bcbb700 100644 --- a/iam/src/sys.rs +++ b/iam/src/sys.rs @@ -2,15 +2,13 @@ use crate::error::Error as IamError; use crate::error::is_err_no_such_account; use crate::error::is_err_no_such_temp_account; use crate::error::{Error, Result}; -use crate::get_global_action_cred; use crate::manager::IamCache; use crate::manager::extract_jwt_claims; use crate::manager::get_default_policyes; use crate::store::MappedPolicy; use crate::store::Store; use crate::store::UserType; -// use ecstore::utils::crypto::base64_decode; -// use ecstore::utils::crypto::base64_encode; +use ecstore::global::get_global_action_cred; use madmin::AddOrUpdateUserReq; use madmin::GroupDesc; use policy::arn::ARN; diff --git a/rustfs/src/admin/handlers.rs b/rustfs/src/admin/handlers.rs index 88676b49..f59b32e5 100644 --- a/rustfs/src/admin/handlers.rs +++ b/rustfs/src/admin/handlers.rs @@ -11,20 +11,20 @@ use ecstore::bucket::versioning_sys::BucketVersioningSys; use ecstore::cmd::bucket_targets::{self, GLOBAL_Bucket_Target_Sys}; use ecstore::error::StorageError; use ecstore::global::GLOBAL_ALlHealState; +use ecstore::global::get_global_action_cred; use ecstore::heal::data_usage::load_data_usage_from_backend; use ecstore::heal::heal_commands::HealOpts; use ecstore::heal::heal_ops::new_heal_sequence; use ecstore::metrics_realtime::{CollectMetricsOpts, MetricType, collect_local_metrics}; use ecstore::new_object_layer_fn; -use ecstore::peer::is_reserved_or_invalid_bucket; use ecstore::pools::{get_total_usable_capacity, get_total_usable_capacity_free}; use ecstore::store::is_valid_object_prefix; use ecstore::store_api::BucketOptions; use ecstore::store_api::StorageAPI; +use ecstore::store_utils::is_reserved_or_invalid_bucket; use futures::{Stream, StreamExt}; use http::{HeaderMap, Uri}; use hyper::StatusCode; -use iam::get_global_action_cred; use iam::store::MappedPolicy; use rustfs_utils::path::path_join; // use lazy_static::lazy_static; @@ -810,7 +810,7 @@ impl Operation for SetRemoteTargetHandler { //println!("bucket is:{}", bucket.clone()); if let Some(bucket) = querys.get("bucket") { if bucket.is_empty() { - println!("have bucket: {}", bucket); + info!("have bucket: {}", bucket); return Ok(S3Response::new((StatusCode::OK, Body::from("fuck".to_string())))); } let Some(store) = new_object_layer_fn() else { @@ -824,13 +824,13 @@ impl Operation for SetRemoteTargetHandler { .await { Ok(info) => { - println!("Bucket Info: {:?}", info); + info!("Bucket Info: {:?}", info); if !info.versionning { return Ok(S3Response::new((StatusCode::FORBIDDEN, Body::from("bucket need versioned".to_string())))); } } Err(err) => { - eprintln!("Error: {:?}", err); + error!("Error: {:?}", err); return Ok(S3Response::new((StatusCode::BAD_REQUEST, Body::from("empty bucket".to_string())))); } } @@ -934,7 +934,7 @@ impl Operation for ListRemoteTargetHandler { .await { Ok(info) => { - println!("Bucket Info: {:?}", info); + info!("Bucket Info: {:?}", info); if !info.versionning { return Ok(S3Response::new(( StatusCode::FORBIDDEN, @@ -943,7 +943,7 @@ impl Operation for ListRemoteTargetHandler { } } Err(err) => { - eprintln!("Error fetching bucket info: {:?}", err); + error!("Error fetching bucket info: {:?}", err); return Ok(S3Response::new((StatusCode::BAD_REQUEST, Body::from("Invalid bucket".to_string())))); } } diff --git a/rustfs/src/admin/handlers/group.rs b/rustfs/src/admin/handlers/group.rs index e64ef4c1..b8eb3628 100644 --- a/rustfs/src/admin/handlers/group.rs +++ b/rustfs/src/admin/handlers/group.rs @@ -1,8 +1,6 @@ +use ecstore::global::get_global_action_cred; use http::{HeaderMap, StatusCode}; -use iam::{ - error::{is_err_no_such_group, is_err_no_such_user}, - get_global_action_cred, -}; +use iam::error::{is_err_no_such_group, is_err_no_such_user}; use madmin::GroupAddRemove; use matchit::Params; use s3s::{Body, S3Error, S3ErrorCode, S3Request, S3Response, S3Result, header::CONTENT_TYPE, s3_error}; diff --git a/rustfs/src/admin/handlers/policys.rs b/rustfs/src/admin/handlers/policys.rs index 41329a0b..2c575c8d 100644 --- a/rustfs/src/admin/handlers/policys.rs +++ b/rustfs/src/admin/handlers/policys.rs @@ -1,6 +1,8 @@ use crate::admin::{router::Operation, utils::has_space_be}; +use ecstore::global::get_global_action_cred; use http::{HeaderMap, StatusCode}; -use iam::{error::is_err_no_such_user, get_global_action_cred, store::MappedPolicy}; +use iam::error::is_err_no_such_user; +use iam::store::MappedPolicy; use matchit::Params; use policy::policy::Policy; use s3s::{Body, S3Error, S3ErrorCode, S3Request, S3Response, S3Result, header::CONTENT_TYPE, s3_error}; diff --git a/rustfs/src/admin/handlers/service_account.rs b/rustfs/src/admin/handlers/service_account.rs index 7893082f..a821bf09 100644 --- a/rustfs/src/admin/handlers/service_account.rs +++ b/rustfs/src/admin/handlers/service_account.rs @@ -1,13 +1,11 @@ use crate::admin::utils::has_space_be; use crate::auth::{get_condition_values, get_session_token}; use crate::{admin::router::Operation, auth::check_key_valid}; +use ecstore::global::get_global_action_cred; use http::HeaderMap; use hyper::StatusCode; -use iam::{ - error::is_err_no_such_service_account, - get_global_action_cred, - sys::{NewServiceAccountOpts, UpdateServiceAccountOpts}, -}; +use iam::error::is_err_no_such_service_account; +use iam::sys::{NewServiceAccountOpts, UpdateServiceAccountOpts}; use madmin::{ AddServiceAccountReq, AddServiceAccountResp, Credentials, InfoServiceAccountResp, ListServiceAccountsResp, ServiceAccountInfo, UpdateServiceAccountReq, diff --git a/rustfs/src/admin/handlers/trace.rs b/rustfs/src/admin/handlers/trace.rs index 55a489b5..a2d34087 100644 --- a/rustfs/src/admin/handlers/trace.rs +++ b/rustfs/src/admin/handlers/trace.rs @@ -1,4 +1,4 @@ -use ecstore::{GLOBAL_Endpoints, peer_rest_client::PeerRestClient}; +use ecstore::{GLOBAL_Endpoints, rpc::PeerRestClient}; use http::StatusCode; use hyper::Uri; use madmin::service_commands::ServiceTraceOpts; diff --git a/rustfs/src/admin/handlers/user.rs b/rustfs/src/admin/handlers/user.rs index a375d7ad..871c4337 100644 --- a/rustfs/src/admin/handlers/user.rs +++ b/rustfs/src/admin/handlers/user.rs @@ -1,7 +1,9 @@ -use std::{collections::HashMap, str::from_utf8}; - +use crate::{ + admin::{router::Operation, utils::has_space_be}, + auth::{check_key_valid, get_condition_values, get_session_token}, +}; +use ecstore::global::get_global_action_cred; use http::{HeaderMap, StatusCode}; -use iam::get_global_action_cred; use madmin::{AccountStatus, AddOrUpdateUserReq}; use matchit::Params; use policy::policy::{ @@ -11,13 +13,9 @@ use policy::policy::{ use s3s::{Body, S3Error, S3ErrorCode, S3Request, S3Response, S3Result, header::CONTENT_TYPE, s3_error}; use serde::Deserialize; use serde_urlencoded::from_bytes; +use std::{collections::HashMap, str::from_utf8}; use tracing::warn; -use crate::{ - admin::{router::Operation, utils::has_space_be}, - auth::{check_key_valid, get_condition_values, get_session_token}, -}; - #[derive(Debug, Deserialize, Default)] pub struct AddUserQuery { #[serde(rename = "accessKey")] diff --git a/rustfs/src/admin/router.rs b/rustfs/src/admin/router.rs index 7413623b..3fa18b7c 100644 --- a/rustfs/src/admin/router.rs +++ b/rustfs/src/admin/router.rs @@ -1,5 +1,4 @@ -use base64::{Engine as _, engine::general_purpose}; -use hmac::{Hmac, Mac}; +use ecstore::rpc::verify_rpc_signature; use hyper::HeaderMap; use hyper::Method; use hyper::StatusCode; @@ -14,80 +13,11 @@ use s3s::S3Result; use s3s::header; use s3s::route::S3Route; use s3s::s3_error; -use sha2::Sha256; -use std::time::{SystemTime, UNIX_EPOCH}; +use tracing::error; use super::ADMIN_PREFIX; use super::RUSTFS_ADMIN_PREFIX; use super::rpc::RPC_PREFIX; -use iam::get_global_action_cred; - -type HmacSha256 = Hmac; - -const SIGNATURE_HEADER: &str = "x-rustfs-signature"; -const TIMESTAMP_HEADER: &str = "x-rustfs-timestamp"; -const SIGNATURE_VALID_DURATION: u64 = 300; // 5 minutes - -/// Get the shared secret for HMAC signing -fn get_shared_secret() -> String { - if let Some(cred) = get_global_action_cred() { - cred.secret_key - } else { - // Fallback to environment variable if global credentials are not available - std::env::var("RUSTFS_RPC_SECRET").unwrap_or_else(|_| "rustfs-default-secret".to_string()) - } -} - -/// Generate HMAC-SHA256 signature for the given data -fn generate_signature(secret: &str, url: &str, method: &str, timestamp: u64) -> String { - let data = format!("{}|{}|{}", url, method, timestamp); - let mut mac = HmacSha256::new_from_slice(secret.as_bytes()).expect("HMAC can take key of any size"); - mac.update(data.as_bytes()); - let result = mac.finalize(); - general_purpose::STANDARD.encode(result.into_bytes()) -} - -/// Verify the request signature for RPC requests -fn verify_rpc_signature(req: &S3Request) -> S3Result<()> { - let secret = get_shared_secret(); - - // Get signature from header - let signature = req - .headers - .get(SIGNATURE_HEADER) - .and_then(|v| v.to_str().ok()) - .ok_or_else(|| s3_error!(InvalidArgument, "Missing signature header"))?; - - // Get timestamp from header - let timestamp_str = req - .headers - .get(TIMESTAMP_HEADER) - .and_then(|v| v.to_str().ok()) - .ok_or_else(|| s3_error!(InvalidArgument, "Missing timestamp header"))?; - - let timestamp: u64 = timestamp_str - .parse() - .map_err(|_| s3_error!(InvalidArgument, "Invalid timestamp format"))?; - - // Check timestamp validity (prevent replay attacks) - let current_time = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs(); - - if current_time.saturating_sub(timestamp) > SIGNATURE_VALID_DURATION { - return Err(s3_error!(InvalidArgument, "Request timestamp expired")); - } - - // Generate expected signature - let url = req.uri.to_string(); - let method = req.method.as_str(); - let expected_signature = generate_signature(&secret, &url, method, timestamp); - - // Compare signatures - if signature != expected_signature { - return Err(s3_error!(AccessDenied, "Invalid signature")); - } - - Ok(()) -} pub struct S3Router { router: Router, @@ -160,7 +90,10 @@ where if req.uri.path().starts_with(RPC_PREFIX) { // Skip signature verification for HEAD requests (health checks) if req.method != Method::HEAD { - verify_rpc_signature(req)?; + verify_rpc_signature(&req.uri.to_string(), &req.method, &req.headers).map_err(|e| { + error!("RPC signature verification failed: {}", e); + s3_error!(AccessDenied, "{}", e) + })?; } return Ok(()); } diff --git a/rustfs/src/admin/rpc.rs b/rustfs/src/admin/rpc.rs index 372a2ffa..f9dd4184 100644 --- a/rustfs/src/admin/rpc.rs +++ b/rustfs/src/admin/rpc.rs @@ -1,7 +1,6 @@ use super::router::AdminOperation; use super::router::Operation; use super::router::S3Router; -use crate::storage::ecfs::bytes_stream; use ecstore::disk::DiskAPI; use ecstore::disk::WalkDirOptions; use ecstore::set_disk::DEFAULT_READ_BUFFER_SIZE; @@ -10,6 +9,7 @@ use futures::StreamExt; use http::StatusCode; use hyper::Method; use matchit::Params; +use rustfs_utils::net::bytes_stream; use s3s::Body; use s3s::S3Request; use s3s::S3Response; diff --git a/rustfs/src/auth.rs b/rustfs/src/auth.rs index a66a9242..a5cac503 100644 --- a/rustfs/src/auth.rs +++ b/rustfs/src/auth.rs @@ -1,9 +1,7 @@ -use std::collections::HashMap; - +use ecstore::global::get_global_action_cred; use http::HeaderMap; use http::Uri; use iam::error::Error as IamError; -use iam::get_global_action_cred; use iam::sys::SESSION_POLICY_NAME; use policy::auth; use policy::auth::get_claims_from_token_with_secret; @@ -15,6 +13,7 @@ use s3s::auth::SecretKey; use s3s::auth::SimpleAuth; use s3s::s3_error; use serde_json::Value; +use std::collections::HashMap; pub struct IAMAuth { simple_auth: SimpleAuth, diff --git a/rustfs/src/main.rs b/rustfs/src/main.rs index ab0889eb..b15bf091 100644 --- a/rustfs/src/main.rs +++ b/rustfs/src/main.rs @@ -4,7 +4,7 @@ mod config; mod console; mod error; mod event; -mod grpc; +// mod grpc; pub mod license; mod logging; mod server; @@ -28,6 +28,7 @@ use ecstore::cmd::bucket_replication::init_bucket_replication_pool; use ecstore::config as ecconfig; use ecstore::config::GLOBAL_ConfigSys; use ecstore::heal::background_heal_ops::init_auto_heal; +use ecstore::rpc::make_server; use ecstore::store_api::BucketOptions; use ecstore::{ endpoints::EndpointServerPools, @@ -37,7 +38,6 @@ use ecstore::{ update_erasure_type, }; use ecstore::{global::set_global_rustfs_port, notification_sys::new_global_notification_sys}; -use grpc::make_server; use http::{HeaderMap, Request as HttpRequest, Response}; use hyper_util::server::graceful::GracefulShutdown; use hyper_util::{ @@ -129,7 +129,7 @@ async fn run(opt: config::Opt) -> Result<()> { debug!("server_address {}", &server_address); // Set up AK and SK - iam::init_global_action_cred(Some(opt.access_key.clone()), Some(opt.secret_key.clone()))?; + ecstore::global::init_global_action_cred(Some(opt.access_key.clone()), Some(opt.secret_key.clone())); set_global_rustfs_port(server_port); diff --git a/rustfs/src/storage/ecfs.rs b/rustfs/src/storage/ecfs.rs index 458cb597..936f868e 100644 --- a/rustfs/src/storage/ecfs.rs +++ b/rustfs/src/storage/ecfs.rs @@ -8,6 +8,7 @@ use crate::storage::access::ReqInfo; use crate::storage::options::copy_dst_opts; use crate::storage::options::copy_src_opts; use crate::storage::options::{extract_metadata_from_mime, get_opts}; +use api::object_store::bytes_stream; use api::query::Context; use api::query::Query; use api::server::dbms::DatabaseManagerSystem; @@ -52,8 +53,7 @@ use ecstore::store_api::ObjectOptions; use ecstore::store_api::ObjectToDelete; use ecstore::store_api::PutObjReader; use ecstore::store_api::StorageAPI; // use ecstore::store_api::RESERVED_METADATA_PREFIX; -use futures::pin_mut; -use futures::{Stream, StreamExt}; +use futures::StreamExt; use http::HeaderMap; use lazy_static::lazy_static; use policy::auth; @@ -95,7 +95,6 @@ use tracing::debug; use tracing::error; use tracing::info; use tracing::warn; -use transform_stream::AsyncTryStream; use uuid::Uuid; macro_rules! try_ { @@ -2414,24 +2413,3 @@ impl S3 for FS { })) } } - -#[allow(dead_code)] -pub fn bytes_stream(stream: S, content_length: usize) -> impl Stream> + Send + 'static -where - S: Stream> + Send + 'static, - E: Send + 'static, -{ - AsyncTryStream::::new(|mut y| async move { - pin_mut!(stream); - let mut remaining: usize = content_length; - while let Some(result) = stream.next().await { - let mut bytes = result?; - if bytes.len() > remaining { - bytes.truncate(remaining); - } - remaining -= bytes.len(); - y.yield_ok(bytes).await; - } - Ok(()) - }) -}