diff --git a/.github/workflows/helm-package.yml b/.github/workflows/helm-package.yml new file mode 100644 index 00000000..9c7b46ee --- /dev/null +++ b/.github/workflows/helm-package.yml @@ -0,0 +1,78 @@ +name: Publish helm chart to artifacthub + +on: + workflow_run: + workflows: ["Build and Release"] + types: [completed] + +env: + new_version: ${{ github.event.workflow_run.head_branch }} + +jobs: + build-helm-package: + runs-on: ubuntu-latest + # Only run on successful builds triggered by tag pushes (version format: x.y.z or x.y.z-suffix) + if: | + github.event.workflow_run.conclusion == 'success' && + github.event.workflow_run.event == 'push' && + contains(github.event.workflow_run.head_branch, '.') + + steps: + - name: Checkout helm chart repo + uses: actions/checkout@v2 + + - name: Replace chart appversion + run: | + set -e + set -x + old_version=$(grep "^appVersion:" helm/rustfs/Chart.yaml | awk '{print $2}') + sed -i "s/$old_version/$new_version/g" helm/rustfs/Chart.yaml + sed -i "/^image:/,/^[^ ]/ s/tag:.*/tag: "$new_version"/" helm/rustfs/values.yaml + + - name: Set up Helm + uses: azure/setup-helm@v4.3.0 + + - name: Package Helm Chart + run: | + cp helm/README.md helm/rustfs/ + package_version=$(echo $new_version | awk -F '-' '{print $2}' | awk -F '.' '{print $NF}') + helm package ./helm/rustfs --destination helm/rustfs/ --version "0.0.$package_version" + + - name: Upload helm package as artifact + uses: actions/upload-artifact@v4 + with: + name: helm-package + path: helm/rustfs/*.tgz + retention-days: 1 + + publish-helm-package: + runs-on: ubuntu-latest + needs: [build-helm-package] + + steps: + - name: Checkout helm package repo + uses: actions/checkout@v2 + with: + repository: rustfs/helm + token: ${{ secrets.RUSTFS_HELM_PACKAGE }} + + - name: Download helm package + uses: actions/download-artifact@v4 + with: + name: helm-package + path: ./ + + - name: Set up helm + uses: azure/setup-helm@v4.3.0 + + - name: Generate index + run: helm repo index . --url https://charts.rustfs.com + + - name: Push helm package and index file + run: | + git config --global user.name "${{ secrets.USERNAME }}" + git config --global user.email "${{ secrets.EMAIL_ADDRESS }}" + git status . + git add . + git commit -m "Update rustfs helm package with $new_version." 
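+          # Pushing to main publishes the regenerated index.yaml alongside the new
+          # package; the index entries point at https://charts.rustfs.com (set via
+          # `helm repo index --url` above).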
+ git push origin main diff --git a/.vscode/launch.json b/.vscode/launch.json index 9bf11962..f054a23a 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -22,6 +22,7 @@ "env": { "RUST_LOG": "rustfs=debug,ecstore=info,s3s=debug,iam=debug", "RUSTFS_SKIP_BACKGROUND_TASK": "on", + //"RUSTFS_OBS_LOG_DIRECTORY": "./deploy/logs", // "RUSTFS_POLICY_PLUGIN_URL":"http://localhost:8181/v1/data/rustfs/authz/allow", // "RUSTFS_POLICY_PLUGIN_AUTH_TOKEN":"your-opa-token" }, @@ -91,6 +92,9 @@ "RUSTFS_VOLUMES": "./target/volume/test{1...4}", "RUSTFS_ADDRESS": ":9000", "RUSTFS_CONSOLE_ENABLE": "true", + // "RUSTFS_OBS_TRACE_ENDPOINT": "http://127.0.0.1:4318/v1/traces", // jaeger otlp http endpoint + // "RUSTFS_OBS_METRIC_ENDPOINT": "http://127.0.0.1:4318/v1/metrics", // default otlp http endpoint + // "RUSTFS_OBS_LOG_ENDPOINT": "http://127.0.0.1:4318/v1/logs", // default otlp http endpoint "RUSTFS_CONSOLE_ADDRESS": "127.0.0.1:9001", "RUSTFS_OBS_LOG_DIRECTORY": "./target/logs", }, diff --git a/Cargo.lock b/Cargo.lock index b9a69113..08ae3bbf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7006,6 +7006,7 @@ dependencies = [ "serde_urlencoded", "shadow-rs", "socket2 0.6.1", + "subtle", "sysctl", "sysinfo", "thiserror 2.0.17", diff --git a/Cargo.toml b/Cargo.toml index c369f677..f4608aef 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -152,6 +152,7 @@ rustls-pemfile = "2.2.0" rustls-pki-types = "1.13.1" sha1 = "0.11.0-rc.3" sha2 = "0.11.0-rc.3" +subtle = "2.6" zeroize = { version = "1.8.2", features = ["derive"] } # Time and Date diff --git a/README.md b/README.md index 1d872fc0..bf6d7fb6 100644 --- a/README.md +++ b/README.md @@ -188,7 +188,7 @@ If you have any questions or need assistance: - **Business**: [hello@rustfs.com](mailto:hello@rustfs.com) - **Jobs**: [jobs@rustfs.com](mailto:jobs@rustfs.com) - **General Discussion**: [GitHub Discussions](https://github.com/rustfs/rustfs/discussions) - - **Contributing**: [CONTRIBUTING.md](https://www.google.com/search?q=CONTRIBUTING.md) + - **Contributing**: [CONTRIBUTING.md](CONTRIBUTING.md) ## Contributors diff --git a/crates/ahm/src/heal/manager.rs b/crates/ahm/src/heal/manager.rs index d86b2de8..31e2dd77 100644 --- a/crates/ahm/src/heal/manager.rs +++ b/crates/ahm/src/heal/manager.rs @@ -143,16 +143,16 @@ impl PriorityHealQueue { format!("object:{}:{}:{}", bucket, object, version_id.as_deref().unwrap_or("")) } HealType::Bucket { bucket } => { - format!("bucket:{}", bucket) + format!("bucket:{bucket}") } HealType::ErasureSet { set_disk_id, ..
} => { - format!("erasure_set:{}", set_disk_id) + format!("erasure_set:{set_disk_id}") } HealType::Metadata { bucket, object } => { - format!("metadata:{}:{}", bucket, object) + format!("metadata:{bucket}:{object}") } HealType::MRF { meta_path } => { - format!("mrf:{}", meta_path) + format!("mrf:{meta_path}") } HealType::ECDecode { bucket, @@ -173,7 +173,7 @@ impl PriorityHealQueue { /// Check if an erasure set heal request for a specific set_disk_id exists fn contains_erasure_set(&self, set_disk_id: &str) -> bool { - let key = format!("erasure_set:{}", set_disk_id); + let key = format!("erasure_set:{set_disk_id}"); self.dedup_keys.contains(&key) } } @@ -327,7 +327,7 @@ impl HealManager { if queue_len >= queue_capacity { return Err(Error::ConfigurationError { - message: format!("Heal queue is full ({}/{})", queue_len, queue_capacity), + message: format!("Heal queue is full ({queue_len}/{queue_capacity})"), }); } diff --git a/crates/common/src/heal_channel.rs b/crates/common/src/heal_channel.rs index 31b906d3..67b7f46d 100644 --- a/crates/common/src/heal_channel.rs +++ b/crates/common/src/heal_channel.rs @@ -125,7 +125,7 @@ impl<'de> Deserialize<'de> for HealScanMode { 0 => Ok(HealScanMode::Unknown), 1 => Ok(HealScanMode::Normal), 2 => Ok(HealScanMode::Deep), - _ => Err(E::custom(format!("invalid HealScanMode value: {}", value))), + _ => Err(E::custom(format!("invalid HealScanMode value: {value}"))), } } @@ -134,7 +134,7 @@ impl<'de> Deserialize<'de> for HealScanMode { E: serde::de::Error, { if value > u8::MAX as u64 { - return Err(E::custom(format!("HealScanMode value too large: {}", value))); + return Err(E::custom(format!("HealScanMode value too large: {value}"))); } self.visit_u8(value as u8) } @@ -144,7 +144,7 @@ impl<'de> Deserialize<'de> for HealScanMode { E: serde::de::Error, { if value < 0 || value > u8::MAX as i64 { - return Err(E::custom(format!("invalid HealScanMode value: {}", value))); + return Err(E::custom(format!("invalid HealScanMode value: {value}"))); } self.visit_u8(value as u8) } @@ -162,7 +162,7 @@ impl<'de> Deserialize<'de> for HealScanMode { "Unknown" | "unknown" => Ok(HealScanMode::Unknown), "Normal" | "normal" => Ok(HealScanMode::Normal), "Deep" | "deep" => Ok(HealScanMode::Deep), - _ => Err(E::custom(format!("invalid HealScanMode string: {}", value))), + _ => Err(E::custom(format!("invalid HealScanMode string: {value}"))), } } } diff --git a/crates/ecstore/src/disk/local.rs b/crates/ecstore/src/disk/local.rs index ea35e79c..5ed851e6 100644 --- a/crates/ecstore/src/disk/local.rs +++ b/crates/ecstore/src/disk/local.rs @@ -1985,24 +1985,20 @@ impl DiskAPI for LocalDisk { // TODO: Healing + let search_version_id = fi.version_id.or(Some(Uuid::nil())); + + // Check if there's an existing version with the same version_id that has a data_dir to clean up + // Note: For non-versioned buckets, fi.version_id is None, but in xl.meta it's stored as Some(Uuid::nil()) let has_old_data_dir = { - if let Ok((_, ver)) = xlmeta.find_version(fi.version_id) { - let has_data_dir = ver.get_data_dir(); - if let Some(data_dir) = has_data_dir { - if xlmeta.shard_data_dir_count(&fi.version_id, &Some(data_dir)) == 0 { - // TODO: Healing - // remove inlinedata\ - Some(data_dir) - } else { - None - } - } else { - None - } - } else { - None - } + xlmeta.find_version(search_version_id).ok().and_then(|(_, ver)| { + // shard_count == 0 means no other version shares this data_dir + ver.get_data_dir() + .filter(|&data_dir| xlmeta.shard_data_dir_count(&search_version_id, &Some(data_dir)) == 0) + }) }; 
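+        // `has_old_data_dir` is only Some when no other version still references that
+        // data_dir (shard count == 0), so its inline data can be dropped safely before
+        // the new version is added; the removal below is keyed by [version_id, data_dir].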
+ if let Some(old_data_dir) = has_old_data_dir.as_ref() { + let _ = xlmeta.data.remove(vec![search_version_id.unwrap_or_default(), *old_data_dir]); + } xlmeta.add_version(fi.clone())?; diff --git a/crates/ecstore/src/erasure_coding/encode.rs b/crates/ecstore/src/erasure_coding/encode.rs index 766c96ca..e19690c1 100644 --- a/crates/ecstore/src/erasure_coding/encode.rs +++ b/crates/ecstore/src/erasure_coding/encode.rs @@ -149,6 +149,12 @@ impl Erasure { break; } Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => { + // Check if the inner error is a checksum mismatch - if so, propagate it + if let Some(inner) = e.get_ref() { + if rustfs_rio::is_checksum_mismatch(inner) { + return Err(std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string())); + } + } break; } Err(e) => { diff --git a/crates/ecstore/src/error.rs b/crates/ecstore/src/error.rs index 01d57fbd..410faa72 100644 --- a/crates/ecstore/src/error.rs +++ b/crates/ecstore/src/error.rs @@ -194,6 +194,12 @@ pub enum StorageError { #[error("Precondition failed")] PreconditionFailed, + #[error("Not modified")] + NotModified, + + #[error("Invalid part number: {0}")] + InvalidPartNumber(usize), + #[error("Invalid range specified: {0}")] InvalidRangeSpec(String), } @@ -427,6 +433,8 @@ impl Clone for StorageError { StorageError::InsufficientReadQuorum(a, b) => StorageError::InsufficientReadQuorum(a.clone(), b.clone()), StorageError::InsufficientWriteQuorum(a, b) => StorageError::InsufficientWriteQuorum(a.clone(), b.clone()), StorageError::PreconditionFailed => StorageError::PreconditionFailed, + StorageError::NotModified => StorageError::NotModified, + StorageError::InvalidPartNumber(a) => StorageError::InvalidPartNumber(*a), StorageError::InvalidRangeSpec(a) => StorageError::InvalidRangeSpec(a.clone()), } } @@ -496,6 +504,8 @@ impl StorageError { StorageError::PreconditionFailed => 0x3B, StorageError::EntityTooSmall(_, _, _) => 0x3C, StorageError::InvalidRangeSpec(_) => 0x3D, + StorageError::NotModified => 0x3E, + StorageError::InvalidPartNumber(_) => 0x3F, } } @@ -566,6 +576,8 @@ impl StorageError { 0x3B => Some(StorageError::PreconditionFailed), 0x3C => Some(StorageError::EntityTooSmall(Default::default(), Default::default(), Default::default())), 0x3D => Some(StorageError::InvalidRangeSpec(Default::default())), + 0x3E => Some(StorageError::NotModified), + 0x3F => Some(StorageError::InvalidPartNumber(Default::default())), _ => None, } } @@ -679,6 +691,10 @@ pub fn is_err_data_movement_overwrite(err: &Error) -> bool { matches!(err, &StorageError::DataMovementOverwriteErr(_, _, _)) } +pub fn is_err_io(err: &Error) -> bool { + matches!(err, &StorageError::Io(_)) +} + pub fn is_all_not_found(errs: &[Option<Error>]) -> bool { for err in errs.iter() { if let Some(err) = err { diff --git a/crates/ecstore/src/store.rs b/crates/ecstore/src/store.rs index fbaa19c0..3097a9e2 100644 --- a/crates/ecstore/src/store.rs +++ b/crates/ecstore/src/store.rs @@ -767,6 +767,12 @@ impl ECStore { def_pool = pinfo.clone(); has_def_pool = true; + // https://docs.aws.amazon.com/AmazonS3/latest/userguide/conditional-deletes.html + if is_err_object_not_found(err) { + if let Err(err) = opts.precondition_check(&pinfo.object_info) { + return Err(err.clone()); + } + } if !is_err_object_not_found(err) && !is_err_version_not_found(err) { return Err(err.clone()); @@ -1392,6 +1398,7 @@ impl StorageAPI for ECStore { let (info, _) = self.get_latest_object_info_with_idx(bucket, object.as_str(), opts).await?; + opts.precondition_check(&info)?; Ok(info) } diff --git
a/crates/ecstore/src/store_api.rs b/crates/ecstore/src/store_api.rs index dbad3911..90c8fd96 100644 --- a/crates/ecstore/src/store_api.rs +++ b/crates/ecstore/src/store_api.rs @@ -356,6 +356,8 @@ impl HTTPRangeSpec { pub struct HTTPPreconditions { pub if_match: Option<String>, pub if_none_match: Option<String>, + pub if_modified_since: Option<OffsetDateTime>, + pub if_unmodified_since: Option<OffsetDateTime>, } #[derive(Debug, Default, Clone)] @@ -456,6 +458,76 @@ impl ObjectOptions { ..Default::default() } } + + pub fn precondition_check(&self, obj_info: &ObjectInfo) -> Result<()> { + let has_valid_mod_time = obj_info.mod_time.is_some_and(|t| t != OffsetDateTime::UNIX_EPOCH); + + if let Some(part_number) = self.part_number { + if part_number > 1 && !obj_info.parts.is_empty() { + let part_found = obj_info.parts.iter().any(|pi| pi.number == part_number); + if !part_found { + return Err(Error::InvalidPartNumber(part_number)); + } + } + } + + if let Some(pre) = &self.http_preconditions { + if let Some(if_none_match) = &pre.if_none_match { + if let Some(etag) = &obj_info.etag { + if is_etag_equal(etag, if_none_match) { + return Err(Error::NotModified); + } + } + } + + if has_valid_mod_time { + if let Some(if_modified_since) = &pre.if_modified_since { + if let Some(mod_time) = &obj_info.mod_time { + if !is_modified_since(mod_time, if_modified_since) { + return Err(Error::NotModified); + } + } + } + } + + if let Some(if_match) = &pre.if_match { + if let Some(etag) = &obj_info.etag { + if !is_etag_equal(etag, if_match) { + return Err(Error::PreconditionFailed); + } + } else { + return Err(Error::PreconditionFailed); + } + } + if has_valid_mod_time && pre.if_match.is_none() { + if let Some(if_unmodified_since) = &pre.if_unmodified_since { + if let Some(mod_time) = &obj_info.mod_time { + if is_modified_since(mod_time, if_unmodified_since) { + return Err(Error::PreconditionFailed); + } + } + } + } + } + + Ok(()) + } +} + +fn is_etag_equal(etag1: &str, etag2: &str) -> bool { + let e1 = etag1.trim_matches('"'); + let e2 = etag2.trim_matches('"'); + // Handle wildcard "*" - matches any ETag (per HTTP/1.1 RFC 7232) + if e2 == "*" { + return true; + } + e1 == e2 +} + +fn is_modified_since(mod_time: &OffsetDateTime, given_time: &OffsetDateTime) -> bool { + let mod_secs = mod_time.unix_timestamp(); + let given_secs = given_time.unix_timestamp(); + mod_secs > given_secs } #[derive(Debug, Default, Serialize, Deserialize)] diff --git a/crates/filemeta/src/filemeta.rs b/crates/filemeta/src/filemeta.rs index 55eb4e98..ad3b0f9e 100644 --- a/crates/filemeta/src/filemeta.rs +++ b/crates/filemeta/src/filemeta.rs @@ -34,7 +34,7 @@ use std::{collections::HashMap, io::Cursor}; use time::OffsetDateTime; use time::format_description::well_known::Rfc3339; use tokio::io::AsyncRead; -use tracing::error; +use tracing::{error, warn}; use uuid::Uuid; use xxhash_rust::xxh64; @@ -444,8 +444,9 @@ impl FileMeta { // Find version pub fn find_version(&self, vid: Option<Uuid>) -> Result<(usize, FileMetaVersion)> { + let vid = vid.unwrap_or_default(); for (i, fver) in self.versions.iter().enumerate() { - if fver.header.version_id == vid { + if fver.header.version_id == Some(vid) { let version = self.get_idx(i)?; return Ok((i, version)); } } @@ -456,9 +457,12 @@ // shard_data_dir_count queries the count of data_dir under vid pub fn shard_data_dir_count(&self, vid: &Option<Uuid>, data_dir: &Option<Uuid>) -> usize { + let vid = vid.unwrap_or_default(); self.versions .iter() - .filter(|v| v.header.version_type == VersionType::Object && v.header.version_id != *vid && 
v.header.user_data_dir()) + .filter(|v| { + v.header.version_type == VersionType::Object && v.header.version_id != Some(vid) && v.header.user_data_dir() + }) .map(|v| FileMetaVersion::decode_data_dir_from_meta(&v.meta).unwrap_or_default()) .filter(|v| v == data_dir) .count() @@ -890,12 +894,11 @@ impl FileMeta { read_data: bool, all_parts: bool, ) -> Result<FileInfo> { - let has_vid = { if !version_id.is_empty() { - let id = Uuid::parse_str(version_id)?; - if !id.is_nil() { Some(id) } else { None } + Uuid::parse_str(version_id)? } else { - None + Uuid::nil() } }; @@ -905,12 +908,12 @@ for ver in self.versions.iter() { let header = &ver.header; - if let Some(vid) = has_vid { - if header.version_id != Some(vid) { - is_latest = false; - succ_mod_time = header.mod_time; - continue; - } + // TODO: freeVersion + + if !version_id.is_empty() && header.version_id != Some(vid) { + is_latest = false; + succ_mod_time = header.mod_time; + continue; } let mut fi = ver.into_fileinfo(volume, path, all_parts)?; @@ -932,7 +935,7 @@ return Ok(fi); } - if has_vid.is_none() { + if version_id.is_empty() { Err(Error::FileNotFound) } else { Err(Error::FileVersionNotFound) } @@ -1091,13 +1094,10 @@ /// Count shared data directories pub fn shared_data_dir_count(&self, version_id: Option<Uuid>, data_dir: Option<Uuid>) -> usize { + let version_id = version_id.unwrap_or_default(); + if self.data.entries().unwrap_or_default() > 0 - && version_id.is_some() - && self - .data - .find(version_id.unwrap().to_string().as_str()) - .unwrap_or_default() - .is_some() + && self.data.find(version_id.to_string().as_str()).unwrap_or_default().is_some() { return 0; } @@ -1105,7 +1105,9 @@ self.versions .iter() .filter(|v| { - v.header.version_type == VersionType::Object && v.header.version_id != version_id && v.header.user_data_dir() + v.header.version_type == VersionType::Object + && v.header.version_id != Some(version_id) + && v.header.user_data_dir() }) .filter_map(|v| FileMetaVersion::decode_data_dir_from_meta(&v.meta).ok()) .filter(|&dir| dir == data_dir) .count() diff --git a/crates/obs/src/telemetry.rs b/crates/obs/src/telemetry.rs index cea23390..e2c5baf7 100644 --- a/crates/obs/src/telemetry.rs +++ b/crates/obs/src/telemetry.rs @@ -416,76 +416,88 @@ fn init_observability_http(config: &OtelConfig, logger_level: &str, is_productio // Tracer(HTTP) let tracer_provider = { - let exporter = opentelemetry_otlp::SpanExporter::builder() - .with_http() - .with_endpoint(trace_ep.as_str()) - .with_protocol(Protocol::HttpBinary) - .with_compression(Compression::Gzip) - .build() - .map_err(|e| TelemetryError::BuildSpanExporter(e.to_string()))?; + if trace_ep.is_empty() { + None + } else { + let exporter = opentelemetry_otlp::SpanExporter::builder() + .with_http() + .with_endpoint(trace_ep.as_str()) + .with_protocol(Protocol::HttpBinary) + .with_compression(Compression::Gzip) + .build() + .map_err(|e| TelemetryError::BuildSpanExporter(e.to_string()))?; - let mut builder = SdkTracerProvider::builder() - .with_sampler(sampler) - .with_id_generator(RandomIdGenerator::default()) - .with_resource(res.clone()) - .with_batch_exporter(exporter); + let mut builder = SdkTracerProvider::builder() + .with_sampler(sampler) + .with_id_generator(RandomIdGenerator::default()) + .with_resource(res.clone()) + .with_batch_exporter(exporter); - if use_stdout { - builder = builder.with_batch_exporter(opentelemetry_stdout::SpanExporter::default()); + if use_stdout { + builder = 
builder.with_batch_exporter(opentelemetry_stdout::SpanExporter::default()); + } + + let provider = builder.build(); + global::set_tracer_provider(provider.clone()); + Some(provider) } - - let provider = builder.build(); - global::set_tracer_provider(provider.clone()); - provider }; // Meter(HTTP) let meter_provider = { - let exporter = opentelemetry_otlp::MetricExporter::builder() - .with_http() - .with_endpoint(metric_ep.as_str()) - .with_temporality(opentelemetry_sdk::metrics::Temporality::default()) - .with_protocol(Protocol::HttpBinary) - .with_compression(Compression::Gzip) - .build() - .map_err(|e| TelemetryError::BuildMetricExporter(e.to_string()))?; - let meter_interval = config.meter_interval.unwrap_or(METER_INTERVAL); + if metric_ep.is_empty() { + None + } else { + let exporter = opentelemetry_otlp::MetricExporter::builder() + .with_http() + .with_endpoint(metric_ep.as_str()) + .with_temporality(opentelemetry_sdk::metrics::Temporality::default()) + .with_protocol(Protocol::HttpBinary) + .with_compression(Compression::Gzip) + .build() + .map_err(|e| TelemetryError::BuildMetricExporter(e.to_string()))?; + let meter_interval = config.meter_interval.unwrap_or(METER_INTERVAL); - let (provider, recorder) = Recorder::builder(service_name.clone()) - .with_meter_provider(|b| { - let b = b.with_resource(res.clone()).with_reader( - PeriodicReader::builder(exporter) - .with_interval(Duration::from_secs(meter_interval)) - .build(), - ); - if use_stdout { - b.with_reader(create_periodic_reader(meter_interval)) - } else { - b - } - }) - .build(); - global::set_meter_provider(provider.clone()); - metrics::set_global_recorder(recorder).map_err(|e| TelemetryError::InstallMetricsRecorder(e.to_string()))?; - provider + let (provider, recorder) = Recorder::builder(service_name.clone()) + .with_meter_provider(|b| { + let b = b.with_resource(res.clone()).with_reader( + PeriodicReader::builder(exporter) + .with_interval(Duration::from_secs(meter_interval)) + .build(), + ); + if use_stdout { + b.with_reader(create_periodic_reader(meter_interval)) + } else { + b + } + }) + .build(); + global::set_meter_provider(provider.clone()); + metrics::set_global_recorder(recorder).map_err(|e| TelemetryError::InstallMetricsRecorder(e.to_string()))?; + Some(provider) + } }; // Logger(HTTP) let logger_provider = { - let exporter = opentelemetry_otlp::LogExporter::builder() - .with_http() - .with_endpoint(log_ep.as_str()) - .with_protocol(Protocol::HttpBinary) - .with_compression(Compression::Gzip) - .build() - .map_err(|e| TelemetryError::BuildLogExporter(e.to_string()))?; + if log_ep.is_empty() { + None + } else { + let exporter = opentelemetry_otlp::LogExporter::builder() + .with_http() + .with_endpoint(log_ep.as_str()) + .with_protocol(Protocol::HttpBinary) + .with_compression(Compression::Gzip) + .build() + .map_err(|e| TelemetryError::BuildLogExporter(e.to_string()))?; - let mut builder = SdkLoggerProvider::builder().with_resource(res); - builder = builder.with_batch_exporter(exporter); - if use_stdout { - builder = builder.with_batch_exporter(opentelemetry_stdout::LogExporter::default()); + let mut builder = SdkLoggerProvider::builder().with_resource(res); + builder = builder.with_batch_exporter(exporter); + if use_stdout { + builder = builder.with_batch_exporter(opentelemetry_stdout::LogExporter::default()); + } + Some(builder.build()) } - builder.build() }; // Tracing layer @@ -512,16 +524,21 @@ fn init_observability_http(config: &OtelConfig, logger_level: &str, is_productio }; let filter = 
build_env_filter(logger_level, None); - let otel_bridge = OpenTelemetryTracingBridge::new(&logger_provider).with_filter(build_env_filter(logger_level, None)); - let tracer = tracer_provider.tracer(service_name.to_string()); + let otel_bridge = logger_provider + .as_ref() + .map(|p| OpenTelemetryTracingBridge::new(p).with_filter(build_env_filter(logger_level, None))); + let tracer_layer = tracer_provider + .as_ref() + .map(|p| OpenTelemetryLayer::new(p.tracer(service_name.to_string()))); + let metrics_layer = meter_provider.as_ref().map(|p| MetricsLayer::new(p.clone())); tracing_subscriber::registry() .with(filter) .with(ErrorLayer::default()) .with(fmt_layer_opt) - .with(OpenTelemetryLayer::new(tracer)) + .with(tracer_layer) .with(otel_bridge) - .with(MetricsLayer::new(meter_provider.clone())) + .with(metrics_layer) .init(); OBSERVABILITY_METRIC_ENABLED.set(true).ok(); @@ -532,9 +549,9 @@ fn init_observability_http(config: &OtelConfig, logger_level: &str, is_productio ); Ok(OtelGuard { - tracer_provider: Some(tracer_provider), - meter_provider: Some(meter_provider), - logger_provider: Some(logger_provider), + tracer_provider, + meter_provider, + logger_provider, flexi_logger_handles: None, tracing_guard: None, }) diff --git a/crates/protos/src/lib.rs b/crates/protos/src/lib.rs index 396976c5..73bebe71 100644 --- a/crates/protos/src/lib.rs +++ b/crates/protos/src/lib.rs @@ -45,7 +45,13 @@ pub async fn node_service_time_out_client( let channel = match channel { Some(channel) => channel, None => { - let connector = Endpoint::from_shared(addr.to_string())?.connect_timeout(Duration::from_secs(60)); + let connector = Endpoint::from_shared(addr.to_string())? + .connect_timeout(Duration::from_secs(5)) + .tcp_keepalive(Some(Duration::from_secs(10))) + .http2_keep_alive_interval(Duration::from_secs(5)) + .keep_alive_timeout(Duration::from_secs(3)) + .keep_alive_while_idle(true) + .timeout(Duration::from_secs(60)); let channel = connector.connect().await?; { @@ -55,7 +61,6 @@ pub async fn node_service_time_out_client( } }; - // let timeout_channel = Timeout::new(channel, Duration::from_secs(60)); Ok(NodeServiceClient::with_interceptor( channel, Box::new(move |mut req: Request<()>| { diff --git a/crates/rio/src/hash_reader.rs b/crates/rio/src/hash_reader.rs index 15b4a49d..4d672e3c 100644 --- a/crates/rio/src/hash_reader.rs +++ b/crates/rio/src/hash_reader.rs @@ -499,17 +499,18 @@ impl AsyncRead for HashReader { let content_hash = hasher.finalize(); if content_hash != expected_content_hash.raw { + let expected_hex = hex_simd::encode_to_string(&expected_content_hash.raw, hex_simd::AsciiCase::Lower); + let actual_hex = hex_simd::encode_to_string(content_hash, hex_simd::AsciiCase::Lower); error!( "Content hash mismatch, type={:?}, encoded={:?}, expected={:?}, actual={:?}", - expected_content_hash.checksum_type, - expected_content_hash.encoded, - hex_simd::encode_to_string(&expected_content_hash.raw, hex_simd::AsciiCase::Lower), - hex_simd::encode_to_string(content_hash, hex_simd::AsciiCase::Lower) + expected_content_hash.checksum_type, expected_content_hash.encoded, expected_hex, actual_hex ); - return Poll::Ready(Err(std::io::Error::new( - std::io::ErrorKind::InvalidData, - "Content hash mismatch", - ))); + // Use ChecksumMismatch error so that API layer can return BadDigest + let checksum_err = crate::errors::ChecksumMismatch { + want: expected_hex, + got: actual_hex, + }; + return Poll::Ready(Err(std::io::Error::new(std::io::ErrorKind::InvalidData, checksum_err))); } } diff --git 
a/crates/rio/src/http_reader.rs b/crates/rio/src/http_reader.rs index af6a01a5..a2b8d33a 100644 --- a/crates/rio/src/http_reader.rs +++ b/crates/rio/src/http_reader.rs @@ -32,7 +32,16 @@ use crate::{EtagResolvable, HashReaderDetector, HashReaderMut}; fn get_http_client() -> Client { // Reuse the HTTP connection pool in the global `reqwest::Client` instance // TODO: interact with load balancing? - static CLIENT: LazyLock<Client> = LazyLock::new(Client::new); + static CLIENT: LazyLock<Client> = LazyLock::new(|| { + Client::builder() + .connect_timeout(std::time::Duration::from_secs(5)) + .tcp_keepalive(std::time::Duration::from_secs(10)) + .http2_keep_alive_interval(std::time::Duration::from_secs(5)) + .http2_keep_alive_timeout(std::time::Duration::from_secs(3)) + .http2_keep_alive_while_idle(true) + .build() + .expect("Failed to create global HTTP client") + }); CLIENT.clone() } diff --git a/crates/utils/src/io.rs b/crates/utils/src/io.rs index 42dc5ac2..94777b8d 100644 --- a/crates/utils/src/io.rs +++ b/crates/utils/src/io.rs @@ -41,6 +41,11 @@ pub async fn read_full(mut reader: R, mut bu if total == 0 { return Err(e); } + // If the error is InvalidData (e.g., checksum mismatch), preserve it + // instead of wrapping it as UnexpectedEof, so proper error handling can occur + if e.kind() == std::io::ErrorKind::InvalidData { + return Err(e); + } return Err(std::io::Error::new( std::io::ErrorKind::UnexpectedEof, format!("read {total} bytes, error: {e}"), diff --git a/docs/cluster_recovery.md b/docs/cluster_recovery.md new file mode 100644 index 00000000..21c6b500 --- /dev/null +++ b/docs/cluster_recovery.md @@ -0,0 +1,65 @@ +# Resolution Report: Issue #1001 - Cluster Recovery from Abrupt Power-Off + +## 1. Issue Description +**Problem**: The cluster failed to recover gracefully when a node experienced an abrupt power-off (hard failure). +**Symptoms**: +- The application became unable to upload files. +- The Console Web UI became unresponsive across the cluster. +- The system "hung" indefinitely, unlike the immediate recovery observed during a graceful process termination (`kill`). + +**Root Cause**: +The standard TCP protocol does not immediately detect a silent peer disappearance (power loss) because no `FIN` or `RST` packets are sent. Without active application-layer heartbeats, the surviving nodes kept connections in an `ESTABLISHED` state, waiting indefinitely for responses that would never arrive. + +--- + +## 2. Technical Approach +To resolve this, we needed to transform the passive failure detection (waiting for TCP timeout) into an active detection mechanism. + +### Key Objectives: +1. **Fail Fast**: Detect dead peers in seconds, not minutes. +2. **Accuracy**: Distinguish between network congestion and actual node failure. 
3. **Safety**: Ensure no thread or task blocks forever on a remote procedure call (RPC). + +--- + +## 3. Implemented Solution +We modified the internal gRPC client configuration in `crates/protos/src/lib.rs` to implement a multi-layered health check strategy. + +### Solution Overview +The fix implements a multi-layered detection strategy covering both Control Plane (RPC) and Data Plane (Streaming): + +1. **Control Plane (gRPC)**: + * Enabled `http2_keep_alive_interval` (5s) and `keep_alive_timeout` (3s) in `tonic` clients. + * Enforced `tcp_keepalive` (10s) on underlying transport. + * Context: Ensures cluster metadata operations (raft, status checks) fail fast if a node dies. + +2. 
**Data Plane (File Uploads/Downloads)**: + * **Client (Rio)**: Updated `reqwest` client builder in `crates/rio` to enable TCP Keepalive (10s) and HTTP/2 Keepalive (5s). This prevents hangs during large file streaming (e.g., 1GB uploads). + * **Server**: Enabled `SO_KEEPALIVE` on all incoming TCP connections in `rustfs/src/server/http.rs` to forcefully close sockets from dead clients. + +3. **Cross-Platform Build Stability**: + * Guarded Linux-specific profiling code (`jemalloc_pprof`) with `#[cfg(target_os = "linux")]` to fix build failures on macOS/AArch64. + +### Configuration Changes + +```rust +let connector = Endpoint::from_shared(addr.to_string())? + .connect_timeout(Duration::from_secs(5)) + // 1. App-Layer Heartbeats (Primary Detection) + // Sends a hidden HTTP/2 PING frame every 5 seconds. + .http2_keep_alive_interval(Duration::from_secs(5)) + // If PING is not acknowledged within 3 seconds, closes connection. + .keep_alive_timeout(Duration::from_secs(3)) + // Ensures PINGs are sent even when no active requests are in flight. + .keep_alive_while_idle(true) + // 2. Transport-Layer Keepalive (OS Backup) + .tcp_keepalive(Some(Duration::from_secs(10))) + // 3. Global Safety Net + // Hard deadline for any RPC operation. + .timeout(Duration::from_secs(60)); +``` + +### Outcome +- **Detection Time**: Reduced from ~15+ minutes (OS default) to **~8 seconds** (5s interval + 3s timeout). +- **Behavior**: When a node loses power, surviving peers now detect the lost connection almost immediately, throwing a protocol error that triggers standard cluster recovery/failover logic. +- **Result**: The cluster now handles power-offs with the same resilience as graceful shutdowns. diff --git a/helm/README.md b/helm/README.md index 39aba9f8..924da3ab 100644 --- a/helm/README.md +++ b/helm/README.md @@ -21,6 +21,8 @@ RustFS helm chart supports **standalone and distributed mode**. For standalone m | secret.rustfs.access_key | RustFS Access Key ID | `rustfsadmin` | | secret.rustfs.secret_key | RustFS Secret Key ID | `rustfsadmin` | | storageclass.name | The name for StorageClass. | `local-path` | +| storageclass.dataStorageSize | The storage size for data PVC. | `256Mi` | +| storageclass.logStorageSize | The storage size for log PVC. | `256Mi` | | ingress.className | Specify the ingress class, traefik or nginx. | `nginx` | diff --git a/helm/rustfs/Chart.yaml b/helm/rustfs/Chart.yaml index 725c6c46..2cc92efa 100644 --- a/helm/rustfs/Chart.yaml +++ b/helm/rustfs/Chart.yaml @@ -15,10 +15,10 @@ type: application # This is the chart version. This version number should be incremented each time you make changes # to the chart and its templates, including the app version. # Versions are expected to follow Semantic Versioning (https://semver.org/) -version: 1.0.0-alpha.69 +version: 1.0.3 # This is the version number of the application being deployed. This version number should be # incremented each time you make changes to the application. Versions are not expected to # follow Semantic Versioning. They should reflect the version the application is using. # It is recommended to use it with quotes. 
-appVersion: "1.0.0-alpha.69" +appVersion: "1.0.0-alpha.73" diff --git a/helm/rustfs/templates/_helpers.tpl b/helm/rustfs/templates/_helpers.tpl index 6560f132..667b9ece 100644 --- a/helm/rustfs/templates/_helpers.tpl +++ b/helm/rustfs/templates/_helpers.tpl @@ -60,3 +60,14 @@ Create the name of the service account to use {{- default "default" .Values.serviceAccount.name }} {{- end }} {{- end }} + +{{/* +Return the secret name +*/}} +{{- define "rustfs.secretName" -}} +{{- if .Values.secret.existingSecret }} +{{- .Values.secret.existingSecret }} +{{- else }} +{{- printf "%s-secret" (include "rustfs.fullname" .) }} +{{- end }} +{{- end }} diff --git a/helm/rustfs/templates/deployment.yaml b/helm/rustfs/templates/deployment.yaml index a8f5ce7b..2edc4736 100644 --- a/helm/rustfs/templates/deployment.yaml +++ b/helm/rustfs/templates/deployment.yaml @@ -55,7 +55,7 @@ spec: - configMapRef: name: {{ include "rustfs.fullname" . }}-config - secretRef: - name: {{ include "rustfs.fullname" . }}-secret + name: {{ include "rustfs.secretName" . }} resources: requests: memory: {{ .Values.resources.requests.memory }} diff --git a/helm/rustfs/templates/extra-manifests.yaml b/helm/rustfs/templates/extra-manifests.yaml new file mode 100644 index 00000000..0e5456bf --- /dev/null +++ b/helm/rustfs/templates/extra-manifests.yaml @@ -0,0 +1,4 @@ +{{- range .Values.extraManifests }} +--- +{{ tpl (toYaml .) $ }} +{{- end }} diff --git a/helm/rustfs/templates/ingress.yaml b/helm/rustfs/templates/ingress.yaml index 792b4782..94eedfc7 100644 --- a/helm/rustfs/templates/ingress.yaml +++ b/helm/rustfs/templates/ingress.yaml @@ -15,7 +15,7 @@ spec: {{- with .Values.ingress.className }} ingressClassName: {{ . }} {{- end }} - {{- if .Values.ingress.tls }} + {{- if .Values.tls.enabled }} tls: {{- range .Values.ingress.tls }} - hosts: diff --git a/helm/rustfs/templates/pvc.yaml b/helm/rustfs/templates/pvc.yaml index 735d3302..1cab744d 100644 --- a/helm/rustfs/templates/pvc.yaml +++ b/helm/rustfs/templates/pvc.yaml @@ -8,7 +8,7 @@ spec: storageClassName: {{ .Values.storageclass.name }} resources: requests: - storage: {{ .Values.storageclass.size }} + storage: {{ .Values.storageclass.dataStorageSize }} --- apiVersion: v1 @@ -20,5 +20,5 @@ spec: storageClassName: {{ .Values.storageclass.name }} resources: requests: - storage: {{ .Values.storageclass.size }} + storage: {{ .Values.storageclass.logStorageSize }} {{- end }} \ No newline at end of file diff --git a/helm/rustfs/templates/secret.yaml b/helm/rustfs/templates/secret.yaml index 5de45709..7d061828 100644 --- a/helm/rustfs/templates/secret.yaml +++ b/helm/rustfs/templates/secret.yaml @@ -1,9 +1,10 @@ +{{- if not .Values.secret.existingSecret }} apiVersion: v1 kind: Secret metadata: - name: {{ include "rustfs.fullname" . }}-secret + name: {{ include "rustfs.secretName" . }} type: Opaque data: RUSTFS_ACCESS_KEY: {{ .Values.secret.rustfs.access_key | b64enc | quote }} RUSTFS_SECRET_KEY: {{ .Values.secret.rustfs.secret_key | b64enc | quote }} - +{{- end }} diff --git a/helm/rustfs/templates/statefulset.yaml b/helm/rustfs/templates/statefulset.yaml index 2045d089..931cfff4 100644 --- a/helm/rustfs/templates/statefulset.yaml +++ b/helm/rustfs/templates/statefulset.yaml @@ -76,7 +76,7 @@ spec: - configMapRef: name: {{ include "rustfs.fullname" . }}-config - secretRef: - name: {{ include "rustfs.fullname" . }}-secret + name: {{ include "rustfs.secretName" . 
}} resources: requests: memory: {{ .Values.resources.requests.memory }} @@ -122,7 +122,7 @@ spec: storageClassName: {{ $.Values.storageclass.name }} resources: requests: - storage: {{ $.Values.storageclass.size}} + storage: {{ $.Values.storageclass.logStorageSize}} {{- if eq (int .Values.replicaCount) 4 }} {{- range $i := until (int .Values.replicaCount) }} - metadata: @@ -132,7 +132,7 @@ spec: storageClassName: {{ $.Values.storageclass.name }} resources: requests: - storage: {{ $.Values.storageclass.size}} + storage: {{ $.Values.storageclass.dataStorageSize}} {{- end }} {{- else if eq (int .Values.replicaCount) 16 }} - metadata: @@ -142,6 +142,6 @@ spec: storageClassName: {{ $.Values.storageclass.name }} resources: requests: - storage: {{ $.Values.storageclass.size}} + storage: {{ $.Values.storageclass.dataStorageSize}} {{- end }} {{- end }} diff --git a/helm/rustfs/values.yaml b/helm/rustfs/values.yaml index 14714c7f..17d23c43 100644 --- a/helm/rustfs/values.yaml +++ b/helm/rustfs/values.yaml @@ -27,6 +27,7 @@ mode: enabled: true secret: + existingSecret: "" rustfs: access_key: rustfsadmin secret_key: rustfsadmin @@ -146,4 +147,7 @@ affinity: {} storageclass: name: local-path - size: 256Mi + dataStorageSize: 256Mi + logStorageSize: 256Mi + +extraManifests: [] diff --git a/rustfs/Cargo.toml b/rustfs/Cargo.toml index 4552bf79..2563d4c7 100644 --- a/rustfs/Cargo.toml +++ b/rustfs/Cargo.toml @@ -92,6 +92,7 @@ serde_urlencoded = { workspace = true } # Cryptography and Security rustls = { workspace = true } +subtle = { workspace = true } # Time and Date chrono = { workspace = true } diff --git a/rustfs/src/admin/handlers/group.rs b/rustfs/src/admin/handlers/group.rs index 40b1149a..953f3105 100644 --- a/rustfs/src/admin/handlers/group.rs +++ b/rustfs/src/admin/handlers/group.rs @@ -29,7 +29,7 @@ use tracing::warn; use crate::{ admin::{auth::validate_admin_request, router::Operation, utils::has_space_be}, - auth::{check_key_valid, get_session_token}, + auth::{check_key_valid, constant_time_eq, get_session_token}, }; #[derive(Debug, Deserialize, Default)] @@ -240,7 +240,7 @@ impl Operation for UpdateGroupMembers { get_global_action_cred() .map(|cred| { - if cred.access_key == *member { + if constant_time_eq(&cred.access_key, member) { return Err(S3Error::with_message( S3ErrorCode::MethodNotAllowed, format!("can't add root {member}"), diff --git a/rustfs/src/admin/handlers/service_account.rs b/rustfs/src/admin/handlers/service_account.rs index 4295de90..935abcc0 100644 --- a/rustfs/src/admin/handlers/service_account.rs +++ b/rustfs/src/admin/handlers/service_account.rs @@ -13,7 +13,7 @@ // limitations under the License. 
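The handler changes below (and the matching ones in `group.rs`, `user.rs`, and `auth.rs`) swap `==` on access keys for a `subtle`-based helper. A minimal self-contained sketch of why, using the same `constant_time_eq` shape the PR adds (key values illustrative):

```rust
use subtle::ConstantTimeEq; // subtle = "2.6", as pinned in the workspace Cargo.toml

/// A plain `==` on strings short-circuits at the first differing byte, so the
/// response time can reveal how long a matching prefix an attacker has guessed.
/// `ct_eq` inspects every byte and collapses the result into a single `Choice`.
fn constant_time_eq(a: &str, b: &str) -> bool {
    a.as_bytes().ct_eq(b.as_bytes()).into()
}

fn main() {
    // A near-miss and a total mismatch take the same observable time:
    assert!(!constant_time_eq("AKIAIOSFODNN7EXAMPLE", "AKIAIOSFODNN7EXAMPLF"));
    assert!(!constant_time_eq("AKIAIOSFODNN7EXAMPLE", "ZZZZZZZZZZZZZZZZZZZZ"));
    assert!(constant_time_eq("AKIAIOSFODNN7EXAMPLE", "AKIAIOSFODNN7EXAMPLE"));
}
```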
use crate::admin::utils::has_space_be; -use crate::auth::{get_condition_values, get_session_token}; +use crate::auth::{constant_time_eq, get_condition_values, get_session_token}; use crate::{admin::router::Operation, auth::check_key_valid}; use http::HeaderMap; use hyper::StatusCode; @@ -83,7 +83,7 @@ impl Operation for AddServiceAccount { return Err(s3_error!(InvalidRequest, "get sys cred failed")); }; - if sys_cred.access_key == create_req.access_key { + if constant_time_eq(&sys_cred.access_key, &create_req.access_key) { return Err(s3_error!(InvalidArgument, "can't create user with system access key")); } @@ -107,7 +107,7 @@ impl Operation for AddServiceAccount { return Err(s3_error!(InvalidRequest, "iam not init")); }; - let deny_only = cred.access_key == target_user || cred.parent_user == target_user; + let deny_only = constant_time_eq(&cred.access_key, &target_user) || constant_time_eq(&cred.parent_user, &target_user); if !iam_store .is_allowed(&Args { diff --git a/rustfs/src/admin/handlers/user.rs b/rustfs/src/admin/handlers/user.rs index 986c01cf..be20eda0 100644 --- a/rustfs/src/admin/handlers/user.rs +++ b/rustfs/src/admin/handlers/user.rs @@ -14,7 +14,7 @@ use crate::{ admin::{auth::validate_admin_request, router::Operation, utils::has_space_be}, - auth::{check_key_valid, get_session_token}, + auth::{check_key_valid, constant_time_eq, get_session_token}, }; use http::{HeaderMap, StatusCode}; use matchit::Params; @@ -95,7 +95,7 @@ impl Operation for AddUser { } if let Some(sys_cred) = get_global_action_cred() { - if sys_cred.access_key == ak { + if constant_time_eq(&sys_cred.access_key, ak) { return Err(s3_error!(InvalidArgument, "can't create user with system access key")); } } @@ -162,7 +162,7 @@ impl Operation for SetUserStatus { return Err(s3_error!(InvalidRequest, "get cred failed")); }; - if input_cred.access_key == ak { + if constant_time_eq(&input_cred.access_key, ak) { return Err(s3_error!(InvalidArgument, "can't change status of self")); } diff --git a/rustfs/src/auth.rs b/rustfs/src/auth.rs index e53bf525..cc2d24c2 100644 --- a/rustfs/src/auth.rs +++ b/rustfs/src/auth.rs @@ -29,9 +29,37 @@ use s3s::auth::SimpleAuth; use s3s::s3_error; use serde_json::Value; use std::collections::HashMap; +use subtle::ConstantTimeEq; use time::OffsetDateTime; use time::format_description::well_known::Rfc3339; +/// Performs constant-time string comparison to prevent timing attacks. +/// +/// This function should be used when comparing sensitive values like passwords, +/// API keys, or authentication tokens. It ensures the comparison time is +/// independent of the position where strings differ and handles length differences +/// securely. +/// +/// # Security Note +/// This implementation uses the `subtle` crate to provide cryptographically +/// sound constant-time guarantees. The function is resistant to timing side-channel +/// attacks and suitable for security-critical comparisons. 
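+///
+/// Note: like most constant-time comparisons, `ct_eq` on byte slices still
+/// returns immediately when the two lengths differ, so timing can reveal a
+/// secret's *length* but never its contents.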
+/// +/// # Example +/// ``` +/// use rustfs::auth::constant_time_eq; +/// +/// let secret1 = "my-secret-key"; +/// let secret2 = "my-secret-key"; +/// let secret3 = "wrong-secret"; +/// +/// assert!(constant_time_eq(secret1, secret2)); +/// assert!(!constant_time_eq(secret1, secret3)); +/// ``` +pub fn constant_time_eq(a: &str, b: &str) -> bool { + a.as_bytes().ct_eq(b.as_bytes()).into() +} + // Authentication type constants const JWT_ALGORITHM: &str = "Bearer "; const SIGN_V2_ALGORITHM: &str = "AWS "; @@ -111,7 +139,7 @@ pub async fn check_key_valid(session_token: &str, access_key: &str) -> S3Result< let sys_cred = cred.clone(); - if cred.access_key != access_key { + if !constant_time_eq(&cred.access_key, access_key) { let Ok(iam_store) = rustfs_iam::get() else { return Err(S3Error::with_message( S3ErrorCode::InternalError, @@ -146,7 +174,8 @@ pub async fn check_key_valid(session_token: &str, access_key: &str) -> S3Result< cred.claims = if !claims.is_empty() { Some(claims) } else { None }; - let mut owner = sys_cred.access_key == cred.access_key || cred.parent_user == sys_cred.access_key; + let mut owner = + constant_time_eq(&sys_cred.access_key, &cred.access_key) || constant_time_eq(&cred.parent_user, &sys_cred.access_key); // permitRootAccess if let Some(claims) = &cred.claims { @@ -225,7 +254,7 @@ pub fn get_condition_values( let principal_type = if !username.is_empty() { if claims.is_some() { "AssumedRole" - } else if sys_cred.access_key == username { + } else if constant_time_eq(&sys_cred.access_key, &username) { "Account" } else { "User" @@ -1102,4 +1131,21 @@ mod tests { assert_eq!(auth_type, AuthType::Unknown); } + + #[test] + fn test_constant_time_eq() { + assert!(constant_time_eq("test", "test")); + assert!(!constant_time_eq("test", "Test")); + assert!(!constant_time_eq("test", "test1")); + assert!(!constant_time_eq("test1", "test")); + assert!(!constant_time_eq("", "test")); + assert!(constant_time_eq("", "")); + + // Test with credentials-like strings + let key1 = "AKIAIOSFODNN7EXAMPLE"; + let key2 = "AKIAIOSFODNN7EXAMPLE"; + let key3 = "AKIAIOSFODNN7EXAMPLF"; + assert!(constant_time_eq(key1, key2)); + assert!(!constant_time_eq(key1, key3)); + } } diff --git a/rustfs/src/error.rs b/rustfs/src/error.rs index daeeacb3..5b4fe128 100644 --- a/rustfs/src/error.rs +++ b/rustfs/src/error.rs @@ -192,6 +192,21 @@ impl From for S3Error { impl From for ApiError { fn from(err: StorageError) -> Self { + // Special handling for Io errors that may contain ChecksumMismatch + if let StorageError::Io(ref io_err) = err { + if let Some(inner) = io_err.get_ref() { + if inner.downcast_ref::().is_some() + || inner.downcast_ref::().is_some() + { + return ApiError { + code: S3ErrorCode::BadDigest, + message: ApiError::error_code_to_message(&S3ErrorCode::BadDigest), + source: Some(Box::new(err)), + }; + } + } + } + let code = match &err { StorageError::NotImplemented => S3ErrorCode::NotImplemented, StorageError::InvalidArgument(_, _, _) => S3ErrorCode::InvalidArgument, @@ -239,6 +254,23 @@ impl From for ApiError { impl From for ApiError { fn from(err: std::io::Error) -> Self { + // Check if the error is a ChecksumMismatch (BadDigest) + if let Some(inner) = err.get_ref() { + if inner.downcast_ref::().is_some() { + return ApiError { + code: S3ErrorCode::BadDigest, + message: ApiError::error_code_to_message(&S3ErrorCode::BadDigest), + source: Some(Box::new(err)), + }; + } + if inner.downcast_ref::().is_some() { + return ApiError { + code: S3ErrorCode::BadDigest, + message: 
ApiError::error_code_to_message(&S3ErrorCode::BadDigest), + source: Some(Box::new(err)), + }; + } + } ApiError { code: S3ErrorCode::InternalError, message: err.to_string(), diff --git a/rustfs/src/main.rs b/rustfs/src/main.rs index 559aead5..efa10584 100644 --- a/rustfs/src/main.rs +++ b/rustfs/src/main.rs @@ -60,12 +60,11 @@ use rustfs_ecstore::{ use rustfs_iam::init_iam_sys; use rustfs_notify::notifier_global; use rustfs_obs::{init_obs, set_global_guard}; -use rustfs_targets::arn::TargetID; +use rustfs_targets::arn::{ARN, TargetIDError}; use rustfs_utils::net::parse_and_resolve_address; use s3s::s3_error; use std::env; use std::io::{Error, Result}; -use std::str::FromStr; use std::sync::Arc; use tokio_util::sync::CancellationToken; use tracing::{debug, error, info, instrument, warn}; @@ -516,9 +515,21 @@ async fn add_bucket_notification_configuration(buckets: Vec) { "Bucket '{}' has existing notification configuration: {:?}", bucket, cfg); let mut event_rules = Vec::new(); - process_queue_configurations(&mut event_rules, cfg.queue_configurations.clone(), TargetID::from_str); - process_topic_configurations(&mut event_rules, cfg.topic_configurations.clone(), TargetID::from_str); - process_lambda_configurations(&mut event_rules, cfg.lambda_function_configurations.clone(), TargetID::from_str); + process_queue_configurations(&mut event_rules, cfg.queue_configurations.clone(), |arn_str| { + ARN::parse(arn_str) + .map(|arn| arn.target_id) + .map_err(|e| TargetIDError::InvalidFormat(e.to_string())) + }); + process_topic_configurations(&mut event_rules, cfg.topic_configurations.clone(), |arn_str| { + ARN::parse(arn_str) + .map(|arn| arn.target_id) + .map_err(|e| TargetIDError::InvalidFormat(e.to_string())) + }); + process_lambda_configurations(&mut event_rules, cfg.lambda_function_configurations.clone(), |arn_str| { + ARN::parse(arn_str) + .map(|arn| arn.target_id) + .map_err(|e| TargetIDError::InvalidFormat(e.to_string())) + }); if let Err(e) = notifier_global::add_event_specific_rules(bucket, region, &event_rules) .await diff --git a/rustfs/src/profiling.rs b/rustfs/src/profiling.rs index 1b219aab..11f69c10 100644 --- a/rustfs/src/profiling.rs +++ b/rustfs/src/profiling.rs @@ -12,272 +12,291 @@ // See the License for the specific language governing permissions and // limitations under the License. 
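The rewrite below moves the whole jemalloc/pprof implementation into a `linux_impl` module gated on `#[cfg(target_os = "linux")]`, leaving stubs for other targets (the cross-platform build fix called out in `docs/cluster_recovery.md`). A minimal sketch of the pattern; the trailing re-export is an assumption, since this hunk is truncated before the module's exports:

```rust
// Stub keeps the public API compilable on macOS/AArch64 and friends.
#[cfg(not(target_os = "linux"))]
pub async fn dump_memory_pprof_now() -> Result<std::path::PathBuf, String> {
    Err("Memory profiling is only supported on Linux".to_string())
}

#[cfg(target_os = "linux")]
mod linux_impl {
    // Linux-only crates (jemalloc_pprof, pprof, tikv_jemalloc_ctl) are imported
    // inside the module, so other targets never attempt to compile them.
    pub async fn dump_memory_pprof_now() -> Result<std::path::PathBuf, String> {
        // ... real jemalloc dump, as in the diff below ...
        Err("sketch only".to_string())
    }
}

// Assumed re-export so callers see one flat API on every platform.
#[cfg(target_os = "linux")]
pub use linux_impl::*;
```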
-use chrono::Utc; -use jemalloc_pprof::PROF_CTL; -use pprof::protos::Message; -use rustfs_config::{ - DEFAULT_CPU_DURATION_SECS, DEFAULT_CPU_FREQ, DEFAULT_CPU_INTERVAL_SECS, DEFAULT_CPU_MODE, DEFAULT_ENABLE_PROFILING, - DEFAULT_MEM_INTERVAL_SECS, DEFAULT_MEM_PERIODIC, DEFAULT_OUTPUT_DIR, ENV_CPU_DURATION_SECS, ENV_CPU_FREQ, - ENV_CPU_INTERVAL_SECS, ENV_CPU_MODE, ENV_ENABLE_PROFILING, ENV_MEM_INTERVAL_SECS, ENV_MEM_PERIODIC, ENV_OUTPUT_DIR, -}; -use rustfs_utils::{get_env_bool, get_env_str, get_env_u64, get_env_usize}; -use std::fs::{File, create_dir_all}; -use std::io::Write; -use std::path::{Path, PathBuf}; -use std::sync::{Arc, OnceLock}; -use std::time::Duration; -use tokio::sync::Mutex; -use tokio::time::sleep; -use tracing::{debug, error, info, warn}; +#[cfg(not(target_os = "linux"))] +pub async fn init_from_env() {} -static CPU_CONT_GUARD: OnceLock<Arc<Mutex<Option<pprof::ProfilerGuard<'static>>>>> = OnceLock::new(); - -/// CPU profiling mode -#[derive(Clone, Copy, Debug, Eq, PartialEq)] -enum CpuMode { - Off, - Continuous, - Periodic, +#[cfg(not(target_os = "linux"))] +pub async fn dump_cpu_pprof_for(_duration: std::time::Duration) -> Result<std::path::PathBuf, String> { + Err("CPU profiling is only supported on Linux".to_string()) } -/// Get or create output directory -fn output_dir() -> PathBuf { - let dir = get_env_str(ENV_OUTPUT_DIR, DEFAULT_OUTPUT_DIR); - let p = PathBuf::from(dir); - if let Err(e) = create_dir_all(&p) { - warn!("profiling: create output dir {} failed: {}, fallback to current dir", p.display(), e); - return PathBuf::from("."); +#[cfg(not(target_os = "linux"))] +pub async fn dump_memory_pprof_now() -> Result<std::path::PathBuf, String> { + Err("Memory profiling is only supported on Linux".to_string()) +} + +#[cfg(target_os = "linux")] +mod linux_impl { + use chrono::Utc; + use jemalloc_pprof::PROF_CTL; + use pprof::protos::Message; + use rustfs_config::{ + DEFAULT_CPU_DURATION_SECS, DEFAULT_CPU_FREQ, DEFAULT_CPU_INTERVAL_SECS, DEFAULT_CPU_MODE, DEFAULT_ENABLE_PROFILING, + DEFAULT_MEM_INTERVAL_SECS, DEFAULT_MEM_PERIODIC, DEFAULT_OUTPUT_DIR, ENV_CPU_DURATION_SECS, ENV_CPU_FREQ, + ENV_CPU_INTERVAL_SECS, ENV_CPU_MODE, ENV_ENABLE_PROFILING, ENV_MEM_INTERVAL_SECS, ENV_MEM_PERIODIC, ENV_OUTPUT_DIR, + }; + use rustfs_utils::{get_env_bool, get_env_str, get_env_u64, get_env_usize}; + use std::fs::{File, create_dir_all}; + use std::io::Write; + use std::path::{Path, PathBuf}; + use std::sync::{Arc, OnceLock}; + use std::time::Duration; + use tokio::sync::Mutex; + use tokio::time::sleep; + use tracing::{debug, error, info, warn}; + + static CPU_CONT_GUARD: OnceLock<Arc<Mutex<Option<pprof::ProfilerGuard<'static>>>>> = OnceLock::new(); + + /// CPU profiling mode + #[derive(Clone, Copy, Debug, Eq, PartialEq)] + enum CpuMode { + Off, + Continuous, + Periodic, } - p -} -/// Read CPU profiling mode from env -fn read_cpu_mode() -> CpuMode { - match get_env_str(ENV_CPU_MODE, DEFAULT_CPU_MODE).to_lowercase().as_str() { - "continuous" => CpuMode::Continuous, - "periodic" => CpuMode::Periodic, - _ => CpuMode::Off, + /// Get or create output directory + fn output_dir() -> PathBuf { + let dir = get_env_str(ENV_OUTPUT_DIR, DEFAULT_OUTPUT_DIR); + let p = PathBuf::from(dir); + if let Err(e) = create_dir_all(&p) { + warn!("profiling: create output dir {} failed: {}, fallback to current dir", p.display(), e); + return PathBuf::from("."); + } + p } -} -/// Generate timestamp string for filenames -fn ts() -> String { - Utc::now().format("%Y%m%dT%H%M%S").to_string() -} - -/// Write pprof report to file in protobuf format -fn write_pprof_report_pb(report: &pprof::Report, path: &Path) -> Result<(), String> { - let profile = 
report.pprof().map_err(|e| format!("pprof() failed: {e}"))?; - let mut buf = Vec::with_capacity(512 * 1024); - profile.write_to_vec(&mut buf).map_err(|e| format!("encode failed: {e}"))?; - let mut f = File::create(path).map_err(|e| format!("create file failed: {e}"))?; - f.write_all(&buf).map_err(|e| format!("write file failed: {e}"))?; - Ok(()) -} - -/// Internal: dump CPU pprof from existing guard -async fn dump_cpu_with_guard(guard: &pprof::ProfilerGuard<'_>) -> Result<PathBuf, String> { - let report = guard.report().build().map_err(|e| format!("build report failed: {e}"))?; - let out = output_dir().join(format!("cpu_profile_{}.pb", ts())); - write_pprof_report_pb(&report, &out)?; - info!("CPU profile exported: {}", out.display()); - Ok(out) -} - -// Public API: dump CPU for a duration; if continuous guard exists, snapshot immediately. -pub async fn dump_cpu_pprof_for(duration: Duration) -> Result<PathBuf, String> { - if let Some(cell) = CPU_CONT_GUARD.get() { - let guard_slot = cell.lock().await; - if let Some(ref guard) = *guard_slot { - debug!("profiling: using continuous profiler guard for CPU dump"); - return dump_cpu_with_guard(guard).await; + /// Read CPU profiling mode from env + fn read_cpu_mode() -> CpuMode { + match get_env_str(ENV_CPU_MODE, DEFAULT_CPU_MODE).to_lowercase().as_str() { + "continuous" => CpuMode::Continuous, + "periodic" => CpuMode::Periodic, + _ => CpuMode::Off, } } - let freq = get_env_usize(ENV_CPU_FREQ, DEFAULT_CPU_FREQ) as i32; - let guard = pprof::ProfilerGuard::new(freq).map_err(|e| format!("create profiler failed: {e}"))?; - sleep(duration).await; - - dump_cpu_with_guard(&guard).await -} - -// Public API: dump memory pprof now (jemalloc) -pub async fn dump_memory_pprof_now() -> Result<PathBuf, String> { - let out = output_dir().join(format!("mem_profile_{}.pb", ts())); - let mut f = File::create(&out).map_err(|e| format!("create file failed: {e}"))?; - - let prof_ctl_cell = PROF_CTL - .as_ref() - .ok_or_else(|| "jemalloc profiling control not available".to_string())?; - let mut prof_ctl = prof_ctl_cell.lock().await; - - if !prof_ctl.activated() { - return Err("jemalloc profiling is not active".to_string()); + /// Generate timestamp string for filenames + fn ts() -> String { + Utc::now().format("%Y%m%dT%H%M%S").to_string() } - let bytes = prof_ctl.dump_pprof().map_err(|e| format!("dump pprof failed: {e}"))?; - f.write_all(&bytes).map_err(|e| format!("write file failed: {e}"))?; - info!("Memory profile exported: {}", out.display()); - Ok(out) -} - -// Jemalloc status check (No forced placement, only status observation) -pub async fn check_jemalloc_profiling() { - use tikv_jemalloc_ctl::{config, epoch, stats}; - - if let Err(e) = epoch::advance() { - warn!("jemalloc epoch advance failed: {e}"); + /// Write pprof report to file in protobuf format + fn write_pprof_report_pb(report: &pprof::Report, path: &Path) -> Result<(), String> { + let profile = report.pprof().map_err(|e| format!("pprof() failed: {e}"))?; + let mut buf = Vec::with_capacity(512 * 1024); + profile.write_to_vec(&mut buf).map_err(|e| format!("encode failed: {e}"))?; + let mut f = File::create(path).map_err(|e| format!("create file failed: {e}"))?; + f.write_all(&buf).map_err(|e| format!("write file failed: {e}"))?; + Ok(()) } - match config::malloc_conf::read() { - Ok(conf) => debug!("jemalloc malloc_conf: {}", conf), - Err(e) => debug!("jemalloc read malloc_conf failed: {e}"), + /// Internal: dump CPU pprof from existing guard + async fn dump_cpu_with_guard(guard: &pprof::ProfilerGuard<'_>) -> Result<PathBuf, String> { + let report = 
guard.report().build().map_err(|e| format!("build report failed: {e}"))?; + let out = output_dir().join(format!("cpu_profile_{}.pb", ts())); + write_pprof_report_pb(&report, &out)?; + info!("CPU profile exported: {}", out.display()); + Ok(out) } - match std::env::var("MALLOC_CONF") { - Ok(v) => debug!("MALLOC_CONF={}", v), - Err(_) => debug!("MALLOC_CONF is not set"), - } - - if let Some(lock) = PROF_CTL.as_ref() { - let ctl = lock.lock().await; - info!(activated = ctl.activated(), "jemalloc profiling status"); - } else { - info!("jemalloc profiling controller is NOT available"); - } - - let _ = epoch::advance(); - macro_rules! show { - ($name:literal, $reader:expr) => { - match $reader { - Ok(v) => debug!(concat!($name, "={}"), v), - Err(e) => debug!(concat!($name, " read failed: {}"), e), + // Public API: dump CPU for a duration; if continuous guard exists, snapshot immediately. + pub async fn dump_cpu_pprof_for(duration: Duration) -> Result<PathBuf, String> { + if let Some(cell) = CPU_CONT_GUARD.get() { + let guard_slot = cell.lock().await; + if let Some(ref guard) = *guard_slot { + debug!("profiling: using continuous profiler guard for CPU dump"); + return dump_cpu_with_guard(guard).await; } - }; - } - show!("allocated", stats::allocated::read()); - show!("resident", stats::resident::read()); - show!("mapped", stats::mapped::read()); - show!("metadata", stats::metadata::read()); - show!("active", stats::active::read()); -} - -// Internal: start continuous CPU profiling -async fn start_cpu_continuous(freq_hz: i32) { - let cell = CPU_CONT_GUARD.get_or_init(|| Arc::new(Mutex::new(None))).clone(); - let mut slot = cell.lock().await; - if slot.is_some() { - warn!("profiling: continuous CPU guard already running"); - return; - } - match pprof::ProfilerGuardBuilder::default() - .frequency(freq_hz) - .blocklist(&["libc", "libgcc", "pthread", "vdso"]) - .build() - { - Ok(guard) => { - *slot = Some(guard); - info!(freq = freq_hz, "start continuous CPU profiling"); } - Err(e) => warn!("start continuous CPU profiling failed: {e}"), - } -} + let freq = get_env_usize(ENV_CPU_FREQ, DEFAULT_CPU_FREQ) as i32; + let guard = pprof::ProfilerGuard::new(freq).map_err(|e| format!("create profiler failed: {e}"))?; + sleep(duration).await; + + dump_cpu_with_guard(&guard).await + } + + // Public API: dump memory pprof now (jemalloc) + pub async fn dump_memory_pprof_now() -> Result<PathBuf, String> { + let out = output_dir().join(format!("mem_profile_{}.pb", ts())); + let mut f = File::create(&out).map_err(|e| format!("create file failed: {e}"))?; + + let prof_ctl_cell = PROF_CTL + .as_ref() + .ok_or_else(|| "jemalloc profiling control not available".to_string())?; + let mut prof_ctl = prof_ctl_cell.lock().await; + + if !prof_ctl.activated() { + return Err("jemalloc profiling is not active".to_string()); + } + + let bytes = prof_ctl.dump_pprof().map_err(|e| format!("dump pprof failed: {e}"))?; + f.write_all(&bytes).map_err(|e| format!("write file failed: {e}"))?; + info!("Memory profile exported: {}", out.display()); + Ok(out) + } + + // Jemalloc status check (No forced placement, only status observation) + pub async fn 
+        match pprof::ProfilerGuardBuilder::default()
+            .frequency(freq_hz)
+            .blocklist(&["libc", "libgcc", "pthread", "vdso"])
+            .build()
+        {
+            Ok(guard) => {
+                *slot = Some(guard);
+                info!(freq = freq_hz, "start continuous CPU profiling");
+            }
+            Err(e) => warn!("start continuous CPU profiling failed: {e}"),
+        }
+    }
+
+    // Internal: start periodic CPU sampling loop
+    async fn start_cpu_periodic(freq_hz: i32, interval: Duration, duration: Duration) {
+        info!(freq = freq_hz, ?interval, ?duration, "start periodic CPU profiling");
+        tokio::spawn(async move {
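+            // Each cycle sleeps for `interval`, then samples for `duration`; CPU
+            // activity outside that sampling window is not recorded in periodic mode.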
+            loop {
+                sleep(interval).await;
+                let guard = match pprof::ProfilerGuard::new(freq_hz) {
+                    Ok(g) => g,
+                    Err(e) => {
+                        warn!("periodic CPU profiler create failed: {e}");
+                        continue;
+                    }
+                };
+                sleep(duration).await;
+                match guard.report().build() {
+                    Ok(report) => {
+                        let out = output_dir().join(format!("cpu_profile_{}.pb", ts()));
+                        if let Err(e) = write_pprof_report_pb(&report, &out) {
+                            warn!("write periodic CPU pprof failed: {e}");
+                        } else {
+                            info!("periodic CPU profile exported: {}", out.display());
+                        }
+                    }
+                    Err(e) => warn!("periodic CPU report build failed: {e}"),
+                }
+            }
+        });
+    }
+
+    // Internal: start periodic memory dump when jemalloc profiling is active
+    async fn start_memory_periodic(interval: Duration) {
+        info!(?interval, "start periodic memory pprof dump");
+        tokio::spawn(async move {
+            loop {
+                sleep(interval).await;
+
+                let Some(lock) = PROF_CTL.as_ref() else {
+                    debug!("skip memory dump: PROF_CTL not available");
+                    continue;
+                };
+
+                let mut ctl = lock.lock().await;
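+                // Dumps require jemalloc heap profiling to have been enabled at
+                // process start (e.g. MALLOC_CONF / _RJEM_MALLOC_CONF containing
+                // "prof:true", depending on the build); otherwise each cycle skips.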
+                if !ctl.activated() {
+                    debug!("skip memory dump: jemalloc profiling not active");
+                    continue;
+                }
+
+                let out = output_dir().join(format!("mem_profile_periodic_{}.pb", ts()));
+                match File::create(&out) {
+                    Err(e) => {
+                        error!("periodic mem dump create file failed: {}", e);
+                        continue;
+                    }
+                    Ok(mut f) => match ctl.dump_pprof() {
+                        Ok(bytes) => {
+                            if let Err(e) = f.write_all(&bytes) {
+                                error!("periodic mem dump write failed: {}", e);
+                            } else {
+                                info!("periodic memory profile dumped to {}", out.display());
+                            }
+                        }
+                        Err(e) => error!("periodic mem dump failed: {}", e),
+                    },
+                }
+            }
+        });
+    }
+
+    // Public: unified init entry, avoid duplication/conflict
+    pub async fn init_from_env() {
+        let enabled = get_env_bool(ENV_ENABLE_PROFILING, DEFAULT_ENABLE_PROFILING);
+        if !enabled {
+            debug!("profiling: disabled by env");
+            return;
+        }
+
+        // Jemalloc state check once (no dump)
+        check_jemalloc_profiling().await;
+
+        // CPU
+        let cpu_mode = read_cpu_mode();
+        let cpu_freq = get_env_usize(ENV_CPU_FREQ, DEFAULT_CPU_FREQ) as i32;
+        let cpu_interval = Duration::from_secs(get_env_u64(ENV_CPU_INTERVAL_SECS, DEFAULT_CPU_INTERVAL_SECS));
+        let cpu_duration = Duration::from_secs(get_env_u64(ENV_CPU_DURATION_SECS, DEFAULT_CPU_DURATION_SECS));
+
+        match cpu_mode {
+            CpuMode::Off => debug!("profiling: CPU mode off"),
+            CpuMode::Continuous => start_cpu_continuous(cpu_freq).await,
+            CpuMode::Periodic => start_cpu_periodic(cpu_freq, cpu_interval, cpu_duration).await,
+        }
+
+        // Memory
+        let mem_periodic = get_env_bool(ENV_MEM_PERIODIC, DEFAULT_MEM_PERIODIC);
+        let mem_interval = Duration::from_secs(get_env_u64(ENV_MEM_INTERVAL_SECS, DEFAULT_MEM_INTERVAL_SECS));
+        if mem_periodic {
+            start_memory_periodic(mem_interval).await;
+        }
+    }
+}
+
+#[cfg(target_os = "linux")]
+pub use linux_impl::{dump_cpu_pprof_for, dump_memory_pprof_now, init_from_env};
diff --git a/rustfs/src/server/http.rs b/rustfs/src/server/http.rs
index edc5dd52..521c2b06 100644
--- a/rustfs/src/server/http.rs
+++ b/rustfs/src/server/http.rs
@@ -33,7 +33,7 @@ use rustfs_protos::proto_gen::node_service::node_service_server::NodeServiceServ
 use rustfs_utils::net::parse_and_resolve_address;
 use rustls::ServerConfig;
 use s3s::{host::MultiDomain, service::S3Service, service::S3ServiceBuilder};
-use socket2::SockRef;
+use socket2::{SockRef, TcpKeepalive};
 use std::io::{Error, Result};
 use std::net::SocketAddr;
 use std::sync::Arc;
@@ -371,6 +371,20 @@ pub async fn start_http_server(
         };
 
         let socket_ref = SockRef::from(&socket);
+
+        // Enable TCP Keepalive to detect dead clients (e.g. power loss)
+        // Idle: 10s, Interval: 5s, Retries: 3
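+        // With these settings a dead peer is detected after roughly
+        // 10s idle + 3 probes * 5s = ~25s. OpenBSD/NetBSD cannot tune the
+        // probe count, so the OS default retry count applies there.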
+        let ka = TcpKeepalive::new()
+            .with_time(Duration::from_secs(10))
+            .with_interval(Duration::from_secs(5));
+
+        #[cfg(not(any(target_os = "openbsd", target_os = "netbsd")))]
+        let ka = ka.with_retries(3);
+
+        if let Err(err) = socket_ref.set_tcp_keepalive(&ka) {
+            warn!(?err, "Failed to set TCP_KEEPALIVE");
+        }
+
         if let Err(err) = socket_ref.set_tcp_nodelay(true) {
             warn!(?err, "Failed to set TCP_NODELAY");
         }
diff --git a/rustfs/src/storage/ecfs.rs b/rustfs/src/storage/ecfs.rs
index 7b779b6e..29a78816 100644
--- a/rustfs/src/storage/ecfs.rs
+++ b/rustfs/src/storage/ecfs.rs
@@ -108,7 +108,7 @@ use rustfs_s3select_api::{
 use rustfs_s3select_query::get_global_db;
 use rustfs_targets::{
     EventName,
-    arn::{TargetID, TargetIDError},
+    arn::{ARN, TargetID, TargetIDError},
 };
 use rustfs_utils::{
     CompressionAlgorithm, extract_req_params_header, extract_resp_elements, get_request_host, get_request_user_agent,
@@ -1418,12 +1418,17 @@ impl S3 for FS {
             ..Default::default()
         };
 
-        let opts = ObjectOptions {
-            version_id: object.version_id.map(|v| v.to_string()),
-            versioned: version_cfg.prefix_enabled(&object.object_name),
-            version_suspended: version_cfg.suspended(),
-            ..Default::default()
-        };
+        let metadata = extract_metadata(&req.headers);
+
+        let opts: ObjectOptions = del_opts(
+            &bucket,
+            &object.object_name,
+            object.version_id.map(|f| f.to_string()),
+            &req.headers,
+            metadata,
+        )
+        .await
+        .map_err(ApiError::from)?;
 
         let mut goi = ObjectInfo::default();
         let mut gerr = None;
@@ -1684,10 +1689,6 @@ impl S3 for FS {
             version_id,
             part_number,
             range,
-            if_none_match,
-            if_match,
-            if_modified_since,
-            if_unmodified_since,
             ..
         } = req.input.clone();
@@ -1880,35 +1881,6 @@ impl S3 for FS {
 
         let info = reader.object_info;
 
-        if let Some(match_etag) = if_none_match {
-            if info.etag.as_ref().is_some_and(|etag| etag == match_etag.as_str()) {
-                return Err(S3Error::new(S3ErrorCode::NotModified));
-            }
-        }
-
-        if let Some(modified_since) = if_modified_since {
-            // obj_time < givenTime + 1s
-            if info.mod_time.is_some_and(|mod_time| {
-                let give_time: OffsetDateTime = modified_since.into();
-                mod_time < give_time.add(time::Duration::seconds(1))
-            }) {
-                return Err(S3Error::new(S3ErrorCode::NotModified));
-            }
-        }
-
-        if let Some(match_etag) = if_match {
-            if info.etag.as_ref().is_some_and(|etag| etag != match_etag.as_str()) {
-                return Err(S3Error::new(S3ErrorCode::PreconditionFailed));
-            }
-        } else if let Some(unmodified_since) = if_unmodified_since {
-            if info.mod_time.is_some_and(|mod_time| {
-                let give_time: OffsetDateTime = unmodified_since.into();
-                mod_time > give_time.add(time::Duration::seconds(1))
-            }) {
-                return Err(S3Error::new(S3ErrorCode::PreconditionFailed));
-            }
-        }
-
         debug!(object_size = info.size, part_count = info.parts.len(), "GET object metadata snapshot");
         for part in &info.parts {
             debug!(
@@ -2773,7 +2745,7 @@ impl S3 for FS {
             .collect::<Vec<_>>();
 
         let output = ListObjectVersionsOutput {
-            // is_truncated: Some(object_infos.is_truncated),
+            is_truncated: Some(object_infos.is_truncated),
             max_keys: Some(key_count),
             delimiter,
             name: Some(bucket),
@@ -4194,6 +4166,13 @@ impl S3 for FS {
             ..
         } = req.input.clone();
 
+        if tagging.tag_set.len() > 10 {
+            // TODO: enforce the Amazon S3 limit of at most 10 tags per object.
+            // Reference: https://docs.aws.amazon.com/AmazonS3/latest/userguide/object-tagging.html
+            // Reference: https://docs.aws.amazon.com/zh_cn/AmazonS3/latest/API/API_PutObjectTagging.html
+            // https://github.com/minio/mint/blob/master/run/core/aws-sdk-go-v2/main.go#L1647
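+            // Possible enforcement once confirmed against the mint suite (sketch;
+            // assumes s3s exposes S3ErrorCode::InvalidTag):
+            // return Err(S3Error::with_message(S3ErrorCode::InvalidTag, "object tags cannot be greater than 10".to_string()));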
+        }
+
         let Some(store) = new_object_layer_fn() else {
             return Err(S3Error::with_message(S3ErrorCode::InternalError, "Not init".to_string()));
         };
@@ -4890,20 +4869,24 @@ impl S3 for FS {
         let parse_rules = async {
             let mut event_rules = Vec::new();
 
-            process_queue_configurations(
-                &mut event_rules,
-                notification_configuration.queue_configurations.clone(),
-                TargetID::from_str,
-            );
-            process_topic_configurations(
-                &mut event_rules,
-                notification_configuration.topic_configurations.clone(),
-                TargetID::from_str,
-            );
+            // Parse the full ARN rather than a bare TargetID so malformed ARNs are rejected as InvalidFormat.
+            process_queue_configurations(&mut event_rules, notification_configuration.queue_configurations.clone(), |arn_str| {
+                ARN::parse(arn_str)
+                    .map(|arn| arn.target_id)
+                    .map_err(|e| TargetIDError::InvalidFormat(e.to_string()))
+            });
+            process_topic_configurations(&mut event_rules, notification_configuration.topic_configurations.clone(), |arn_str| {
+                ARN::parse(arn_str)
+                    .map(|arn| arn.target_id)
+                    .map_err(|e| TargetIDError::InvalidFormat(e.to_string()))
+            });
             process_lambda_configurations(
                 &mut event_rules,
                 notification_configuration.lambda_function_configurations.clone(),
-                TargetID::from_str,
+                |arn_str| {
+                    ARN::parse(arn_str)
+                        .map(|arn| arn.target_id)
+                        .map_err(|e| TargetIDError::InvalidFormat(e.to_string()))
+                },
             );
 
             event_rules
diff --git a/rustfs/src/storage/options.rs b/rustfs/src/storage/options.rs
index f2075a0f..aacd6089 100644
--- a/rustfs/src/storage/options.rs
+++ b/rustfs/src/storage/options.rs
@@ -93,6 +93,8 @@ pub async fn del_opts(
         .map(|v| v.to_str().unwrap() == "true")
         .unwrap_or_default();
 
+    // Populate conditional-write options (If-Match / If-None-Match and friends) from the request headers.
+    fill_conditional_writes_opts_from_header(headers, &mut opts)?;
+
     Ok(opts)
 }
 
@@ -133,6 +135,8 @@ pub async fn get_opts(
     opts.version_suspended = version_suspended;
     opts.versioned = versioned;
 
+    fill_conditional_writes_opts_from_header(headers, &mut opts)?;
+
     Ok(opts)
 }