refactor: reorganize RPC modules into unified structure

This commit is contained in:
weisd
2025-06-19 17:06:54 +08:00
parent 450db14305
commit e145586b65
38 changed files with 655 additions and 321 deletions

5
Cargo.lock generated
View File

@@ -3631,6 +3631,7 @@ dependencies = [
"criterion",
"flatbuffers 25.2.10",
"futures",
"futures-util",
"glob",
"hex-simd",
"highway",
@@ -3664,6 +3665,7 @@ dependencies = [
"s3s",
"serde",
"serde_json",
"serde_urlencoded",
"sha2 0.10.9",
"shadow-rs",
"siphasher 1.0.1",
@@ -8494,8 +8496,10 @@ dependencies = [
"base64-simd",
"blake3",
"brotli 8.0.1",
"bytes",
"crc32fast",
"flate2",
"futures",
"hex-simd",
"highway",
"lazy_static",
@@ -8517,6 +8521,7 @@ dependencies = [
"tempfile",
"tokio",
"tracing",
"transform-stream",
"url",
"winapi",
"zstd",

View File

@@ -40,6 +40,7 @@ pin_project! {
url:String,
method: Method,
headers: HeaderMap,
#[pin]
inner: StreamReader<Pin<Box<dyn Stream<Item=std::io::Result<Bytes>>+Send+Sync>>, Bytes>,
}
}

View File

@@ -34,6 +34,10 @@ brotli = { workspace = true , optional = true}
zstd = { workspace = true , optional = true}
snap = { workspace = true , optional = true}
lz4 = { workspace = true , optional = true}
rand = { workspace = true, optional = true }
futures= { workspace = true, optional = true }
transform-stream= { workspace = true, optional = true }
bytes= { workspace = true, optional = true }
[dev-dependencies]
tempfile = { workspace = true }
@@ -49,11 +53,11 @@ workspace = true
default = ["ip"] # features that are enabled by default
ip = ["dep:local-ip-address"] # ip characteristics and their dependencies
tls = ["dep:rustls", "dep:rustls-pemfile", "dep:rustls-pki-types"] # tls characteristics and their dependencies
net = ["ip","dep:url", "dep:netif", "dep:lazy_static"] # empty network features
net = ["ip","dep:url", "dep:netif", "dep:lazy_static", "dep:futures", "dep:transform-stream", "dep:bytes"] # empty network features
io = ["dep:tokio"]
path = []
compress =["dep:flate2","dep:brotli","dep:snap","dep:lz4","dep:zstd"]
string = ["dep:regex","dep:lazy_static"]
string = ["dep:regex","dep:lazy_static","dep:rand"]
crypto = ["dep:base64-simd","dep:hex-simd"]
hash = ["dep:highway", "dep:md-5", "dep:sha2", "dep:blake3", "dep:serde", "dep:siphasher"]
os = ["dep:nix", "dep:tempfile", "winapi"] # operating system utilities

View File

@@ -1,10 +1,13 @@
use bytes::Bytes;
use futures::pin_mut;
use futures::{Stream, StreamExt};
use lazy_static::lazy_static;
use std::{
collections::HashSet,
fmt::Display,
net::{IpAddr, Ipv6Addr, SocketAddr, TcpListener, ToSocketAddrs},
};
use transform_stream::AsyncTryStream;
use url::Host;
lazy_static! {
@@ -167,6 +170,27 @@ pub fn parse_and_resolve_address(addr_str: &str) -> std::io::Result<SocketAddr>
Ok(resolved_addr)
}
#[allow(dead_code)]
pub fn bytes_stream<S, E>(stream: S, content_length: usize) -> impl Stream<Item = std::result::Result<Bytes, E>> + Send + 'static
where
S: Stream<Item = std::result::Result<Bytes, E>> + Send + 'static,
E: Send + 'static,
{
AsyncTryStream::<Bytes, E, _>::new(|mut y| async move {
pin_mut!(stream);
let mut remaining: usize = content_length;
while let Some(result) = stream.next().await {
let mut bytes = result?;
if bytes.len() > remaining {
bytes.truncate(remaining);
}
remaining -= bytes.len();
y.yield_ok(bytes).await;
}
Ok(())
})
}
#[cfg(test)]
mod test {
use std::net::{Ipv4Addr, Ipv6Addr};

View File

@@ -1,4 +1,5 @@
use lazy_static::*;
use rand::{Rng, RngCore};
use regex::Regex;
use std::io::{Error, Result};
@@ -306,6 +307,43 @@ pub fn parse_ellipses_range(pattern: &str) -> Result<Vec<String>> {
Ok(ret)
}
pub fn gen_access_key(length: usize) -> Result<String> {
const ALPHA_NUMERIC_TABLE: [char; 36] = [
'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N',
'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z',
];
if length < 3 {
return Err(Error::other("access key length is too short"));
}
let mut result = String::with_capacity(length);
let mut rng = rand::rng();
for _ in 0..length {
result.push(ALPHA_NUMERIC_TABLE[rng.random_range(0..ALPHA_NUMERIC_TABLE.len())]);
}
Ok(result)
}
pub fn gen_secret_key(length: usize) -> Result<String> {
use base64_simd::URL_SAFE_NO_PAD;
if length < 8 {
return Err(Error::other("secret key length is too short"));
}
let mut rng = rand::rng();
let mut key = vec![0u8; URL_SAFE_NO_PAD.estimated_decoded_length(length)];
rng.fill_bytes(&mut key);
let encoded = URL_SAFE_NO_PAD.encode_to_string(&key);
let key_str = encoded.replace("/", "+");
Ok(key_str)
}
#[cfg(test)]
mod tests {
use super::*;

View File

@@ -82,6 +82,8 @@ shadow-rs.workspace = true
rustfs-filemeta.workspace = true
rustfs-utils ={workspace = true, features=["full"]}
rustfs-rio.workspace = true
futures-util.workspace = true
serde_urlencoded.workspace = true
[target.'cfg(not(windows))'.dependencies]
nix = { workspace = true }

View File

@@ -6,7 +6,7 @@ use crate::bucket::metadata_sys::get_replication_config;
use crate::bucket::versioning_sys::BucketVersioningSys;
use crate::error::Error;
use crate::new_object_layer_fn;
use crate::peer::RemotePeerS3Client;
use crate::rpc::RemotePeerS3Client;
use crate::store;
use crate::store_api::ObjectIO;
use crate::store_api::ObjectInfo;

View File

@@ -4,11 +4,11 @@ use crate::{
StorageAPI,
bucket::{metadata_sys, target::BucketTarget},
endpoints::Node,
peer::{PeerS3Client, RemotePeerS3Client},
rpc::{PeerS3Client, RemotePeerS3Client},
};
use crate::{
bucket::{self, target::BucketTargets},
new_object_layer_fn, peer, store_api,
new_object_layer_fn, store_api,
};
//use tokio::sync::RwLock;
use aws_sdk_s3::Client as S3Client;
@@ -24,7 +24,7 @@ use tokio::sync::RwLock;
pub struct TClient {
pub s3cli: S3Client,
pub remote_peer_client: peer::RemotePeerS3Client,
pub remote_peer_client: RemotePeerS3Client,
pub arn: String,
}
impl TClient {
@@ -444,7 +444,7 @@ impl BucketTargetSys {
grid_host: "".to_string(),
};
let cli = peer::RemotePeerS3Client::new(Some(node), None);
let cli = RemotePeerS3Client::new(Some(node), None);
match cli
.get_bucket_info(&tgt.target_bucket, &store_api::BucketOptions::default())

View File

@@ -6,7 +6,6 @@ pub mod format;
pub mod fs;
pub mod local;
pub mod os;
pub mod remote;
pub const RUSTFS_META_BUCKET: &str = ".rustfs.sys";
pub const RUSTFS_META_MULTIPART_BUCKET: &str = ".rustfs.sys/multipart";
@@ -22,13 +21,13 @@ use crate::heal::{
data_usage_cache::{DataUsageCache, DataUsageEntry},
heal_commands::{HealScanMode, HealingTracker},
};
use crate::rpc::RemoteDisk;
use bytes::Bytes;
use endpoint::Endpoint;
use error::DiskError;
use error::{Error, Result};
use local::LocalDisk;
use madmin::info_commands::DiskMetrics;
use remote::RemoteDisk;
use rustfs_filemeta::{FileInfo, RawFileInfo};
use serde::{Deserialize, Serialize};
use std::{fmt::Debug, path::PathBuf, sync::Arc};

View File

@@ -102,7 +102,7 @@ where
}
}
Err(e) => {
error!("Error reading shard {}: {}", i, e);
// error!("Error reading shard {}: {}", i, e);
errs[i] = Some(e);
}
}

View File

@@ -1,12 +1,3 @@
use lazy_static::lazy_static;
use std::{
collections::HashMap,
sync::{Arc, OnceLock},
time::SystemTime,
};
use tokio::sync::{OnceCell, RwLock};
use uuid::Uuid;
use crate::heal::mrf::MRFState;
use crate::{
disk::DiskStore,
@@ -14,6 +5,15 @@ use crate::{
heal::{background_heal_ops::HealRoutine, heal_ops::AllHealState},
store::ECStore,
};
use lazy_static::lazy_static;
use policy::auth::Credentials;
use std::{
collections::HashMap,
sync::{Arc, OnceLock},
time::SystemTime,
};
use tokio::sync::{OnceCell, RwLock};
use uuid::Uuid;
pub const DISK_ASSUME_UNKNOWN_SIZE: u64 = 1 << 30;
pub const DISK_MIN_INODES: u64 = 1000;
@@ -39,6 +39,38 @@ lazy_static! {
pub static ref GLOBAL_BOOT_TIME: OnceCell<SystemTime> = OnceCell::new();
}
static GLOBAL_ACTIVE_CRED: OnceLock<Credentials> = OnceLock::new();
pub fn init_global_action_cred(ak: Option<String>, sk: Option<String>) {
let ak = {
if let Some(k) = ak {
k
} else {
rustfs_utils::string::gen_access_key(20).unwrap_or_default()
}
};
let sk = {
if let Some(k) = sk {
k
} else {
rustfs_utils::string::gen_secret_key(32).unwrap_or_default()
}
};
GLOBAL_ACTIVE_CRED
.set(Credentials {
access_key: ak,
secret_key: sk,
..Default::default()
})
.unwrap();
}
pub fn get_global_action_cred() -> Option<Credentials> {
GLOBAL_ACTIVE_CRED.get().cloned()
}
/// Get the global rustfs port
pub fn global_rustfs_port() -> u16 {
if let Some(p) = GLOBAL_RUSTFS_PORT.get() {

View File

@@ -41,8 +41,8 @@ use crate::{
heal_ops::{BG_HEALING_UUID, HealSource},
},
new_object_layer_fn,
peer::is_reserved_or_invalid_bucket,
store::ECStore,
store_utils::is_reserved_or_invalid_bucket,
};
use crate::{disk::DiskAPI, store_api::ObjectInfo};
use crate::{

View File

@@ -15,17 +15,16 @@ pub mod global;
pub mod heal;
pub mod metrics_realtime;
pub mod notification_sys;
pub mod peer;
pub mod peer_rest_client;
pub mod pools;
pub mod rebalance;
pub mod rpc;
pub mod set_disk;
mod sets;
pub mod store;
pub mod store_api;
mod store_init;
pub mod store_list_objects;
mod store_utils;
pub mod store_utils;
pub use global::new_object_layer_fn;
pub use global::set_global_endpoints;

View File

@@ -2,7 +2,7 @@ use crate::StorageAPI;
use crate::admin_server_info::get_commit_id;
use crate::error::{Error, Result};
use crate::global::{GLOBAL_BOOT_TIME, get_global_endpoints};
use crate::peer_rest_client::PeerRestClient;
use crate::rpc::PeerRestClient;
use crate::{endpoints::EndpointServerPools, new_object_layer_fn};
use futures::future::join_all;
use lazy_static::lazy_static;

View File

@@ -0,0 +1,375 @@
use crate::global::get_global_action_cred;
use base64::Engine as _;
use base64::engine::general_purpose;
use hmac::{Hmac, Mac};
use http::HeaderMap;
use http::HeaderValue;
use http::Method;
use http::Uri;
use sha2::Sha256;
use time::OffsetDateTime;
use tracing::error;
type HmacSha256 = Hmac<Sha256>;
const SIGNATURE_HEADER: &str = "x-rustfs-signature";
const TIMESTAMP_HEADER: &str = "x-rustfs-timestamp";
const SIGNATURE_VALID_DURATION: i64 = 300; // 5 minutes
/// Get the shared secret for HMAC signing
fn get_shared_secret() -> String {
if let Some(cred) = get_global_action_cred() {
cred.secret_key
} else {
// Fallback to environment variable if global credentials are not available
std::env::var("RUSTFS_RPC_SECRET").unwrap_or_else(|_| "rustfs-default-secret".to_string())
}
}
/// Generate HMAC-SHA256 signature for the given data
fn generate_signature(secret: &str, url: &str, method: &Method, timestamp: i64) -> String {
let uri: Uri = url.parse().expect("Invalid URL");
let path_and_query = uri.path_and_query().unwrap();
let url = path_and_query.to_string();
let data = format!("{}|{}|{}", url, method, timestamp);
let mut mac = HmacSha256::new_from_slice(secret.as_bytes()).expect("HMAC can take key of any size");
mac.update(data.as_bytes());
let result = mac.finalize();
general_purpose::STANDARD.encode(result.into_bytes())
}
/// Build headers with authentication signature
pub fn build_auth_headers(url: &str, method: &Method, headers: &mut HeaderMap) {
let secret = get_shared_secret();
let timestamp = OffsetDateTime::now_utc().unix_timestamp();
let signature = generate_signature(&secret, url, method, timestamp);
headers.insert(SIGNATURE_HEADER, HeaderValue::from_str(&signature).unwrap());
headers.insert(TIMESTAMP_HEADER, HeaderValue::from_str(&timestamp.to_string()).unwrap());
}
/// Verify the request signature for RPC requests
pub fn verify_rpc_signature(url: &str, method: &Method, headers: &HeaderMap) -> std::io::Result<()> {
let secret = get_shared_secret();
// Get signature from header
let signature = headers
.get(SIGNATURE_HEADER)
.and_then(|v| v.to_str().ok())
.ok_or_else(|| std::io::Error::other("Missing signature header"))?;
// Get timestamp from header
let timestamp_str = headers
.get(TIMESTAMP_HEADER)
.and_then(|v| v.to_str().ok())
.ok_or_else(|| std::io::Error::other("Missing timestamp header"))?;
let timestamp: i64 = timestamp_str
.parse()
.map_err(|_| std::io::Error::other("Invalid timestamp format"))?;
// Check timestamp validity (prevent replay attacks)
let current_time = OffsetDateTime::now_utc().unix_timestamp();
if current_time.saturating_sub(timestamp) > SIGNATURE_VALID_DURATION {
return Err(std::io::Error::other("Request timestamp expired"));
}
// Generate expected signature
let expected_signature = generate_signature(&secret, url, method, timestamp);
// Compare signatures
if signature != expected_signature {
error!(
"verify_rpc_signature: Invalid signature: secret {}, url {}, method {}, timestamp {}, signature {}, expected_signature {}",
secret, url, method, timestamp, signature, expected_signature
);
return Err(std::io::Error::other("Invalid signature"));
}
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
use http::{HeaderMap, Method};
use time::OffsetDateTime;
#[test]
fn test_get_shared_secret() {
let secret = get_shared_secret();
assert!(!secret.is_empty(), "Secret should not be empty");
let url = "http://node1:7000/rustfs/rpc/read_file_stream?disk=http%3A%2F%2Fnode1%3A7000%2Fdata%2Frustfs3&volume=.rustfs.sys&path=pool.bin%2Fdd0fd773-a962-4265-b543-783ce83953e9%2Fpart.1&offset=0&length=44";
let method = Method::GET;
let mut headers = HeaderMap::new();
build_auth_headers(url, &method, &mut headers);
let url = "/rustfs/rpc/read_file_stream?disk=http%3A%2F%2Fnode1%3A7000%2Fdata%2Frustfs3&volume=.rustfs.sys&path=pool.bin%2Fdd0fd773-a962-4265-b543-783ce83953e9%2Fpart.1&offset=0&length=44";
let result = verify_rpc_signature(url, &method, &headers);
assert!(result.is_ok(), "Valid signature should pass verification");
}
#[test]
fn test_generate_signature_deterministic() {
let secret = "test-secret";
let url = "http://example.com/api/test";
let method = Method::GET;
let timestamp = 1640995200; // Fixed timestamp
let signature1 = generate_signature(secret, url, &method, timestamp);
let signature2 = generate_signature(secret, url, &method, timestamp);
assert_eq!(signature1, signature2, "Same inputs should produce same signature");
assert!(!signature1.is_empty(), "Signature should not be empty");
}
#[test]
fn test_generate_signature_different_inputs() {
let secret = "test-secret";
let url = "http://example.com/api/test";
let method = Method::GET;
let timestamp = 1640995200;
let signature1 = generate_signature(secret, url, &method, timestamp);
let signature2 = generate_signature(secret, "http://different.com/api/test2", &method, timestamp);
let signature3 = generate_signature(secret, url, &Method::POST, timestamp);
let signature4 = generate_signature(secret, url, &method, timestamp + 1);
assert_ne!(signature1, signature2, "Different URLs should produce different signatures");
assert_ne!(signature1, signature3, "Different methods should produce different signatures");
assert_ne!(signature1, signature4, "Different timestamps should produce different signatures");
}
#[test]
fn test_build_auth_headers() {
let url = "http://example.com/api/test";
let method = Method::POST;
let mut headers = HeaderMap::new();
build_auth_headers(url, &method, &mut headers);
// Verify headers are present
assert!(headers.contains_key(SIGNATURE_HEADER), "Should contain signature header");
assert!(headers.contains_key(TIMESTAMP_HEADER), "Should contain timestamp header");
// Verify header values are not empty
let signature = headers.get(SIGNATURE_HEADER).unwrap().to_str().unwrap();
let timestamp_str = headers.get(TIMESTAMP_HEADER).unwrap().to_str().unwrap();
assert!(!signature.is_empty(), "Signature should not be empty");
assert!(!timestamp_str.is_empty(), "Timestamp should not be empty");
// Verify timestamp is a valid integer
let timestamp: i64 = timestamp_str.parse().expect("Timestamp should be valid integer");
let current_time = OffsetDateTime::now_utc().unix_timestamp();
// Should be within a reasonable range (within 1 second of current time)
assert!((current_time - timestamp).abs() <= 1, "Timestamp should be close to current time");
}
#[test]
fn test_verify_rpc_signature_success() {
let url = "http://example.com/api/test";
let method = Method::GET;
let mut headers = HeaderMap::new();
// Build headers with valid signature
build_auth_headers(url, &method, &mut headers);
// Verify should succeed
let result = verify_rpc_signature(url, &method, &headers);
assert!(result.is_ok(), "Valid signature should pass verification");
}
#[test]
fn test_verify_rpc_signature_invalid_signature() {
let url = "http://example.com/api/test";
let method = Method::GET;
let mut headers = HeaderMap::new();
// Build headers with valid signature first
build_auth_headers(url, &method, &mut headers);
// Tamper with the signature
headers.insert(SIGNATURE_HEADER, HeaderValue::from_str("invalid-signature").unwrap());
// Verify should fail
let result = verify_rpc_signature(url, &method, &headers);
assert!(result.is_err(), "Invalid signature should fail verification");
let error = result.unwrap_err();
assert_eq!(error.to_string(), "Invalid signature");
}
#[test]
fn test_verify_rpc_signature_expired_timestamp() {
let url = "http://example.com/api/test";
let method = Method::GET;
let mut headers = HeaderMap::new();
// Set expired timestamp (older than SIGNATURE_VALID_DURATION)
let expired_timestamp = OffsetDateTime::now_utc().unix_timestamp() - SIGNATURE_VALID_DURATION - 10;
let secret = get_shared_secret();
let signature = generate_signature(&secret, url, &method, expired_timestamp);
headers.insert(SIGNATURE_HEADER, HeaderValue::from_str(&signature).unwrap());
headers.insert(TIMESTAMP_HEADER, HeaderValue::from_str(&expired_timestamp.to_string()).unwrap());
// Verify should fail due to expired timestamp
let result = verify_rpc_signature(url, &method, &headers);
assert!(result.is_err(), "Expired timestamp should fail verification");
let error = result.unwrap_err();
assert_eq!(error.to_string(), "Request timestamp expired");
}
#[test]
fn test_verify_rpc_signature_missing_signature_header() {
let url = "http://example.com/api/test";
let method = Method::GET;
let mut headers = HeaderMap::new();
// Add only timestamp header, missing signature
let timestamp = OffsetDateTime::now_utc().unix_timestamp();
headers.insert(TIMESTAMP_HEADER, HeaderValue::from_str(&timestamp.to_string()).unwrap());
// Verify should fail
let result = verify_rpc_signature(url, &method, &headers);
assert!(result.is_err(), "Missing signature header should fail verification");
let error = result.unwrap_err();
assert_eq!(error.to_string(), "Missing signature header");
}
#[test]
fn test_verify_rpc_signature_missing_timestamp_header() {
let url = "http://example.com/api/test";
let method = Method::GET;
let mut headers = HeaderMap::new();
// Add only signature header, missing timestamp
headers.insert(SIGNATURE_HEADER, HeaderValue::from_str("some-signature").unwrap());
// Verify should fail
let result = verify_rpc_signature(url, &method, &headers);
assert!(result.is_err(), "Missing timestamp header should fail verification");
let error = result.unwrap_err();
assert_eq!(error.to_string(), "Missing timestamp header");
}
#[test]
fn test_verify_rpc_signature_invalid_timestamp_format() {
let url = "http://example.com/api/test";
let method = Method::GET;
let mut headers = HeaderMap::new();
headers.insert(SIGNATURE_HEADER, HeaderValue::from_str("some-signature").unwrap());
headers.insert(TIMESTAMP_HEADER, HeaderValue::from_str("invalid-timestamp").unwrap());
// Verify should fail
let result = verify_rpc_signature(url, &method, &headers);
assert!(result.is_err(), "Invalid timestamp format should fail verification");
let error = result.unwrap_err();
assert_eq!(error.to_string(), "Invalid timestamp format");
}
#[test]
fn test_verify_rpc_signature_url_mismatch() {
let original_url = "http://example.com/api/test";
let different_url = "http://example.com/api/different";
let method = Method::GET;
let mut headers = HeaderMap::new();
// Build headers for one URL
build_auth_headers(original_url, &method, &mut headers);
// Try to verify with a different URL
let result = verify_rpc_signature(different_url, &method, &headers);
assert!(result.is_err(), "URL mismatch should fail verification");
let error = result.unwrap_err();
assert_eq!(error.to_string(), "Invalid signature");
}
#[test]
fn test_verify_rpc_signature_method_mismatch() {
let url = "http://example.com/api/test";
let original_method = Method::GET;
let different_method = Method::POST;
let mut headers = HeaderMap::new();
// Build headers for one method
build_auth_headers(url, &original_method, &mut headers);
// Try to verify with a different method
let result = verify_rpc_signature(url, &different_method, &headers);
assert!(result.is_err(), "Method mismatch should fail verification");
let error = result.unwrap_err();
assert_eq!(error.to_string(), "Invalid signature");
}
#[test]
fn test_signature_valid_duration_boundary() {
let url = "http://example.com/api/test";
let method = Method::GET;
let secret = get_shared_secret();
let mut headers = HeaderMap::new();
let current_time = OffsetDateTime::now_utc().unix_timestamp();
// Test timestamp just within valid duration
let valid_timestamp = current_time - SIGNATURE_VALID_DURATION + 1;
let signature = generate_signature(&secret, url, &method, valid_timestamp);
headers.insert(SIGNATURE_HEADER, HeaderValue::from_str(&signature).unwrap());
headers.insert(TIMESTAMP_HEADER, HeaderValue::from_str(&valid_timestamp.to_string()).unwrap());
let result = verify_rpc_signature(url, &method, &headers);
assert!(result.is_ok(), "Timestamp within valid duration should pass");
// Test timestamp just outside valid duration
let mut headers = HeaderMap::new();
let invalid_timestamp = current_time - SIGNATURE_VALID_DURATION - 15;
let signature = generate_signature(&secret, url, &method, invalid_timestamp);
headers.insert(SIGNATURE_HEADER, HeaderValue::from_str(&signature).unwrap());
headers.insert(TIMESTAMP_HEADER, HeaderValue::from_str(&invalid_timestamp.to_string()).unwrap());
let result = verify_rpc_signature(url, &method, &headers);
assert!(result.is_err(), "Timestamp outside valid duration should fail");
}
#[test]
fn test_round_trip_authentication() {
let test_cases = vec![
("http://example.com/api/test", Method::GET),
("https://api.rustfs.com/v1/bucket", Method::POST),
("http://localhost:9000/admin/info", Method::PUT),
("https://storage.example.com/path/to/object?query=param", Method::DELETE),
];
for (url, method) in test_cases {
let mut headers = HeaderMap::new();
// Build authentication headers
build_auth_headers(url, &method, &mut headers);
// Verify the signature should succeed
let result = verify_rpc_signature(url, &method, &headers);
assert!(result.is_ok(), "Round-trip test failed for {} {}", method, url);
}
}
}

11
ecstore/src/rpc/mod.rs Normal file
View File

@@ -0,0 +1,11 @@
mod http_auth;
mod peer_rest_client;
mod peer_s3_client;
mod remote_disk;
mod tonic_service;
pub use http_auth::{build_auth_headers, verify_rpc_signature};
pub use peer_rest_client::PeerRestClient;
pub use peer_s3_client::{LocalPeerS3Client, PeerS3Client, RemotePeerS3Client, S3PeerSys};
pub use remote_disk::RemoteDisk;
pub use tonic_service::make_server;

View File

@@ -8,6 +8,7 @@ use crate::heal::heal_commands::{
};
use crate::heal::heal_ops::RUSTFS_RESERVED_BUCKET;
use crate::store::all_local_disk;
use crate::store_utils::is_reserved_or_invalid_bucket;
use crate::{
disk::{self, VolumeInfo},
endpoints::{EndpointServerPools, Node},
@@ -20,7 +21,6 @@ use protos::node_service_time_out_client;
use protos::proto_gen::node_service::{
DeleteBucketRequest, GetBucketInfoRequest, HealBucketRequest, ListBucketRequest, MakeBucketRequest,
};
use regex::Regex;
use std::{collections::HashMap, fmt::Debug, sync::Arc};
use tokio::sync::RwLock;
use tonic::Request;
@@ -622,63 +622,6 @@ impl PeerS3Client for RemotePeerS3Client {
}
}
// 检查桶名是否有效
fn check_bucket_name(bucket_name: &str, strict: bool) -> Result<()> {
if bucket_name.trim().is_empty() {
return Err(Error::other("Bucket name cannot be empty"));
}
if bucket_name.len() < 3 {
return Err(Error::other("Bucket name cannot be shorter than 3 characters"));
}
if bucket_name.len() > 63 {
return Err(Error::other("Bucket name cannot be longer than 63 characters"));
}
let ip_address_regex = Regex::new(r"^(\d+\.){3}\d+$").unwrap();
if ip_address_regex.is_match(bucket_name) {
return Err(Error::other("Bucket name cannot be an IP address"));
}
let valid_bucket_name_regex = if strict {
Regex::new(r"^[a-z0-9][a-z0-9\.\-]{1,61}[a-z0-9]$").unwrap()
} else {
Regex::new(r"^[A-Za-z0-9][A-Za-z0-9\.\-_:]{1,61}[A-Za-z0-9]$").unwrap()
};
if !valid_bucket_name_regex.is_match(bucket_name) {
return Err(Error::other("Bucket name contains invalid characters"));
}
// 检查包含 "..", ".-", "-."
if bucket_name.contains("..") || bucket_name.contains(".-") || bucket_name.contains("-.") {
return Err(Error::other("Bucket name contains invalid characters"));
}
Ok(())
}
// 检查是否为 元数据桶
fn is_meta_bucket(bucket_name: &str) -> bool {
bucket_name == disk::RUSTFS_META_BUCKET
}
// 检查是否为 保留桶
fn is_reserved_bucket(bucket_name: &str) -> bool {
bucket_name == "rustfs"
}
// 检查桶名是否为保留名或无效名
pub fn is_reserved_or_invalid_bucket(bucket_entry: &str, strict: bool) -> bool {
if bucket_entry.is_empty() {
return true;
}
let bucket_entry = bucket_entry.trim_end_matches('/');
let result = check_bucket_name(bucket_entry, strict).is_err();
result || is_meta_bucket(bucket_entry) || is_reserved_bucket(bucket_entry)
}
pub async fn heal_bucket_local(bucket: &str, opts: &HealOpts) -> Result<HealResultItem> {
let disks = clone_drives().await;
let before_state = Arc::new(RwLock::new(vec![String::new(); disks.len()]));

View File

@@ -13,13 +13,25 @@ use protos::{
},
};
use crate::disk::{
CheckPartsResp, DeleteOptions, DiskAPI, DiskInfo, DiskInfoOptions, DiskLocation, DiskOption, FileInfoVersions,
ReadMultipleReq, ReadMultipleResp, ReadOptions, RenameDataResp, UpdateMetadataOpts, VolumeInfo, WalkDirOptions,
endpoint::Endpoint,
};
use crate::{
disk::error::{Error, Result},
rpc::build_auth_headers,
};
use crate::{
disk::{FileReader, FileWriter},
heal::{
data_scanner::ShouldSleepFn,
data_usage_cache::{DataUsageCache, DataUsageEntry},
heal_commands::{HealScanMode, HealingTracker},
},
};
use rustfs_filemeta::{FileInfo, RawFileInfo};
use rustfs_rio::{HttpReader, HttpWriter};
use base64::{Engine as _, engine::general_purpose};
use hmac::{Hmac, Mac};
use sha2::Sha256;
use std::time::{SystemTime, UNIX_EPOCH};
use tokio::{
io::AsyncWrite,
sync::mpsc::{self, Sender},
@@ -29,26 +41,8 @@ use tonic::Request;
use tracing::info;
use uuid::Uuid;
use super::error::{Error, Result};
use super::{
CheckPartsResp, DeleteOptions, DiskAPI, DiskInfo, DiskInfoOptions, DiskLocation, DiskOption, FileInfoVersions,
ReadMultipleReq, ReadMultipleResp, ReadOptions, RenameDataResp, UpdateMetadataOpts, VolumeInfo, WalkDirOptions,
endpoint::Endpoint,
};
use crate::{
disk::{FileReader, FileWriter},
heal::{
data_scanner::ShouldSleepFn,
data_usage_cache::{DataUsageCache, DataUsageEntry},
heal_commands::{HealScanMode, HealingTracker},
},
};
use protos::proto_gen::node_service::RenamePartRequest;
type HmacSha256 = Hmac<Sha256>;
#[derive(Debug)]
pub struct RemoteDisk {
pub id: Mutex<Option<Uuid>>,
@@ -75,35 +69,6 @@ impl RemoteDisk {
endpoint: ep.clone(),
})
}
/// Get the shared secret for HMAC signing
fn get_shared_secret() -> String {
std::env::var("RUSTFS_RPC_SECRET").unwrap_or_else(|_| "rustfs-default-secret".to_string())
}
/// Generate HMAC-SHA256 signature for the given data
fn generate_signature(secret: &str, url: &str, method: &str, timestamp: u64) -> String {
let data = format!("{}|{}|{}", url, method, timestamp);
let mut mac = HmacSha256::new_from_slice(secret.as_bytes()).expect("HMAC can take key of any size");
mac.update(data.as_bytes());
let result = mac.finalize();
general_purpose::STANDARD.encode(result.into_bytes())
}
/// Build headers with authentication signature
fn build_auth_headers(&self, url: &str, method: &Method, base_headers: Option<HeaderMap>) -> HeaderMap {
let mut headers = base_headers.unwrap_or_default();
let secret = Self::get_shared_secret();
let timestamp = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs();
let signature = Self::generate_signature(&secret, url, method.as_str(), timestamp);
headers.insert("x-rustfs-signature", HeaderValue::from_str(&signature).unwrap());
headers.insert("x-rustfs-timestamp", HeaderValue::from_str(&timestamp.to_string()).unwrap());
headers
}
}
// TODO: all api need to handle errors
@@ -614,9 +579,9 @@ impl DiskAPI for RemoteDisk {
let opts = serde_json::to_vec(&opts)?;
let mut base_headers = HeaderMap::new();
base_headers.insert(CONTENT_TYPE, HeaderValue::from_static("application/json"));
let headers = self.build_auth_headers(&url, &Method::GET, Some(base_headers));
let mut headers = HeaderMap::new();
headers.insert(CONTENT_TYPE, HeaderValue::from_static("application/json"));
build_auth_headers(&url, &Method::GET, &mut headers);
let mut reader = HttpReader::new(url, Method::GET, headers, Some(opts)).await?;
@@ -639,7 +604,9 @@ impl DiskAPI for RemoteDisk {
0
);
let headers = self.build_auth_headers(&url, &Method::GET, None);
let mut headers = HeaderMap::new();
headers.insert(CONTENT_TYPE, HeaderValue::from_static("application/json"));
build_auth_headers(&url, &Method::GET, &mut headers);
Ok(Box::new(HttpReader::new(url, Method::GET, headers, None).await?))
}
@@ -663,7 +630,9 @@ impl DiskAPI for RemoteDisk {
length
);
let headers = self.build_auth_headers(&url, &Method::GET, None);
let mut headers = HeaderMap::new();
headers.insert(CONTENT_TYPE, HeaderValue::from_static("application/json"));
build_auth_headers(&url, &Method::GET, &mut headers);
Ok(Box::new(HttpReader::new(url, Method::GET, headers, None).await?))
}
@@ -681,7 +650,9 @@ impl DiskAPI for RemoteDisk {
0
);
let headers = self.build_auth_headers(&url, &Method::PUT, None);
let mut headers = HeaderMap::new();
headers.insert(CONTENT_TYPE, HeaderValue::from_static("application/json"));
build_auth_headers(&url, &Method::PUT, &mut headers);
Ok(Box::new(HttpWriter::new(url, Method::PUT, headers).await?))
}
@@ -705,7 +676,9 @@ impl DiskAPI for RemoteDisk {
file_size
);
let headers = self.build_auth_headers(&url, &Method::PUT, None);
let mut headers = HeaderMap::new();
headers.insert(CONTENT_TYPE, HeaderValue::from_static("application/json"));
build_auth_headers(&url, &Method::PUT, &mut headers);
Ok(Box::new(HttpWriter::new(url, Method::PUT, headers).await?))
}

View File

@@ -1,7 +1,7 @@
use std::{collections::HashMap, io::Cursor, pin::Pin};
// use common::error::Error as EcsError;
use ecstore::{
use crate::{
admin_server_info::get_local_server_property,
bucket::{metadata::load_bucket_metadata, metadata_sys},
disk::{
@@ -14,7 +14,7 @@ use ecstore::{
},
metrics_realtime::{CollectMetricsOpts, MetricType, collect_local_metrics},
new_object_layer_fn,
peer::{LocalPeerS3Client, PeerS3Client},
rpc::{LocalPeerS3Client, PeerS3Client},
store::{all_local_disk_path, find_local_disk},
store_api::{BucketOptions, DeleteBucketOptions, MakeBucketOptions, StorageAPI},
};
@@ -226,7 +226,6 @@ impl Node for NodeService {
}
};
println!("bucket info {}", bucket_info.clone());
Ok(tonic::Response::new(GetBucketInfoResponse {
success: true,
bucket_info,

View File

@@ -29,7 +29,7 @@ use crate::{
bucket::metadata::BucketMetadata,
disk::{BUCKET_META_PREFIX, DiskOption, DiskStore, RUSTFS_META_BUCKET, new_disk},
endpoints::EndpointServerPools,
peer::S3PeerSys,
rpc::S3PeerSys,
sets::Sets,
store_api::{
BucketInfo, BucketOptions, CompletePart, DeleteBucketOptions, DeletedObject, GetObjectReader, HTTPRangeSpec,

View File

@@ -7,10 +7,10 @@ use crate::disk::{DiskInfo, DiskStore};
use crate::error::{
Error, Result, StorageError, is_all_not_found, is_all_volume_not_found, is_err_bucket_not_found, to_object_err,
};
use crate::peer::is_reserved_or_invalid_bucket;
use crate::set_disk::SetDisks;
use crate::store::check_list_objs_args;
use crate::store_api::{ListObjectVersionsInfo, ListObjectsInfo, ObjectInfo, ObjectOptions};
use crate::store_utils::is_reserved_or_invalid_bucket;
use crate::{store::ECStore, store_api::ListObjectsV2Info};
use futures::future::join_all;
use rand::seq::SliceRandom;

View File

@@ -1,7 +1,10 @@
use crate::config::storageclass::STANDARD;
use crate::disk::RUSTFS_META_BUCKET;
use regex::Regex;
use rustfs_filemeta::headers::AMZ_OBJECT_TAGGING;
use rustfs_filemeta::headers::AMZ_STORAGE_CLASS;
use std::collections::HashMap;
use std::io::{Error, Result};
pub fn clean_metadata(metadata: &mut HashMap<String, String>) {
remove_standard_storage_class(metadata);
@@ -19,3 +22,60 @@ pub fn clean_metadata_keys(metadata: &mut HashMap<String, String>, key_names: &[
metadata.remove(key.to_owned());
}
}
// 检查是否为 元数据桶
fn is_meta_bucket(bucket_name: &str) -> bool {
bucket_name == RUSTFS_META_BUCKET
}
// 检查是否为 保留桶
fn is_reserved_bucket(bucket_name: &str) -> bool {
bucket_name == "rustfs"
}
// 检查桶名是否为保留名或无效名
pub fn is_reserved_or_invalid_bucket(bucket_entry: &str, strict: bool) -> bool {
if bucket_entry.is_empty() {
return true;
}
let bucket_entry = bucket_entry.trim_end_matches('/');
let result = check_bucket_name(bucket_entry, strict).is_err();
result || is_meta_bucket(bucket_entry) || is_reserved_bucket(bucket_entry)
}
// 检查桶名是否有效
fn check_bucket_name(bucket_name: &str, strict: bool) -> Result<()> {
if bucket_name.trim().is_empty() {
return Err(Error::other("Bucket name cannot be empty"));
}
if bucket_name.len() < 3 {
return Err(Error::other("Bucket name cannot be shorter than 3 characters"));
}
if bucket_name.len() > 63 {
return Err(Error::other("Bucket name cannot be longer than 63 characters"));
}
let ip_address_regex = Regex::new(r"^(\d+\.){3}\d+$").unwrap();
if ip_address_regex.is_match(bucket_name) {
return Err(Error::other("Bucket name cannot be an IP address"));
}
let valid_bucket_name_regex = if strict {
Regex::new(r"^[a-z0-9][a-z0-9\.\-]{1,61}[a-z0-9]$").unwrap()
} else {
Regex::new(r"^[A-Za-z0-9][A-Za-z0-9\.\-_:]{1,61}[A-Za-z0-9]$").unwrap()
};
if !valid_bucket_name_regex.is_match(bucket_name) {
return Err(Error::other("Bucket name contains invalid characters"));
}
// 检查包含 "..", ".-", "-."
if bucket_name.contains("..") || bucket_name.contains(".-") || bucket_name.contains("-.") {
return Err(Error::other("Bucket name contains invalid characters"));
}
Ok(())
}

View File

@@ -1,7 +1,6 @@
use crate::error::{Error, Result};
use ecstore::store::ECStore;
use manager::IamCache;
use policy::auth::Credentials;
use std::sync::{Arc, OnceLock};
use store::object::ObjectStore;
use sys::IamSys;
@@ -17,39 +16,6 @@ pub mod sys;
static IAM_SYS: OnceLock<Arc<IamSys<ObjectStore>>> = OnceLock::new();
static GLOBAL_ACTIVE_CRED: OnceLock<Credentials> = OnceLock::new();
pub fn init_global_action_cred(ak: Option<String>, sk: Option<String>) -> Result<()> {
let ak = {
if let Some(k) = ak {
k
} else {
utils::gen_access_key(20).unwrap_or_default()
}
};
let sk = {
if let Some(k) = sk {
k
} else {
utils::gen_secret_key(32).unwrap_or_default()
}
};
GLOBAL_ACTIVE_CRED
.set(Credentials {
access_key: ak,
secret_key: sk,
..Default::default()
})
.unwrap();
Ok(())
}
pub fn get_global_action_cred() -> Option<Credentials> {
GLOBAL_ACTIVE_CRED.get().cloned()
}
#[instrument(skip(ecstore))]
pub async fn init_iam_sys(ecstore: Arc<ECStore>) -> Result<()> {
debug!("init iam system");

View File

@@ -2,14 +2,13 @@ use crate::error::{Error, Result, is_err_config_not_found};
use crate::{
cache::{Cache, CacheEntity},
error::{Error as IamError, is_err_no_such_group, is_err_no_such_policy, is_err_no_such_user},
get_global_action_cred,
store::{GroupInfo, MappedPolicy, Store, UserType, object::IAM_CONFIG_PREFIX},
sys::{
MAX_SVCSESSION_POLICY_SIZE, SESSION_POLICY_NAME, SESSION_POLICY_NAME_EXTRACTED, STATUS_DISABLED, STATUS_ENABLED,
UpdateServiceAccountOpts,
},
};
// use ecstore::utils::crypto::base64_encode;
use ecstore::global::get_global_action_cred;
use madmin::{AccountStatus, AddOrUpdateUserReq, GroupDesc};
use policy::{
arn::ARN,
@@ -39,8 +38,8 @@ use tokio::{
mpsc::{Receiver, Sender},
},
};
use tracing::error;
use tracing::warn;
use tracing::{error, info};
const IAM_FORMAT_FILE: &str = "format.json";
const IAM_FORMAT_VERSION_1: i32 = 1;
@@ -108,18 +107,18 @@ where
loop {
select! {
_ = ticker.tick() => {
warn!("iam load ticker");
info!("iam load ticker");
if let Err(err) =s.clone().load().await{
error!("iam load err {:?}", err);
}
},
i = reciver.recv() => {
warn!("iam load reciver");
info!("iam load reciver");
match i {
Some(t) => {
let last = s.last_timestamp.load(Ordering::Relaxed);
if last <= t {
warn!("iam load reciver load");
info!("iam load reciver load");
if let Err(err) =s.clone().load().await{
error!("iam load err {:?}", err);
}

View File

@@ -3,7 +3,6 @@ use crate::error::{Error, Result, is_err_config_not_found};
use crate::{
cache::{Cache, CacheEntity},
error::{is_err_no_such_policy, is_err_no_such_user},
get_global_action_cred,
manager::{extract_jwt_claims, get_default_policyes},
};
use ecstore::{
@@ -11,6 +10,7 @@ use ecstore::{
RUSTFS_CONFIG_PREFIX,
com::{delete_config, read_config, read_config_with_metadata, save_config},
},
global::get_global_action_cred,
store::ECStore,
store_api::{ObjectInfo, ObjectOptions},
store_list_objects::{ObjectInfoOrErr, WalkOptions},

View File

@@ -2,15 +2,13 @@ use crate::error::Error as IamError;
use crate::error::is_err_no_such_account;
use crate::error::is_err_no_such_temp_account;
use crate::error::{Error, Result};
use crate::get_global_action_cred;
use crate::manager::IamCache;
use crate::manager::extract_jwt_claims;
use crate::manager::get_default_policyes;
use crate::store::MappedPolicy;
use crate::store::Store;
use crate::store::UserType;
// use ecstore::utils::crypto::base64_decode;
// use ecstore::utils::crypto::base64_encode;
use ecstore::global::get_global_action_cred;
use madmin::AddOrUpdateUserReq;
use madmin::GroupDesc;
use policy::arn::ARN;

View File

@@ -11,20 +11,20 @@ use ecstore::bucket::versioning_sys::BucketVersioningSys;
use ecstore::cmd::bucket_targets::{self, GLOBAL_Bucket_Target_Sys};
use ecstore::error::StorageError;
use ecstore::global::GLOBAL_ALlHealState;
use ecstore::global::get_global_action_cred;
use ecstore::heal::data_usage::load_data_usage_from_backend;
use ecstore::heal::heal_commands::HealOpts;
use ecstore::heal::heal_ops::new_heal_sequence;
use ecstore::metrics_realtime::{CollectMetricsOpts, MetricType, collect_local_metrics};
use ecstore::new_object_layer_fn;
use ecstore::peer::is_reserved_or_invalid_bucket;
use ecstore::pools::{get_total_usable_capacity, get_total_usable_capacity_free};
use ecstore::store::is_valid_object_prefix;
use ecstore::store_api::BucketOptions;
use ecstore::store_api::StorageAPI;
use ecstore::store_utils::is_reserved_or_invalid_bucket;
use futures::{Stream, StreamExt};
use http::{HeaderMap, Uri};
use hyper::StatusCode;
use iam::get_global_action_cred;
use iam::store::MappedPolicy;
use rustfs_utils::path::path_join;
// use lazy_static::lazy_static;
@@ -810,7 +810,7 @@ impl Operation for SetRemoteTargetHandler {
//println!("bucket is:{}", bucket.clone());
if let Some(bucket) = querys.get("bucket") {
if bucket.is_empty() {
println!("have bucket: {}", bucket);
info!("have bucket: {}", bucket);
return Ok(S3Response::new((StatusCode::OK, Body::from("fuck".to_string()))));
}
let Some(store) = new_object_layer_fn() else {
@@ -824,13 +824,13 @@ impl Operation for SetRemoteTargetHandler {
.await
{
Ok(info) => {
println!("Bucket Info: {:?}", info);
info!("Bucket Info: {:?}", info);
if !info.versionning {
return Ok(S3Response::new((StatusCode::FORBIDDEN, Body::from("bucket need versioned".to_string()))));
}
}
Err(err) => {
eprintln!("Error: {:?}", err);
error!("Error: {:?}", err);
return Ok(S3Response::new((StatusCode::BAD_REQUEST, Body::from("empty bucket".to_string()))));
}
}
@@ -934,7 +934,7 @@ impl Operation for ListRemoteTargetHandler {
.await
{
Ok(info) => {
println!("Bucket Info: {:?}", info);
info!("Bucket Info: {:?}", info);
if !info.versionning {
return Ok(S3Response::new((
StatusCode::FORBIDDEN,
@@ -943,7 +943,7 @@ impl Operation for ListRemoteTargetHandler {
}
}
Err(err) => {
eprintln!("Error fetching bucket info: {:?}", err);
error!("Error fetching bucket info: {:?}", err);
return Ok(S3Response::new((StatusCode::BAD_REQUEST, Body::from("Invalid bucket".to_string()))));
}
}

View File

@@ -1,8 +1,6 @@
use ecstore::global::get_global_action_cred;
use http::{HeaderMap, StatusCode};
use iam::{
error::{is_err_no_such_group, is_err_no_such_user},
get_global_action_cred,
};
use iam::error::{is_err_no_such_group, is_err_no_such_user};
use madmin::GroupAddRemove;
use matchit::Params;
use s3s::{Body, S3Error, S3ErrorCode, S3Request, S3Response, S3Result, header::CONTENT_TYPE, s3_error};

View File

@@ -1,6 +1,8 @@
use crate::admin::{router::Operation, utils::has_space_be};
use ecstore::global::get_global_action_cred;
use http::{HeaderMap, StatusCode};
use iam::{error::is_err_no_such_user, get_global_action_cred, store::MappedPolicy};
use iam::error::is_err_no_such_user;
use iam::store::MappedPolicy;
use matchit::Params;
use policy::policy::Policy;
use s3s::{Body, S3Error, S3ErrorCode, S3Request, S3Response, S3Result, header::CONTENT_TYPE, s3_error};

View File

@@ -1,13 +1,11 @@
use crate::admin::utils::has_space_be;
use crate::auth::{get_condition_values, get_session_token};
use crate::{admin::router::Operation, auth::check_key_valid};
use ecstore::global::get_global_action_cred;
use http::HeaderMap;
use hyper::StatusCode;
use iam::{
error::is_err_no_such_service_account,
get_global_action_cred,
sys::{NewServiceAccountOpts, UpdateServiceAccountOpts},
};
use iam::error::is_err_no_such_service_account;
use iam::sys::{NewServiceAccountOpts, UpdateServiceAccountOpts};
use madmin::{
AddServiceAccountReq, AddServiceAccountResp, Credentials, InfoServiceAccountResp, ListServiceAccountsResp,
ServiceAccountInfo, UpdateServiceAccountReq,

View File

@@ -1,4 +1,4 @@
use ecstore::{GLOBAL_Endpoints, peer_rest_client::PeerRestClient};
use ecstore::{GLOBAL_Endpoints, rpc::PeerRestClient};
use http::StatusCode;
use hyper::Uri;
use madmin::service_commands::ServiceTraceOpts;

View File

@@ -1,7 +1,9 @@
use std::{collections::HashMap, str::from_utf8};
use crate::{
admin::{router::Operation, utils::has_space_be},
auth::{check_key_valid, get_condition_values, get_session_token},
};
use ecstore::global::get_global_action_cred;
use http::{HeaderMap, StatusCode};
use iam::get_global_action_cred;
use madmin::{AccountStatus, AddOrUpdateUserReq};
use matchit::Params;
use policy::policy::{
@@ -11,13 +13,9 @@ use policy::policy::{
use s3s::{Body, S3Error, S3ErrorCode, S3Request, S3Response, S3Result, header::CONTENT_TYPE, s3_error};
use serde::Deserialize;
use serde_urlencoded::from_bytes;
use std::{collections::HashMap, str::from_utf8};
use tracing::warn;
use crate::{
admin::{router::Operation, utils::has_space_be},
auth::{check_key_valid, get_condition_values, get_session_token},
};
#[derive(Debug, Deserialize, Default)]
pub struct AddUserQuery {
#[serde(rename = "accessKey")]

View File

@@ -1,5 +1,4 @@
use base64::{Engine as _, engine::general_purpose};
use hmac::{Hmac, Mac};
use ecstore::rpc::verify_rpc_signature;
use hyper::HeaderMap;
use hyper::Method;
use hyper::StatusCode;
@@ -14,80 +13,11 @@ use s3s::S3Result;
use s3s::header;
use s3s::route::S3Route;
use s3s::s3_error;
use sha2::Sha256;
use std::time::{SystemTime, UNIX_EPOCH};
use tracing::error;
use super::ADMIN_PREFIX;
use super::RUSTFS_ADMIN_PREFIX;
use super::rpc::RPC_PREFIX;
use iam::get_global_action_cred;
type HmacSha256 = Hmac<Sha256>;
const SIGNATURE_HEADER: &str = "x-rustfs-signature";
const TIMESTAMP_HEADER: &str = "x-rustfs-timestamp";
const SIGNATURE_VALID_DURATION: u64 = 300; // 5 minutes
/// Get the shared secret for HMAC signing
fn get_shared_secret() -> String {
if let Some(cred) = get_global_action_cred() {
cred.secret_key
} else {
// Fallback to environment variable if global credentials are not available
std::env::var("RUSTFS_RPC_SECRET").unwrap_or_else(|_| "rustfs-default-secret".to_string())
}
}
/// Generate HMAC-SHA256 signature for the given data
fn generate_signature(secret: &str, url: &str, method: &str, timestamp: u64) -> String {
let data = format!("{}|{}|{}", url, method, timestamp);
let mut mac = HmacSha256::new_from_slice(secret.as_bytes()).expect("HMAC can take key of any size");
mac.update(data.as_bytes());
let result = mac.finalize();
general_purpose::STANDARD.encode(result.into_bytes())
}
/// Verify the request signature for RPC requests
fn verify_rpc_signature(req: &S3Request<Body>) -> S3Result<()> {
let secret = get_shared_secret();
// Get signature from header
let signature = req
.headers
.get(SIGNATURE_HEADER)
.and_then(|v| v.to_str().ok())
.ok_or_else(|| s3_error!(InvalidArgument, "Missing signature header"))?;
// Get timestamp from header
let timestamp_str = req
.headers
.get(TIMESTAMP_HEADER)
.and_then(|v| v.to_str().ok())
.ok_or_else(|| s3_error!(InvalidArgument, "Missing timestamp header"))?;
let timestamp: u64 = timestamp_str
.parse()
.map_err(|_| s3_error!(InvalidArgument, "Invalid timestamp format"))?;
// Check timestamp validity (prevent replay attacks)
let current_time = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs();
if current_time.saturating_sub(timestamp) > SIGNATURE_VALID_DURATION {
return Err(s3_error!(InvalidArgument, "Request timestamp expired"));
}
// Generate expected signature
let url = req.uri.to_string();
let method = req.method.as_str();
let expected_signature = generate_signature(&secret, &url, method, timestamp);
// Compare signatures
if signature != expected_signature {
return Err(s3_error!(AccessDenied, "Invalid signature"));
}
Ok(())
}
pub struct S3Router<T> {
router: Router<T>,
@@ -160,7 +90,10 @@ where
if req.uri.path().starts_with(RPC_PREFIX) {
// Skip signature verification for HEAD requests (health checks)
if req.method != Method::HEAD {
verify_rpc_signature(req)?;
verify_rpc_signature(&req.uri.to_string(), &req.method, &req.headers).map_err(|e| {
error!("RPC signature verification failed: {}", e);
s3_error!(AccessDenied, "{}", e)
})?;
}
return Ok(());
}

View File

@@ -1,7 +1,6 @@
use super::router::AdminOperation;
use super::router::Operation;
use super::router::S3Router;
use crate::storage::ecfs::bytes_stream;
use ecstore::disk::DiskAPI;
use ecstore::disk::WalkDirOptions;
use ecstore::set_disk::DEFAULT_READ_BUFFER_SIZE;
@@ -10,6 +9,7 @@ use futures::StreamExt;
use http::StatusCode;
use hyper::Method;
use matchit::Params;
use rustfs_utils::net::bytes_stream;
use s3s::Body;
use s3s::S3Request;
use s3s::S3Response;

View File

@@ -1,9 +1,7 @@
use std::collections::HashMap;
use ecstore::global::get_global_action_cred;
use http::HeaderMap;
use http::Uri;
use iam::error::Error as IamError;
use iam::get_global_action_cred;
use iam::sys::SESSION_POLICY_NAME;
use policy::auth;
use policy::auth::get_claims_from_token_with_secret;
@@ -15,6 +13,7 @@ use s3s::auth::SecretKey;
use s3s::auth::SimpleAuth;
use s3s::s3_error;
use serde_json::Value;
use std::collections::HashMap;
pub struct IAMAuth {
simple_auth: SimpleAuth,

View File

@@ -4,7 +4,7 @@ mod config;
mod console;
mod error;
mod event;
mod grpc;
// mod grpc;
pub mod license;
mod logging;
mod server;
@@ -28,6 +28,7 @@ use ecstore::cmd::bucket_replication::init_bucket_replication_pool;
use ecstore::config as ecconfig;
use ecstore::config::GLOBAL_ConfigSys;
use ecstore::heal::background_heal_ops::init_auto_heal;
use ecstore::rpc::make_server;
use ecstore::store_api::BucketOptions;
use ecstore::{
endpoints::EndpointServerPools,
@@ -37,7 +38,6 @@ use ecstore::{
update_erasure_type,
};
use ecstore::{global::set_global_rustfs_port, notification_sys::new_global_notification_sys};
use grpc::make_server;
use http::{HeaderMap, Request as HttpRequest, Response};
use hyper_util::server::graceful::GracefulShutdown;
use hyper_util::{
@@ -129,7 +129,7 @@ async fn run(opt: config::Opt) -> Result<()> {
debug!("server_address {}", &server_address);
// Set up AK and SK
iam::init_global_action_cred(Some(opt.access_key.clone()), Some(opt.secret_key.clone()))?;
ecstore::global::init_global_action_cred(Some(opt.access_key.clone()), Some(opt.secret_key.clone()));
set_global_rustfs_port(server_port);

View File

@@ -8,6 +8,7 @@ use crate::storage::access::ReqInfo;
use crate::storage::options::copy_dst_opts;
use crate::storage::options::copy_src_opts;
use crate::storage::options::{extract_metadata_from_mime, get_opts};
use api::object_store::bytes_stream;
use api::query::Context;
use api::query::Query;
use api::server::dbms::DatabaseManagerSystem;
@@ -52,8 +53,7 @@ use ecstore::store_api::ObjectOptions;
use ecstore::store_api::ObjectToDelete;
use ecstore::store_api::PutObjReader;
use ecstore::store_api::StorageAPI; // use ecstore::store_api::RESERVED_METADATA_PREFIX;
use futures::pin_mut;
use futures::{Stream, StreamExt};
use futures::StreamExt;
use http::HeaderMap;
use lazy_static::lazy_static;
use policy::auth;
@@ -95,7 +95,6 @@ use tracing::debug;
use tracing::error;
use tracing::info;
use tracing::warn;
use transform_stream::AsyncTryStream;
use uuid::Uuid;
macro_rules! try_ {
@@ -2414,24 +2413,3 @@ impl S3 for FS {
}))
}
}
#[allow(dead_code)]
pub fn bytes_stream<S, E>(stream: S, content_length: usize) -> impl Stream<Item = std::result::Result<Bytes, E>> + Send + 'static
where
S: Stream<Item = std::result::Result<Bytes, E>> + Send + 'static,
E: Send + 'static,
{
AsyncTryStream::<Bytes, E, _>::new(|mut y| async move {
pin_mut!(stream);
let mut remaining: usize = content_length;
while let Some(result) = stream.next().await {
let mut bytes = result?;
if bytes.len() > remaining {
bytes.truncate(remaining);
}
remaining -= bytes.len();
y.yield_ok(bytes).await;
}
Ok(())
})
}