diff --git a/.github/dependabot.yml b/.github/dependabot.yml index de42741d..50cdb35d 100644 --- a/.github/dependabot.yml +++ b/.github/dependabot.yml @@ -28,6 +28,9 @@ updates: time: "08:00" groups: s3s: + update-types: + - "minor" + - "patch" patterns: - "s3s" - "s3s-*" diff --git a/Cargo.lock b/Cargo.lock index a1258c03..4443da55 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3483,9 +3483,9 @@ dependencies = [ [[package]] name = "fs-err" -version = "3.1.3" +version = "3.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6ad492b2cf1d89d568a43508ab24f98501fe03f2f31c01e1d0fe7366a71745d2" +checksum = "62d91fd049c123429b018c47887d3f75a265540dd3c30ba9cb7bae9197edb03a" dependencies = [ "autocfg", "tokio", @@ -7268,7 +7268,6 @@ dependencies = [ "chrono", "md5", "moka", - "once_cell", "rand 0.10.0-rc.5", "reqwest", "serde", diff --git a/crates/kms/Cargo.toml b/crates/kms/Cargo.toml index 18859588..5e9e0159 100644 --- a/crates/kms/Cargo.toml +++ b/crates/kms/Cargo.toml @@ -37,7 +37,6 @@ serde = { workspace = true, features = ["derive"] } serde_json = { workspace = true } tracing = { workspace = true } thiserror = { workspace = true } -once_cell = { workspace = true } # Cryptography aes-gcm = { workspace = true } diff --git a/crates/kms/src/api_types.rs b/crates/kms/src/api_types.rs index 2d2d1d82..fffe0434 100644 --- a/crates/kms/src/api_types.rs +++ b/crates/kms/src/api_types.rs @@ -14,7 +14,7 @@ //! API types for KMS dynamic configuration -use crate::config::{KmsBackend, KmsConfig, VaultAuthMethod}; +use crate::config::{BackendConfig, CacheConfig, KmsBackend, KmsConfig, LocalConfig, TlsConfig, VaultAuthMethod, VaultConfig}; use crate::service_manager::KmsServiceStatus; use crate::types::{KeyMetadata, KeyUsage}; use serde::{Deserialize, Serialize}; @@ -212,12 +212,12 @@ impl From<&KmsConfig> for KmsConfigSummary { }; let backend_summary = match &config.backend_config { - crate::config::BackendConfig::Local(local_config) => BackendSummary::Local { + BackendConfig::Local(local_config) => BackendSummary::Local { key_dir: local_config.key_dir.clone(), has_master_key: local_config.master_key.is_some(), file_permissions: local_config.file_permissions, }, - crate::config::BackendConfig::Vault(vault_config) => BackendSummary::Vault { + BackendConfig::Vault(vault_config) => BackendSummary::Vault { address: vault_config.address.clone(), auth_method_type: match &vault_config.auth_method { VaultAuthMethod::Token { .. } => "token".to_string(), @@ -248,7 +248,7 @@ impl ConfigureLocalKmsRequest { KmsConfig { backend: KmsBackend::Local, default_key_id: self.default_key_id.clone(), - backend_config: crate::config::BackendConfig::Local(crate::config::LocalConfig { + backend_config: BackendConfig::Local(LocalConfig { key_dir: self.key_dir.clone(), master_key: self.master_key.clone(), file_permissions: self.file_permissions, @@ -256,7 +256,7 @@ impl ConfigureLocalKmsRequest { timeout: Duration::from_secs(self.timeout_seconds.unwrap_or(30)), retry_attempts: self.retry_attempts.unwrap_or(3), enable_cache: self.enable_cache.unwrap_or(true), - cache_config: crate::config::CacheConfig { + cache_config: CacheConfig { max_keys: self.max_cached_keys.unwrap_or(1000), ttl: Duration::from_secs(self.cache_ttl_seconds.unwrap_or(3600)), enable_metrics: true, @@ -271,7 +271,7 @@ impl ConfigureVaultKmsRequest { KmsConfig { backend: KmsBackend::Vault, default_key_id: self.default_key_id.clone(), - backend_config: crate::config::BackendConfig::Vault(crate::config::VaultConfig { + backend_config: BackendConfig::Vault(VaultConfig { address: self.address.clone(), auth_method: self.auth_method.clone(), namespace: self.namespace.clone(), @@ -279,7 +279,7 @@ impl ConfigureVaultKmsRequest { kv_mount: self.kv_mount.clone().unwrap_or_else(|| "secret".to_string()), key_path_prefix: self.key_path_prefix.clone().unwrap_or_else(|| "rustfs/kms/keys".to_string()), tls: if self.skip_tls_verify.unwrap_or(false) { - Some(crate::config::TlsConfig { + Some(TlsConfig { ca_cert_path: None, client_cert_path: None, client_key_path: None, @@ -292,7 +292,7 @@ impl ConfigureVaultKmsRequest { timeout: Duration::from_secs(self.timeout_seconds.unwrap_or(30)), retry_attempts: self.retry_attempts.unwrap_or(3), enable_cache: self.enable_cache.unwrap_or(true), - cache_config: crate::config::CacheConfig { + cache_config: CacheConfig { max_keys: self.max_cached_keys.unwrap_or(1000), ttl: Duration::from_secs(self.cache_ttl_seconds.unwrap_or(3600)), enable_metrics: true, diff --git a/crates/kms/src/backends/mod.rs b/crates/kms/src/backends/mod.rs index 03637198..9d4550d1 100644 --- a/crates/kms/src/backends/mod.rs +++ b/crates/kms/src/backends/mod.rs @@ -200,6 +200,16 @@ pub struct BackendInfo { impl BackendInfo { /// Create a new backend info + /// + /// # Arguments + /// * `backend_type` - The type of the backend + /// * `version` - The version of the backend + /// * `endpoint` - The endpoint or location of the backend + /// * `healthy` - Whether the backend is healthy + /// + /// # Returns + /// A new BackendInfo instance + /// pub fn new(backend_type: String, version: String, endpoint: String, healthy: bool) -> Self { Self { backend_type, @@ -211,6 +221,14 @@ impl BackendInfo { } /// Add metadata to the backend info + /// + /// # Arguments + /// * `key` - Metadata key + /// * `value` - Metadata value + /// + /// # Returns + /// Updated BackendInfo instance + /// pub fn with_metadata(mut self, key: String, value: String) -> Self { self.metadata.insert(key, value); self diff --git a/crates/kms/src/cache.rs b/crates/kms/src/cache.rs index 8b8b763e..e8d44585 100644 --- a/crates/kms/src/cache.rs +++ b/crates/kms/src/cache.rs @@ -34,6 +34,13 @@ pub struct KmsCache { impl KmsCache { /// Create a new KMS cache with the specified capacity + /// + /// # Arguments + /// * `capacity` - Maximum number of entries in the cache + /// + /// # Returns + /// A new instance of `KmsCache` + /// pub fn new(capacity: u64) -> Self { Self { key_metadata_cache: Cache::builder() @@ -48,22 +55,47 @@ impl KmsCache { } /// Get key metadata from cache + /// + /// # Arguments + /// * `key_id` - The ID of the key to retrieve metadata for + /// + /// # Returns + /// An `Option` containing the `KeyMetadata` if found, or `None` if not found + /// pub async fn get_key_metadata(&self, key_id: &str) -> Option { self.key_metadata_cache.get(key_id).await } /// Put key metadata into cache + /// + /// # Arguments + /// * `key_id` - The ID of the key to store metadata for + /// * `metadata` - The `KeyMetadata` to store in the cache + /// pub async fn put_key_metadata(&mut self, key_id: &str, metadata: &KeyMetadata) { self.key_metadata_cache.insert(key_id.to_string(), metadata.clone()).await; self.key_metadata_cache.run_pending_tasks().await; } /// Get data key from cache + /// + /// # Arguments + /// * `key_id` - The ID of the key to retrieve the data key for + /// + /// # Returns + /// An `Option` containing the `CachedDataKey` if found, or `None` if not found + /// pub async fn get_data_key(&self, key_id: &str) -> Option { self.data_key_cache.get(key_id).await } /// Put data key into cache + /// + /// # Arguments + /// * `key_id` - The ID of the key to store the data key for + /// * `plaintext` - The plaintext data key bytes + /// * `ciphertext` - The ciphertext data key bytes + /// pub async fn put_data_key(&mut self, key_id: &str, plaintext: &[u8], ciphertext: &[u8]) { let cached_key = CachedDataKey { plaintext: plaintext.to_vec(), @@ -75,11 +107,19 @@ impl KmsCache { } /// Remove key metadata from cache + /// + /// # Arguments + /// * `key_id` - The ID of the key to remove metadata for + /// pub async fn remove_key_metadata(&mut self, key_id: &str) { self.key_metadata_cache.remove(key_id).await; } /// Remove data key from cache + /// + /// # Arguments + /// * `key_id` - The ID of the key to remove the data key for + /// pub async fn remove_data_key(&mut self, key_id: &str) { self.data_key_cache.remove(key_id).await; } @@ -95,6 +135,10 @@ impl KmsCache { } /// Get cache statistics (hit count, miss count) + /// + /// # Returns + /// A tuple containing total entries and total misses + /// pub fn stats(&self) -> (u64, u64) { let metadata_stats = ( self.key_metadata_cache.entry_count(), diff --git a/crates/kms/src/encryption/ciphers.rs b/crates/kms/src/encryption/ciphers.rs index 7143bce7..5081e6cc 100644 --- a/crates/kms/src/encryption/ciphers.rs +++ b/crates/kms/src/encryption/ciphers.rs @@ -52,6 +52,16 @@ pub struct AesCipher { impl AesCipher { /// Create a new AES cipher with the given key + /// + /// #Arguments + /// * `key` - A byte slice representing the AES-256 key (32 bytes) + /// + /// #Errors + /// Returns `KmsError` if the key size is invalid + /// + /// #Returns + /// A Result containing the AesCipher instance + /// pub fn new(key: &[u8]) -> Result { if key.len() != 32 { return Err(KmsError::invalid_key_size(32, key.len())); @@ -142,6 +152,16 @@ pub struct ChaCha20Cipher { impl ChaCha20Cipher { /// Create a new ChaCha20 cipher with the given key + /// + /// #Arguments + /// * `key` - A byte slice representing the ChaCha20-Poly1305 key (32 bytes) + /// + /// #Errors + /// Returns `KmsError` if the key size is invalid + /// + /// #Returns + /// A Result containing the ChaCha20Cipher instance + /// pub fn new(key: &[u8]) -> Result { if key.len() != 32 { return Err(KmsError::invalid_key_size(32, key.len())); @@ -228,6 +248,14 @@ impl ObjectCipher for ChaCha20Cipher { } /// Create a cipher instance for the given algorithm and key +/// +/// #Arguments +/// * `algorithm` - The encryption algorithm to use +/// * `key` - A byte slice representing the encryption key +/// +/// #Returns +/// A Result containing a boxed ObjectCipher instance +/// pub fn create_cipher(algorithm: &EncryptionAlgorithm, key: &[u8]) -> Result> { match algorithm { EncryptionAlgorithm::Aes256 | EncryptionAlgorithm::AwsKms => Ok(Box::new(AesCipher::new(key)?)), @@ -236,6 +264,13 @@ pub fn create_cipher(algorithm: &EncryptionAlgorithm, key: &[u8]) -> Result Vec { let iv_size = match algorithm { EncryptionAlgorithm::Aes256 | EncryptionAlgorithm::AwsKms => 12, diff --git a/crates/kms/src/encryption/service.rs b/crates/kms/src/encryption/service.rs index 94ed0c50..f4b973eb 100644 --- a/crates/kms/src/encryption/service.rs +++ b/crates/kms/src/encryption/service.rs @@ -18,6 +18,12 @@ use crate::encryption::ciphers::{create_cipher, generate_iv}; use crate::error::{KmsError, Result}; use crate::manager::KmsManager; use crate::types::*; +use base64::Engine; +use rand::random; +use std::collections::HashMap; +use std::io::Cursor; +use tokio::io::{AsyncRead, AsyncReadExt}; +use tracing::{debug, info}; use zeroize::Zeroize; /// Data key for object encryption @@ -36,12 +42,6 @@ impl Drop for DataKey { self.plaintext_key.zeroize(); } } -use base64::Engine; -use rand::random; -use std::collections::HashMap; -use std::io::Cursor; -use tokio::io::{AsyncRead, AsyncReadExt}; -use tracing::{debug, info}; /// Service for encrypting and decrypting S3 objects with KMS integration pub struct ObjectEncryptionService { @@ -59,51 +59,110 @@ pub struct EncryptionResult { impl ObjectEncryptionService { /// Create a new object encryption service + /// + /// # Arguments + /// * `kms_manager` - KMS manager to use for key operations + /// + /// # Returns + /// New ObjectEncryptionService instance + /// pub fn new(kms_manager: KmsManager) -> Self { Self { kms_manager } } /// Create a new master key (delegates to KMS manager) + /// + /// # Arguments + /// * `request` - CreateKeyRequest with key parameters + /// + /// # Returns + /// CreateKeyResponse with created key details + /// pub async fn create_key(&self, request: CreateKeyRequest) -> Result { self.kms_manager.create_key(request).await } /// Describe a master key (delegates to KMS manager) + /// + /// # Arguments + /// * `request` - DescribeKeyRequest with key ID + /// + /// # Returns + /// DescribeKeyResponse with key metadata + /// pub async fn describe_key(&self, request: DescribeKeyRequest) -> Result { self.kms_manager.describe_key(request).await } /// List master keys (delegates to KMS manager) + /// + /// # Arguments + /// * `request` - ListKeysRequest with listing parameters + /// + /// # Returns + /// ListKeysResponse with list of keys + /// pub async fn list_keys(&self, request: ListKeysRequest) -> Result { self.kms_manager.list_keys(request).await } /// Generate a data encryption key (delegates to KMS manager) + /// + /// # Arguments + /// * `request` - GenerateDataKeyRequest with key parameters + /// + /// # Returns + /// GenerateDataKeyResponse with generated key details + /// pub async fn generate_data_key(&self, request: GenerateDataKeyRequest) -> Result { self.kms_manager.generate_data_key(request).await } /// Get the default key ID + /// + /// # Returns + /// Option with default key ID if configured + /// pub fn get_default_key_id(&self) -> Option<&String> { self.kms_manager.get_default_key_id() } /// Get cache statistics + /// + /// # Returns + /// Option with (hits, misses) if caching is enabled + /// pub async fn cache_stats(&self) -> Option<(u64, u64)> { self.kms_manager.cache_stats().await } /// Clear the cache + /// + /// # Returns + /// Result indicating success or failure + /// pub async fn clear_cache(&self) -> Result<()> { self.kms_manager.clear_cache().await } /// Get backend health status + /// + /// # Returns + /// Result indicating if backend is healthy + /// pub async fn health_check(&self) -> Result { self.kms_manager.health_check().await } /// Create a data encryption key for object encryption + /// + /// # Arguments + /// * `kms_key_id` - Optional KMS key ID to use (uses default if None) + /// * `context` - ObjectEncryptionContext with bucket and object key + /// + /// # Returns + /// Tuple with DataKey and encrypted key blob + /// pub async fn create_data_key( &self, kms_key_id: &Option, @@ -146,6 +205,14 @@ impl ObjectEncryptionService { } /// Decrypt a data encryption key + /// + /// # Arguments + /// * `encrypted_key` - Encrypted data key blob + /// * `context` - ObjectEncryptionContext with bucket and object key + /// + /// # Returns + /// DataKey with decrypted key + /// pub async fn decrypt_data_key(&self, encrypted_key: &[u8], _context: &ObjectEncryptionContext) -> Result { let decrypt_request = DecryptRequest { ciphertext: encrypted_key.to_vec(), @@ -429,6 +496,17 @@ impl ObjectEncryptionService { } /// Decrypt object with customer-provided key (SSE-C) + /// + /// # Arguments + /// * `bucket` - S3 bucket name + /// * `object_key` - S3 object key + /// * `ciphertext` - Encrypted data + /// * `metadata` - Encryption metadata + /// * `customer_key` - Customer-provided 256-bit key + /// + /// # Returns + /// Decrypted data as a reader + /// pub async fn decrypt_object_with_customer_key( &self, bucket: &str, @@ -481,6 +559,14 @@ impl ObjectEncryptionService { } /// Validate encryption context + /// + /// # Arguments + /// * `actual` - Actual encryption context from metadata + /// * `expected` - Expected encryption context to validate against + /// + /// # Returns + /// Result indicating success or context mismatch + /// fn validate_encryption_context(&self, actual: &HashMap, expected: &HashMap) -> Result<()> { for (key, expected_value) in expected { match actual.get(key) { @@ -499,6 +585,13 @@ impl ObjectEncryptionService { } /// Convert encryption metadata to HTTP headers for S3 compatibility + /// + /// # Arguments + /// * `metadata` - EncryptionMetadata to convert + /// + /// # Returns + /// HashMap of HTTP headers + /// pub fn metadata_to_headers(&self, metadata: &EncryptionMetadata) -> HashMap { let mut headers = HashMap::new(); @@ -542,6 +635,13 @@ impl ObjectEncryptionService { } /// Parse encryption metadata from HTTP headers + /// + /// # Arguments + /// * `headers` - HashMap of HTTP headers + /// + /// # Returns + /// EncryptionMetadata parsed from headers + /// pub fn headers_to_metadata(&self, headers: &HashMap) -> Result { let algorithm = headers .get("x-amz-server-side-encryption") diff --git a/crates/kms/src/error.rs b/crates/kms/src/error.rs index 740bed14..f960156b 100644 --- a/crates/kms/src/error.rs +++ b/crates/kms/src/error.rs @@ -116,7 +116,7 @@ impl KmsError { Self::BackendError { message: message.into() } } - /// Create an access denied error + /// Create access denied error pub fn access_denied>(message: S) -> Self { Self::AccessDenied { message: message.into() } } @@ -184,7 +184,7 @@ impl KmsError { } } -// Convert from standard library errors +/// Convert from standard library errors impl From for KmsError { fn from(error: std::io::Error) -> Self { Self::IoError { @@ -206,6 +206,13 @@ impl From for KmsError { impl KmsError { /// Create a KMS error from AES-GCM error + /// + /// #Arguments + /// * `error` - The AES-GCM error to convert + /// + /// #Returns + /// * `KmsError` - The corresponding KMS error + /// pub fn from_aes_gcm_error(error: aes_gcm::Error) -> Self { Self::CryptographicError { operation: "AES-GCM".to_string(), @@ -214,6 +221,13 @@ impl KmsError { } /// Create a KMS error from ChaCha20-Poly1305 error + /// + /// #Arguments + /// * `error` - The ChaCha20-Poly1305 error to convert + /// + /// #Returns + /// * `KmsError` - The corresponding KMS error + /// pub fn from_chacha20_error(error: chacha20poly1305::Error) -> Self { Self::CryptographicError { operation: "ChaCha20-Poly1305".to_string(), diff --git a/crates/kms/src/service_manager.rs b/crates/kms/src/service_manager.rs index f877e010..59388549 100644 --- a/crates/kms/src/service_manager.rs +++ b/crates/kms/src/service_manager.rs @@ -19,7 +19,7 @@ use crate::config::{BackendConfig, KmsConfig}; use crate::encryption::service::ObjectEncryptionService; use crate::error::{KmsError, Result}; use crate::manager::KmsManager; -use std::sync::Arc; +use std::sync::{Arc, OnceLock}; use tokio::sync::RwLock; use tracing::{error, info, warn}; @@ -71,7 +71,7 @@ impl KmsServiceManager { /// Configure KMS with new configuration pub async fn configure(&self, new_config: KmsConfig) -> Result<()> { - tracing::info!("CLAUDE DEBUG: configure() called with backend: {:?}", new_config.backend); + info!("CLAUDE DEBUG: configure() called with backend: {:?}", new_config.backend); info!("Configuring KMS with backend: {:?}", new_config.backend); // Update configuration @@ -92,7 +92,7 @@ impl KmsServiceManager { /// Start KMS service with current configuration pub async fn start(&self) -> Result<()> { - tracing::info!("CLAUDE DEBUG: start() called"); + info!("CLAUDE DEBUG: start() called"); let config = { let config_guard = self.config.read().await; match config_guard.as_ref() { @@ -254,7 +254,7 @@ impl Default for KmsServiceManager { } /// Global KMS service manager instance -static GLOBAL_KMS_SERVICE_MANAGER: once_cell::sync::OnceCell> = once_cell::sync::OnceCell::new(); +static GLOBAL_KMS_SERVICE_MANAGER: OnceLock> = OnceLock::new(); /// Initialize global KMS service manager pub fn init_global_kms_service_manager() -> Arc { @@ -270,12 +270,12 @@ pub fn get_global_kms_service_manager() -> Option> { /// Get global encryption service (if KMS is running) pub async fn get_global_encryption_service() -> Option> { - tracing::info!("CLAUDE DEBUG: get_global_encryption_service called"); + info!("CLAUDE DEBUG: get_global_encryption_service called"); let manager = get_global_kms_service_manager().unwrap_or_else(|| { - tracing::warn!("CLAUDE DEBUG: KMS service manager not initialized, initializing now as fallback"); + warn!("CLAUDE DEBUG: KMS service manager not initialized, initializing now as fallback"); init_global_kms_service_manager() }); let service = manager.get_encryption_service().await; - tracing::info!("CLAUDE DEBUG: get_encryption_service returned: {}", service.is_some()); + info!("CLAUDE DEBUG: get_encryption_service returned: {}", service.is_some()); service } diff --git a/crates/kms/src/types.rs b/crates/kms/src/types.rs index 50b97f39..e53d0b49 100644 --- a/crates/kms/src/types.rs +++ b/crates/kms/src/types.rs @@ -42,6 +42,17 @@ pub struct DataKey { impl DataKey { /// Create a new data key + /// + /// # Arguments + /// * `key_id` - Unique identifier for the key + /// * `version` - Key version number + /// * `plaintext` - Optional plaintext key material + /// * `ciphertext` - Encrypted key material + /// * `key_spec` - Key specification (e.g., "AES_256") + /// + /// # Returns + /// A new `DataKey` instance + /// pub fn new(key_id: String, version: u32, plaintext: Option>, ciphertext: Vec, key_spec: String) -> Self { Self { key_id, @@ -55,6 +66,11 @@ impl DataKey { } /// Clear the plaintext key material from memory for security + /// + /// # Security + /// This method zeroes out the plaintext key material before dropping it + /// to prevent sensitive data from lingering in memory. + /// pub fn clear_plaintext(&mut self) { if let Some(ref mut plaintext) = self.plaintext { // Zero out the memory before dropping @@ -64,6 +80,14 @@ impl DataKey { } /// Add metadata to the data key + /// + /// # Arguments + /// * `key` - Metadata key + /// * `value` - Metadata value + /// + /// # Returns + /// Updated `DataKey` instance with added metadata + /// pub fn with_metadata(mut self, key: String, value: String) -> Self { self.metadata.insert(key, value); self @@ -97,6 +121,15 @@ pub struct MasterKey { impl MasterKey { /// Create a new master key + /// + /// # Arguments + /// * `key_id` - Unique identifier for the key + /// * `algorithm` - Key algorithm (e.g., "AES-256") + /// * `created_by` - Optional creator/owner of the key + /// + /// # Returns + /// A new `MasterKey` instance + /// pub fn new(key_id: String, algorithm: String, created_by: Option) -> Self { Self { key_id, @@ -113,6 +146,16 @@ impl MasterKey { } /// Create a new master key with description + /// + /// # Arguments + /// * `key_id` - Unique identifier for the key + /// * `algorithm` - Key algorithm (e.g., "AES-256") + /// * `created_by` - Optional creator/owner of the key + /// * `description` - Optional key description + /// + /// # Returns + /// A new `MasterKey` instance with description + /// pub fn new_with_description( key_id: String, algorithm: String, @@ -218,6 +261,14 @@ pub struct GenerateKeyRequest { impl GenerateKeyRequest { /// Create a new generate key request + /// + /// # Arguments + /// * `master_key_id` - Master key ID to use for encryption + /// * `key_spec` - Key specification (e.g., "AES_256") + /// + /// # Returns + /// A new `GenerateKeyRequest` instance + /// pub fn new(master_key_id: String, key_spec: String) -> Self { Self { master_key_id, @@ -229,12 +280,27 @@ impl GenerateKeyRequest { } /// Add encryption context + /// + /// # Arguments + /// * `key` - Context key + /// * `value` - Context value + /// + /// # Returns + /// Updated `GenerateKeyRequest` instance with added context + /// pub fn with_context(mut self, key: String, value: String) -> Self { self.encryption_context.insert(key, value); self } /// Set key length explicitly + /// + /// # Arguments + /// * `length` - Key length in bytes + /// + /// # Returns + /// Updated `GenerateKeyRequest` instance with specified key length + /// pub fn with_length(mut self, length: u32) -> Self { self.key_length = Some(length); self @@ -256,6 +322,14 @@ pub struct EncryptRequest { impl EncryptRequest { /// Create a new encrypt request + /// + /// # Arguments + /// * `key_id` - Key ID to use for encryption + /// * `plaintext` - Plaintext data to encrypt + /// + /// # Returns + /// A new `EncryptRequest` instance + /// pub fn new(key_id: String, plaintext: Vec) -> Self { Self { key_id, @@ -266,6 +340,14 @@ impl EncryptRequest { } /// Add encryption context + /// + /// # Arguments + /// * `key` - Context key + /// * `value` - Context value + /// + /// # Returns + /// Updated `EncryptRequest` instance with added context + /// pub fn with_context(mut self, key: String, value: String) -> Self { self.encryption_context.insert(key, value); self @@ -298,6 +380,13 @@ pub struct DecryptRequest { impl DecryptRequest { /// Create a new decrypt request + /// + /// # Arguments + /// * `ciphertext` - Ciphertext to decrypt + /// + /// # Returns + /// A new `DecryptRequest` instance + /// pub fn new(ciphertext: Vec) -> Self { Self { ciphertext, @@ -307,6 +396,14 @@ impl DecryptRequest { } /// Add encryption context + /// + /// # Arguments + /// * `key` - Context key + /// * `value` - Context value + /// + /// # Returns + /// Updated `DecryptRequest` instance with added context + /// pub fn with_context(mut self, key: String, value: String) -> Self { self.encryption_context.insert(key, value); self @@ -365,6 +462,13 @@ pub struct OperationContext { impl OperationContext { /// Create a new operation context + /// + /// # Arguments + /// * `principal` - User or service performing the operation + /// + /// # Returns + /// A new `OperationContext` instance + /// pub fn new(principal: String) -> Self { Self { operation_id: Uuid::new_v4(), @@ -376,18 +480,40 @@ impl OperationContext { } /// Add additional context + /// + /// # Arguments + /// * `key` - Context key + /// * `value` - Context value + /// + /// # Returns + /// Updated `OperationContext` instance with added context + /// pub fn with_context(mut self, key: String, value: String) -> Self { self.additional_context.insert(key, value); self } /// Set source IP + /// + /// # Arguments + /// * `ip` - Source IP address + /// + /// # Returns + /// Updated `OperationContext` instance with source IP + /// pub fn with_source_ip(mut self, ip: String) -> Self { self.source_ip = Some(ip); self } /// Set user agent + /// + /// # Arguments + /// * `agent` - User agent string + /// + /// # Returns + /// Updated `OperationContext` instance with user agent + /// pub fn with_user_agent(mut self, agent: String) -> Self { self.user_agent = Some(agent); self @@ -411,6 +537,14 @@ pub struct ObjectEncryptionContext { impl ObjectEncryptionContext { /// Create a new object encryption context + /// + /// # Arguments + /// * `bucket` - Bucket name + /// * `object_key` - Object key + /// + /// # Returns + /// A new `ObjectEncryptionContext` instance + /// pub fn new(bucket: String, object_key: String) -> Self { Self { bucket, @@ -422,18 +556,40 @@ impl ObjectEncryptionContext { } /// Set content type + /// + /// # Arguments + /// * `content_type` - Content type string + /// + /// # Returns + /// Updated `ObjectEncryptionContext` instance with content type + /// pub fn with_content_type(mut self, content_type: String) -> Self { self.content_type = Some(content_type); self } /// Set object size + /// + /// # Arguments + /// * `size` - Object size in bytes + /// + /// # Returns + /// Updated `ObjectEncryptionContext` instance with size + /// pub fn with_size(mut self, size: u64) -> Self { self.size = Some(size); self } /// Add encryption context + /// + /// # Arguments + /// * `key` - Context key + /// * `value` - Context value + /// + /// # Returns + /// Updated `ObjectEncryptionContext` instance with added context + /// pub fn with_encryption_context(mut self, key: String, value: String) -> Self { self.encryption_context.insert(key, value); self @@ -503,6 +659,10 @@ pub enum KeySpec { impl KeySpec { /// Get the key size in bytes + /// + /// # Returns + /// Key size in bytes + /// pub fn key_size(&self) -> usize { match self { Self::Aes256 => 32, @@ -512,6 +672,10 @@ impl KeySpec { } /// Get the string representation for backends + /// + /// # Returns + /// Key specification as a string + /// pub fn as_str(&self) -> &'static str { match self { Self::Aes256 => "AES_256", @@ -636,6 +800,14 @@ pub struct GenerateDataKeyRequest { impl GenerateDataKeyRequest { /// Create a new generate data key request + /// + /// # Arguments + /// * `key_id` - Key ID to use for encryption + /// * `key_spec` - Key specification + /// + /// # Returns + /// A new `GenerateDataKeyRequest` instance + /// pub fn new(key_id: String, key_spec: KeySpec) -> Self { Self { key_id, @@ -658,6 +830,10 @@ pub struct GenerateDataKeyResponse { impl EncryptionAlgorithm { /// Get the algorithm name as a string + /// + /// # Returns + /// Algorithm name as a string + /// pub fn as_str(&self) -> &'static str { match self { Self::Aes256 => "AES256", diff --git a/crates/protos/src/main.rs b/crates/protos/src/main.rs index 223bfaa9..5184ed8a 100644 --- a/crates/protos/src/main.rs +++ b/crates/protos/src/main.rs @@ -16,8 +16,8 @@ use std::{cmp, env, fs, io::Write, path::Path, process::Command}; type AnyError = Box; -const VERSION_PROTOBUF: Version = Version(27, 2, 0); // 27.2.0 -const VERSION_FLATBUFFERS: Version = Version(24, 3, 25); // 24.3.25 +const VERSION_PROTOBUF: Version = Version(33, 1, 0); // 31.1.0 +const VERSION_FLATBUFFERS: Version = Version(25, 9, 23); // 25.9.23 /// Build protos if the major version of `flatc` or `protoc` is greater /// or lesser than the expected version. const ENV_BUILD_PROTOS: &str = "BUILD_PROTOS"; diff --git a/crates/targets/src/check.rs b/crates/targets/src/check.rs index cf2363d3..5e6ab53d 100644 --- a/crates/targets/src/check.rs +++ b/crates/targets/src/check.rs @@ -13,6 +13,7 @@ // limitations under the License. /// Check if MQTT Broker is available +/// /// # Arguments /// * `broker_url` - URL of MQTT Broker, for example `mqtt://localhost:1883` /// * `topic` - Topic for testing connections @@ -40,6 +41,7 @@ /// url = "2.5.7" /// tokio = { version = "1", features = ["full"] } /// ``` +/// pub async fn check_mqtt_broker_available(broker_url: &str, topic: &str) -> Result<(), String> { use rumqttc::{AsyncClient, MqttOptions, QoS}; let url = rustfs_utils::parse_url(broker_url).map_err(|e| format!("Broker URL parsing failed:{e}"))?; diff --git a/crates/utils/Cargo.toml b/crates/utils/Cargo.toml index ffb2b20c..76748a9a 100644 --- a/crates/utils/Cargo.toml +++ b/crates/utils/Cargo.toml @@ -92,5 +92,5 @@ hash = ["dep:highway", "dep:md-5", "dep:sha2", "dep:blake3", "dep:serde", "dep:s os = ["dep:nix", "dep:tempfile", "winapi"] # operating system utilities integration = [] # integration test features sys = ["dep:sysinfo"] # system information features -http = ["dep:convert_case", "dep:http"] +http = ["dep:convert_case", "dep:http", "dep:regex"] full = ["ip", "tls", "net", "io", "hash", "os", "integration", "path", "crypto", "string", "compress", "sys", "notify", "http"] # all features diff --git a/crates/utils/src/certs.rs b/crates/utils/src/certs.rs index 9fc40bf3..24657f7a 100644 --- a/crates/utils/src/certs.rs +++ b/crates/utils/src/certs.rs @@ -26,6 +26,13 @@ use tracing::{debug, warn}; /// Load public certificate from file. /// This function loads a public certificate from the specified file. +/// +/// # Arguments +/// * `filename` - A string slice that holds the name of the file containing the public certificate. +/// +/// # Returns +/// * An io::Result containing a vector of CertificateDer if successful, or an io::Error if an error occurs during loading. +/// pub fn load_certs(filename: &str) -> io::Result>> { // Open certificate file. let cert_file = fs::File::open(filename).map_err(|e| certs_error(format!("failed to open {filename}: {e}")))?; @@ -43,6 +50,13 @@ pub fn load_certs(filename: &str) -> io::Result>> { /// Load private key from file. /// This function loads a private key from the specified file. +/// +/// # Arguments +/// * `filename` - A string slice that holds the name of the file containing the private key. +/// +/// # Returns +/// * An io::Result containing the PrivateKeyDer if successful, or an io::Error if an error occurs during loading. +/// pub fn load_private_key(filename: &str) -> io::Result> { // Open keyfile. let keyfile = fs::File::open(filename).map_err(|e| certs_error(format!("failed to open {filename}: {e}")))?; @@ -53,6 +67,14 @@ pub fn load_private_key(filename: &str) -> io::Result> { } /// error function +/// This function creates a new io::Error with the provided error message. +/// +/// # Arguments +/// * `err` - A string containing the error message. +/// +/// # Returns +/// * An io::Error instance with the specified error message. +/// pub fn certs_error(err: String) -> Error { Error::other(err) } @@ -61,6 +83,13 @@ pub fn certs_error(err: String) -> Error { /// This function loads all certificate and private key pairs from the specified directory. /// It looks for files named `rustfs_cert.pem` and `rustfs_key.pem` in each subdirectory. /// The root directory can also contain a default certificate/private key pair. +/// +/// # Arguments +/// * `dir_path` - A string slice that holds the path to the directory containing the certificates and private keys. +/// +/// # Returns +/// * An io::Result containing a HashMap where the keys are domain names (or "default" for the root certificate) and the values are tuples of (Vec, PrivateKeyDer). If no valid certificate/private key pairs are found, an io::Error is returned. +/// pub fn load_all_certs_from_directory( dir_path: &str, ) -> io::Result>, PrivateKeyDer<'static>)>> { @@ -137,6 +166,14 @@ pub fn load_all_certs_from_directory( /// loading a single certificate private key pair /// This function loads a certificate and private key from the specified paths. /// It returns a tuple containing the certificate and private key. +/// +/// # Arguments +/// * `cert_path` - A string slice that holds the path to the certificate file. +/// * `key_path` - A string slice that holds the path to the private key file +/// +/// # Returns +/// * An io::Result containing a tuple of (Vec, PrivateKeyDer) if successful, or an io::Error if an error occurs during loading. +/// fn load_cert_key_pair(cert_path: &str, key_path: &str) -> io::Result<(Vec>, PrivateKeyDer<'static>)> { let certs = load_certs(cert_path)?; let key = load_private_key(key_path)?; @@ -148,6 +185,12 @@ fn load_cert_key_pair(cert_path: &str, key_path: &str) -> io::Result<(Vec, PrivateKeyDer). +/// +/// # Returns +/// * An io::Result containing an implementation of ResolvesServerCert if successful, or an io::Error if an error occurs during loading. +/// pub fn create_multi_cert_resolver( cert_key_pairs: HashMap>, PrivateKeyDer<'static>)>, ) -> io::Result { @@ -195,6 +238,10 @@ pub fn create_multi_cert_resolver( } /// Checks if TLS key logging is enabled. +/// +/// # Returns +/// * A boolean indicating whether TLS key logging is enabled based on the `RUSTFS_TLS_KEYLOG` environment variable. +/// pub fn tls_key_log() -> bool { env::var("RUSTFS_TLS_KEYLOG") .map(|v| { @@ -203,6 +250,8 @@ pub fn tls_key_log() -> bool { || v.eq_ignore_ascii_case("on") || v.eq_ignore_ascii_case("true") || v.eq_ignore_ascii_case("yes") + || v.eq_ignore_ascii_case("enabled") + || v.eq_ignore_ascii_case("t") }) .unwrap_or(false) } diff --git a/crates/utils/src/compress.rs b/crates/utils/src/compress.rs index 0e6c5829..a2686ef5 100644 --- a/crates/utils/src/compress.rs +++ b/crates/utils/src/compress.rs @@ -13,6 +13,7 @@ // limitations under the License. use std::io::Write; +use std::{fmt, str}; use tokio::io; #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)] @@ -41,13 +42,13 @@ impl CompressionAlgorithm { } } -impl std::fmt::Display for CompressionAlgorithm { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { +impl fmt::Display for CompressionAlgorithm { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!(f, "{}", self.as_str()) } } -impl std::str::FromStr for CompressionAlgorithm { - type Err = std::io::Error; +impl str::FromStr for CompressionAlgorithm { + type Err = io::Error; fn from_str(s: &str) -> Result { match s.to_lowercase().as_str() { @@ -63,6 +64,16 @@ impl std::str::FromStr for CompressionAlgorithm { } } +/// Compress a block of data using the specified compression algorithm. +/// Returns the compressed data as a Vec. +/// +/// # Arguments +/// * `input` - The input data to be compressed. +/// * `algorithm` - The compression algorithm to use. +/// +/// # Returns +/// * A Vec containing the compressed data. +/// pub fn compress_block(input: &[u8], algorithm: CompressionAlgorithm) -> Vec { match algorithm { CompressionAlgorithm::Gzip => { @@ -105,6 +116,16 @@ pub fn compress_block(input: &[u8], algorithm: CompressionAlgorithm) -> Vec } } +/// Decompress a block of data using the specified compression algorithm. +/// Returns the decompressed data as a Vec. +/// +/// # Arguments +/// * `compressed` - The compressed data to be decompressed. +/// * `algorithm` - The compression algorithm used for compression. +/// +/// # Returns +/// * A Result containing a Vec with the decompressed data, or an io::Error. +/// pub fn decompress_block(compressed: &[u8], algorithm: CompressionAlgorithm) -> io::Result> { match algorithm { CompressionAlgorithm::Gzip => { diff --git a/crates/utils/src/crypto.rs b/crates/utils/src/crypto.rs index 9c8bb8b6..5da7681e 100644 --- a/crates/utils/src/crypto.rs +++ b/crates/utils/src/crypto.rs @@ -12,45 +12,95 @@ // See the License for the specific language governing permissions and // limitations under the License. +use hex_simd::{AsOut, AsciiCase}; +use hmac::{Hmac, KeyInit, Mac}; +use hyper::body::Bytes; +use sha1::Sha1; +use sha2::{Digest, Sha256}; use std::mem::MaybeUninit; -use hex_simd::{AsOut, AsciiCase}; -use hyper::body::Bytes; - +/// Base64 URL safe encoding without padding +/// `base64_encode_url_safe_no_pad(input)` +/// +/// # Arguments +/// * `input` - A byte slice to be encoded +/// +/// # Returns +/// A `String` containing the Base64 URL safe encoded representation of the input data pub fn base64_encode_url_safe_no_pad(input: &[u8]) -> String { base64_simd::URL_SAFE_NO_PAD.encode_to_string(input) } +/// Base64 URL safe decoding without padding +/// `base64_decode_url_safe_no_pad(input)` +/// +/// # Arguments +/// * `input` - A byte slice containing the Base64 URL safe encoded data +/// +/// # Returns +/// A `Result` containing a `Vec` with the decoded data or a `base64_simd::Error` if decoding fails +/// +/// # Errors +/// This function will return an error if the input data is not valid Base64 URL safe encoded data +/// pub fn base64_decode_url_safe_no_pad(input: &[u8]) -> Result, base64_simd::Error> { base64_simd::URL_SAFE_NO_PAD.decode_to_vec(input) } +/// encode to hex string (lowercase) +/// `hex(data)` +/// +/// # Arguments +/// * `data` - A byte slice to be encoded +/// +/// # Returns +/// A `String` containing the hexadecimal representation of the input data in lowercase +/// pub fn hex(data: impl AsRef<[u8]>) -> String { hex_simd::encode_to_string(data, hex_simd::AsciiCase::Lower) } /// verify sha256 checksum string +/// +/// # Arguments +/// * `s` - A string slice to be verified +/// +/// # Returns +/// A `bool` indicating whether the input string is a valid SHA-256 checksum (64 +/// pub fn is_sha256_checksum(s: &str) -> bool { // TODO: optimize let is_lowercase_hex = |c: u8| matches!(c, b'0'..=b'9' | b'a'..=b'f'); s.len() == 64 && s.as_bytes().iter().copied().all(is_lowercase_hex) } +/// HMAC-SHA1 hashing /// `hmac_sha1(key, data)` +/// +/// # Arguments +/// * `key` - A byte slice representing the HMAC key +/// * `data` - A byte slice representing the data to be hashed +/// +/// # Returns +/// A 20-byte array containing the HMAC-SHA1 hash of the input data using the provided key +/// pub fn hmac_sha1(key: impl AsRef<[u8]>, data: impl AsRef<[u8]>) -> [u8; 20] { - use hmac::{Hmac, KeyInit, Mac}; - use sha1::Sha1; - let mut m = >::new_from_slice(key.as_ref()).unwrap(); m.update(data.as_ref()); m.finalize().into_bytes().into() } +/// HMAC-SHA256 hashing /// `hmac_sha256(key, data)` +/// +/// # Arguments +/// * `key` - A byte slice representing the HMAC key +/// * `data` - A byte slice representing the data to be hashed +/// +/// # Returns +/// A 32-byte array containing the HMAC-SHA256 hash of the input data using the provided key +/// pub fn hmac_sha256(key: impl AsRef<[u8]>, data: impl AsRef<[u8]>) -> [u8; 32] { - use hmac::{Hmac, KeyInit, Mac}; - use sha2::Sha256; - let mut m = Hmac::::new_from_slice(key.as_ref()).unwrap(); m.update(data.as_ref()); m.finalize().into_bytes().into() @@ -64,18 +114,25 @@ fn hex_bytes32(src: impl AsRef<[u8]>, f: impl FnOnce(&str) -> R) -> R { } fn sha256(data: &[u8]) -> impl AsRef<[u8; 32]> + use<> { - use sha2::{Digest, Sha256}; ::digest(data) } fn sha256_chunk(chunk: &[Bytes]) -> impl AsRef<[u8; 32]> + use<> { - use sha2::{Digest, Sha256}; let mut h = ::new(); chunk.iter().for_each(|data| h.update(data)); h.finalize() } -/// `f(hex(sha256(data)))` +/// hex of sha256 `f(hex(sha256(data)))` +/// +/// # Arguments +/// * `data` - A byte slice representing the data to be hashed +/// * `f` - A closure that takes a string slice and returns a value of type `R` +/// +/// # Returns +/// A value of type `R` returned by the closure `f` after processing the hexadecimal +/// representation of the SHA-256 hash of the input data +/// pub fn hex_sha256(data: &[u8], f: impl FnOnce(&str) -> R) -> R { hex_bytes32(sha256(data).as_ref(), f) } diff --git a/crates/utils/src/dirs.rs b/crates/utils/src/dirs.rs index edfcddfa..bba272e6 100644 --- a/crates/utils/src/dirs.rs +++ b/crates/utils/src/dirs.rs @@ -16,6 +16,7 @@ use rustfs_config::{DEFAULT_LOG_DIR, DEFAULT_LOG_FILENAME}; use std::env; use std::fs; use std::path::{Path, PathBuf}; +use tracing::debug; /// Get the absolute path to the current project /// @@ -29,11 +30,12 @@ use std::path::{Path, PathBuf}; /// # Returns /// - `Ok(PathBuf)`: The absolute path of the project that was successfully obtained. /// - `Err(String)`: Error message for the failed path. +/// pub fn get_project_root() -> Result { // Try to get the project root directory through the CARGO_MANIFEST_DIR environment variable if let Ok(manifest_dir) = env::var("CARGO_MANIFEST_DIR") { let project_root = Path::new(&manifest_dir).to_path_buf(); - println!("Get the project root directory with CARGO_MANIFEST_DIR:{}", project_root.display()); + debug!("Get the project root directory with CARGO_MANIFEST_DIR:{}", project_root.display()); return Ok(project_root); } @@ -43,7 +45,7 @@ pub fn get_project_root() -> Result { // Assume that the project root directory is in the parent directory of the parent directory of the executable path (usually target/debug or target/release) project_root.pop(); // Remove the executable file name project_root.pop(); // Remove target/debug or target/release - println!("Deduce the project root directory through current_exe:{}", project_root.display()); + debug!("Deduce the project root directory through current_exe:{}", project_root.display()); return Ok(project_root); } @@ -51,7 +53,7 @@ pub fn get_project_root() -> Result { if let Ok(mut current_dir) = env::current_dir() { // Assume that the project root directory is in the parent directory of the current working directory current_dir.pop(); - println!("Deduce the project root directory through current_dir:{}", current_dir.display()); + debug!("Deduce the project root directory through current_dir:{}", current_dir.display()); return Ok(current_dir); } @@ -61,12 +63,38 @@ pub fn get_project_root() -> Result { /// Get the log directory as a string /// This function will try to find a writable log directory in the following order: +/// +/// 1. Environment variables are specified +/// 2. System temporary directory +/// 3. User home directory +/// 4. Current working directory +/// 5. Relative path +/// +/// # Arguments +/// * `key` - The environment variable key to check for log directory +/// +/// # Returns +/// * `String` - The log directory path as a string +/// pub fn get_log_directory_to_string(key: &str) -> String { get_log_directory(key).to_string_lossy().to_string() } /// Get the log directory /// This function will try to find a writable log directory in the following order: +/// +/// 1. Environment variables are specified +/// 2. System temporary directory +/// 3. User home directory +/// 4. Current working directory +/// 5. Relative path +/// +/// # Arguments +/// * `key` - The environment variable key to check for log directory +/// +/// # Returns +/// * `PathBuf` - The log directory path +/// pub fn get_log_directory(key: &str) -> PathBuf { // Environment variables are specified if let Ok(log_dir) = env::var(key) { diff --git a/crates/utils/src/dns_resolver.rs b/crates/utils/src/dns_resolver.rs deleted file mode 100644 index c9ee0e47..00000000 --- a/crates/utils/src/dns_resolver.rs +++ /dev/null @@ -1,476 +0,0 @@ -// Copyright 2024 RustFS Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -// #![allow(dead_code)] -// -// //! Layered DNS resolution utility for Kubernetes environments -// //! -// //! This module provides robust DNS resolution with multiple fallback layers: -// //! 1. Local cache (Moka) for previously resolved results -// //! 2. System DNS resolver (container/host adaptive) using hickory-resolver -// //! 3. Public DNS servers as final fallback (8.8.8.8, 1.1.1.1) using hickory-resolver with TLS -// //! -// //! The resolver is designed to handle 5-level or deeper domain names that may fail -// //! in Kubernetes environments due to CoreDNS configuration, DNS recursion limits, -// //! or network-related issues. Uses hickory-resolver for actual DNS queries with TLS support. -// -// use hickory_resolver::Resolver; -// use hickory_resolver::config::ResolverConfig; -// use hickory_resolver::name_server::TokioConnectionProvider; -// use moka::future::Cache; -// use std::net::IpAddr; -// use std::sync::OnceLock; -// use std::time::Duration; -// use tracing::{debug, error, info, instrument, warn}; -// -// /// Maximum FQDN length according to RFC standards -// const MAX_FQDN_LENGTH: usize = 253; -// /// Maximum DNS label length according to RFC standards -// const MAX_LABEL_LENGTH: usize = 63; -// /// Cache entry TTL in seconds -// const CACHE_TTL_SECONDS: u64 = 300; // 5 minutes -// /// Maximum cache size (number of entries) -// const MAX_CACHE_SIZE: u64 = 10000; -// -// /// DNS resolution error types with detailed context and tracing information -// #[derive(Debug, thiserror::Error)] -// pub enum DnsError { -// #[error("Invalid domain format: {reason}")] -// InvalidFormat { reason: String }, -// -// #[error("Local cache miss for domain: {domain}")] -// CacheMiss { domain: String }, -// -// #[error("System DNS resolution failed for domain: {domain} - {source}")] -// SystemDnsFailed { -// domain: String, -// #[source] -// source: Box, -// }, -// -// #[error("Public DNS resolution failed for domain: {domain} - {source}")] -// PublicDnsFailed { -// domain: String, -// #[source] -// source: Box, -// }, -// -// #[error( -// "All DNS resolution attempts failed for domain: {domain}. Please check your domain spelling, network connectivity, or DNS configuration" -// )] -// AllAttemptsFailed { domain: String }, -// -// #[error("DNS resolver initialization failed: {source}")] -// InitializationFailed { -// #[source] -// source: Box, -// }, -// -// #[error("DNS configuration error: {source}")] -// ConfigurationError { -// #[source] -// source: Box, -// }, -// } -// -// /// Layered DNS resolver with caching and multiple fallback strategies -// pub struct LayeredDnsResolver { -// /// Local cache for resolved domains using Moka for high performance -// cache: Cache>, -// /// System DNS resolver using hickory-resolver with default configuration -// system_resolver: Resolver, -// /// Public DNS resolver using hickory-resolver with Cloudflare DNS servers -// public_resolver: Resolver, -// } -// -// impl LayeredDnsResolver { -// /// Create a new layered DNS resolver with automatic DNS configuration detection -// #[instrument(skip_all)] -// pub async fn new() -> Result { -// info!("Initializing layered DNS resolver with hickory-resolver, Moka cache and public DNS fallback"); -// -// // Create Moka cache with TTL and size limits -// let cache = Cache::builder() -// .time_to_live(Duration::from_secs(CACHE_TTL_SECONDS)) -// .max_capacity(MAX_CACHE_SIZE) -// .build(); -// -// // Create system DNS resolver with default configuration (auto-detects container/host DNS) -// let system_resolver = -// Resolver::builder_with_config(ResolverConfig::default(), TokioConnectionProvider::default()).build(); -// -// let mut config = ResolverConfig::cloudflare_tls(); -// for ns in ResolverConfig::google_tls().name_servers() { -// config.add_name_server(ns.clone()) -// } -// // Create public DNS resolver using Cloudflare DNS with TLS support -// let public_resolver = Resolver::builder_with_config(config, TokioConnectionProvider::default()).build(); -// -// info!("DNS resolver initialized successfully with hickory-resolver system and Cloudflare TLS public fallback"); -// -// Ok(Self { -// cache, -// system_resolver, -// public_resolver, -// }) -// } -// -// /// Validate domain format according to RFC standards -// #[instrument(skip_all, fields(domain = %domain))] -// fn validate_domain_format(domain: &str) -> Result<(), DnsError> { -// info!("Validating domain format start"); -// // Check FQDN length -// if domain.len() > MAX_FQDN_LENGTH { -// return Err(DnsError::InvalidFormat { -// reason: format!("FQDN must not exceed {} bytes, got {} bytes", MAX_FQDN_LENGTH, domain.len()), -// }); -// } -// -// // Check each label length -// for label in domain.split('.') { -// if label.len() > MAX_LABEL_LENGTH { -// return Err(DnsError::InvalidFormat { -// reason: format!( -// "Each label must not exceed {} bytes, label '{}' has {} bytes", -// MAX_LABEL_LENGTH, -// label, -// label.len() -// ), -// }); -// } -// } -// -// // Check for empty labels (except trailing dot) -// let labels: Vec<&str> = domain.trim_end_matches('.').split('.').collect(); -// for label in &labels { -// if label.is_empty() { -// return Err(DnsError::InvalidFormat { -// reason: "Domain contains empty labels".to_string(), -// }); -// } -// } -// info!("DNS resolver validated successfully"); -// Ok(()) -// } -// -// /// Check local cache for resolved domain -// #[instrument(skip_all, fields(domain = %domain))] -// async fn check_cache(&self, domain: &str) -> Option> { -// match self.cache.get(domain).await { -// Some(ips) => { -// debug!("DNS cache hit for domain: {}, found {} IPs", domain, ips.len()); -// Some(ips) -// } -// None => { -// debug!("DNS cache miss for domain: {}", domain); -// None -// } -// } -// } -// -// /// Update local cache with resolved IPs -// #[instrument(skip_all, fields(domain = %domain, ip_count = ips.len()))] -// async fn update_cache(&self, domain: &str, ips: Vec) { -// self.cache.insert(domain.to_string(), ips.clone()).await; -// debug!("DNS cache updated for domain: {} with {} IPs", domain, ips.len()); -// } -// -// /// Get cache statistics for monitoring -// #[instrument(skip_all)] -// pub async fn cache_stats(&self) -> (u64, u64) { -// let entry_count = self.cache.entry_count(); -// let weighted_size = self.cache.weighted_size(); -// debug!("DNS cache stats - entries: {}, weighted_size: {}", entry_count, weighted_size); -// (entry_count, weighted_size) -// } -// -// /// Manually invalidate cache entries (useful for testing or forced refresh) -// #[instrument(skip_all)] -// pub async fn invalidate_cache(&self) { -// self.cache.invalidate_all(); -// info!("DNS cache invalidated"); -// } -// -// /// Resolve domain using system DNS (cluster/host DNS configuration) with hickory-resolver -// #[instrument(skip_all, fields(domain = %domain))] -// async fn resolve_with_system_dns(&self, domain: &str) -> Result, DnsError> { -// debug!("Attempting system DNS resolution for domain: {} using hickory-resolver", domain); -// -// match self.system_resolver.lookup_ip(domain).await { -// Ok(lookup) => { -// let ips: Vec = lookup.iter().collect(); -// if !ips.is_empty() { -// info!("System DNS resolution successful for domain: {} -> {} IPs", domain, ips.len()); -// Ok(ips) -// } else { -// warn!("System DNS returned empty result for domain: {}", domain); -// Err(DnsError::SystemDnsFailed { -// domain: domain.to_string(), -// source: "No IP addresses found".to_string().into(), -// }) -// } -// } -// Err(e) => { -// warn!("System DNS resolution failed for domain: {} - {}", domain, e); -// Err(DnsError::SystemDnsFailed { -// domain: domain.to_string(), -// source: Box::new(e), -// }) -// } -// } -// } -// -// /// Resolve domain using public DNS servers (Cloudflare TLS DNS) with hickory-resolver -// #[instrument(skip_all, fields(domain = %domain))] -// async fn resolve_with_public_dns(&self, domain: &str) -> Result, DnsError> { -// debug!( -// "Attempting public DNS resolution for domain: {} using hickory-resolver with TLS-enabled Cloudflare DNS", -// domain -// ); -// -// match self.public_resolver.lookup_ip(domain).await { -// Ok(lookup) => { -// let ips: Vec = lookup.iter().collect(); -// if !ips.is_empty() { -// info!("Public DNS resolution successful for domain: {} -> {} IPs", domain, ips.len()); -// Ok(ips) -// } else { -// warn!("Public DNS returned empty result for domain: {}", domain); -// Err(DnsError::PublicDnsFailed { -// domain: domain.to_string(), -// source: "No IP addresses found".to_string().into(), -// }) -// } -// } -// Err(e) => { -// error!("Public DNS resolution failed for domain: {} - {}", domain, e); -// Err(DnsError::PublicDnsFailed { -// domain: domain.to_string(), -// source: Box::new(e), -// }) -// } -// } -// } -// -// /// Resolve domain with layered fallback strategy using hickory-resolver -// /// -// /// Resolution order with detailed tracing: -// /// 1. Local cache (Moka with TTL) -// /// 2. System DNS (hickory-resolver with host/container adaptive configuration) -// /// 3. Public DNS (hickory-resolver with TLS-enabled Cloudflare DNS fallback) -// #[instrument(skip_all, fields(domain = %domain))] -// pub async fn resolve(&self, domain: &str) -> Result, DnsError> { -// info!("Starting DNS resolution process for domain: {} start", domain); -// // Validate domain format first -// Self::validate_domain_format(domain)?; -// -// info!("Starting DNS resolution for domain: {}", domain); -// -// // Step 1: Check local cache -// if let Some(ips) = self.check_cache(domain).await { -// info!("DNS resolution completed from cache for domain: {} -> {} IPs", domain, ips.len()); -// return Ok(ips); -// } -// -// debug!("Local cache miss for domain: {}, attempting system DNS", domain); -// -// // Step 2: Try system DNS (cluster/host adaptive) -// match self.resolve_with_system_dns(domain).await { -// Ok(ips) => { -// self.update_cache(domain, ips.clone()).await; -// info!("DNS resolution completed via system DNS for domain: {} -> {} IPs", domain, ips.len()); -// return Ok(ips); -// } -// Err(system_err) => { -// warn!("System DNS failed for domain: {} - {}", domain, system_err); -// } -// } -// -// // Step 3: Fallback to public DNS -// info!("Falling back to public DNS for domain: {}", domain); -// match self.resolve_with_public_dns(domain).await { -// Ok(ips) => { -// self.update_cache(domain, ips.clone()).await; -// info!("DNS resolution completed via public DNS for domain: {} -> {} IPs", domain, ips.len()); -// Ok(ips) -// } -// Err(public_err) => { -// error!( -// "All DNS resolution attempts failed for domain:` {}`. System DNS: failed, Public DNS: {}", -// domain, public_err -// ); -// Err(DnsError::AllAttemptsFailed { -// domain: domain.to_string(), -// }) -// } -// } -// } -// } -// -// /// Global DNS resolver instance -// static GLOBAL_DNS_RESOLVER: OnceLock = OnceLock::new(); -// -// /// Initialize the global DNS resolver -// #[instrument] -// pub async fn init_global_dns_resolver() -> Result<(), DnsError> { -// info!("Initializing global DNS resolver"); -// let resolver = LayeredDnsResolver::new().await?; -// -// match GLOBAL_DNS_RESOLVER.set(resolver) { -// Ok(()) => { -// info!("Global DNS resolver initialized successfully"); -// Ok(()) -// } -// Err(_) => { -// warn!("Global DNS resolver was already initialized"); -// Ok(()) -// } -// } -// } -// -// /// Get the global DNS resolver instance -// pub fn get_global_dns_resolver() -> Option<&'static LayeredDnsResolver> { -// GLOBAL_DNS_RESOLVER.get() -// } -// -// /// Resolve domain using the global DNS resolver with comprehensive tracing -// #[instrument(skip_all, fields(domain = %domain))] -// pub async fn resolve_domain(domain: &str) -> Result, DnsError> { -// info!("resolving domain for: {}", domain); -// match get_global_dns_resolver() { -// Some(resolver) => resolver.resolve(domain).await, -// None => Err(DnsError::InitializationFailed { -// source: "Global DNS resolver not initialized. Call init_global_dns_resolver() first." -// .to_string() -// .into(), -// }), -// } -// } -// -// #[cfg(test)] -// mod tests { -// use super::*; -// -// #[test] -// fn test_domain_validation() { -// // Valid domains -// assert!(LayeredDnsResolver::validate_domain_format("example.com").is_ok()); -// assert!(LayeredDnsResolver::validate_domain_format("sub.example.com").is_ok()); -// assert!(LayeredDnsResolver::validate_domain_format("very.deep.sub.domain.example.com").is_ok()); -// -// // Invalid domains - too long FQDN -// let long_domain = "a".repeat(254); -// assert!(LayeredDnsResolver::validate_domain_format(&long_domain).is_err()); -// -// // Invalid domains - label too long -// let long_label = format!("{}.com", "a".repeat(64)); -// assert!(LayeredDnsResolver::validate_domain_format(&long_label).is_err()); -// -// // Invalid domains - empty label -// assert!(LayeredDnsResolver::validate_domain_format("example..com").is_err()); -// } -// -// #[tokio::test] -// async fn test_cache_functionality() { -// let resolver = LayeredDnsResolver::new().await.unwrap(); -// -// // Test cache miss -// assert!(resolver.check_cache("example.com").await.is_none()); -// -// // Update cache -// let test_ips = vec![IpAddr::from([192, 0, 2, 1])]; -// resolver.update_cache("example.com", test_ips.clone()).await; -// -// // Test cache hit -// assert_eq!(resolver.check_cache("example.com").await, Some(test_ips)); -// -// // Test cache stats (note: moka cache might not immediately reflect changes) -// let (total, _weighted_size) = resolver.cache_stats().await; -// // Cache should have at least the entry we just added (might be 0 due to async nature) -// assert!(total <= 1, "Cache should have at most 1 entry, got {total}"); -// } -// -// #[tokio::test] -// async fn test_dns_resolution() { -// let resolver = LayeredDnsResolver::new().await.unwrap(); -// -// // Test resolution of a known domain (localhost should always resolve) -// match resolver.resolve("localhost").await { -// Ok(ips) => { -// assert!(!ips.is_empty()); -// println!("Resolved localhost to: {ips:?}"); -// } -// Err(e) => { -// // In some test environments, even localhost might fail -// // This is acceptable as long as our error handling works -// println!("DNS resolution failed (might be expected in test environments): {e}"); -// } -// } -// } -// -// #[tokio::test] -// async fn test_invalid_domain_resolution() { -// let resolver = LayeredDnsResolver::new().await.unwrap(); -// -// // Test resolution of invalid domain -// let result = resolver -// .resolve("nonexistent.invalid.domain.example.thisdefinitelydoesnotexist") -// .await; -// assert!(result.is_err()); -// -// if let Err(e) = result { -// println!("Expected error for invalid domain: {e}"); -// // Should be AllAttemptsFailed since both system and public DNS should fail -// assert!(matches!(e, DnsError::AllAttemptsFailed { .. })); -// } -// } -// -// #[tokio::test] -// async fn test_cache_invalidation() { -// let resolver = LayeredDnsResolver::new().await.unwrap(); -// -// // Add entry to cache -// let test_ips = vec![IpAddr::from([192, 0, 2, 1])]; -// resolver.update_cache("test.example.com", test_ips.clone()).await; -// -// // Verify cache hit -// assert_eq!(resolver.check_cache("test.example.com").await, Some(test_ips)); -// -// // Invalidate cache -// resolver.invalidate_cache().await; -// -// // Verify cache miss after invalidation -// assert!(resolver.check_cache("test.example.com").await.is_none()); -// } -// -// #[tokio::test] -// async fn test_global_resolver_initialization() { -// // Test initialization -// assert!(init_global_dns_resolver().await.is_ok()); -// -// // Test that resolver is available -// assert!(get_global_dns_resolver().is_some()); -// -// // Test domain resolution through global resolver -// match resolve_domain("localhost").await { -// Ok(ips) => { -// assert!(!ips.is_empty()); -// println!("Global resolver resolved localhost to: {ips:?}"); -// } -// Err(e) => { -// println!("Global resolver DNS resolution failed (might be expected in test environments): {e}"); -// } -// } -// } -// } diff --git a/crates/utils/src/envs.rs b/crates/utils/src/envs.rs index bc05bf42..b42cf22e 100644 --- a/crates/utils/src/envs.rs +++ b/crates/utils/src/envs.rs @@ -366,8 +366,8 @@ pub fn get_env_bool(key: &str, default: bool) -> bool { env::var(key) .ok() .and_then(|v| match v.to_lowercase().as_str() { - "1" | "true" | "yes" => Some(true), - "0" | "false" | "no" => Some(false), + "1" | "t" | "T" | "true" | "TRUE" | "True" | "on" | "ON" | "On" | "enabled" => Some(true), + "0" | "f" | "F" | "false" | "FALSE" | "False" | "off" | "OFF" | "Off" | "disabled" => Some(false), _ => None, }) .unwrap_or(default) @@ -383,8 +383,8 @@ pub fn get_env_bool(key: &str, default: bool) -> bool { /// pub fn get_env_opt_bool(key: &str) -> Option { env::var(key).ok().and_then(|v| match v.to_lowercase().as_str() { - "1" | "true" | "yes" => Some(true), - "0" | "false" | "no" => Some(false), + "1" | "t" | "T" | "true" | "TRUE" | "True" | "on" | "ON" | "On" | "enabled" => Some(true), + "0" | "f" | "F" | "false" | "FALSE" | "False" | "off" | "OFF" | "Off" | "disabled" => Some(false), _ => None, }) } diff --git a/crates/utils/src/hash.rs b/crates/utils/src/hash.rs index 975cdc46..aa0b1e96 100644 --- a/crates/utils/src/hash.rs +++ b/crates/utils/src/hash.rs @@ -72,6 +72,13 @@ fn u8x32_from_u64x4(input: [u64; 4]) -> [u8; 32] { impl HashAlgorithm { /// Hash the input data and return the hash result as Vec. + /// + /// # Arguments + /// * `data` - A byte slice representing the data to be hashed + /// + /// # Returns + /// A byte slice containing the hash of the input data + /// pub fn hash_encode(&self, data: &[u8]) -> impl AsRef<[u8]> { match self { HashAlgorithm::Md5 => HashEncoded::Md5(Md5::digest(data).into()), @@ -92,6 +99,10 @@ impl HashAlgorithm { } /// Return the output size in bytes for the hash algorithm. + /// + /// # Returns + /// The size in bytes of the hash output + /// pub fn size(&self) -> usize { match self { HashAlgorithm::SHA256 => 32, @@ -111,6 +122,16 @@ pub const EMPTY_STRING_SHA256_HASH: &str = "e3b0c44298fc1c149afbf4c8996fb92427ae pub const DEFAULT_SIP_HASH_KEY: [u8; 16] = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]; +/// SipHash function to hash a string key into a bucket index. +/// +/// # Arguments +/// * `key` - The input string to be hashed +/// * `cardinality` - The number of buckets +/// * `id` - A 16-byte array used as the SipHash key +/// +/// # Returns +/// A usize representing the bucket index +/// pub fn sip_hash(key: &str, cardinality: usize, id: &[u8; 16]) -> usize { // Your key, must be 16 bytes @@ -120,6 +141,15 @@ pub fn sip_hash(key: &str, cardinality: usize, id: &[u8; 16]) -> usize { (result as usize) % cardinality } +/// CRC32 hash function to hash a string key into a bucket index. +/// +/// # Arguments +/// * `key` - The input string to be hashed +/// * `cardinality` - The number of buckets +/// +/// # Returns +/// A usize representing the bucket index +/// pub fn crc_hash(key: &str, cardinality: usize) -> usize { let mut hasher = Hasher::new(); // Create a new hasher diff --git a/crates/utils/src/http/ip.rs b/crates/utils/src/http/ip.rs index f3d4f4f4..6d42d142 100644 --- a/crates/utils/src/http/ip.rs +++ b/crates/utils/src/http/ip.rs @@ -34,6 +34,10 @@ static FOR_REGEX: LazyLock = LazyLock::new(|| Regex::new(r"(?i)(?:for=)([ static PROTO_REGEX: LazyLock = LazyLock::new(|| Regex::new(r"(?i)^(;|,| )+(?:proto=)(https|http)").unwrap()); /// Used to disable all processing of the X-Forwarded-For header in source IP discovery. +/// +/// # Returns +/// A `bool` indicating whether the X-Forwarded-For header is enabled +/// fn is_xff_header_enabled() -> bool { env::var("_RUSTFS_API_XFF_HEADER") .unwrap_or_else(|_| "on".to_string()) @@ -43,6 +47,13 @@ fn is_xff_header_enabled() -> bool { /// GetSourceScheme retrieves the scheme from the X-Forwarded-Proto and RFC7239 /// Forwarded headers (in that order). +/// +/// # Arguments +/// * `headers` - HTTP headers from the request +/// +/// # Returns +/// An `Option` containing the source scheme if found +/// pub fn get_source_scheme(headers: &HeaderMap) -> Option { // Retrieve the scheme from X-Forwarded-Proto. if let Some(proto) = headers.get(X_FORWARDED_PROTO) { @@ -84,6 +95,13 @@ pub fn get_source_scheme(headers: &HeaderMap) -> Option { /// GetSourceIPFromHeaders retrieves the IP from the X-Forwarded-For, X-Real-IP /// and RFC7239 Forwarded headers (in that order) +/// +/// # Arguments +/// * `headers` - HTTP headers from the request +/// +/// # Returns +/// An `Option` containing the source IP address if found +/// pub fn get_source_ip_from_headers(headers: &HeaderMap) -> Option { let mut addr = None; @@ -132,6 +150,14 @@ pub fn get_source_ip_from_headers(headers: &HeaderMap) -> Option { /// GetSourceIPRaw retrieves the IP from the request headers /// and falls back to remote_addr when necessary. /// however returns without bracketing. +/// +/// # Arguments +/// * `headers` - HTTP headers from the request +/// * `remote_addr` - Remote address as a string +/// +/// # Returns +/// A `String` containing the source IP address +/// pub fn get_source_ip_raw(headers: &HeaderMap, remote_addr: &str) -> String { let addr = get_source_ip_from_headers(headers).unwrap_or_else(|| remote_addr.to_string()); @@ -145,6 +171,15 @@ pub fn get_source_ip_raw(headers: &HeaderMap, remote_addr: &str) -> String { /// GetSourceIP retrieves the IP from the request headers /// and falls back to remote_addr when necessary. +/// It brackets IPv6 addresses. +/// +/// # Arguments +/// * `headers` - HTTP headers from the request +/// * `remote_addr` - Remote address as a string +/// +/// # Returns +/// A `String` containing the source IP address, with IPv6 addresses bracketed +/// pub fn get_source_ip(headers: &HeaderMap, remote_addr: &str) -> String { let addr = get_source_ip_raw(headers, remote_addr); if addr.contains(':') { format!("[{addr}]") } else { addr } diff --git a/crates/utils/src/io.rs b/crates/utils/src/io.rs index 431bc383..42dc5ac2 100644 --- a/crates/utils/src/io.rs +++ b/crates/utils/src/io.rs @@ -83,7 +83,7 @@ pub fn put_uvarint_len(x: u64) -> usize { i + 1 } -/// Decodes a u64 from buf and returns (value, number of bytes read). +/// Decodes an u64 from buf and returns (value, number of bytes read). /// If buf is too small, returns (0, 0). /// If overflow, returns (0, -(n as isize)), where n is the number of bytes read. pub fn uvarint(buf: &[u8]) -> (u64, isize) { diff --git a/crates/utils/src/ip.rs b/crates/utils/src/ip.rs index ed7fea3e..e349e51c 100644 --- a/crates/utils/src/ip.rs +++ b/crates/utils/src/ip.rs @@ -20,9 +20,9 @@ use std::net::{IpAddr, Ipv4Addr}; /// If both fail to retrieve, None is returned. /// /// # Returns -/// /// * `Some(IpAddr)` - Native IP address (IPv4 or IPv6) /// * `None` - Unable to obtain any native IP address +/// pub fn get_local_ip() -> Option { local_ip_address::local_ip() .ok() @@ -34,8 +34,8 @@ pub fn get_local_ip() -> Option { /// If the IP address cannot be obtained, returns "127.0.0.1" as the default value. /// /// # Returns -/// /// * `String` - Native IP address (IPv4 or IPv6) as a string, or the default value +/// pub fn get_local_ip_with_default() -> String { get_local_ip() .unwrap_or_else(|| IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1))) // Provide a safe default value diff --git a/crates/utils/src/lib.rs b/crates/utils/src/lib.rs index ecf92c44..c08dc80a 100644 --- a/crates/utils/src/lib.rs +++ b/crates/utils/src/lib.rs @@ -14,8 +14,6 @@ #[cfg(feature = "tls")] pub mod certs; -#[cfg(feature = "net")] -pub mod dns_resolver; #[cfg(feature = "ip")] pub mod ip; #[cfg(feature = "net")] diff --git a/crates/utils/src/notify/net.rs b/crates/utils/src/notify/net.rs index 807d9442..d1312cb7 100644 --- a/crates/utils/src/notify/net.rs +++ b/crates/utils/src/notify/net.rs @@ -40,7 +40,8 @@ pub enum NetError { SchemeWithEmptyHost, } -// Host represents a network host with IP/name and port. +/// Host represents a network host with IP/name and port. +/// Similar to Go's net.Host structure. #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] pub struct Host { pub name: String, @@ -130,22 +131,38 @@ fn trim_ipv6(host: &str) -> Result { } } -// URL is a wrapper around url::Url for custom handling. +/// URL is a wrapper around url::Url for custom handling. +/// Provides methods similar to Go's URL struct. #[derive(Debug, Clone)] pub struct ParsedURL(pub Url); impl ParsedURL { /// is_empty returns true if the URL is empty or "about:blank". + /// + /// # Arguments + /// * `&self` - Reference to the ParsedURL instance. + /// + /// # Returns + /// * `bool` - True if the URL is empty or "about:blank", false otherwise. + /// pub fn is_empty(&self) -> bool { self.0.as_str() == "" || (self.0.scheme() == "about" && self.0.path() == "blank") } /// hostname returns the hostname of the URL. + /// + /// # Returns + /// * `String` - The hostname of the URL, or an empty string if not set. + /// pub fn hostname(&self) -> String { self.0.host_str().unwrap_or("").to_string() } /// port returns the port of the URL as a string, defaulting to "80" for http and "443" for https if not set. + /// + /// # Returns + /// * `String` - The port of the URL as a string. + /// pub fn port(&self) -> String { match self.0.port() { Some(p) => p.to_string(), @@ -158,11 +175,19 @@ impl ParsedURL { } /// scheme returns the scheme of the URL. + /// + /// # Returns + /// * `&str` - The scheme of the URL. + /// pub fn scheme(&self) -> &str { self.0.scheme() } /// url returns a reference to the underlying Url. + /// + /// # Returns + /// * `&Url` - Reference to the underlying Url. + /// pub fn url(&self) -> &Url { &self.0 } @@ -213,7 +238,18 @@ impl<'de> serde::Deserialize<'de> for ParsedURL { } } -// parse_url parses a string into a ParsedURL, with host validation and path cleaning. +/// parse_url parses a string into a ParsedURL, with host validation and path cleaning. +/// +/// # Arguments +/// * `s` - The URL string to parse. +/// +/// # Returns +/// * `Ok(ParsedURL)` - If parsing is successful. +/// * `Err(NetError)` - If parsing fails or host is invalid. +/// +/// # Errors +/// Returns NetError if parsing fails or host is invalid. +/// pub fn parse_url(s: &str) -> Result { if let Some(scheme_end) = s.find("://") { if s[scheme_end + 3..].starts_with('/') { @@ -273,6 +309,14 @@ pub fn parse_url(s: &str) -> Result { #[allow(dead_code)] /// parse_http_url parses a string into a ParsedURL, ensuring the scheme is http or https. +/// +/// # Arguments +/// * `s` - The URL string to parse. +/// +/// # Returns +/// * `Ok(ParsedURL)` - If parsing is successful and scheme is http/https. +/// * `Err(NetError)` - If parsing fails or scheme is not http/https. +/// pub fn parse_http_url(s: &str) -> Result { let u = parse_url(s)?; match u.0.scheme() { @@ -283,6 +327,14 @@ pub fn parse_http_url(s: &str) -> Result { #[allow(dead_code)] /// is_network_or_host_down checks if an error indicates network or host down, considering timeouts. +/// +/// # Arguments +/// * `err` - The std::io::Error to check. +/// * `expect_timeouts` - Whether timeouts are expected. +/// +/// # Returns +/// * `bool` - True if the error indicates network or host down, false otherwise. +/// pub fn is_network_or_host_down(err: &std::io::Error, expect_timeouts: bool) -> bool { if err.kind() == std::io::ErrorKind::TimedOut { return !expect_timeouts; @@ -297,12 +349,26 @@ pub fn is_network_or_host_down(err: &std::io::Error, expect_timeouts: bool) -> b #[allow(dead_code)] /// is_conn_reset_err checks if an error indicates a connection reset by peer. +/// +/// # Arguments +/// * `err` - The std::io::Error to check. +/// +/// # Returns +/// * `bool` - True if the error indicates connection reset, false otherwise. +/// pub fn is_conn_reset_err(err: &std::io::Error) -> bool { err.to_string().contains("connection reset by peer") || matches!(err.raw_os_error(), Some(libc::ECONNRESET)) } #[allow(dead_code)] /// is_conn_refused_err checks if an error indicates a connection refused. +/// +/// # Arguments +/// * `err` - The std::io::Error to check. +/// +/// # Returns +/// * `bool` - True if the error indicates connection refused, false otherwise. +/// pub fn is_conn_refused_err(err: &std::io::Error) -> bool { err.to_string().contains("connection refused") || matches!(err.raw_os_error(), Some(libc::ECONNREFUSED)) } diff --git a/crates/utils/src/retry.rs b/crates/utils/src/retry.rs index 04c5f2ad..cdea044f 100644 --- a/crates/utils/src/retry.rs +++ b/crates/utils/src/retry.rs @@ -42,7 +42,6 @@ pub struct RetryTimer { impl RetryTimer { pub fn new(max_retry: i64, base_sleep: Duration, max_sleep: Duration, jitter: f64, random: u64) -> Self { - //println!("time1: {:?}", std::time::SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis()); Self { base_sleep, max_sleep, @@ -70,9 +69,6 @@ impl Stream for RetryTimer { sleep = self.max_sleep; } if (jitter - NO_JITTER).abs() > 1e-9 { - //println!("\njitter: {:?}", jitter); - //println!("sleep: {sleep:?}"); - //println!("0000: {:?}", self.random as f64 * jitter / 100_f64); let sleep_ms = sleep.as_millis(); let reduction = ((sleep_ms as f64) * (self.random as f64 * jitter / 100_f64)).round() as u128; let jittered_ms = sleep_ms.saturating_sub(reduction); @@ -85,29 +81,21 @@ impl Stream for RetryTimer { let mut timer = interval(sleep); timer.set_missed_tick_behavior(MissedTickBehavior::Delay); self.timer = Some(timer); - //println!("time1: {:?}", std::time::SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis()); } let mut timer = self.timer.as_mut().unwrap(); match Pin::new(&mut timer).poll_tick(cx) { Poll::Ready(_) => { - //println!("ready"); - //println!("time2: {:?}", std::time::SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis()); self.rem -= 1; if self.rem > 0 { let mut new_timer = interval(sleep); new_timer.set_missed_tick_behavior(MissedTickBehavior::Delay); new_timer.reset(); self.timer = Some(new_timer); - //println!("time1: {:?}", std::time::SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis()); } Poll::Ready(Some(())) } - Poll::Pending => { - //println!("pending"); - //println!("time2: {:?}", std::time::SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis()); - Poll::Pending - } + Poll::Pending => Poll::Pending, } } } @@ -176,7 +164,7 @@ pub fn is_request_error_retryable(_err: std::io::Error) -> bool { #[allow(unused_imports)] mod tests { use super::*; - use futures::{Future, StreamExt}; + use futures::StreamExt; use rand::Rng; use std::time::UNIX_EPOCH; diff --git a/crates/utils/src/string.rs b/crates/utils/src/string.rs index fc27f9d5..42a8e0a6 100644 --- a/crates/utils/src/string.rs +++ b/crates/utils/src/string.rs @@ -17,6 +17,29 @@ use regex::Regex; use std::io::{Error, Result}; use std::sync::LazyLock; +/// Parses a boolean value from a string. +/// +/// # Arguments +/// `str` - A string slice representing the boolean value. +/// +/// # Returns +/// A `Result` containing the parsed boolean value or an error if parsing fails. +/// +/// Examples +/// ```no_run +/// use rustfs_utils::string::parse_bool; +/// +/// let true_values = ["1", "t", "T", "true", "TRUE", "True", "on", "ON", "On", "enabled"]; +/// let false_values = ["0", "f", "F", "false", "FALSE", "False", "off", "OFF", "Off", "disabled"]; +/// +/// for val in true_values.iter() { +/// assert_eq!(parse_bool(val).unwrap(), true); +/// } +/// for val in false_values.iter() { +/// assert_eq!(parse_bool(val).unwrap(), false); +/// } +/// ``` +/// pub fn parse_bool(str: &str) -> Result { match str { "1" | "t" | "T" | "true" | "TRUE" | "True" | "on" | "ON" | "On" | "enabled" => Ok(true), @@ -25,6 +48,23 @@ pub fn parse_bool(str: &str) -> Result { } } +/// Matches a simple pattern against a name using wildcards. +/// +/// # Arguments +/// * `pattern` - The pattern to match, which may include wildcards '*' and '?' +/// * `name` - The name to match against the pattern +/// +/// # Returns +/// * `true` if the name matches the pattern, `false` otherwise +/// +/// Examples +/// ```no_run +/// use rustfs_utils::string::match_simple; +/// assert!(match_simple("file*", "file123")); +/// assert!(match_simple("file?", "file1")); +/// assert!(!match_simple("file?", "file12")); +/// ``` +/// pub fn match_simple(pattern: &str, name: &str) -> bool { if pattern.is_empty() { return name == pattern; @@ -36,6 +76,24 @@ pub fn match_simple(pattern: &str, name: &str) -> bool { deep_match_rune(name.as_bytes(), pattern.as_bytes(), true) } +/// Matches a pattern against a name using wildcards. +/// +/// # Arguments +/// * `pattern` - The pattern to match, which may include wildcards '*' and '?' +/// * `name` - The name to match against the pattern +/// +/// # Returns +/// * `true` if the name matches the pattern, `false` otherwise +/// +/// Examples +/// ```no_run +/// use rustfs_utils::string::match_pattern; +/// +/// assert!(match_pattern("file*", "file123")); +/// assert!(match_pattern("file?", "file1")); +/// assert!(!match_pattern("file?", "file12")); +/// ``` +/// pub fn match_pattern(pattern: &str, name: &str) -> bool { if pattern.is_empty() { return name == pattern; @@ -47,6 +105,25 @@ pub fn match_pattern(pattern: &str, name: &str) -> bool { deep_match_rune(name.as_bytes(), pattern.as_bytes(), false) } +/// Checks if any pattern in the list matches the given string. +/// +/// # Arguments +/// * `patterns` - A slice of patterns to match against +/// * `match_str` - The string to match against the patterns +/// +/// # Returns +/// * `true` if any pattern matches the string, `false` otherwise +/// +/// Examples +/// ```no_run +/// use rustfs_utils::string::has_pattern; +/// +/// let patterns = vec!["file*", "data?", "image*"]; +/// assert!(has_pattern(&patterns, "file123")); +/// assert!(has_pattern(&patterns, "data1")); +/// assert!(!has_pattern(&patterns, "video1")); +/// ``` +/// pub fn has_pattern(patterns: &[&str], match_str: &str) -> bool { for pattern in patterns { if match_simple(pattern, match_str) { @@ -56,6 +133,23 @@ pub fn has_pattern(patterns: &[&str], match_str: &str) -> bool { false } +/// Checks if the given string has any suffix from the provided list, ignoring case. +/// +/// # Arguments +/// * `str` - The string to check +/// * `list` - A slice of suffixes to check against +/// +/// # Returns +/// * `true` if the string ends with any of the suffixes in the list, `false` otherwise +/// +/// Examples +/// ```no_run +/// use rustfs_utils::string::has_string_suffix_in_slice; +/// +/// let suffixes = vec![".txt", ".md", ".rs"]; +/// assert!(has_string_suffix_in_slice("document.TXT", &suffixes)); +/// assert!(!has_string_suffix_in_slice("image.png", &suffixes)); +/// ``` pub fn has_string_suffix_in_slice(str: &str, list: &[&str]) -> bool { let str = str.to_lowercase(); for v in list { @@ -99,6 +193,24 @@ fn deep_match_rune(str_: &[u8], pattern: &[u8], simple: bool) -> bool { str_.is_empty() && pattern.is_empty() } +/// Matches a pattern as a prefix against the given text. +/// +/// # Arguments +/// * `pattern` - The pattern to match, which may include wildcards '*' and '?' +/// * `text` - The text to match against the pattern +/// +/// # Returns +/// * `true` if the text matches the pattern as a prefix, `false` otherwise +/// +/// Examples +/// ```no_run +/// use rustfs_utils::string::match_as_pattern_prefix; +/// +/// assert!(match_as_pattern_prefix("file*", "file123")); +/// assert!(match_as_pattern_prefix("file?", "file1")); +/// assert!(!match_as_pattern_prefix("file?", "file12")); +/// ``` +/// pub fn match_as_pattern_prefix(pattern: &str, text: &str) -> bool { let mut i = 0; while i < text.len() && i < pattern.len() { @@ -215,6 +327,24 @@ impl ArgPattern { } /// finds all ellipses patterns, recursively and parses the ranges numerically. +/// +/// # Arguments +/// * `arg` - The argument string to search for ellipses patterns +/// +/// # Returns +/// * `Result` - A result containing the parsed ArgPattern or an error if parsing fails +/// +/// # Errors +/// This function will return an error if the ellipses pattern format is invalid. +/// +/// Examples +/// ```no_run +/// use rustfs_utils::string::find_ellipses_patterns; +/// +/// let pattern = "http://rustfs{2...3}/export/set{1...64}"; +/// let arg_pattern = find_ellipses_patterns(pattern).unwrap(); +/// assert_eq!(arg_pattern.total_sizes(), 128); +/// ``` pub fn find_ellipses_patterns(arg: &str) -> Result { let mut parts = match ELLIPSES_RE.captures(arg) { Some(caps) => caps, @@ -268,6 +398,21 @@ pub fn find_ellipses_patterns(arg: &str) -> Result { } /// returns true if input arg has ellipses type pattern. +/// +/// # Arguments +/// * `s` - A slice of strings to check for ellipses patterns +/// +/// # Returns +/// * `true` if any string contains ellipses patterns, `false` otherwise +/// +/// Examples +/// ```no_run +/// use rustfs_utils::string::has_ellipses; +/// +/// let args = vec!["http://rustfs{2...3}/export/set{1...64}", "mydisk-{a...z}{1...20}"]; +/// assert!(has_ellipses(&args)); +/// ``` +/// pub fn has_ellipses>(s: &[T]) -> bool { let pattern = [ELLIPSES, OPEN_BRACES, CLOSE_BRACES]; @@ -279,6 +424,24 @@ pub fn has_ellipses>(s: &[T]) -> bool { /// example: /// {1...64} /// {33...64} +/// +/// # Arguments +/// * `pattern` - A string slice representing the ellipses range pattern +/// +/// # Returns +/// * `Result>` - A result containing a vector of strings representing the expanded range or an error if parsing fails +/// +/// # Errors +/// This function will return an error if the ellipses range format is invalid. +/// +/// Examples +/// ```no_run +/// use rustfs_utils::string::parse_ellipses_range; +/// +/// let range = parse_ellipses_range("{1...5}").unwrap(); +/// assert_eq!(range, vec!["1", "2", "3", "4", "5"]); +/// ``` +/// pub fn parse_ellipses_range(pattern: &str) -> Result> { if !pattern.contains(OPEN_BRACES) { return Err(Error::other("Invalid argument")); @@ -317,6 +480,25 @@ pub fn parse_ellipses_range(pattern: &str) -> Result> { Ok(ret) } +/// Generates a random access key of the specified length. +/// +/// # Arguments +/// * `length` - The length of the access key to generate +/// +/// # Returns +/// * `Result` - A result containing the generated access key or an error if the length is too short +/// +/// # Errors +/// This function will return an error if the specified length is less than 3. +/// +/// Examples +/// ```no_run +/// use rustfs_utils::string::gen_access_key; +/// +/// let access_key = gen_access_key(16).unwrap(); +/// println!("Generated access key: {}", access_key); +/// ``` +/// pub fn gen_access_key(length: usize) -> Result { const ALPHA_NUMERIC_TABLE: [char; 36] = [ '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', @@ -337,6 +519,25 @@ pub fn gen_access_key(length: usize) -> Result { Ok(result) } +/// Generates a random secret key of the specified length. +/// +/// # Arguments +/// * `length` - The length of the secret key to generate +/// +/// # Returns +/// * `Result` - A result containing the generated secret key or an error if the length is too short +/// +/// # Errors +/// This function will return an error if the specified length is less than 8. +/// +/// # Examples +/// ```no_run +/// use rustfs_utils::string::gen_secret_key; +/// +/// let secret_key = gen_secret_key(32).unwrap(); +/// println!("Generated secret key: {}", secret_key); +/// ``` +/// pub fn gen_secret_key(length: usize) -> Result { use base64_simd::URL_SAFE_NO_PAD; @@ -355,6 +556,22 @@ pub fn gen_secret_key(length: usize) -> Result { } /// Tests whether the string s begins with prefix ignoring case +/// +/// # Arguments +/// * `s` - The string to test +/// * `prefix` - The prefix to look for +/// +/// # Returns +/// * `true` if s starts with prefix (case-insensitive), `false` otherwise +/// +/// # Examples +/// ```no_run +/// use rustfs_utils::string::strings_has_prefix_fold; +/// +/// assert!(strings_has_prefix_fold("HelloWorld", "hello")); +/// assert!(!strings_has_prefix_fold("HelloWorld", "world")); +/// ``` +/// pub fn strings_has_prefix_fold(s: &str, prefix: &str) -> bool { if s.len() < prefix.len() { return false; diff --git a/crates/utils/src/sys/user_agent.rs b/crates/utils/src/sys/user_agent.rs index b8fdda55..20eee81a 100644 --- a/crates/utils/src/sys/user_agent.rs +++ b/crates/utils/src/sys/user_agent.rs @@ -39,7 +39,17 @@ impl ServiceType { } } -// UserAgent structure +/// UserAgent structure to hold User-Agent information +/// including OS platform, architecture, version, and service type. +/// It provides methods to generate a formatted User-Agent string. +/// # Examples +/// ``` +/// use rustfs_utils::{get_user_agent, ServiceType}; +/// +/// let ua = get_user_agent(ServiceType::Core); +/// println!("User-Agent: {}", ua); +/// ``` +#[derive(Debug)] struct UserAgent { os_platform: String, arch: String, @@ -146,7 +156,14 @@ impl fmt::Display for UserAgent { } } -// Get the User-Agent string and accept business type parameters +/// Get the User-Agent string and accept business type parameters +/// +/// # Arguments +/// * `service` - The type of service for which the User-Agent is being created. +/// +/// # Returns +/// A formatted User-Agent string. +/// pub fn get_user_agent(service: ServiceType) -> String { UserAgent::new(service).to_string() } diff --git a/rustfs/src/admin/handlers/kms_dynamic.rs b/rustfs/src/admin/handlers/kms_dynamic.rs index 5285e7d8..150fc3ea 100644 --- a/rustfs/src/admin/handlers/kms_dynamic.rs +++ b/rustfs/src/admin/handlers/kms_dynamic.rs @@ -19,14 +19,65 @@ use crate::admin::auth::validate_admin_request; use crate::auth::{check_key_valid, get_session_token}; use hyper::StatusCode; use matchit::Params; +use rustfs_ecstore::config::com::{read_config, save_config}; +use rustfs_ecstore::new_object_layer_fn; use rustfs_kms::{ - ConfigureKmsRequest, ConfigureKmsResponse, KmsConfigSummary, KmsServiceStatus, KmsStatusResponse, StartKmsRequest, + ConfigureKmsRequest, ConfigureKmsResponse, KmsConfig, KmsConfigSummary, KmsServiceStatus, KmsStatusResponse, StartKmsRequest, StartKmsResponse, StopKmsResponse, get_global_kms_service_manager, }; use rustfs_policy::policy::action::{Action, AdminAction}; use s3s::{Body, S3Request, S3Response, S3Result, s3_error}; use tracing::{error, info, warn}; +/// Path to store KMS configuration in the cluster metadata +const KMS_CONFIG_PATH: &str = "config/kms_config.json"; + +/// Save KMS configuration to cluster storage +async fn save_kms_config(config: &KmsConfig) -> Result<(), String> { + let Some(store) = new_object_layer_fn() else { + return Err("Storage layer not initialized".to_string()); + }; + + let data = serde_json::to_vec(config).map_err(|e| format!("Failed to serialize KMS config: {e}"))?; + + save_config(store, KMS_CONFIG_PATH, data) + .await + .map_err(|e| format!("Failed to save KMS config to storage: {e}"))?; + + info!("KMS configuration persisted to cluster storage at {}", KMS_CONFIG_PATH); + Ok(()) +} + +/// Load KMS configuration from cluster storage +pub async fn load_kms_config() -> Option { + let Some(store) = new_object_layer_fn() else { + warn!("Storage layer not initialized, cannot load KMS config"); + return None; + }; + + match read_config(store, KMS_CONFIG_PATH).await { + Ok(data) => match serde_json::from_slice::(&data) { + Ok(config) => { + info!("Loaded KMS configuration from cluster storage"); + Some(config) + } + Err(e) => { + error!("Failed to deserialize KMS config: {}", e); + None + } + }, + Err(e) => { + // Config not found is normal on first run + if e.to_string().contains("ConfigNotFound") || e.to_string().contains("not found") { + info!("No persisted KMS configuration found (first run or not configured yet)"); + } else { + warn!("Failed to load KMS config from storage: {}", e); + } + None + } + } +} + /// Configure KMS service handler pub struct ConfigureKmsHandler; @@ -82,11 +133,19 @@ impl Operation for ConfigureKmsHandler { let kms_config = configure_request.to_kms_config(); // Configure the service - let (success, message, status) = match service_manager.configure(kms_config).await { + let (success, message, status) = match service_manager.configure(kms_config.clone()).await { Ok(()) => { - let status = service_manager.get_status().await; - info!("KMS configured successfully with status: {:?}", status); - (true, "KMS configured successfully".to_string(), status) + // Persist the configuration to cluster storage + if let Err(e) = save_kms_config(&kms_config).await { + let error_msg = format!("KMS configured in memory but failed to persist: {e}"); + error!("{}", error_msg); + let status = service_manager.get_status().await; + (false, error_msg, status) + } else { + let status = service_manager.get_status().await; + info!("KMS configured successfully and persisted with status: {:?}", status); + (true, "KMS configured successfully".to_string(), status) + } } Err(e) => { let error_msg = format!("Failed to configure KMS: {e}"); @@ -441,11 +500,19 @@ impl Operation for ReconfigureKmsHandler { let kms_config = configure_request.to_kms_config(); // Reconfigure the service (stops, reconfigures, and starts) - let (success, message, status) = match service_manager.reconfigure(kms_config).await { + let (success, message, status) = match service_manager.reconfigure(kms_config.clone()).await { Ok(()) => { - let status = service_manager.get_status().await; - info!("KMS reconfigured successfully with status: {:?}", status); - (true, "KMS reconfigured and restarted successfully".to_string(), status) + // Persist the configuration to cluster storage + if let Err(e) = save_kms_config(&kms_config).await { + let error_msg = format!("KMS reconfigured in memory but failed to persist: {e}"); + error!("{}", error_msg); + let status = service_manager.get_status().await; + (false, error_msg, status) + } else { + let status = service_manager.get_status().await; + info!("KMS reconfigured successfully and persisted with status: {:?}", status); + (true, "KMS reconfigured and restarted successfully".to_string(), status) + } } Err(e) => { let error_msg = format!("Failed to reconfigure KMS: {e}"); diff --git a/rustfs/src/main.rs b/rustfs/src/main.rs index 989f40d0..8d5fb327 100644 --- a/rustfs/src/main.rs +++ b/rustfs/src/main.rs @@ -550,7 +550,7 @@ async fn init_kms_system(opt: &config::Opt) -> Result<()> { // If KMS is enabled in configuration, configure and start the service if opt.kms_enable { - info!("KMS is enabled, configuring and starting service..."); + info!("KMS is enabled via command line, configuring and starting service..."); // Create KMS configuration from command line options let kms_config = match opt.kms_backend.as_str() { @@ -619,9 +619,34 @@ async fn init_kms_system(opt: &config::Opt) -> Result<()> { .await .map_err(|e| Error::other(format!("Failed to start KMS: {e}")))?; - info!("KMS service configured and started successfully"); + info!("KMS service configured and started successfully from command line options"); } else { - info!("KMS service manager initialized. KMS is ready for dynamic configuration via API."); + // Try to load persisted KMS configuration from cluster storage + info!("Attempting to load persisted KMS configuration from cluster storage..."); + + if let Some(persisted_config) = admin::handlers::kms_dynamic::load_kms_config().await { + info!("Found persisted KMS configuration, attempting to configure and start service..."); + + // Configure the KMS service with persisted config + match service_manager.configure(persisted_config).await { + Ok(()) => { + // Start the KMS service + match service_manager.start().await { + Ok(()) => { + info!("KMS service configured and started successfully from persisted configuration"); + } + Err(e) => { + warn!("Failed to start KMS with persisted configuration: {}", e); + } + } + } + Err(e) => { + warn!("Failed to configure KMS with persisted configuration: {}", e); + } + } + } else { + info!("No persisted KMS configuration found. KMS is ready for dynamic configuration via API."); + } } Ok(())