Merge branch 'main' into feat/scan

houseme
2025-12-07 21:45:36 +08:00
committed by GitHub
40 changed files with 857 additions and 440 deletions

.github/workflows/helm-package.yml

@@ -0,0 +1,78 @@
name: Publish helm chart to artifacthub
on:
workflow_run:
workflows: ["Build and Release"]
types: [completed]
env:
new_version: ${{ github.event.workflow_run.head_branch }}
jobs:
build-helm-package:
runs-on: ubuntu-latest
# Only run on successful builds triggered by tag pushes (version format: x.y.z or x.y.z-suffix)
if: |
github.event.workflow_run.conclusion == 'success' &&
github.event.workflow_run.event == 'push' &&
contains(github.event.workflow_run.head_branch, '.')
steps:
- name: Checkout helm chart repo
uses: actions/checkout@v2
- name: Replace chart appVersion
run: |
set -e
set -x
old_version=$(grep "^appVersion:" helm/rustfs/Chart.yaml | awk '{print $2}')
sed -i "s/$old_version/$new_version/g" helm/rustfs/Chart.yaml
sed -i "/^image:/,/^[^ ]/ s/tag:.*/tag: "$new_version"/" helm/rustfs/values.yaml
- name: Set up Helm
uses: azure/setup-helm@v4.3.0
- name: Package Helm Chart
run: |
cp helm/README.md helm/rustfs/
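# Derive the chart package version from the release tag, e.g. 1.0.0-alpha.73 -> 0.0.73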
package_version=$(echo $new_version | awk -F '-' '{print $2}' | awk -F '.' '{print $NF}')
helm package ./helm/rustfs --destination helm/rustfs/ --version "0.0.$package_version"
- name: Upload helm package as artifact
uses: actions/upload-artifact@v4
with:
name: helm-package
path: helm/rustfs/*.tgz
retention-days: 1
publish-helm-package:
runs-on: ubuntu-latest
needs: [build-helm-package]
steps:
- name: Checkout helm package repo
uses: actions/checkout@v2
with:
repository: rustfs/helm
token: ${{ secrets.RUSTFS_HELM_PACKAGE }}
- name: Download helm package
uses: actions/download-artifact@v4
with:
name: helm-package
path: ./
- name: Set up Helm
uses: azure/setup-helm@v4.3.0
- name: Generate index
run: helm repo index . --url https://charts.rustfs.com
- name: Push helm package and index file
run: |
git config --global user.name "${{ secrets.USERNAME }}"
git config --global user.email "${{ secrets.EMAIL_ADDRESS }}"
git status .
git add .
git commit -m "Update rustfs helm package with $new_version."
git push origin main

.vscode/launch.json

@@ -22,6 +22,7 @@
"env": {
"RUST_LOG": "rustfs=debug,ecstore=info,s3s=debug,iam=debug",
"RUSTFS_SKIP_BACKGROUND_TASK": "on",
//"RUSTFS_OBS_LOG_DIRECTORY": "./deploy/logs",
// "RUSTFS_POLICY_PLUGIN_URL":"http://localhost:8181/v1/data/rustfs/authz/allow",
// "RUSTFS_POLICY_PLUGIN_AUTH_TOKEN":"your-opa-token"
},
@@ -91,6 +92,9 @@
"RUSTFS_VOLUMES": "./target/volume/test{1...4}",
"RUSTFS_ADDRESS": ":9000",
"RUSTFS_CONSOLE_ENABLE": "true",
// "RUSTFS_OBS_TRACE_ENDPOINT": "http://127.0.0.1:4318/v1/traces", // jeager otlp http endpoint
// "RUSTFS_OBS_METRIC_ENDPOINT": "http://127.0.0.1:4318/v1/metrics", // default otlp http endpoint
// "RUSTFS_OBS_LOG_ENDPOINT": "http://127.0.0.1:4318/v1/logs", // default otlp http endpoint
"RUSTFS_CONSOLE_ADDRESS": "127.0.0.1:9001",
"RUSTFS_OBS_LOG_DIRECTORY": "./target/logs",
},

Cargo.lock

@@ -7006,6 +7006,7 @@ dependencies = [
"serde_urlencoded",
"shadow-rs",
"socket2 0.6.1",
"subtle",
"sysctl",
"sysinfo",
"thiserror 2.0.17",


@@ -152,6 +152,7 @@ rustls-pemfile = "2.2.0"
rustls-pki-types = "1.13.1"
sha1 = "0.11.0-rc.3"
sha2 = "0.11.0-rc.3"
subtle = "2.6"
zeroize = { version = "1.8.2", features = ["derive"] }
# Time and Date


@@ -188,7 +188,7 @@ If you have any questions or need assistance:
- **Business**: [hello@rustfs.com](mailto:hello@rustfs.com)
- **Jobs**: [jobs@rustfs.com](mailto:jobs@rustfs.com)
- **General Discussion**: [GitHub Discussions](https://github.com/rustfs/rustfs/discussions)
- **Contributing**: [CONTRIBUTING.md](https://www.google.com/search?q=CONTRIBUTING.md)
- **Contributing**: [CONTRIBUTING.md](CONTRIBUTING.md)
## Contributors


@@ -143,16 +143,16 @@ impl PriorityHealQueue {
format!("object:{}:{}:{}", bucket, object, version_id.as_deref().unwrap_or(""))
}
HealType::Bucket { bucket } => {
format!("bucket:{}", bucket)
format!("bucket:{bucket}")
}
HealType::ErasureSet { set_disk_id, .. } => {
format!("erasure_set:{}", set_disk_id)
format!("erasure_set:{set_disk_id}")
}
HealType::Metadata { bucket, object } => {
format!("metadata:{}:{}", bucket, object)
format!("metadata:{bucket}:{object}")
}
HealType::MRF { meta_path } => {
format!("mrf:{}", meta_path)
format!("mrf:{meta_path}")
}
HealType::ECDecode {
bucket,
@@ -173,7 +173,7 @@ impl PriorityHealQueue {
/// Check if an erasure set heal request for a specific set_disk_id exists
fn contains_erasure_set(&self, set_disk_id: &str) -> bool {
let key = format!("erasure_set:{}", set_disk_id);
let key = format!("erasure_set:{set_disk_id}");
self.dedup_keys.contains(&key)
}
}
@@ -327,7 +327,7 @@ impl HealManager {
if queue_len >= queue_capacity {
return Err(Error::ConfigurationError {
message: format!("Heal queue is full ({}/{})", queue_len, queue_capacity),
message: format!("Heal queue is full ({queue_len}/{queue_capacity})"),
});
}


@@ -125,7 +125,7 @@ impl<'de> Deserialize<'de> for HealScanMode {
0 => Ok(HealScanMode::Unknown),
1 => Ok(HealScanMode::Normal),
2 => Ok(HealScanMode::Deep),
_ => Err(E::custom(format!("invalid HealScanMode value: {}", value))),
_ => Err(E::custom(format!("invalid HealScanMode value: {value}"))),
}
}
@@ -134,7 +134,7 @@ impl<'de> Deserialize<'de> for HealScanMode {
E: serde::de::Error,
{
if value > u8::MAX as u64 {
return Err(E::custom(format!("HealScanMode value too large: {}", value)));
return Err(E::custom(format!("HealScanMode value too large: {value}")));
}
self.visit_u8(value as u8)
}
@@ -144,7 +144,7 @@ impl<'de> Deserialize<'de> for HealScanMode {
E: serde::de::Error,
{
if value < 0 || value > u8::MAX as i64 {
return Err(E::custom(format!("invalid HealScanMode value: {}", value)));
return Err(E::custom(format!("invalid HealScanMode value: {value}")));
}
self.visit_u8(value as u8)
}
@@ -162,7 +162,7 @@ impl<'de> Deserialize<'de> for HealScanMode {
"Unknown" | "unknown" => Ok(HealScanMode::Unknown),
"Normal" | "normal" => Ok(HealScanMode::Normal),
"Deep" | "deep" => Ok(HealScanMode::Deep),
_ => Err(E::custom(format!("invalid HealScanMode string: {}", value))),
_ => Err(E::custom(format!("invalid HealScanMode string: {value}"))),
}
}
}


@@ -1985,24 +1985,20 @@ impl DiskAPI for LocalDisk {
// TODO: Healing
let search_version_id = fi.version_id.or(Some(Uuid::nil()));
// Check if there's an existing version with the same version_id that has a data_dir to clean up
// Note: For non-versioned buckets, fi.version_id is None, but in xl.meta it's stored as Some(Uuid::nil())
let has_old_data_dir = {
if let Ok((_, ver)) = xlmeta.find_version(fi.version_id) {
let has_data_dir = ver.get_data_dir();
if let Some(data_dir) = has_data_dir {
if xlmeta.shard_data_dir_count(&fi.version_id, &Some(data_dir)) == 0 {
// TODO: Healing
// remove inline data
Some(data_dir)
} else {
None
}
} else {
None
}
} else {
None
}
xlmeta.find_version(search_version_id).ok().and_then(|(_, ver)| {
// shard_count == 0 means no other version shares this data_dir
ver.get_data_dir()
.filter(|&data_dir| xlmeta.shard_data_dir_count(&search_version_id, &Some(data_dir)) == 0)
})
};
if let Some(old_data_dir) = has_old_data_dir.as_ref() {
let _ = xlmeta.data.remove(vec![search_version_id.unwrap_or_default(), *old_data_dir]);
}
xlmeta.add_version(fi.clone())?;


@@ -149,6 +149,12 @@ impl Erasure {
break;
}
Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
// Check if the inner error is a checksum mismatch - if so, propagate it
if let Some(inner) = e.get_ref() {
if rustfs_rio::is_checksum_mismatch(inner) {
return Err(std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()));
}
}
break;
}
Err(e) => {


@@ -194,6 +194,12 @@ pub enum StorageError {
#[error("Precondition failed")]
PreconditionFailed,
#[error("Not modified")]
NotModified,
#[error("Invalid part number: {0}")]
InvalidPartNumber(usize),
#[error("Invalid range specified: {0}")]
InvalidRangeSpec(String),
}
@@ -427,6 +433,8 @@ impl Clone for StorageError {
StorageError::InsufficientReadQuorum(a, b) => StorageError::InsufficientReadQuorum(a.clone(), b.clone()),
StorageError::InsufficientWriteQuorum(a, b) => StorageError::InsufficientWriteQuorum(a.clone(), b.clone()),
StorageError::PreconditionFailed => StorageError::PreconditionFailed,
StorageError::NotModified => StorageError::NotModified,
StorageError::InvalidPartNumber(a) => StorageError::InvalidPartNumber(*a),
StorageError::InvalidRangeSpec(a) => StorageError::InvalidRangeSpec(a.clone()),
}
}
@@ -496,6 +504,8 @@ impl StorageError {
StorageError::PreconditionFailed => 0x3B,
StorageError::EntityTooSmall(_, _, _) => 0x3C,
StorageError::InvalidRangeSpec(_) => 0x3D,
StorageError::NotModified => 0x3E,
StorageError::InvalidPartNumber(_) => 0x3F,
}
}
@@ -566,6 +576,8 @@ impl StorageError {
0x3B => Some(StorageError::PreconditionFailed),
0x3C => Some(StorageError::EntityTooSmall(Default::default(), Default::default(), Default::default())),
0x3D => Some(StorageError::InvalidRangeSpec(Default::default())),
0x3E => Some(StorageError::NotModified),
0x3F => Some(StorageError::InvalidPartNumber(Default::default())),
_ => None,
}
}
@@ -679,6 +691,10 @@ pub fn is_err_data_movement_overwrite(err: &Error) -> bool {
matches!(err, &StorageError::DataMovementOverwriteErr(_, _, _))
}
pub fn is_err_io(err: &Error) -> bool {
matches!(err, &StorageError::Io(_))
}
pub fn is_all_not_found(errs: &[Option<Error>]) -> bool {
for err in errs.iter() {
if let Some(err) = err {


@@ -767,6 +767,12 @@ impl ECStore {
def_pool = pinfo.clone();
has_def_pool = true;
// https://docs.aws.amazon.com/AmazonS3/latest/userguide/conditional-deletes.html
if is_err_object_not_found(err) {
if let Err(err) = opts.precondition_check(&pinfo.object_info) {
return Err(err.clone());
}
}
if !is_err_object_not_found(err) && !is_err_version_not_found(err) {
return Err(err.clone());
@@ -1392,6 +1398,7 @@ impl StorageAPI for ECStore {
let (info, _) = self.get_latest_object_info_with_idx(bucket, object.as_str(), opts).await?;
opts.precondition_check(&info)?;
Ok(info)
}


@@ -356,6 +356,8 @@ impl HTTPRangeSpec {
pub struct HTTPPreconditions {
pub if_match: Option<String>,
pub if_none_match: Option<String>,
pub if_modified_since: Option<OffsetDateTime>,
pub if_unmodified_since: Option<OffsetDateTime>,
}
#[derive(Debug, Default, Clone)]
@@ -456,6 +458,76 @@ impl ObjectOptions {
..Default::default()
}
}
pub fn precondition_check(&self, obj_info: &ObjectInfo) -> Result<()> {
let has_valid_mod_time = obj_info.mod_time.is_some_and(|t| t != OffsetDateTime::UNIX_EPOCH);
if let Some(part_number) = self.part_number {
if part_number > 1 && !obj_info.parts.is_empty() {
let part_found = obj_info.parts.iter().any(|pi| pi.number == part_number);
if !part_found {
return Err(Error::InvalidPartNumber(part_number));
}
}
}
if let Some(pre) = &self.http_preconditions {
if let Some(if_none_match) = &pre.if_none_match {
if let Some(etag) = &obj_info.etag {
if is_etag_equal(etag, if_none_match) {
return Err(Error::NotModified);
}
}
}
if has_valid_mod_time {
if let Some(if_modified_since) = &pre.if_modified_since {
if let Some(mod_time) = &obj_info.mod_time {
if !is_modified_since(mod_time, if_modified_since) {
return Err(Error::NotModified);
}
}
}
}
if let Some(if_match) = &pre.if_match {
if let Some(etag) = &obj_info.etag {
if !is_etag_equal(etag, if_match) {
return Err(Error::PreconditionFailed);
}
} else {
return Err(Error::PreconditionFailed);
}
}
if has_valid_mod_time && pre.if_match.is_none() {
if let Some(if_unmodified_since) = &pre.if_unmodified_since {
if let Some(mod_time) = &obj_info.mod_time {
if is_modified_since(mod_time, if_unmodified_since) {
return Err(Error::PreconditionFailed);
}
}
}
}
}
Ok(())
}
}
fn is_etag_equal(etag1: &str, etag2: &str) -> bool {
let e1 = etag1.trim_matches('"');
let e2 = etag2.trim_matches('"');
// Handle wildcard "*" - matches any ETag (per HTTP/1.1 RFC 7232)
if e2 == "*" {
return true;
}
e1 == e2
}
fn is_modified_since(mod_time: &OffsetDateTime, given_time: &OffsetDateTime) -> bool {
let mod_secs = mod_time.unix_timestamp();
let given_secs = given_time.unix_timestamp();
mod_secs > given_secs
}
#[derive(Debug, Default, Serialize, Deserialize)]


@@ -34,7 +34,7 @@ use std::{collections::HashMap, io::Cursor};
use time::OffsetDateTime;
use time::format_description::well_known::Rfc3339;
use tokio::io::AsyncRead;
use tracing::error;
use tracing::{error, warn};
use uuid::Uuid;
use xxhash_rust::xxh64;
@@ -444,8 +444,9 @@ impl FileMeta {
// Find version
pub fn find_version(&self, vid: Option<Uuid>) -> Result<(usize, FileMetaVersion)> {
let vid = vid.unwrap_or_default();
for (i, fver) in self.versions.iter().enumerate() {
if fver.header.version_id == vid {
if fver.header.version_id == Some(vid) {
let version = self.get_idx(i)?;
return Ok((i, version));
}
@@ -456,9 +457,12 @@ impl FileMeta {
// shard_data_dir_count queries the count of data_dir under vid
pub fn shard_data_dir_count(&self, vid: &Option<Uuid>, data_dir: &Option<Uuid>) -> usize {
let vid = vid.unwrap_or_default();
self.versions
.iter()
.filter(|v| v.header.version_type == VersionType::Object && v.header.version_id != *vid && v.header.user_data_dir())
.filter(|v| {
v.header.version_type == VersionType::Object && v.header.version_id != Some(vid) && v.header.user_data_dir()
})
.map(|v| FileMetaVersion::decode_data_dir_from_meta(&v.meta).unwrap_or_default())
.filter(|v| v == data_dir)
.count()
@@ -890,12 +894,11 @@ impl FileMeta {
read_data: bool,
all_parts: bool,
) -> Result<FileInfo> {
let has_vid = {
let vid = {
if !version_id.is_empty() {
let id = Uuid::parse_str(version_id)?;
if !id.is_nil() { Some(id) } else { None }
Uuid::parse_str(version_id)?
} else {
None
Uuid::nil()
}
};
@@ -905,12 +908,12 @@ impl FileMeta {
for ver in self.versions.iter() {
let header = &ver.header;
if let Some(vid) = has_vid {
if header.version_id != Some(vid) {
is_latest = false;
succ_mod_time = header.mod_time;
continue;
}
// TODO: freeVersion
if !version_id.is_empty() && header.version_id != Some(vid) {
is_latest = false;
succ_mod_time = header.mod_time;
continue;
}
let mut fi = ver.into_fileinfo(volume, path, all_parts)?;
@@ -932,7 +935,7 @@ impl FileMeta {
return Ok(fi);
}
if has_vid.is_none() {
if version_id.is_empty() {
Err(Error::FileNotFound)
} else {
Err(Error::FileVersionNotFound)
@@ -1091,13 +1094,10 @@ impl FileMeta {
/// Count shared data directories
pub fn shared_data_dir_count(&self, version_id: Option<Uuid>, data_dir: Option<Uuid>) -> usize {
let version_id = version_id.unwrap_or_default();
if self.data.entries().unwrap_or_default() > 0
&& version_id.is_some()
&& self
.data
.find(version_id.unwrap().to_string().as_str())
.unwrap_or_default()
.is_some()
&& self.data.find(version_id.to_string().as_str()).unwrap_or_default().is_some()
{
return 0;
}
@@ -1105,7 +1105,9 @@ impl FileMeta {
self.versions
.iter()
.filter(|v| {
v.header.version_type == VersionType::Object && v.header.version_id != version_id && v.header.user_data_dir()
v.header.version_type == VersionType::Object
&& v.header.version_id != Some(version_id)
&& v.header.user_data_dir()
})
.filter_map(|v| FileMetaVersion::decode_data_dir_from_meta(&v.meta).ok())
.filter(|&dir| dir == data_dir)


@@ -416,76 +416,88 @@ fn init_observability_http(config: &OtelConfig, logger_level: &str, is_productio
// TracerHTTP
let tracer_provider = {
let exporter = opentelemetry_otlp::SpanExporter::builder()
.with_http()
.with_endpoint(trace_ep.as_str())
.with_protocol(Protocol::HttpBinary)
.with_compression(Compression::Gzip)
.build()
.map_err(|e| TelemetryError::BuildSpanExporter(e.to_string()))?;
if trace_ep.is_empty() {
None
} else {
let exporter = opentelemetry_otlp::SpanExporter::builder()
.with_http()
.with_endpoint(trace_ep.as_str())
.with_protocol(Protocol::HttpBinary)
.with_compression(Compression::Gzip)
.build()
.map_err(|e| TelemetryError::BuildSpanExporter(e.to_string()))?;
let mut builder = SdkTracerProvider::builder()
.with_sampler(sampler)
.with_id_generator(RandomIdGenerator::default())
.with_resource(res.clone())
.with_batch_exporter(exporter);
let mut builder = SdkTracerProvider::builder()
.with_sampler(sampler)
.with_id_generator(RandomIdGenerator::default())
.with_resource(res.clone())
.with_batch_exporter(exporter);
if use_stdout {
builder = builder.with_batch_exporter(opentelemetry_stdout::SpanExporter::default());
if use_stdout {
builder = builder.with_batch_exporter(opentelemetry_stdout::SpanExporter::default());
}
let provider = builder.build();
global::set_tracer_provider(provider.clone());
Some(provider)
}
let provider = builder.build();
global::set_tracer_provider(provider.clone());
provider
};
// MeterHTTP
let meter_provider = {
let exporter = opentelemetry_otlp::MetricExporter::builder()
.with_http()
.with_endpoint(metric_ep.as_str())
.with_temporality(opentelemetry_sdk::metrics::Temporality::default())
.with_protocol(Protocol::HttpBinary)
.with_compression(Compression::Gzip)
.build()
.map_err(|e| TelemetryError::BuildMetricExporter(e.to_string()))?;
let meter_interval = config.meter_interval.unwrap_or(METER_INTERVAL);
if metric_ep.is_empty() {
None
} else {
let exporter = opentelemetry_otlp::MetricExporter::builder()
.with_http()
.with_endpoint(metric_ep.as_str())
.with_temporality(opentelemetry_sdk::metrics::Temporality::default())
.with_protocol(Protocol::HttpBinary)
.with_compression(Compression::Gzip)
.build()
.map_err(|e| TelemetryError::BuildMetricExporter(e.to_string()))?;
let meter_interval = config.meter_interval.unwrap_or(METER_INTERVAL);
let (provider, recorder) = Recorder::builder(service_name.clone())
.with_meter_provider(|b| {
let b = b.with_resource(res.clone()).with_reader(
PeriodicReader::builder(exporter)
.with_interval(Duration::from_secs(meter_interval))
.build(),
);
if use_stdout {
b.with_reader(create_periodic_reader(meter_interval))
} else {
b
}
})
.build();
global::set_meter_provider(provider.clone());
metrics::set_global_recorder(recorder).map_err(|e| TelemetryError::InstallMetricsRecorder(e.to_string()))?;
provider
let (provider, recorder) = Recorder::builder(service_name.clone())
.with_meter_provider(|b| {
let b = b.with_resource(res.clone()).with_reader(
PeriodicReader::builder(exporter)
.with_interval(Duration::from_secs(meter_interval))
.build(),
);
if use_stdout {
b.with_reader(create_periodic_reader(meter_interval))
} else {
b
}
})
.build();
global::set_meter_provider(provider.clone());
metrics::set_global_recorder(recorder).map_err(|e| TelemetryError::InstallMetricsRecorder(e.to_string()))?;
Some(provider)
}
};
// LoggerHTTP
let logger_provider = {
let exporter = opentelemetry_otlp::LogExporter::builder()
.with_http()
.with_endpoint(log_ep.as_str())
.with_protocol(Protocol::HttpBinary)
.with_compression(Compression::Gzip)
.build()
.map_err(|e| TelemetryError::BuildLogExporter(e.to_string()))?;
if log_ep.is_empty() {
None
} else {
let exporter = opentelemetry_otlp::LogExporter::builder()
.with_http()
.with_endpoint(log_ep.as_str())
.with_protocol(Protocol::HttpBinary)
.with_compression(Compression::Gzip)
.build()
.map_err(|e| TelemetryError::BuildLogExporter(e.to_string()))?;
let mut builder = SdkLoggerProvider::builder().with_resource(res);
builder = builder.with_batch_exporter(exporter);
if use_stdout {
builder = builder.with_batch_exporter(opentelemetry_stdout::LogExporter::default());
let mut builder = SdkLoggerProvider::builder().with_resource(res);
builder = builder.with_batch_exporter(exporter);
if use_stdout {
builder = builder.with_batch_exporter(opentelemetry_stdout::LogExporter::default());
}
Some(builder.build())
}
builder.build()
};
// Tracing layer
@@ -512,16 +524,21 @@ fn init_observability_http(config: &OtelConfig, logger_level: &str, is_productio
};
let filter = build_env_filter(logger_level, None);
let otel_bridge = OpenTelemetryTracingBridge::new(&logger_provider).with_filter(build_env_filter(logger_level, None));
let tracer = tracer_provider.tracer(service_name.to_string());
let otel_bridge = logger_provider
.as_ref()
.map(|p| OpenTelemetryTracingBridge::new(p).with_filter(build_env_filter(logger_level, None)));
let tracer_layer = tracer_provider
.as_ref()
.map(|p| OpenTelemetryLayer::new(p.tracer(service_name.to_string())));
let metrics_layer = meter_provider.as_ref().map(|p| MetricsLayer::new(p.clone()));
tracing_subscriber::registry()
.with(filter)
.with(ErrorLayer::default())
.with(fmt_layer_opt)
.with(OpenTelemetryLayer::new(tracer))
.with(tracer_layer)
.with(otel_bridge)
.with(MetricsLayer::new(meter_provider.clone()))
.with(metrics_layer)
.init();
OBSERVABILITY_METRIC_ENABLED.set(true).ok();
@@ -532,9 +549,9 @@ fn init_observability_http(config: &OtelConfig, logger_level: &str, is_productio
);
Ok(OtelGuard {
tracer_provider: Some(tracer_provider),
meter_provider: Some(meter_provider),
logger_provider: Some(logger_provider),
tracer_provider,
meter_provider,
logger_provider,
flexi_logger_handles: None,
tracing_guard: None,
})


@@ -45,7 +45,13 @@ pub async fn node_service_time_out_client(
let channel = match channel {
Some(channel) => channel,
None => {
let connector = Endpoint::from_shared(addr.to_string())?.connect_timeout(Duration::from_secs(60));
let connector = Endpoint::from_shared(addr.to_string())?
.connect_timeout(Duration::from_secs(5))
.tcp_keepalive(Some(Duration::from_secs(10)))
.http2_keep_alive_interval(Duration::from_secs(5))
.keep_alive_timeout(Duration::from_secs(3))
.keep_alive_while_idle(true)
.timeout(Duration::from_secs(60));
let channel = connector.connect().await?;
{
@@ -55,7 +61,6 @@ pub async fn node_service_time_out_client(
}
};
// let timeout_channel = Timeout::new(channel, Duration::from_secs(60));
Ok(NodeServiceClient::with_interceptor(
channel,
Box::new(move |mut req: Request<()>| {


@@ -499,17 +499,18 @@ impl AsyncRead for HashReader {
let content_hash = hasher.finalize();
if content_hash != expected_content_hash.raw {
let expected_hex = hex_simd::encode_to_string(&expected_content_hash.raw, hex_simd::AsciiCase::Lower);
let actual_hex = hex_simd::encode_to_string(content_hash, hex_simd::AsciiCase::Lower);
error!(
"Content hash mismatch, type={:?}, encoded={:?}, expected={:?}, actual={:?}",
expected_content_hash.checksum_type,
expected_content_hash.encoded,
hex_simd::encode_to_string(&expected_content_hash.raw, hex_simd::AsciiCase::Lower),
hex_simd::encode_to_string(content_hash, hex_simd::AsciiCase::Lower)
expected_content_hash.checksum_type, expected_content_hash.encoded, expected_hex, actual_hex
);
return Poll::Ready(Err(std::io::Error::new(
std::io::ErrorKind::InvalidData,
"Content hash mismatch",
)));
// Use ChecksumMismatch error so that API layer can return BadDigest
let checksum_err = crate::errors::ChecksumMismatch {
want: expected_hex,
got: actual_hex,
};
return Poll::Ready(Err(std::io::Error::new(std::io::ErrorKind::InvalidData, checksum_err)));
}
}


@@ -32,7 +32,16 @@ use crate::{EtagResolvable, HashReaderDetector, HashReaderMut};
fn get_http_client() -> Client {
// Reuse the HTTP connection pool in the global `reqwest::Client` instance
// TODO: interact with load balancing?
static CLIENT: LazyLock<Client> = LazyLock::new(Client::new);
static CLIENT: LazyLock<Client> = LazyLock::new(|| {
Client::builder()
.connect_timeout(std::time::Duration::from_secs(5))
.tcp_keepalive(std::time::Duration::from_secs(10))
.http2_keep_alive_interval(std::time::Duration::from_secs(5))
.http2_keep_alive_timeout(std::time::Duration::from_secs(3))
.http2_keep_alive_while_idle(true)
.build()
.expect("Failed to create global HTTP client")
});
CLIENT.clone()
}


@@ -41,6 +41,11 @@ pub async fn read_full<R: AsyncRead + Send + Sync + Unpin>(mut reader: R, mut bu
if total == 0 {
return Err(e);
}
// If the error is InvalidData (e.g., checksum mismatch), preserve it
// instead of wrapping it as UnexpectedEof, so proper error handling can occur
if e.kind() == std::io::ErrorKind::InvalidData {
return Err(e);
}
return Err(std::io::Error::new(
std::io::ErrorKind::UnexpectedEof,
format!("read {total} bytes, error: {e}"),

docs/cluster_recovery.md

@@ -0,0 +1,65 @@
# Resolution Report: Issue #1001 - Cluster Recovery from Abrupt Power-Off
## 1. Issue Description
**Problem**: The cluster failed to recover gracefully when a node experienced an abrupt power-off (hard failure).
**Symptoms**:
- The application became unable to upload files.
- The Console Web UI became unresponsive across the cluster.
- The system "hung" indefinitely, unlike the immediate recovery observed during a graceful process termination (`kill`).
**Root Cause**:
The standard TCP protocol does not immediately detect a silent peer disappearance (power loss) because no `FIN` or `RST` packets are sent. Without active application-layer heartbeats, the surviving nodes kept connections in an `ESTABLISHED` state, waiting indefinitely for responses that would never arrive.
---
## 2. Technical Approach
To resolve this, we needed to transform passive failure detection (waiting for TCP timeouts) into an active detection mechanism.
### Key Objectives:
1. **Fail Fast**: Detect dead peers in seconds, not minutes.
2. **Accuracy**: Distinguish between network congestion and actual node failure.
3. **Safety**: Ensure no thread or task blocks forever on a remote procedure call (RPC).
---
## 3. Implemented Solution
We modified the internal gRPC client configuration in `crates/protos/src/lib.rs` to implement a multi-layered health check strategy.
### Solution Overview
The fix implements a multi-layered detection strategy covering both Control Plane (RPC) and Data Plane (Streaming):
1. **Control Plane (gRPC)**:
* Enabled `http2_keep_alive_interval` (5s) and `keep_alive_timeout` (3s) in `tonic` clients.
* Enforced `tcp_keepalive` (10s) on the underlying transport.
* Context: Ensures cluster metadata operations (raft, status checks) fail fast if a node dies.
2. **Data Plane (File Uploads/Downloads)**:
* **Client (Rio)**: Updated the `reqwest` client builder in `crates/rio` to enable TCP Keepalive (10s) and HTTP/2 Keepalive (5s). This prevents hangs during large file streaming (e.g., 1GB uploads); see the client sketch after the configuration block below.
* **Server**: Enabled `SO_KEEPALIVE` on all incoming TCP connections in `rustfs/src/server/http.rs` to forcefully close sockets from dead clients.
3. **Cross-Platform Build Stability**:
* Guarded Linux-specific profiling code (`jemalloc_pprof`) with `#[cfg(target_os = "linux")]` to fix build failures on macOS/AArch64.
### Configuration Changes
```rust
let connector = Endpoint::from_shared(addr.to_string())?
.connect_timeout(Duration::from_secs(5))
// 1. App-Layer Heartbeats (Primary Detection)
// Sends an HTTP/2 PING frame every 5 seconds (transparent to the application).
.http2_keep_alive_interval(Duration::from_secs(5))
// If a PING is not acknowledged within 3 seconds, the connection is closed.
.keep_alive_timeout(Duration::from_secs(3))
// Ensures PINGs are sent even when no active requests are in flight.
.keep_alive_while_idle(true)
// 2. Transport-Layer Keepalive (OS Backup)
.tcp_keepalive(Some(Duration::from_secs(10)))
// 3. Global Safety Net
// Hard deadline for any RPC operation.
.timeout(Duration::from_secs(60));
```
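
For the data plane, the shared `reqwest` client in `crates/rio` was rebuilt with the same keepalive strategy. A condensed view of that change, with explanatory comments added here:

```rust
use reqwest::Client;
use std::sync::LazyLock;

static CLIENT: LazyLock<Client> = LazyLock::new(|| {
    Client::builder()
        // Fail fast when a peer is unreachable at connect time.
        .connect_timeout(std::time::Duration::from_secs(5))
        // Transport-layer keepalive as the OS-level backup.
        .tcp_keepalive(std::time::Duration::from_secs(10))
        // App-layer HTTP/2 PINGs detect dead peers mid-stream.
        .http2_keep_alive_interval(std::time::Duration::from_secs(5))
        .http2_keep_alive_timeout(std::time::Duration::from_secs(3))
        .http2_keep_alive_while_idle(true)
        .build()
        .expect("Failed to create global HTTP client")
});
```

On the server side, the accept loop in `rustfs/src/server/http.rs` applies the mirror-image settings via `socket2::TcpKeepalive` (idle 10s, probe interval 5s, 3 retries where the platform supports it), so sockets from dead clients are closed from both ends.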
### Outcome
- **Detection Time**: Reduced from ~15+ minutes (OS default) to **~8 seconds** (5s interval + 3s timeout).
- **Behavior**: When a node loses power, surviving peers now detect the lost connection almost immediately, throwing a protocol error that triggers standard cluster recovery/failover logic.
- **Result**: The cluster now handles power-offs with the same resilience as graceful shutdowns.


@@ -21,6 +21,8 @@ RustFS helm chart supports **standalone and distributed mode**. For standalone m
| secret.rustfs.access_key | RustFS Access Key ID | `rustfsadmin` |
| secret.rustfs.secret_key | RustFS Secret Key ID | `rustfsadmin` |
| storageclass.name | The name for StorageClass. | `local-path` |
| storageclass.dataStorageSize | The storage size for data PVC. | `256Mi` |
| storageclass.logStorageSize | The storage size for log PVC. | `256Mi` |
| ingress.className | Specify the ingress class, traefik or nginx. | `nginx` |


@@ -15,10 +15,10 @@ type: application
# This is the chart version. This version number should be incremented each time you make changes
# to the chart and its templates, including the app version.
# Versions are expected to follow Semantic Versioning (https://semver.org/)
version: 1.0.0-alpha.69
version: 1.0.3
# This is the version number of the application being deployed. This version number should be
# incremented each time you make changes to the application. Versions are not expected to
# follow Semantic Versioning. They should reflect the version the application is using.
# It is recommended to use it with quotes.
appVersion: "1.0.0-alpha.69"
appVersion: "1.0.0-alpha.73"


@@ -60,3 +60,14 @@ Create the name of the service account to use
{{- default "default" .Values.serviceAccount.name }}
{{- end }}
{{- end }}
{{/*
Return the secret name
*/}}
{{- define "rustfs.secretName" -}}
{{- if .Values.secret.existingSecret }}
{{- .Values.secret.existingSecret }}
{{- else }}
{{- printf "%s-secret" (include "rustfs.fullname" .) }}
{{- end }}
{{- end }}


@@ -55,7 +55,7 @@ spec:
- configMapRef:
name: {{ include "rustfs.fullname" . }}-config
- secretRef:
name: {{ include "rustfs.fullname" . }}-secret
name: {{ include "rustfs.secretName" . }}
resources:
requests:
memory: {{ .Values.resources.requests.memory }}


@@ -0,0 +1,4 @@
{{- range .Values.extraManifests }}
---
{{ tpl (toYaml .) $ }}
{{- end }}


@@ -15,7 +15,7 @@ spec:
{{- with .Values.ingress.className }}
ingressClassName: {{ . }}
{{- end }}
{{- if .Values.ingress.tls }}
{{- if .Values.tls.enabled }}
tls:
{{- range .Values.ingress.tls }}
- hosts:


@@ -8,7 +8,7 @@ spec:
storageClassName: {{ .Values.storageclass.name }}
resources:
requests:
storage: {{ .Values.storageclass.size }}
storage: {{ .Values.storageclass.dataStorageSize }}
---
apiVersion: v1
@@ -20,5 +20,5 @@ spec:
storageClassName: {{ .Values.storageclass.name }}
resources:
requests:
storage: {{ .Values.storageclass.size }}
storage: {{ .Values.storageclass.logStorageSize }}
{{- end }}


@@ -1,9 +1,10 @@
{{- if not .Values.secret.existingSecret }}
apiVersion: v1
kind: Secret
metadata:
name: {{ include "rustfs.fullname" . }}-secret
name: {{ include "rustfs.secretName" . }}
type: Opaque
data:
RUSTFS_ACCESS_KEY: {{ .Values.secret.rustfs.access_key | b64enc | quote }}
RUSTFS_SECRET_KEY: {{ .Values.secret.rustfs.secret_key | b64enc | quote }}
{{- end }}


@@ -76,7 +76,7 @@ spec:
- configMapRef:
name: {{ include "rustfs.fullname" . }}-config
- secretRef:
name: {{ include "rustfs.fullname" . }}-secret
name: {{ include "rustfs.secretName" . }}
resources:
requests:
memory: {{ .Values.resources.requests.memory }}
@@ -122,7 +122,7 @@ spec:
storageClassName: {{ $.Values.storageclass.name }}
resources:
requests:
storage: {{ $.Values.storageclass.size}}
storage: {{ $.Values.storageclass.logStorageSize}}
{{- if eq (int .Values.replicaCount) 4 }}
{{- range $i := until (int .Values.replicaCount) }}
- metadata:
@@ -132,7 +132,7 @@ spec:
storageClassName: {{ $.Values.storageclass.name }}
resources:
requests:
storage: {{ $.Values.storageclass.size}}
storage: {{ $.Values.storageclass.dataStorageSize}}
{{- end }}
{{- else if eq (int .Values.replicaCount) 16 }}
- metadata:
@@ -142,6 +142,6 @@ spec:
storageClassName: {{ $.Values.storageclass.name }}
resources:
requests:
storage: {{ $.Values.storageclass.size}}
storage: {{ $.Values.storageclass.dataStorageSize}}
{{- end }}
{{- end }}


@@ -27,6 +27,7 @@ mode:
enabled: true
secret:
existingSecret: ""
rustfs:
access_key: rustfsadmin
secret_key: rustfsadmin
@@ -146,4 +147,7 @@ affinity: {}
storageclass:
name: local-path
size: 256Mi
dataStorageSize: 256Mi
logStorageSize: 256Mi
extraManifests: []


@@ -92,6 +92,7 @@ serde_urlencoded = { workspace = true }
# Cryptography and Security
rustls = { workspace = true }
subtle = { workspace = true }
# Time and Date
chrono = { workspace = true }


@@ -29,7 +29,7 @@ use tracing::warn;
use crate::{
admin::{auth::validate_admin_request, router::Operation, utils::has_space_be},
auth::{check_key_valid, get_session_token},
auth::{check_key_valid, constant_time_eq, get_session_token},
};
#[derive(Debug, Deserialize, Default)]
@@ -240,7 +240,7 @@ impl Operation for UpdateGroupMembers {
get_global_action_cred()
.map(|cred| {
if cred.access_key == *member {
if constant_time_eq(&cred.access_key, member) {
return Err(S3Error::with_message(
S3ErrorCode::MethodNotAllowed,
format!("can't add root {member}"),


@@ -13,7 +13,7 @@
// limitations under the License.
use crate::admin::utils::has_space_be;
use crate::auth::{get_condition_values, get_session_token};
use crate::auth::{constant_time_eq, get_condition_values, get_session_token};
use crate::{admin::router::Operation, auth::check_key_valid};
use http::HeaderMap;
use hyper::StatusCode;
@@ -83,7 +83,7 @@ impl Operation for AddServiceAccount {
return Err(s3_error!(InvalidRequest, "get sys cred failed"));
};
if sys_cred.access_key == create_req.access_key {
if constant_time_eq(&sys_cred.access_key, &create_req.access_key) {
return Err(s3_error!(InvalidArgument, "can't create user with system access key"));
}
@@ -107,7 +107,7 @@ impl Operation for AddServiceAccount {
return Err(s3_error!(InvalidRequest, "iam not init"));
};
let deny_only = cred.access_key == target_user || cred.parent_user == target_user;
let deny_only = constant_time_eq(&cred.access_key, &target_user) || constant_time_eq(&cred.parent_user, &target_user);
if !iam_store
.is_allowed(&Args {


@@ -14,7 +14,7 @@
use crate::{
admin::{auth::validate_admin_request, router::Operation, utils::has_space_be},
auth::{check_key_valid, get_session_token},
auth::{check_key_valid, constant_time_eq, get_session_token},
};
use http::{HeaderMap, StatusCode};
use matchit::Params;
@@ -95,7 +95,7 @@ impl Operation for AddUser {
}
if let Some(sys_cred) = get_global_action_cred() {
if sys_cred.access_key == ak {
if constant_time_eq(&sys_cred.access_key, ak) {
return Err(s3_error!(InvalidArgument, "can't create user with system access key"));
}
}
@@ -162,7 +162,7 @@ impl Operation for SetUserStatus {
return Err(s3_error!(InvalidRequest, "get cred failed"));
};
if input_cred.access_key == ak {
if constant_time_eq(&input_cred.access_key, ak) {
return Err(s3_error!(InvalidArgument, "can't change status of self"));
}


@@ -29,9 +29,37 @@ use s3s::auth::SimpleAuth;
use s3s::s3_error;
use serde_json::Value;
use std::collections::HashMap;
use subtle::ConstantTimeEq;
use time::OffsetDateTime;
use time::format_description::well_known::Rfc3339;
/// Performs constant-time string comparison to prevent timing attacks.
///
/// This function should be used when comparing sensitive values like passwords,
/// API keys, or authentication tokens. It ensures the comparison time is
/// independent of the position where strings differ and handles length differences
/// securely.
///
/// # Security Note
/// This implementation uses the `subtle` crate to provide cryptographically
/// sound constant-time guarantees. The function is resistant to timing side-channel
/// attacks and suitable for security-critical comparisons.
///
/// # Example
/// ```
/// use rustfs::auth::constant_time_eq;
///
/// let secret1 = "my-secret-key";
/// let secret2 = "my-secret-key";
/// let secret3 = "wrong-secret";
///
/// assert!(constant_time_eq(secret1, secret2));
/// assert!(!constant_time_eq(secret1, secret3));
/// ```
pub fn constant_time_eq(a: &str, b: &str) -> bool {
a.as_bytes().ct_eq(b.as_bytes()).into()
}
// Authentication type constants
const JWT_ALGORITHM: &str = "Bearer ";
const SIGN_V2_ALGORITHM: &str = "AWS ";
@@ -111,7 +139,7 @@ pub async fn check_key_valid(session_token: &str, access_key: &str) -> S3Result<
let sys_cred = cred.clone();
if cred.access_key != access_key {
if !constant_time_eq(&cred.access_key, access_key) {
let Ok(iam_store) = rustfs_iam::get() else {
return Err(S3Error::with_message(
S3ErrorCode::InternalError,
@@ -146,7 +174,8 @@ pub async fn check_key_valid(session_token: &str, access_key: &str) -> S3Result<
cred.claims = if !claims.is_empty() { Some(claims) } else { None };
let mut owner = sys_cred.access_key == cred.access_key || cred.parent_user == sys_cred.access_key;
let mut owner =
constant_time_eq(&sys_cred.access_key, &cred.access_key) || constant_time_eq(&cred.parent_user, &sys_cred.access_key);
// permitRootAccess
if let Some(claims) = &cred.claims {
@@ -225,7 +254,7 @@ pub fn get_condition_values(
let principal_type = if !username.is_empty() {
if claims.is_some() {
"AssumedRole"
} else if sys_cred.access_key == username {
} else if constant_time_eq(&sys_cred.access_key, &username) {
"Account"
} else {
"User"
@@ -1102,4 +1131,21 @@ mod tests {
assert_eq!(auth_type, AuthType::Unknown);
}
#[test]
fn test_constant_time_eq() {
assert!(constant_time_eq("test", "test"));
assert!(!constant_time_eq("test", "Test"));
assert!(!constant_time_eq("test", "test1"));
assert!(!constant_time_eq("test1", "test"));
assert!(!constant_time_eq("", "test"));
assert!(constant_time_eq("", ""));
// Test with credentials-like strings
let key1 = "AKIAIOSFODNN7EXAMPLE";
let key2 = "AKIAIOSFODNN7EXAMPLE";
let key3 = "AKIAIOSFODNN7EXAMPLF";
assert!(constant_time_eq(key1, key2));
assert!(!constant_time_eq(key1, key3));
}
}


@@ -192,6 +192,21 @@ impl From<ApiError> for S3Error {
impl From<StorageError> for ApiError {
fn from(err: StorageError) -> Self {
// Special handling for Io errors that may contain ChecksumMismatch
if let StorageError::Io(ref io_err) = err {
if let Some(inner) = io_err.get_ref() {
if inner.downcast_ref::<rustfs_rio::ChecksumMismatch>().is_some()
|| inner.downcast_ref::<rustfs_rio::BadDigest>().is_some()
{
return ApiError {
code: S3ErrorCode::BadDigest,
message: ApiError::error_code_to_message(&S3ErrorCode::BadDigest),
source: Some(Box::new(err)),
};
}
}
}
let code = match &err {
StorageError::NotImplemented => S3ErrorCode::NotImplemented,
StorageError::InvalidArgument(_, _, _) => S3ErrorCode::InvalidArgument,
@@ -239,6 +254,23 @@ impl From<StorageError> for ApiError {
impl From<std::io::Error> for ApiError {
fn from(err: std::io::Error) -> Self {
// Check if the error is a ChecksumMismatch (BadDigest)
if let Some(inner) = err.get_ref() {
if inner.downcast_ref::<rustfs_rio::ChecksumMismatch>().is_some() {
return ApiError {
code: S3ErrorCode::BadDigest,
message: ApiError::error_code_to_message(&S3ErrorCode::BadDigest),
source: Some(Box::new(err)),
};
}
if inner.downcast_ref::<rustfs_rio::BadDigest>().is_some() {
return ApiError {
code: S3ErrorCode::BadDigest,
message: ApiError::error_code_to_message(&S3ErrorCode::BadDigest),
source: Some(Box::new(err)),
};
}
}
ApiError {
code: S3ErrorCode::InternalError,
message: err.to_string(),


@@ -60,12 +60,11 @@ use rustfs_ecstore::{
use rustfs_iam::init_iam_sys;
use rustfs_notify::notifier_global;
use rustfs_obs::{init_obs, set_global_guard};
use rustfs_targets::arn::TargetID;
use rustfs_targets::arn::{ARN, TargetIDError};
use rustfs_utils::net::parse_and_resolve_address;
use s3s::s3_error;
use std::env;
use std::io::{Error, Result};
use std::str::FromStr;
use std::sync::Arc;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, instrument, warn};
@@ -516,9 +515,21 @@ async fn add_bucket_notification_configuration(buckets: Vec<String>) {
"Bucket '{}' has existing notification configuration: {:?}", bucket, cfg);
let mut event_rules = Vec::new();
process_queue_configurations(&mut event_rules, cfg.queue_configurations.clone(), TargetID::from_str);
process_topic_configurations(&mut event_rules, cfg.topic_configurations.clone(), TargetID::from_str);
process_lambda_configurations(&mut event_rules, cfg.lambda_function_configurations.clone(), TargetID::from_str);
process_queue_configurations(&mut event_rules, cfg.queue_configurations.clone(), |arn_str| {
ARN::parse(arn_str)
.map(|arn| arn.target_id)
.map_err(|e| TargetIDError::InvalidFormat(e.to_string()))
});
process_topic_configurations(&mut event_rules, cfg.topic_configurations.clone(), |arn_str| {
ARN::parse(arn_str)
.map(|arn| arn.target_id)
.map_err(|e| TargetIDError::InvalidFormat(e.to_string()))
});
process_lambda_configurations(&mut event_rules, cfg.lambda_function_configurations.clone(), |arn_str| {
ARN::parse(arn_str)
.map(|arn| arn.target_id)
.map_err(|e| TargetIDError::InvalidFormat(e.to_string()))
});
if let Err(e) = notifier_global::add_event_specific_rules(bucket, region, &event_rules)
.await


@@ -12,272 +12,291 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use chrono::Utc;
use jemalloc_pprof::PROF_CTL;
use pprof::protos::Message;
use rustfs_config::{
DEFAULT_CPU_DURATION_SECS, DEFAULT_CPU_FREQ, DEFAULT_CPU_INTERVAL_SECS, DEFAULT_CPU_MODE, DEFAULT_ENABLE_PROFILING,
DEFAULT_MEM_INTERVAL_SECS, DEFAULT_MEM_PERIODIC, DEFAULT_OUTPUT_DIR, ENV_CPU_DURATION_SECS, ENV_CPU_FREQ,
ENV_CPU_INTERVAL_SECS, ENV_CPU_MODE, ENV_ENABLE_PROFILING, ENV_MEM_INTERVAL_SECS, ENV_MEM_PERIODIC, ENV_OUTPUT_DIR,
};
use rustfs_utils::{get_env_bool, get_env_str, get_env_u64, get_env_usize};
use std::fs::{File, create_dir_all};
use std::io::Write;
use std::path::{Path, PathBuf};
use std::sync::{Arc, OnceLock};
use std::time::Duration;
use tokio::sync::Mutex;
use tokio::time::sleep;
use tracing::{debug, error, info, warn};
#[cfg(not(target_os = "linux"))]
pub async fn init_from_env() {}
static CPU_CONT_GUARD: OnceLock<Arc<Mutex<Option<pprof::ProfilerGuard<'static>>>>> = OnceLock::new();
/// CPU profiling mode
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum CpuMode {
Off,
Continuous,
Periodic,
#[cfg(not(target_os = "linux"))]
pub async fn dump_cpu_pprof_for(_duration: std::time::Duration) -> Result<std::path::PathBuf, String> {
Err("CPU profiling is only supported on Linux".to_string())
}
/// Get or create output directory
fn output_dir() -> PathBuf {
let dir = get_env_str(ENV_OUTPUT_DIR, DEFAULT_OUTPUT_DIR);
let p = PathBuf::from(dir);
if let Err(e) = create_dir_all(&p) {
warn!("profiling: create output dir {} failed: {}, fallback to current dir", p.display(), e);
return PathBuf::from(".");
#[cfg(not(target_os = "linux"))]
pub async fn dump_memory_pprof_now() -> Result<std::path::PathBuf, String> {
Err("Memory profiling is only supported on Linux".to_string())
}
#[cfg(target_os = "linux")]
mod linux_impl {
use chrono::Utc;
use jemalloc_pprof::PROF_CTL;
use pprof::protos::Message;
use rustfs_config::{
DEFAULT_CPU_DURATION_SECS, DEFAULT_CPU_FREQ, DEFAULT_CPU_INTERVAL_SECS, DEFAULT_CPU_MODE, DEFAULT_ENABLE_PROFILING,
DEFAULT_MEM_INTERVAL_SECS, DEFAULT_MEM_PERIODIC, DEFAULT_OUTPUT_DIR, ENV_CPU_DURATION_SECS, ENV_CPU_FREQ,
ENV_CPU_INTERVAL_SECS, ENV_CPU_MODE, ENV_ENABLE_PROFILING, ENV_MEM_INTERVAL_SECS, ENV_MEM_PERIODIC, ENV_OUTPUT_DIR,
};
use rustfs_utils::{get_env_bool, get_env_str, get_env_u64, get_env_usize};
use std::fs::{File, create_dir_all};
use std::io::Write;
use std::path::{Path, PathBuf};
use std::sync::{Arc, OnceLock};
use std::time::Duration;
use tokio::sync::Mutex;
use tokio::time::sleep;
use tracing::{debug, error, info, warn};
static CPU_CONT_GUARD: OnceLock<Arc<Mutex<Option<pprof::ProfilerGuard<'static>>>>> = OnceLock::new();
/// CPU profiling mode
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum CpuMode {
Off,
Continuous,
Periodic,
}
p
}
/// Read CPU profiling mode from env
fn read_cpu_mode() -> CpuMode {
match get_env_str(ENV_CPU_MODE, DEFAULT_CPU_MODE).to_lowercase().as_str() {
"continuous" => CpuMode::Continuous,
"periodic" => CpuMode::Periodic,
_ => CpuMode::Off,
/// Get or create output directory
fn output_dir() -> PathBuf {
let dir = get_env_str(ENV_OUTPUT_DIR, DEFAULT_OUTPUT_DIR);
let p = PathBuf::from(dir);
if let Err(e) = create_dir_all(&p) {
warn!("profiling: create output dir {} failed: {}, fallback to current dir", p.display(), e);
return PathBuf::from(".");
}
p
}
}
/// Generate timestamp string for filenames
fn ts() -> String {
Utc::now().format("%Y%m%dT%H%M%S").to_string()
}
/// Write pprof report to file in protobuf format
fn write_pprof_report_pb(report: &pprof::Report, path: &Path) -> Result<(), String> {
let profile = report.pprof().map_err(|e| format!("pprof() failed: {e}"))?;
let mut buf = Vec::with_capacity(512 * 1024);
profile.write_to_vec(&mut buf).map_err(|e| format!("encode failed: {e}"))?;
let mut f = File::create(path).map_err(|e| format!("create file failed: {e}"))?;
f.write_all(&buf).map_err(|e| format!("write file failed: {e}"))?;
Ok(())
}
/// Internal: dump CPU pprof from existing guard
async fn dump_cpu_with_guard(guard: &pprof::ProfilerGuard<'_>) -> Result<PathBuf, String> {
let report = guard.report().build().map_err(|e| format!("build report failed: {e}"))?;
let out = output_dir().join(format!("cpu_profile_{}.pb", ts()));
write_pprof_report_pb(&report, &out)?;
info!("CPU profile exported: {}", out.display());
Ok(out)
}
// Public API: dump CPU for a duration; if continuous guard exists, snapshot immediately.
pub async fn dump_cpu_pprof_for(duration: Duration) -> Result<PathBuf, String> {
if let Some(cell) = CPU_CONT_GUARD.get() {
let guard_slot = cell.lock().await;
if let Some(ref guard) = *guard_slot {
debug!("profiling: using continuous profiler guard for CPU dump");
return dump_cpu_with_guard(guard).await;
/// Read CPU profiling mode from env
fn read_cpu_mode() -> CpuMode {
match get_env_str(ENV_CPU_MODE, DEFAULT_CPU_MODE).to_lowercase().as_str() {
"continuous" => CpuMode::Continuous,
"periodic" => CpuMode::Periodic,
_ => CpuMode::Off,
}
}
let freq = get_env_usize(ENV_CPU_FREQ, DEFAULT_CPU_FREQ) as i32;
let guard = pprof::ProfilerGuard::new(freq).map_err(|e| format!("create profiler failed: {e}"))?;
sleep(duration).await;
dump_cpu_with_guard(&guard).await
}
// Public API: dump memory pprof now (jemalloc)
pub async fn dump_memory_pprof_now() -> Result<PathBuf, String> {
let out = output_dir().join(format!("mem_profile_{}.pb", ts()));
let mut f = File::create(&out).map_err(|e| format!("create file failed: {e}"))?;
let prof_ctl_cell = PROF_CTL
.as_ref()
.ok_or_else(|| "jemalloc profiling control not available".to_string())?;
let mut prof_ctl = prof_ctl_cell.lock().await;
if !prof_ctl.activated() {
return Err("jemalloc profiling is not active".to_string());
/// Generate timestamp string for filenames
fn ts() -> String {
Utc::now().format("%Y%m%dT%H%M%S").to_string()
}
let bytes = prof_ctl.dump_pprof().map_err(|e| format!("dump pprof failed: {e}"))?;
f.write_all(&bytes).map_err(|e| format!("write file failed: {e}"))?;
info!("Memory profile exported: {}", out.display());
Ok(out)
}
// Jemalloc status check (observation only; does not force a dump)
pub async fn check_jemalloc_profiling() {
use tikv_jemalloc_ctl::{config, epoch, stats};
if let Err(e) = epoch::advance() {
warn!("jemalloc epoch advance failed: {e}");
/// Write pprof report to file in protobuf format
fn write_pprof_report_pb(report: &pprof::Report, path: &Path) -> Result<(), String> {
let profile = report.pprof().map_err(|e| format!("pprof() failed: {e}"))?;
let mut buf = Vec::with_capacity(512 * 1024);
profile.write_to_vec(&mut buf).map_err(|e| format!("encode failed: {e}"))?;
let mut f = File::create(path).map_err(|e| format!("create file failed: {e}"))?;
f.write_all(&buf).map_err(|e| format!("write file failed: {e}"))?;
Ok(())
}
match config::malloc_conf::read() {
Ok(conf) => debug!("jemalloc malloc_conf: {}", conf),
Err(e) => debug!("jemalloc read malloc_conf failed: {e}"),
/// Internal: dump CPU pprof from existing guard
async fn dump_cpu_with_guard(guard: &pprof::ProfilerGuard<'_>) -> Result<PathBuf, String> {
let report = guard.report().build().map_err(|e| format!("build report failed: {e}"))?;
let out = output_dir().join(format!("cpu_profile_{}.pb", ts()));
write_pprof_report_pb(&report, &out)?;
info!("CPU profile exported: {}", out.display());
Ok(out)
}
match std::env::var("MALLOC_CONF") {
Ok(v) => debug!("MALLOC_CONF={}", v),
Err(_) => debug!("MALLOC_CONF is not set"),
}
if let Some(lock) = PROF_CTL.as_ref() {
let ctl = lock.lock().await;
info!(activated = ctl.activated(), "jemalloc profiling status");
} else {
info!("jemalloc profiling controller is NOT available");
}
let _ = epoch::advance();
macro_rules! show {
($name:literal, $reader:expr) => {
match $reader {
Ok(v) => debug!(concat!($name, "={}"), v),
Err(e) => debug!(concat!($name, " read failed: {}"), e),
// Public API: dump CPU for a duration; if continuous guard exists, snapshot immediately.
pub async fn dump_cpu_pprof_for(duration: Duration) -> Result<PathBuf, String> {
if let Some(cell) = CPU_CONT_GUARD.get() {
let guard_slot = cell.lock().await;
if let Some(ref guard) = *guard_slot {
debug!("profiling: using continuous profiler guard for CPU dump");
return dump_cpu_with_guard(guard).await;
}
};
}
show!("allocated", stats::allocated::read());
show!("resident", stats::resident::read());
show!("mapped", stats::mapped::read());
show!("metadata", stats::metadata::read());
show!("active", stats::active::read());
}
// Internal: start continuous CPU profiling
async fn start_cpu_continuous(freq_hz: i32) {
let cell = CPU_CONT_GUARD.get_or_init(|| Arc::new(Mutex::new(None))).clone();
let mut slot = cell.lock().await;
if slot.is_some() {
warn!("profiling: continuous CPU guard already running");
return;
}
match pprof::ProfilerGuardBuilder::default()
.frequency(freq_hz)
.blocklist(&["libc", "libgcc", "pthread", "vdso"])
.build()
{
Ok(guard) => {
*slot = Some(guard);
info!(freq = freq_hz, "start continuous CPU profiling");
}
Err(e) => warn!("start continuous CPU profiling failed: {e}"),
}
}
// Internal: start periodic CPU sampling loop
async fn start_cpu_periodic(freq_hz: i32, interval: Duration, duration: Duration) {
info!(freq = freq_hz, ?interval, ?duration, "start periodic CPU profiling");
tokio::spawn(async move {
loop {
sleep(interval).await;
let guard = match pprof::ProfilerGuard::new(freq_hz) {
Ok(g) => g,
Err(e) => {
warn!("periodic CPU profiler create failed: {e}");
continue;
let freq = get_env_usize(ENV_CPU_FREQ, DEFAULT_CPU_FREQ) as i32;
let guard = pprof::ProfilerGuard::new(freq).map_err(|e| format!("create profiler failed: {e}"))?;
sleep(duration).await;
dump_cpu_with_guard(&guard).await
}
// Public API: dump memory pprof now (jemalloc)
pub async fn dump_memory_pprof_now() -> Result<PathBuf, String> {
let out = output_dir().join(format!("mem_profile_{}.pb", ts()));
let mut f = File::create(&out).map_err(|e| format!("create file failed: {e}"))?;
let prof_ctl_cell = PROF_CTL
.as_ref()
.ok_or_else(|| "jemalloc profiling control not available".to_string())?;
let mut prof_ctl = prof_ctl_cell.lock().await;
if !prof_ctl.activated() {
return Err("jemalloc profiling is not active".to_string());
}
let bytes = prof_ctl.dump_pprof().map_err(|e| format!("dump pprof failed: {e}"))?;
f.write_all(&bytes).map_err(|e| format!("write file failed: {e}"))?;
info!("Memory profile exported: {}", out.display());
Ok(out)
}
// Jemalloc status check (observation only; does not force a dump)
pub async fn check_jemalloc_profiling() {
use tikv_jemalloc_ctl::{config, epoch, stats};
if let Err(e) = epoch::advance() {
warn!("jemalloc epoch advance failed: {e}");
}
match config::malloc_conf::read() {
Ok(conf) => debug!("jemalloc malloc_conf: {}", conf),
Err(e) => debug!("jemalloc read malloc_conf failed: {e}"),
}
match std::env::var("MALLOC_CONF") {
Ok(v) => debug!("MALLOC_CONF={}", v),
Err(_) => debug!("MALLOC_CONF is not set"),
}
if let Some(lock) = PROF_CTL.as_ref() {
let ctl = lock.lock().await;
info!(activated = ctl.activated(), "jemalloc profiling status");
} else {
info!("jemalloc profiling controller is NOT available");
}
let _ = epoch::advance();
macro_rules! show {
($name:literal, $reader:expr) => {
match $reader {
Ok(v) => debug!(concat!($name, "={}"), v),
Err(e) => debug!(concat!($name, " read failed: {}"), e),
}
};
sleep(duration).await;
match guard.report().build() {
Ok(report) => {
let out = output_dir().join(format!("cpu_profile_{}.pb", ts()));
if let Err(e) = write_pprof_report_pb(&report, &out) {
warn!("write periodic CPU pprof failed: {e}");
} else {
info!("periodic CPU profile exported: {}", out.display());
}
show!("allocated", stats::allocated::read());
show!("resident", stats::resident::read());
show!("mapped", stats::mapped::read());
show!("metadata", stats::metadata::read());
show!("active", stats::active::read());
}
// Internal: start continuous CPU profiling
async fn start_cpu_continuous(freq_hz: i32) {
let cell = CPU_CONT_GUARD.get_or_init(|| Arc::new(Mutex::new(None))).clone();
let mut slot = cell.lock().await;
if slot.is_some() {
warn!("profiling: continuous CPU guard already running");
return;
}
match pprof::ProfilerGuardBuilder::default()
.frequency(freq_hz)
.blocklist(&["libc", "libgcc", "pthread", "vdso"])
.build()
{
Ok(guard) => {
*slot = Some(guard);
info!(freq = freq_hz, "start continuous CPU profiling");
}
Err(e) => warn!("start continuous CPU profiling failed: {e}"),
}
}
// Internal: start periodic CPU sampling loop
async fn start_cpu_periodic(freq_hz: i32, interval: Duration, duration: Duration) {
info!(freq = freq_hz, ?interval, ?duration, "start periodic CPU profiling");
tokio::spawn(async move {
loop {
sleep(interval).await;
let guard = match pprof::ProfilerGuard::new(freq_hz) {
Ok(g) => g,
Err(e) => {
warn!("periodic CPU profiler create failed: {e}");
continue;
}
}
Err(e) => warn!("periodic CPU report build failed: {e}"),
}
}
});
}
// Internal: start periodic memory dump when jemalloc profiling is active
async fn start_memory_periodic(interval: Duration) {
info!(?interval, "start periodic memory pprof dump");
tokio::spawn(async move {
loop {
sleep(interval).await;
let Some(lock) = PROF_CTL.as_ref() else {
debug!("skip memory dump: PROF_CTL not available");
continue;
};
let mut ctl = lock.lock().await;
if !ctl.activated() {
debug!("skip memory dump: jemalloc profiling not active");
continue;
}
let out = output_dir().join(format!("mem_profile_periodic_{}.pb", ts()));
match File::create(&out) {
Err(e) => {
error!("periodic mem dump create file failed: {}", e);
continue;
}
Ok(mut f) => match ctl.dump_pprof() {
Ok(bytes) => {
if let Err(e) = f.write_all(&bytes) {
error!("periodic mem dump write failed: {}", e);
};
sleep(duration).await;
match guard.report().build() {
Ok(report) => {
let out = output_dir().join(format!("cpu_profile_{}.pb", ts()));
if let Err(e) = write_pprof_report_pb(&report, &out) {
warn!("write periodic CPU pprof failed: {e}");
} else {
info!("periodic memory profile dumped to {}", out.display());
info!("periodic CPU profile exported: {}", out.display());
}
}
Err(e) => error!("periodic mem dump failed: {}", e),
},
Err(e) => warn!("periodic CPU report build failed: {e}"),
}
}
});
}
// Internal: start periodic memory dump when jemalloc profiling is active
async fn start_memory_periodic(interval: Duration) {
info!(?interval, "start periodic memory pprof dump");
tokio::spawn(async move {
loop {
sleep(interval).await;
let Some(lock) = PROF_CTL.as_ref() else {
debug!("skip memory dump: PROF_CTL not available");
continue;
};
let mut ctl = lock.lock().await;
if !ctl.activated() {
debug!("skip memory dump: jemalloc profiling not active");
continue;
}
let out = output_dir().join(format!("mem_profile_periodic_{}.pb", ts()));
match File::create(&out) {
Err(e) => {
error!("periodic mem dump create file failed: {}", e);
continue;
}
Ok(mut f) => match ctl.dump_pprof() {
Ok(bytes) => {
if let Err(e) = f.write_all(&bytes) {
error!("periodic mem dump write failed: {}", e);
} else {
info!("periodic memory profile dumped to {}", out.display());
}
}
Err(e) => error!("periodic mem dump failed: {}", e),
},
}
}
});
}
// Public: unified init entry, avoid duplication/conflict
pub async fn init_from_env() {
let enabled = get_env_bool(ENV_ENABLE_PROFILING, DEFAULT_ENABLE_PROFILING);
if !enabled {
debug!("profiling: disabled by env");
return;
}
});
}
// Public: unified init entry, avoid duplication/conflict
pub async fn init_from_env() {
let enabled = get_env_bool(ENV_ENABLE_PROFILING, DEFAULT_ENABLE_PROFILING);
if !enabled {
debug!("profiling: disabled by env");
return;
}
// Jemalloc state check once (no dump)
check_jemalloc_profiling().await;
// Jemalloc state check once (no dump)
check_jemalloc_profiling().await;
// CPU
let cpu_mode = read_cpu_mode();
let cpu_freq = get_env_usize(ENV_CPU_FREQ, DEFAULT_CPU_FREQ) as i32;
let cpu_interval = Duration::from_secs(get_env_u64(ENV_CPU_INTERVAL_SECS, DEFAULT_CPU_INTERVAL_SECS));
let cpu_duration = Duration::from_secs(get_env_u64(ENV_CPU_DURATION_SECS, DEFAULT_CPU_DURATION_SECS));
// CPU
let cpu_mode = read_cpu_mode();
let cpu_freq = get_env_usize(ENV_CPU_FREQ, DEFAULT_CPU_FREQ) as i32;
let cpu_interval = Duration::from_secs(get_env_u64(ENV_CPU_INTERVAL_SECS, DEFAULT_CPU_INTERVAL_SECS));
let cpu_duration = Duration::from_secs(get_env_u64(ENV_CPU_DURATION_SECS, DEFAULT_CPU_DURATION_SECS));
match cpu_mode {
CpuMode::Off => debug!("profiling: CPU mode off"),
CpuMode::Continuous => start_cpu_continuous(cpu_freq).await,
CpuMode::Periodic => start_cpu_periodic(cpu_freq, cpu_interval, cpu_duration).await,
}
match cpu_mode {
CpuMode::Off => debug!("profiling: CPU mode off"),
CpuMode::Continuous => start_cpu_continuous(cpu_freq).await,
CpuMode::Periodic => start_cpu_periodic(cpu_freq, cpu_interval, cpu_duration).await,
}
// Memory
let mem_periodic = get_env_bool(ENV_MEM_PERIODIC, DEFAULT_MEM_PERIODIC);
let mem_interval = Duration::from_secs(get_env_u64(ENV_MEM_INTERVAL_SECS, DEFAULT_MEM_INTERVAL_SECS));
if mem_periodic {
start_memory_periodic(mem_interval).await;
// Memory
let mem_periodic = get_env_bool(ENV_MEM_PERIODIC, DEFAULT_MEM_PERIODIC);
let mem_interval = Duration::from_secs(get_env_u64(ENV_MEM_INTERVAL_SECS, DEFAULT_MEM_INTERVAL_SECS));
if mem_periodic {
start_memory_periodic(mem_interval).await;
}
}
}
#[cfg(target_os = "linux")]
pub use linux_impl::{dump_cpu_pprof_for, dump_memory_pprof_now, init_from_env};


@@ -33,7 +33,7 @@ use rustfs_protos::proto_gen::node_service::node_service_server::NodeServiceServ
use rustfs_utils::net::parse_and_resolve_address;
use rustls::ServerConfig;
use s3s::{host::MultiDomain, service::S3Service, service::S3ServiceBuilder};
use socket2::SockRef;
use socket2::{SockRef, TcpKeepalive};
use std::io::{Error, Result};
use std::net::SocketAddr;
use std::sync::Arc;
@@ -371,6 +371,20 @@ pub async fn start_http_server(
};
let socket_ref = SockRef::from(&socket);
// Enable TCP Keepalive to detect dead clients (e.g. power loss)
// Idle: 10s, Interval: 5s, Retries: 3
let ka = TcpKeepalive::new()
.with_time(Duration::from_secs(10))
.with_interval(Duration::from_secs(5));
#[cfg(not(any(target_os = "openbsd", target_os = "netbsd")))]
let ka = ka.with_retries(3);
if let Err(err) = socket_ref.set_tcp_keepalive(&ka) {
warn!(?err, "Failed to set TCP_KEEPALIVE");
}
if let Err(err) = socket_ref.set_tcp_nodelay(true) {
warn!(?err, "Failed to set TCP_NODELAY");
}


@@ -108,7 +108,7 @@ use rustfs_s3select_api::{
use rustfs_s3select_query::get_global_db;
use rustfs_targets::{
EventName,
arn::{TargetID, TargetIDError},
arn::{ARN, TargetID, TargetIDError},
};
use rustfs_utils::{
CompressionAlgorithm, extract_req_params_header, extract_resp_elements, get_request_host, get_request_user_agent,
@@ -1418,12 +1418,17 @@ impl S3 for FS {
..Default::default()
};
let opts = ObjectOptions {
version_id: object.version_id.map(|v| v.to_string()),
versioned: version_cfg.prefix_enabled(&object.object_name),
version_suspended: version_cfg.suspended(),
..Default::default()
};
let metadata = extract_metadata(&req.headers);
let opts: ObjectOptions = del_opts(
&bucket,
&object.object_name,
object.version_id.map(|f| f.to_string()),
&req.headers,
metadata,
)
.await
.map_err(ApiError::from)?;
let mut goi = ObjectInfo::default();
let mut gerr = None;
@@ -1684,10 +1689,6 @@ impl S3 for FS {
version_id,
part_number,
range,
if_none_match,
if_match,
if_modified_since,
if_unmodified_since,
..
} = req.input.clone();
@@ -1880,35 +1881,6 @@ impl S3 for FS {
let info = reader.object_info;
if let Some(match_etag) = if_none_match {
if info.etag.as_ref().is_some_and(|etag| etag == match_etag.as_str()) {
return Err(S3Error::new(S3ErrorCode::NotModified));
}
}
if let Some(modified_since) = if_modified_since {
// obj_time < givenTime + 1s
if info.mod_time.is_some_and(|mod_time| {
let give_time: OffsetDateTime = modified_since.into();
mod_time < give_time.add(time::Duration::seconds(1))
}) {
return Err(S3Error::new(S3ErrorCode::NotModified));
}
}
if let Some(match_etag) = if_match {
if info.etag.as_ref().is_some_and(|etag| etag != match_etag.as_str()) {
return Err(S3Error::new(S3ErrorCode::PreconditionFailed));
}
} else if let Some(unmodified_since) = if_unmodified_since {
if info.mod_time.is_some_and(|mod_time| {
let give_time: OffsetDateTime = unmodified_since.into();
mod_time > give_time.add(time::Duration::seconds(1))
}) {
return Err(S3Error::new(S3ErrorCode::PreconditionFailed));
}
}
debug!(object_size = info.size, part_count = info.parts.len(), "GET object metadata snapshot");
for part in &info.parts {
debug!(
@@ -2773,7 +2745,7 @@ impl S3 for FS {
.collect::<Vec<_>>();
let output = ListObjectVersionsOutput {
// is_truncated: Some(object_infos.is_truncated),
is_truncated: Some(object_infos.is_truncated),
max_keys: Some(key_count),
delimiter,
name: Some(bucket),
@@ -4194,6 +4166,13 @@ impl S3 for FS {
..
} = req.input.clone();
if tagging.tag_set.len() > 10 {
// TODO: Note that Amazon S3 limits the maximum number of tags to 10 tags per object.
// Reference: https://docs.aws.amazon.com/AmazonS3/latest/userguide/object-tagging.html
// Reference: https://docs.aws.amazon.com/zh_cn/AmazonS3/latest/API/API_PutObjectTagging.html
// https://github.com/minio/mint/blob/master/run/core/aws-sdk-go-v2/main.go#L1647
}
let Some(store) = new_object_layer_fn() else {
return Err(S3Error::with_message(S3ErrorCode::InternalError, "Not init".to_string()));
};
@@ -4890,20 +4869,24 @@ impl S3 for FS {
let parse_rules = async {
let mut event_rules = Vec::new();
process_queue_configurations(
&mut event_rules,
notification_configuration.queue_configurations.clone(),
TargetID::from_str,
);
process_topic_configurations(
&mut event_rules,
notification_configuration.topic_configurations.clone(),
TargetID::from_str,
);
process_queue_configurations(&mut event_rules, notification_configuration.queue_configurations.clone(), |arn_str| {
ARN::parse(arn_str)
.map(|arn| arn.target_id)
.map_err(|e| TargetIDError::InvalidFormat(e.to_string()))
});
process_topic_configurations(&mut event_rules, notification_configuration.topic_configurations.clone(), |arn_str| {
ARN::parse(arn_str)
.map(|arn| arn.target_id)
.map_err(|e| TargetIDError::InvalidFormat(e.to_string()))
});
process_lambda_configurations(
&mut event_rules,
notification_configuration.lambda_function_configurations.clone(),
TargetID::from_str,
|arn_str| {
ARN::parse(arn_str)
.map(|arn| arn.target_id)
.map_err(|e| TargetIDError::InvalidFormat(e.to_string()))
},
);
event_rules


@@ -93,6 +93,8 @@ pub async fn del_opts(
.map(|v| v.to_str().unwrap() == "true")
.unwrap_or_default();
fill_conditional_writes_opts_from_header(headers, &mut opts)?;
Ok(opts)
}
@@ -133,6 +135,8 @@ pub async fn get_opts(
opts.version_suspended = version_suspended;
opts.versioned = versioned;
fill_conditional_writes_opts_from_header(headers, &mut opts)?;
Ok(opts)
}