Compare commits

...

3 Commits

Author SHA1 Message Date
Copilot
ad99019749 Add complete MNMD Docker deployment example with startup coordination and VolumeNotFound fix (#642)
* Initial plan

* Add MNMD Docker deployment example with 4 nodes x 4 drives

- Create docs/examples/mnmd/ directory structure
- Add docker-compose.yml with proper disk indexing (1..4)
- Add wait-and-start.sh for startup coordination
- Add README.md with usage instructions and alternatives
- Add CHECKLIST.md with step-by-step verification
- Fixes VolumeNotFound issue by using correct volume paths
- Implements health checks and startup ordering
- Uses service names for stable inter-node addressing

Co-authored-by: houseme <4829346+houseme@users.noreply.github.com>

* Add docs/examples README as index for deployment examples

Co-authored-by: houseme <4829346+houseme@users.noreply.github.com>

* Add automated test script for MNMD deployment

- Add test-deployment.sh with comprehensive validation
- Test container status, health, endpoints, connectivity
- Update README to reference test script
- Make script executable

Co-authored-by: houseme <4829346+houseme@users.noreply.github.com>

* improve code

* improve code

* improve dep crates `cargo shear --fix`

* upgrade aws-sdk-s3

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: houseme <4829346+houseme@users.noreply.github.com>
Co-authored-by: houseme <housemecn@gmail.com>
2025-10-12 13:15:14 +08:00
houseme
aac9b1edb7 chore: improve event and docker-compose ,Improve the permissions of the endpoint health interface, upgrade otel from 0.30.0 to 0.31.0 (#620)
* feat: improve code for notify

* upgrade starshard version

* upgrade version

* Fix ETag format to comply with HTTP standards by wrapping with quotes (#592)

* Initial plan

* Fix ETag format to comply with HTTP standards by wrapping with quotes

Co-authored-by: overtrue <1472352+overtrue@users.noreply.github.com>

* bufigx

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: overtrue <1472352+overtrue@users.noreply.github.com>
Co-authored-by: overtrue <anzhengchao@gmail.com>

* Improve lock (#596)

* improve lock

Signed-off-by: Mu junxiang <1948535941@qq.com>

* feat(tests): add wait_for_object_absence helper and improve lifecycle test reliability

Signed-off-by: Mu junxiang <1948535941@qq.com>

* chore: remove dirty docs

Signed-off-by: Mu junxiang <1948535941@qq.com>

---------

Signed-off-by: Mu junxiang <1948535941@qq.com>

* feat(append): implement object append operations with state tracking (#599)

* feat(append): implement object append operations with state tracking

Signed-off-by: junxiang Mu <1948535941@qq.com>

* chore: rebase

Signed-off-by: junxiang Mu <1948535941@qq.com>

---------

Signed-off-by: junxiang Mu <1948535941@qq.com>

* build(deps): upgrade s3s (#595)

Co-authored-by: loverustfs <155562731+loverustfs@users.noreply.github.com>

* fix: validate mqtt broker

* improve code for `import`

* upgrade otel relation crates version

* fix:dep("jsonwebtoken") feature = 'rust_crypto'

* fix

* fix

* fix

* upgrade version

* improve code for ecfs

* chore: improve event and docker-compose ,Improve the permissions of the `endpoint` health interface

* fix

* fix

* fix

* fix

* improve code

* fix

---------

Signed-off-by: Mu junxiang <1948535941@qq.com>
Signed-off-by: junxiang Mu <1948535941@qq.com>
Co-authored-by: Copilot <198982749+Copilot@users.noreply.github.com>
Co-authored-by: overtrue <1472352+overtrue@users.noreply.github.com>
Co-authored-by: overtrue <anzhengchao@gmail.com>
Co-authored-by: guojidan <63799833+guojidan@users.noreply.github.com>
Co-authored-by: Nugine <nugine@foxmail.com>
Co-authored-by: loverustfs <155562731+loverustfs@users.noreply.github.com>
2025-10-11 09:08:25 +08:00
weisd
5689311cff fix:#630 (#633) 2025-10-10 15:16:28 +08:00
42 changed files with 2503 additions and 872 deletions

1079
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -99,8 +99,10 @@ async-recursion = "1.1.1"
async-trait = "0.1.89"
async-compression = { version = "0.4.19" }
atomic_enum = "0.3.0"
aws-config = { version = "1.8.6" }
aws-sdk-s3 = { version = "1.106.0", default-features = false, features = ["sigv4a", "rustls", "rt-tokio"] }
aws-config = { version = "1.8.8" }
aws-credential-types = { version = "1.2.8" }
aws-smithy-types = { version = "1.3.3" }
aws-sdk-s3 = { version = "1.108.0", default-features = false, features = ["sigv4a", "rustls", "rt-tokio"] }
axum = "0.8.6"
axum-extra = "0.10.3"
axum-server = { version = "0.7.2", features = ["tls-rustls-no-provider"], default-features = false }
@@ -111,6 +113,7 @@ bytes = { version = "1.10.1", features = ["serde"] }
bytesize = "2.1.0"
byteorder = "1.5.0"
cfg-if = "1.0.3"
convert_case = "0.8.0"
crc-fast = "1.3.0"
chacha20poly1305 = { version = "0.10.1" }
chrono = { version = "0.4.42", features = ["serde"] }
@@ -119,18 +122,18 @@ const-str = { version = "0.7.0", features = ["std", "proc"] }
crc32fast = "1.5.0"
criterion = { version = "0.7", features = ["html_reports"] }
crossbeam-queue = "0.3.12"
dashmap = "6.1.0"
datafusion = "50.0.0"
datafusion = "50.1.0"
derive_builder = "0.20.2"
enumset = "1.1.10"
flatbuffers = "25.9.23"
flate2 = "1.1.2"
flexi_logger = { version = "0.31.4", features = ["trc", "dont_minimize_extra_stacks", "compress", "kv"] }
flate2 = "1.1.4"
flexi_logger = { version = "0.31.7", features = ["trc", "dont_minimize_extra_stacks", "compress", "kv"] }
form_urlencoded = "1.2.2"
futures = "0.3.31"
futures-core = "0.3.31"
futures-util = "0.3.31"
glob = "0.3.3"
hashbrown = { version = "0.16.0", features = ["serde", "rayon"] }
hex-simd = "0.8.0"
highway = { version = "1.3.0" }
hickory-resolver = { version = "0.25.2", features = ["tls-ring"] }
@@ -146,8 +149,9 @@ http = "1.3.1"
http-body = "1.0.1"
humantime = "2.3.0"
ipnetwork = { version = "0.21.1", features = ["serde"] }
jsonwebtoken = "9.3.1"
jsonwebtoken = { version = "10.0.0", features = ["rust_crypto"] }
lazy_static = "1.5.0"
libc = "0.2.177"
libsystemd = { version = "0.7.2" }
local-ip-address = "0.6.5"
lz4 = "1.28.1"
@@ -158,39 +162,39 @@ mime_guess = "2.0.5"
moka = { version = "0.12.11", features = ["future"] }
netif = "0.1.6"
nix = { version = "0.30.1", features = ["fs"] }
nu-ansi-term = "0.50.1"
nu-ansi-term = "0.50.3"
num_cpus = { version = "1.17.0" }
nvml-wrapper = "0.11.0"
object_store = "0.12.4"
once_cell = "1.21.3"
opentelemetry = { version = "0.30.0" }
opentelemetry-appender-tracing = { version = "0.30.1", features = [
opentelemetry = { version = "0.31.0" }
opentelemetry-appender-tracing = { version = "0.31.1", features = [
"experimental_use_tracing_span_context",
"experimental_metadata_attributes",
"spec_unstable_logs_enabled"
] }
opentelemetry_sdk = { version = "0.30.0" }
opentelemetry-stdout = { version = "0.30.0" }
opentelemetry-otlp = { version = "0.30.0", default-features = false, features = [
opentelemetry_sdk = { version = "0.31.0" }
opentelemetry-stdout = { version = "0.31.0" }
opentelemetry-otlp = { version = "0.31.0", default-features = false, features = [
"grpc-tonic", "gzip-tonic", "trace", "metrics", "logs", "internal-logs"
] }
opentelemetry-semantic-conventions = { version = "0.30.0", features = [
opentelemetry-semantic-conventions = { version = "0.31.0", features = [
"semconv_experimental",
] }
parking_lot = "0.12.4"
parking_lot = "0.12.5"
path-absolutize = "3.1.1"
path-clean = "1.0.1"
blake3 = { version = "1.8.2" }
pbkdf2 = "0.12.2"
percent-encoding = "2.3.2"
pin-project-lite = "0.2.16"
prost = "0.14.1"
pretty_assertions = "1.4.1"
quick-xml = "0.38.3"
rand = "0.9.2"
rayon = "1.11.0"
rdkafka = { version = "0.38.0", features = ["tokio"] }
reed-solomon-simd = { version = "3.0.1" }
regex = { version = "1.11.3" }
regex = { version = "1.12.1" }
reqwest = { version = "0.12.23", default-features = false, features = [
"rustls-tls-webpki-roots",
"charset",
@@ -200,13 +204,13 @@ reqwest = { version = "0.12.23", default-features = false, features = [
"json",
"blocking",
] }
rmcp = { version = "0.6.4" }
rmcp = { version = "0.8.1" }
rmp = "0.8.14"
rmp-serde = "1.3.0"
rsa = "0.9.8"
rumqttc = { version = "0.25.0" }
rust-embed = { version = "8.7.2" }
rustfs-rsc = "2025.506.1"
rustc-hash = { version = "2.1.1" }
rustls = { version = "0.23.32", features = ["ring", "logging", "std", "tls12"], default-features = false }
rustls-pki-types = "1.12.0"
rustls-pemfile = "2.2.0"
@@ -225,6 +229,7 @@ smartstring = "1.0.1"
snafu = "0.8.9"
snap = "1.1.1"
socket2 = "0.6.0"
starshard = { version = "0.5.0", features = ["rayon", "async", "serde"] }
strum = { version = "0.27.2", features = ["derive"] }
sysinfo = "0.37.1"
sysctl = "0.7.1"
@@ -253,7 +258,7 @@ tower-http = { version = "0.6.6", features = ["cors"] }
tracing = "0.1.41"
tracing-core = "0.1.34"
tracing-error = "0.2.1"
tracing-opentelemetry = "0.31.0"
tracing-opentelemetry = "0.32.0"
tracing-subscriber = { version = "0.3.20", features = ["env-filter", "time"] }
transform-stream = "0.3.1"
url = "2.5.7"
@@ -269,7 +274,7 @@ wildmatch = { version = "2.5.0", features = ["serde"] }
zeroize = { version = "1.8.2", features = ["derive"] }
winapi = { version = "0.3.9" }
xxhash-rust = { version = "0.8.15", features = ["xxh64", "xxh3"] }
zip = "5.1.1"
zip = "6.0.0"
zstd = "0.13.3"

View File

@@ -12,9 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::AuditEntry;
use crate::AuditResult;
use crate::AuditSystem;
use crate::{AuditEntry, AuditResult, AuditSystem};
use once_cell::sync::OnceCell;
use rustfs_ecstore::config::Config;
use std::sync::Arc;

View File

@@ -17,9 +17,11 @@ use crate::AuditRegistry;
use crate::observability;
use crate::{AuditError, AuditResult};
use rustfs_ecstore::config::Config;
use rustfs_targets::store::{Key, Store};
use rustfs_targets::target::EntityTarget;
use rustfs_targets::{StoreError, Target, TargetError};
use rustfs_targets::{
StoreError, Target, TargetError,
store::{Key, Store},
target::EntityTarget,
};
use std::sync::Arc;
use tokio::sync::{Mutex, RwLock};
use tracing::{error, info, warn};
@@ -257,7 +259,7 @@ impl AuditSystem {
let target_id_clone = target_id.clone();
// Create EntityTarget for the audit log entry
let entity_target = rustfs_targets::target::EntityTarget {
let entity_target = EntityTarget {
object_name: entry.api.name.clone().unwrap_or_default(),
bucket_name: entry.api.bucket.clone().unwrap_or_default(),
event_name: rustfs_targets::EventName::ObjectCreatedPut, // Default, should be derived from entry
@@ -337,7 +339,7 @@ impl AuditSystem {
let mut success_count = 0;
let mut errors = Vec::new();
for entry in entries_clone {
let entity_target = rustfs_targets::target::EntityTarget {
let entity_target = EntityTarget {
object_name: entry.api.name.clone().unwrap_or_default(),
bucket_name: entry.api.bucket.clone().unwrap_or_default(),
event_name: rustfs_targets::EventName::ObjectCreatedPut,

View File

@@ -75,7 +75,6 @@ hyper-util.workspace = true
hyper-rustls.workspace = true
rustls.workspace = true
tokio = { workspace = true, features = ["io-util", "sync", "signal"] }
tokio-stream = { workspace = true }
tonic.workspace = true
xxhash-rust = { workspace = true, features = ["xxh64", "xxh3"] }
tower.workspace = true
@@ -89,8 +88,6 @@ rustfs-madmin.workspace = true
rustfs-workers.workspace = true
reqwest = { workspace = true }
aws-sdk-s3 = { workspace = true }
once_cell = { workspace = true }
rustfs-rsc = { workspace = true }
urlencoding = { workspace = true }
smallvec = { workspace = true }
shadow-rs.workspace = true
@@ -99,13 +96,11 @@ rustfs-utils = { workspace = true, features = ["full"] }
rustfs-rio.workspace = true
rustfs-signer.workspace = true
rustfs-checksums.workspace = true
futures-util.workspace = true
async-recursion.workspace = true
aws-credential-types = "1.2.6"
aws-smithy-types = "1.3.2"
parking_lot = "0.12"
moka = { version = "0.12", features = ["future"] }
aws-smithy-runtime-api = "1.9.0"
aws-credential-types = { workspace = true }
aws-smithy-types = { workspace = true }
parking_lot = { workspace = true }
moka = { workspace = true }
[target.'cfg(not(windows))'.dependencies]
nix = { workspace = true }

View File

@@ -169,6 +169,9 @@ impl InlineData {
}
pub fn remove(&mut self, remove_keys: Vec<Uuid>) -> Result<bool> {
let buf = self.after_version();
if buf.is_empty() {
return Ok(false);
}
let mut cur = Cursor::new(buf);
let mut fields_len = rmp::decode::read_map_len(&mut cur)? as usize;

View File

@@ -87,7 +87,7 @@ pub fn generate_jwt<T: Serialize>(claims: &T, secret: &str) -> std::result::Resu
jsonwebtoken::encode(&header, &claims, &EncodingKey::from_secret(secret.as_bytes()))
}
pub fn extract_claims<T: DeserializeOwned>(
pub fn extract_claims<T: DeserializeOwned + Clone>(
token: &str,
secret: &str,
) -> std::result::Result<jsonwebtoken::TokenData<T>, jsonwebtoken::errors::Error> {
@@ -193,7 +193,7 @@ mod tests {
assert_eq!(error.to_string(), "secret key length is too short");
}
#[derive(Debug, Serialize, Deserialize, PartialEq)]
#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
struct Claims {
sub: String,
company: String,

View File

@@ -32,14 +32,17 @@ rustfs-utils = { workspace = true, features = ["path", "sys"] }
rustfs-targets = { workspace = true }
async-trait = { workspace = true }
chrono = { workspace = true, features = ["serde"] }
dashmap = { workspace = true }
futures = { workspace = true }
form_urlencoded = { workspace = true }
hashbrown = { workspace = true }
once_cell = { workspace = true }
quick-xml = { workspace = true, features = ["serialize", "async-tokio"] }
rayon = { workspace = true }
rumqttc = { workspace = true }
rustc-hash = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
starshard = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true, features = ["rt-multi-thread", "sync", "time"] }
tracing = { workspace = true }

View File

@@ -13,9 +13,9 @@
// limitations under the License.
use chrono::{DateTime, Utc};
use hashbrown::HashMap;
use rustfs_targets::EventName;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use url::form_urlencoded;
/// Represents the identity of the user who triggered the event

View File

@@ -14,6 +14,7 @@
use crate::Event;
use async_trait::async_trait;
use hashbrown::HashSet;
use rumqttc::QoS;
use rustfs_config::notify::{ENV_NOTIFY_MQTT_KEYS, ENV_NOTIFY_WEBHOOK_KEYS, NOTIFY_MQTT_KEYS, NOTIFY_WEBHOOK_KEYS};
use rustfs_config::{
@@ -27,7 +28,6 @@ use rustfs_targets::{
error::TargetError,
target::{mqtt::MQTTArgs, webhook::WebhookArgs},
};
use std::collections::HashSet;
use std::time::Duration;
use tracing::{debug, warn};
use url::Url;

View File

@@ -15,13 +15,13 @@
use crate::{
Event, error::NotificationError, notifier::EventNotifier, registry::TargetRegistry, rules::BucketNotificationConfig, stream,
};
use hashbrown::HashMap;
use rustfs_ecstore::config::{Config, KVS};
use rustfs_targets::EventName;
use rustfs_targets::arn::TargetID;
use rustfs_targets::store::{Key, Store};
use rustfs_targets::target::EntityTarget;
use rustfs_targets::{StoreError, Target};
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::{Duration, Instant};
@@ -212,11 +212,6 @@ impl NotificationSystem {
return Ok(());
}
// if let Err(e) = rustfs_ecstore::config::com::save_server_config(store, &new_config).await {
// error!("Failed to save config: {}", e);
// return Err(NotificationError::SaveConfig(e.to_string()));
// }
info!("Configuration updated. Reloading system...");
self.reload_config(new_config).await
}

View File

@@ -13,19 +13,20 @@
// limitations under the License.
use crate::{error::NotificationError, event::Event, rules::RulesMap};
use dashmap::DashMap;
use hashbrown::HashMap;
use rustfs_targets::EventName;
use rustfs_targets::Target;
use rustfs_targets::arn::TargetID;
use rustfs_targets::target::EntityTarget;
use std::{collections::HashMap, sync::Arc};
use starshard::AsyncShardedHashMap;
use std::sync::Arc;
use tokio::sync::RwLock;
use tracing::{debug, error, info, instrument, warn};
/// Manages event notification to targets based on rules
pub struct EventNotifier {
target_list: Arc<RwLock<TargetList>>,
bucket_rules_map: Arc<DashMap<String, RulesMap>>,
bucket_rules_map: Arc<AsyncShardedHashMap<String, RulesMap, rustc_hash::FxBuildHasher>>,
}
impl Default for EventNotifier {
@@ -39,7 +40,7 @@ impl EventNotifier {
pub fn new() -> Self {
EventNotifier {
target_list: Arc::new(RwLock::new(TargetList::new())),
bucket_rules_map: Arc::new(DashMap::new()),
bucket_rules_map: Arc::new(AsyncShardedHashMap::new(0)),
}
}
@@ -58,7 +59,7 @@ impl EventNotifier {
/// This method removes all rules associated with the specified bucket name.
/// It will log a message indicating the removal of rules.
pub async fn remove_rules_map(&self, bucket_name: &str) {
if self.bucket_rules_map.remove(bucket_name).is_some() {
if self.bucket_rules_map.remove(&bucket_name.to_string()).await.is_some() {
info!("Removed all notification rules for bucket: {}", bucket_name);
}
}
@@ -76,21 +77,21 @@ impl EventNotifier {
/// Adds a rules map for a bucket
pub async fn add_rules_map(&self, bucket_name: &str, rules_map: RulesMap) {
if rules_map.is_empty() {
self.bucket_rules_map.remove(bucket_name);
self.bucket_rules_map.remove(&bucket_name.to_string()).await;
} else {
self.bucket_rules_map.insert(bucket_name.to_string(), rules_map);
self.bucket_rules_map.insert(bucket_name.to_string(), rules_map).await;
}
info!("Added rules for bucket: {}", bucket_name);
}
/// Gets the rules map for a specific bucket.
pub fn get_rules_map(&self, bucket_name: &str) -> Option<RulesMap> {
self.bucket_rules_map.get(bucket_name).map(|r| r.clone())
pub async fn get_rules_map(&self, bucket_name: &str) -> Option<RulesMap> {
self.bucket_rules_map.get(&bucket_name.to_string()).await
}
/// Removes notification rules for a bucket
pub async fn remove_notification(&self, bucket_name: &str) {
self.bucket_rules_map.remove(bucket_name);
self.bucket_rules_map.remove(&bucket_name.to_string()).await;
info!("Removed notification rules for bucket: {}", bucket_name);
}
@@ -113,7 +114,7 @@ impl EventNotifier {
/// Return `true` if at least one matching notification rule exists.
pub async fn has_subscriber(&self, bucket_name: &str, event_name: &EventName) -> bool {
// Rules to check if the bucket exists
if let Some(rules_map) = self.bucket_rules_map.get(bucket_name) {
if let Some(rules_map) = self.bucket_rules_map.get(&bucket_name.to_string()).await {
// A composite event (such as ObjectCreatedAll) is expanded to multiple single events.
// We need to check whether any of these single events have the rules configured.
rules_map.has_subscriber(event_name)
@@ -129,7 +130,7 @@ impl EventNotifier {
let bucket_name = &event.s3.bucket.name;
let object_key = &event.s3.object.key;
let event_name = event.event_name;
if let Some(rules) = self.bucket_rules_map.get(bucket_name) {
if let Some(rules) = self.bucket_rules_map.get(bucket_name).await {
let target_ids = rules.match_rules(event_name, object_key);
if target_ids.is_empty() {
debug!("No matching targets for event in bucket: {}", bucket_name);

View File

@@ -15,13 +15,13 @@
use crate::Event;
use crate::factory::{MQTTTargetFactory, TargetFactory, WebhookTargetFactory};
use futures::stream::{FuturesUnordered, StreamExt};
use hashbrown::{HashMap, HashSet};
use rustfs_config::notify::NOTIFY_ROUTE_PREFIX;
use rustfs_config::{DEFAULT_DELIMITER, ENABLE_KEY, ENV_PREFIX};
use rustfs_ecstore::config::{Config, KVS};
use rustfs_targets::Target;
use rustfs_targets::TargetError;
use rustfs_targets::target::ChannelTargetType;
use std::collections::{HashMap, HashSet};
use tracing::{debug, error, info, warn};
/// Registry for managing target factories

View File

@@ -17,10 +17,10 @@ use super::xml_config::ParseConfigError as BucketNotificationConfigError;
use crate::rules::NotificationConfiguration;
use crate::rules::pattern_rules;
use crate::rules::target_id_set;
use hashbrown::HashMap;
use rustfs_targets::EventName;
use rustfs_targets::arn::TargetID;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::io::Read;
/// Configuration for bucket notifications.

View File

@@ -14,9 +14,10 @@
use super::pattern;
use super::target_id_set::TargetIdSet;
use hashbrown::HashMap;
use rayon::prelude::*;
use rustfs_targets::arn::TargetID;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
/// PatternRules - Event rule that maps object name patterns to TargetID collections.
/// `event.Rules` (map[string]TargetIDSet) in the Go code
@@ -43,13 +44,19 @@ impl PatternRules {
/// Returns all TargetIDs that match the object name.
pub fn match_targets(&self, object_name: &str) -> TargetIdSet {
let mut matched_targets = TargetIdSet::new();
for (pattern_str, target_set) in &self.rules {
if pattern::match_simple(pattern_str, object_name) {
matched_targets.extend(target_set.iter().cloned());
}
}
matched_targets
self.rules
.par_iter()
.filter_map(|(pattern_str, target_set)| {
if pattern::match_simple(pattern_str, object_name) {
Some(target_set.iter().cloned().collect::<TargetIdSet>())
} else {
None
}
})
.reduce(TargetIdSet::new, |mut acc, set| {
acc.extend(set);
acc
})
}
pub fn is_empty(&self) -> bool {

View File

@@ -14,10 +14,10 @@
use super::pattern_rules::PatternRules;
use super::target_id_set::TargetIdSet;
use hashbrown::HashMap;
use rustfs_targets::EventName;
use rustfs_targets::arn::TargetID;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
/// RulesMap - Rule mapping organized by event name。
/// `event.RulesMap` (map[Name]Rules) in the corresponding Go code

View File

@@ -12,8 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use hashbrown::HashSet;
use rustfs_targets::arn::TargetID;
use std::collections::HashSet;
/// TargetIDSet - A collection representation of TargetID.
pub type TargetIdSet = HashSet<TargetID>;

View File

@@ -13,10 +13,10 @@
// limitations under the License.
use super::pattern;
use hashbrown::HashSet;
use rustfs_targets::EventName;
use rustfs_targets::arn::{ARN, ArnError, TargetIDError};
use serde::{Deserialize, Serialize};
use std::collections::HashSet;
use std::io::Read;
use thiserror::Error;

View File

@@ -228,7 +228,7 @@ mod tests {
use jsonwebtoken::{Algorithm, DecodingKey, Validation, decode};
use serde::{Deserialize, Serialize};
#[derive(Debug, Serialize, Deserialize)]
#[derive(Debug, Serialize, Deserialize, Clone)]
struct Claims {
sub: String,
exp: usize,

View File

@@ -59,7 +59,7 @@ pub fn generate_jwt<T: Serialize>(claims: &T, secret: &str) -> std::result::Resu
jsonwebtoken::encode(&header, &claims, &EncodingKey::from_secret(secret.as_bytes()))
}
pub fn extract_claims<T: DeserializeOwned>(
pub fn extract_claims<T: DeserializeOwned + Clone>(
token: &str,
secret: &str,
) -> std::result::Result<jsonwebtoken::TokenData<T>, jsonwebtoken::errors::Error> {

View File

@@ -13,7 +13,7 @@ documentation = "https://docs.rs/rustfs-target/latest/rustfs_target/"
[dependencies]
rustfs-config = { workspace = true, features = ["notify", "constants", "audit"] }
rustfs-utils = { workspace = true, features = ["sys"] }
rustfs-utils = { workspace = true, features = ["sys", "notify"] }
async-trait = { workspace = true }
reqwest = { workspace = true }
rumqttc = { workspace = true }

View File

@@ -0,0 +1,70 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
/// Check if MQTT Broker is available
/// # Arguments
/// * `broker_url` - URL of MQTT Broker, for example `mqtt://localhost:1883`
/// * `topic` - Topic for testing connections
/// # Returns
/// * `Ok(())` - If the connection is successful
/// * `Err(String)` - If the connection fails, contains an error message
///
/// # Example
/// ```rust,no_run
/// #[tokio::main]
/// async fn main() {
/// let result = rustfs_targets::check_mqtt_broker_available("mqtt://localhost:1883", "test/topic").await;
/// if result.is_ok() {
/// println!("MQTT Broker is available");
/// } else {
/// println!("MQTT Broker is not available: {}", result.err().unwrap());
/// }
/// }
/// ```
/// # Note
/// Need to add `rumqttc` and `url` dependencies in `Cargo.toml`
/// ```toml
/// [dependencies]
/// rumqttc = "0.25.0"
/// url = "2.5.7"
/// tokio = { version = "1", features = ["full"] }
/// ```
pub async fn check_mqtt_broker_available(broker_url: &str, topic: &str) -> Result<(), String> {
use rumqttc::{AsyncClient, MqttOptions, QoS};
let url = rustfs_utils::parse_url(broker_url).map_err(|e| format!("Broker URL parsing failed:{e}"))?;
let url = url.url();
match url.scheme() {
"tcp" | "ssl" | "ws" | "wss" | "mqtt" | "mqtts" | "tls" | "tcps" => {}
_ => return Err("unsupported broker url scheme".to_string()),
}
let host = url.host_str().ok_or("Broker is missing host")?;
let port = url.port().unwrap_or(1883);
let mut mqtt_options = MqttOptions::new("rustfs_check", host, port);
mqtt_options.set_keep_alive(std::time::Duration::from_secs(5));
let (client, mut eventloop) = AsyncClient::new(mqtt_options, 1);
// Try to connect and subscribe
client
.subscribe(topic, QoS::AtLeastOnce)
.await
.map_err(|e| format!("MQTT subscription failed:{e}"))?;
// Wait for eventloop to receive at least one event
match tokio::time::timeout(std::time::Duration::from_secs(3), eventloop.poll()).await {
Ok(Ok(_)) => Ok(()),
Ok(Err(e)) => Err(format!("MQTT connection failed:{e}")),
Err(_) => Err("MQTT connection timeout".to_string()),
}
}

View File

@@ -13,11 +13,13 @@
// limitations under the License.
pub mod arn;
mod check;
pub mod error;
mod event_name;
pub mod store;
pub mod target;
pub use check::check_mqtt_broker_available;
pub use error::{StoreError, TargetError};
pub use event_name::EventName;
use serde::{Deserialize, Serialize};

View File

@@ -324,7 +324,7 @@ async fn run_mqtt_event_loop(
Ok(Err(e)) => Err(e),
Err(_) => {
debug!(target_id = %target_id, "MQTT poll timed out (EVENT_LOOP_POLL_TIMEOUT) while not connected or status pending.");
Err(rumqttc::ConnectionError::NetworkTimeout)
Err(ConnectionError::NetworkTimeout)
}
}
} else {
@@ -376,7 +376,7 @@ async fn run_mqtt_event_loop(
connected_status.store(false, Ordering::SeqCst);
error!(target_id = %target_id, error = %e, "Error from MQTT event loop poll");
if matches!(e, rumqttc::ConnectionError::NetworkTimeout) && (!initial_connection_established || !connected_status.load(Ordering::SeqCst)) {
if matches!(e, ConnectionError::NetworkTimeout) && (!initial_connection_established || !connected_status.load(Ordering::SeqCst)) {
warn!(target_id = %target_id, "Timeout during initial poll or pending state, will retry.");
continue;
}
@@ -395,8 +395,8 @@ async fn run_mqtt_event_loop(
error!(target_id = %target_id, error = %e, "Fatal MQTT error, terminating event loop.");
break;
}
// rumqttc's eventloop.poll() may return Err and terminate after some errors,
// Or it will handle reconnection internally. The continue here will make select! wait again.
// rumqttc's eventloop.poll() may return Err and terminate after some errors,
// Or it will handle reconnection internally. To continue here will make select! wait again.
// If the error is temporary and rumqttc is handling reconnection, poll() should eventually succeed or return a different error again.
// Sleep briefly to avoid busy cycles in case of rapid failure.
tokio::time::sleep(Duration::from_secs(1)).await;

View File

@@ -29,14 +29,16 @@ base64-simd = { workspace = true, optional = true }
blake3 = { workspace = true, optional = true }
brotli = { workspace = true, optional = true }
bytes = { workspace = true, optional = true }
crc32fast = { workspace = true }
crc32fast = { workspace = true, optional = true }
flate2 = { workspace = true, optional = true }
futures = { workspace = true, optional = true }
hashbrown = { workspace = true, optional = true }
hex-simd = { workspace = true, optional = true }
highway = { workspace = true, optional = true }
hickory-resolver = { workspace = true, optional = true }
hmac = { workspace = true, optional = true }
hyper = { workspace = true, optional = true }
libc = { workspace = true, optional = true }
local-ip-address = { workspace = true, optional = true }
lz4 = { workspace = true, optional = true }
md-5 = { workspace = true, optional = true }
@@ -53,7 +55,7 @@ s3s = { workspace = true, optional = true }
serde = { workspace = true, optional = true }
sha1 = { workspace = true, optional = true }
sha2 = { workspace = true, optional = true }
convert_case = "0.8.0"
convert_case = { workspace = true, optional = true }
siphasher = { workspace = true, optional = true }
snap = { workspace = true, optional = true }
sysinfo = { workspace = true, optional = true }
@@ -83,13 +85,13 @@ tls = ["dep:rustls", "dep:rustls-pemfile", "dep:rustls-pki-types"] # tls charac
net = ["ip", "dep:url", "dep:netif", "dep:futures", "dep:transform-stream", "dep:bytes", "dep:s3s", "dep:hyper", "dep:hickory-resolver", "dep:moka", "dep:thiserror", "dep:tokio"] # network features with DNS resolver
io = ["dep:tokio"]
path = []
notify = ["dep:hyper", "dep:s3s"] # file system notification features
notify = ["dep:hyper", "dep:s3s", "dep:hashbrown", "dep:thiserror", "dep:serde", "dep:libc"] # file system notification features
compress = ["dep:flate2", "dep:brotli", "dep:snap", "dep:lz4", "dep:zstd"]
string = ["dep:regex", "dep:rand"]
crypto = ["dep:base64-simd", "dep:hex-simd", "dep:hmac", "dep:hyper", "dep:sha1"]
hash = ["dep:highway", "dep:md-5", "dep:sha2", "dep:blake3", "dep:serde", "dep:siphasher", "dep:hex-simd", "dep:base64-simd"]
hash = ["dep:highway", "dep:md-5", "dep:sha2", "dep:blake3", "dep:serde", "dep:siphasher", "dep:hex-simd", "dep:base64-simd", "dep:crc32fast"]
os = ["dep:nix", "dep:tempfile", "winapi"] # operating system utilities
integration = [] # integration test features
sys = ["dep:sysinfo"] # system information features
http = []
full = ["ip", "tls", "net", "io", "hash", "os", "integration", "path", "crypto", "string", "compress", "sys", "notify","http"] # all features
http = ["dep:convert_case"]
full = ["ip", "tls", "net", "io", "hash", "os", "integration", "path", "crypto", "string", "compress", "sys", "notify", "http"] # all features

View File

@@ -12,9 +12,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.
mod net;
use hashbrown::HashMap;
use hyper::HeaderMap;
use s3s::{S3Request, S3Response};
use std::collections::HashMap;
pub use net::*;
/// Extract request parameters from S3Request, mainly header information.
#[allow(dead_code)]

View File

@@ -0,0 +1,533 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use regex::Regex;
use serde::{Deserialize, Serialize};
use std::net::IpAddr;
use std::path::Path;
use std::sync::LazyLock;
use thiserror::Error;
use url::Url;
// Lazy static for the host label regex.
static HOST_LABEL_REGEX: LazyLock<Regex> = LazyLock::new(|| Regex::new(r"^[a-zA-Z0-9]([a-zA-Z0-9-]*[a-zA-Z0-9])?$").unwrap());
/// NetError represents errors that can occur in network operations.
#[derive(Error, Debug)]
pub enum NetError {
#[error("invalid argument")]
InvalidArgument,
#[error("invalid hostname")]
InvalidHost,
#[error("missing '[' in host")]
MissingBracket,
#[error("parse error: {0}")]
ParseError(String),
#[error("unexpected scheme: {0}")]
UnexpectedScheme(String),
#[error("scheme appears with empty host")]
SchemeWithEmptyHost,
}
// Host represents a network host with IP/name and port.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct Host {
pub name: String,
pub port: Option<u16>, // Using Option<u16> to represent if port is set, similar to IsPortSet.
}
// Implementation of Host methods.
impl Host {
// is_empty returns true if the host name is empty.
pub fn is_empty(&self) -> bool {
self.name.is_empty()
}
// equal checks if two hosts are equal by comparing their string representations.
pub fn equal(&self, other: &Host) -> bool {
self.to_string() == other.to_string()
}
}
impl std::fmt::Display for Host {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self.port {
Some(p) => write!(f, "{}:{}", self.name, p),
None => write!(f, "{}", self.name),
}
}
}
// parse_host parses a string into a Host, with validation similar to Go's ParseHost.
pub fn parse_host(s: &str) -> Result<Host, NetError> {
if s.is_empty() {
return Err(NetError::InvalidArgument);
}
// is_valid_host validates the host string, checking for IP or hostname validity.
let is_valid_host = |host: &str| -> bool {
if host.is_empty() {
return true;
}
if host.parse::<IpAddr>().is_ok() {
return true;
}
if !(1..=253).contains(&host.len()) {
return false;
}
for (i, label) in host.split('.').enumerate() {
if i + 1 == host.split('.').count() && label.is_empty() {
continue;
}
if !(1..=63).contains(&label.len()) || !HOST_LABEL_REGEX.is_match(label) {
return false;
}
}
true
};
// Split host and port, similar to net.SplitHostPort.
let (host_str, port_str) = s.rsplit_once(':').map_or((s, ""), |(h, p)| (h, p));
let port = if !port_str.is_empty() {
Some(port_str.parse().map_err(|_| NetError::ParseError(port_str.to_string()))?)
} else {
None
};
// Trim IPv6 brackets if present.
let host = trim_ipv6(host_str)?;
// Handle IPv6 zone identifier.
let trimmed_host = host.split('%').next().unwrap_or(&host);
if !is_valid_host(trimmed_host) {
return Err(NetError::InvalidHost);
}
Ok(Host { name: host, port })
}
// trim_ipv6 removes square brackets from IPv6 addresses, similar to Go's trimIPv6.
fn trim_ipv6(host: &str) -> Result<String, NetError> {
if host.ends_with(']') {
if !host.starts_with('[') {
return Err(NetError::MissingBracket);
}
Ok(host[1..host.len() - 1].to_string())
} else {
Ok(host.to_string())
}
}
// URL is a wrapper around url::Url for custom handling.
#[derive(Debug, Clone)]
pub struct ParsedURL(pub Url);
impl ParsedURL {
/// is_empty returns true if the URL is empty or "about:blank".
pub fn is_empty(&self) -> bool {
self.0.as_str() == "" || (self.0.scheme() == "about" && self.0.path() == "blank")
}
/// hostname returns the hostname of the URL.
pub fn hostname(&self) -> String {
self.0.host_str().unwrap_or("").to_string()
}
/// port returns the port of the URL as a string, defaulting to "80" for http and "443" for https if not set.
pub fn port(&self) -> String {
match self.0.port() {
Some(p) => p.to_string(),
None => match self.0.scheme() {
"http" => "80".to_string(),
"https" => "443".to_string(),
_ => "".to_string(),
},
}
}
/// scheme returns the scheme of the URL.
pub fn scheme(&self) -> &str {
self.0.scheme()
}
/// url returns a reference to the underlying Url.
pub fn url(&self) -> &Url {
&self.0
}
}
impl std::fmt::Display for ParsedURL {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let mut url = self.0.clone();
if let Some(host) = url.host_str().map(|h| h.to_string()) {
if let Some(port) = url.port() {
if (url.scheme() == "http" && port == 80) || (url.scheme() == "https" && port == 443) {
url.set_host(Some(&host)).unwrap();
url.set_port(None).unwrap();
}
}
}
let mut s = url.to_string();
// If the URL ends with a slash and the path is just "/", remove the trailing slash.
if s.ends_with('/') && url.path() == "/" {
s.pop();
}
write!(f, "{}", s)
}
}
impl serde::Serialize for ParsedURL {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
serializer.serialize_str(&self.to_string())
}
}
impl<'de> serde::Deserialize<'de> for ParsedURL {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
let s: String = serde::Deserialize::deserialize(deserializer)?;
if s.is_empty() {
Ok(ParsedURL(Url::parse("about:blank").unwrap()))
} else {
parse_url(&s).map_err(serde::de::Error::custom)
}
}
}
// parse_url parses a string into a ParsedURL, with host validation and path cleaning.
pub fn parse_url(s: &str) -> Result<ParsedURL, NetError> {
if let Some(scheme_end) = s.find("://") {
if s[scheme_end + 3..].starts_with('/') {
let scheme = &s[..scheme_end];
if !scheme.is_empty() {
return Err(NetError::SchemeWithEmptyHost);
}
}
}
let mut uu = Url::parse(s).map_err(|e| NetError::ParseError(e.to_string()))?;
if uu.host_str().is_none_or(|h| h.is_empty()) {
if uu.scheme() != "" {
return Err(NetError::SchemeWithEmptyHost);
}
} else {
let port_str = uu.port().map(|p| p.to_string()).unwrap_or_else(|| match uu.scheme() {
"http" => "80".to_string(),
"https" => "443".to_string(),
_ => "".to_string(),
});
if !port_str.is_empty() {
let host_port = format!("{}:{}", uu.host_str().unwrap(), port_str);
parse_host(&host_port)?; // Validate host.
}
}
// Clean path: Use Url's path_segments to normalize.
if !uu.path().is_empty() {
// Url automatically cleans paths, but we ensure trailing slash if original had it.
let mut cleaned_path = String::new();
for comp in Path::new(uu.path()).components() {
use std::path::Component;
match comp {
Component::RootDir => cleaned_path.push('/'),
Component::Normal(s) => {
if !cleaned_path.ends_with('/') {
cleaned_path.push('/');
}
cleaned_path.push_str(&s.to_string_lossy());
}
_ => {}
}
}
if s.ends_with('/') && !cleaned_path.ends_with('/') {
cleaned_path.push('/');
}
if cleaned_path.is_empty() {
cleaned_path.push('/');
}
uu.set_path(&cleaned_path);
}
Ok(ParsedURL(uu))
}
#[allow(dead_code)]
/// parse_http_url parses a string into a ParsedURL, ensuring the scheme is http or https.
pub fn parse_http_url(s: &str) -> Result<ParsedURL, NetError> {
let u = parse_url(s)?;
match u.0.scheme() {
"http" | "https" => Ok(u),
_ => Err(NetError::UnexpectedScheme(u.0.scheme().to_string())),
}
}
#[allow(dead_code)]
/// is_network_or_host_down checks if an error indicates network or host down, considering timeouts.
pub fn is_network_or_host_down(err: &std::io::Error, expect_timeouts: bool) -> bool {
if err.kind() == std::io::ErrorKind::TimedOut {
return !expect_timeouts;
}
// Simplified checks based on Go logic; adapt for Rust as needed
let err_str = err.to_string().to_lowercase();
err_str.contains("connection reset by peer")
|| err_str.contains("connection timed out")
|| err_str.contains("broken pipe")
|| err_str.contains("use of closed network connection")
}
#[allow(dead_code)]
/// is_conn_reset_err checks if an error indicates a connection reset by peer.
pub fn is_conn_reset_err(err: &std::io::Error) -> bool {
err.to_string().contains("connection reset by peer") || matches!(err.raw_os_error(), Some(libc::ECONNRESET))
}
#[allow(dead_code)]
/// is_conn_refused_err checks if an error indicates a connection refused.
pub fn is_conn_refused_err(err: &std::io::Error) -> bool {
err.to_string().contains("connection refused") || matches!(err.raw_os_error(), Some(libc::ECONNREFUSED))
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn parse_host_with_empty_string_returns_error() {
let result = parse_host("");
assert!(matches!(result, Err(NetError::InvalidArgument)));
}
#[test]
fn parse_host_with_valid_ipv4() {
let result = parse_host("192.168.1.1:8080");
assert!(result.is_ok());
let host = result.unwrap();
assert_eq!(host.name, "192.168.1.1");
assert_eq!(host.port, Some(8080));
}
#[test]
fn parse_host_with_valid_hostname() {
let result = parse_host("example.com:443");
assert!(result.is_ok());
let host = result.unwrap();
assert_eq!(host.name, "example.com");
assert_eq!(host.port, Some(443));
}
#[test]
fn parse_host_with_ipv6_brackets() {
let result = parse_host("[::1]:8080");
assert!(result.is_ok());
let host = result.unwrap();
assert_eq!(host.name, "::1");
assert_eq!(host.port, Some(8080));
}
#[test]
fn parse_host_with_invalid_ipv6_missing_bracket() {
let result = parse_host("::1]:8080");
assert!(matches!(result, Err(NetError::MissingBracket)));
}
#[test]
fn parse_host_with_invalid_hostname() {
let result = parse_host("invalid..host:80");
assert!(matches!(result, Err(NetError::InvalidHost)));
}
#[test]
fn parse_host_without_port() {
let result = parse_host("example.com");
assert!(result.is_ok());
let host = result.unwrap();
assert_eq!(host.name, "example.com");
assert_eq!(host.port, None);
}
#[test]
fn host_is_empty_when_name_is_empty() {
let host = Host {
name: "".to_string(),
port: None,
};
assert!(host.is_empty());
}
#[test]
fn host_is_not_empty_when_name_present() {
let host = Host {
name: "example.com".to_string(),
port: Some(80),
};
assert!(!host.is_empty());
}
#[test]
fn host_to_string_with_port() {
let host = Host {
name: "example.com".to_string(),
port: Some(80),
};
assert_eq!(host.to_string(), "example.com:80");
}
#[test]
fn host_to_string_without_port() {
let host = Host {
name: "example.com".to_string(),
port: None,
};
assert_eq!(host.to_string(), "example.com");
}
#[test]
fn host_equal_when_same() {
let host1 = Host {
name: "example.com".to_string(),
port: Some(80),
};
let host2 = Host {
name: "example.com".to_string(),
port: Some(80),
};
assert!(host1.equal(&host2));
}
#[test]
fn host_not_equal_when_different() {
let host1 = Host {
name: "example.com".to_string(),
port: Some(80),
};
let host2 = Host {
name: "example.com".to_string(),
port: Some(443),
};
assert!(!host1.equal(&host2));
}
#[test]
fn parse_url_with_valid_http_url() {
let result = parse_url("http://example.com/path");
assert!(result.is_ok());
let parsed = result.unwrap();
assert_eq!(parsed.hostname(), "example.com");
assert_eq!(parsed.port(), "80");
}
#[test]
fn parse_url_with_valid_https_url() {
let result = parse_url("https://example.com:443/path");
assert!(result.is_ok());
let parsed = result.unwrap();
assert_eq!(parsed.hostname(), "example.com");
assert_eq!(parsed.port(), "443");
}
#[test]
fn parse_url_with_scheme_but_empty_host() {
let result = parse_url("http:///path");
assert!(matches!(result, Err(NetError::SchemeWithEmptyHost)));
}
#[test]
fn parse_url_with_invalid_host() {
let result = parse_url("http://invalid..host/path");
assert!(matches!(result, Err(NetError::InvalidHost)));
}
#[test]
fn parse_url_with_path_cleaning() {
let result = parse_url("http://example.com//path/../path/");
assert!(result.is_ok());
let parsed = result.unwrap();
assert_eq!(parsed.0.path(), "/path/");
}
#[test]
fn parse_http_url_with_http_scheme() {
let result = parse_http_url("http://example.com");
assert!(result.is_ok());
}
#[test]
fn parse_http_url_with_https_scheme() {
let result = parse_http_url("https://example.com");
assert!(result.is_ok());
}
#[test]
fn parse_http_url_with_invalid_scheme() {
let result = parse_http_url("ftp://example.com");
assert!(matches!(result, Err(NetError::UnexpectedScheme(_))));
}
#[test]
fn parsed_url_is_empty_when_url_is_empty() {
let url = ParsedURL(Url::parse("about:blank").unwrap());
assert!(url.is_empty());
}
#[test]
fn parsed_url_hostname() {
let url = ParsedURL(Url::parse("http://example.com:8080").unwrap());
assert_eq!(url.hostname(), "example.com");
}
#[test]
fn parsed_url_port() {
let url = ParsedURL(Url::parse("http://example.com:8080").unwrap());
assert_eq!(url.port(), "8080");
}
#[test]
fn parsed_url_to_string_removes_default_ports() {
let url = ParsedURL(Url::parse("http://example.com:80").unwrap());
assert_eq!(url.to_string(), "http://example.com");
}
#[test]
fn is_network_or_host_down_with_timeout() {
let err = std::io::Error::new(std::io::ErrorKind::TimedOut, "timeout");
assert!(is_network_or_host_down(&err, false));
}
#[test]
fn is_network_or_host_down_with_expected_timeout() {
let err = std::io::Error::new(std::io::ErrorKind::TimedOut, "timeout");
assert!(!is_network_or_host_down(&err, true));
}
#[test]
fn is_conn_reset_err_with_reset_message() {
let err = std::io::Error::other("connection reset by peer");
assert!(is_conn_reset_err(&err));
}
#[test]
fn is_conn_refused_err_with_refused_message() {
let err = std::io::Error::other("connection refused");
assert!(is_conn_refused_err(&err));
}
}

0
deploy/data/dev/.gitkeep Normal file
View File

0
deploy/data/pro/.gitkeep Normal file
View File

View File

@@ -30,7 +30,7 @@ services:
- "9000:9000" # S3 API port
- "9001:9001" # Console port
environment:
- RUSTFS_VOLUMES=/data/rustfs0,/data/rustfs1,/data/rustfs2,/data/rustfs3
- RUSTFS_VOLUMES=/data/rustfs{0..3} # Define 4 storage volumes
- RUSTFS_ADDRESS=0.0.0.0:9000
- RUSTFS_CONSOLE_ADDRESS=0.0.0.0:9001
- RUSTFS_CONSOLE_ENABLE=true
@@ -43,12 +43,9 @@ services:
- RUSTFS_TLS_PATH=/opt/tls
- RUSTFS_OBS_ENDPOINT=http://otel-collector:4317
volumes:
- rustfs_data_0:/data/rustfs0
- rustfs_data_1:/data/rustfs1
- rustfs_data_2:/data/rustfs2
- rustfs_data_3:/data/rustfs3
- logs_data:/app/logs
- .docker/tls/:/opt/tls # TLS configuration, you should create tls directory and put your tls files in it and then specify the path here
- deploy/data/pro:/data
- deploy/logs:/app/logs
- deploy/data/certs/:/opt/tls # TLS configuration, you should create tls directory and put your tls files in it and then specify the path here
networks:
- rustfs-network
restart: unless-stopped
@@ -78,7 +75,7 @@ services:
- "9010:9000" # S3 API port
- "9011:9001" # Console port
environment:
- RUSTFS_VOLUMES=/data/rustfs0,/data/rustfs1
- RUSTFS_VOLUMES=/data/rustfs{1..4}
- RUSTFS_ADDRESS=0.0.0.0:9000
- RUSTFS_CONSOLE_ADDRESS=0.0.0.0:9001
- RUSTFS_CONSOLE_ENABLE=true
@@ -90,7 +87,7 @@ services:
- RUSTFS_LOG_LEVEL=debug
volumes:
- .:/app # Mount source code to /app for development
- rustfs_dev_data:/data
- deploy/data/dev:/data
networks:
- rustfs-network
restart: unless-stopped
@@ -98,7 +95,7 @@ services:
test:
[
"CMD",
"sh", "-c",
"sh", "-c",
"curl -f http://localhost:9000/health && curl -f http://localhost:9001/health"
]
interval: 30s
@@ -239,5 +236,5 @@ volumes:
driver: local
redis_data:
driver: local
logs_data:
logs:
driver: local

60
docs/examples/README.md Normal file
View File

@@ -0,0 +1,60 @@
# RustFS Deployment Examples
This directory contains practical deployment examples and configurations for RustFS.
## Available Examples
### [MNMD (Multi-Node Multi-Drive)](./mnmd/)
Complete Docker Compose example for deploying RustFS in a 4-node, 4-drive-per-node configuration.
**Features:**
- Proper disk indexing (1..4) to avoid VolumeNotFound errors
- Startup coordination via `wait-and-start.sh` script
- Service discovery using Docker service names
- Health checks with alternatives for different base images
- Comprehensive documentation and verification checklist
**Use Case:** Production-ready multi-node deployment for high availability and performance.
**Quick Start:**
```bash
cd docs/examples/mnmd
docker-compose up -d
```
**See also:**
- [MNMD README](./mnmd/README.md) - Detailed usage guide
- [MNMD CHECKLIST](./mnmd/CHECKLIST.md) - Step-by-step verification
## Other Deployment Examples
For additional deployment examples, see:
- [`examples/`](/examples/) - Root-level examples directory with:
- `docker-quickstart.sh` - Quick start script for basic deployments
- `enhanced-docker-deployment.sh` - Advanced deployment scenarios
- `docker-comprehensive.yml` - Docker Compose with multiple profiles
- [`.docker/compose/`](/.docker/compose/) - Docker Compose configurations:
- `docker-compose.cluster.yaml` - Basic cluster setup
- `docker-compose.observability.yaml` - Observability stack integration
## Related Documentation
- [Console & Endpoint Service Separation](../console-separation.md)
- [Environment Variables](../ENVIRONMENT_VARIABLES.md)
- [Performance Testing](../PERFORMANCE_TESTING.md)
## Contributing
When adding new examples:
1. Create a dedicated subdirectory under `docs/examples/`
2. Include a comprehensive README.md
3. Provide working configuration files
4. Add verification steps or checklists
5. Document common issues and troubleshooting
## Support
For issues or questions:
- GitHub Issues: https://github.com/rustfs/rustfs/issues
- Documentation: https://rustfs.io

View File

@@ -0,0 +1,329 @@
# MNMD Deployment Checklist
This checklist provides step-by-step verification for deploying RustFS in MNMD (Multi-Node Multi-Drive) mode using
Docker.
## Pre-Deployment Checks
### 1. System Requirements
- [ ] Docker Engine 20.10+ installed
- [ ] Docker Compose 2.0+ installed
- [ ] At least 8GB RAM available
- [ ] At least 40GB disk space available (for 4 nodes × 4 volumes)
Verify with:
```bash
docker --version
docker-compose --version
free -h
df -h
```
### 2. File System Checks
- [ ] Using XFS, ext4, or another suitable filesystem (not NFS for production)
- [ ] File system supports extended attributes
Verify with:
```bash
df -T | grep -E '(xfs|ext4)'
```
### 3. Permissions and SELinux
- [ ] Current user is in `docker` group or can run `sudo docker`
- [ ] SELinux is properly configured (if enabled)
Verify with:
```bash
groups | grep docker
getenforce # If enabled, should show "Permissive" or "Enforcing" with proper policies
```
### 4. Network Configuration
- [ ] Ports 9000-9031 are available
- [ ] No firewall blocking Docker bridge network
Verify with:
```bash
# Check if ports are free
netstat -tuln | grep -E ':(9000|9001|9010|9011|9020|9021|9030|9031)'
# Should return nothing if ports are free
```
### 5. Files Present
- [ ] `docker-compose.yml` exists in current directory
Verify with:
```bash
cd docs/examples/mnmd
ls -la
chmod +x wait-and-start.sh # If needed
```
## Deployment Steps
### 1. Start the Cluster
- [ ] Navigate to the example directory
- [ ] Pull the latest RustFS image
- [ ] Start the cluster
```bash
cd docs/examples/mnmd
docker-compose pull
docker-compose up -d
```
### 2. Monitor Startup
- [ ] Watch container logs during startup
- [ ] Verify no VolumeNotFound errors
- [ ] Check that peer discovery completes
```bash
# Watch all logs
docker-compose logs -f
# Watch specific node
docker-compose logs -f rustfs-node1
# Look for successful startup messages
docker-compose logs | grep -i "ready\|listening\|started"
```
### 3. Verify Container Status
- [ ] All 4 containers are running
- [ ] All 4 containers show as healthy
```bash
docker-compose ps
# Expected output: 4 containers in "Up" state with "healthy" status
```
### 4. Check Health Endpoints
- [ ] API health endpoints respond on all nodes
- [ ] Console health endpoints respond on all nodes
```bash
# Test API endpoints
curl http://localhost:9000/health
curl http://localhost:9010/health
curl http://localhost:9020/health
curl http://localhost:9030/health
# Test Console endpoints
curl http://localhost:9001/health
curl http://localhost:9011/health
curl http://localhost:9021/health
curl http://localhost:9031/health
# All should return successful health status
```
## Post-Deployment Verification
### 1. In-Container Checks
- [ ] Data directories exist
- [ ] Directories have correct permissions
- [ ] RustFS process is running
```bash
# Check node1
docker exec rustfs-node1 ls -la /data/
docker exec rustfs-node1 ps aux | grep rustfs
# Verify all 4 data directories exist
docker exec rustfs-node1 ls -d /data/rustfs{1..4}
```
### 2. DNS and Network Validation
- [ ] Service names resolve correctly
- [ ] Inter-node connectivity works
```bash
# DNS resolution test
docker exec rustfs-node1 nslookup rustfs-node2
docker exec rustfs-node1 nslookup rustfs-node3
docker exec rustfs-node1 nslookup rustfs-node4
# Connectivity test (using nc if available)
docker exec rustfs-node1 nc -zv rustfs-node2 9000
docker exec rustfs-node1 nc -zv rustfs-node3 9000
docker exec rustfs-node1 nc -zv rustfs-node4 9000
# Or using telnet/curl
docker exec rustfs-node1 curl -v http://rustfs-node2:9000/health
```
### 3. Volume Configuration Validation
- [ ] RUSTFS_VOLUMES environment variable is correct
- [ ] All 16 endpoints are configured (4 nodes × 4 drives)
```bash
# Check environment variable
docker exec rustfs-node1 env | grep RUSTFS_VOLUMES
# Expected output:
# RUSTFS_VOLUMES=http://rustfs-node{1...4}:9000/data/rustfs{1...4}
```
### 4. Cluster Functionality
- [ ] Can list buckets via API
- [ ] Can create a bucket
- [ ] Can upload an object
- [ ] Can download an object
```bash
# Configure AWS CLI or s3cmd
export AWS_ACCESS_KEY_ID=rustfsadmin
export AWS_SECRET_ACCESS_KEY=rustfsadmin
# Using AWS CLI (if installed)
aws --endpoint-url http://localhost:9000 s3 mb s3://test-bucket
aws --endpoint-url http://localhost:9000 s3 ls
echo "test content" > test.txt
aws --endpoint-url http://localhost:9000 s3 cp test.txt s3://test-bucket/
aws --endpoint-url http://localhost:9000 s3 ls s3://test-bucket/
aws --endpoint-url http://localhost:9000 s3 cp s3://test-bucket/test.txt downloaded.txt
cat downloaded.txt
# Or using curl
curl -X PUT http://localhost:9000/test-bucket \
-H "Host: localhost:9000" \
--user rustfsadmin:rustfsadmin
```
### 5. Healthcheck Verification
- [ ] Docker reports all services as healthy
- [ ] Healthcheck scripts work in containers
```bash
# Check Docker health status
docker inspect rustfs-node1 --format='{{.State.Health.Status}}'
docker inspect rustfs-node2 --format='{{.State.Health.Status}}'
docker inspect rustfs-node3 --format='{{.State.Health.Status}}'
docker inspect rustfs-node4 --format='{{.State.Health.Status}}'
# All should return "healthy"
# Test healthcheck command manually
docker exec rustfs-node1 nc -z localhost 9000
echo $? # Should be 0
```
## Troubleshooting Checks
### If VolumeNotFound Error Occurs
- [ ] Verify volume indexing starts at 1, not 0
- [ ] Check that RUSTFS_VOLUMES matches mounted paths
- [ ] Ensure all /data/rustfs{1..4} directories exist
```bash
# Check mounted volumes
docker inspect rustfs-node1 | jq '.[].Mounts'
# Verify directories in container
docker exec rustfs-node1 ls -la /data/
```
### If Healthcheck Fails
- [ ] Check if `nc` is available in the image
- [ ] Try alternative healthcheck (curl/wget)
- [ ] Increase `start_period` in docker-compose.yml
```bash
# Check if nc is available
docker exec rustfs-node1 which nc
# Test healthcheck manually
docker exec rustfs-node1 nc -z localhost 9000
# Check logs for errors
docker-compose logs rustfs-node1 | grep -i error
```
### If Startup Takes Too Long
- [ ] Check peer discovery timeout in logs
- [ ] Verify network connectivity between nodes
- [ ] Consider increasing timeout in wait-and-start.sh
```bash
# Check startup logs
docker-compose logs rustfs-node1 | grep -i "waiting\|peer\|timeout"
# Check network
docker network inspect mnmd_rustfs-mnmd
```
### If Containers Crash or Restart
- [ ] Review container logs
- [ ] Check resource usage (CPU/Memory)
- [ ] Verify no port conflicts
```bash
# View last crash logs
docker-compose logs --tail=100 rustfs-node1
# Check resource usage
docker stats --no-stream
# Check restart count
docker-compose ps
```
## Cleanup Checklist
When done testing:
- [ ] Stop the cluster: `docker-compose down`
- [ ] Remove volumes (optional, destroys data): `docker-compose down -v`
- [ ] Clean up dangling images: `docker image prune`
- [ ] Verify ports are released: `netstat -tuln | grep -E ':(9000|9001|9010|9011|9020|9021|9030|9031)'`
## Production Deployment Additional Checks
Before deploying to production:
- [ ] Change default credentials (RUSTFS_ACCESS_KEY, RUSTFS_SECRET_KEY)
- [ ] Configure TLS certificates
- [ ] Set up proper logging and monitoring
- [ ] Configure backups for volumes
- [ ] Review and adjust resource limits
- [ ] Set up external load balancer (if needed)
- [ ] Document disaster recovery procedures
- [ ] Test failover scenarios
- [ ] Verify data persistence after container restart
## Summary
This checklist ensures:
- ✓ Correct disk indexing (1..4 instead of 0..3)
- ✓ Proper startup coordination via wait-and-start.sh
- ✓ Service discovery via Docker service names
- ✓ Health checks function correctly
- ✓ All 16 endpoints (4 nodes × 4 drives) are operational
- ✓ No VolumeNotFound errors occur
For more details, see [README.md](./README.md) in this directory.

View File

@@ -0,0 +1,268 @@
# RustFS MNMD (Multi-Node Multi-Drive) Docker Example
This directory contains a complete, ready-to-use MNMD deployment example for RustFS with 4 nodes and 4 drives per node (
4x4 configuration).
## Overview
This example addresses common deployment issues including:
- **VolumeNotFound errors** - Fixed by using correct disk indexing (`/data/rustfs{1...4}` instead of
`/data/rustfs{0...3}`)
- **Startup race conditions** - Solved with a simple `sleep` command in each service.
- **Service discovery** - Uses Docker service names (`rustfs-node{1..4}`) instead of hard-coded IPs
- **Health checks** - Implements proper health monitoring with `nc` (with alternatives documented)
## Quick Start
From this directory (`docs/examples/mnmd`), run:
```bash
# Start the cluster
docker-compose up -d
# Check the status
docker-compose ps
# View logs
docker-compose logs -f
# Test the deployment
curl http://localhost:9000/health
curl http://localhost:9001/health
# Run comprehensive tests
./test-deployment.sh
# Stop the cluster
docker-compose down
# Clean up volumes (WARNING: deletes all data)
docker-compose down -v
```
## Configuration Details
### Volume Configuration
The example uses the following volume configuration:
```bash
RUSTFS_VOLUMES=http://rustfs-node{1...4}:9000/data/rustfs{1...4}
```
This expands to 16 endpoints (4 nodes × 4 drives):
- Node 1: `/data/rustfs1`, `/data/rustfs2`, `/data/rustfs3`, `/data/rustfs4`
- Node 2: `/data/rustfs1`, `/data/rustfs2`, `/data/rustfs3`, `/data/rustfs4`
- Node 3: `/data/rustfs1`, `/data/rustfs2`, `/data/rustfs3`, `/data/rustfs4`
- Node 4: `/data/rustfs1`, `/data/rustfs2`, `/data/rustfs3`, `/data/rustfs4`
**Important:** Disk indexing starts at 1 to match the mounted paths (`/data/rustfs1..4`).
### Port Mappings
| Node | API Port | Console Port |
|-------|----------|--------------|
| node1 | 9000 | 9001 |
| node2 | 9010 | 9011 |
| node3 | 9020 | 9021 |
| node4 | 9030 | 9031 |
### Startup Coordination
To prevent race conditions during startup where nodes might not find each other, a simple `sleep 3` command is added to
each service's command. This provides a brief delay, allowing the network and other services to initialize before RustFS
starts. For more complex scenarios, a more robust health-check dependency or an external entrypoint script might be
required.
### Health Checks
Default health check using `nc` (netcat):
```yaml
healthcheck:
test: [ "CMD-SHELL", "nc -z localhost 9000 || exit 1" ]
interval: 10s
timeout: 5s
retries: 3
start_period: 30s
```
#### Alternative Health Checks
If your base image lacks `nc`, use one of these alternatives:
**Using curl:**
```yaml
healthcheck:
test: [ "CMD-SHELL", "curl -f http://localhost:9000/health || exit 1" ]
interval: 10s
timeout: 5s
retries: 3
start_period: 30s
```
**Using wget:**
```yaml
healthcheck:
test: [ "CMD-SHELL", "wget --no-verbose --tries=1 --spider http://localhost:9000/health || exit 1" ]
interval: 10s
timeout: 5s
retries: 3
start_period: 30s
```
### Brace Expansion Alternatives
If your Docker Compose runtime doesn't support brace expansion (`{1...4}`), replace with explicit endpoints:
```yaml
environment:
- RUSTFS_VOLUMES=http://rustfs-node1:9000/data/rustfs1,http://rustfs-node1:9000/data/rustfs2,http://rustfs-node1:9000/data/rustfs3,http://rustfs-node1:9000/data/rustfs4,http://rustfs-node2:9000/data/rustfs1,http://rustfs-node2:9000/data/rustfs2,http://rustfs-node2:9000/data/rustfs3,http://rustfs-node2:9000/data/rustfs4,http://rustfs-node3:9000/data/rustfs1,http://rustfs-node3:9000/data/rustfs2,http://rustfs-node3:9000/data/rustfs3,http://rustfs-node3:9000/data/rustfs4,http://rustfs-node4:9000/data/rustfs1,http://rustfs-node4:9000/data/rustfs2,http://rustfs-node4:9000/data/rustfs3,http://rustfs-node4:9000/data/rustfs4
```
## Using RUSTFS_CMD
The `RUSTFS_CMD` environment variable provides a fallback when no command is specified:
```yaml
environment:
- RUSTFS_CMD=rustfs # Default fallback command
```
This allows the entrypoint to execute the correct command when Docker doesn't provide one.
## Testing the Deployment
After starting the cluster, verify it's working:
### Automated Testing
Use the provided test script for comprehensive validation:
```bash
./test-deployment.sh
```
This script tests:
- Container status (4/4 running)
- Health checks (4/4 healthy)
- API endpoints (4 ports)
- Console endpoints (4 ports)
- Inter-node connectivity
- Data directory existence
### Manual Testing
For manual verification:
```bash
# 1. Check all containers are healthy
docker-compose ps
# 2. Test API endpoints
for port in 9000 9010 9020 9030; do
echo "Testing port $port..."
curl -s http://localhost:${port}/health | jq '.'
done
# 3. Test console endpoints
for port in 9001 9011 9021 9031; do
echo "Testing console port $port..."
curl -s http://localhost:${port}/health | jq '.'
done
# 4. Check inter-node connectivity
docker exec rustfs-node1 nc -zv rustfs-node2 9000
docker exec rustfs-node1 nc -zv rustfs-node3 9000
docker exec rustfs-node1 nc -zv rustfs-node4 9000
```
## Troubleshooting
### VolumeNotFound Error
**Symptom:** Error message about `/data/rustfs0` not found.
**Solution:** This example uses `/data/rustfs{1...4}` indexing to match the mounted Docker volumes. Ensure your
`RUSTFS_VOLUMES` configuration starts at index 1, not 0.
### Health Check Failures
**Symptom:** Containers show as unhealthy.
**Solutions:**
1. Check if `nc` is available: `docker exec rustfs-node1 which nc`
2. Use alternative health checks (curl/wget) as documented above
3. Increase `start_period` if nodes need more time to initialize
### Startup Timeouts
**Symptom:** Services timeout waiting for peers.
**Solutions:**
1. Check logs: `docker-compose logs rustfs-node1`
2. Verify network connectivity: `docker-compose exec rustfs-node1 ping rustfs-node2`
3. Consider increasing the `sleep` duration in the `docker-compose.yml` `command` directive if a longer delay is needed.
### Permission Issues
**Symptom:** Cannot create directories or write data.
**Solution:** Ensure volumes have correct permissions or set `RUSTFS_UID` and `RUSTFS_GID` environment variables.
## Advanced Configuration
### Custom Credentials
Replace default credentials in production:
```yaml
environment:
- RUSTFS_ACCESS_KEY=your_access_key
- RUSTFS_SECRET_KEY=your_secret_key
```
### TLS Configuration
Add TLS certificates:
```yaml
volumes:
- ./certs:/opt/tls:ro
environment:
- RUSTFS_TLS_PATH=/opt/tls
```
### Resource Limits
Add resource constraints:
```yaml
deploy:
resources:
limits:
cpus: '2'
memory: 4G
reservations:
cpus: '1'
memory: 2G
```
## See Also
- [CHECKLIST.md](./CHECKLIST.md) - Step-by-step verification guide
- [../../console-separation.md](../../console-separation.md) - Console & endpoint service separation guide
- [../../../examples/docker-comprehensive.yml](../../../examples/docker-comprehensive.yml) - More deployment examples
- [Issue #618](https://github.com/rustfs/rustfs/issues/618) - Original VolumeNotFound issue
## References
- RustFS Documentation: https://rustfs.io
- Docker Compose Documentation: https://docs.docker.com/compose/

View File

@@ -0,0 +1,182 @@
# Copyright 2024 RustFS Team
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# MNMD (Multi-Node Multi-Drive) Docker Compose Example
# 4 nodes x 4 drives configuration
# This example demonstrates a complete, ready-to-use MNMD deployment
# addressing startup coordination and VolumeNotFound issues.
services:
rustfs-node1:
image: rustfs/rustfs:latest
container_name: rustfs-node1
hostname: rustfs-node1
environment:
# Use service names and correct disk indexing (1..4 to match mounted paths)
- RUSTFS_VOLUMES=http://rustfs-node{1...4}:9000/data/rustfs{1...4}
- RUSTFS_ADDRESS=0.0.0.0:9000
- RUSTFS_CONSOLE_ENABLE=true
- RUSTFS_CONSOLE_ADDRESS=0.0.0.0:9001
- RUSTFS_ACCESS_KEY=rustfsadmin
- RUSTFS_SECRET_KEY=rustfsadmin
- RUSTFS_CMD=rustfs
ports:
- "9000:9000" # API endpoint
- "9001:9001" # Console
volumes:
- node1-data1:/data/rustfs1
- node1-data2:/data/rustfs2
- node1-data3:/data/rustfs3
- node1-data4:/data/rustfs4
command: [ "sh", "-c", "sleep 3 && rustfs" ]
healthcheck:
test:
[
"CMD",
"sh", "-c",
"curl -f http://localhost:9000/health && curl -f http://localhost:9001/health"
]
interval: 10s
timeout: 5s
retries: 3
start_period: 30s
networks:
- rustfs-mnmd
rustfs-node2:
image: rustfs/rustfs:latest
container_name: rustfs-node2
hostname: rustfs-node2
environment:
- RUSTFS_VOLUMES=http://rustfs-node{1...4}:9000/data/rustfs{1...4}
- RUSTFS_ADDRESS=0.0.0.0:9000
- RUSTFS_CONSOLE_ENABLE=true
- RUSTFS_CONSOLE_ADDRESS=0.0.0.0:9001
- RUSTFS_ACCESS_KEY=rustfsadmin
- RUSTFS_SECRET_KEY=rustfsadmin
- RUSTFS_CMD=rustfs
ports:
- "9010:9000" # API endpoint
- "9011:9001" # Console
volumes:
- node2-data1:/data/rustfs1
- node2-data2:/data/rustfs2
- node2-data3:/data/rustfs3
- node2-data4:/data/rustfs4
command: [ "sh", "-c", "sleep 3 && rustfs" ]
healthcheck:
test:
[
"CMD",
"sh", "-c",
"curl -f http://localhost:9000/health && curl -f http://localhost:9001/health"
]
interval: 10s
timeout: 5s
retries: 3
start_period: 30s
networks:
- rustfs-mnmd
rustfs-node3:
image: rustfs/rustfs:latest
container_name: rustfs-node3
hostname: rustfs-node3
environment:
- RUSTFS_VOLUMES=http://rustfs-node{1...4}:9000/data/rustfs{1...4}
- RUSTFS_ADDRESS=0.0.0.0:9000
- RUSTFS_CONSOLE_ENABLE=true
- RUSTFS_CONSOLE_ADDRESS=0.0.0.0:9001
- RUSTFS_ACCESS_KEY=rustfsadmin
- RUSTFS_SECRET_KEY=rustfsadmin
- RUSTFS_CMD=rustfs
ports:
- "9020:9000" # API endpoint
- "9021:9001" # Console
volumes:
- node3-data1:/data/rustfs1
- node3-data2:/data/rustfs2
- node3-data3:/data/rustfs3
- node3-data4:/data/rustfs4
command: [ "sh", "-c", "sleep 3 && rustfs" ]
healthcheck:
test:
[
"CMD",
"sh", "-c",
"curl -f http://localhost:9000/health && curl -f http://localhost:9001/health"
]
interval: 10s
timeout: 5s
retries: 3
start_period: 30s
networks:
- rustfs-mnmd
rustfs-node4:
image: rustfs/rustfs:latest
container_name: rustfs-node4
hostname: rustfs-node4
environment:
- RUSTFS_VOLUMES=http://rustfs-node{1...4}:9000/data/rustfs{1...4}
- RUSTFS_ADDRESS=0.0.0.0:9000
- RUSTFS_CONSOLE_ENABLE=true
- RUSTFS_CONSOLE_ADDRESS=0.0.0.0:9001
- RUSTFS_ACCESS_KEY=rustfsadmin
- RUSTFS_SECRET_KEY=rustfsadmin
- RUSTFS_CMD=rustfs
ports:
- "9030:9000" # API endpoint
- "9031:9001" # Console
volumes:
- node4-data1:/data/rustfs1
- node4-data2:/data/rustfs2
- node4-data3:/data/rustfs3
- node4-data4:/data/rustfs4
command: [ "sh", "-c", "sleep 3 && rustfs" ]
healthcheck:
test:
[
"CMD",
"sh", "-c",
"curl -f http://localhost:9000/health && curl -f http://localhost:9001/health"
]
interval: 10s
timeout: 5s
retries: 3
start_period: 30s
networks:
- rustfs-mnmd
networks:
rustfs-mnmd:
driver: bridge
volumes:
node1-data1:
node1-data2:
node1-data3:
node1-data4:
node2-data1:
node2-data2:
node2-data3:
node2-data4:
node3-data1:
node3-data2:
node3-data3:
node3-data4:
node4-data1:
node4-data2:
node4-data3:
node4-data4:

View File

@@ -0,0 +1,172 @@
#!/bin/bash
# Copyright 2024 RustFS Team
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# test-deployment.sh - Quick test script for MNMD deployment
# Usage: ./test-deployment.sh
set -e
# Colors for output
GREEN='\033[0;32m'
RED='\033[0;31m'
YELLOW='\033[1;33m'
NC='\033[0m' # No Color
echo "========================================="
echo "RustFS MNMD Deployment Test"
echo "========================================="
echo ""
# Test 1: Check if all containers are running
echo "Test 1: Checking container status..."
RUNNING=$(docker-compose ps | grep -c "Up" || echo "0")
if [ "$RUNNING" -eq 4 ]; then
echo -e "${GREEN}✓ All 4 containers are running${NC}"
else
echo -e "${RED}✗ Only $RUNNING/4 containers are running${NC}"
docker-compose ps
exit 1
fi
echo ""
# Test 2: Check health status
echo "Test 2: Checking health status..."
HEALTHY=0
for node in rustfs-node1 rustfs-node2 rustfs-node3 rustfs-node4; do
STATUS=$(docker inspect "$node" --format='{{.State.Health.Status}}' 2>/dev/null || echo "unknown")
if [ "$STATUS" = "healthy" ]; then
echo -e " ${GREEN}$node is healthy${NC}"
HEALTHY=$((HEALTHY + 1))
elif [ "$STATUS" = "starting" ]; then
echo -e " ${YELLOW}$node is starting (wait a moment)${NC}"
else
echo -e " ${RED}$node status: $STATUS${NC}"
fi
done
if [ "$HEALTHY" -eq 4 ]; then
echo -e "${GREEN}✓ All containers are healthy${NC}"
elif [ "$HEALTHY" -gt 0 ]; then
echo -e "${YELLOW}$HEALTHY/4 containers are healthy (some may still be starting)${NC}"
else
echo -e "${RED}✗ No containers are healthy${NC}"
exit 1
fi
echo ""
# Test 3: Check API endpoints
echo "Test 3: Testing API endpoints..."
PORTS=(9000 9010 9020 9030)
API_SUCCESS=0
for port in "${PORTS[@]}"; do
if curl -sf http://localhost:${port}/health >/dev/null 2>&1; then
echo -e " ${GREEN}✓ API on port $port is responding${NC}"
API_SUCCESS=$((API_SUCCESS + 1))
else
echo -e " ${RED}✗ API on port $port is not responding${NC}"
fi
done
if [ "$API_SUCCESS" -eq 4 ]; then
echo -e "${GREEN}✓ All API endpoints are working${NC}"
else
echo -e "${YELLOW}$API_SUCCESS/4 API endpoints are working${NC}"
fi
echo ""
# Test 4: Check Console endpoints
echo "Test 4: Testing Console endpoints..."
CONSOLE_PORTS=(9001 9011 9021 9031)
CONSOLE_SUCCESS=0
for port in "${CONSOLE_PORTS[@]}"; do
if curl -sf http://localhost:${port}/health >/dev/null 2>&1; then
echo -e " ${GREEN}✓ Console on port $port is responding${NC}"
CONSOLE_SUCCESS=$((CONSOLE_SUCCESS + 1))
else
echo -e " ${RED}✗ Console on port $port is not responding${NC}"
fi
done
if [ "$CONSOLE_SUCCESS" -eq 4 ]; then
echo -e "${GREEN}✓ All Console endpoints are working${NC}"
else
echo -e "${YELLOW}$CONSOLE_SUCCESS/4 Console endpoints are working${NC}"
fi
echo ""
# Test 5: Check inter-node connectivity
echo "Test 5: Testing inter-node connectivity..."
CONN_SUCCESS=0
for node in rustfs-node2 rustfs-node3 rustfs-node4; do
if docker exec rustfs-node1 nc -z "$node" 9000 2>/dev/null; then
echo -e " ${GREEN}✓ node1 → $node connection OK${NC}"
CONN_SUCCESS=$((CONN_SUCCESS + 1))
else
echo -e " ${RED}✗ node1 → $node connection failed${NC}"
fi
done
if [ "$CONN_SUCCESS" -eq 3 ]; then
echo -e "${GREEN}✓ All inter-node connections are working${NC}"
else
echo -e "${YELLOW}$CONN_SUCCESS/3 inter-node connections are working${NC}"
fi
echo ""
# Test 6: Verify data directories
echo "Test 6: Verifying data directories..."
DIR_SUCCESS=0
for i in {1..4}; do
if docker exec rustfs-node1 test -d "/data/rustfs${i}"; then
DIR_SUCCESS=$((DIR_SUCCESS + 1))
else
echo -e " ${RED}✗ /data/rustfs${i} not found in node1${NC}"
fi
done
if [ "$DIR_SUCCESS" -eq 4 ]; then
echo -e "${GREEN}✓ All data directories exist${NC}"
else
echo -e "${RED}✗ Only $DIR_SUCCESS/4 data directories exist${NC}"
fi
echo ""
# Summary
echo "========================================="
echo "Test Summary"
echo "========================================="
echo "Containers running: $RUNNING/4"
echo "Healthy containers: $HEALTHY/4"
echo "API endpoints: $API_SUCCESS/4"
echo "Console endpoints: $CONSOLE_SUCCESS/4"
echo "Inter-node connections: $CONN_SUCCESS/3"
echo "Data directories: $DIR_SUCCESS/4"
echo ""
TOTAL=$((RUNNING + HEALTHY + API_SUCCESS + CONSOLE_SUCCESS + CONN_SUCCESS + DIR_SUCCESS))
MAX_SCORE=23
if [ "$TOTAL" -eq "$MAX_SCORE" ]; then
echo -e "${GREEN}✓ All tests passed! Deployment is working correctly.${NC}"
exit 0
elif [ "$TOTAL" -ge 20 ]; then
echo -e "${YELLOW}⚠ Most tests passed. Some components may still be starting up.${NC}"
echo " Try running this script again in a few moments."
exit 0
else
echo -e "${RED}✗ Some tests failed. Check the output above and logs for details.${NC}"
echo " Run 'docker-compose logs' for more information."
exit 1
fi

View File

@@ -78,7 +78,6 @@ matchit = { workspace = true }
md5.workspace = true
mime_guess = { workspace = true }
opentelemetry = { workspace = true }
percent-encoding = { workspace = true }
pin-project-lite.workspace = true
reqwest = { workspace = true }
rustls = { workspace = true }

View File

@@ -12,21 +12,16 @@
// See the License for the specific language governing permissions and
// limitations under the License.
#![allow(dead_code)]
use crate::admin::router::Operation;
use crate::auth::{check_key_valid, get_session_token};
use http::{HeaderMap, StatusCode};
use matchit::Params;
use rustfs_config::notify::{NOTIFY_MQTT_SUB_SYS, NOTIFY_WEBHOOK_SUB_SYS};
use rustfs_config::{ENABLE_KEY, EnableState};
use rustfs_notify::rules::{BucketNotificationConfig, PatternRules};
use rustfs_targets::EventName;
use rustfs_targets::check_mqtt_broker_available;
use s3s::header::CONTENT_LENGTH;
use s3s::{Body, S3Error, S3ErrorCode, S3Request, S3Response, S3Result, header::CONTENT_TYPE, s3_error};
use serde::{Deserialize, Serialize};
use serde_urlencoded::from_bytes;
use std::collections::HashMap;
use std::future::Future;
use std::io::{Error, ErrorKind};
use std::net::SocketAddr;
@@ -36,12 +31,6 @@ use tokio::time::{Duration, sleep};
use tracing::{debug, error, info, warn};
use url::Url;
#[derive(Debug, Deserialize)]
struct BucketQuery {
#[serde(rename = "bucketName")]
bucket_name: String,
}
#[derive(Debug, Deserialize)]
pub struct KeyValue {
pub key: String,
@@ -177,6 +166,7 @@ impl Operation for NotificationTarget {
let mut client_cert_val = None;
let mut client_key_val = None;
let mut qos_val = None;
let mut topic_val = String::new();
for kv in notification_body.key_values.iter() {
if !allowed_keys.contains(kv.key.as_str()) {
@@ -190,6 +180,16 @@ impl Operation for NotificationTarget {
if kv.key == "endpoint" {
endpoint_val = Some(kv.value.clone());
}
if target_type == NOTIFY_MQTT_SUB_SYS {
if kv.key == rustfs_config::MQTT_BROKER {
endpoint_val = Some(kv.value.clone());
}
if kv.key == rustfs_config::MQTT_TOPIC {
topic_val = kv.value.clone();
}
}
if kv.key == "queue_dir" {
queue_dir_val = Some(kv.value.clone());
}
@@ -236,12 +236,15 @@ impl Operation for NotificationTarget {
}
if target_type == NOTIFY_MQTT_SUB_SYS {
let endpoint = endpoint_val.ok_or_else(|| s3_error!(InvalidArgument, "endpoint is required"))?;
let url = Url::parse(&endpoint).map_err(|e| s3_error!(InvalidArgument, "invalid endpoint url: {}", e))?;
match url.scheme() {
"tcp" | "ssl" | "ws" | "wss" | "mqtt" | "mqtts" => {}
_ => return Err(s3_error!(InvalidArgument, "unsupported broker url scheme")),
let endpoint = endpoint_val.ok_or_else(|| s3_error!(InvalidArgument, "broker endpoint is required"))?;
if topic_val.is_empty() {
return Err(s3_error!(InvalidArgument, "topic is required"));
}
// Check MQTT Broker availability
if let Err(e) = check_mqtt_broker_available(&endpoint, &topic_val).await {
return Err(s3_error!(InvalidArgument, "MQTT Broker unavailable: {}", e));
}
if let Some(queue_dir) = queue_dir_val {
validate_queue_dir(&queue_dir).await?;
if let Some(qos) = qos_val {
@@ -420,113 +423,3 @@ fn extract_target_params<'a>(params: &'a Params<'_, '_>) -> S3Result<(&'a str, &
let target_name = extract_param(params, "target_name")?;
Ok((target_type, target_name))
}
/// Set notification rules for buckets
pub struct SetBucketNotification {}
#[async_trait::async_trait]
impl Operation for SetBucketNotification {
async fn call(&self, req: S3Request<Body>, _params: Params<'_, '_>) -> S3Result<S3Response<(StatusCode, Body)>> {
// 1. Analyze query parameters
let query: BucketQuery = from_bytes(req.uri.query().unwrap_or("").as_bytes())
.map_err(|e| s3_error!(InvalidArgument, "invalid query parameters: {}", e))?;
// 2. Permission verification
let Some(input_cred) = &req.credentials else {
return Err(s3_error!(InvalidRequest, "credentials not found"));
};
let (_cred, _owner) =
check_key_valid(get_session_token(&req.uri, &req.headers).unwrap_or_default(), &input_cred.access_key).await?;
// 3. Get notification system instance
let Some(ns) = rustfs_notify::global::notification_system() else {
return Err(s3_error!(InternalError, "notification system not initialized"));
};
// 4. The parsing request body is BucketNotificationConfig
let mut input = req.input;
let body = input.store_all_unlimited().await.map_err(|e| {
warn!("failed to read request body: {:?}", e);
s3_error!(InvalidRequest, "failed to read request body")
})?;
let config: BucketNotificationConfig = serde_json::from_slice(&body)
.map_err(|e| s3_error!(InvalidArgument, "invalid json body for bucket notification config: {}", e))?;
// 5. Load bucket notification configuration
info!("Loading notification config for bucket '{}'", &query.bucket_name);
ns.load_bucket_notification_config(&query.bucket_name, &config)
.await
.map_err(|e| {
error!("failed to load bucket notification config: {}", e);
S3Error::with_message(S3ErrorCode::InternalError, format!("failed to load bucket notification config: {e}"))
})?;
let mut header = HeaderMap::new();
header.insert(CONTENT_TYPE, "application/json".parse().unwrap());
header.insert(CONTENT_LENGTH, "0".parse().unwrap());
Ok(S3Response::with_headers((StatusCode::OK, Body::empty()), header))
}
}
/// Get notification rules for buckets
#[derive(Serialize)]
struct BucketRulesResponse {
rules: HashMap<EventName, PatternRules>,
}
pub struct GetBucketNotification {}
#[async_trait::async_trait]
impl Operation for GetBucketNotification {
async fn call(&self, req: S3Request<Body>, _params: Params<'_, '_>) -> S3Result<S3Response<(StatusCode, Body)>> {
let query: BucketQuery = from_bytes(req.uri.query().unwrap_or("").as_bytes())
.map_err(|e| s3_error!(InvalidArgument, "invalid query parameters: {}", e))?;
let Some(input_cred) = &req.credentials else {
return Err(s3_error!(InvalidRequest, "credentials not found"));
};
let (_cred, _owner) =
check_key_valid(get_session_token(&req.uri, &req.headers).unwrap_or_default(), &input_cred.access_key).await?;
let Some(ns) = rustfs_notify::global::notification_system() else {
return Err(s3_error!(InternalError, "notification system not initialized"));
};
let rules_map = ns.notifier.get_rules_map(&query.bucket_name);
let response = BucketRulesResponse {
rules: rules_map.unwrap_or_default().inner().clone(),
};
let data = serde_json::to_vec(&response)
.map_err(|e| S3Error::with_message(S3ErrorCode::InternalError, format!("failed to serialize rules: {e}")))?;
let mut header = HeaderMap::new();
header.insert(CONTENT_TYPE, "application/json".parse().unwrap());
Ok(S3Response::with_headers((StatusCode::OK, Body::from(data)), header))
}
}
/// Remove all notification rules for a bucket
pub struct RemoveBucketNotification {}
#[async_trait::async_trait]
impl Operation for RemoveBucketNotification {
async fn call(&self, req: S3Request<Body>, _params: Params<'_, '_>) -> S3Result<S3Response<(StatusCode, Body)>> {
let query: BucketQuery = from_bytes(req.uri.query().unwrap_or("").as_bytes())
.map_err(|e| s3_error!(InvalidArgument, "invalid query parameters: {}", e))?;
let Some(input_cred) = &req.credentials else {
return Err(s3_error!(InvalidRequest, "credentials not found"));
};
let (_cred, _owner) =
check_key_valid(get_session_token(&req.uri, &req.headers).unwrap_or_default(), &input_cred.access_key).await?;
let Some(ns) = rustfs_notify::global::notification_system() else {
return Err(s3_error!(InternalError, "notification system not initialized"));
};
info!("Removing notification config for bucket '{}'", &query.bucket_name);
ns.remove_bucket_notification_config(&query.bucket_name).await;
let mut header = HeaderMap::new();
header.insert(CONTENT_TYPE, "application/json".parse().unwrap());
header.insert(CONTENT_LENGTH, "0".parse().unwrap());
Ok(S3Response::with_headers((StatusCode::OK, Body::empty()), header))
}
}

View File

@@ -23,10 +23,7 @@ pub mod utils;
use handlers::{
GetReplicationMetricsHandler, HealthCheckHandler, ListRemoteTargetHandler, RemoveRemoteTargetHandler, SetRemoteTargetHandler,
bucket_meta,
event::{
GetBucketNotification, ListNotificationTargets, NotificationTarget, RemoveBucketNotification, RemoveNotificationTarget,
SetBucketNotification,
},
event::{ListNotificationTargets, ListTargetsArns, NotificationTarget, RemoveNotificationTarget},
group, kms, kms_dynamic, kms_keys, policies, pools, rebalance,
service_account::{AddServiceAccount, DeleteServiceAccount, InfoServiceAccount, ListServiceAccount, UpdateServiceAccount},
sts, tier, user,
@@ -519,25 +516,7 @@ fn register_user_route(r: &mut S3Router<AdminOperation>) -> std::io::Result<()>
r.insert(
Method::GET,
format!("{}{}", ADMIN_PREFIX, "/v3/target/arns").as_str(),
AdminOperation(&ListNotificationTargets {}),
)?;
r.insert(
Method::POST,
format!("{}{}", ADMIN_PREFIX, "/v3/target-set-bucket").as_str(),
AdminOperation(&SetBucketNotification {}),
)?;
r.insert(
Method::POST,
format!("{}{}", ADMIN_PREFIX, "/v3/target-get-bucket").as_str(),
AdminOperation(&GetBucketNotification {}),
)?;
r.insert(
Method::POST,
format!("{}{}", ADMIN_PREFIX, "/v3/target-remove-bucket").as_str(),
AdminOperation(&RemoveBucketNotification {}),
AdminOperation(&ListTargetsArns {}),
)?;
Ok(())

View File

@@ -92,6 +92,10 @@ where
T: Operation,
{
fn is_match(&self, method: &Method, uri: &Uri, headers: &HeaderMap, _: &mut Extensions) -> bool {
if method == Method::GET && uri.path() == "/health" {
return true;
}
// AssumeRole
if method == Method::POST && uri.path() == "/" {
if let Some(val) = headers.get(header::CONTENT_TYPE) {
@@ -104,36 +108,13 @@ where
uri.path().starts_with(ADMIN_PREFIX) || uri.path().starts_with(RPC_PREFIX) || uri.path().starts_with(CONSOLE_PREFIX)
}
async fn call(&self, req: S3Request<Body>) -> S3Result<S3Response<Body>> {
if self.console_enabled && req.uri.path().starts_with(CONSOLE_PREFIX) {
if let Some(console_router) = &self.console_router {
let mut console_router = console_router.clone();
let req = convert_request(req);
let result = console_router.call(req).await;
return match result {
Ok(resp) => Ok(convert_response(resp)),
Err(e) => Err(s3_error!(InternalError, "{}", e)),
};
}
return Err(s3_error!(InternalError, "console is not enabled"));
}
let uri = format!("{}|{}", &req.method, req.uri.path());
// warn!("get uri {}", &uri);
if let Ok(mat) = self.router.at(&uri) {
let op: &T = mat.value;
let mut resp = op.call(req, mat.params).await?;
resp.status = Some(resp.output.0);
return Ok(resp.map_output(|x| x.1));
}
return Err(s3_error!(NotImplemented));
}
// check_access before call
async fn check_access(&self, req: &mut S3Request<Body>) -> S3Result<()> {
// Allow unauthenticated access to health check
if req.method == Method::GET && req.uri.path() == "/health" {
return Ok(());
}
// Allow unauthenticated access to console static files if console is enabled
if self.console_enabled && req.uri.path().starts_with(CONSOLE_PREFIX) {
return Ok(());
}
@@ -156,6 +137,31 @@ where
None => Err(s3_error!(AccessDenied, "Signature is required")),
}
}
async fn call(&self, req: S3Request<Body>) -> S3Result<S3Response<Body>> {
if self.console_enabled && req.uri.path().starts_with(CONSOLE_PREFIX) {
if let Some(console_router) = &self.console_router {
let mut console_router = console_router.clone();
let req = convert_request(req);
let result = console_router.call(req).await;
return match result {
Ok(resp) => Ok(convert_response(resp)),
Err(e) => Err(s3_error!(InternalError, "{}", e)),
};
}
return Err(s3_error!(InternalError, "console is not enabled"));
}
let uri = format!("{}|{}", &req.method, req.uri.path());
if let Ok(mat) = self.router.at(&uri) {
let op: &T = mat.value;
let mut resp = op.call(req, mat.params).await?;
resp.status = Some(resp.output.0);
return Ok(resp.map_output(|x| x.1));
}
Err(s3_error!(NotImplemented))
}
}
#[async_trait::async_trait]

View File

@@ -18,6 +18,11 @@ use rustfs_config::DEFAULT_DELIMITER;
use rustfs_ecstore::config::GLOBAL_SERVER_CONFIG;
use tracing::{error, info, warn};
/// Start the audit system.
/// This function checks if the audit subsystem is configured in the global server configuration.
/// If configured, it initializes and starts the audit system.
/// If not configured, it skips the initialization.
/// It also handles cases where the audit system is already running or if the global configuration is not loaded.
pub(crate) async fn start_audit_system() -> AuditResult<()> {
info!(
target: "rustfs::main::start_audit_system",
@@ -94,6 +99,10 @@ pub(crate) async fn start_audit_system() -> AuditResult<()> {
}
}
/// Stop the audit system.
/// This function checks if the audit system is initialized and running.
/// If it is running, it prepares to stop the system, stops it, and records the stop time.
/// If the system is already stopped or not initialized, it logs a warning and returns.
pub(crate) async fn stop_audit_system() -> AuditResult<()> {
if let Some(system) = audit_system() {
let state = system.get_state().await;

View File

@@ -12,136 +12,112 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use super::access::authorize_request;
use super::options::del_opts;
use super::options::extract_metadata;
use super::options::put_opts;
use crate::auth::get_condition_values;
use crate::error::ApiError;
use crate::storage::access::ReqInfo;
use crate::storage::options::copy_dst_opts;
use crate::storage::options::copy_src_opts;
use crate::storage::options::get_complete_multipart_upload_opts;
use crate::storage::options::{extract_metadata_from_mime_with_object_name, get_opts, parse_copy_source_range};
use bytes::Bytes;
use chrono::DateTime;
use chrono::Utc;
use datafusion::arrow::csv::WriterBuilder as CsvWriterBuilder;
use datafusion::arrow::json::WriterBuilder as JsonWriterBuilder;
use datafusion::arrow::json::writer::JsonArray;
use http::StatusCode;
use rustfs_ecstore::bucket::metadata_sys::get_replication_config;
use rustfs_ecstore::bucket::object_lock::objectlock_sys::BucketObjectLockSys;
use rustfs_ecstore::bucket::replication::DeletedObjectReplicationInfo;
use rustfs_ecstore::bucket::replication::REPLICATE_INCOMING_DELETE;
use rustfs_ecstore::bucket::replication::ReplicationConfigurationExt;
use rustfs_ecstore::bucket::replication::check_replicate_delete;
use rustfs_ecstore::bucket::replication::get_must_replicate_options;
use rustfs_ecstore::bucket::replication::must_replicate;
use rustfs_ecstore::bucket::replication::schedule_replication;
use rustfs_ecstore::bucket::replication::schedule_replication_delete;
use rustfs_ecstore::bucket::versioning::VersioningApi;
use rustfs_ecstore::disk::error::DiskError;
use rustfs_ecstore::disk::error_reduce::is_all_buckets_not_found;
use rustfs_ecstore::error::is_err_bucket_not_found;
use rustfs_ecstore::error::is_err_object_not_found;
use rustfs_ecstore::error::is_err_version_not_found;
use rustfs_ecstore::set_disk::MAX_PARTS_COUNT;
use rustfs_ecstore::store_api::ObjectInfo;
use rustfs_filemeta::ReplicationStatusType;
use rustfs_filemeta::ReplicationType;
use rustfs_filemeta::VersionPurgeStatusType;
use rustfs_s3select_api::object_store::bytes_stream;
use rustfs_s3select_api::query::Context;
use rustfs_s3select_api::query::Query;
use rustfs_s3select_query::get_global_db;
// use rustfs_ecstore::store_api::RESERVED_METADATA_PREFIX;
use crate::storage::{
access::{ReqInfo, authorize_request},
options::{
copy_dst_opts, copy_src_opts, del_opts, extract_metadata, extract_metadata_from_mime_with_object_name,
get_complete_multipart_upload_opts, get_opts, parse_copy_source_range, put_opts,
},
};
use base64::{Engine, engine::general_purpose::STANDARD as BASE64_STANDARD};
use bytes::Bytes;
use chrono::{DateTime, Utc};
use datafusion::arrow::{
csv::WriterBuilder as CsvWriterBuilder, json::WriterBuilder as JsonWriterBuilder, json::writer::JsonArray,
};
use futures::StreamExt;
use http::HeaderMap;
use rustfs_ecstore::bucket::lifecycle::bucket_lifecycle_ops::validate_transition_tier;
use rustfs_ecstore::bucket::lifecycle::lifecycle::Lifecycle;
use rustfs_ecstore::bucket::metadata::BUCKET_LIFECYCLE_CONFIG;
use rustfs_ecstore::bucket::metadata::BUCKET_NOTIFICATION_CONFIG;
use rustfs_ecstore::bucket::metadata::BUCKET_POLICY_CONFIG;
use rustfs_ecstore::bucket::metadata::BUCKET_REPLICATION_CONFIG;
use rustfs_ecstore::bucket::metadata::BUCKET_SSECONFIG;
use rustfs_ecstore::bucket::metadata::BUCKET_TAGGING_CONFIG;
use rustfs_ecstore::bucket::metadata::BUCKET_VERSIONING_CONFIG;
use rustfs_ecstore::bucket::metadata::OBJECT_LOCK_CONFIG;
use rustfs_ecstore::bucket::metadata_sys;
use rustfs_ecstore::bucket::policy_sys::PolicySys;
use rustfs_ecstore::bucket::tagging::decode_tags;
use rustfs_ecstore::bucket::tagging::encode_tags;
use rustfs_ecstore::bucket::utils::serialize;
use rustfs_ecstore::bucket::versioning_sys::BucketVersioningSys;
use rustfs_ecstore::client::object_api_utils::format_etag;
use rustfs_ecstore::compress::MIN_COMPRESSIBLE_SIZE;
use rustfs_ecstore::compress::is_compressible;
use rustfs_ecstore::error::StorageError;
use rustfs_ecstore::new_object_layer_fn;
use rustfs_ecstore::set_disk::{DEFAULT_READ_BUFFER_SIZE, is_valid_storage_class};
use rustfs_ecstore::store_api::BucketOptions;
use rustfs_ecstore::store_api::CompletePart;
use rustfs_ecstore::store_api::DeleteBucketOptions;
use rustfs_ecstore::store_api::HTTPRangeSpec;
use rustfs_ecstore::store_api::MakeBucketOptions;
use rustfs_ecstore::store_api::MultipartUploadResult;
use rustfs_ecstore::store_api::ObjectIO;
use rustfs_ecstore::store_api::ObjectOptions;
use rustfs_ecstore::store_api::ObjectToDelete;
use rustfs_ecstore::store_api::PutObjReader;
use rustfs_ecstore::store_api::StorageAPI;
use rustfs_filemeta::fileinfo::ObjectPartInfo;
use rustfs_kms::DataKey;
use rustfs_kms::service_manager::get_global_encryption_service;
use rustfs_kms::types::{EncryptionMetadata, ObjectEncryptionContext};
use http::{HeaderMap, StatusCode};
use rustfs_ecstore::{
bucket::{
lifecycle::{bucket_lifecycle_ops::validate_transition_tier, lifecycle::Lifecycle},
metadata::{
BUCKET_LIFECYCLE_CONFIG, BUCKET_NOTIFICATION_CONFIG, BUCKET_POLICY_CONFIG, BUCKET_REPLICATION_CONFIG,
BUCKET_SSECONFIG, BUCKET_TAGGING_CONFIG, BUCKET_VERSIONING_CONFIG, OBJECT_LOCK_CONFIG,
},
metadata_sys,
metadata_sys::get_replication_config,
object_lock::objectlock_sys::BucketObjectLockSys,
policy_sys::PolicySys,
replication::{
DeletedObjectReplicationInfo, REPLICATE_INCOMING_DELETE, ReplicationConfigurationExt, check_replicate_delete,
get_must_replicate_options, must_replicate, schedule_replication, schedule_replication_delete,
},
tagging::{decode_tags, encode_tags},
utils::serialize,
versioning::VersioningApi,
versioning_sys::BucketVersioningSys,
},
client::object_api_utils::format_etag,
compress::{MIN_COMPRESSIBLE_SIZE, is_compressible},
disk::{error::DiskError, error_reduce::is_all_buckets_not_found},
error::{StorageError, is_err_bucket_not_found, is_err_object_not_found, is_err_version_not_found},
new_object_layer_fn,
set_disk::{DEFAULT_READ_BUFFER_SIZE, MAX_PARTS_COUNT, is_valid_storage_class},
store_api::{
BucketOptions,
CompletePart,
DeleteBucketOptions,
HTTPRangeSpec,
MakeBucketOptions,
MultipartUploadResult,
ObjectIO,
ObjectInfo,
ObjectOptions,
ObjectToDelete,
PutObjReader,
StorageAPI,
// RESERVED_METADATA_PREFIX,
},
};
use rustfs_filemeta::{ReplicationStatusType, ReplicationType, VersionPurgeStatusType, fileinfo::ObjectPartInfo};
use rustfs_kms::{
DataKey,
service_manager::get_global_encryption_service,
types::{EncryptionMetadata, ObjectEncryptionContext},
};
use rustfs_notify::global::notifier_instance;
use rustfs_policy::auth;
use rustfs_policy::policy::action::Action;
use rustfs_policy::policy::action::S3Action;
use rustfs_policy::policy::{BucketPolicy, BucketPolicyArgs, Validator};
use rustfs_rio::CompressReader;
use rustfs_rio::EtagReader;
use rustfs_rio::HashReader;
use rustfs_rio::Reader;
use rustfs_rio::WarpReader;
use rustfs_rio::{DecryptReader, EncryptReader, HardLimitReader};
use rustfs_targets::EventName;
use rustfs_targets::arn::{TargetID, TargetIDError};
use rustfs_utils::CompressionAlgorithm;
use rustfs_utils::http::AMZ_BUCKET_REPLICATION_STATUS;
use rustfs_utils::http::headers::RESERVED_METADATA_PREFIX_LOWER;
use rustfs_utils::http::headers::{AMZ_DECODED_CONTENT_LENGTH, AMZ_OBJECT_TAGGING};
use rustfs_utils::path::is_dir_object;
use rustfs_utils::path::path_join_buf;
use rustfs_policy::{
auth,
policy::{
action::{Action, S3Action},
{BucketPolicy, BucketPolicyArgs, Validator},
},
};
use rustfs_rio::{CompressReader, DecryptReader, EncryptReader, EtagReader, HardLimitReader, HashReader, Reader, WarpReader};
use rustfs_s3select_api::{
object_store::bytes_stream,
query::{Context, Query},
};
use rustfs_s3select_query::get_global_db;
use rustfs_targets::{
EventName,
arn::{TargetID, TargetIDError},
};
use rustfs_utils::{
CompressionAlgorithm,
http::{
AMZ_BUCKET_REPLICATION_STATUS,
headers::{AMZ_DECODED_CONTENT_LENGTH, AMZ_OBJECT_TAGGING, RESERVED_METADATA_PREFIX_LOWER},
},
path::{is_dir_object, path_join_buf},
};
use rustfs_zip::CompressionFormat;
use s3s::S3;
use s3s::S3Error;
use s3s::S3ErrorCode;
use s3s::S3Result;
use s3s::dto::*;
use s3s::s3_error;
use s3s::{S3Request, S3Response};
use std::collections::HashMap;
use std::fmt::Debug;
use std::path::Path;
use std::str::FromStr;
use std::sync::Arc;
use std::sync::LazyLock;
use time::OffsetDateTime;
use time::format_description::well_known::Rfc3339;
use tokio::io::AsyncRead;
use tokio::sync::mpsc;
use s3s::{S3, S3Error, S3ErrorCode, S3Request, S3Response, S3Result, dto::*, s3_error};
use std::{
collections::HashMap,
fmt::Debug,
path::Path,
str::FromStr,
sync::{Arc, LazyLock},
};
use time::{OffsetDateTime, format_description::well_known::Rfc3339};
use tokio::{io::AsyncRead, sync::mpsc};
use tokio_stream::wrappers::ReceiverStream;
use tokio_tar::Archive;
use tokio_util::io::ReaderStream;
use tokio_util::io::StreamReader;
use tracing::debug;
use tracing::error;
use tracing::info;
use tracing::warn;
use tokio_util::io::{ReaderStream, StreamReader};
use tracing::{debug, error, info, warn};
use uuid::Uuid;
macro_rules! try_ {

View File

@@ -58,7 +58,7 @@ export RUSTFS_EXTERNAL_ADDRESS=":9000"
#export RUSTFS_OBS_METER_INTERVAL=1 # Sampling interval in seconds
#export RUSTFS_OBS_SERVICE_NAME=rustfs # Service name
#export RUSTFS_OBS_SERVICE_VERSION=0.1.0 # Service version
export RUSTFS_OBS_ENVIRONMENT=production # Environment name
export RUSTFS_OBS_ENVIRONMENT=develop # Environment name
export RUSTFS_OBS_LOGGER_LEVEL=info # Log level, supports trace, debug, info, warn, error
export RUSTFS_OBS_LOCAL_LOGGING_ENABLED=true # Whether to enable local logging
export RUSTFS_OBS_LOG_DIRECTORY="$current_dir/deploy/logs" # Log directory
@@ -123,6 +123,10 @@ export RUSTFS_COMPRESSION_ENABLED=true # Whether to enable compression
#export RUSTFS_REGION="us-east-1"
export RUSTFS_ENABLE_SCANNER=false
export RUSTFS_ENABLE_HEAL=false
# Event message configuration
#export RUSTFS_EVENT_CONFIG="./deploy/config/event.example.toml"