chore: bump dependencies, add metrics support, remove DNS resolver (#699)

* upgrade version

* add metrics

* remove dns resolver

* add metrics counter for create bucket

* fix

* fix

* fix
This commit is contained in:
houseme
2025-10-24 00:16:17 +08:00
committed by GitHub
parent 1d069fd351
commit e22b24684f
15 changed files with 726 additions and 850 deletions

563
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -118,7 +118,7 @@ convert_case = "0.8.0"
crc-fast = "1.3.0"
chacha20poly1305 = { version = "0.10.1" }
chrono = { version = "0.4.42", features = ["serde"] }
clap = { version = "4.5.49", features = ["derive", "env"] }
clap = { version = "4.5.50", features = ["derive", "env"] }
const-str = { version = "0.7.0", features = ["std", "proc"] }
crc32fast = "1.5.0"
crc32c = "0.6.8"
@@ -139,7 +139,6 @@ glob = "0.3.3"
hashbrown = { version = "0.16.0", features = ["serde", "rayon"] }
hex-simd = "0.8.0"
highway = { version = "1.3.0" }
hickory-resolver = { version = "0.25.2", features = ["tls-ring"] }
hmac = "0.12.1"
hyper = "1.7.0"
hyper-util = { version = "0.1.17", features = [
@@ -152,7 +151,7 @@ http = "1.3.1"
http-body = "1.0.1"
humantime = "2.3.0"
ipnetwork = { version = "0.21.1", features = ["serde"] }
jsonwebtoken = { version = "10.0.0", features = ["rust_crypto"] }
jsonwebtoken = { version = "10.1.0", features = ["rust_crypto"] }
lazy_static = "1.5.0"
libc = "0.2.177"
libsystemd = { version = "0.7.2" }
@@ -161,6 +160,8 @@ lz4 = "1.28.1"
matchit = "0.8.4"
md-5 = "0.10.6"
md5 = "0.8.0"
metrics = "0.24.2"
metrics-exporter-opentelemetry = "0.1.2"
mime_guess = "2.0.5"
moka = { version = "0.12.11", features = ["future"] }
netif = "0.1.6"
@@ -206,14 +207,14 @@ reqwest = { version = "0.12.24", default-features = false, features = [
"json",
"blocking",
] }
rmcp = { version = "0.8.1" }
rmcp = { version = "0.8.3" }
rmp = { version = "0.8.14" }
rmp-serde = { version = "1.3.0" }
rsa = { version = "0.9.8" }
rumqttc = { version = "0.25.0" }
rust-embed = { version = "8.7.2" }
rust-embed = { version = "8.8.0" }
rustc-hash = { version = "2.1.1" }
rustls = { version = "0.23.32", features = ["ring", "logging", "std", "tls12"], default-features = false }
rustls = { version = "0.23.34", features = ["ring", "logging", "std", "tls12"], default-features = false }
rustls-pki-types = "1.12.0"
rustls-pemfile = "2.2.0"
s3s = { version = "0.12.0-rc.3", features = ["minio"] }
@@ -258,7 +259,6 @@ tonic-prost-build = { version = "0.14.2" }
tower = { version = "0.5.2", features = ["timeout"] }
tower-http = { version = "0.6.6", features = ["cors"] }
tracing = { version = "0.1.41" }
tracing-core = "0.1.34"
tracing-error = "0.2.1"
tracing-opentelemetry = "0.32.0"
tracing-subscriber = { version = "0.3.20", features = ["env-filter", "time"] }
@@ -281,17 +281,7 @@ zstd = "0.13.3"
[workspace.metadata.cargo-shear]
ignored = ["rustfs", "rust-i18n", "rustfs-mcp", "tokio-test", "rustfs-audit"]
[profile.wasm-dev]
inherits = "dev"
opt-level = 1
[profile.server-dev]
inherits = "dev"
[profile.android-dev]
inherits = "dev"
ignored = ["rustfs", "rustfs-mcp", "tokio-test"]
[profile.release]
opt-level = 3

View File

@@ -40,7 +40,6 @@ pub const ENV_ACCESS_KEY: &str = "RUSTFS_ACCESS_KEY";
pub const ENV_SECRET_KEY: &str = "RUSTFS_SECRET_KEY";
pub const ENV_ROOT_USER: &str = "RUSTFS_ROOT_USER";
pub const ENV_ROOT_PASSWORD: &str = "RUSTFS_ROOT_PASSWORD";
pub static RUSTFS_CONFIG_PREFIX: &str = "config";
pub struct ConfigSys {}

View File

@@ -34,7 +34,6 @@ time = { workspace = true, features = ["serde-human-readable"] }
serde = { workspace = true, features = ["derive", "rc"] }
rustfs-ecstore = { workspace = true }
rustfs-policy.workspace = true
rustfs-config.workspace = true
serde_json.workspace = true
async-trait.workspace = true
thiserror.workspace = true

View File

@@ -36,6 +36,8 @@ gpu = ["dep:nvml-wrapper"]
rustfs-config = { workspace = true, features = ["constants", "observability"] }
rustfs-utils = { workspace = true, features = ["ip", "path"] }
flexi_logger = { workspace = true }
metrics = { workspace = true }
metrics-exporter-opentelemetry = { workspace = true }
nu-ansi-term = { workspace = true }
nvml-wrapper = { workspace = true, optional = true }
opentelemetry = { workspace = true }
@@ -47,7 +49,6 @@ opentelemetry-semantic-conventions = { workspace = true, features = ["semconv_ex
serde = { workspace = true }
smallvec = { workspace = true, features = ["serde"] }
tracing = { workspace = true, features = ["std", "attributes"] }
tracing-core = { workspace = true }
tracing-error = { workspace = true }
tracing-opentelemetry = { workspace = true }
tracing-subscriber = { workspace = true, features = ["registry", "std", "fmt", "env-filter", "tracing-log", "time", "local-time", "json"] }

View File

@@ -19,6 +19,7 @@ use flexi_logger::{
WriteMode::{AsyncWith, BufferAndFlush},
style,
};
use metrics::counter;
use nu_ansi_term::Color;
use opentelemetry::trace::TracerProvider;
use opentelemetry::{KeyValue, global};
@@ -233,6 +234,13 @@ pub(crate) fn init_telemetry(config: &OtelConfig) -> OtelGuard {
meter_provider
};
match metrics_exporter_opentelemetry::Recorder::builder("order-service").install_global() {
Ok(_) => {}
Err(e) => {
eprintln!("Failed to set global metrics recorder: {:?}", e);
}
}
// initialize logger provider
let logger_provider = {
let mut builder = SdkLoggerProvider::builder().with_resource(res);
@@ -307,7 +315,7 @@ pub(crate) fn init_telemetry(config: &OtelConfig) -> OtelGuard {
OBSERVABILITY_METER_NAME.set(service_name.to_string()).ok();
}
}
counter!("rustfs.start.total").increment(1);
return OtelGuard {
tracer_provider: Some(tracer_provider),
meter_provider: Some(meter_provider),

View File

@@ -48,7 +48,6 @@ thiserror.workspace = true
base64.workspace = true
sha1.workspace = true
sha2.workspace = true
base64-simd.workspace = true
crc64fast-nvme.workspace = true
s3s.workspace = true
hex-simd.workspace = true

View File

@@ -35,7 +35,6 @@ futures = { workspace = true, optional = true }
hashbrown = { workspace = true, optional = true }
hex-simd = { workspace = true, optional = true }
highway = { workspace = true, optional = true }
hickory-resolver = { workspace = true, optional = true }
hmac = { workspace = true, optional = true }
http = { workspace = true, optional = true }
hyper = { workspace = true, optional = true }
@@ -43,7 +42,6 @@ libc = { workspace = true, optional = true }
local-ip-address = { workspace = true, optional = true }
lz4 = { workspace = true, optional = true }
md-5 = { workspace = true, optional = true }
moka = { workspace = true, optional = true, features = ["future"] }
netif = { workspace = true, optional = true }
nix = { workspace = true, optional = true }
rand = { workspace = true, optional = true }
@@ -83,7 +81,7 @@ workspace = true
default = ["ip"] # features that are enabled by default
ip = ["dep:local-ip-address"] # ip characteristics and their dependencies
tls = ["dep:rustls", "dep:rustls-pemfile", "dep:rustls-pki-types"] # tls characteristics and their dependencies
net = ["ip", "dep:url", "dep:netif", "dep:futures", "dep:transform-stream", "dep:bytes", "dep:s3s", "dep:hyper", "dep:hickory-resolver", "dep:moka", "dep:thiserror", "dep:tokio"] # network features with DNS resolver
net = ["ip", "dep:url", "dep:netif", "dep:futures", "dep:transform-stream", "dep:bytes", "dep:s3s", "dep:hyper", "dep:thiserror", "dep:tokio"] # network features
io = ["dep:tokio"]
path = []
notify = ["dep:hyper", "dep:s3s", "dep:hashbrown", "dep:thiserror", "dep:serde", "dep:libc"] # file system notification features

View File

@@ -12,465 +12,465 @@
// See the License for the specific language governing permissions and
// limitations under the License.
#![allow(dead_code)]
//! Layered DNS resolution utility for Kubernetes environments
//!
//! This module provides robust DNS resolution with multiple fallback layers:
//! 1. Local cache (Moka) for previously resolved results
//! 2. System DNS resolver (container/host adaptive) using hickory-resolver
//! 3. Public DNS servers as final fallback (8.8.8.8, 1.1.1.1) using hickory-resolver with TLS
//!
//! The resolver is designed to handle 5-level or deeper domain names that may fail
//! in Kubernetes environments due to CoreDNS configuration, DNS recursion limits,
//! or network-related issues. Uses hickory-resolver for actual DNS queries with TLS support.
use hickory_resolver::Resolver;
use hickory_resolver::config::ResolverConfig;
use hickory_resolver::name_server::TokioConnectionProvider;
use moka::future::Cache;
use std::net::IpAddr;
use std::sync::OnceLock;
use std::time::Duration;
use tracing::{debug, error, info, instrument, warn};
/// Upper bound, in bytes, that `validate_domain_format` enforces on a whole domain name.
const MAX_FQDN_LENGTH: usize = 253;
/// Upper bound, in bytes, that `validate_domain_format` enforces on each dot-separated label.
const MAX_LABEL_LENGTH: usize = 63;
/// Time-to-live, in seconds, applied to every cache entry when the cache is built in `LayeredDnsResolver::new`.
const CACHE_TTL_SECONDS: u64 = 300; // 5 minutes
/// Capacity passed to the cache builder's `max_capacity` in `LayeredDnsResolver::new`.
const MAX_CACHE_SIZE: u64 = 10000;
/// Errors produced by the layered DNS resolver.
///
/// Variants that wrap an underlying failure keep it as a boxed `source` so the
/// original error stays reachable through `std::error::Error::source`.
#[derive(Debug, thiserror::Error)]
pub enum DnsError {
/// The domain failed the length or label checks in `validate_domain_format`;
/// `reason` holds the human-readable explanation.
#[error("Invalid domain format: {reason}")]
InvalidFormat { reason: String },
/// The domain is not in the local cache.
/// NOTE(review): no code visible in this module constructs this variant.
#[error("Local cache miss for domain: {domain}")]
CacheMiss { domain: String },
/// The first-line ("system") resolver returned an error or an empty answer.
#[error("System DNS resolution failed for domain: {domain} - {source}")]
SystemDnsFailed {
domain: String,
#[source]
source: Box<dyn std::error::Error + Send + Sync>,
},
/// The public fallback resolver returned an error or an empty answer.
#[error("Public DNS resolution failed for domain: {domain} - {source}")]
PublicDnsFailed {
domain: String,
#[source]
source: Box<dyn std::error::Error + Send + Sync>,
},
/// Returned by `LayeredDnsResolver::resolve` once both the system and the
/// public resolver have failed for `domain`.
#[error(
"All DNS resolution attempts failed for domain: {domain}. Please check your domain spelling, network connectivity, or DNS configuration"
)]
AllAttemptsFailed { domain: String },
/// A resolver could not be set up; `resolve_domain` also returns this when
/// the global resolver has not been initialized yet.
#[error("DNS resolver initialization failed: {source}")]
InitializationFailed {
#[source]
source: Box<dyn std::error::Error + Send + Sync>,
},
/// The DNS configuration is invalid.
/// NOTE(review): no code visible in this module constructs this variant.
#[error("DNS configuration error: {source}")]
ConfigurationError {
#[source]
source: Box<dyn std::error::Error + Send + Sync>,
},
}
/// DNS resolver that consults a local cache first, then a first-line resolver,
/// then a public DNS-over-TLS resolver (see `resolve` for the exact order).
pub struct LayeredDnsResolver {
/// Cache of previously resolved domains, keyed by the domain string exactly
/// as it was passed to `resolve`.
cache: Cache<String, Vec<IpAddr>>,
/// First-line resolver tried after a cache miss; see `new` for how it is configured.
system_resolver: Resolver<TokioConnectionProvider>,
/// Fallback resolver; `new` configures it with the Cloudflare DNS-over-TLS
/// name servers plus the Google DNS-over-TLS name servers.
public_resolver: Resolver<TokioConnectionProvider>,
}
impl LayeredDnsResolver {
    /// Create a new layered DNS resolver.
    ///
    /// Builds three components:
    /// * a Moka cache with a TTL of `CACHE_TTL_SECONDS` and a capacity of `MAX_CACHE_SIZE`;
    /// * the system resolver, loaded from the host/container DNS configuration;
    /// * the public resolver, using the Cloudflare and Google DNS-over-TLS servers.
    ///
    /// # Errors
    /// No step currently fails: if the system DNS configuration cannot be read,
    /// a warning is logged and hickory-resolver's built-in default configuration
    /// is used instead. The `Result` return type is kept for API compatibility.
    #[instrument(skip_all)]
    pub async fn new() -> Result<Self, DnsError> {
        info!("Initializing layered DNS resolver with hickory-resolver, Moka cache and public DNS fallback");

        // Create Moka cache with TTL and size limits
        let cache = Cache::builder()
            .time_to_live(Duration::from_secs(CACHE_TTL_SECONDS))
            .max_capacity(MAX_CACHE_SIZE)
            .build();

        // `ResolverConfig::default()` is hickory's built-in public configuration, not the
        // host's. Load the real system configuration so that cluster DNS (e.g. CoreDNS)
        // is actually consulted, and only fall back to the built-in defaults when the
        // system configuration is unavailable.
        let system_resolver = match Resolver::builder_tokio() {
            Ok(builder) => builder.build(),
            Err(e) => {
                warn!(
                    "Failed to load system DNS configuration ({}), falling back to hickory-resolver defaults",
                    e
                );
                Resolver::builder_with_config(ResolverConfig::default(), TokioConnectionProvider::default()).build()
            }
        };

        // Public fallback: Cloudflare DNS-over-TLS servers plus the Google DNS-over-TLS servers.
        let mut public_config = ResolverConfig::cloudflare_tls();
        for ns in ResolverConfig::google_tls().name_servers() {
            public_config.add_name_server(ns.clone())
        }
        let public_resolver = Resolver::builder_with_config(public_config, TokioConnectionProvider::default()).build();

        info!("DNS resolver initialized successfully with hickory-resolver system and Cloudflare TLS public fallback");

        Ok(Self {
            cache,
            system_resolver,
            public_resolver,
        })
    }

    /// Validate the length and label structure of a domain name.
    ///
    /// A single trailing dot (the DNS root) is accepted and ignored for the
    /// length checks. Only lengths and empty labels are checked; the characters
    /// inside a label are not inspected.
    ///
    /// # Errors
    /// Returns `DnsError::InvalidFormat` when the name exceeds `MAX_FQDN_LENGTH`
    /// bytes, when any label is empty (including an empty name or more than one
    /// trailing dot), or when any label exceeds `MAX_LABEL_LENGTH` bytes.
    #[instrument(skip_all, fields(domain = %domain))]
    fn validate_domain_format(domain: &str) -> Result<(), DnsError> {
        info!("Validating domain format start");

        // Strip at most ONE trailing dot: `example.com.` is a valid absolute name,
        // `example.com..` is not.
        let name = domain.strip_suffix('.').unwrap_or(domain);

        // Check FQDN length (the optional root dot does not count towards the limit)
        if name.len() > MAX_FQDN_LENGTH {
            return Err(DnsError::InvalidFormat {
                reason: format!("FQDN must not exceed {} bytes, got {} bytes", MAX_FQDN_LENGTH, name.len()),
            });
        }

        for label in name.split('.') {
            if label.is_empty() {
                return Err(DnsError::InvalidFormat {
                    reason: "Domain contains empty labels".to_string(),
                });
            }
            if label.len() > MAX_LABEL_LENGTH {
                return Err(DnsError::InvalidFormat {
                    reason: format!(
                        "Each label must not exceed {} bytes, label '{}' has {} bytes",
                        MAX_LABEL_LENGTH,
                        label,
                        label.len()
                    ),
                });
            }
        }

        info!("Domain format validated successfully");
        Ok(())
    }

    /// Look up `domain` in the local cache, returning the cached addresses on a hit.
    #[instrument(skip_all, fields(domain = %domain))]
    async fn check_cache(&self, domain: &str) -> Option<Vec<IpAddr>> {
        match self.cache.get(domain).await {
            Some(ips) => {
                debug!("DNS cache hit for domain: {}, found {} IPs", domain, ips.len());
                Some(ips)
            }
            None => {
                debug!("DNS cache miss for domain: {}", domain);
                None
            }
        }
    }

    /// Store `ips` in the local cache under `domain`, replacing any previous entry.
    #[instrument(skip_all, fields(domain = %domain, ip_count = ips.len()))]
    async fn update_cache(&self, domain: &str, ips: Vec<IpAddr>) {
        // Record the length first so the vector can be moved into the cache without cloning.
        let ip_count = ips.len();
        self.cache.insert(domain.to_string(), ips).await;
        debug!("DNS cache updated for domain: {} with {} IPs", domain, ip_count);
    }

    /// Return `(entry_count, weighted_size)` as reported by the cache.
    ///
    /// The test suite in this module notes that these counters may not
    /// immediately reflect recent inserts, so treat them as approximate.
    #[instrument(skip_all)]
    pub async fn cache_stats(&self) -> (u64, u64) {
        let entry_count = self.cache.entry_count();
        let weighted_size = self.cache.weighted_size();
        debug!("DNS cache stats - entries: {}, weighted_size: {}", entry_count, weighted_size);
        (entry_count, weighted_size)
    }

    /// Invalidate every cache entry (useful for testing or a forced refresh).
    #[instrument(skip_all)]
    pub async fn invalidate_cache(&self) {
        self.cache.invalidate_all();
        info!("DNS cache invalidated");
    }

    /// Resolve `domain` with the system resolver.
    ///
    /// # Errors
    /// Returns `DnsError::SystemDnsFailed` when the lookup fails or yields no addresses.
    #[instrument(skip_all, fields(domain = %domain))]
    async fn resolve_with_system_dns(&self, domain: &str) -> Result<Vec<IpAddr>, DnsError> {
        debug!("Attempting system DNS resolution for domain: {} using hickory-resolver", domain);

        let lookup = match self.system_resolver.lookup_ip(domain).await {
            Ok(lookup) => lookup,
            Err(e) => {
                warn!("System DNS resolution failed for domain: {} - {}", domain, e);
                return Err(DnsError::SystemDnsFailed {
                    domain: domain.to_string(),
                    source: Box::new(e),
                });
            }
        };

        let ips: Vec<IpAddr> = lookup.iter().collect();
        if ips.is_empty() {
            warn!("System DNS returned empty result for domain: {}", domain);
            return Err(DnsError::SystemDnsFailed {
                domain: domain.to_string(),
                source: "No IP addresses found".to_string().into(),
            });
        }

        info!("System DNS resolution successful for domain: {} -> {} IPs", domain, ips.len());
        Ok(ips)
    }

    /// Resolve `domain` with the public DNS-over-TLS resolver.
    ///
    /// # Errors
    /// Returns `DnsError::PublicDnsFailed` when the lookup fails or yields no addresses.
    #[instrument(skip_all, fields(domain = %domain))]
    async fn resolve_with_public_dns(&self, domain: &str) -> Result<Vec<IpAddr>, DnsError> {
        debug!(
            "Attempting public DNS resolution for domain: {} using hickory-resolver with TLS-enabled Cloudflare DNS",
            domain
        );

        let lookup = match self.public_resolver.lookup_ip(domain).await {
            Ok(lookup) => lookup,
            Err(e) => {
                error!("Public DNS resolution failed for domain: {} - {}", domain, e);
                return Err(DnsError::PublicDnsFailed {
                    domain: domain.to_string(),
                    source: Box::new(e),
                });
            }
        };

        let ips: Vec<IpAddr> = lookup.iter().collect();
        if ips.is_empty() {
            warn!("Public DNS returned empty result for domain: {}", domain);
            return Err(DnsError::PublicDnsFailed {
                domain: domain.to_string(),
                source: "No IP addresses found".to_string().into(),
            });
        }

        info!("Public DNS resolution successful for domain: {} -> {} IPs", domain, ips.len());
        Ok(ips)
    }

    /// Resolve `domain` using the layered fallback strategy.
    ///
    /// Resolution order:
    /// 1. Local cache
    /// 2. System DNS
    /// 3. Public DNS (TLS) fallback
    ///
    /// Successful answers from step 2 or 3 are written to the cache before returning.
    ///
    /// # Errors
    /// Returns `DnsError::InvalidFormat` when the domain fails validation, and
    /// `DnsError::AllAttemptsFailed` when both resolvers fail.
    #[instrument(skip_all, fields(domain = %domain))]
    pub async fn resolve(&self, domain: &str) -> Result<Vec<IpAddr>, DnsError> {
        info!("Starting DNS resolution process for domain: {} start", domain);
        // Validate domain format first
        Self::validate_domain_format(domain)?;

        info!("Starting DNS resolution for domain: {}", domain);

        // Step 1: Check local cache
        if let Some(ips) = self.check_cache(domain).await {
            info!("DNS resolution completed from cache for domain: {} -> {} IPs", domain, ips.len());
            return Ok(ips);
        }

        debug!("Local cache miss for domain: {}, attempting system DNS", domain);

        // Step 2: Try system DNS; keep the error so the final failure log can report both causes.
        let system_err = match self.resolve_with_system_dns(domain).await {
            Ok(ips) => {
                self.update_cache(domain, ips.clone()).await;
                info!("DNS resolution completed via system DNS for domain: {} -> {} IPs", domain, ips.len());
                return Ok(ips);
            }
            Err(e) => {
                warn!("System DNS failed for domain: {} - {}", domain, e);
                e
            }
        };

        // Step 3: Fallback to public DNS
        info!("Falling back to public DNS for domain: {}", domain);
        match self.resolve_with_public_dns(domain).await {
            Ok(ips) => {
                self.update_cache(domain, ips.clone()).await;
                info!("DNS resolution completed via public DNS for domain: {} -> {} IPs", domain, ips.len());
                Ok(ips)
            }
            Err(public_err) => {
                error!(
                    "All DNS resolution attempts failed for domain: `{}`. System DNS: {}, Public DNS: {}",
                    domain, system_err, public_err
                );
                Err(DnsError::AllAttemptsFailed {
                    domain: domain.to_string(),
                })
            }
        }
    }
}
/// Process-wide resolver instance; set at most once by `init_global_dns_resolver`
/// and read through `get_global_dns_resolver`.
static GLOBAL_DNS_RESOLVER: OnceLock<LayeredDnsResolver> = OnceLock::new();
/// Build a `LayeredDnsResolver` and install it as the global instance.
///
/// Calling this more than once is harmless: when a global resolver is already
/// installed, the freshly built one is dropped, a warning is logged, and `Ok(())`
/// is still returned.
///
/// # Errors
/// Propagates any error returned by `LayeredDnsResolver::new`.
#[instrument]
pub async fn init_global_dns_resolver() -> Result<(), DnsError> {
    info!("Initializing global DNS resolver");
    let fresh = LayeredDnsResolver::new().await?;

    // `set` only fails when a value is already stored; that case is not treated as an error.
    if GLOBAL_DNS_RESOLVER.set(fresh).is_ok() {
        info!("Global DNS resolver initialized successfully");
    } else {
        warn!("Global DNS resolver was already initialized");
    }
    Ok(())
}
/// Return the global DNS resolver, or `None` if `init_global_dns_resolver`
/// has not stored one yet.
pub fn get_global_dns_resolver() -> Option<&'static LayeredDnsResolver> {
GLOBAL_DNS_RESOLVER.get()
}
/// Resolve `domain` through the global DNS resolver.
///
/// # Errors
/// Returns `DnsError::InitializationFailed` when the global resolver has not
/// been initialized; otherwise forwards the result of `LayeredDnsResolver::resolve`.
#[instrument(skip_all, fields(domain = %domain))]
pub async fn resolve_domain(domain: &str) -> Result<Vec<IpAddr>, DnsError> {
    info!("resolving domain for: {}", domain);

    // Guard clause: bail out early when no resolver has been installed.
    let Some(global) = get_global_dns_resolver() else {
        return Err(DnsError::InitializationFailed {
            source: "Global DNS resolver not initialized. Call init_global_dns_resolver() first."
                .to_string()
                .into(),
        });
    };

    global.resolve(domain).await
}
// Unit tests for the layered DNS resolver. Several of them perform real DNS
// lookups and therefore tolerate resolution failures in restricted environments.
#[cfg(test)]
mod tests {
use super::*;
// Exercises the length and empty-label checks of `validate_domain_format`.
#[test]
fn test_domain_validation() {
// Valid domains
assert!(LayeredDnsResolver::validate_domain_format("example.com").is_ok());
assert!(LayeredDnsResolver::validate_domain_format("sub.example.com").is_ok());
assert!(LayeredDnsResolver::validate_domain_format("very.deep.sub.domain.example.com").is_ok());
// Invalid domains - too long FQDN
let long_domain = "a".repeat(254);
assert!(LayeredDnsResolver::validate_domain_format(&long_domain).is_err());
// Invalid domains - label too long
let long_label = format!("{}.com", "a".repeat(64));
assert!(LayeredDnsResolver::validate_domain_format(&long_label).is_err());
// Invalid domains - empty label
assert!(LayeredDnsResolver::validate_domain_format("example..com").is_err());
}
// Checks the miss -> insert -> hit cycle of the cache helpers.
#[tokio::test]
async fn test_cache_functionality() {
let resolver = LayeredDnsResolver::new().await.unwrap();
// Test cache miss
assert!(resolver.check_cache("example.com").await.is_none());
// Update cache
let test_ips = vec![IpAddr::from([192, 0, 2, 1])];
resolver.update_cache("example.com", test_ips.clone()).await;
// Test cache hit
assert_eq!(resolver.check_cache("example.com").await, Some(test_ips));
// Test cache stats (note: the reported entry count might not immediately reflect the insert)
let (total, _weighted_size) = resolver.cache_stats().await;
// Only an upper bound is asserted: exactly one entry was inserted, but the
// reported count may still be 0 at this point.
assert!(total <= 1, "Cache should have at most 1 entry, got {total}");
}
// Resolves `localhost` end to end; a failure is logged rather than asserted.
#[tokio::test]
async fn test_dns_resolution() {
let resolver = LayeredDnsResolver::new().await.unwrap();
// Test resolution of a known domain (localhost should always resolve)
match resolver.resolve("localhost").await {
Ok(ips) => {
assert!(!ips.is_empty());
println!("Resolved localhost to: {ips:?}");
}
Err(e) => {
// In some test environments, even localhost might fail
// This is acceptable as long as our error handling works
println!("DNS resolution failed (might be expected in test environments): {e}");
}
}
}
// A name that cannot exist must surface as `AllAttemptsFailed`.
#[tokio::test]
async fn test_invalid_domain_resolution() {
let resolver = LayeredDnsResolver::new().await.unwrap();
// Test resolution of invalid domain
let result = resolver
.resolve("nonexistent.invalid.domain.example.thisdefinitelydoesnotexist")
.await;
assert!(result.is_err());
if let Err(e) = result {
println!("Expected error for invalid domain: {e}");
// Should be AllAttemptsFailed since both system and public DNS should fail
assert!(matches!(e, DnsError::AllAttemptsFailed { .. }));
}
}
// After `invalidate_cache`, a previously cached entry must no longer be returned.
#[tokio::test]
async fn test_cache_invalidation() {
let resolver = LayeredDnsResolver::new().await.unwrap();
// Add entry to cache
let test_ips = vec![IpAddr::from([192, 0, 2, 1])];
resolver.update_cache("test.example.com", test_ips.clone()).await;
// Verify cache hit
assert_eq!(resolver.check_cache("test.example.com").await, Some(test_ips));
// Invalidate cache
resolver.invalidate_cache().await;
// Verify cache miss after invalidation
assert!(resolver.check_cache("test.example.com").await.is_none());
}
// Initializes the global resolver and resolves through the free function.
#[tokio::test]
async fn test_global_resolver_initialization() {
// Test initialization
assert!(init_global_dns_resolver().await.is_ok());
// Test that resolver is available
assert!(get_global_dns_resolver().is_some());
// Test domain resolution through global resolver
match resolve_domain("localhost").await {
Ok(ips) => {
assert!(!ips.is_empty());
println!("Global resolver resolved localhost to: {ips:?}");
}
Err(e) => {
println!("Global resolver DNS resolution failed (might be expected in test environments): {e}");
}
}
}
}
// #![allow(dead_code)]
//
// //! Layered DNS resolution utility for Kubernetes environments
// //!
// //! This module provides robust DNS resolution with multiple fallback layers:
// //! 1. Local cache (Moka) for previously resolved results
// //! 2. System DNS resolver (container/host adaptive) using hickory-resolver
// //! 3. Public DNS servers as final fallback (8.8.8.8, 1.1.1.1) using hickory-resolver with TLS
// //!
// //! The resolver is designed to handle 5-level or deeper domain names that may fail
// //! in Kubernetes environments due to CoreDNS configuration, DNS recursion limits,
// //! or network-related issues. Uses hickory-resolver for actual DNS queries with TLS support.
//
// use hickory_resolver::Resolver;
// use hickory_resolver::config::ResolverConfig;
// use hickory_resolver::name_server::TokioConnectionProvider;
// use moka::future::Cache;
// use std::net::IpAddr;
// use std::sync::OnceLock;
// use std::time::Duration;
// use tracing::{debug, error, info, instrument, warn};
//
// /// Maximum FQDN length according to RFC standards
// const MAX_FQDN_LENGTH: usize = 253;
// /// Maximum DNS label length according to RFC standards
// const MAX_LABEL_LENGTH: usize = 63;
// /// Cache entry TTL in seconds
// const CACHE_TTL_SECONDS: u64 = 300; // 5 minutes
// /// Maximum cache size (number of entries)
// const MAX_CACHE_SIZE: u64 = 10000;
//
// /// DNS resolution error types with detailed context and tracing information
// #[derive(Debug, thiserror::Error)]
// pub enum DnsError {
// #[error("Invalid domain format: {reason}")]
// InvalidFormat { reason: String },
//
// #[error("Local cache miss for domain: {domain}")]
// CacheMiss { domain: String },
//
// #[error("System DNS resolution failed for domain: {domain} - {source}")]
// SystemDnsFailed {
// domain: String,
// #[source]
// source: Box<dyn std::error::Error + Send + Sync>,
// },
//
// #[error("Public DNS resolution failed for domain: {domain} - {source}")]
// PublicDnsFailed {
// domain: String,
// #[source]
// source: Box<dyn std::error::Error + Send + Sync>,
// },
//
// #[error(
// "All DNS resolution attempts failed for domain: {domain}. Please check your domain spelling, network connectivity, or DNS configuration"
// )]
// AllAttemptsFailed { domain: String },
//
// #[error("DNS resolver initialization failed: {source}")]
// InitializationFailed {
// #[source]
// source: Box<dyn std::error::Error + Send + Sync>,
// },
//
// #[error("DNS configuration error: {source}")]
// ConfigurationError {
// #[source]
// source: Box<dyn std::error::Error + Send + Sync>,
// },
// }
//
// /// Layered DNS resolver with caching and multiple fallback strategies
// pub struct LayeredDnsResolver {
// /// Local cache for resolved domains using Moka for high performance
// cache: Cache<String, Vec<IpAddr>>,
// /// System DNS resolver using hickory-resolver with default configuration
// system_resolver: Resolver<TokioConnectionProvider>,
// /// Public DNS resolver using hickory-resolver with Cloudflare DNS servers
// public_resolver: Resolver<TokioConnectionProvider>,
// }
//
// impl LayeredDnsResolver {
// /// Create a new layered DNS resolver with automatic DNS configuration detection
// #[instrument(skip_all)]
// pub async fn new() -> Result<Self, DnsError> {
// info!("Initializing layered DNS resolver with hickory-resolver, Moka cache and public DNS fallback");
//
// // Create Moka cache with TTL and size limits
// let cache = Cache::builder()
// .time_to_live(Duration::from_secs(CACHE_TTL_SECONDS))
// .max_capacity(MAX_CACHE_SIZE)
// .build();
//
// // Create system DNS resolver with default configuration (auto-detects container/host DNS)
// let system_resolver =
// Resolver::builder_with_config(ResolverConfig::default(), TokioConnectionProvider::default()).build();
//
// let mut config = ResolverConfig::cloudflare_tls();
// for ns in ResolverConfig::google_tls().name_servers() {
// config.add_name_server(ns.clone())
// }
// // Create public DNS resolver using Cloudflare DNS with TLS support
// let public_resolver = Resolver::builder_with_config(config, TokioConnectionProvider::default()).build();
//
// info!("DNS resolver initialized successfully with hickory-resolver system and Cloudflare TLS public fallback");
//
// Ok(Self {
// cache,
// system_resolver,
// public_resolver,
// })
// }
//
// /// Validate domain format according to RFC standards
// #[instrument(skip_all, fields(domain = %domain))]
// fn validate_domain_format(domain: &str) -> Result<(), DnsError> {
// info!("Validating domain format start");
// // Check FQDN length
// if domain.len() > MAX_FQDN_LENGTH {
// return Err(DnsError::InvalidFormat {
// reason: format!("FQDN must not exceed {} bytes, got {} bytes", MAX_FQDN_LENGTH, domain.len()),
// });
// }
//
// // Check each label length
// for label in domain.split('.') {
// if label.len() > MAX_LABEL_LENGTH {
// return Err(DnsError::InvalidFormat {
// reason: format!(
// "Each label must not exceed {} bytes, label '{}' has {} bytes",
// MAX_LABEL_LENGTH,
// label,
// label.len()
// ),
// });
// }
// }
//
// // Check for empty labels (except trailing dot)
// let labels: Vec<&str> = domain.trim_end_matches('.').split('.').collect();
// for label in &labels {
// if label.is_empty() {
// return Err(DnsError::InvalidFormat {
// reason: "Domain contains empty labels".to_string(),
// });
// }
// }
// info!("DNS resolver validated successfully");
// Ok(())
// }
//
// /// Check local cache for resolved domain
// #[instrument(skip_all, fields(domain = %domain))]
// async fn check_cache(&self, domain: &str) -> Option<Vec<IpAddr>> {
// match self.cache.get(domain).await {
// Some(ips) => {
// debug!("DNS cache hit for domain: {}, found {} IPs", domain, ips.len());
// Some(ips)
// }
// None => {
// debug!("DNS cache miss for domain: {}", domain);
// None
// }
// }
// }
//
// /// Update local cache with resolved IPs
// #[instrument(skip_all, fields(domain = %domain, ip_count = ips.len()))]
// async fn update_cache(&self, domain: &str, ips: Vec<IpAddr>) {
// self.cache.insert(domain.to_string(), ips.clone()).await;
// debug!("DNS cache updated for domain: {} with {} IPs", domain, ips.len());
// }
//
// /// Get cache statistics for monitoring
// #[instrument(skip_all)]
// pub async fn cache_stats(&self) -> (u64, u64) {
// let entry_count = self.cache.entry_count();
// let weighted_size = self.cache.weighted_size();
// debug!("DNS cache stats - entries: {}, weighted_size: {}", entry_count, weighted_size);
// (entry_count, weighted_size)
// }
//
// /// Manually invalidate cache entries (useful for testing or forced refresh)
// #[instrument(skip_all)]
// pub async fn invalidate_cache(&self) {
// self.cache.invalidate_all();
// info!("DNS cache invalidated");
// }
//
// /// Resolve domain using system DNS (cluster/host DNS configuration) with hickory-resolver
// #[instrument(skip_all, fields(domain = %domain))]
// async fn resolve_with_system_dns(&self, domain: &str) -> Result<Vec<IpAddr>, DnsError> {
// debug!("Attempting system DNS resolution for domain: {} using hickory-resolver", domain);
//
// match self.system_resolver.lookup_ip(domain).await {
// Ok(lookup) => {
// let ips: Vec<IpAddr> = lookup.iter().collect();
// if !ips.is_empty() {
// info!("System DNS resolution successful for domain: {} -> {} IPs", domain, ips.len());
// Ok(ips)
// } else {
// warn!("System DNS returned empty result for domain: {}", domain);
// Err(DnsError::SystemDnsFailed {
// domain: domain.to_string(),
// source: "No IP addresses found".to_string().into(),
// })
// }
// }
// Err(e) => {
// warn!("System DNS resolution failed for domain: {} - {}", domain, e);
// Err(DnsError::SystemDnsFailed {
// domain: domain.to_string(),
// source: Box::new(e),
// })
// }
// }
// }
//
// /// Resolve domain using public DNS servers (Cloudflare TLS DNS) with hickory-resolver
// #[instrument(skip_all, fields(domain = %domain))]
// async fn resolve_with_public_dns(&self, domain: &str) -> Result<Vec<IpAddr>, DnsError> {
// debug!(
// "Attempting public DNS resolution for domain: {} using hickory-resolver with TLS-enabled Cloudflare DNS",
// domain
// );
//
// match self.public_resolver.lookup_ip(domain).await {
// Ok(lookup) => {
// let ips: Vec<IpAddr> = lookup.iter().collect();
// if !ips.is_empty() {
// info!("Public DNS resolution successful for domain: {} -> {} IPs", domain, ips.len());
// Ok(ips)
// } else {
// warn!("Public DNS returned empty result for domain: {}", domain);
// Err(DnsError::PublicDnsFailed {
// domain: domain.to_string(),
// source: "No IP addresses found".to_string().into(),
// })
// }
// }
// Err(e) => {
// error!("Public DNS resolution failed for domain: {} - {}", domain, e);
// Err(DnsError::PublicDnsFailed {
// domain: domain.to_string(),
// source: Box::new(e),
// })
// }
// }
// }
//
// /// Resolve domain with layered fallback strategy using hickory-resolver
// ///
// /// Resolution order with detailed tracing:
// /// 1. Local cache (Moka with TTL)
// /// 2. System DNS (hickory-resolver with host/container adaptive configuration)
// /// 3. Public DNS (hickory-resolver with TLS-enabled Cloudflare DNS fallback)
// #[instrument(skip_all, fields(domain = %domain))]
// pub async fn resolve(&self, domain: &str) -> Result<Vec<IpAddr>, DnsError> {
// info!("Starting DNS resolution process for domain: {} start", domain);
// // Validate domain format first
// Self::validate_domain_format(domain)?;
//
// info!("Starting DNS resolution for domain: {}", domain);
//
// // Step 1: Check local cache
// if let Some(ips) = self.check_cache(domain).await {
// info!("DNS resolution completed from cache for domain: {} -> {} IPs", domain, ips.len());
// return Ok(ips);
// }
//
// debug!("Local cache miss for domain: {}, attempting system DNS", domain);
//
// // Step 2: Try system DNS (cluster/host adaptive)
// match self.resolve_with_system_dns(domain).await {
// Ok(ips) => {
// self.update_cache(domain, ips.clone()).await;
// info!("DNS resolution completed via system DNS for domain: {} -> {} IPs", domain, ips.len());
// return Ok(ips);
// }
// Err(system_err) => {
// warn!("System DNS failed for domain: {} - {}", domain, system_err);
// }
// }
//
// // Step 3: Fallback to public DNS
// info!("Falling back to public DNS for domain: {}", domain);
// match self.resolve_with_public_dns(domain).await {
// Ok(ips) => {
// self.update_cache(domain, ips.clone()).await;
// info!("DNS resolution completed via public DNS for domain: {} -> {} IPs", domain, ips.len());
// Ok(ips)
// }
// Err(public_err) => {
// error!(
// "All DNS resolution attempts failed for domain: `{}`. System DNS: failed, Public DNS: {}",
// domain, public_err
// );
// Err(DnsError::AllAttemptsFailed {
// domain: domain.to_string(),
// })
// }
// }
// }
// }
//
// /// Global DNS resolver instance
// static GLOBAL_DNS_RESOLVER: OnceLock<LayeredDnsResolver> = OnceLock::new();
//
// /// Initialize the global DNS resolver
// #[instrument]
// pub async fn init_global_dns_resolver() -> Result<(), DnsError> {
// info!("Initializing global DNS resolver");
// let resolver = LayeredDnsResolver::new().await?;
//
// match GLOBAL_DNS_RESOLVER.set(resolver) {
// Ok(()) => {
// info!("Global DNS resolver initialized successfully");
// Ok(())
// }
// Err(_) => {
// warn!("Global DNS resolver was already initialized");
// Ok(())
// }
// }
// }
//
// /// Get the global DNS resolver instance
// pub fn get_global_dns_resolver() -> Option<&'static LayeredDnsResolver> {
// GLOBAL_DNS_RESOLVER.get()
// }
//
// /// Resolve domain using the global DNS resolver with comprehensive tracing
// #[instrument(skip_all, fields(domain = %domain))]
// pub async fn resolve_domain(domain: &str) -> Result<Vec<IpAddr>, DnsError> {
// info!("resolving domain for: {}", domain);
// match get_global_dns_resolver() {
// Some(resolver) => resolver.resolve(domain).await,
// None => Err(DnsError::InitializationFailed {
// source: "Global DNS resolver not initialized. Call init_global_dns_resolver() first."
// .to_string()
// .into(),
// }),
// }
// }
//
// #[cfg(test)]
// mod tests {
// use super::*;
//
// #[test]
// fn test_domain_validation() {
// // Valid domains
// assert!(LayeredDnsResolver::validate_domain_format("example.com").is_ok());
// assert!(LayeredDnsResolver::validate_domain_format("sub.example.com").is_ok());
// assert!(LayeredDnsResolver::validate_domain_format("very.deep.sub.domain.example.com").is_ok());
//
// // Invalid domains - too long FQDN
// let long_domain = "a".repeat(254);
// assert!(LayeredDnsResolver::validate_domain_format(&long_domain).is_err());
//
// // Invalid domains - label too long
// let long_label = format!("{}.com", "a".repeat(64));
// assert!(LayeredDnsResolver::validate_domain_format(&long_label).is_err());
//
// // Invalid domains - empty label
// assert!(LayeredDnsResolver::validate_domain_format("example..com").is_err());
// }
//
// #[tokio::test]
// async fn test_cache_functionality() {
// let resolver = LayeredDnsResolver::new().await.unwrap();
//
// // Test cache miss
// assert!(resolver.check_cache("example.com").await.is_none());
//
// // Update cache
// let test_ips = vec![IpAddr::from([192, 0, 2, 1])];
// resolver.update_cache("example.com", test_ips.clone()).await;
//
// // Test cache hit
// assert_eq!(resolver.check_cache("example.com").await, Some(test_ips));
//
// // Test cache stats (note: moka cache might not immediately reflect changes)
// let (total, _weighted_size) = resolver.cache_stats().await;
// // Cache should hold at most the one entry we just added (it may still report 0, since moka might not reflect the insert immediately)
// assert!(total <= 1, "Cache should have at most 1 entry, got {total}");
// }
//
// #[tokio::test]
// async fn test_dns_resolution() {
// let resolver = LayeredDnsResolver::new().await.unwrap();
//
// // Test resolution of a known domain (localhost should always resolve)
// match resolver.resolve("localhost").await {
// Ok(ips) => {
// assert!(!ips.is_empty());
// println!("Resolved localhost to: {ips:?}");
// }
// Err(e) => {
// // In some test environments, even localhost might fail
// // This is acceptable as long as our error handling works
// println!("DNS resolution failed (might be expected in test environments): {e}");
// }
// }
// }
//
// #[tokio::test]
// async fn test_invalid_domain_resolution() {
// let resolver = LayeredDnsResolver::new().await.unwrap();
//
// // Test resolution of invalid domain
// let result = resolver
// .resolve("nonexistent.invalid.domain.example.thisdefinitelydoesnotexist")
// .await;
// assert!(result.is_err());
//
// if let Err(e) = result {
// println!("Expected error for invalid domain: {e}");
// // Should be AllAttemptsFailed since both system and public DNS should fail
// assert!(matches!(e, DnsError::AllAttemptsFailed { .. }));
// }
// }
//
// #[tokio::test]
// async fn test_cache_invalidation() {
// let resolver = LayeredDnsResolver::new().await.unwrap();
//
// // Add entry to cache
// let test_ips = vec![IpAddr::from([192, 0, 2, 1])];
// resolver.update_cache("test.example.com", test_ips.clone()).await;
//
// // Verify cache hit
// assert_eq!(resolver.check_cache("test.example.com").await, Some(test_ips));
//
// // Invalidate cache
// resolver.invalidate_cache().await;
//
// // Verify cache miss after invalidation
// assert!(resolver.check_cache("test.example.com").await.is_none());
// }
//
// #[tokio::test]
// async fn test_global_resolver_initialization() {
// // Test initialization
// assert!(init_global_dns_resolver().await.is_ok());
//
// // Test that resolver is available
// assert!(get_global_dns_resolver().is_some());
//
// // Test domain resolution through global resolver
// match resolve_domain("localhost").await {
// Ok(ips) => {
// assert!(!ips.is_empty());
// println!("Global resolver resolved localhost to: {ips:?}");
// }
// Err(e) => {
// println!("Global resolver DNS resolution failed (might be expected in test environments): {e}");
// }
// }
// }
// }

View File

@@ -24,8 +24,6 @@ pub mod net;
#[cfg(feature = "http")]
pub mod http;
#[cfg(feature = "net")]
pub use dns_resolver::*;
#[cfg(feature = "net")]
pub use net::*;

View File

@@ -148,17 +148,6 @@ pub fn is_local_host(host: Host<&str>, port: u16, local_port: u16) -> std::io::R
pub async fn get_host_ip(host: Host<&str>) -> std::io::Result<HashSet<IpAddr>> {
match host {
Host::Domain(domain) => {
match crate::dns_resolver::resolve_domain(domain).await {
Ok(ips) => {
info!("Resolved domain {domain} using custom DNS resolver: {ips:?}");
return Ok(ips.into_iter().collect());
}
Err(err) => {
error!(
"Failed to resolve domain {domain} using custom DNS resolver, falling back to system resolver,err: {err}"
);
}
}
// Check cache first
if CUSTOM_DNS_RESOLVER.read().unwrap().is_none() {
if let Ok(mut cache) = DNS_CACHE.lock() {
@@ -321,7 +310,6 @@ where
#[cfg(test)]
mod test {
use super::*;
use crate::init_global_dns_resolver;
use std::net::{Ipv4Addr, Ipv6Addr};
use std::{collections::HashSet, io::Error as IoError};
@@ -448,12 +436,7 @@ mod test {
#[tokio::test]
async fn test_get_host_ip() {
set_mock_dns_resolver(mock_resolver);
match init_global_dns_resolver().await {
Ok(_) => {}
Err(e) => {
error!("Failed to initialize global DNS resolver: {e}");
}
}
// Test IPv4 address
let ipv4_host = Host::Ipv4(Ipv4Addr::new(192, 168, 1, 1));
let ipv4_result = get_host_ip(ipv4_host).await.unwrap();

View File

@@ -122,6 +122,7 @@ uuid = { workspace = true }
zip = { workspace = true }
base64-simd.workspace = true
hex-simd.workspace = true
metrics = { workspace = true }
[target.'cfg(any(target_os = "macos", target_os = "freebsd", target_os = "netbsd", target_os = "openbsd"))'.dependencies]
sysctl = { workspace = true }

View File

@@ -158,13 +158,6 @@ async fn async_main() -> Result<()> {
async fn run(opt: config::Opt) -> Result<()> {
debug!("opt: {:?}", &opt);
// Initialize global DNS resolver early for enhanced DNS resolution (concurrent)
let dns_init = tokio::spawn(async {
if let Err(e) = rustfs_utils::dns_resolver::init_global_dns_resolver().await {
warn!("Failed to initialize global DNS resolver: {}. Using standard DNS resolution.", e);
}
});
if let Some(region) = &opt.region {
rustfs_ecstore::global::set_global_region(region.clone());
}
@@ -190,9 +183,6 @@ async fn run(opt: config::Opt) -> Result<()> {
set_global_addr(&opt.address).await;
// Wait for DNS initialization to complete before network-heavy operations
dns_init.await.map_err(Error::other)?;
// For RPC
let (endpoint_pools, setup_type) = EndpointServerPools::from_volumes(server_address.clone().as_str(), opt.volumes.clone())
.await

View File

@@ -27,6 +27,7 @@ use hyper_util::{
server::graceful::GracefulShutdown,
service::TowerToHyperService,
};
use metrics::counter;
use rustfs_config::{DEFAULT_ACCESS_KEY, DEFAULT_SECRET_KEY, MI_B, RUSTFS_TLS_CERT, RUSTFS_TLS_KEY};
use rustfs_obs::SystemObserver;
use rustfs_protos::proto_gen::node_service::node_service_server::NodeServiceServer;
@@ -488,7 +489,12 @@ fn process_connection(
key_request_uri_path = %request.uri().path().to_owned(),
"handle request api total",
);
debug!("http started method: {}, url path: {}", request.method(), request.uri().path())
debug!("http started method: {}, url path: {}", request.method(), request.uri().path());
let labels = [
("key_request_method", format!("{}", request.method())),
("key_request_uri_path", request.uri().path().to_owned().to_string()),
];
counter!("rustfs_api_requests_total", &labels).increment(1);
})
.on_response(|response: &Response<_>, latency: Duration, _span: &Span| {
_span.record("http response status_code", tracing::field::display(response.status()));

View File

@@ -30,6 +30,7 @@ use datafusion::arrow::{
};
use futures::StreamExt;
use http::{HeaderMap, StatusCode};
use metrics::counter;
use rustfs_ecstore::{
bucket::{
lifecycle::{bucket_lifecycle_ops::validate_transition_tier, lifecycle::Lifecycle},
@@ -603,6 +604,8 @@ impl S3 for FS {
return Err(S3Error::with_message(S3ErrorCode::InternalError, "Not init".to_string()));
};
counter!("rustfs_create_bucket_total").increment(1);
store
.make_bucket(
&bucket,