Compare commits

..

14 Commits

Author SHA1 Message Date
houseme
04bf4b0f98 feat: add S3 object legal hold and retention management APIs (#476)
* add bucket rule

* translation

* improve code for event notice add rule
2025-09-02 00:14:10 +08:00
likewu
7462be983a Feature up/ilm (#470)
* fix delete-marker expiration. add api_restore.

* time retry object upload

* lock file

* make fmt

* restore object

* serde-rs-xml -> quick-xml

* scanner_item prefix object_name

* object_path

* object_name

* fi version_purge_status

* old_dir None

Co-authored-by: houseme <housemecn@gmail.com>
2025-09-01 16:11:28 +08:00
houseme
5264503e47 build(deps): bump aws-config and clap upgrade version (#472) 2025-08-30 20:30:46 +08:00
dependabot[bot]
3b8cb0df41 build(deps): bump tracing-subscriber in the cargo group (#471)
Bumps the cargo group with 1 update: [tracing-subscriber](https://github.com/tokio-rs/tracing).


Updates `tracing-subscriber` from 0.3.19 to 0.3.20
- [Release notes](https://github.com/tokio-rs/tracing/releases)
- [Commits](https://github.com/tokio-rs/tracing/compare/tracing-subscriber-0.3.19...tracing-subscriber-0.3.20)

---
updated-dependencies:
- dependency-name: tracing-subscriber
  dependency-version: 0.3.20
  dependency-type: direct:production
  dependency-group: cargo
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2025-08-30 19:02:26 +08:00
houseme
9aebef31ff refactor(admin/event): optimize notification target routing and logic handling (#463)
* add

* fix

* add target arns list

* improve code for arns

* upgrade crates version

* fix

* improve import code mod.rs

* fix

* improve

* improve code

* improve code

* fix

* fmt
2025-08-27 09:39:25 +08:00
zzhpro
c2d782bed1 feat: support conditional writes (#409)
* feat: support conditional writes

* refactor: avoid using unwrap

* fix: obtain lock before check in CompleteMultiPartUpload

* refactor: do not obtain a lock when getting object meta

* fix: avoid using unwrap and modifying incoming arguments

* test: add e2e tests for conditional writes

---------

Co-authored-by: guojidan <63799833+guojidan@users.noreply.github.com>
Co-authored-by: 安正超 <anzhengchao@gmail.com>
Co-authored-by: loverustfs <155562731+loverustfs@users.noreply.github.com>
2025-08-25 18:35:24 -07:00
likewu
e00f5be746 Fix/addtier (#454)
* fix retry

* fmt

* fix

* fix

* fix

---------

Co-authored-by: loverustfs <155562731+loverustfs@users.noreply.github.com>
2025-08-25 10:24:48 +08:00
shiro.lee
e23297f695 fix: add the default port number to the given server domains (#373) (#458) 2025-08-25 07:49:36 +08:00
0xdx2
d6840a6e04 feat: add support for range requests in upload_part_copy and implement parse_copy_source_range function (#453)
* feat: add support for range requests in upload_part_copy and implement parse_copy_source_range function

* style: format debug and error logging for improved readability

* feat: implement parse_copy_source_range function and improve error handling in range requests

* Update rustfs/src/storage/ecfs.rs

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* fix: correct return type in parse_copy_source_range function

* fix: remove unnecessary unwrap in parse_copy_source_range tests

* fix: simplify etag comparison in copy condition validation

---------

Co-authored-by: DamonXue <damonxue2@gmail.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: loverustfs <155562731+loverustfs@users.noreply.github.com>
2025-08-24 10:54:48 +08:00
houseme
3557a52dc4 Potential fix for code scanning alert no. 7: Workflow does not contain permissions (#457)
Co-authored-by: Copilot Autofix powered by AI <62310815+github-advanced-security[bot]@users.noreply.github.com>
2025-08-24 10:10:04 +08:00
houseme
fd2aab2bd9 fix:revet #443 #446 (#452)
* fix: revet #443 #446

* fix
2025-08-23 17:30:06 +08:00
houseme
f1c50fcb74 fix:Workflow does not contain permissions (#451) 2025-08-23 12:35:23 +08:00
houseme
bdcba3460e Potential fix for code scanning alert no. 13: Code injection (#447)
Co-authored-by: Copilot Autofix powered by AI <62310815+github-advanced-security[bot]@users.noreply.github.com>
Co-authored-by: 安正超 <anzhengchao@gmail.com>
2025-08-23 10:05:00 +08:00
houseme
8857f31b07 Comment out error log for missing subscribers (#448) 2025-08-22 21:15:46 +08:00
41 changed files with 2638 additions and 793 deletions
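The range-copy commit from 0xdx2 above (#453) references a parse_copy_source_range helper whose implementation does not appear in the hunks below. As a rough sketch only, assuming the x-amz-copy-source-range header uses the closed form bytes=first-last (the signature and error type here are illustrative, not the repository's actual code):

// Illustrative parser for a copy-source range of the form "bytes=first-last".
// Returns the inclusive (start, end) offsets, or a message for malformed input.
fn parse_copy_source_range(range: &str) -> Result<(u64, u64), String> {
    // The header must carry the "bytes=" prefix.
    let spec = range
        .strip_prefix("bytes=")
        .ok_or_else(|| format!("invalid range prefix: {range}"))?;

    // A copy-source range is a single closed interval "first-last".
    let (first, last) = spec
        .split_once('-')
        .ok_or_else(|| format!("missing '-' in range: {range}"))?;

    let start: u64 = first.trim().parse().map_err(|_| format!("invalid start offset: {first}"))?;
    let end: u64 = last.trim().parse().map_err(|_| format!("invalid end offset: {last}"))?;

    if start > end {
        return Err(format!("start {start} is greater than end {end}"));
    }
    Ok((start, end))
}

A caller would typically validate the parsed (start, end) against the source object's size before copying, for example rejecting ranges whose end offset lies past the last byte.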

View File

@@ -31,6 +31,9 @@ on:
- cron: '0 0 * * 0' # Weekly on Sunday at midnight UTC
workflow_dispatch:
permissions:
contents: read
env:
CARGO_TERM_COLOR: always

View File

@@ -70,6 +70,9 @@ on:
default: true
type: boolean
permissions:
contents: read
env:
CARGO_TERM_COLOR: always
RUST_BACKTRACE: 1

View File

@@ -59,6 +59,9 @@ on:
- cron: "0 0 * * 0" # Weekly on Sunday at midnight UTC
workflow_dispatch:
permissions:
contents: read
env:
CARGO_TERM_COLOR: always
RUST_BACKTRACE: 1

View File

@@ -58,6 +58,10 @@ on:
type: boolean
env:
CONCLUSION: ${{ github.event.workflow_run.conclusion }}
HEAD_BRANCH: ${{ github.event.workflow_run.head_branch }}
HEAD_SHA: ${{ github.event.workflow_run.head_sha }}
TRIGGERING_EVENT: ${{ github.event.workflow_run.event }}
DOCKERHUB_USERNAME: rustfs
CARGO_TERM_COLOR: always
REGISTRY_DOCKERHUB: rustfs/rustfs
@@ -102,27 +106,27 @@ jobs:
# Check if the triggering workflow was successful
# If the workflow succeeded, it means ALL builds (including Linux x86_64 and aarch64) succeeded
if [[ "${{ github.event.workflow_run.conclusion }}" == "success" ]]; then
if [[ "$CONCLUSION" == "success" ]]; then
echo "✅ Build workflow succeeded, all builds including Linux are successful"
should_build=true
should_push=true
else
echo "❌ Build workflow failed (conclusion: ${{ github.event.workflow_run.conclusion }}), skipping Docker build"
echo "❌ Build workflow failed (conclusion: $CONCLUSION), skipping Docker build"
should_build=false
fi
# Extract version info from commit message or use commit SHA
# Use Git to generate consistent short SHA (ensures uniqueness like build.yml)
short_sha=$(git rev-parse --short "${{ github.event.workflow_run.head_sha }}")
short_sha=$(git rev-parse --short "$HEAD_SHA")
# Determine build type based on triggering workflow event and ref
triggering_event="${{ github.event.workflow_run.event }}"
head_branch="${{ github.event.workflow_run.head_branch }}"
triggering_event="$TRIGGERING_EVENT"
head_branch="$HEAD_BRANCH"
echo "🔍 Analyzing triggering workflow:"
echo " 📋 Event: $triggering_event"
echo " 🌿 Head branch: $head_branch"
echo " 📎 Head SHA: ${{ github.event.workflow_run.head_sha }}"
echo " 📎 Head SHA: $HEAD_SHA"
# Check if this was triggered by a tag push
if [[ "$triggering_event" == "push" ]]; then
@@ -174,10 +178,10 @@ jobs:
fi
echo "🔄 Build triggered by workflow_run:"
echo " 📋 Conclusion: ${{ github.event.workflow_run.conclusion }}"
echo " 🌿 Branch: ${{ github.event.workflow_run.head_branch }}"
echo " 📎 SHA: ${{ github.event.workflow_run.head_sha }}"
echo " 🎯 Event: ${{ github.event.workflow_run.event }}"
echo " 📋 Conclusion: $CONCLUSION"
echo " 🌿 Branch: $HEAD_BRANCH"
echo " 📎 SHA: $HEAD_SHA"
echo " 🎯 Event: $TRIGGERING_EVENT"
elif [[ "${{ github.event_name }}" == "workflow_dispatch" ]]; then
# Manual trigger

View File

@@ -15,9 +15,13 @@
name: "issue-translator"
on:
issue_comment:
types: [created]
types: [ created ]
issues:
types: [opened]
types: [ opened ]
permissions:
contents: read
issues: write
jobs:
build:

View File

@@ -30,6 +30,9 @@ on:
default: "120"
type: string
permissions:
contents: read
env:
CARGO_TERM_COLOR: always
RUST_BACKTRACE: 1

.vscode/launch.json (vendored, 13 changed lines)
View File

@@ -85,6 +85,19 @@
"sourceLanguages": [
"rust"
],
},
{
"name": "Debug executable target/debug/test",
"type": "lldb",
"request": "launch",
"program": "${workspaceFolder}/target/debug/deps/lifecycle_integration_test-5eb7590b8f3bea55",
"args": [],
"cwd": "${workspaceFolder}",
//"stopAtEntry": false,
//"preLaunchTask": "cargo build",
"sourceLanguages": [
"rust"
],
}
]
}

Cargo.lock (generated, 414 changed lines)

File diff suppressed because it is too large.

View File

@@ -97,7 +97,7 @@ async-recursion = "1.1.1"
async-trait = "0.1.89"
async-compression = { version = "0.4.19" }
atomic_enum = "0.3.0"
aws-config = { version = "1.8.5" }
aws-config = { version = "1.8.6" }
aws-sdk-s3 = "1.101.0"
axum = "0.8.4"
base64-simd = "0.8.0"
@@ -106,22 +106,22 @@ brotli = "8.0.2"
bytes = { version = "1.10.1", features = ["serde"] }
bytesize = "2.0.1"
byteorder = "1.5.0"
cfg-if = "1.0.1"
cfg-if = "1.0.3"
crc-fast = "1.4.0"
chacha20poly1305 = { version = "0.10.1" }
chrono = { version = "0.4.41", features = ["serde"] }
clap = { version = "4.5.45", features = ["derive", "env"] }
clap = { version = "4.5.46", features = ["derive", "env"] }
const-str = { version = "0.6.4", features = ["std", "proc"] }
crc32fast = "1.5.0"
criterion = { version = "0.7", features = ["html_reports"] }
dashmap = "6.1.0"
datafusion = "46.0.1"
derive_builder = "0.20.2"
enumset = "1.1.9"
enumset = "1.1.10"
flatbuffers = "25.2.10"
flate2 = "1.1.2"
flexi_logger = { version = "0.31.2", features = ["trc", "dont_minimize_extra_stacks"] }
form_urlencoded = "1.2.1"
form_urlencoded = "1.2.2"
futures = "0.3.31"
futures-core = "0.3.31"
futures-util = "0.3.31"
@@ -175,15 +175,15 @@ path-absolutize = "3.1.1"
path-clean = "1.0.1"
blake3 = { version = "1.8.2" }
pbkdf2 = "0.12.2"
percent-encoding = "2.3.1"
percent-encoding = "2.3.2"
pin-project-lite = "0.2.16"
prost = "0.14.1"
pretty_assertions = "1.4.1"
quick-xml = "0.38.1"
quick-xml = "0.38.3"
rand = "0.9.2"
rdkafka = { version = "0.38.0", features = ["tokio"] }
reed-solomon-simd = { version = "3.0.1" }
regex = { version = "1.11.1" }
regex = { version = "1.11.2" }
reqwest = { version = "0.12.23", default-features = false, features = [
"rustls-tls",
"charset",
@@ -193,7 +193,7 @@ reqwest = { version = "0.12.23", default-features = false, features = [
"json",
"blocking",
] }
rmcp = { version = "0.5.0" }
rmcp = { version = "0.6.1" }
rmp = "0.8.14"
rmp-serde = "1.3.0"
rsa = "0.9.8"
@@ -211,20 +211,20 @@ serde_urlencoded = "0.7.1"
serial_test = "3.2.0"
sha1 = "0.10.6"
sha2 = "0.10.9"
shadow-rs = { version = "1.2.1", default-features = false }
shadow-rs = { version = "1.3.0", default-features = false }
siphasher = "1.0.1"
smallvec = { version = "1.15.1", features = ["serde"] }
snafu = "0.8.6"
snafu = "0.8.8"
snap = "1.1.1"
socket2 = "0.6.0"
strum = { version = "0.27.2", features = ["derive"] }
sysinfo = "0.37.0"
sysctl = "0.6.0"
tempfile = "3.20.0"
tempfile = "3.21.0"
temp-env = "0.3.6"
test-case = "3.3.1"
thiserror = "2.0.15"
time = { version = "0.3.41", features = [
thiserror = "2.0.16"
time = { version = "0.3.42", features = [
"std",
"parsing",
"formatting",
@@ -246,9 +246,9 @@ tracing = "0.1.41"
tracing-core = "0.1.34"
tracing-error = "0.2.1"
tracing-opentelemetry = "0.31.0"
tracing-subscriber = { version = "0.3.19", features = ["env-filter", "time"] }
tracing-subscriber = { version = "0.3.20", features = ["env-filter", "time"] }
transform-stream = "0.3.1"
url = "2.5.4"
url = "2.5.7"
urlencoding = "2.1.3"
uuid = { version = "1.18.0", features = [
"v4",

View File

@@ -22,6 +22,7 @@ tokio = { workspace = true, features = ["full"] }
tokio-util = { workspace = true }
tracing = { workspace = true }
serde = { workspace = true, features = ["derive"] }
time.workspace = true
serde_json = { workspace = true }
thiserror = { workspace = true }
uuid = { workspace = true, features = ["v4", "serde"] }

View File

@@ -19,14 +19,12 @@ use std::{
};
use ecstore::{
disk::{DiskAPI, DiskStore, WalkDirOptions},
disk::{Disk, DiskAPI, DiskStore, WalkDirOptions},
set_disk::SetDisks,
};
use rustfs_ecstore::{self as ecstore, StorageAPI, data_usage::store_data_usage_in_backend};
use rustfs_filemeta::{MetacacheReader, VersionType};
use tokio::sync::{Mutex, RwLock};
use tokio::task::yield_now;
use tokio::time::sleep;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, warn};
@@ -40,9 +38,11 @@ use crate::{
};
use rustfs_common::data_usage::DataUsageInfo;
use rustfs_common::data_usage::SizeSummary;
use rustfs_common::metrics::{Metric, Metrics, globalMetrics};
use rustfs_ecstore::bucket::versioning::VersioningApi;
use rustfs_ecstore::bucket::versioning_sys::BucketVersioningSys;
use rustfs_ecstore::cmd::bucket_targets::VersioningConfig;
use rustfs_ecstore::disk::RUSTFS_META_BUCKET;
/// Custom scan mode enum for AHM scanner
@@ -72,27 +72,18 @@ pub struct ScannerConfig {
pub scan_mode: ScanMode,
/// Whether to enable data usage statistics collection
pub enable_data_usage_stats: bool,
/// Skip objects modified within this window to avoid racing with active writes
pub skip_recently_modified_within: Duration,
/// Throttle: after scanning this many objects, sleep for a short delay to reduce IO contention
pub throttle_every_n_objects: u32,
/// Throttle delay duration per throttle tick
pub throttle_delay: Duration,
}
impl Default for ScannerConfig {
fn default() -> Self {
Self {
scan_interval: Duration::from_secs(3600), // 1 hour
deep_scan_interval: Duration::from_secs(86400), // 1 day
max_concurrent_scans: 10,
scan_interval: Duration::from_secs(60), // 1 minute
deep_scan_interval: Duration::from_secs(3600), // 1 hour
max_concurrent_scans: 20,
enable_healing: true,
enable_metrics: true,
scan_mode: ScanMode::Normal,
enable_data_usage_stats: true,
skip_recently_modified_within: Duration::from_secs(600), // 10 minutes
throttle_every_n_objects: 200,
throttle_delay: Duration::from_millis(2),
}
}
}
@@ -1255,11 +1246,6 @@ impl Scanner {
let mut objects_scanned = 0u64;
let mut objects_with_issues = 0u64;
let mut object_metadata = HashMap::new();
// snapshot throttling/grace config
let cfg_snapshot = self.config.read().await.clone();
let throttle_n = cfg_snapshot.throttle_every_n_objects.max(1);
let throttle_delay = cfg_snapshot.throttle_delay;
let skip_recent = cfg_snapshot.skip_recently_modified_within;
// Process each object entry
while let Ok(Some(mut entry)) = reader.peek().await {
@@ -1270,33 +1256,11 @@ impl Scanner {
// Parse object metadata
if let Ok(file_meta) = entry.xl_meta() {
// Skip recently modified objects to avoid racing with active writes
if let Some(latest_mt) = file_meta.versions.iter().filter_map(|v| v.header.mod_time).max() {
let ts_nanos = latest_mt.unix_timestamp_nanos();
let latest_st = if ts_nanos >= 0 {
std::time::UNIX_EPOCH + Duration::from_nanos(ts_nanos as u64)
} else {
std::time::UNIX_EPOCH
};
if let Ok(elapsed) = SystemTime::now().duration_since(latest_st) {
if elapsed < skip_recent {
debug!(
"Skipping recently modified object {}/{} (elapsed {:?} < {:?})",
bucket, entry.name, elapsed, skip_recent
);
if (objects_scanned as u32) % throttle_n == 0 {
sleep(throttle_delay).await;
yield_now().await;
}
continue;
}
}
}
if file_meta.versions.is_empty() {
objects_with_issues += 1;
warn!("Object {} has no versions", entry.name);
// Object metadata corrupted submit metadata heal task
// Object metadata corrupted, submit metadata heal task
let enable_healing = self.config.read().await.enable_healing;
if enable_healing {
if let Some(heal_manager) = &self.heal_manager {
@@ -1320,10 +1284,81 @@ impl Scanner {
} else {
// Apply lifecycle actions
if let Some(lifecycle_config) = &lifecycle_config {
let mut scanner_item =
ScannerItem::new(bucket.to_string(), Some(lifecycle_config.clone()), versioning_config.clone());
if let Err(e) = scanner_item.apply_actions(&entry.name, entry.clone()).await {
error!("Failed to apply lifecycle actions for {}/{}: {}", bucket, entry.name, e);
if let Disk::Local(_local_disk) = &**disk {
let vcfg = BucketVersioningSys::get(bucket).await.ok();
let mut scanner_item = ScannerItem {
bucket: bucket.to_string(),
object_name: entry.name.clone(),
lifecycle: Some(lifecycle_config.clone()),
versioning: versioning_config.clone(),
};
//ScannerItem::new(bucket.to_string(), Some(lifecycle_config.clone()), versioning_config.clone());
let fivs = match entry.clone().file_info_versions(&scanner_item.bucket) {
Ok(fivs) => fivs,
Err(_err) => {
stop_fn();
return Err(Error::other("skip this file"));
}
};
let mut size_s = SizeSummary::default();
let obj_infos = match scanner_item.apply_versions_actions(&fivs.versions).await {
Ok(obj_infos) => obj_infos,
Err(_err) => {
stop_fn();
return Err(Error::other("skip this file"));
}
};
let versioned = if let Some(vcfg) = vcfg.as_ref() {
vcfg.versioned(&scanner_item.object_name)
} else {
false
};
#[allow(unused_assignments)]
let mut obj_deleted = false;
for info in obj_infos.iter() {
let sz: i64;
(obj_deleted, sz) = scanner_item.apply_actions(info, &mut size_s).await;
if obj_deleted {
break;
}
let actual_sz = match info.get_actual_size() {
Ok(size) => size,
Err(_) => continue,
};
if info.delete_marker {
size_s.delete_markers += 1;
}
if info.version_id.is_some() && sz == actual_sz {
size_s.versions += 1;
}
size_s.total_size += sz as usize;
if info.delete_marker {
continue;
}
}
for free_version in fivs.free_versions.iter() {
let _obj_info = rustfs_ecstore::store_api::ObjectInfo::from_file_info(
free_version,
&scanner_item.bucket,
&scanner_item.object_name,
versioned,
);
}
// todo: global trace
/*if obj_deleted {
return Err(Error::other(ERR_IGNORE_FILE_CONTRIB).into());
}*/
}
}
@@ -1334,7 +1369,7 @@ impl Scanner {
objects_with_issues += 1;
warn!("Failed to parse metadata for object {}", entry.name);
// Object metadata parsing failed submit metadata heal task
// Object metadata parsing failed, submit metadata heal task
let enable_healing = self.config.read().await.enable_healing;
if enable_healing {
if let Some(heal_manager) = &self.heal_manager {
@@ -1357,12 +1392,6 @@ impl Scanner {
}
}
}
// lightweight throttling to reduce IO contention
if (objects_scanned as u32) % throttle_n == 0 {
sleep(throttle_delay).await;
yield_now().await;
}
}
// Update metrics
@@ -1450,14 +1479,6 @@ impl Scanner {
// Step 2: Identify missing objects and perform EC verification
let mut objects_needing_heal = 0u64;
let mut objects_with_ec_issues = 0u64;
// snapshot config for gating and throttling
let cfg_snapshot = self.config.read().await.clone();
let skip_recent = cfg_snapshot.skip_recently_modified_within;
let throttle_n = cfg_snapshot.throttle_every_n_objects.max(1);
let throttle_delay = cfg_snapshot.throttle_delay;
let deep_mode = cfg_snapshot.scan_mode == ScanMode::Deep;
// cfg_snapshot is a plain value clone; no guard to explicitly drop here.
let mut iter_count: u64 = 0;
for (bucket, objects) in &all_objects {
// Skip internal RustFS system bucket to avoid lengthy checks on temporary/trash objects
@@ -1465,7 +1486,6 @@ impl Scanner {
continue;
}
for object_name in objects {
iter_count += 1;
let key = (bucket.clone(), object_name.clone());
let empty_vec = Vec::new();
let locations = object_locations.get(&key).unwrap_or(&empty_vec);
@@ -1497,51 +1517,8 @@ impl Scanner {
continue;
}
// Skip recently modified objects to avoid racing with writes
let is_recent = (|| {
for &disk_idx in locations {
if let Some(bucket_map) = all_disk_objects.get(disk_idx) {
if let Some(file_map) = bucket_map.get(bucket) {
if let Some(fm) = file_map.get(object_name) {
if let Some(mt) = fm.versions.iter().filter_map(|v| v.header.mod_time).max() {
let ts_nanos = mt.unix_timestamp_nanos();
let mt_st = if ts_nanos >= 0 {
std::time::UNIX_EPOCH + Duration::from_nanos(ts_nanos as u64)
} else {
std::time::UNIX_EPOCH
};
if let Ok(elapsed) = SystemTime::now().duration_since(mt_st) {
if elapsed < skip_recent {
return true;
}
}
}
}
}
}
}
false
})();
if is_recent {
debug!("Skipping missing-objects heal check for recently modified {}/{}", bucket, object_name);
if (iter_count as u32) % throttle_n == 0 {
sleep(throttle_delay).await;
yield_now().await;
}
continue;
}
// Only attempt missing-object heal checks in Deep scan mode
if !deep_mode {
if (iter_count as u32) % throttle_n == 0 {
sleep(throttle_delay).await;
yield_now().await;
}
continue;
}
// Check if object is missing from some disks
if !locations.is_empty() && locations.len() < disks.len() {
if locations.len() < disks.len() {
// Before submitting heal, confirm the object still exists logically.
let should_heal = if let Some(store) = rustfs_ecstore::new_object_layer_fn() {
match store.get_object_info(bucket, object_name, &Default::default()).await {
@@ -1569,10 +1546,6 @@ impl Scanner {
};
if !should_heal {
if (iter_count as u32) % throttle_n == 0 {
sleep(throttle_delay).await;
yield_now().await;
}
continue;
}
objects_needing_heal += 1;
@@ -1616,11 +1589,6 @@ impl Scanner {
warn!("Object integrity verification failed for object {}/{}: {}", bucket, object_name, e);
}
}
if (iter_count as u32) % throttle_n == 0 {
sleep(throttle_delay).await;
yield_now().await;
}
}
}

View File

@@ -13,66 +13,175 @@
// limitations under the License.
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use time::OffsetDateTime;
use crate::error::Result;
use rustfs_common::data_usage::SizeSummary;
use rustfs_common::metrics::IlmAction;
use rustfs_ecstore::bucket::lifecycle::bucket_lifecycle_audit::LcEventSrc;
use rustfs_ecstore::bucket::lifecycle::bucket_lifecycle_ops::{apply_lifecycle_action, eval_action_from_lifecycle};
use rustfs_ecstore::bucket::lifecycle::{
bucket_lifecycle_audit::LcEventSrc,
bucket_lifecycle_ops::{GLOBAL_ExpiryState, apply_lifecycle_action, eval_action_from_lifecycle},
lifecycle,
lifecycle::Lifecycle,
};
use rustfs_ecstore::bucket::metadata_sys::get_object_lock_config;
use rustfs_ecstore::bucket::object_lock::objectlock_sys::{BucketObjectLockSys, enforce_retention_for_deletion};
use rustfs_ecstore::bucket::versioning::VersioningApi;
use rustfs_ecstore::bucket::versioning_sys::BucketVersioningSys;
use rustfs_ecstore::cmd::bucket_targets::VersioningConfig;
use rustfs_ecstore::store_api::ObjectInfo;
use rustfs_filemeta::FileMetaVersion;
use rustfs_filemeta::metacache::MetaCacheEntry;
use rustfs_ecstore::store_api::{ObjectInfo, ObjectToDelete};
use rustfs_filemeta::FileInfo;
use s3s::dto::BucketLifecycleConfiguration as LifecycleConfig;
use tracing::info;
static SCANNER_EXCESS_OBJECT_VERSIONS: AtomicU64 = AtomicU64::new(100);
static SCANNER_EXCESS_OBJECT_VERSIONS_TOTAL_SIZE: AtomicU64 = AtomicU64::new(1024 * 1024 * 1024 * 1024); // 1 TB
#[derive(Clone)]
pub struct ScannerItem {
bucket: String,
lifecycle: Option<Arc<LifecycleConfig>>,
versioning: Option<Arc<VersioningConfig>>,
pub bucket: String,
pub object_name: String,
pub lifecycle: Option<Arc<LifecycleConfig>>,
pub versioning: Option<Arc<VersioningConfig>>,
}
impl ScannerItem {
pub fn new(bucket: String, lifecycle: Option<Arc<LifecycleConfig>>, versioning: Option<Arc<VersioningConfig>>) -> Self {
Self {
bucket,
object_name: "".to_string(),
lifecycle,
versioning,
}
}
pub async fn apply_actions(&mut self, object: &str, mut meta: MetaCacheEntry) -> anyhow::Result<()> {
info!("apply_actions called for object: {}", object);
if self.lifecycle.is_none() {
info!("No lifecycle config for object: {}", object);
return Ok(());
pub async fn apply_versions_actions(&self, fivs: &[FileInfo]) -> Result<Vec<ObjectInfo>> {
let obj_infos = self.apply_newer_noncurrent_version_limit(fivs).await?;
if obj_infos.len() >= SCANNER_EXCESS_OBJECT_VERSIONS.load(Ordering::SeqCst) as usize {
// todo
}
info!("Lifecycle config exists for object: {}", object);
let file_meta = match meta.xl_meta() {
Ok(meta) => meta,
Err(e) => {
tracing::error!("Failed to get xl_meta for {}: {}", object, e);
return Ok(());
let mut cumulative_size = 0;
for obj_info in obj_infos.iter() {
cumulative_size += obj_info.size;
}
if cumulative_size >= SCANNER_EXCESS_OBJECT_VERSIONS_TOTAL_SIZE.load(Ordering::SeqCst) as i64 {
//todo
}
Ok(obj_infos)
}
pub async fn apply_newer_noncurrent_version_limit(&self, fivs: &[FileInfo]) -> Result<Vec<ObjectInfo>> {
let lock_enabled = if let Some(rcfg) = BucketObjectLockSys::get(&self.bucket).await {
rcfg.mode.is_some()
} else {
false
};
let _vcfg = BucketVersioningSys::get(&self.bucket).await?;
let versioned = match BucketVersioningSys::get(&self.bucket).await {
Ok(vcfg) => vcfg.versioned(&self.object_name),
Err(_) => false,
};
let mut object_infos = Vec::with_capacity(fivs.len());
if self.lifecycle.is_none() {
for info in fivs.iter() {
object_infos.push(ObjectInfo::from_file_info(info, &self.bucket, &self.object_name, versioned));
}
};
return Ok(object_infos);
}
let latest_version = file_meta.versions.first().cloned().unwrap_or_default();
let file_meta_version = FileMetaVersion::try_from(latest_version.meta.as_slice()).unwrap_or_default();
let event = self
.lifecycle
.as_ref()
.expect("lifecycle err.")
.clone()
.noncurrent_versions_expiration_limit(&lifecycle::ObjectOpts {
name: self.object_name.clone(),
..Default::default()
})
.await;
let lim = event.newer_noncurrent_versions;
if lim == 0 || fivs.len() <= lim + 1 {
for fi in fivs.iter() {
object_infos.push(ObjectInfo::from_file_info(fi, &self.bucket, &self.object_name, versioned));
}
return Ok(object_infos);
}
let obj_info = ObjectInfo {
bucket: self.bucket.clone(),
name: object.to_string(),
version_id: latest_version.header.version_id,
mod_time: latest_version.header.mod_time,
size: file_meta_version.object.as_ref().map_or(0, |o| o.size),
user_defined: serde_json::from_slice(file_meta.data.as_slice()).unwrap_or_default(),
..Default::default()
};
let overflow_versions = &fivs[lim + 1..];
for fi in fivs[..lim + 1].iter() {
object_infos.push(ObjectInfo::from_file_info(fi, &self.bucket, &self.object_name, versioned));
}
self.apply_lifecycle(&obj_info).await;
let mut to_del = Vec::<ObjectToDelete>::with_capacity(overflow_versions.len());
for fi in overflow_versions.iter() {
let obj = ObjectInfo::from_file_info(fi, &self.bucket, &self.object_name, versioned);
if lock_enabled && enforce_retention_for_deletion(&obj) {
//if enforce_retention_for_deletion(&obj) {
/*if self.debug {
if obj.version_id.is_some() {
info!("lifecycle: {} v({}) is locked, not deleting\n", obj.name, obj.version_id.expect("err"));
} else {
info!("lifecycle: {} is locked, not deleting\n", obj.name);
}
}*/
object_infos.push(obj);
continue;
}
Ok(())
if OffsetDateTime::now_utc().unix_timestamp()
< lifecycle::expected_expiry_time(obj.successor_mod_time.expect("err"), event.noncurrent_days as i32)
.unix_timestamp()
{
object_infos.push(obj);
continue;
}
to_del.push(ObjectToDelete {
object_name: obj.name,
version_id: obj.version_id,
});
}
if !to_del.is_empty() {
let mut expiry_state = GLOBAL_ExpiryState.write().await;
expiry_state.enqueue_by_newer_noncurrent(&self.bucket, to_del, event).await;
}
Ok(object_infos)
}
pub async fn apply_actions(&mut self, oi: &ObjectInfo, _size_s: &mut SizeSummary) -> (bool, i64) {
let (action, _size) = self.apply_lifecycle(oi).await;
info!(
"apply_actions {} {} {:?} {:?}",
oi.bucket.clone(),
oi.name.clone(),
oi.version_id.clone(),
oi.user_defined.clone()
);
// Create a mutable clone if you need to modify fields
/*let mut oi = oi.clone();
oi.replication_status = ReplicationStatusType::from(
oi.user_defined
.get("x-amz-bucket-replication-status")
.unwrap_or(&"PENDING".to_string()),
);
info!("apply status is: {:?}", oi.replication_status);
self.heal_replication(&oi, _size_s).await;*/
if action.delete_all() {
return (true, 0);
}
(false, oi.size)
}
async fn apply_lifecycle(&mut self, oi: &ObjectInfo) -> (IlmAction, i64) {
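The rewritten apply_newer_noncurrent_version_limit above keeps the current version plus the N newest noncurrent versions and queues the rest for expiry once they are past their expected expiry time, unless object-lock retention still applies. A condensed sketch of that retention rule, using simplified local types rather than the crate's FileInfo/ObjectInfo/ObjectToDelete (field and function names are illustrative):

use time::{Duration, OffsetDateTime};

// Simplified stand-in for per-version metadata; the vector is ordered newest first.
struct VersionMeta {
    successor_mod_time: Option<OffsetDateTime>, // when the next-newer version was written
    retention_locked: bool,                     // object-lock retention still in force
}

/// Split versions into (keep, expire), retaining the current version plus
/// `newer_noncurrent_limit` noncurrent versions, and only expiring versions
/// whose noncurrent age exceeds `noncurrent_days`.
fn split_noncurrent(
    versions: Vec<VersionMeta>,
    newer_noncurrent_limit: usize,
    noncurrent_days: i64,
    now: OffsetDateTime,
) -> (Vec<VersionMeta>, Vec<VersionMeta>) {
    // Mirrors the early return in the hunk: no limit set, or nothing beyond the limit.
    if newer_noncurrent_limit == 0 || versions.len() <= newer_noncurrent_limit + 1 {
        return (versions, Vec::new());
    }
    let mut keep = Vec::new();
    let mut expire = Vec::new();
    for (idx, v) in versions.into_iter().enumerate() {
        // The newest `limit + 1` entries (current + N noncurrent) are always retained.
        if idx <= newer_noncurrent_limit {
            keep.push(v);
            continue;
        }
        // Object-lock retention always wins over lifecycle expiry.
        if v.retention_locked {
            keep.push(v);
            continue;
        }
        // Expire only once the version has been noncurrent for the configured days.
        let became_noncurrent = v.successor_mod_time.unwrap_or(now);
        if now < became_noncurrent + Duration::days(noncurrent_days) {
            keep.push(v);
        } else {
            expire.push(v);
        }
    }
    (keep, expire)
}

In the actual hunk the expire set becomes ObjectToDelete entries handed to GLOBAL_ExpiryState::enqueue_by_newer_noncurrent.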

View File

@@ -1,3 +1,17 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use rustfs_ahm::heal::{
manager::{HealConfig, HealManager},
storage::{ECStoreHealStorage, HealStorageAPI},
@@ -108,17 +122,11 @@ async fn setup_test_env() -> (Vec<PathBuf>, Arc<ECStore>, Arc<ECStoreHealStorage
/// Test helper: Create a test bucket
async fn create_test_bucket(ecstore: &Arc<ECStore>, bucket_name: &str) {
match (**ecstore).make_bucket(bucket_name, &Default::default()).await {
Ok(_) => info!("Created test bucket: {}", bucket_name),
Err(e) => {
// If the bucket already exists from a previous test run in the shared env, ignore.
if matches!(e, rustfs_ecstore::error::StorageError::BucketExists(_)) {
info!("Bucket already exists, continuing: {}", bucket_name);
} else {
panic!("Failed to create test bucket: {e:?}");
}
}
}
(**ecstore)
.make_bucket(bucket_name, &Default::default())
.await
.expect("Failed to create test bucket");
info!("Created test bucket: {}", bucket_name);
}
/// Test helper: Upload test object

View File

@@ -1,3 +1,17 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use rustfs_ahm::scanner::{Scanner, data_scanner::ScannerConfig};
use rustfs_ecstore::{
bucket::metadata::BUCKET_LIFECYCLE_CONFIG,
@@ -6,16 +20,21 @@ use rustfs_ecstore::{
endpoints::{EndpointServerPools, Endpoints, PoolEndpoints},
store::ECStore,
store_api::{ObjectIO, ObjectOptions, PutObjReader, StorageAPI},
tier::tier::TierConfigMgr,
tier::tier_config::{TierConfig, TierMinIO, TierType},
};
use serial_test::serial;
use std::sync::Once;
use std::sync::OnceLock;
use std::{path::PathBuf, sync::Arc, time::Duration};
use tokio::fs;
use tokio::sync::RwLock;
use tracing::info;
use tracing::warn;
static GLOBAL_ENV: OnceLock<(Vec<PathBuf>, Arc<ECStore>)> = OnceLock::new();
static INIT: Once = Once::new();
static GLOBAL_TIER_CONFIG_MGR: OnceLock<Arc<RwLock<TierConfigMgr>>> = OnceLock::new();
fn init_tracing() {
INIT.call_once(|| {
@@ -99,6 +118,8 @@ async fn setup_test_env() -> (Vec<PathBuf>, Arc<ECStore>) {
// Store in global once lock
let _ = GLOBAL_ENV.set((disk_paths.clone(), ecstore.clone()));
let _ = GLOBAL_TIER_CONFIG_MGR.set(TierConfigMgr::new());
(disk_paths, ecstore)
}
@@ -144,14 +165,123 @@ async fn set_bucket_lifecycle(bucket_name: &str) -> Result<(), Box<dyn std::erro
Ok(())
}
/// Test helper: Set bucket lifecycle configuration
async fn set_bucket_lifecycle_deletemarker(bucket_name: &str) -> Result<(), Box<dyn std::error::Error>> {
// Create a simple lifecycle configuration XML with 0 days expiry for immediate testing
let lifecycle_xml = r#"<?xml version="1.0" encoding="UTF-8"?>
<LifecycleConfiguration>
<Rule>
<ID>test-rule</ID>
<Status>Enabled</Status>
<Filter>
<Prefix>test/</Prefix>
</Filter>
<Expiration>
<Days>0</Days>
<ExpiredObjectDeleteMarker>true</ExpiredObjectDeleteMarker>
</Expiration>
</Rule>
</LifecycleConfiguration>"#;
metadata_sys::update(bucket_name, BUCKET_LIFECYCLE_CONFIG, lifecycle_xml.as_bytes().to_vec()).await?;
Ok(())
}
#[allow(dead_code)]
async fn set_bucket_lifecycle_transition(bucket_name: &str) -> Result<(), Box<dyn std::error::Error>> {
// Create a simple lifecycle configuration XML with 0 days expiry for immediate testing
let lifecycle_xml = r#"<?xml version="1.0" encoding="UTF-8"?>
<LifecycleConfiguration>
<Rule>
<ID>test-rule</ID>
<Status>Enabled</Status>
<Filter>
<Prefix>test/</Prefix>
</Filter>
<Transition>
<Days>0</Days>
<StorageClass>COLDTIER</StorageClass>
</Transition>
</Rule>
<Rule>
<ID>test-rule2</ID>
<Status>Disabled</Status>
<Filter>
<Prefix>test/</Prefix>
</Filter>
<NoncurrentVersionTransition>
<NoncurrentDays>0</NoncurrentDays>
<StorageClass>COLDTIER</StorageClass>
</NoncurrentVersionTransition>
</Rule>
</LifecycleConfiguration>"#;
metadata_sys::update(bucket_name, BUCKET_LIFECYCLE_CONFIG, lifecycle_xml.as_bytes().to_vec()).await?;
Ok(())
}
/// Test helper: Create a test tier
#[allow(dead_code)]
async fn create_test_tier() {
let args = TierConfig {
version: "v1".to_string(),
tier_type: TierType::MinIO,
name: "COLDTIER".to_string(),
s3: None,
rustfs: None,
minio: Some(TierMinIO {
access_key: "minioadmin".to_string(),
secret_key: "minioadmin".to_string(),
bucket: "mblock2".to_string(),
endpoint: "http://127.0.0.1:9020".to_string(),
prefix: "mypre3/".to_string(),
region: "".to_string(),
..Default::default()
}),
};
let mut tier_config_mgr = GLOBAL_TIER_CONFIG_MGR.get().unwrap().write().await;
if let Err(err) = tier_config_mgr.add(args, false).await {
warn!("tier_config_mgr add failed, e: {:?}", err);
panic!("tier add failed. {err}");
}
if let Err(e) = tier_config_mgr.save().await {
warn!("tier_config_mgr save failed, e: {:?}", e);
panic!("tier save failed");
}
info!("Created test tier: {}", "COLDTIER");
}
/// Test helper: Check if object exists
async fn object_exists(ecstore: &Arc<ECStore>, bucket: &str, object: &str) -> bool {
((**ecstore).get_object_info(bucket, object, &ObjectOptions::default()).await).is_ok()
}
/// Test helper: Check if object is a delete marker
#[allow(dead_code)]
async fn object_is_delete_marker(ecstore: &Arc<ECStore>, bucket: &str, object: &str) -> bool {
if let Ok(oi) = (**ecstore).get_object_info(bucket, object, &ObjectOptions::default()).await {
println!("oi: {:?}", oi);
oi.delete_marker
} else {
panic!("object_is_delete_marker is error");
}
}
/// Test helper: Check if object has been transitioned
#[allow(dead_code)]
async fn object_is_transitioned(ecstore: &Arc<ECStore>, bucket: &str, object: &str) -> bool {
if let Ok(oi) = (**ecstore).get_object_info(bucket, object, &ObjectOptions::default()).await {
info!("oi: {:?}", oi);
!oi.transitioned_object.status.is_empty()
} else {
panic!("object_is_transitioned is error");
}
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
#[serial]
#[ignore = "Please run it manually."]
async fn test_lifecycle_expiry_basic() {
let (_disk_paths, ecstore) = setup_test_env().await;
@@ -208,11 +338,12 @@ async fn test_lifecycle_expiry_basic() {
// Wait a bit more for background workers to process expiry tasks
tokio::time::sleep(Duration::from_secs(5)).await;
// Check if object has been expired (deleted)
let object_still_exists = object_exists(&ecstore, bucket_name, object_name).await;
println!("Object exists after lifecycle processing: {object_still_exists}");
// Check if object has been expired (delete_marker)
//let check_result = object_is_delete_marker(&ecstore, bucket_name, object_name).await;
let check_result = object_exists(&ecstore, bucket_name, object_name).await;
println!("Object is_delete_marker after lifecycle processing: {check_result}");
if object_still_exists {
if !check_result {
println!("❌ Object was not deleted by lifecycle processing");
// Let's try to get object info to see its details
match ecstore
@@ -233,7 +364,7 @@ async fn test_lifecycle_expiry_basic() {
println!("✅ Object was successfully deleted by lifecycle processing");
}
assert!(!object_still_exists);
assert!(check_result);
println!("✅ Object successfully expired");
// Stop scanner
@@ -242,3 +373,193 @@ async fn test_lifecycle_expiry_basic() {
println!("Lifecycle expiry basic test completed");
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
#[serial]
async fn test_lifecycle_expiry_deletemarker() {
let (_disk_paths, ecstore) = setup_test_env().await;
// Create test bucket and object
let bucket_name = "test-lifecycle-bucket";
let object_name = "test/object.txt"; // Match the lifecycle rule prefix "test/"
let test_data = b"Hello, this is test data for lifecycle expiry!";
create_test_bucket(&ecstore, bucket_name).await;
upload_test_object(&ecstore, bucket_name, object_name, test_data).await;
// Verify object exists initially
assert!(object_exists(&ecstore, bucket_name, object_name).await);
println!("✅ Object exists before lifecycle processing");
// Set lifecycle configuration with very short expiry (0 days = immediate expiry)
set_bucket_lifecycle_deletemarker(bucket_name)
.await
.expect("Failed to set lifecycle configuration");
println!("✅ Lifecycle configuration set for bucket: {bucket_name}");
// Verify lifecycle configuration was set
match rustfs_ecstore::bucket::metadata_sys::get(bucket_name).await {
Ok(bucket_meta) => {
assert!(bucket_meta.lifecycle_config.is_some());
println!("✅ Bucket metadata retrieved successfully");
}
Err(e) => {
println!("❌ Error retrieving bucket metadata: {e:?}");
}
}
// Create scanner with very short intervals for testing
let scanner_config = ScannerConfig {
scan_interval: Duration::from_millis(100),
deep_scan_interval: Duration::from_millis(500),
max_concurrent_scans: 1,
..Default::default()
};
let scanner = Scanner::new(Some(scanner_config), None);
// Start scanner
scanner.start().await.expect("Failed to start scanner");
println!("✅ Scanner started");
// Wait for scanner to process lifecycle rules
tokio::time::sleep(Duration::from_secs(2)).await;
// Manually trigger a scan cycle to ensure lifecycle processing
scanner.scan_cycle().await.expect("Failed to trigger scan cycle");
println!("✅ Manual scan cycle completed");
// Wait a bit more for background workers to process expiry tasks
tokio::time::sleep(Duration::from_secs(5)).await;
// Check if object has been expired (deleted)
let check_result = object_exists(&ecstore, bucket_name, object_name).await;
println!("Object exists after lifecycle processing: {check_result}");
if !check_result {
println!("❌ Object was not deleted by lifecycle processing");
// Let's try to get object info to see its details
match ecstore
.get_object_info(bucket_name, object_name, &rustfs_ecstore::store_api::ObjectOptions::default())
.await
{
Ok(obj_info) => {
println!(
"Object info: name={}, size={}, mod_time={:?}",
obj_info.name, obj_info.size, obj_info.mod_time
);
}
Err(e) => {
println!("Error getting object info: {e:?}");
}
}
} else {
println!("✅ Object was successfully deleted by lifecycle processing");
}
assert!(check_result);
println!("✅ Object successfully expired");
// Stop scanner
let _ = scanner.stop().await;
println!("✅ Scanner stopped");
println!("Lifecycle expiry basic test completed");
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
#[serial]
async fn test_lifecycle_transition_basic() {
let (_disk_paths, ecstore) = setup_test_env().await;
//create_test_tier().await;
// Create test bucket and object
let bucket_name = "test-lifecycle-bucket";
let object_name = "test/object.txt"; // Match the lifecycle rule prefix "test/"
let test_data = b"Hello, this is test data for lifecycle expiry!";
create_test_bucket(&ecstore, bucket_name).await;
upload_test_object(&ecstore, bucket_name, object_name, test_data).await;
// Verify object exists initially
assert!(object_exists(&ecstore, bucket_name, object_name).await);
println!("✅ Object exists before lifecycle processing");
// Set lifecycle configuration with very short expiry (0 days = immediate expiry)
/*set_bucket_lifecycle_transition(bucket_name)
.await
.expect("Failed to set lifecycle configuration");
println!("✅ Lifecycle configuration set for bucket: {bucket_name}");
// Verify lifecycle configuration was set
match rustfs_ecstore::bucket::metadata_sys::get(bucket_name).await {
Ok(bucket_meta) => {
assert!(bucket_meta.lifecycle_config.is_some());
println!("✅ Bucket metadata retrieved successfully");
}
Err(e) => {
println!("❌ Error retrieving bucket metadata: {e:?}");
}
}*/
// Create scanner with very short intervals for testing
let scanner_config = ScannerConfig {
scan_interval: Duration::from_millis(100),
deep_scan_interval: Duration::from_millis(500),
max_concurrent_scans: 1,
..Default::default()
};
let scanner = Scanner::new(Some(scanner_config), None);
// Start scanner
scanner.start().await.expect("Failed to start scanner");
println!("✅ Scanner started");
// Wait for scanner to process lifecycle rules
tokio::time::sleep(Duration::from_secs(2)).await;
// Manually trigger a scan cycle to ensure lifecycle processing
scanner.scan_cycle().await.expect("Failed to trigger scan cycle");
println!("✅ Manual scan cycle completed");
// Wait a bit more for background workers to process expiry tasks
tokio::time::sleep(Duration::from_secs(5)).await;
// Check if object has been expired (deleted)
//let check_result = object_is_transitioned(&ecstore, bucket_name, object_name).await;
let check_result = object_exists(&ecstore, bucket_name, object_name).await;
println!("Object exists after lifecycle processing: {check_result}");
if check_result {
println!("✅ Object was not deleted by lifecycle processing");
// Let's try to get object info to see its details
match ecstore
.get_object_info(bucket_name, object_name, &rustfs_ecstore::store_api::ObjectOptions::default())
.await
{
Ok(obj_info) => {
println!(
"Object info: name={}, size={}, mod_time={:?}",
obj_info.name, obj_info.size, obj_info.mod_time
);
println!("Object info: transitioned_object={:?}", obj_info.transitioned_object);
}
Err(e) => {
println!("Error getting object info: {e:?}");
}
}
} else {
println!("❌ Object was deleted by lifecycle processing");
}
assert!(check_result);
println!("✅ Object successfully transitioned");
// Stop scanner
let _ = scanner.stop().await;
println!("✅ Scanner stopped");
println!("Lifecycle transition basic test completed");
}

View File

@@ -124,7 +124,7 @@ pub const DEFAULT_LOG_FILENAME: &str = "rustfs";
/// This is the default log filename for OBS.
/// It is used to store the logs of the application.
/// Default value: rustfs.log
pub const DEFAULT_OBS_LOG_FILENAME: &str = concat!(DEFAULT_LOG_FILENAME, ".log");
pub const DEFAULT_OBS_LOG_FILENAME: &str = concat!(DEFAULT_LOG_FILENAME, ".");
/// Default sink file log file for rustfs
/// This is the default sink file log file for rustfs.

View File

@@ -27,7 +27,7 @@ pub const MQTT_QUEUE_LIMIT: &str = "queue_limit";
/// A list of all valid configuration keys for an MQTT target.
pub const NOTIFY_MQTT_KEYS: &[&str] = &[
ENABLE_KEY, // "enable" is a common key
ENABLE_KEY,
MQTT_BROKER,
MQTT_TOPIC,
MQTT_QOS,

View File

@@ -24,7 +24,7 @@ pub const WEBHOOK_CLIENT_KEY: &str = "client_key";
/// A list of all valid configuration keys for a webhook target.
pub const NOTIFY_WEBHOOK_KEYS: &[&str] = &[
ENABLE_KEY, // "enable" is a common key
ENABLE_KEY,
WEBHOOK_ENDPOINT,
WEBHOOK_AUTH_TOKEN,
WEBHOOK_QUEUE_LIMIT,

View File

@@ -0,0 +1,347 @@
#![cfg(test)]
use aws_config::meta::region::RegionProviderChain;
use aws_sdk_s3::Client;
use aws_sdk_s3::config::{Credentials, Region};
use aws_sdk_s3::error::SdkError;
use aws_sdk_s3::types::{CompletedMultipartUpload, CompletedPart};
use bytes::Bytes;
use serial_test::serial;
use std::error::Error;
const ENDPOINT: &str = "http://localhost:9000";
const ACCESS_KEY: &str = "rustfsadmin";
const SECRET_KEY: &str = "rustfsadmin";
const BUCKET: &str = "api-test";
async fn create_aws_s3_client() -> Result<Client, Box<dyn Error>> {
let region_provider = RegionProviderChain::default_provider().or_else(Region::new("us-east-1"));
let shared_config = aws_config::defaults(aws_config::BehaviorVersion::latest())
.region(region_provider)
.credentials_provider(Credentials::new(ACCESS_KEY, SECRET_KEY, None, None, "static"))
.endpoint_url(ENDPOINT)
.load()
.await;
let client = Client::from_conf(
aws_sdk_s3::Config::from(&shared_config)
.to_builder()
.force_path_style(true)
.build(),
);
Ok(client)
}
/// Setup test bucket, creating it if it doesn't exist
async fn setup_test_bucket(client: &Client) -> Result<(), Box<dyn Error>> {
match client.create_bucket().bucket(BUCKET).send().await {
Ok(_) => {}
Err(SdkError::ServiceError(e)) => {
let e = e.into_err();
let error_code = e.meta().code().unwrap_or("");
if !error_code.eq("BucketAlreadyExists") {
return Err(e.into());
}
}
Err(e) => {
return Err(e.into());
}
}
Ok(())
}
/// Generate test data of specified size
fn generate_test_data(size: usize) -> Vec<u8> {
let pattern = b"0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz";
let mut data = Vec::with_capacity(size);
for i in 0..size {
data.push(pattern[i % pattern.len()]);
}
data
}
/// Upload an object and return its ETag
async fn upload_object_with_metadata(client: &Client, bucket: &str, key: &str, data: &[u8]) -> Result<String, Box<dyn Error>> {
let response = client
.put_object()
.bucket(bucket)
.key(key)
.body(Bytes::from(data.to_vec()).into())
.send()
.await?;
let etag = response.e_tag().unwrap_or("").to_string();
Ok(etag)
}
/// Cleanup test objects from bucket
async fn cleanup_objects(client: &Client, bucket: &str, keys: &[&str]) {
for key in keys {
let _ = client.delete_object().bucket(bucket).key(*key).send().await;
}
}
/// Generate unique test object key
fn generate_test_key(prefix: &str) -> String {
use std::time::{SystemTime, UNIX_EPOCH};
let timestamp = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_nanos();
format!("{prefix}-{timestamp}")
}
#[tokio::test]
#[serial]
#[ignore = "requires running RustFS server at localhost:9000"]
async fn test_conditional_put_okay() -> Result<(), Box<dyn std::error::Error>> {
let client = create_aws_s3_client().await?;
setup_test_bucket(&client).await?;
let test_key = generate_test_key("conditional-put-ok");
let initial_data = generate_test_data(1024); // 1KB test data
let updated_data = generate_test_data(2048); // 2KB updated data
// Upload initial object and get its ETag
let initial_etag = upload_object_with_metadata(&client, BUCKET, &test_key, &initial_data).await?;
// Test 1: PUT with matching If-Match condition (should succeed)
let response1 = client
.put_object()
.bucket(BUCKET)
.key(&test_key)
.body(Bytes::from(updated_data.clone()).into())
.if_match(&initial_etag)
.send()
.await;
assert!(response1.is_ok(), "PUT with matching If-Match should succeed");
// Test 2: PUT with non-matching If-None-Match condition (should succeed)
let fake_etag = "\"fake-etag-12345\"";
let response2 = client
.put_object()
.bucket(BUCKET)
.key(&test_key)
.body(Bytes::from(updated_data.clone()).into())
.if_none_match(fake_etag)
.send()
.await;
assert!(response2.is_ok(), "PUT with non-matching If-None-Match should succeed");
// Cleanup
cleanup_objects(&client, BUCKET, &[&test_key]).await;
Ok(())
}
#[tokio::test]
#[serial]
#[ignore = "requires running RustFS server at localhost:9000"]
async fn test_conditional_put_failed() -> Result<(), Box<dyn std::error::Error>> {
let client = create_aws_s3_client().await?;
setup_test_bucket(&client).await?;
let test_key = generate_test_key("conditional-put-failed");
let initial_data = generate_test_data(1024);
let updated_data = generate_test_data(2048);
// Upload initial object and get its ETag
let initial_etag = upload_object_with_metadata(&client, BUCKET, &test_key, &initial_data).await?;
// Test 1: PUT with non-matching If-Match condition (should fail with 412)
let fake_etag = "\"fake-etag-should-not-match\"";
let response1 = client
.put_object()
.bucket(BUCKET)
.key(&test_key)
.body(Bytes::from(updated_data.clone()).into())
.if_match(fake_etag)
.send()
.await;
assert!(response1.is_err(), "PUT with non-matching If-Match should fail");
if let Err(e) = response1 {
if let SdkError::ServiceError(e) = e {
let e = e.into_err();
let error_code = e.meta().code().unwrap_or("");
assert_eq!("PreconditionFailed", error_code);
} else {
panic!("Unexpected error: {e:?}");
}
}
// Test 2: PUT with matching If-None-Match condition (should fail with 412)
let response2 = client
.put_object()
.bucket(BUCKET)
.key(&test_key)
.body(Bytes::from(updated_data.clone()).into())
.if_none_match(&initial_etag)
.send()
.await;
assert!(response2.is_err(), "PUT with matching If-None-Match should fail");
if let Err(e) = response2 {
if let SdkError::ServiceError(e) = e {
let e = e.into_err();
let error_code = e.meta().code().unwrap_or("");
assert_eq!("PreconditionFailed", error_code);
} else {
panic!("Unexpected error: {e:?}");
}
}
// Cleanup - only need to clean up the initial object since failed PUTs shouldn't create objects
cleanup_objects(&client, BUCKET, &[&test_key]).await;
Ok(())
}
#[tokio::test]
#[serial]
#[ignore = "requires running RustFS server at localhost:9000"]
async fn test_conditional_put_when_object_does_not_exist() -> Result<(), Box<dyn std::error::Error>> {
let client = create_aws_s3_client().await?;
setup_test_bucket(&client).await?;
let key = "some_key";
cleanup_objects(&client, BUCKET, &[key]).await;
// When the object does not exist, the If-Match condition should always fail
let response1 = client
.put_object()
.bucket(BUCKET)
.key(key)
.body(Bytes::from(generate_test_data(1024)).into())
.if_match("*")
.send()
.await;
assert!(response1.is_err());
if let Err(e) = response1 {
if let SdkError::ServiceError(e) = e {
let e = e.into_err();
let error_code = e.meta().code().unwrap_or("");
assert_eq!("NoSuchKey", error_code);
} else {
panic!("Unexpected error: {e:?}");
}
}
// When the object does not exist, the If-None-Match condition should be able to succeed
let response2 = client
.put_object()
.bucket(BUCKET)
.key(key)
.body(Bytes::from(generate_test_data(1024)).into())
.if_none_match("*")
.send()
.await;
assert!(response2.is_ok());
cleanup_objects(&client, BUCKET, &[key]).await;
Ok(())
}
#[tokio::test]
#[serial]
#[ignore = "requires running RustFS server at localhost:9000"]
async fn test_conditional_multi_part_upload() -> Result<(), Box<dyn std::error::Error>> {
let client = create_aws_s3_client().await?;
setup_test_bucket(&client).await?;
let test_key = generate_test_key("multipart-upload-ok");
let test_data = generate_test_data(1024);
let initial_etag = upload_object_with_metadata(&client, BUCKET, &test_key, &test_data).await?;
let part_size = 5 * 1024 * 1024; // 5MB per part (minimum for multipart)
let num_parts = 3;
let mut parts = Vec::new();
// Initiate multipart upload
let initiate_response = client.create_multipart_upload().bucket(BUCKET).key(&test_key).send().await?;
let upload_id = initiate_response
.upload_id()
.ok_or(std::io::Error::other("No upload ID returned"))?;
// Upload parts
for part_number in 1..=num_parts {
let part_data = generate_test_data(part_size);
let upload_part_response = client
.upload_part()
.bucket(BUCKET)
.key(&test_key)
.upload_id(upload_id)
.part_number(part_number)
.body(Bytes::from(part_data).into())
.send()
.await?;
let part_etag = upload_part_response
.e_tag()
.ok_or(std::io::Error::other("Do not have etag"))?
.to_string();
let completed_part = CompletedPart::builder().part_number(part_number).e_tag(part_etag).build();
parts.push(completed_part);
}
// Complete multipart upload
let completed_upload = CompletedMultipartUpload::builder().set_parts(Some(parts)).build();
// Test 1: Multipart upload with wildcard If-None-Match, should fail
let complete_response = client
.complete_multipart_upload()
.bucket(BUCKET)
.key(&test_key)
.upload_id(upload_id)
.multipart_upload(completed_upload.clone())
.if_none_match("*")
.send()
.await;
assert!(complete_response.is_err());
// Test 2: Multipart upload with matching If-None-Match, should fail
let complete_response = client
.complete_multipart_upload()
.bucket(BUCKET)
.key(&test_key)
.upload_id(upload_id)
.multipart_upload(completed_upload.clone())
.if_none_match(initial_etag.clone())
.send()
.await;
assert!(complete_response.is_err());
// Test 3: Multipart upload with unmatching If-Match, should fail
let complete_response = client
.complete_multipart_upload()
.bucket(BUCKET)
.key(&test_key)
.upload_id(upload_id)
.multipart_upload(completed_upload.clone())
.if_match("\"abcdef\"")
.send()
.await;
assert!(complete_response.is_err());
// Test 4: Multipart upload with matching If-Match, should succeed
let complete_response = client
.complete_multipart_upload()
.bucket(BUCKET)
.key(&test_key)
.upload_id(upload_id)
.multipart_upload(completed_upload.clone())
.if_match(initial_etag)
.send()
.await;
assert!(complete_response.is_ok());
// Cleanup
cleanup_objects(&client, BUCKET, &[&test_key]).await;
Ok(())
}
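The tests above exercise several If-Match / If-None-Match combinations; distilled to the most common pattern, a create-only-if-absent PUT can be wrapped as below. The helper name and Ok(bool) return shape are illustrative; the builder calls mirror the tests:

use aws_sdk_s3::Client;
use aws_sdk_s3::error::SdkError;
use aws_sdk_s3::primitives::ByteStream;

/// Create `key` in `bucket` only if it does not already exist.
/// Ok(true) = created; Ok(false) = the server answered 412 PreconditionFailed
/// because an object already lives under this key.
async fn put_if_absent(
    client: &Client,
    bucket: &str,
    key: &str,
    body: Vec<u8>,
) -> Result<bool, Box<dyn std::error::Error>> {
    let res = client
        .put_object()
        .bucket(bucket)
        .key(key)
        .body(ByteStream::from(body))
        .if_none_match("*") // succeed only when no object currently exists under this key
        .send()
        .await;
    match res {
        Ok(_) => Ok(true),
        Err(SdkError::ServiceError(e)) => {
            let e = e.into_err();
            if e.meta().code() == Some("PreconditionFailed") {
                Ok(false)
            } else {
                Err(e.into())
            }
        }
        Err(e) => Err(e.into()),
    }
}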

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
mod conditional_writes;
mod lifecycle;
mod lock;
mod node_interact_test;

View File

@@ -1,4 +1,3 @@
#![allow(unused_imports)]
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
@@ -12,6 +11,7 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#![allow(unused_imports)]
#![allow(unused_variables)]
#![allow(unused_mut)]
#![allow(unused_assignments)]
@@ -39,7 +39,7 @@ use time::OffsetDateTime;
use tokio::select;
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::sync::{RwLock, mpsc};
use tracing::{error, info};
use tracing::{debug, error, info};
use uuid::Uuid;
use xxhash_rust::xxh64;
@@ -587,7 +587,7 @@ impl TransitionState {
pub async fn init_background_expiry(api: Arc<ECStore>) {
let mut workers = num_cpus::get() / 2;
//globalILMConfig.getExpirationWorkers()
if let Ok(env_expiration_workers) = env::var("_RUSTFS_EXPIRATION_WORKERS") {
if let Ok(env_expiration_workers) = env::var("_RUSTFS_ILM_EXPIRATION_WORKERS") {
if let Ok(num_expirations) = env_expiration_workers.parse::<usize>() {
workers = num_expirations;
}
@@ -945,10 +945,13 @@ pub async fn apply_expiry_on_non_transitioned_objects(
// let time_ilm = ScannerMetrics::time_ilm(lc_event.action.clone());
//debug!("lc_event.action: {:?}", lc_event.action);
//debug!("opts: {:?}", opts);
let mut dobj = api
.delete_object(&oi.bucket, &encode_dir_object(&oi.name), opts)
.await
.unwrap();
//debug!("dobj: {:?}", dobj);
if dobj.name.is_empty() {
dobj = oi.clone();
}

View File

@@ -25,6 +25,7 @@ use s3s::dto::{
use std::cmp::Ordering;
use std::env;
use std::fmt::Display;
use std::sync::Arc;
use time::macros::{datetime, offset};
use time::{self, Duration, OffsetDateTime};
use tracing::info;
@@ -138,7 +139,7 @@ pub trait Lifecycle {
async fn eval(&self, obj: &ObjectOpts) -> Event;
async fn eval_inner(&self, obj: &ObjectOpts, now: OffsetDateTime) -> Event;
//fn set_prediction_headers(&self, w: http.ResponseWriter, obj: ObjectOpts);
async fn noncurrent_versions_expiration_limit(&self, obj: &ObjectOpts) -> Event;
async fn noncurrent_versions_expiration_limit(self: Arc<Self>, obj: &ObjectOpts) -> Event;
}
#[async_trait::async_trait]
@@ -322,9 +323,7 @@ impl Lifecycle for BucketLifecycleConfiguration {
});
break;
}
}
if let Some(expiration) = rule.expiration.as_ref() {
if let Some(days) = expiration.days {
let expected_expiry = expected_expiry_time(obj.mod_time.expect("err!"), days /*, date*/);
if now.unix_timestamp() == 0 || now.unix_timestamp() > expected_expiry.unix_timestamp() {
@@ -538,7 +537,7 @@ impl Lifecycle for BucketLifecycleConfiguration {
Event::default()
}
async fn noncurrent_versions_expiration_limit(&self, obj: &ObjectOpts) -> Event {
async fn noncurrent_versions_expiration_limit(self: Arc<Self>, obj: &ObjectOpts) -> Event {
if let Some(filter_rules) = self.filter_rules(obj).await {
for rule in filter_rules.iter() {
if let Some(ref noncurrent_version_expiration) = rule.noncurrent_version_expiration {
@@ -626,7 +625,7 @@ pub fn expected_expiry_time(mod_time: OffsetDateTime, days: i32) -> OffsetDateTi
.to_offset(offset!(-0:00:00))
.saturating_add(Duration::days(days as i64));
let mut hour = 3600;
if let Ok(env_ilm_hour) = env::var("_RUSTFS_ILM_HOUR") {
if let Ok(env_ilm_hour) = env::var("_RUSTFS_ILM_PROCESS_TIME") {
if let Ok(num_hour) = env_ilm_hour.parse::<usize>() {
hour = num_hour;
}

View File

@@ -317,7 +317,7 @@ impl TransitionClient {
//}
let mut retry_timer = RetryTimer::new(req_retry, DEFAULT_RETRY_UNIT, DEFAULT_RETRY_CAP, MAX_JITTER, self.random);
while let Some(v) = retry_timer.next().await {
while retry_timer.next().await.is_some() {
let req = self.new_request(&method, metadata).await?;
resp = self.doit(req).await?;
@@ -590,46 +590,7 @@ impl TransitionClient {
return false;
}
// AUTO
let host = match url.host_str() {
Some(h) => h,
None => return false,
};
// If endpoint is an IP address, do not use virtual host style
let is_ip = host.parse::<std::net::IpAddr>().is_ok();
if is_ip {
return false;
}
// Basic DNS bucket validation: lowercase letters, numbers, dot and hyphen; must start/end alnum
let is_dns_compatible = {
let bytes = bucket_name.as_bytes();
let start_end_ok = bucket_name
.chars()
.next()
.map(|c| c.is_ascii_lowercase() || c.is_ascii_digit())
.unwrap_or(false)
&& bucket_name
.chars()
.last()
.map(|c| c.is_ascii_lowercase() || c.is_ascii_digit())
.unwrap_or(false);
let middle_ok = bytes
.iter()
.all(|b| b.is_ascii_lowercase() || b.is_ascii_digit() || *b == b'-' || *b == b'.');
start_end_ok && middle_ok && bucket_name.len() >= 3 && bucket_name.len() <= 63
};
if !is_dns_compatible {
return false;
}
// When using TLS, avoid buckets with dots to prevent cert/SNI mismatch unless a wildcard cert is ensured.
if self.secure && bucket_name.contains('.') {
return false;
}
true
false
}
pub fn cred_context(&self) -> CredContext {
@@ -1035,56 +996,3 @@ impl tower::Service<Request<Body>> for SendRequest {
#[derive(Serialize, Deserialize)]
pub struct Document(pub String);
#[cfg(test)]
mod tests {
use super::*;
fn mk_client(endpoint: &str, secure: bool, lookup: BucketLookupType) -> TransitionClient {
let creds = Credentials::new(Static(Default::default()));
let opts = Options {
creds,
secure,
bucket_lookup: lookup,
..Default::default()
};
tokio::runtime::Runtime::new()
.unwrap()
.block_on(async { TransitionClient::new(endpoint, opts).await.unwrap() })
}
#[test]
fn test_is_virtual_host_auto_http_domain_dns_bucket() {
let cl = mk_client("s3.example.com:9000", false, BucketLookupType::BucketLookupAuto);
assert!(cl.is_virtual_host_style_request(&cl.endpoint_url(), "test"));
}
#[test]
fn test_is_virtual_host_auto_http_ip() {
let cl = mk_client("127.0.0.1:9000", false, BucketLookupType::BucketLookupAuto);
assert!(!cl.is_virtual_host_style_request(&cl.endpoint_url(), "test"));
}
#[test]
fn test_is_virtual_host_auto_https_bucket_with_dot_disallowed() {
let cl = mk_client("s3.example.com:443", true, BucketLookupType::BucketLookupAuto);
assert!(!cl.is_virtual_host_style_request(&cl.endpoint_url(), "te.st"));
}
#[test]
fn test_is_virtual_host_dns_forced() {
let cl = mk_client("s3.example.com:9000", false, BucketLookupType::BucketLookupDNS);
assert!(cl.is_virtual_host_style_request(&cl.endpoint_url(), "test"));
}
#[test]
fn test_target_url_vhost_and_path() {
let cl_v = mk_client("s3.example.com:9000", false, BucketLookupType::BucketLookupDNS);
let url_v = cl_v.make_target_url("test", "obj.txt", "", true, &HashMap::new()).unwrap();
assert_eq!(url_v.as_str(), "http://test.s3.example.com:9000/obj.txt");
let cl_p = mk_client("s3.example.com:9000", false, BucketLookupType::BucketLookupPath);
let url_p = cl_p.make_target_url("test", "obj.txt", "", false, &HashMap::new()).unwrap();
assert_eq!(url_p.as_str(), "http://s3.example.com:9000/test/obj.txt");
}
}

View File
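With `is_virtual_host_style_request` now returning `false` unconditionally (and the corresponding tests removed), the tiering client always addresses remote targets path-style. A small sketch of what that means for URL construction, using the `url` crate; the helper name is illustrative, not from this diff:

use url::Url;

fn path_style_url(endpoint: &Url, bucket: &str, object: &str) -> Result<Url, url::ParseError> {
    // The bucket is appended to the endpoint path instead of being promoted
    // into the host name.
    endpoint.join(&format!("{bucket}/{object}"))
}

// path_style_url(&Url::parse("http://s3.example.com:9000/")?, "test", "obj.txt")
// yields http://s3.example.com:9000/test/obj.txt, matching the removed
// path-style expectation in test_target_url_vhost_and_path.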

@@ -187,6 +187,9 @@ pub enum StorageError {
#[error("Lock error: {0}")]
Lock(#[from] rustfs_lock::LockError),
#[error("Precondition failed")]
PreconditionFailed,
}
impl StorageError {
@@ -416,6 +419,7 @@ impl Clone for StorageError {
StorageError::Lock(e) => StorageError::Lock(e.clone()),
StorageError::InsufficientReadQuorum(a, b) => StorageError::InsufficientReadQuorum(a.clone(), b.clone()),
StorageError::InsufficientWriteQuorum(a, b) => StorageError::InsufficientWriteQuorum(a.clone(), b.clone()),
StorageError::PreconditionFailed => StorageError::PreconditionFailed,
}
}
}
@@ -481,6 +485,7 @@ impl StorageError {
StorageError::Lock(_) => 0x38,
StorageError::InsufficientReadQuorum(_, _) => 0x39,
StorageError::InsufficientWriteQuorum(_, _) => 0x3A,
StorageError::PreconditionFailed => 0x3B,
}
}
@@ -548,6 +553,7 @@ impl StorageError {
0x38 => Some(StorageError::Lock(rustfs_lock::LockError::internal("Generic lock error".to_string()))),
0x39 => Some(StorageError::InsufficientReadQuorum(Default::default(), Default::default())),
0x3A => Some(StorageError::InsufficientWriteQuorum(Default::default(), Default::default())),
0x3B => Some(StorageError::PreconditionFailed),
_ => None,
}
}

View File
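The new `StorageError::PreconditionFailed` variant (wire code 0x3B) is what the conditional-write checks in the next file return when an `If-Match`/`If-None-Match` condition fails. A hedged sketch of how a caller might translate it; the real mapping in this PR goes through `S3ErrorCode::PreconditionFailed` in the API error layer:

use http::StatusCode;

fn status_for(err: &StorageError) -> StatusCode {
    match err {
        // S3 reports a failed write precondition as HTTP 412.
        StorageError::PreconditionFailed => StatusCode::PRECONDITION_FAILED,
        _ => StatusCode::INTERNAL_SERVER_ERROR,
    }
}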

@@ -61,6 +61,7 @@ use glob::Pattern;
use http::HeaderMap;
use md5::{Digest as Md5Digest, Md5};
use rand::{Rng, seq::SliceRandom};
use regex::Regex;
use rustfs_common::heal_channel::{DriveState, HealChannelPriority, HealItemType, HealOpts, HealScanMode, send_heal_disk};
use rustfs_filemeta::headers::RESERVED_METADATA_PREFIX_LOWER;
use rustfs_filemeta::{
@@ -3218,6 +3219,44 @@ impl SetDisks {
obj?;
Ok(())
}
async fn check_write_precondition(&self, bucket: &str, object: &str, opts: &ObjectOptions) -> Option<StorageError> {
let mut opts = opts.clone();
let http_preconditions = opts.http_preconditions?;
opts.http_preconditions = None;
// Never claim a lock here, to avoid deadlock
// - If no_lock is false, the lock must have been obtained outside of this function
// - If no_lock is true, we should not obtain locks
opts.no_lock = true;
let oi = self.get_object_info(bucket, object, &opts).await;
match oi {
Ok(oi) => {
if should_prevent_write(&oi, http_preconditions.if_none_match, http_preconditions.if_match) {
return Some(StorageError::PreconditionFailed);
}
}
Err(StorageError::VersionNotFound(_, _, _))
| Err(StorageError::ObjectNotFound(_, _))
| Err(StorageError::ErasureReadQuorum) => {
// When the object is not found,
// - if If-Match is set, we should return 404 NotFound
// - if If-None-Match is set, we should be able to proceed with the request
if http_preconditions.if_match.is_some() {
return Some(StorageError::ObjectNotFound(bucket.to_string(), object.to_string()));
}
}
Err(e) => {
return Some(e);
}
}
None
}
}
#[async_trait::async_trait]
@@ -3335,6 +3374,12 @@ impl ObjectIO for SetDisks {
_object_lock_guard = guard_opt;
}
if let Some(http_preconditions) = opts.http_preconditions.clone() {
if let Some(err) = self.check_write_precondition(bucket, object, opts).await {
return Err(err);
}
}
let mut user_defined = opts.user_defined.clone();
let sc_parity_drives = {
@@ -5123,6 +5168,26 @@ impl StorageAPI for SetDisks {
let disks = disks.clone();
// let disks = Self::shuffle_disks(&disks, &fi.erasure.distribution);
// Acquire per-object exclusive lock via RAII guard. It auto-releases asynchronously on drop.
let mut _object_lock_guard: Option<rustfs_lock::LockGuard> = None;
if let Some(http_preconditions) = opts.http_preconditions.clone() {
if !opts.no_lock {
let guard_opt = self
.namespace_lock
.lock_guard(object, &self.locker_owner, Duration::from_secs(5), Duration::from_secs(10))
.await?;
if guard_opt.is_none() {
return Err(Error::other("can not get lock. please retry".to_string()));
}
_object_lock_guard = guard_opt;
}
if let Some(err) = self.check_write_precondition(bucket, object, opts).await {
return Err(err);
}
}
let part_path = format!("{}/{}/", upload_id_path, fi.data_dir.unwrap_or(Uuid::nil()));
let part_meta_paths = uploaded_parts
@@ -5942,13 +6007,45 @@ fn get_complete_multipart_md5(parts: &[CompletePart]) -> String {
format!("{:x}-{}", hasher.finalize(), parts.len())
}
pub fn canonicalize_etag(etag: &str) -> String {
let re = Regex::new("\"*?([^\"]*?)\"*?$").unwrap();
re.replace_all(etag, "$1").to_string()
}
pub fn e_tag_matches(etag: &str, condition: &str) -> bool {
if condition.trim() == "*" {
return true;
}
canonicalize_etag(etag) == canonicalize_etag(condition)
}
pub fn should_prevent_write(oi: &ObjectInfo, if_none_match: Option<String>, if_match: Option<String>) -> bool {
match &oi.etag {
Some(etag) => {
if let Some(if_none_match) = if_none_match {
if e_tag_matches(etag, &if_none_match) {
return true;
}
}
if let Some(if_match) = if_match {
if !e_tag_matches(etag, &if_match) {
return true;
}
}
false
}
// If we can't obtain the etag of the object, prevent the write only when at least one condition is present
None => if_none_match.is_some() || if_match.is_some(),
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::disk::CHECK_PART_UNKNOWN;
use crate::disk::CHECK_PART_VOLUME_NOT_FOUND;
use crate::disk::error::DiskError;
use crate::store_api::CompletePart;
use crate::store_api::{CompletePart, ObjectInfo};
use rustfs_filemeta::ErasureInfo;
use std::collections::HashMap;
use time::OffsetDateTime;
@@ -6373,4 +6470,62 @@ mod tests {
assert_eq!(result2.len(), 3);
assert!(result2.iter().all(|d| d.is_none()));
}
#[test]
fn test_etag_matches() {
assert!(e_tag_matches("abc", "abc"));
assert!(e_tag_matches("\"abc\"", "abc"));
assert!(e_tag_matches("\"abc\"", "*"));
}
#[test]
fn test_should_prevent_write() {
let oi = ObjectInfo {
etag: Some("abc".to_string()),
..Default::default()
};
let if_none_match = Some("abc".to_string());
let if_match = None;
assert!(should_prevent_write(&oi, if_none_match, if_match));
let if_none_match = Some("*".to_string());
let if_match = None;
assert!(should_prevent_write(&oi, if_none_match, if_match));
let if_none_match = None;
let if_match = Some("def".to_string());
assert!(should_prevent_write(&oi, if_none_match, if_match));
let if_none_match = None;
let if_match = Some("*".to_string());
assert!(!should_prevent_write(&oi, if_none_match, if_match));
let if_none_match = Some("def".to_string());
let if_match = None;
assert!(!should_prevent_write(&oi, if_none_match, if_match));
let if_none_match = Some("def".to_string());
let if_match = Some("*".to_string());
assert!(!should_prevent_write(&oi, if_none_match, if_match));
let if_none_match = Some("def".to_string());
let if_match = Some("\"abc\"".to_string());
assert!(!should_prevent_write(&oi, if_none_match, if_match));
let if_none_match = Some("*".to_string());
let if_match = Some("\"abc\"".to_string());
assert!(should_prevent_write(&oi, if_none_match, if_match));
let oi = ObjectInfo {
etag: None,
..Default::default()
};
let if_none_match = Some("*".to_string());
let if_match = Some("\"abc\"".to_string());
assert!(should_prevent_write(&oi, if_none_match, if_match));
let if_none_match = None;
let if_match = None;
assert!(!should_prevent_write(&oi, if_none_match, if_match));
}
}

View File
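Putting the pieces above together: the API layer fills `ObjectOptions::http_preconditions` (the `HTTPPreconditions` struct added in the next file) from the request headers, and `check_write_precondition` re-reads the object info with `no_lock` set before delegating to `should_prevent_write`. A condensed sketch, assuming the header extraction has already happened:

fn conditional_create_opts() -> ObjectOptions {
    ObjectOptions {
        http_preconditions: Some(HTTPPreconditions {
            if_match: None,
            // "*" means: only allow the write if no object exists yet.
            if_none_match: Some("*".to_string()),
        }),
        no_lock: false, // put_object acquires the per-object lock before the check
        ..Default::default()
    }
}

// With an existing ETag and If-None-Match: "*", should_prevent_write returns
// true and the write surfaces as StorageError::PreconditionFailed.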

@@ -132,30 +132,50 @@ impl GetObjectReader {
if is_compressed {
let actual_size = oi.get_actual_size()?;
let (off, length) = (0, oi.size);
let (_dec_off, dec_length) = (0, actual_size);
if let Some(_rs) = rs {
// TODO: range spec is not supported for compressed object
return Err(Error::other("The requested range is not satisfiable"));
// let (off, length) = rs.get_offset_length(actual_size)?;
}
let (off, length, dec_off, dec_length) = if let Some(rs) = rs {
// Support range requests for compressed objects
let (dec_off, dec_length) = rs.get_offset_length(actual_size)?;
(0, oi.size, dec_off, dec_length)
} else {
(0, oi.size, 0, actual_size)
};
let dec_reader = DecompressReader::new(reader, algo);
let actual_size = if actual_size > 0 {
let actual_size_usize = if actual_size > 0 {
actual_size as usize
} else {
return Err(Error::other(format!("invalid decompressed size {actual_size}")));
};
let dec_reader = LimitReader::new(dec_reader, actual_size);
let final_reader: Box<dyn AsyncRead + Unpin + Send + Sync> = if dec_off > 0 || dec_length != actual_size {
// Use RangedDecompressReader for streaming range processing
// The new implementation supports any offset size by streaming and skipping data
match RangedDecompressReader::new(dec_reader, dec_off, dec_length, actual_size_usize) {
Ok(ranged_reader) => {
tracing::debug!(
"Successfully created RangedDecompressReader for offset={}, length={}",
dec_off,
dec_length
);
Box::new(ranged_reader)
}
Err(e) => {
// Only fail if the range parameters are fundamentally invalid (e.g., offset >= file size)
tracing::error!("RangedDecompressReader failed with invalid range parameters: {}", e);
return Err(e);
}
}
} else {
Box::new(LimitReader::new(dec_reader, actual_size_usize))
};
let mut oi = oi.clone();
oi.size = dec_length;
return Ok((
GetObjectReader {
stream: Box::new(dec_reader),
stream: final_reader,
object_info: oi,
},
off,
@@ -283,6 +303,12 @@ impl HTTPRangeSpec {
}
}
#[derive(Debug, Default, Clone)]
pub struct HTTPPreconditions {
pub if_match: Option<String>,
pub if_none_match: Option<String>,
}
#[derive(Debug, Default, Clone)]
pub struct ObjectOptions {
// Use the maximum parity (N/2), used when saving server configuration files
@@ -306,6 +332,7 @@ pub struct ObjectOptions {
pub user_defined: HashMap<String, String>,
pub preserve_etag: Option<String>,
pub metadata_chg: bool,
pub http_preconditions: Option<HTTPPreconditions>,
pub replication_request: bool,
pub delete_marker: bool,
@@ -1084,3 +1111,338 @@ pub trait StorageAPI: ObjectIO {
async fn get_pool_and_set(&self, id: &str) -> Result<(Option<usize>, Option<usize>, Option<usize>)>;
async fn check_abandoned_parts(&self, bucket: &str, object: &str, opts: &HealOpts) -> Result<()>;
}
/// A streaming decompression reader that supports range requests by skipping data in the decompressed stream.
/// This implementation acknowledges that compressed streams (like LZ4) must be decompressed sequentially
/// from the beginning, so it streams and discards data until reaching the target offset.
#[derive(Debug)]
pub struct RangedDecompressReader<R> {
inner: R,
target_offset: usize,
target_length: usize,
current_offset: usize,
bytes_returned: usize,
}
impl<R: AsyncRead + Unpin + Send + Sync> RangedDecompressReader<R> {
pub fn new(inner: R, offset: usize, length: i64, total_size: usize) -> Result<Self> {
// Validate the range request
if offset >= total_size {
tracing::debug!("Range offset {} exceeds total size {}", offset, total_size);
return Err(Error::other("Range offset exceeds file size"));
}
// Adjust length if it extends beyond file end
let actual_length = std::cmp::min(length as usize, total_size - offset);
tracing::debug!(
"Creating RangedDecompressReader: offset={}, length={}, total_size={}, actual_length={}",
offset,
length,
total_size,
actual_length
);
Ok(Self {
inner,
target_offset: offset,
target_length: actual_length,
current_offset: 0,
bytes_returned: 0,
})
}
}
impl<R: AsyncRead + Unpin + Send + Sync> AsyncRead for RangedDecompressReader<R> {
fn poll_read(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &mut tokio::io::ReadBuf<'_>,
) -> std::task::Poll<std::io::Result<()>> {
use std::pin::Pin;
use std::task::Poll;
use tokio::io::ReadBuf;
loop {
// If we've returned all the bytes we need, return EOF
if self.bytes_returned >= self.target_length {
return Poll::Ready(Ok(()));
}
// Read from the inner stream
let buf_capacity = buf.remaining();
if buf_capacity == 0 {
return Poll::Ready(Ok(()));
}
// Prepare a temporary buffer for reading
let mut temp_buf = vec![0u8; std::cmp::min(buf_capacity, 8192)];
let mut temp_read_buf = ReadBuf::new(&mut temp_buf);
match Pin::new(&mut self.inner).poll_read(cx, &mut temp_read_buf) {
Poll::Pending => return Poll::Pending,
Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
Poll::Ready(Ok(())) => {
let n = temp_read_buf.filled().len();
if n == 0 {
// EOF from inner stream
if self.current_offset < self.target_offset {
// We haven't reached the target offset yet - this is an error
return Poll::Ready(Err(std::io::Error::new(
std::io::ErrorKind::UnexpectedEof,
format!(
"Unexpected EOF: only read {} bytes, target offset is {}",
self.current_offset, self.target_offset
),
)));
}
// Normal EOF after reaching target
return Poll::Ready(Ok(()));
}
// Update current position
let old_offset = self.current_offset;
self.current_offset += n;
// Check if we're still in the skip phase
if old_offset < self.target_offset {
// We're still skipping data
let skip_end = std::cmp::min(self.current_offset, self.target_offset);
let bytes_to_skip_in_this_read = skip_end - old_offset;
if self.current_offset <= self.target_offset {
// All data in this read should be skipped
tracing::trace!("Skipping {} bytes at offset {}", n, old_offset);
// Continue reading in the loop instead of recursive call
continue;
} else {
// Partial skip: some data should be returned
let data_start_in_buffer = bytes_to_skip_in_this_read;
let available_data = n - data_start_in_buffer;
let bytes_to_return = std::cmp::min(
available_data,
std::cmp::min(buf.remaining(), self.target_length - self.bytes_returned),
);
if bytes_to_return > 0 {
let data_slice =
&temp_read_buf.filled()[data_start_in_buffer..data_start_in_buffer + bytes_to_return];
buf.put_slice(data_slice);
self.bytes_returned += bytes_to_return;
tracing::trace!(
"Skipped {} bytes, returned {} bytes at offset {}",
bytes_to_skip_in_this_read,
bytes_to_return,
old_offset
);
}
return Poll::Ready(Ok(()));
}
} else {
// We're in the data return phase
let bytes_to_return =
std::cmp::min(n, std::cmp::min(buf.remaining(), self.target_length - self.bytes_returned));
if bytes_to_return > 0 {
buf.put_slice(&temp_read_buf.filled()[..bytes_to_return]);
self.bytes_returned += bytes_to_return;
tracing::trace!("Returned {} bytes at offset {}", bytes_to_return, old_offset);
}
return Poll::Ready(Ok(()));
}
}
}
}
}
}
/// A wrapper that ensures the inner stream is fully consumed even if the outer reader stops early.
/// This prevents broken pipe errors in erasure coding scenarios where the writer expects
/// the full stream to be consumed.
pub struct StreamConsumer<R: AsyncRead + Unpin + Send + 'static> {
inner: Option<R>,
consumer_task: Option<tokio::task::JoinHandle<()>>,
}
impl<R: AsyncRead + Unpin + Send + 'static> StreamConsumer<R> {
pub fn new(inner: R) -> Self {
Self {
inner: Some(inner),
consumer_task: None,
}
}
fn ensure_consumer_started(&mut self) {
if self.consumer_task.is_none() && self.inner.is_some() {
let mut inner = self.inner.take().unwrap();
let task = tokio::spawn(async move {
let mut buf = [0u8; 8192];
loop {
match inner.read(&mut buf).await {
Ok(0) => break, // EOF
Ok(_) => continue, // Keep consuming
Err(_) => break, // Error, stop consuming
}
}
});
self.consumer_task = Some(task);
}
}
}
impl<R: AsyncRead + Unpin + Send + 'static> AsyncRead for StreamConsumer<R> {
fn poll_read(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &mut tokio::io::ReadBuf<'_>,
) -> std::task::Poll<std::io::Result<()>> {
use std::pin::Pin;
use std::task::Poll;
if let Some(ref mut inner) = self.inner {
Pin::new(inner).poll_read(cx, buf)
} else {
Poll::Ready(Ok(())) // EOF
}
}
}
impl<R: AsyncRead + Unpin + Send + 'static> Drop for StreamConsumer<R> {
fn drop(&mut self) {
if self.consumer_task.is_none() && self.inner.is_some() {
let mut inner = self.inner.take().unwrap();
let task = tokio::spawn(async move {
let mut buf = [0u8; 8192];
loop {
match inner.read(&mut buf).await {
Ok(0) => break, // EOF
Ok(_) => continue, // Keep consuming
Err(_) => break, // Error, stop consuming
}
}
});
self.consumer_task = Some(task);
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::io::Cursor;
use tokio::io::AsyncReadExt;
#[tokio::test]
async fn test_ranged_decompress_reader() {
// Create test data
let original_data = b"Hello, World! This is a test for range requests on compressed data.";
// For this test, we'll simulate using the original data directly as "decompressed"
let cursor = Cursor::new(original_data.to_vec());
// Test reading a range from the middle
let mut ranged_reader = RangedDecompressReader::new(cursor, 7, 5, original_data.len()).unwrap();
let mut result = Vec::new();
ranged_reader.read_to_end(&mut result).await.unwrap();
// Should read "World" (5 bytes starting from position 7)
assert_eq!(result, b"World");
}
#[tokio::test]
async fn test_ranged_decompress_reader_from_start() {
let original_data = b"Hello, World! This is a test.";
let cursor = Cursor::new(original_data.to_vec());
let mut ranged_reader = RangedDecompressReader::new(cursor, 0, 5, original_data.len()).unwrap();
let mut result = Vec::new();
ranged_reader.read_to_end(&mut result).await.unwrap();
// Should read "Hello" (5 bytes from the start)
assert_eq!(result, b"Hello");
}
#[tokio::test]
async fn test_ranged_decompress_reader_to_end() {
let original_data = b"Hello, World!";
let cursor = Cursor::new(original_data.to_vec());
let mut ranged_reader = RangedDecompressReader::new(cursor, 7, 6, original_data.len()).unwrap();
let mut result = Vec::new();
ranged_reader.read_to_end(&mut result).await.unwrap();
// Should read "World!" (6 bytes starting from position 7)
assert_eq!(result, b"World!");
}
#[tokio::test]
async fn test_http_range_spec_with_compressed_data() {
// Test that HTTPRangeSpec::get_offset_length works correctly
let range_spec = HTTPRangeSpec {
is_suffix_length: false,
start: 5,
end: 14, // inclusive
};
let total_size = 100i64;
let (offset, length) = range_spec.get_offset_length(total_size).unwrap();
assert_eq!(offset, 5);
assert_eq!(length, 10); // end - start + 1 = 14 - 5 + 1 = 10
}
#[tokio::test]
async fn test_ranged_decompress_reader_zero_length() {
let original_data = b"Hello, World!";
let cursor = Cursor::new(original_data.to_vec());
let mut ranged_reader = RangedDecompressReader::new(cursor, 5, 0, original_data.len()).unwrap();
let mut result = Vec::new();
ranged_reader.read_to_end(&mut result).await.unwrap();
// Should read nothing
assert_eq!(result, b"");
}
#[tokio::test]
async fn test_ranged_decompress_reader_skip_entire_data() {
let original_data = b"Hello, World!";
let cursor = Cursor::new(original_data.to_vec());
// Skip to end of data with length 0 - this should read nothing
let mut ranged_reader = RangedDecompressReader::new(cursor, original_data.len() - 1, 0, original_data.len()).unwrap();
let mut result = Vec::new();
ranged_reader.read_to_end(&mut result).await.unwrap();
assert_eq!(result, b"");
}
#[tokio::test]
async fn test_ranged_decompress_reader_out_of_bounds_offset() {
let original_data = b"Hello, World!";
let cursor = Cursor::new(original_data.to_vec());
// Offset beyond EOF should return error in constructor
let result = RangedDecompressReader::new(cursor, original_data.len() + 10, 5, original_data.len());
assert!(result.is_err());
// Use pattern matching to avoid requiring Debug on the error type
if let Err(e) = result {
assert!(e.to_string().contains("Range offset exceeds file size"));
}
}
#[tokio::test]
async fn test_ranged_decompress_reader_partial_read() {
let original_data = b"abcdef";
let cursor = Cursor::new(original_data.to_vec());
let mut ranged_reader = RangedDecompressReader::new(cursor, 2, 3, original_data.len()).unwrap();
let mut buf = [0u8; 2];
let n = ranged_reader.read(&mut buf).await.unwrap();
assert_eq!(n, 2);
assert_eq!(&buf, b"cd");
let mut buf2 = [0u8; 2];
let n2 = ranged_reader.read(&mut buf2).await.unwrap();
assert_eq!(n2, 1);
assert_eq!(&buf2[..1], b"e");
}
}

View File
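`StreamConsumer` above ships without a test in this diff, so here is a minimal usage sketch (illustrative function, not from the PR): the wrapper is read like any `AsyncRead`, and if it is dropped before EOF its `Drop` impl spawns a task that drains the rest, which keeps erasure-coded writers from seeing a broken pipe.

use tokio::io::{AsyncRead, AsyncReadExt};

async fn read_first_kib<R>(src: R) -> std::io::Result<Vec<u8>>
where
    R: AsyncRead + Unpin + Send + 'static,
{
    let mut reader = StreamConsumer::new(src);
    let mut prefix = vec![0u8; 1024];
    let n = reader.read(&mut prefix).await?;
    prefix.truncate(n);
    Ok(prefix)
    // `reader` is dropped here; the remaining bytes of `src` are consumed
    // by the background task spawned in Drop.
}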

@@ -540,6 +540,15 @@ impl FileMeta {
}
}
let mut update_version = fi.mark_deleted;
/*if fi.version_purge_status().is_empty()
{
update_version = fi.mark_deleted;
}*/
if fi.transition_status == TRANSITION_COMPLETE {
update_version = false;
}
for (i, ver) in self.versions.iter().enumerate() {
if ver.header.version_id != fi.version_id {
continue;
@@ -557,12 +566,14 @@ impl FileMeta {
return Ok(None);
}
VersionType::Object => {
let v = self.get_idx(i)?;
if update_version && !fi.deleted {
let v = self.get_idx(i)?;
self.versions.remove(i);
self.versions.remove(i);
let a = v.object.map(|v| v.data_dir).unwrap_or_default();
return Ok(a);
let a = v.object.map(|v| v.data_dir).unwrap_or_default();
return Ok(a);
}
}
}
}
@@ -581,6 +592,7 @@ impl FileMeta {
ver.object.as_mut().unwrap().set_transition(fi);
ver.object.as_mut().unwrap().reset_inline_data();
self.set_idx(i, ver.clone())?;
return Ok(None);
} else {
let vers = self.versions[i + 1..].to_vec();
self.versions.extend(vers.iter().cloned());

View File

@@ -15,8 +15,6 @@
use std::sync::Arc;
use once_cell::sync::Lazy;
use std::thread;
use tokio::runtime::Builder;
use tokio::sync::mpsc;
use crate::{client::LockClient, types::LockId};
@@ -27,43 +25,36 @@ struct UnlockJob {
clients: Vec<Arc<dyn LockClient>>, // cloned Arcs; cheap and shares state
}
// Global unlock runtime with background worker running on a dedicated thread-bound Tokio runtime.
// This avoids depending on the application's Tokio runtime lifetimes/cancellation scopes.
static UNLOCK_TX: Lazy<mpsc::Sender<UnlockJob>> = Lazy::new(|| {
#[derive(Debug)]
struct UnlockRuntime {
tx: mpsc::Sender<UnlockJob>,
}
// Global unlock runtime with background worker
static UNLOCK_RUNTIME: Lazy<UnlockRuntime> = Lazy::new(|| {
// Larger buffer to reduce contention during bursts
let (tx, mut rx) = mpsc::channel::<UnlockJob>(8192);
// Spawn a dedicated OS thread that owns its own Tokio runtime to process unlock jobs.
thread::Builder::new()
.name("rustfs-lock-unlocker".to_string())
.spawn(move || {
// A lightweight current-thread runtime is sufficient here.
let rt = Builder::new_current_thread()
.enable_all()
.build()
.expect("failed to build Tokio runtime for background unlock jobs (possible causes: resource exhaustion, thread limit, Tokio misconfiguration)");
rt.block_on(async move {
while let Some(job) = rx.recv().await {
// Best-effort release across clients; try all, success if any succeeds
let mut any_ok = false;
let lock_id = job.lock_id.clone();
for client in job.clients.into_iter() {
if client.release(&lock_id).await.unwrap_or(false) {
any_ok = true;
}
}
if !any_ok {
tracing::warn!("LockGuard background release failed for {}", lock_id);
} else {
tracing::debug!("LockGuard background released {}", lock_id);
}
// Spawn background worker when first used; assumes a Tokio runtime is available
tokio::spawn(async move {
while let Some(job) = rx.recv().await {
// Best-effort release across clients; try all, success if any succeeds
let mut any_ok = false;
let lock_id = job.lock_id.clone();
for client in job.clients.into_iter() {
if client.release(&lock_id).await.unwrap_or(false) {
any_ok = true;
}
});
})
.expect("failed to spawn unlock worker thread");
}
if !any_ok {
tracing::warn!("LockGuard background release failed for {}", lock_id);
} else {
tracing::debug!("LockGuard background released {}", lock_id);
}
}
});
tx
UnlockRuntime { tx }
});
/// A RAII guard that releases the lock asynchronously when dropped.
@@ -108,32 +99,22 @@ impl Drop for LockGuard {
};
// Try a non-blocking send to avoid panics in Drop
if let Err(err) = UNLOCK_TX.try_send(job) {
// Channel full or closed; best-effort fallback using a dedicated thread runtime
if let Err(err) = UNLOCK_RUNTIME.tx.try_send(job) {
// Channel full or closed; best-effort fallback: spawn a detached task
let lock_id = self.lock_id.clone();
let clients = self.clients.clone();
tracing::warn!(
"LockGuard channel send failed ({}), spawning fallback unlock thread for {}",
err,
lock_id.clone()
);
tracing::warn!("LockGuard channel send failed ({}), spawning fallback unlock task for {}", err, lock_id);
// Use a short-lived background thread to execute the async releases on its own runtime.
let _ = thread::Builder::new()
.name("rustfs-lock-unlock-fallback".to_string())
.spawn(move || {
let rt = Builder::new_current_thread()
.enable_all()
.build()
.expect("Failed to build fallback unlock runtime in LockGuard::drop fallback thread. This indicates resource exhaustion or misconfiguration (e.g., thread limits, Tokio runtime issues). Remediation: check system resource limits, ensure sufficient threads are available, and verify Tokio runtime configuration.");
rt.block_on(async move {
let futures_iter = clients.into_iter().map(|client| {
let id = lock_id.clone();
async move { client.release(&id).await.unwrap_or(false) }
});
let _ = futures::future::join_all(futures_iter).await;
});
// If runtime is not available, this will panic; but in RustFS we are inside Tokio contexts.
let handle = tokio::spawn(async move {
let futures_iter = clients.into_iter().map(|client| {
let id = lock_id.clone();
async move { client.release(&id).await.unwrap_or(false) }
});
let _ = futures::future::join_all(futures_iter).await;
});
// Explicitly drop the JoinHandle to acknowledge detaching the task.
drop(handle);
}
}
}

View File
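The rewrite above drops the dedicated unlock thread and runs the background worker on the application's Tokio runtime. The pattern, condensed into a stand-alone sketch (with the assumption spelled out in the comments: the `Lazy` must first be touched from inside a Tokio context, otherwise `tokio::spawn` panics; acceptable here because RustFS always runs inside Tokio):

use once_cell::sync::Lazy;
use tokio::sync::mpsc;

static UNLOCK_TX: Lazy<mpsc::Sender<String>> = Lazy::new(|| {
    let (tx, mut rx) = mpsc::channel::<String>(8192);
    // Assumes a Tokio runtime is available at first use.
    tokio::spawn(async move {
        while let Some(lock_id) = rx.recv().await {
            tracing::debug!("background release for {lock_id}");
        }
    });
    tx
});

fn queue_release(lock_id: String) {
    // try_send keeps Drop non-blocking; the real code falls back to a
    // detached task when the channel is full or closed.
    let _ = UNLOCK_TX.try_send(lock_id);
}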

@@ -15,7 +15,7 @@
use anyhow::Result;
use rmcp::{
ErrorData, RoleServer, ServerHandler,
handler::server::{router::tool::ToolRouter, tool::Parameters},
handler::server::{router::tool::ToolRouter, wrapper::Parameters},
model::{Implementation, ProtocolVersion, ServerCapabilities, ServerInfo, ToolsCapability},
service::{NotificationContext, RequestContext},
tool, tool_handler, tool_router,

View File

@@ -86,7 +86,7 @@ impl Notifier {
// Check if any subscribers are interested in the event
if !notification_sys.has_subscriber(&args.bucket_name, &args.event_name).await {
error!("No subscribers for event: {} in bucket: {}", args.event_name, args.bucket_name);
// error!("No subscribers for event: {} in bucket: {}", args.event_name, args.bucket_name);
return;
}
@@ -162,13 +162,13 @@ impl Notifier {
&self,
bucket_name: &str,
region: &str,
event_rules: &[(Vec<EventName>, &str, &str, Vec<TargetID>)],
event_rules: &[(Vec<EventName>, String, String, Vec<TargetID>)],
) -> Result<(), NotificationError> {
let mut bucket_config = BucketNotificationConfig::new(region);
for (event_names, prefix, suffix, target_ids) in event_rules {
// Use `new_pattern` to construct a matching pattern
let pattern = crate::rules::pattern::new_pattern(Some(prefix), Some(suffix));
let pattern = crate::rules::pattern::new_pattern(Some(prefix.as_str()), Some(suffix.as_str()));
for target_id in target_ids {
bucket_config.add_rule(event_names, pattern.clone(), target_id.clone());
@@ -186,4 +186,25 @@ impl Notifier {
.load_bucket_notification_config(bucket_name, &bucket_config)
.await
}
/// Clear all notification rules for the specified bucket.
/// # Parameters
/// - `bucket_name`: The name of the target bucket.
/// # Returns
/// `Result<(), NotificationError>`: `Ok(())` on success, an error on failure.
/// # Usage
/// Clears every notification rule configured for the given bucket, which is
/// useful when resetting a bucket's notification configuration.
///
pub async fn clear_bucket_notification_rules(&self, bucket_name: &str) -> Result<(), NotificationError> {
// Get global NotificationSystem instance
let notification_sys = match notification_system() {
Some(sys) => sys,
None => return Err(NotificationError::ServerNotInitialized),
};
// Clear configuration
notification_sys.remove_bucket_notification_config(bucket_name).await;
Ok(())
}
}

View File
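The rule tuples above now own their prefix and suffix (`String` instead of `&str`), so a rules vector can be assembled from runtime data without lifetime gymnastics, and `clear_bucket_notification_rules` is the new counterpart for wiping a bucket's configuration. A minimal sketch of building such a vector (helper name and filter values are illustrative):

use rustfs_targets::EventName;
use rustfs_targets::arn::TargetID;

fn build_rules(
    events: Vec<EventName>,
    targets: Vec<TargetID>,
) -> Vec<(Vec<EventName>, String, String, Vec<TargetID>)> {
    // prefix/suffix are owned, so they can come straight from a parsed request body.
    vec![(events, "uploads/".to_string(), ".jpg".to_string(), targets)]
}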

@@ -17,14 +17,13 @@ use rustfs_config::observability::{
DEFAULT_SINKS_FILE_FLUSH_THRESHOLD, DEFAULT_SINKS_KAFKA_BATCH_SIZE, DEFAULT_SINKS_KAFKA_BATCH_TIMEOUT_MS,
DEFAULT_SINKS_KAFKA_BROKERS, DEFAULT_SINKS_KAFKA_TOPIC, DEFAULT_SINKS_WEBHOOK_AUTH_TOKEN, DEFAULT_SINKS_WEBHOOK_ENDPOINT,
DEFAULT_SINKS_WEBHOOK_MAX_RETRIES, DEFAULT_SINKS_WEBHOOK_RETRY_DELAY_MS, ENV_AUDIT_LOGGER_QUEUE_CAPACITY, ENV_OBS_ENDPOINT,
ENV_OBS_ENVIRONMENT, ENV_OBS_LOCAL_LOGGING_ENABLED, ENV_OBS_LOG_FILENAME, ENV_OBS_LOG_KEEP_FILES,
ENV_OBS_ENVIRONMENT, ENV_OBS_LOCAL_LOGGING_ENABLED, ENV_OBS_LOG_DIRECTORY, ENV_OBS_LOG_FILENAME, ENV_OBS_LOG_KEEP_FILES,
ENV_OBS_LOG_ROTATION_SIZE_MB, ENV_OBS_LOG_ROTATION_TIME, ENV_OBS_LOGGER_LEVEL, ENV_OBS_METER_INTERVAL, ENV_OBS_SAMPLE_RATIO,
ENV_OBS_SERVICE_NAME, ENV_OBS_SERVICE_VERSION, ENV_SINKS_FILE_BUFFER_SIZE, ENV_SINKS_FILE_FLUSH_INTERVAL_MS,
ENV_SINKS_FILE_FLUSH_THRESHOLD, ENV_SINKS_FILE_PATH, ENV_SINKS_KAFKA_BATCH_SIZE, ENV_SINKS_KAFKA_BATCH_TIMEOUT_MS,
ENV_SINKS_KAFKA_BROKERS, ENV_SINKS_KAFKA_TOPIC, ENV_SINKS_WEBHOOK_AUTH_TOKEN, ENV_SINKS_WEBHOOK_ENDPOINT,
ENV_SINKS_WEBHOOK_MAX_RETRIES, ENV_SINKS_WEBHOOK_RETRY_DELAY_MS,
ENV_OBS_SERVICE_NAME, ENV_OBS_SERVICE_VERSION, ENV_OBS_USE_STDOUT, ENV_SINKS_FILE_BUFFER_SIZE,
ENV_SINKS_FILE_FLUSH_INTERVAL_MS, ENV_SINKS_FILE_FLUSH_THRESHOLD, ENV_SINKS_FILE_PATH, ENV_SINKS_KAFKA_BATCH_SIZE,
ENV_SINKS_KAFKA_BATCH_TIMEOUT_MS, ENV_SINKS_KAFKA_BROKERS, ENV_SINKS_KAFKA_TOPIC, ENV_SINKS_WEBHOOK_AUTH_TOKEN,
ENV_SINKS_WEBHOOK_ENDPOINT, ENV_SINKS_WEBHOOK_MAX_RETRIES, ENV_SINKS_WEBHOOK_RETRY_DELAY_MS,
};
use rustfs_config::observability::{ENV_OBS_LOG_DIRECTORY, ENV_OBS_USE_STDOUT};
use rustfs_config::{
APP_NAME, DEFAULT_LOG_KEEP_FILES, DEFAULT_LOG_LEVEL, DEFAULT_LOG_ROTATION_SIZE_MB, DEFAULT_LOG_ROTATION_TIME,
DEFAULT_OBS_LOG_FILENAME, ENVIRONMENT, METER_INTERVAL, SAMPLE_RATIO, SERVICE_VERSION, USE_STDOUT,

View File

@@ -26,42 +26,8 @@ use s3s::Body;
const _SIGN_V4_ALGORITHM: &str = "AWS4-HMAC-SHA256";
const SIGN_V2_ALGORITHM: &str = "AWS";
fn encode_url2path(req: &request::Request<Body>, virtual_host: bool) -> String {
// In virtual-hosted-style, the canonical resource must include "/{bucket}" prefix
// extracted from the Host header: bucket.domain.tld -> "/bucket"
let mut path = req.uri().path().to_string();
if virtual_host {
let host = super::utils::get_host_addr(req);
// strip port if present
let host = match host.split_once(':') {
Some((h, _)) => h,
None => host.as_str(),
};
// If host has at least 3 labels (bucket + domain + tld), take first label as bucket
if let Some((bucket, _rest)) = host.split_once('.') {
if !bucket.is_empty() {
// avoid duplicating if path already starts with /bucket/
let expected_prefix = format!("/{bucket}");
if !path.starts_with(&expected_prefix) {
// Only prefix for bucket-level paths; ensure a single slash separation
if path == "/" {
path = expected_prefix;
} else {
path = format!(
"{}{}",
expected_prefix,
if path.starts_with('/') {
path.clone()
} else {
format!("/{path}")
}
);
}
}
}
}
}
path
fn encode_url2path(req: &request::Request<Body>, _virtual_host: bool) -> String {
req.uri().path().to_string()
}
pub fn pre_sign_v2(
@@ -273,39 +239,3 @@ fn write_canonicalized_resource(buf: &mut BytesMut, req: &request::Request<Body>
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use http::Request;
fn mk_req(host: &str, path: &str, query: &str) -> request::Request<Body> {
let uri = if query.is_empty() {
format!("http://{host}{path}")
} else {
format!("http://{host}{path}?{query}")
};
let mut req = Request::builder().uri(uri).method("GET").body(Body::empty()).unwrap();
// minimal headers used by signers
let h = req.headers_mut();
h.insert("Content-Md5", "".parse().unwrap());
h.insert("Content-Type", "".parse().unwrap());
h.insert("Date", "Thu, 21 Aug 2025 00:00:00 +0000".parse().unwrap());
h.insert("host", host.parse().unwrap());
req
}
#[test]
fn test_encode_url2path_vhost_prefixes_bucket() {
let req = mk_req("test.example.com", "/obj.txt", "");
let path = encode_url2path(&req, true);
assert_eq!(path, "/test/obj.txt");
}
#[test]
fn test_encode_url2path_path_style_unchanged() {
let req = mk_req("example.com", "/test/obj.txt", "uploads=");
let path = encode_url2path(&req, false);
assert_eq!(path, "/test/obj.txt");
}
}

View File

@@ -15,8 +15,6 @@
use bytes::Bytes;
use futures::pin_mut;
use futures::{Stream, StreamExt};
use hyper::client::conn::http2::Builder;
use hyper_util::rt::TokioExecutor;
use std::net::Ipv6Addr;
use std::sync::LazyLock;
use std::{
@@ -144,10 +142,6 @@ pub fn get_endpoint_url(endpoint: &str, secure: bool) -> Result<Url, std::io::Er
pub const DEFAULT_DIAL_TIMEOUT: i64 = 5;
pub fn new_remotetarget_http_transport(_insecure: bool) -> Builder<TokioExecutor> {
todo!();
}
const ALLOWED_CUSTOM_QUERY_PREFIX: &str = "x-";
pub fn is_custom_query_value(qs_key: &str) -> bool {

View File

@@ -20,7 +20,7 @@ use std::{
task::{Context, Poll},
time::Duration,
};
use tokio::time::interval;
use tokio::time::{Interval, MissedTickBehavior, interval};
pub const MAX_RETRY: i64 = 10;
pub const MAX_JITTER: f64 = 1.0;
@@ -29,22 +29,28 @@ pub const NO_JITTER: f64 = 0.0;
pub const DEFAULT_RETRY_UNIT: Duration = Duration::from_millis(200);
pub const DEFAULT_RETRY_CAP: Duration = Duration::from_secs(1);
#[derive(Debug)]
pub struct RetryTimer {
base_sleep: Duration,
max_sleep: Duration,
jitter: f64,
random: u64,
max_retry: i64,
rem: i64,
timer: Option<Interval>,
}
impl RetryTimer {
pub fn new(max_retry: i64, base_sleep: Duration, max_sleep: Duration, jitter: f64, random: u64) -> Self {
//println!("time1: {:?}", std::time::SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis());
Self {
base_sleep,
max_sleep,
jitter,
random,
max_retry,
rem: max_retry,
timer: None,
}
}
}
@@ -53,26 +59,52 @@ impl Stream for RetryTimer {
type Item = ();
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<()>> {
let jitter = self.jitter.clamp(NO_JITTER, MAX_JITTER);
if self.rem == 0 {
return Poll::Ready(None);
}
let attempt = MAX_RETRY - self.rem;
let jitter = self.jitter.clamp(NO_JITTER, MAX_JITTER);
let attempt = self.max_retry - self.rem;
let mut sleep = self.base_sleep * (1 << attempt);
if sleep > self.max_sleep {
sleep = self.max_sleep;
}
if (jitter - NO_JITTER).abs() > 1e-9 {
sleep -= sleep * self.random as u32 * jitter as u32;
//println!("\njitter: {:?}", jitter);
//println!("sleep: {sleep:?}");
//println!("0000: {:?}", self.random as f64 * jitter / 100_f64);
let sleep_ms = sleep.as_millis() as u64;
sleep = Duration::from_millis(sleep_ms - (sleep_ms as f64 * (self.random as f64 * jitter / 100_f64)) as u64);
}
//println!("sleep: {sleep:?}");
if self.timer.is_none() {
let mut timer = interval(sleep);
timer.set_missed_tick_behavior(MissedTickBehavior::Delay);
self.timer = Some(timer);
//println!("time1: {:?}", std::time::SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis());
}
if self.rem == 0 {
return Poll::Ready(None);
}
self.rem -= 1;
let mut t = interval(sleep);
match t.poll_tick(cx) {
Poll::Ready(_) => Poll::Ready(Some(())),
Poll::Pending => Poll::Pending,
let mut timer = self.timer.as_mut().unwrap();
match Pin::new(&mut timer).poll_tick(cx) {
Poll::Ready(_) => {
//println!("ready");
//println!("time2: {:?}", std::time::SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis());
self.rem -= 1;
if self.rem > 0 {
let mut new_timer = interval(sleep);
new_timer.set_missed_tick_behavior(MissedTickBehavior::Delay);
new_timer.reset();
self.timer = Some(new_timer);
//println!("time1: {:?}", std::time::SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis());
}
Poll::Ready(Some(()))
}
Poll::Pending => {
//println!("pending");
//println!("time2: {:?}", std::time::SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis());
Poll::Pending
}
}
}
}
@@ -136,3 +168,27 @@ pub fn is_request_error_retryable(_err: std::io::Error) -> bool {
true*/
todo!();
}
#[cfg(test)]
#[allow(unused_imports)]
mod tests {
use super::*;
use futures::{Future, StreamExt};
use rand::Rng;
use std::time::UNIX_EPOCH;
#[tokio::test]
async fn test_retry() {
let req_retry = 10;
let random = rand::rng().random_range(0..=100);
let mut retry_timer = RetryTimer::new(req_retry, DEFAULT_RETRY_UNIT, DEFAULT_RETRY_CAP, MAX_JITTER, random);
println!("retry_timer: {retry_timer:?}");
while retry_timer.next().await.is_some() {
println!(
"\ntime: {:?}",
std::time::SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis()
);
}
}
}

View File
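The reworked `RetryTimer` keeps a single `Interval` across polls instead of recreating one per call, and applies exponential backoff with jitter before capping. The deterministic part of the schedule, as a stand-alone helper (illustrative, not part of the diff):

use std::time::Duration;

fn backoff(base: Duration, cap: Duration, attempt: u32) -> Duration {
    std::cmp::min(base * (1u32 << attempt), cap)
}

// With DEFAULT_RETRY_UNIT = 200ms and DEFAULT_RETRY_CAP = 1s this yields
// 200ms, 400ms, 800ms, 1s, 1s, ... before the random jitter is subtracted.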

@@ -103,6 +103,7 @@ tower-http = { workspace = true, features = [
"cors",
"catch-panic",
] }
url = { workspace = true }
urlencoding = { workspace = true }
uuid = { workspace = true }
zip = { workspace = true }

View File

@@ -27,15 +27,14 @@ use s3s::{Body, S3Error, S3ErrorCode, S3Request, S3Response, S3Result, header::C
use serde::{Deserialize, Serialize};
use serde_urlencoded::from_bytes;
use std::collections::HashMap;
use std::future::Future;
use std::io::{Error, ErrorKind};
use std::net::SocketAddr;
use std::path::Path;
use tokio::net::lookup_host;
use tokio::time::{Duration, sleep};
use tracing::{debug, error, info, warn};
#[derive(Debug, Deserialize)]
struct TargetQuery {
#[serde(rename = "targetType")]
target_type: String,
#[serde(rename = "targetName")]
target_name: String,
}
use url::Url;
#[derive(Debug, Deserialize)]
struct BucketQuery {
@@ -43,19 +42,104 @@ struct BucketQuery {
bucket_name: String,
}
/// Set (create or update) a notification target
pub struct SetNotificationTarget {}
#[async_trait::async_trait]
impl Operation for SetNotificationTarget {
async fn call(&self, req: S3Request<Body>, _params: Params<'_, '_>) -> S3Result<S3Response<(StatusCode, Body)>> {
// 1. Analyze query parameters
let query: TargetQuery = from_bytes(req.uri.query().unwrap_or("").as_bytes())
.map_err(|e| s3_error!(InvalidArgument, "invalid query parameters: {}", e))?;
#[derive(Debug, Deserialize)]
pub struct KeyValue {
pub key: String,
pub value: String,
}
let target_type = query.target_type.to_lowercase();
if target_type != *NOTIFY_WEBHOOK_SUB_SYS && target_type != *NOTIFY_MQTT_SUB_SYS {
return Err(s3_error!(InvalidArgument, "unsupported target type: {}", query.target_type));
#[derive(Debug, Deserialize)]
pub struct NotificationTargetBody {
pub key_values: Vec<KeyValue>,
}
#[derive(Serialize, Debug)]
struct NotificationEndpoint {
account_id: String,
service: String,
status: String,
}
#[derive(Serialize, Debug)]
struct NotificationEndpointsResponse {
notification_endpoints: Vec<NotificationEndpoint>,
}
async fn retry_with_backoff<F, Fut, T>(mut operation: F, max_attempts: usize, base_delay: Duration) -> Result<T, Error>
where
F: FnMut() -> Fut,
Fut: Future<Output = Result<T, Error>>,
{
assert!(max_attempts > 0, "max_attempts must be greater than 0");
let mut attempts = 0;
let mut delay = base_delay;
let mut last_err = None;
while attempts < max_attempts {
match operation().await {
Ok(result) => return Ok(result),
Err(e) => {
last_err = Some(e);
attempts += 1;
if attempts < max_attempts {
warn!(
"Retry attempt {}/{} failed: {}. Retrying in {:?}",
attempts,
max_attempts,
last_err.as_ref().unwrap(),
delay
);
sleep(delay).await;
delay = delay.saturating_mul(2);
}
}
}
}
Err(last_err.unwrap_or_else(|| Error::other("retry_with_backoff: unknown error")))
}
async fn retry_metadata(path: &str) -> Result<(), Error> {
retry_with_backoff(|| async { tokio::fs::metadata(path).await.map(|_| ()) }, 3, Duration::from_millis(100)).await
}
async fn validate_queue_dir(queue_dir: &str) -> S3Result<()> {
if !queue_dir.is_empty() {
if !Path::new(queue_dir).is_absolute() {
return Err(s3_error!(InvalidArgument, "queue_dir must be absolute path"));
}
if let Err(e) = retry_metadata(queue_dir).await {
match e.kind() {
ErrorKind::NotFound => {
return Err(s3_error!(InvalidArgument, "queue_dir does not exist"));
}
ErrorKind::PermissionDenied => {
return Err(s3_error!(InvalidArgument, "queue_dir exists but permission denied"));
}
_ => {
return Err(s3_error!(InvalidArgument, "failed to access queue_dir: {}", e));
}
}
}
}
Ok(())
}
fn validate_cert_key_pair(cert: &Option<String>, key: &Option<String>) -> S3Result<()> {
if cert.is_some() != key.is_some() {
return Err(s3_error!(InvalidArgument, "client_cert and client_key must be specified as a pair"));
}
Ok(())
}
/// Set (create or update) a notification target
pub struct NotificationTarget {}
#[async_trait::async_trait]
impl Operation for NotificationTarget {
async fn call(&self, req: S3Request<Body>, params: Params<'_, '_>) -> S3Result<S3Response<(StatusCode, Body)>> {
// 1. Analyze query parameters
let (target_type, target_name) = extract_target_params(&params)?;
// 2. Permission verification
let Some(input_cred) = &req.credentials else {
@@ -82,25 +166,119 @@ impl Operation for SetNotificationTarget {
kvs_map.insert(ENABLE_KEY.to_string(), EnableState::On.to_string());
}
let kvs = rustfs_ecstore::config::KVS(
kvs_map
.into_iter()
.map(|(key, value)| rustfs_ecstore::config::KV {
key,
value,
hidden_if_empty: false, // Set a default value
})
.collect(),
);
// 1. Get the allowed key range
let allowed_keys: std::collections::HashSet<&str> = match target_type {
NOTIFY_WEBHOOK_SUB_SYS => rustfs_config::notify::NOTIFY_WEBHOOK_KEYS.iter().cloned().collect(),
NOTIFY_MQTT_SUB_SYS => rustfs_config::notify::NOTIFY_MQTT_KEYS.iter().cloned().collect(),
_ => unreachable!(),
};
let notification_body: NotificationTargetBody = serde_json::from_slice(&body)
.map_err(|e| s3_error!(InvalidArgument, "invalid json body for target config: {}", e))?;
// 2. Filter and verify keys, and splice target_name
let mut kvs_vec = Vec::new();
let mut endpoint_val = None;
let mut queue_dir_val = None;
let mut client_cert_val = None;
let mut client_key_val = None;
let mut qos_val = None;
for kv in notification_body.key_values.iter() {
if !allowed_keys.contains(kv.key.as_str()) {
return Err(s3_error!(
InvalidArgument,
"key '{}' not allowed for target type '{}'",
kv.key,
target_type
));
}
if kv.key == "endpoint" {
endpoint_val = Some(kv.value.clone());
}
if kv.key == "queue_dir" {
queue_dir_val = Some(kv.value.clone());
}
if kv.key == "client_cert" {
client_cert_val = Some(kv.value.clone());
}
if kv.key == "client_key" {
client_key_val = Some(kv.value.clone());
}
if kv.key == "qos" {
qos_val = Some(kv.value.clone());
}
kvs_vec.push(rustfs_ecstore::config::KV {
key: kv.key.clone(),
value: kv.value.clone(),
hidden_if_empty: false,
});
}
if target_type == NOTIFY_WEBHOOK_SUB_SYS {
let endpoint = endpoint_val
.clone()
.ok_or_else(|| s3_error!(InvalidArgument, "endpoint is required"))?;
let url = Url::parse(&endpoint).map_err(|e| s3_error!(InvalidArgument, "invalid endpoint url: {}", e))?;
let host = url
.host_str()
.ok_or_else(|| s3_error!(InvalidArgument, "endpoint missing host"))?;
let port = url
.port_or_known_default()
.ok_or_else(|| s3_error!(InvalidArgument, "endpoint missing port"))?;
let addr = format!("{}:{}", host, port);
// First, try to parse as SocketAddr (IP:port)
if addr.parse::<SocketAddr>().is_err() {
// If not an IP:port, try DNS resolution
if lookup_host(&addr).await.is_err() {
return Err(s3_error!(InvalidArgument, "invalid or unresolvable endpoint address"));
}
}
if let Some(queue_dir) = queue_dir_val.clone() {
validate_queue_dir(&queue_dir).await?;
}
validate_cert_key_pair(&client_cert_val, &client_key_val)?;
}
if target_type == NOTIFY_MQTT_SUB_SYS {
let endpoint = endpoint_val.ok_or_else(|| s3_error!(InvalidArgument, "endpoint is required"))?;
let url = Url::parse(&endpoint).map_err(|e| s3_error!(InvalidArgument, "invalid endpoint url: {}", e))?;
match url.scheme() {
"tcp" | "ssl" | "ws" | "wss" | "mqtt" | "mqtts" => {}
_ => return Err(s3_error!(InvalidArgument, "unsupported broker url scheme")),
}
if let Some(queue_dir) = queue_dir_val {
validate_queue_dir(&queue_dir).await?;
if let Some(qos) = qos_val {
match qos.parse::<u8>() {
Ok(qos_int) if qos_int == 1 || qos_int == 2 => {}
Ok(0) => {
return Err(s3_error!(InvalidArgument, "qos should be 1 or 2 if queue_dir is set"));
}
_ => {
return Err(s3_error!(InvalidArgument, "qos must be an integer 0, 1, or 2"));
}
}
}
}
}
// 3. Add ENABLE_KEY
kvs_vec.push(rustfs_ecstore::config::KV {
key: ENABLE_KEY.to_string(),
value: EnableState::On.to_string(),
hidden_if_empty: false,
});
let kvs = rustfs_ecstore::config::KVS(kvs_vec);
// 5. Call notification system to set target configuration
info!("Setting target config for type '{}', name '{}'", &query.target_type, &query.target_name);
ns.set_target_config(&query.target_type, &query.target_name, kvs)
.await
.map_err(|e| {
error!("failed to set target config: {}", e);
S3Error::with_message(S3ErrorCode::InternalError, format!("failed to set target config: {e}"))
})?;
info!("Setting target config for type '{}', name '{}'", target_type, target_name);
ns.set_target_config(target_type, target_name, kvs).await.map_err(|e| {
error!("failed to set target config: {}", e);
S3Error::with_message(S3ErrorCode::InternalError, format!("failed to set target config: {e}"))
})?;
let mut header = HeaderMap::new();
header.insert(CONTENT_TYPE, "application/json".parse().unwrap());
@@ -131,20 +309,68 @@ impl Operation for ListNotificationTargets {
// 3. Get the list of active targets
let active_targets = ns.get_active_targets().await;
debug!("ListNotificationTargets call found {} active targets", active_targets.len());
let mut notification_endpoints = Vec::new();
for target_id in active_targets.iter() {
notification_endpoints.push(NotificationEndpoint {
account_id: target_id.id.clone(),
service: target_id.name.to_string(),
status: "online".to_string(),
});
}
let response = NotificationEndpointsResponse { notification_endpoints };
// 4. Serialize and return the result
let data = serde_json::to_vec(&response).map_err(|e| {
error!("Failed to serialize notification targets response: {:?}", response);
S3Error::with_message(S3ErrorCode::InternalError, format!("failed to serialize targets: {e}"))
})?;
debug!("ListNotificationTargets call end, response data length: {}", data.len(),);
let mut header = HeaderMap::new();
header.insert(CONTENT_TYPE, "application/json".parse().unwrap());
Ok(S3Response::with_headers((StatusCode::OK, Body::from(data)), header))
}
}
/// List the ARNs of all active notification targets
pub struct ListTargetsArns {}
#[async_trait::async_trait]
impl Operation for ListTargetsArns {
async fn call(&self, req: S3Request<Body>, _params: Params<'_, '_>) -> S3Result<S3Response<(StatusCode, Body)>> {
debug!("ListTargetsArns call start request params: {:?}", req.uri.query());
// 1. Permission verification
let Some(input_cred) = &req.credentials else {
return Err(s3_error!(InvalidRequest, "credentials not found"));
};
let (_cred, _owner) =
check_key_valid(get_session_token(&req.uri, &req.headers).unwrap_or_default(), &input_cred.access_key).await?;
// 2. Get notification system instance
let Some(ns) = rustfs_notify::global::notification_system() else {
return Err(s3_error!(InternalError, "notification system not initialized"));
};
// 3. Get the list of active targets
let active_targets = ns.get_active_targets().await;
debug!("ListTargetsArns call found {} active targets", active_targets.len());
let region = match req.region.clone() {
Some(region) => region,
None => return Err(s3_error!(InvalidRequest, "region not found")),
};
let mut data_target_arn_list = Vec::new();
for target_id in active_targets.iter() {
let target_arn = target_id.to_arn(&region);
data_target_arn_list.push(target_arn.to_string());
data_target_arn_list.push(target_id.to_arn(&region).to_string());
}
// 4. Serialize and return the result
let data = serde_json::to_vec(&data_target_arn_list)
.map_err(|e| S3Error::with_message(S3ErrorCode::InternalError, format!("failed to serialize targets: {e}")))?;
debug!("ListNotificationTargets call end, response data length: {}", data.len(),);
debug!("ListTargetsArns call end, response data length: {}", data.len(),);
let mut header = HeaderMap::new();
header.insert(CONTENT_TYPE, "application/json".parse().unwrap());
Ok(S3Response::with_headers((StatusCode::OK, Body::from(data)), header))
@@ -155,10 +381,9 @@ impl Operation for ListNotificationTargets {
pub struct RemoveNotificationTarget {}
#[async_trait::async_trait]
impl Operation for RemoveNotificationTarget {
async fn call(&self, req: S3Request<Body>, _params: Params<'_, '_>) -> S3Result<S3Response<(StatusCode, Body)>> {
async fn call(&self, req: S3Request<Body>, params: Params<'_, '_>) -> S3Result<S3Response<(StatusCode, Body)>> {
// 1. Analyze query parameters
let query: TargetQuery = from_bytes(req.uri.query().unwrap_or("").as_bytes())
.map_err(|e| s3_error!(InvalidArgument, "invalid query parameters: {}", e))?;
let (target_type, target_name) = extract_target_params(&params)?;
// 2. Permission verification
let Some(input_cred) = &req.credentials else {
@@ -173,13 +398,11 @@ impl Operation for RemoveNotificationTarget {
};
// 4. Call notification system to remove target configuration
info!("Removing target config for type '{}', name '{}'", &query.target_type, &query.target_name);
ns.remove_target_config(&query.target_type, &query.target_name)
.await
.map_err(|e| {
error!("failed to remove target config: {}", e);
S3Error::with_message(S3ErrorCode::InternalError, format!("failed to remove target config: {e}"))
})?;
info!("Removing target config for type '{}', name '{}'", target_type, target_name);
ns.remove_target_config(target_type, target_name).await.map_err(|e| {
error!("failed to remove target config: {}", e);
S3Error::with_message(S3ErrorCode::InternalError, format!("failed to remove target config: {e}"))
})?;
let mut header = HeaderMap::new();
header.insert(CONTENT_TYPE, "application/json".parse().unwrap());
@@ -188,6 +411,22 @@ impl Operation for RemoveNotificationTarget {
}
}
fn extract_param<'a>(params: &'a Params<'_, '_>, key: &str) -> S3Result<&'a str> {
params
.get(key)
.ok_or_else(|| s3_error!(InvalidArgument, "missing required parameter: '{}'", key))
}
fn extract_target_params<'a>(params: &'a Params<'_, '_>) -> S3Result<(&'a str, &'a str)> {
let target_type = extract_param(params, "target_type")?;
if target_type != NOTIFY_WEBHOOK_SUB_SYS && target_type != NOTIFY_MQTT_SUB_SYS {
return Err(s3_error!(InvalidArgument, "unsupported target type: '{}'", target_type));
}
let target_name = extract_param(params, "target_name")?;
Ok((target_type, target_name))
}
/// Set notification rules for buckets
pub struct SetBucketNotification {}
#[async_trait::async_trait]

View File
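The rewritten PUT handler above reads its configuration from the request body rather than from query parameters: the JSON is deserialized into `NotificationTargetBody`, a list of key/value pairs validated against the allow-list for the chosen target type. A hedged sketch of a webhook body (endpoint and paths are illustrative):

fn example_webhook_body() -> serde_json::Value {
    serde_json::json!({
        "key_values": [
            { "key": "endpoint",  "value": "http://127.0.0.1:3020/webhook" },
            { "key": "queue_dir", "value": "/var/lib/rustfs/notify" }
        ]
    })
}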

@@ -20,16 +20,15 @@ pub mod utils;
// use ecstore::global::{is_dist_erasure, is_erasure};
use handlers::{
bucket_meta, group, policies, pools, rebalance,
GetReplicationMetricsHandler, ListRemoteTargetHandler, RemoveRemoteTargetHandler, SetRemoteTargetHandler, bucket_meta,
event::{
GetBucketNotification, ListNotificationTargets, NotificationTarget, RemoveBucketNotification, RemoveNotificationTarget,
SetBucketNotification,
},
group, policies, pools, rebalance,
service_account::{AddServiceAccount, DeleteServiceAccount, InfoServiceAccount, ListServiceAccount, UpdateServiceAccount},
sts, tier, user,
};
use crate::admin::handlers::event::{
GetBucketNotification, ListNotificationTargets, RemoveBucketNotification, RemoveNotificationTarget, SetBucketNotification,
SetNotificationTarget,
};
use handlers::{GetReplicationMetricsHandler, ListRemoteTargetHandler, RemoveRemoteTargetHandler, SetRemoteTargetHandler};
use hyper::Method;
use router::{AdminOperation, S3Router};
use rpc::register_rpc_route;
@@ -371,14 +370,14 @@ fn register_user_route(r: &mut S3Router<AdminOperation>) -> std::io::Result<()>
r.insert(
Method::GET,
format!("{}{}", ADMIN_PREFIX, "/v3/target-list").as_str(),
format!("{}{}", ADMIN_PREFIX, "/v3/target/list").as_str(),
AdminOperation(&ListNotificationTargets {}),
)?;
r.insert(
Method::POST,
format!("{}{}", ADMIN_PREFIX, "/v3/target-set").as_str(),
AdminOperation(&SetNotificationTarget {}),
Method::PUT,
format!("{}{}", ADMIN_PREFIX, "/v3/target/{target_type}/{target_name}").as_str(),
AdminOperation(&NotificationTarget {}),
)?;
// Remove notification target
@@ -388,9 +387,15 @@ fn register_user_route(r: &mut S3Router<AdminOperation>) -> std::io::Result<()>
// * `target_name` - A unique name for a Target, such as "1".
r.insert(
Method::DELETE,
format!("{}{}", ADMIN_PREFIX, "/v3/target-remove").as_str(),
format!("{}{}", ADMIN_PREFIX, "/v3/target/{target_type}/{target_name}/reset").as_str(),
AdminOperation(&RemoveNotificationTarget {}),
)?;
// arns
r.insert(
Method::GET,
format!("{}{}", ADMIN_PREFIX, "/v3/target/arns").as_str(),
AdminOperation(&ListNotificationTargets {}),
)?;
r.insert(
Method::POST,

View File

@@ -81,6 +81,7 @@ impl From<StorageError> for ApiError {
StorageError::DataMovementOverwriteErr(_, _, _) => S3ErrorCode::InvalidArgument,
StorageError::ObjectExistsAsDirectory(_, _) => S3ErrorCode::InvalidArgument,
StorageError::InvalidPart(_, _, _) => S3ErrorCode::InvalidPart,
StorageError::PreconditionFailed => S3ErrorCode::PreconditionFailed,
_ => S3ErrorCode::InternalError,
};

View File

@@ -137,8 +137,21 @@ pub async fn start_http_server(
b.set_route(admin::make_admin_route(opt.console_enable)?);
if !opt.server_domains.is_empty() {
info!("virtual-hosted-style requests are enabled use domain_name {:?}", &opt.server_domains);
b.set_host(MultiDomain::new(&opt.server_domains).map_err(Error::other)?);
MultiDomain::new(&opt.server_domains).map_err(Error::other)?; // validate domains
// add the default port number to the given server domains
let mut domain_sets = std::collections::HashSet::new();
for domain in &opt.server_domains {
domain_sets.insert(domain.to_string());
if let Some((host, _)) = domain.split_once(':') {
domain_sets.insert(format!("{}:{}", host, server_port));
} else {
domain_sets.insert(format!("{}:{}", domain, server_port));
}
}
info!("virtual-hosted-style requests are enabled use domain_name {:?}", &domain_sets);
b.set_host(MultiDomain::new(domain_sets).map_err(Error::other)?);
}
b.build()

View File
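The domain expansion above makes Host headers that carry the listener's port still match the configured virtual-host domains. The same logic as a stand-alone sketch (the port in the trailing comment is an assumed example):

use std::collections::HashSet;

fn expand_domains(domains: &[&str], server_port: u16) -> HashSet<String> {
    let mut set = HashSet::new();
    for domain in domains {
        set.insert(domain.to_string());
        match domain.split_once(':') {
            Some((host, _)) => set.insert(format!("{host}:{server_port}")),
            None => set.insert(format!("{domain}:{server_port}")),
        };
    }
    set
}

// expand_domains(&["s3.example.com", "cdn.example.com:8080"], 9000) contains
// "s3.example.com", "s3.example.com:9000", "cdn.example.com:8080", and
// "cdn.example.com:9000".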

@@ -21,19 +21,14 @@ use crate::error::ApiError;
use crate::storage::access::ReqInfo;
use crate::storage::options::copy_dst_opts;
use crate::storage::options::copy_src_opts;
use crate::storage::options::{extract_metadata_from_mime_with_object_name, get_opts};
use crate::storage::options::get_complete_multipart_upload_opts;
use crate::storage::options::{extract_metadata_from_mime_with_object_name, get_opts, parse_copy_source_range};
use bytes::Bytes;
use chrono::DateTime;
use chrono::Utc;
use datafusion::arrow::csv::WriterBuilder as CsvWriterBuilder;
use datafusion::arrow::json::WriterBuilder as JsonWriterBuilder;
use datafusion::arrow::json::writer::JsonArray;
use rustfs_ecstore::set_disk::MAX_PARTS_COUNT;
use rustfs_s3select_api::object_store::bytes_stream;
use rustfs_s3select_api::query::Context;
use rustfs_s3select_api::query::Query;
use rustfs_s3select_query::get_global_db;
// use rustfs_ecstore::store_api::RESERVED_METADATA_PREFIX;
use futures::StreamExt;
use http::HeaderMap;
@@ -63,6 +58,7 @@ use rustfs_ecstore::compress::is_compressible;
use rustfs_ecstore::error::StorageError;
use rustfs_ecstore::new_object_layer_fn;
use rustfs_ecstore::set_disk::DEFAULT_READ_BUFFER_SIZE;
use rustfs_ecstore::set_disk::MAX_PARTS_COUNT;
use rustfs_ecstore::store_api::BucketOptions;
use rustfs_ecstore::store_api::CompletePart;
use rustfs_ecstore::store_api::DeleteBucketOptions;
@@ -76,6 +72,7 @@ use rustfs_ecstore::store_api::PutObjReader;
use rustfs_ecstore::store_api::StorageAPI;
use rustfs_filemeta::headers::RESERVED_METADATA_PREFIX_LOWER;
use rustfs_filemeta::headers::{AMZ_DECODED_CONTENT_LENGTH, AMZ_OBJECT_TAGGING};
use rustfs_notify::global::notifier_instance;
use rustfs_policy::auth;
use rustfs_policy::policy::action::Action;
use rustfs_policy::policy::action::S3Action;
@@ -85,7 +82,12 @@ use rustfs_rio::EtagReader;
use rustfs_rio::HashReader;
use rustfs_rio::Reader;
use rustfs_rio::WarpReader;
use rustfs_s3select_api::object_store::bytes_stream;
use rustfs_s3select_api::query::Context;
use rustfs_s3select_api::query::Query;
use rustfs_s3select_query::get_global_db;
use rustfs_targets::EventName;
use rustfs_targets::arn::{TargetID, TargetIDError};
use rustfs_utils::CompressionAlgorithm;
use rustfs_utils::path::path_join_buf;
use rustfs_zip::CompressionFormat;
@@ -261,7 +263,7 @@ impl FS {
// Asynchronous call will not block the response of the current request
tokio::spawn(async move {
rustfs_notify::global::notifier_instance().notify(event_args).await;
notifier_instance().notify(event_args).await;
});
}
}
@@ -289,6 +291,7 @@ impl FS {
Ok(S3Response::new(output))
}
}
#[async_trait::async_trait]
impl S3 for FS {
#[tracing::instrument(
@@ -334,7 +337,7 @@ impl S3 for FS {
// Asynchronous call will not block the response of the current request
tokio::spawn(async move {
rustfs_notify::global::notifier_instance().notify(event_args).await;
notifier_instance().notify(event_args).await;
});
Ok(S3Response::new(output))
@@ -480,7 +483,7 @@ impl S3 for FS {
// Asynchronous call will not block the response of the current request
tokio::spawn(async move {
rustfs_notify::global::notifier_instance().notify(event_args).await;
notifier_instance().notify(event_args).await;
});
Ok(S3Response::new(output))
@@ -680,7 +683,7 @@ impl S3 for FS {
// Asynchronous call will not block the response of the current request
tokio::spawn(async move {
rustfs_notify::global::notifier_instance().notify(event_args).await;
notifier_instance().notify(event_args).await;
});
Ok(S3Response::new(DeleteBucketOutput {}))
@@ -755,7 +758,7 @@ impl S3 for FS {
// Asynchronous call will not block the response of the current request
tokio::spawn(async move {
rustfs_notify::global::notifier_instance().notify(event_args).await;
notifier_instance().notify(event_args).await;
});
Ok(S3Response::new(output))
@@ -840,7 +843,7 @@ impl S3 for FS {
host: rustfs_utils::get_request_host(&req.headers),
user_agent: rustfs_utils::get_request_user_agent(&req.headers),
};
rustfs_notify::global::notifier_instance().notify(event_args).await;
notifier_instance().notify(event_args).await;
}
});
@@ -960,11 +963,11 @@ impl S3 for FS {
}
}
let mut content_length = info.size as i64;
let mut content_length = info.size;
let content_range = if let Some(rs) = rs {
let total_size = info.get_actual_size().map_err(ApiError::from)?;
let (start, length) = rs.get_offset_length(total_size as i64).map_err(ApiError::from)?;
let (start, length) = rs.get_offset_length(total_size).map_err(ApiError::from)?;
content_length = length;
Some(format!("bytes {}-{}/{}", start, start as i64 + length - 1, total_size))
} else {
@@ -1005,7 +1008,7 @@ impl S3 for FS {
// Asynchronous call will not block the response of the current request
tokio::spawn(async move {
rustfs_notify::global::notifier_instance().notify(event_args).await;
notifier_instance().notify(event_args).await;
});
Ok(S3Response::new(output))
@@ -1127,7 +1130,7 @@ impl S3 for FS {
// Asynchronous call will not block the response of the current request
tokio::spawn(async move {
rustfs_notify::global::notifier_instance().notify(event_args).await;
notifier_instance().notify(event_args).await;
});
Ok(S3Response::new(output))
@@ -1511,7 +1514,7 @@ impl S3 for FS {
// Asynchronous call will not block the response of the current request
tokio::spawn(async move {
rustfs_notify::global::notifier_instance().notify(event_args).await;
notifier_instance().notify(event_args).await;
});
Ok(S3Response::new(output))
@@ -1589,7 +1592,7 @@ impl S3 for FS {
// Asynchronous call will not block the response of the current request
tokio::spawn(async move {
rustfs_notify::global::notifier_instance().notify(event_args).await;
notifier_instance().notify(event_args).await;
});
Ok(S3Response::new(output))
@@ -1677,11 +1680,180 @@ impl S3 for FS {
#[tracing::instrument(level = "debug", skip(self, req))]
async fn upload_part_copy(&self, req: S3Request<UploadPartCopyInput>) -> S3Result<S3Response<UploadPartCopyOutput>> {
let _input = req.input;
let UploadPartCopyInput {
bucket,
key,
copy_source,
copy_source_range,
part_number,
upload_id,
copy_source_if_match,
copy_source_if_none_match,
..
} = req.input;
let _output = UploadPartCopyOutput { ..Default::default() };
// Parse source bucket, object and version from copy_source
let (src_bucket, src_key, src_version_id) = match copy_source {
CopySource::AccessPoint { .. } => return Err(s3_error!(NotImplemented)),
CopySource::Bucket {
bucket: ref src_bucket,
key: ref src_key,
version_id,
} => (src_bucket.to_string(), src_key.to_string(), version_id.map(|v| v.to_string())),
};
unimplemented!("upload_part_copy");
// Parse range if provided (format: "bytes=start-end")
let rs = if let Some(range_str) = copy_source_range {
Some(parse_copy_source_range(&range_str)?)
} else {
None
};
let part_id = part_number as usize;
// Note: In a real implementation, you would properly validate access
// For now, we'll skip the detailed authorization check
let Some(store) = new_object_layer_fn() else {
return Err(S3Error::with_message(S3ErrorCode::InternalError, "Not init".to_string()));
};
// Check if multipart upload exists and get its info
let mp_info = store
.get_multipart_info(&bucket, &key, &upload_id, &ObjectOptions::default())
.await
.map_err(ApiError::from)?;
// Set up source options
let mut src_opts = copy_src_opts(&src_bucket, &src_key, &req.headers).map_err(ApiError::from)?;
src_opts.version_id = src_version_id.clone();
// Get source object info to validate conditions
let h = HeaderMap::new();
let get_opts = ObjectOptions {
version_id: src_opts.version_id.clone(),
versioned: src_opts.versioned,
version_suspended: src_opts.version_suspended,
..Default::default()
};
let src_reader = store
.get_object_reader(&src_bucket, &src_key, rs.clone(), h, &get_opts)
.await
.map_err(ApiError::from)?;
let src_info = src_reader.object_info;
// Validate copy conditions (simplified for now)
if let Some(if_match) = copy_source_if_match {
if let Some(ref etag) = src_info.etag {
if etag != &if_match {
return Err(s3_error!(PreconditionFailed));
}
} else {
return Err(s3_error!(PreconditionFailed));
}
}
if let Some(if_none_match) = copy_source_if_none_match {
if let Some(ref etag) = src_info.etag {
if etag == &if_none_match {
return Err(s3_error!(PreconditionFailed));
}
}
}
// TODO: Implement proper time comparison for if_modified_since and if_unmodified_since
// For now, we'll skip these conditions
// Calculate actual range and length
// Note: These values are used implicitly through the range specification (rs)
// passed to get_object_reader, which handles the offset and length internally
let (_start_offset, length) = if let Some(ref range_spec) = rs {
// For range validation, use the actual logical size of the file
// For compressed files, this means using the uncompressed size
let validation_size = match src_info.is_compressed_ok() {
Ok((_, true)) => {
// For compressed files, use actual uncompressed size for range validation
src_info.get_actual_size().unwrap_or(src_info.size)
}
_ => {
// For non-compressed files, use the stored size
src_info.size
}
};
range_spec
.get_offset_length(validation_size)
.map_err(|e| S3Error::with_message(S3ErrorCode::InvalidRange, format!("Invalid range: {e}")))?
} else {
(0, src_info.size)
};
// Create a new reader from the source data with the correct range
// We need to re-read from the source with the correct range specification
let h = HeaderMap::new();
let get_opts = ObjectOptions {
version_id: src_opts.version_id.clone(),
versioned: src_opts.versioned,
version_suspended: src_opts.version_suspended,
..Default::default()
};
// Get the source object reader once with the validated range
let src_reader = store
.get_object_reader(&src_bucket, &src_key, rs.clone(), h, &get_opts)
.await
.map_err(ApiError::from)?;
// Use the same reader for streaming
let src_stream = src_reader.stream;
// Check if compression is enabled for this multipart upload
let is_compressible = mp_info
.user_defined
.contains_key(format!("{RESERVED_METADATA_PREFIX_LOWER}compression").as_str());
let mut reader: Box<dyn Reader> = Box::new(WarpReader::new(src_stream));
let actual_size = length;
let mut size = length;
if is_compressible {
let hrd = HashReader::new(reader, size, actual_size, None, false).map_err(ApiError::from)?;
reader = Box::new(CompressReader::new(hrd, CompressionAlgorithm::default()));
size = -1;
}
// TODO: md5 check
let reader = HashReader::new(reader, size, actual_size, None, false).map_err(ApiError::from)?;
let mut reader = PutObjReader::new(reader);
// Set up destination options (inherit from multipart upload)
let dst_opts = ObjectOptions {
user_defined: mp_info.user_defined.clone(),
..Default::default()
};
// Write the copied data as a new part
let part_info = store
.put_object_part(&bucket, &key, &upload_id, part_id, &mut reader, &dst_opts)
.await
.map_err(ApiError::from)?;
// Create response
let copy_part_result = CopyPartResult {
e_tag: part_info.etag,
last_modified: part_info.last_mod.map(Timestamp::from),
..Default::default()
};
let output = UploadPartCopyOutput {
copy_part_result: Some(copy_part_result),
copy_source_version_id: src_version_id,
..Default::default()
};
Ok(S3Response::new(output))
}
#[tracing::instrument(level = "debug", skip(self, req))]
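
As a usage illustration only (not part of this change): with range support in place, a client can copy a slice of an existing object into a part of a multipart upload by sending x-amz-copy-source-range. The sketch below assumes the aws-sdk-s3 fluent builder; bucket, key, and upload-id values are placeholders and error handling is minimal.

    use aws_sdk_s3::Client;

    // Copy bytes 0..=5242879 of src-bucket/src-object into part 1 of an in-progress
    // multipart upload on dst-bucket/dst-object. All names here are placeholders.
    async fn copy_range_as_part(client: &Client, upload_id: &str) -> Result<(), aws_sdk_s3::Error> {
        let resp = client
            .upload_part_copy()
            .bucket("dst-bucket")
            .key("dst-object")
            .upload_id(upload_id)
            .part_number(1)
            .copy_source("src-bucket/src-object")
            .copy_source_range("bytes=0-5242879") // parsed server-side by parse_copy_source_range
            .send()
            .await?;
        // The returned ETag is what CompleteMultipartUpload expects for this part.
        println!("part etag: {:?}", resp.copy_part_result().and_then(|r| r.e_tag()));
        Ok(())
    }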
@@ -1813,7 +1985,7 @@ impl S3 for FS {
let Some(multipart_upload) = multipart_upload else { return Err(s3_error!(InvalidPart)) };
let opts = &ObjectOptions::default();
let opts = &get_complete_multipart_upload_opts(&req.headers).map_err(ApiError::from)?;
let mut uploaded_parts = Vec::new();
@@ -1977,7 +2149,7 @@ impl S3 for FS {
// Asynchronous call will not block the response of the current request
tokio::spawn(async move {
rustfs_notify::global::notifier_instance().notify(event_args).await;
notifier_instance().notify(event_args).await;
});
Ok(S3Response::new(PutObjectTaggingOutput { version_id: None }))
@@ -2044,7 +2216,7 @@ impl S3 for FS {
// Asynchronous call will not block the response of the current request
tokio::spawn(async move {
rustfs_notify::global::notifier_instance().notify(event_args).await;
notifier_instance().notify(event_args).await;
});
Ok(S3Response::new(DeleteObjectTaggingOutput { version_id: None }))
@@ -2621,20 +2793,56 @@ impl S3 for FS {
return Err(S3Error::with_message(S3ErrorCode::InternalError, "Not init".to_string()));
};
// Verify that the bucket exists
store
.get_bucket_info(&bucket, &BucketOptions::default())
.await
.map_err(ApiError::from)?;
// Persist the new notification configuration
let data = try_!(serialize(&notification_configuration));
metadata_sys::update(&bucket, BUCKET_NOTIFICATION_CONFIG, data)
.await
.map_err(ApiError::from)?;
// TODO: event notice add rule
// Determine region (BucketInfo has no region field) -> use global region or default
let region = rustfs_ecstore::global::get_global_region().unwrap_or_else(|| req.region.clone().unwrap_or_default());
Ok(S3Response::new(PutBucketNotificationConfigurationOutput::default()))
// Purge old rules and resolve new rules in parallel
let clear_rules = notifier_instance().clear_bucket_notification_rules(&bucket);
let parse_rules = async {
let mut event_rules = Vec::new();
process_queue_configurations(
&mut event_rules,
notification_configuration.queue_configurations.clone(),
TargetID::from_str,
);
process_topic_configurations(
&mut event_rules,
notification_configuration.topic_configurations.clone(),
TargetID::from_str,
);
process_lambda_configurations(
&mut event_rules,
notification_configuration.lambda_function_configurations.clone(),
TargetID::from_str,
);
event_rules
};
let (clear_result, event_rules) = tokio::join!(clear_rules, parse_rules);
clear_result.map_err(|e| s3_error!(InternalError, "Failed to clear rules: {e}"))?;
// Add a new notification rule
notifier_instance()
.add_event_specific_rules(&bucket, &region, &event_rules)
.await
.map_err(|e| s3_error!(InternalError, "Failed to add rules: {e}"))?;
Ok(S3Response::new(PutBucketNotificationConfigurationOutput {}))
}
async fn get_bucket_acl(&self, req: S3Request<GetBucketAclInput>) -> S3Result<S3Response<GetBucketAclOutput>> {
@@ -2781,7 +2989,7 @@ impl S3 for FS {
// Asynchronous call will not block the response of the current request
tokio::spawn(async move {
rustfs_notify::global::notifier_instance().notify(event_args).await;
notifier_instance().notify(event_args).await;
});
Ok(S3Response::new(output))
@@ -2959,7 +3167,7 @@ impl S3 for FS {
// Asynchronous call will not block the response of the current request
tokio::spawn(async move {
rustfs_notify::global::notifier_instance().notify(event_args).await;
notifier_instance().notify(event_args).await;
});
Ok(S3Response::new(output))
@@ -3038,7 +3246,7 @@ impl S3 for FS {
// Asynchronous call will not block the response of the current request
tokio::spawn(async move {
rustfs_notify::global::notifier_instance().notify(event_args).await;
notifier_instance().notify(event_args).await;
});
Ok(S3Response::new(output))
@@ -3099,7 +3307,7 @@ impl S3 for FS {
// Asynchronous call will not block the response of the current request
tokio::spawn(async move {
rustfs_notify::global::notifier_instance().notify(event_args).await;
notifier_instance().notify(event_args).await;
});
Ok(S3Response::new(output))
@@ -3174,13 +3382,91 @@ impl S3 for FS {
// Asynchronous call will not block the response of the current request
tokio::spawn(async move {
rustfs_notify::global::notifier_instance().notify(event_args).await;
notifier_instance().notify(event_args).await;
});
Ok(S3Response::new(output))
}
}
/// Auxiliary function: extract the prefix and suffix from a notification filter
fn extract_prefix_suffix(filter: Option<&NotificationConfigurationFilter>) -> (String, String) {
if let Some(filter) = filter {
if let Some(filter_rules) = &filter.key {
let mut prefix = String::new();
let mut suffix = String::new();
if let Some(rules) = &filter_rules.filter_rules {
for rule in rules {
if let (Some(name), Some(value)) = (rule.name.as_ref(), rule.value.as_ref()) {
match name.as_str() {
"prefix" => prefix = value.clone(),
"suffix" => suffix = value.clone(),
_ => {}
}
}
}
}
return (prefix, suffix);
}
}
(String::new(), String::new())
}
/// Auxiliary functions: convert queue, topic, and lambda configurations into event rules
fn process_queue_configurations<F>(
event_rules: &mut Vec<(Vec<EventName>, String, String, Vec<TargetID>)>,
configurations: Option<Vec<QueueConfiguration>>,
target_id_parser: F,
) where
F: Fn(&str) -> Result<TargetID, TargetIDError>,
{
if let Some(configs) = configurations {
for cfg in configs {
let events = cfg.events.iter().filter_map(|e| EventName::parse(e.as_ref()).ok()).collect();
let (prefix, suffix) = extract_prefix_suffix(cfg.filter.as_ref());
let target_ids = vec![target_id_parser(&cfg.queue_arn).ok()].into_iter().flatten().collect();
event_rules.push((events, prefix, suffix, target_ids));
}
}
}
fn process_topic_configurations<F>(
event_rules: &mut Vec<(Vec<EventName>, String, String, Vec<TargetID>)>,
configurations: Option<Vec<TopicConfiguration>>,
target_id_parser: F,
) where
F: Fn(&str) -> Result<TargetID, TargetIDError>,
{
if let Some(configs) = configurations {
for cfg in configs {
let events = cfg.events.iter().filter_map(|e| EventName::parse(e.as_ref()).ok()).collect();
let (prefix, suffix) = extract_prefix_suffix(cfg.filter.as_ref());
let target_ids = vec![target_id_parser(&cfg.topic_arn).ok()].into_iter().flatten().collect();
event_rules.push((events, prefix, suffix, target_ids));
}
}
}
fn process_lambda_configurations<F>(
event_rules: &mut Vec<(Vec<EventName>, String, String, Vec<TargetID>)>,
configurations: Option<Vec<LambdaFunctionConfiguration>>,
target_id_parser: F,
) where
F: Fn(&str) -> Result<TargetID, TargetIDError>,
{
if let Some(configs) = configurations {
for cfg in configs {
let events = cfg.events.iter().filter_map(|e| EventName::parse(e.as_ref()).ok()).collect();
let (prefix, suffix) = extract_prefix_suffix(cfg.filter.as_ref());
let target_ids = vec![target_id_parser(&cfg.lambda_function_arn).ok()]
.into_iter()
.flatten()
.collect();
event_rules.push((events, prefix, suffix, target_ids));
}
}
}
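
For orientation, a hedged sketch of the (events, prefix, suffix, target_ids) tuple these helpers push into event_rules. Only EventName::parse and TargetID::from_str come from this diff; the literal event name, target id, prefix, and suffix are invented for illustration and may not be valid values.

    use std::str::FromStr;

    use rustfs_targets::EventName;
    use rustfs_targets::arn::TargetID;

    // One illustrative rule entry; unparseable strings are simply dropped by filter_map,
    // mirroring the helpers above.
    fn example_rule() -> (Vec<EventName>, String, String, Vec<TargetID>) {
        let events: Vec<EventName> = ["s3:ObjectCreated:*"]
            .into_iter()
            .filter_map(|e| EventName::parse(e).ok())
            .collect();
        let target_ids: Vec<TargetID> = ["1:webhook"]
            .into_iter()
            .filter_map(|t| TargetID::from_str(t).ok())
            .collect();
        // The prefix and suffix would normally come from the bucket filter rules.
        (events, "logs/".to_string(), ".json".to_string(), target_ids)
    }

Each such tuple is then handed to notifier_instance().add_event_specific_rules(&bucket, &region, &event_rules), as in the handler above.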
#[cfg(test)]
mod tests {
use super::*;

View File

@@ -16,8 +16,10 @@ use http::{HeaderMap, HeaderValue};
use rustfs_ecstore::bucket::versioning_sys::BucketVersioningSys;
use rustfs_ecstore::error::Result;
use rustfs_ecstore::error::StorageError;
use rustfs_ecstore::store_api::ObjectOptions;
use rustfs_ecstore::store_api::{HTTPPreconditions, HTTPRangeSpec, ObjectOptions};
use rustfs_utils::path::is_dir_object;
use s3s::{S3Result, s3_error};
use std::collections::HashMap;
use std::sync::LazyLock;
use uuid::Uuid;
@@ -105,6 +107,32 @@ pub async fn get_opts(
Ok(opts)
}
fn fill_conditional_writes_opts_from_header(headers: &HeaderMap<HeaderValue>, opts: &mut ObjectOptions) -> Result<()> {
if headers.contains_key("If-None-Match") || headers.contains_key("If-Match") {
let mut preconditions = HTTPPreconditions::default();
if let Some(if_none_match) = headers.get("If-None-Match") {
preconditions.if_none_match = Some(
if_none_match
.to_str()
.map_err(|_| StorageError::other("Invalid If-None-Match header"))?
.to_string(),
);
}
if let Some(if_match) = headers.get("If-Match") {
preconditions.if_match = Some(
if_match
.to_str()
.map_err(|_| StorageError::other("Invalid If-Match header"))?
.to_string(),
);
}
opts.http_preconditions = Some(preconditions);
}
Ok(())
}
/// Creates options for putting an object in a bucket.
pub async fn put_opts(
bucket: &str,
@@ -141,6 +169,14 @@ pub async fn put_opts(
opts.version_suspended = version_suspended;
opts.versioned = versioned;
fill_conditional_writes_opts_from_header(headers, &mut opts)?;
Ok(opts)
}
pub fn get_complete_multipart_upload_opts(headers: &HeaderMap<HeaderValue>) -> Result<ObjectOptions> {
let mut opts = ObjectOptions::default();
fill_conditional_writes_opts_from_header(headers, &mut opts)?;
Ok(opts)
}
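
As a quick illustrative exercise of the helpers above (not part of the change): an "If-None-Match: *" header, the usual create-only conditional write, ends up in ObjectOptions.http_preconditions, and a failed check later surfaces as StorageError::PreconditionFailed, which the error-mapping hunk earlier in this comparison converts to an S3 412.

    use http::{HeaderMap, HeaderValue};

    // Sketch: build the options a create-only CompleteMultipartUpload would carry.
    fn demo_conditional_write_opts() -> rustfs_ecstore::error::Result<()> {
        let mut headers = HeaderMap::new();
        headers.insert("If-None-Match", HeaderValue::from_static("*"));
        let opts = get_complete_multipart_upload_opts(&headers)?;
        // http_preconditions now holds if_none_match == Some("*"); the storage layer
        // rejects the write if the object already exists.
        assert!(opts.http_preconditions.is_some());
        Ok(())
    }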
@@ -270,6 +306,61 @@ static SUPPORTED_HEADERS: LazyLock<Vec<&'static str>> = LazyLock::new(|| {
]
});
/// Parse copy source range string in format "bytes=start-end"
pub fn parse_copy_source_range(range_str: &str) -> S3Result<HTTPRangeSpec> {
if !range_str.starts_with("bytes=") {
return Err(s3_error!(InvalidArgument, "Invalid range format"));
}
let range_part = &range_str[6..]; // Remove "bytes=" prefix
if let Some(dash_pos) = range_part.find('-') {
let start_str = &range_part[..dash_pos];
let end_str = &range_part[dash_pos + 1..];
if start_str.is_empty() && end_str.is_empty() {
return Err(s3_error!(InvalidArgument, "Invalid range format"));
}
if start_str.is_empty() {
// Suffix range: bytes=-500 (last 500 bytes)
let length = end_str
.parse::<i64>()
.map_err(|_| s3_error!(InvalidArgument, "Invalid range format"))?;
Ok(HTTPRangeSpec {
is_suffix_length: true,
start: -length,
end: -1,
})
} else {
let start = start_str
.parse::<i64>()
.map_err(|_| s3_error!(InvalidArgument, "Invalid range format"))?;
let end = if end_str.is_empty() {
-1 // Open-ended range: bytes=500-
} else {
end_str
.parse::<i64>()
.map_err(|_| s3_error!(InvalidArgument, "Invalid range format"))?
};
if start < 0 || (end != -1 && end < start) {
return Err(s3_error!(InvalidArgument, "Invalid range format"));
}
Ok(HTTPRangeSpec {
is_suffix_length: false,
start,
end,
})
}
} else {
Err(s3_error!(InvalidArgument, "Invalid range format"))
}
}
#[cfg(test)]
mod tests {
use super::*;
@@ -762,4 +853,31 @@ mod tests {
// Test a file without an extension
assert_eq!(detect_content_type_from_object_name("noextension"), "application/octet-stream");
}
#[test]
fn test_parse_copy_source_range() {
// Test complete range: bytes=0-1023
let result = parse_copy_source_range("bytes=0-1023").unwrap();
assert!(!result.is_suffix_length);
assert_eq!(result.start, 0);
assert_eq!(result.end, 1023);
// Test open-ended range: bytes=500-
let result = parse_copy_source_range("bytes=500-").unwrap();
assert!(!result.is_suffix_length);
assert_eq!(result.start, 500);
assert_eq!(result.end, -1);
// Test suffix range: bytes=-500 (last 500 bytes)
let result = parse_copy_source_range("bytes=-500").unwrap();
assert!(result.is_suffix_length);
assert_eq!(result.start, -500);
assert_eq!(result.end, -1);
// Test invalid format
assert!(parse_copy_source_range("invalid").is_err());
assert!(parse_copy_source_range("bytes=").is_err());
assert!(parse_copy_source_range("bytes=abc-def").is_err());
assert!(parse_copy_source_range("bytes=100-50").is_err()); // start > end
}
}

View File

@@ -45,7 +45,7 @@ export RUSTFS_VOLUMES="./target/volume/test{1...4}"
# export RUSTFS_VOLUMES="./target/volume/test"
export RUSTFS_ADDRESS=":9000"
export RUSTFS_CONSOLE_ENABLE=true
export RUSTFS_CONSOLE_ADDRESS=":9001"
# export RUSTFS_CONSOLE_ADDRESS=":9001"
# export RUSTFS_SERVER_DOMAINS="localhost:9000"
# HTTPS certificate directory
# export RUSTFS_TLS_PATH="./deploy/certs"