chore: improve event and docker-compose ,Improve the permissions of the endpoint health interface, upgrade otel from 0.30.0 to 0.31.0 (#620)

* feat: improve code for notify

* upgrade starshard version

* upgrade version

* Fix ETag format to comply with HTTP standards by wrapping with quotes (#592)

* Initial plan

* Fix ETag format to comply with HTTP standards by wrapping with quotes

Co-authored-by: overtrue <1472352+overtrue@users.noreply.github.com>

* bufigx

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: overtrue <1472352+overtrue@users.noreply.github.com>
Co-authored-by: overtrue <anzhengchao@gmail.com>

* Improve lock (#596)

* improve lock

Signed-off-by: Mu junxiang <1948535941@qq.com>

* feat(tests): add wait_for_object_absence helper and improve lifecycle test reliability

Signed-off-by: Mu junxiang <1948535941@qq.com>

* chore: remove dirty docs

Signed-off-by: Mu junxiang <1948535941@qq.com>

---------

Signed-off-by: Mu junxiang <1948535941@qq.com>

* feat(append): implement object append operations with state tracking (#599)

* feat(append): implement object append operations with state tracking

Signed-off-by: junxiang Mu <1948535941@qq.com>

* chore: rebase

Signed-off-by: junxiang Mu <1948535941@qq.com>

---------

Signed-off-by: junxiang Mu <1948535941@qq.com>

* build(deps): upgrade s3s (#595)

Co-authored-by: loverustfs <155562731+loverustfs@users.noreply.github.com>

* fix: validate mqtt broker

* improve code for `import`

* upgrade otel relation crates version

* fix:dep("jsonwebtoken") feature = 'rust_crypto'

* fix

* fix

* fix

* upgrade version

* improve code for ecfs

* chore: improve event and docker-compose ,Improve the permissions of the `endpoint` health interface

* fix

* fix

* fix

* fix

* improve code

* fix

---------

Signed-off-by: Mu junxiang <1948535941@qq.com>
Signed-off-by: junxiang Mu <1948535941@qq.com>
Co-authored-by: Copilot <198982749+Copilot@users.noreply.github.com>
Co-authored-by: overtrue <1472352+overtrue@users.noreply.github.com>
Co-authored-by: overtrue <anzhengchao@gmail.com>
Co-authored-by: guojidan <63799833+guojidan@users.noreply.github.com>
Co-authored-by: Nugine <nugine@foxmail.com>
Co-authored-by: loverustfs <155562731+loverustfs@users.noreply.github.com>
This commit is contained in:
houseme
2025-10-11 09:08:25 +08:00
committed by GitHub
parent 5689311cff
commit aac9b1edb7
36 changed files with 1458 additions and 773 deletions

965
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -99,7 +99,10 @@ async-recursion = "1.1.1"
async-trait = "0.1.89"
async-compression = { version = "0.4.19" }
atomic_enum = "0.3.0"
aws-config = { version = "1.8.6" }
aws-config = { version = "1.8.8" }
aws-credential-types = { version = "1.2.8" }
aws-smithy-types = { version = "1.3.3" }
aws-smithy-runtime-api = { version = "1.9.1" }
aws-sdk-s3 = { version = "1.106.0", default-features = false, features = ["sigv4a", "rustls", "rt-tokio"] }
axum = "0.8.6"
axum-extra = "0.10.3"
@@ -111,6 +114,7 @@ bytes = { version = "1.10.1", features = ["serde"] }
bytesize = "2.1.0"
byteorder = "1.5.0"
cfg-if = "1.0.3"
convert_case = "0.8.0"
crc-fast = "1.3.0"
chacha20poly1305 = { version = "0.10.1" }
chrono = { version = "0.4.42", features = ["serde"] }
@@ -119,18 +123,18 @@ const-str = { version = "0.7.0", features = ["std", "proc"] }
crc32fast = "1.5.0"
criterion = { version = "0.7", features = ["html_reports"] }
crossbeam-queue = "0.3.12"
dashmap = "6.1.0"
datafusion = "50.0.0"
datafusion = "50.1.0"
derive_builder = "0.20.2"
enumset = "1.1.10"
flatbuffers = "25.9.23"
flate2 = "1.1.2"
flexi_logger = { version = "0.31.4", features = ["trc", "dont_minimize_extra_stacks", "compress", "kv"] }
flate2 = "1.1.4"
flexi_logger = { version = "0.31.7", features = ["trc", "dont_minimize_extra_stacks", "compress", "kv"] }
form_urlencoded = "1.2.2"
futures = "0.3.31"
futures-core = "0.3.31"
futures-util = "0.3.31"
glob = "0.3.3"
hashbrown = { version = "0.16.0", features = ["serde", "rayon"] }
hex-simd = "0.8.0"
highway = { version = "1.3.0" }
hickory-resolver = { version = "0.25.2", features = ["tls-ring"] }
@@ -146,8 +150,9 @@ http = "1.3.1"
http-body = "1.0.1"
humantime = "2.3.0"
ipnetwork = { version = "0.21.1", features = ["serde"] }
jsonwebtoken = "9.3.1"
jsonwebtoken = { version = "10.0.0", features = ["rust_crypto"] }
lazy_static = "1.5.0"
libc = "0.2.177"
libsystemd = { version = "0.7.2" }
local-ip-address = "0.6.5"
lz4 = "1.28.1"
@@ -163,21 +168,21 @@ num_cpus = { version = "1.17.0" }
nvml-wrapper = "0.11.0"
object_store = "0.12.4"
once_cell = "1.21.3"
opentelemetry = { version = "0.30.0" }
opentelemetry-appender-tracing = { version = "0.30.1", features = [
opentelemetry = { version = "0.31.0" }
opentelemetry-appender-tracing = { version = "0.31.1", features = [
"experimental_use_tracing_span_context",
"experimental_metadata_attributes",
"spec_unstable_logs_enabled"
] }
opentelemetry_sdk = { version = "0.30.0" }
opentelemetry-stdout = { version = "0.30.0" }
opentelemetry-otlp = { version = "0.30.0", default-features = false, features = [
opentelemetry_sdk = { version = "0.31.0" }
opentelemetry-stdout = { version = "0.31.0" }
opentelemetry-otlp = { version = "0.31.0", default-features = false, features = [
"grpc-tonic", "gzip-tonic", "trace", "metrics", "logs", "internal-logs"
] }
opentelemetry-semantic-conventions = { version = "0.30.0", features = [
opentelemetry-semantic-conventions = { version = "0.31.0", features = [
"semconv_experimental",
] }
parking_lot = "0.12.4"
parking_lot = "0.12.5"
path-absolutize = "3.1.1"
path-clean = "1.0.1"
blake3 = { version = "1.8.2" }
@@ -188,6 +193,7 @@ prost = "0.14.1"
pretty_assertions = "1.4.1"
quick-xml = "0.38.3"
rand = "0.9.2"
rayon = "1.11.0"
rdkafka = { version = "0.38.0", features = ["tokio"] }
reed-solomon-simd = { version = "3.0.1" }
regex = { version = "1.11.3" }
@@ -200,12 +206,13 @@ reqwest = { version = "0.12.23", default-features = false, features = [
"json",
"blocking",
] }
rmcp = { version = "0.6.4" }
rmcp = { version = "0.8.1" }
rmp = "0.8.14"
rmp-serde = "1.3.0"
rsa = "0.9.8"
rumqttc = { version = "0.25.0" }
rust-embed = { version = "8.7.2" }
rustc-hash = { version = "2.1.1" }
rustfs-rsc = "2025.506.1"
rustls = { version = "0.23.32", features = ["ring", "logging", "std", "tls12"], default-features = false }
rustls-pki-types = "1.12.0"
@@ -225,6 +232,7 @@ smartstring = "1.0.1"
snafu = "0.8.9"
snap = "1.1.1"
socket2 = "0.6.0"
starshard = { version = "0.5.0", features = ["rayon", "async", "serde"] }
strum = { version = "0.27.2", features = ["derive"] }
sysinfo = "0.37.1"
sysctl = "0.7.1"
@@ -253,7 +261,7 @@ tower-http = { version = "0.6.6", features = ["cors"] }
tracing = "0.1.41"
tracing-core = "0.1.34"
tracing-error = "0.2.1"
tracing-opentelemetry = "0.31.0"
tracing-opentelemetry = "0.32.0"
tracing-subscriber = { version = "0.3.20", features = ["env-filter", "time"] }
transform-stream = "0.3.1"
url = "2.5.7"

View File

@@ -12,9 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::AuditEntry;
use crate::AuditResult;
use crate::AuditSystem;
use crate::{AuditEntry, AuditResult, AuditSystem};
use once_cell::sync::OnceCell;
use rustfs_ecstore::config::Config;
use std::sync::Arc;

View File

@@ -17,9 +17,11 @@ use crate::AuditRegistry;
use crate::observability;
use crate::{AuditError, AuditResult};
use rustfs_ecstore::config::Config;
use rustfs_targets::store::{Key, Store};
use rustfs_targets::target::EntityTarget;
use rustfs_targets::{StoreError, Target, TargetError};
use rustfs_targets::{
StoreError, Target, TargetError,
store::{Key, Store},
target::EntityTarget,
};
use std::sync::Arc;
use tokio::sync::{Mutex, RwLock};
use tracing::{error, info, warn};
@@ -257,7 +259,7 @@ impl AuditSystem {
let target_id_clone = target_id.clone();
// Create EntityTarget for the audit log entry
let entity_target = rustfs_targets::target::EntityTarget {
let entity_target = EntityTarget {
object_name: entry.api.name.clone().unwrap_or_default(),
bucket_name: entry.api.bucket.clone().unwrap_or_default(),
event_name: rustfs_targets::EventName::ObjectCreatedPut, // Default, should be derived from entry
@@ -337,7 +339,7 @@ impl AuditSystem {
let mut success_count = 0;
let mut errors = Vec::new();
for entry in entries_clone {
let entity_target = rustfs_targets::target::EntityTarget {
let entity_target = EntityTarget {
object_name: entry.api.name.clone().unwrap_or_default(),
bucket_name: entry.api.bucket.clone().unwrap_or_default(),
event_name: rustfs_targets::EventName::ObjectCreatedPut,

View File

@@ -101,11 +101,11 @@ rustfs-signer.workspace = true
rustfs-checksums.workspace = true
futures-util.workspace = true
async-recursion.workspace = true
aws-credential-types = "1.2.6"
aws-smithy-types = "1.3.2"
parking_lot = "0.12"
moka = { version = "0.12", features = ["future"] }
aws-smithy-runtime-api = "1.9.0"
aws-credential-types = { workspace = true }
aws-smithy-types = { workspace = true }
parking_lot = { workspace = true }
moka = { workspace = true }
aws-smithy-runtime-api = { workspace = true }
[target.'cfg(not(windows))'.dependencies]
nix = { workspace = true }

View File

@@ -87,7 +87,7 @@ pub fn generate_jwt<T: Serialize>(claims: &T, secret: &str) -> std::result::Resu
jsonwebtoken::encode(&header, &claims, &EncodingKey::from_secret(secret.as_bytes()))
}
pub fn extract_claims<T: DeserializeOwned>(
pub fn extract_claims<T: DeserializeOwned + Clone>(
token: &str,
secret: &str,
) -> std::result::Result<jsonwebtoken::TokenData<T>, jsonwebtoken::errors::Error> {
@@ -193,7 +193,7 @@ mod tests {
assert_eq!(error.to_string(), "secret key length is too short");
}
#[derive(Debug, Serialize, Deserialize, PartialEq)]
#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
struct Claims {
sub: String,
company: String,

View File

@@ -32,14 +32,17 @@ rustfs-utils = { workspace = true, features = ["path", "sys"] }
rustfs-targets = { workspace = true }
async-trait = { workspace = true }
chrono = { workspace = true, features = ["serde"] }
dashmap = { workspace = true }
futures = { workspace = true }
form_urlencoded = { workspace = true }
hashbrown = { workspace = true }
once_cell = { workspace = true }
quick-xml = { workspace = true, features = ["serialize", "async-tokio"] }
rayon = { workspace = true }
rumqttc = { workspace = true }
rustc-hash = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
starshard = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true, features = ["rt-multi-thread", "sync", "time"] }
tracing = { workspace = true }

View File

@@ -13,9 +13,9 @@
// limitations under the License.
use chrono::{DateTime, Utc};
use hashbrown::HashMap;
use rustfs_targets::EventName;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use url::form_urlencoded;
/// Represents the identity of the user who triggered the event

View File

@@ -14,6 +14,7 @@
use crate::Event;
use async_trait::async_trait;
use hashbrown::HashSet;
use rumqttc::QoS;
use rustfs_config::notify::{ENV_NOTIFY_MQTT_KEYS, ENV_NOTIFY_WEBHOOK_KEYS, NOTIFY_MQTT_KEYS, NOTIFY_WEBHOOK_KEYS};
use rustfs_config::{
@@ -27,7 +28,6 @@ use rustfs_targets::{
error::TargetError,
target::{mqtt::MQTTArgs, webhook::WebhookArgs},
};
use std::collections::HashSet;
use std::time::Duration;
use tracing::{debug, warn};
use url::Url;

View File

@@ -15,13 +15,13 @@
use crate::{
Event, error::NotificationError, notifier::EventNotifier, registry::TargetRegistry, rules::BucketNotificationConfig, stream,
};
use hashbrown::HashMap;
use rustfs_ecstore::config::{Config, KVS};
use rustfs_targets::EventName;
use rustfs_targets::arn::TargetID;
use rustfs_targets::store::{Key, Store};
use rustfs_targets::target::EntityTarget;
use rustfs_targets::{StoreError, Target};
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::{Duration, Instant};
@@ -212,11 +212,6 @@ impl NotificationSystem {
return Ok(());
}
// if let Err(e) = rustfs_ecstore::config::com::save_server_config(store, &new_config).await {
// error!("Failed to save config: {}", e);
// return Err(NotificationError::SaveConfig(e.to_string()));
// }
info!("Configuration updated. Reloading system...");
self.reload_config(new_config).await
}

View File

@@ -13,19 +13,20 @@
// limitations under the License.
use crate::{error::NotificationError, event::Event, rules::RulesMap};
use dashmap::DashMap;
use hashbrown::HashMap;
use rustfs_targets::EventName;
use rustfs_targets::Target;
use rustfs_targets::arn::TargetID;
use rustfs_targets::target::EntityTarget;
use std::{collections::HashMap, sync::Arc};
use starshard::AsyncShardedHashMap;
use std::sync::Arc;
use tokio::sync::RwLock;
use tracing::{debug, error, info, instrument, warn};
/// Manages event notification to targets based on rules
pub struct EventNotifier {
target_list: Arc<RwLock<TargetList>>,
bucket_rules_map: Arc<DashMap<String, RulesMap>>,
bucket_rules_map: Arc<AsyncShardedHashMap<String, RulesMap, rustc_hash::FxBuildHasher>>,
}
impl Default for EventNotifier {
@@ -39,7 +40,7 @@ impl EventNotifier {
pub fn new() -> Self {
EventNotifier {
target_list: Arc::new(RwLock::new(TargetList::new())),
bucket_rules_map: Arc::new(DashMap::new()),
bucket_rules_map: Arc::new(AsyncShardedHashMap::new(0)),
}
}
@@ -58,7 +59,7 @@ impl EventNotifier {
/// This method removes all rules associated with the specified bucket name.
/// It will log a message indicating the removal of rules.
pub async fn remove_rules_map(&self, bucket_name: &str) {
if self.bucket_rules_map.remove(bucket_name).is_some() {
if self.bucket_rules_map.remove(&bucket_name.to_string()).await.is_some() {
info!("Removed all notification rules for bucket: {}", bucket_name);
}
}
@@ -76,21 +77,21 @@ impl EventNotifier {
/// Adds a rules map for a bucket
pub async fn add_rules_map(&self, bucket_name: &str, rules_map: RulesMap) {
if rules_map.is_empty() {
self.bucket_rules_map.remove(bucket_name);
self.bucket_rules_map.remove(&bucket_name.to_string()).await;
} else {
self.bucket_rules_map.insert(bucket_name.to_string(), rules_map);
self.bucket_rules_map.insert(bucket_name.to_string(), rules_map).await;
}
info!("Added rules for bucket: {}", bucket_name);
}
/// Gets the rules map for a specific bucket.
pub fn get_rules_map(&self, bucket_name: &str) -> Option<RulesMap> {
self.bucket_rules_map.get(bucket_name).map(|r| r.clone())
pub async fn get_rules_map(&self, bucket_name: &str) -> Option<RulesMap> {
self.bucket_rules_map.get(&bucket_name.to_string()).await
}
/// Removes notification rules for a bucket
pub async fn remove_notification(&self, bucket_name: &str) {
self.bucket_rules_map.remove(bucket_name);
self.bucket_rules_map.remove(&bucket_name.to_string()).await;
info!("Removed notification rules for bucket: {}", bucket_name);
}
@@ -113,7 +114,7 @@ impl EventNotifier {
/// Return `true` if at least one matching notification rule exists.
pub async fn has_subscriber(&self, bucket_name: &str, event_name: &EventName) -> bool {
// Rules to check if the bucket exists
if let Some(rules_map) = self.bucket_rules_map.get(bucket_name) {
if let Some(rules_map) = self.bucket_rules_map.get(&bucket_name.to_string()).await {
// A composite event (such as ObjectCreatedAll) is expanded to multiple single events.
// We need to check whether any of these single events have the rules configured.
rules_map.has_subscriber(event_name)
@@ -129,7 +130,7 @@ impl EventNotifier {
let bucket_name = &event.s3.bucket.name;
let object_key = &event.s3.object.key;
let event_name = event.event_name;
if let Some(rules) = self.bucket_rules_map.get(bucket_name) {
if let Some(rules) = self.bucket_rules_map.get(bucket_name).await {
let target_ids = rules.match_rules(event_name, object_key);
if target_ids.is_empty() {
debug!("No matching targets for event in bucket: {}", bucket_name);

View File

@@ -15,13 +15,13 @@
use crate::Event;
use crate::factory::{MQTTTargetFactory, TargetFactory, WebhookTargetFactory};
use futures::stream::{FuturesUnordered, StreamExt};
use hashbrown::{HashMap, HashSet};
use rustfs_config::notify::NOTIFY_ROUTE_PREFIX;
use rustfs_config::{DEFAULT_DELIMITER, ENABLE_KEY, ENV_PREFIX};
use rustfs_ecstore::config::{Config, KVS};
use rustfs_targets::Target;
use rustfs_targets::TargetError;
use rustfs_targets::target::ChannelTargetType;
use std::collections::{HashMap, HashSet};
use tracing::{debug, error, info, warn};
/// Registry for managing target factories

View File

@@ -17,10 +17,10 @@ use super::xml_config::ParseConfigError as BucketNotificationConfigError;
use crate::rules::NotificationConfiguration;
use crate::rules::pattern_rules;
use crate::rules::target_id_set;
use hashbrown::HashMap;
use rustfs_targets::EventName;
use rustfs_targets::arn::TargetID;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::io::Read;
/// Configuration for bucket notifications.

View File

@@ -14,9 +14,10 @@
use super::pattern;
use super::target_id_set::TargetIdSet;
use hashbrown::HashMap;
use rayon::prelude::*;
use rustfs_targets::arn::TargetID;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
/// PatternRules - Event rule that maps object name patterns to TargetID collections.
/// `event.Rules` (map[string]TargetIDSet) in the Go code
@@ -43,13 +44,19 @@ impl PatternRules {
/// Returns all TargetIDs that match the object name.
pub fn match_targets(&self, object_name: &str) -> TargetIdSet {
let mut matched_targets = TargetIdSet::new();
for (pattern_str, target_set) in &self.rules {
if pattern::match_simple(pattern_str, object_name) {
matched_targets.extend(target_set.iter().cloned());
}
}
matched_targets
self.rules
.par_iter()
.filter_map(|(pattern_str, target_set)| {
if pattern::match_simple(pattern_str, object_name) {
Some(target_set.iter().cloned().collect::<TargetIdSet>())
} else {
None
}
})
.reduce(TargetIdSet::new, |mut acc, set| {
acc.extend(set);
acc
})
}
pub fn is_empty(&self) -> bool {

View File

@@ -14,10 +14,10 @@
use super::pattern_rules::PatternRules;
use super::target_id_set::TargetIdSet;
use hashbrown::HashMap;
use rustfs_targets::EventName;
use rustfs_targets::arn::TargetID;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
/// RulesMap - Rule mapping organized by event name。
/// `event.RulesMap` (map[Name]Rules) in the corresponding Go code

View File

@@ -12,8 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use hashbrown::HashSet;
use rustfs_targets::arn::TargetID;
use std::collections::HashSet;
/// TargetIDSet - A collection representation of TargetID.
pub type TargetIdSet = HashSet<TargetID>;

View File

@@ -13,10 +13,10 @@
// limitations under the License.
use super::pattern;
use hashbrown::HashSet;
use rustfs_targets::EventName;
use rustfs_targets::arn::{ARN, ArnError, TargetIDError};
use serde::{Deserialize, Serialize};
use std::collections::HashSet;
use std::io::Read;
use thiserror::Error;

View File

@@ -228,7 +228,7 @@ mod tests {
use jsonwebtoken::{Algorithm, DecodingKey, Validation, decode};
use serde::{Deserialize, Serialize};
#[derive(Debug, Serialize, Deserialize)]
#[derive(Debug, Serialize, Deserialize, Clone)]
struct Claims {
sub: String,
exp: usize,

View File

@@ -59,7 +59,7 @@ pub fn generate_jwt<T: Serialize>(claims: &T, secret: &str) -> std::result::Resu
jsonwebtoken::encode(&header, &claims, &EncodingKey::from_secret(secret.as_bytes()))
}
pub fn extract_claims<T: DeserializeOwned>(
pub fn extract_claims<T: DeserializeOwned + Clone>(
token: &str,
secret: &str,
) -> std::result::Result<jsonwebtoken::TokenData<T>, jsonwebtoken::errors::Error> {

View File

@@ -13,7 +13,7 @@ documentation = "https://docs.rs/rustfs-target/latest/rustfs_target/"
[dependencies]
rustfs-config = { workspace = true, features = ["notify", "constants", "audit"] }
rustfs-utils = { workspace = true, features = ["sys"] }
rustfs-utils = { workspace = true, features = ["sys", "notify"] }
async-trait = { workspace = true }
reqwest = { workspace = true }
rumqttc = { workspace = true }

View File

@@ -0,0 +1,70 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
/// Check if MQTT Broker is available
/// # Arguments
/// * `broker_url` - URL of MQTT Broker, for example `mqtt://localhost:1883`
/// * `topic` - Topic for testing connections
/// # Returns
/// * `Ok(())` - If the connection is successful
/// * `Err(String)` - If the connection fails, contains an error message
///
/// # Example
/// ```rust,no_run
/// #[tokio::main]
/// async fn main() {
/// let result = rustfs_targets::check_mqtt_broker_available("mqtt://localhost:1883", "test/topic").await;
/// if result.is_ok() {
/// println!("MQTT Broker is available");
/// } else {
/// println!("MQTT Broker is not available: {}", result.err().unwrap());
/// }
/// }
/// ```
/// # Note
/// Need to add `rumqttc` and `url` dependencies in `Cargo.toml`
/// ```toml
/// [dependencies]
/// rumqttc = "0.25.0"
/// url = "2.5.7"
/// tokio = { version = "1", features = ["full"] }
/// ```
pub async fn check_mqtt_broker_available(broker_url: &str, topic: &str) -> Result<(), String> {
use rumqttc::{AsyncClient, MqttOptions, QoS};
let url = rustfs_utils::parse_url(broker_url).map_err(|e| format!("Broker URL parsing failed:{e}"))?;
let url = url.url();
match url.scheme() {
"tcp" | "ssl" | "ws" | "wss" | "mqtt" | "mqtts" | "tls" | "tcps" => {}
_ => return Err("unsupported broker url scheme".to_string()),
}
let host = url.host_str().ok_or("Broker is missing host")?;
let port = url.port().unwrap_or(1883);
let mut mqtt_options = MqttOptions::new("rustfs_check", host, port);
mqtt_options.set_keep_alive(std::time::Duration::from_secs(5));
let (client, mut eventloop) = AsyncClient::new(mqtt_options, 1);
// Try to connect and subscribe
client
.subscribe(topic, QoS::AtLeastOnce)
.await
.map_err(|e| format!("MQTT subscription failed:{e}"))?;
// Wait for eventloop to receive at least one event
match tokio::time::timeout(std::time::Duration::from_secs(3), eventloop.poll()).await {
Ok(Ok(_)) => Ok(()),
Ok(Err(e)) => Err(format!("MQTT connection failed:{e}")),
Err(_) => Err("MQTT connection timeout".to_string()),
}
}

View File

@@ -13,11 +13,13 @@
// limitations under the License.
pub mod arn;
mod check;
pub mod error;
mod event_name;
pub mod store;
pub mod target;
pub use check::check_mqtt_broker_available;
pub use error::{StoreError, TargetError};
pub use event_name::EventName;
use serde::{Deserialize, Serialize};

View File

@@ -324,7 +324,7 @@ async fn run_mqtt_event_loop(
Ok(Err(e)) => Err(e),
Err(_) => {
debug!(target_id = %target_id, "MQTT poll timed out (EVENT_LOOP_POLL_TIMEOUT) while not connected or status pending.");
Err(rumqttc::ConnectionError::NetworkTimeout)
Err(ConnectionError::NetworkTimeout)
}
}
} else {
@@ -376,7 +376,7 @@ async fn run_mqtt_event_loop(
connected_status.store(false, Ordering::SeqCst);
error!(target_id = %target_id, error = %e, "Error from MQTT event loop poll");
if matches!(e, rumqttc::ConnectionError::NetworkTimeout) && (!initial_connection_established || !connected_status.load(Ordering::SeqCst)) {
if matches!(e, ConnectionError::NetworkTimeout) && (!initial_connection_established || !connected_status.load(Ordering::SeqCst)) {
warn!(target_id = %target_id, "Timeout during initial poll or pending state, will retry.");
continue;
}
@@ -395,8 +395,8 @@ async fn run_mqtt_event_loop(
error!(target_id = %target_id, error = %e, "Fatal MQTT error, terminating event loop.");
break;
}
// rumqttc's eventloop.poll() may return Err and terminate after some errors,
// Or it will handle reconnection internally. The continue here will make select! wait again.
// rumqttc's eventloop.poll() may return Err and terminate after some errors,
// Or it will handle reconnection internally. To continue here will make select! wait again.
// If the error is temporary and rumqttc is handling reconnection, poll() should eventually succeed or return a different error again.
// Sleep briefly to avoid busy cycles in case of rapid failure.
tokio::time::sleep(Duration::from_secs(1)).await;

View File

@@ -32,11 +32,13 @@ bytes = { workspace = true, optional = true }
crc32fast = { workspace = true }
flate2 = { workspace = true, optional = true }
futures = { workspace = true, optional = true }
hashbrown = { workspace = true, optional = true }
hex-simd = { workspace = true, optional = true }
highway = { workspace = true, optional = true }
hickory-resolver = { workspace = true, optional = true }
hmac = { workspace = true, optional = true }
hyper = { workspace = true, optional = true }
libc = { workspace = true, optional = true }
local-ip-address = { workspace = true, optional = true }
lz4 = { workspace = true, optional = true }
md-5 = { workspace = true, optional = true }
@@ -53,7 +55,7 @@ s3s = { workspace = true, optional = true }
serde = { workspace = true, optional = true }
sha1 = { workspace = true, optional = true }
sha2 = { workspace = true, optional = true }
convert_case = "0.8.0"
convert_case = { workspace = true, optional = true }
siphasher = { workspace = true, optional = true }
snap = { workspace = true, optional = true }
sysinfo = { workspace = true, optional = true }
@@ -83,7 +85,7 @@ tls = ["dep:rustls", "dep:rustls-pemfile", "dep:rustls-pki-types"] # tls charac
net = ["ip", "dep:url", "dep:netif", "dep:futures", "dep:transform-stream", "dep:bytes", "dep:s3s", "dep:hyper", "dep:hickory-resolver", "dep:moka", "dep:thiserror", "dep:tokio"] # network features with DNS resolver
io = ["dep:tokio"]
path = []
notify = ["dep:hyper", "dep:s3s"] # file system notification features
notify = ["dep:hyper", "dep:s3s", "dep:hashbrown", "dep:thiserror", "dep:serde", "dep:libc"] # file system notification features
compress = ["dep:flate2", "dep:brotli", "dep:snap", "dep:lz4", "dep:zstd"]
string = ["dep:regex", "dep:rand"]
crypto = ["dep:base64-simd", "dep:hex-simd", "dep:hmac", "dep:hyper", "dep:sha1"]
@@ -91,5 +93,5 @@ hash = ["dep:highway", "dep:md-5", "dep:sha2", "dep:blake3", "dep:serde", "dep:s
os = ["dep:nix", "dep:tempfile", "winapi"] # operating system utilities
integration = [] # integration test features
sys = ["dep:sysinfo"] # system information features
http = []
full = ["ip", "tls", "net", "io", "hash", "os", "integration", "path", "crypto", "string", "compress", "sys", "notify","http"] # all features
http = ["dep:convert_case"]
full = ["ip", "tls", "net", "io", "hash", "os", "integration", "path", "crypto", "string", "compress", "sys", "notify", "http"] # all features

View File

@@ -12,9 +12,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.
mod net;
use hashbrown::HashMap;
use hyper::HeaderMap;
use s3s::{S3Request, S3Response};
use std::collections::HashMap;
pub use net::*;
/// Extract request parameters from S3Request, mainly header information.
#[allow(dead_code)]

View File

@@ -0,0 +1,533 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use regex::Regex;
use serde::{Deserialize, Serialize};
use std::net::IpAddr;
use std::path::Path;
use std::sync::LazyLock;
use thiserror::Error;
use url::Url;
// Lazy static for the host label regex.
static HOST_LABEL_REGEX: LazyLock<Regex> = LazyLock::new(|| Regex::new(r"^[a-zA-Z0-9]([a-zA-Z0-9-]*[a-zA-Z0-9])?$").unwrap());
/// NetError represents errors that can occur in network operations.
#[derive(Error, Debug)]
pub enum NetError {
#[error("invalid argument")]
InvalidArgument,
#[error("invalid hostname")]
InvalidHost,
#[error("missing '[' in host")]
MissingBracket,
#[error("parse error: {0}")]
ParseError(String),
#[error("unexpected scheme: {0}")]
UnexpectedScheme(String),
#[error("scheme appears with empty host")]
SchemeWithEmptyHost,
}
// Host represents a network host with IP/name and port.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct Host {
pub name: String,
pub port: Option<u16>, // Using Option<u16> to represent if port is set, similar to IsPortSet.
}
// Implementation of Host methods.
impl Host {
// is_empty returns true if the host name is empty.
pub fn is_empty(&self) -> bool {
self.name.is_empty()
}
// equal checks if two hosts are equal by comparing their string representations.
pub fn equal(&self, other: &Host) -> bool {
self.to_string() == other.to_string()
}
}
impl std::fmt::Display for Host {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self.port {
Some(p) => write!(f, "{}:{}", self.name, p),
None => write!(f, "{}", self.name),
}
}
}
// parse_host parses a string into a Host, with validation similar to Go's ParseHost.
pub fn parse_host(s: &str) -> Result<Host, NetError> {
if s.is_empty() {
return Err(NetError::InvalidArgument);
}
// is_valid_host validates the host string, checking for IP or hostname validity.
let is_valid_host = |host: &str| -> bool {
if host.is_empty() {
return true;
}
if host.parse::<IpAddr>().is_ok() {
return true;
}
if !(1..=253).contains(&host.len()) {
return false;
}
for (i, label) in host.split('.').enumerate() {
if i + 1 == host.split('.').count() && label.is_empty() {
continue;
}
if !(1..=63).contains(&label.len()) || !HOST_LABEL_REGEX.is_match(label) {
return false;
}
}
true
};
// Split host and port, similar to net.SplitHostPort.
let (host_str, port_str) = s.rsplit_once(':').map_or((s, ""), |(h, p)| (h, p));
let port = if !port_str.is_empty() {
Some(port_str.parse().map_err(|_| NetError::ParseError(port_str.to_string()))?)
} else {
None
};
// Trim IPv6 brackets if present.
let host = trim_ipv6(host_str)?;
// Handle IPv6 zone identifier.
let trimmed_host = host.split('%').next().unwrap_or(&host);
if !is_valid_host(trimmed_host) {
return Err(NetError::InvalidHost);
}
Ok(Host { name: host, port })
}
// trim_ipv6 removes square brackets from IPv6 addresses, similar to Go's trimIPv6.
fn trim_ipv6(host: &str) -> Result<String, NetError> {
if host.ends_with(']') {
if !host.starts_with('[') {
return Err(NetError::MissingBracket);
}
Ok(host[1..host.len() - 1].to_string())
} else {
Ok(host.to_string())
}
}
// URL is a wrapper around url::Url for custom handling.
#[derive(Debug, Clone)]
pub struct ParsedURL(pub Url);
impl ParsedURL {
/// is_empty returns true if the URL is empty or "about:blank".
pub fn is_empty(&self) -> bool {
self.0.as_str() == "" || (self.0.scheme() == "about" && self.0.path() == "blank")
}
/// hostname returns the hostname of the URL.
pub fn hostname(&self) -> String {
self.0.host_str().unwrap_or("").to_string()
}
/// port returns the port of the URL as a string, defaulting to "80" for http and "443" for https if not set.
pub fn port(&self) -> String {
match self.0.port() {
Some(p) => p.to_string(),
None => match self.0.scheme() {
"http" => "80".to_string(),
"https" => "443".to_string(),
_ => "".to_string(),
},
}
}
/// scheme returns the scheme of the URL.
pub fn scheme(&self) -> &str {
self.0.scheme()
}
/// url returns a reference to the underlying Url.
pub fn url(&self) -> &Url {
&self.0
}
}
impl std::fmt::Display for ParsedURL {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let mut url = self.0.clone();
if let Some(host) = url.host_str().map(|h| h.to_string()) {
if let Some(port) = url.port() {
if (url.scheme() == "http" && port == 80) || (url.scheme() == "https" && port == 443) {
url.set_host(Some(&host)).unwrap();
url.set_port(None).unwrap();
}
}
}
let mut s = url.to_string();
// If the URL ends with a slash and the path is just "/", remove the trailing slash.
if s.ends_with('/') && url.path() == "/" {
s.pop();
}
write!(f, "{}", s)
}
}
impl serde::Serialize for ParsedURL {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
serializer.serialize_str(&self.to_string())
}
}
impl<'de> serde::Deserialize<'de> for ParsedURL {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
let s: String = serde::Deserialize::deserialize(deserializer)?;
if s.is_empty() {
Ok(ParsedURL(Url::parse("about:blank").unwrap()))
} else {
parse_url(&s).map_err(serde::de::Error::custom)
}
}
}
// parse_url parses a string into a ParsedURL, with host validation and path cleaning.
pub fn parse_url(s: &str) -> Result<ParsedURL, NetError> {
if let Some(scheme_end) = s.find("://") {
if s[scheme_end + 3..].starts_with('/') {
let scheme = &s[..scheme_end];
if !scheme.is_empty() {
return Err(NetError::SchemeWithEmptyHost);
}
}
}
let mut uu = Url::parse(s).map_err(|e| NetError::ParseError(e.to_string()))?;
if uu.host_str().is_none_or(|h| h.is_empty()) {
if uu.scheme() != "" {
return Err(NetError::SchemeWithEmptyHost);
}
} else {
let port_str = uu.port().map(|p| p.to_string()).unwrap_or_else(|| match uu.scheme() {
"http" => "80".to_string(),
"https" => "443".to_string(),
_ => "".to_string(),
});
if !port_str.is_empty() {
let host_port = format!("{}:{}", uu.host_str().unwrap(), port_str);
parse_host(&host_port)?; // Validate host.
}
}
// Clean path: Use Url's path_segments to normalize.
if !uu.path().is_empty() {
// Url automatically cleans paths, but we ensure trailing slash if original had it.
let mut cleaned_path = String::new();
for comp in Path::new(uu.path()).components() {
use std::path::Component;
match comp {
Component::RootDir => cleaned_path.push('/'),
Component::Normal(s) => {
if !cleaned_path.ends_with('/') {
cleaned_path.push('/');
}
cleaned_path.push_str(&s.to_string_lossy());
}
_ => {}
}
}
if s.ends_with('/') && !cleaned_path.ends_with('/') {
cleaned_path.push('/');
}
if cleaned_path.is_empty() {
cleaned_path.push('/');
}
uu.set_path(&cleaned_path);
}
Ok(ParsedURL(uu))
}
#[allow(dead_code)]
/// parse_http_url parses a string into a ParsedURL, ensuring the scheme is http or https.
pub fn parse_http_url(s: &str) -> Result<ParsedURL, NetError> {
let u = parse_url(s)?;
match u.0.scheme() {
"http" | "https" => Ok(u),
_ => Err(NetError::UnexpectedScheme(u.0.scheme().to_string())),
}
}
#[allow(dead_code)]
/// is_network_or_host_down checks if an error indicates network or host down, considering timeouts.
pub fn is_network_or_host_down(err: &std::io::Error, expect_timeouts: bool) -> bool {
if err.kind() == std::io::ErrorKind::TimedOut {
return !expect_timeouts;
}
// Simplified checks based on Go logic; adapt for Rust as needed
let err_str = err.to_string().to_lowercase();
err_str.contains("connection reset by peer")
|| err_str.contains("connection timed out")
|| err_str.contains("broken pipe")
|| err_str.contains("use of closed network connection")
}
#[allow(dead_code)]
/// is_conn_reset_err checks if an error indicates a connection reset by peer.
pub fn is_conn_reset_err(err: &std::io::Error) -> bool {
err.to_string().contains("connection reset by peer") || matches!(err.raw_os_error(), Some(libc::ECONNRESET))
}
#[allow(dead_code)]
/// is_conn_refused_err checks if an error indicates a connection refused.
pub fn is_conn_refused_err(err: &std::io::Error) -> bool {
err.to_string().contains("connection refused") || matches!(err.raw_os_error(), Some(libc::ECONNREFUSED))
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn parse_host_with_empty_string_returns_error() {
let result = parse_host("");
assert!(matches!(result, Err(NetError::InvalidArgument)));
}
#[test]
fn parse_host_with_valid_ipv4() {
let result = parse_host("192.168.1.1:8080");
assert!(result.is_ok());
let host = result.unwrap();
assert_eq!(host.name, "192.168.1.1");
assert_eq!(host.port, Some(8080));
}
#[test]
fn parse_host_with_valid_hostname() {
let result = parse_host("example.com:443");
assert!(result.is_ok());
let host = result.unwrap();
assert_eq!(host.name, "example.com");
assert_eq!(host.port, Some(443));
}
#[test]
fn parse_host_with_ipv6_brackets() {
let result = parse_host("[::1]:8080");
assert!(result.is_ok());
let host = result.unwrap();
assert_eq!(host.name, "::1");
assert_eq!(host.port, Some(8080));
}
#[test]
fn parse_host_with_invalid_ipv6_missing_bracket() {
let result = parse_host("::1]:8080");
assert!(matches!(result, Err(NetError::MissingBracket)));
}
#[test]
fn parse_host_with_invalid_hostname() {
let result = parse_host("invalid..host:80");
assert!(matches!(result, Err(NetError::InvalidHost)));
}
#[test]
fn parse_host_without_port() {
let result = parse_host("example.com");
assert!(result.is_ok());
let host = result.unwrap();
assert_eq!(host.name, "example.com");
assert_eq!(host.port, None);
}
#[test]
fn host_is_empty_when_name_is_empty() {
let host = Host {
name: "".to_string(),
port: None,
};
assert!(host.is_empty());
}
#[test]
fn host_is_not_empty_when_name_present() {
let host = Host {
name: "example.com".to_string(),
port: Some(80),
};
assert!(!host.is_empty());
}
#[test]
fn host_to_string_with_port() {
let host = Host {
name: "example.com".to_string(),
port: Some(80),
};
assert_eq!(host.to_string(), "example.com:80");
}
#[test]
fn host_to_string_without_port() {
let host = Host {
name: "example.com".to_string(),
port: None,
};
assert_eq!(host.to_string(), "example.com");
}
#[test]
fn host_equal_when_same() {
let host1 = Host {
name: "example.com".to_string(),
port: Some(80),
};
let host2 = Host {
name: "example.com".to_string(),
port: Some(80),
};
assert!(host1.equal(&host2));
}
#[test]
fn host_not_equal_when_different() {
let host1 = Host {
name: "example.com".to_string(),
port: Some(80),
};
let host2 = Host {
name: "example.com".to_string(),
port: Some(443),
};
assert!(!host1.equal(&host2));
}
#[test]
fn parse_url_with_valid_http_url() {
let result = parse_url("http://example.com/path");
assert!(result.is_ok());
let parsed = result.unwrap();
assert_eq!(parsed.hostname(), "example.com");
assert_eq!(parsed.port(), "80");
}
#[test]
fn parse_url_with_valid_https_url() {
let result = parse_url("https://example.com:443/path");
assert!(result.is_ok());
let parsed = result.unwrap();
assert_eq!(parsed.hostname(), "example.com");
assert_eq!(parsed.port(), "443");
}
#[test]
fn parse_url_with_scheme_but_empty_host() {
let result = parse_url("http:///path");
assert!(matches!(result, Err(NetError::SchemeWithEmptyHost)));
}
#[test]
fn parse_url_with_invalid_host() {
let result = parse_url("http://invalid..host/path");
assert!(matches!(result, Err(NetError::InvalidHost)));
}
#[test]
fn parse_url_with_path_cleaning() {
let result = parse_url("http://example.com//path/../path/");
assert!(result.is_ok());
let parsed = result.unwrap();
assert_eq!(parsed.0.path(), "/path/");
}
#[test]
fn parse_http_url_with_http_scheme() {
let result = parse_http_url("http://example.com");
assert!(result.is_ok());
}
#[test]
fn parse_http_url_with_https_scheme() {
let result = parse_http_url("https://example.com");
assert!(result.is_ok());
}
#[test]
fn parse_http_url_with_invalid_scheme() {
let result = parse_http_url("ftp://example.com");
assert!(matches!(result, Err(NetError::UnexpectedScheme(_))));
}
#[test]
fn parsed_url_is_empty_when_url_is_empty() {
let url = ParsedURL(Url::parse("about:blank").unwrap());
assert!(url.is_empty());
}
#[test]
fn parsed_url_hostname() {
let url = ParsedURL(Url::parse("http://example.com:8080").unwrap());
assert_eq!(url.hostname(), "example.com");
}
#[test]
fn parsed_url_port() {
let url = ParsedURL(Url::parse("http://example.com:8080").unwrap());
assert_eq!(url.port(), "8080");
}
#[test]
fn parsed_url_to_string_removes_default_ports() {
let url = ParsedURL(Url::parse("http://example.com:80").unwrap());
assert_eq!(url.to_string(), "http://example.com");
}
#[test]
fn is_network_or_host_down_with_timeout() {
let err = std::io::Error::new(std::io::ErrorKind::TimedOut, "timeout");
assert!(is_network_or_host_down(&err, false));
}
#[test]
fn is_network_or_host_down_with_expected_timeout() {
let err = std::io::Error::new(std::io::ErrorKind::TimedOut, "timeout");
assert!(!is_network_or_host_down(&err, true));
}
#[test]
fn is_conn_reset_err_with_reset_message() {
let err = std::io::Error::other("connection reset by peer");
assert!(is_conn_reset_err(&err));
}
#[test]
fn is_conn_refused_err_with_refused_message() {
let err = std::io::Error::other("connection refused");
assert!(is_conn_refused_err(&err));
}
}

0
deploy/data/dev/.gitkeep Normal file
View File

0
deploy/data/pro/.gitkeep Normal file
View File

View File

@@ -30,7 +30,7 @@ services:
- "9000:9000" # S3 API port
- "9001:9001" # Console port
environment:
- RUSTFS_VOLUMES=/data/rustfs0,/data/rustfs1,/data/rustfs2,/data/rustfs3
- RUSTFS_VOLUMES=/data/rustfs{0..3} # Define 4 storage volumes
- RUSTFS_ADDRESS=0.0.0.0:9000
- RUSTFS_CONSOLE_ADDRESS=0.0.0.0:9001
- RUSTFS_CONSOLE_ENABLE=true
@@ -43,12 +43,9 @@ services:
- RUSTFS_TLS_PATH=/opt/tls
- RUSTFS_OBS_ENDPOINT=http://otel-collector:4317
volumes:
- rustfs_data_0:/data/rustfs0
- rustfs_data_1:/data/rustfs1
- rustfs_data_2:/data/rustfs2
- rustfs_data_3:/data/rustfs3
- logs_data:/app/logs
- .docker/tls/:/opt/tls # TLS configuration, you should create tls directory and put your tls files in it and then specify the path here
- deploy/data/pro:/data
- deploy/logs:/app/logs
- deploy/data/certs/:/opt/tls # TLS configuration, you should create tls directory and put your tls files in it and then specify the path here
networks:
- rustfs-network
restart: unless-stopped
@@ -78,7 +75,7 @@ services:
- "9010:9000" # S3 API port
- "9011:9001" # Console port
environment:
- RUSTFS_VOLUMES=/data/rustfs0,/data/rustfs1
- RUSTFS_VOLUMES=/data/rustfs{1..4}
- RUSTFS_ADDRESS=0.0.0.0:9000
- RUSTFS_CONSOLE_ADDRESS=0.0.0.0:9001
- RUSTFS_CONSOLE_ENABLE=true
@@ -90,7 +87,7 @@ services:
- RUSTFS_LOG_LEVEL=debug
volumes:
- .:/app # Mount source code to /app for development
- rustfs_dev_data:/data
- deploy/data/dev:/data
networks:
- rustfs-network
restart: unless-stopped
@@ -98,7 +95,7 @@ services:
test:
[
"CMD",
"sh", "-c",
"sh", "-c",
"curl -f http://localhost:9000/health && curl -f http://localhost:9001/health"
]
interval: 30s
@@ -239,5 +236,5 @@ volumes:
driver: local
redis_data:
driver: local
logs_data:
logs:
driver: local

View File

@@ -70,6 +70,7 @@ clap = { workspace = true }
datafusion = { workspace = true }
const-str = { workspace = true }
futures.workspace = true
hashbrown = { workspace = true }
hyper.workspace = true
hyper-util.workspace = true
http.workspace = true

View File

@@ -12,21 +12,16 @@
// See the License for the specific language governing permissions and
// limitations under the License.
#![allow(dead_code)]
use crate::admin::router::Operation;
use crate::auth::{check_key_valid, get_session_token};
use http::{HeaderMap, StatusCode};
use matchit::Params;
use rustfs_config::notify::{NOTIFY_MQTT_SUB_SYS, NOTIFY_WEBHOOK_SUB_SYS};
use rustfs_config::{ENABLE_KEY, EnableState};
use rustfs_notify::rules::{BucketNotificationConfig, PatternRules};
use rustfs_targets::EventName;
use rustfs_targets::check_mqtt_broker_available;
use s3s::header::CONTENT_LENGTH;
use s3s::{Body, S3Error, S3ErrorCode, S3Request, S3Response, S3Result, header::CONTENT_TYPE, s3_error};
use serde::{Deserialize, Serialize};
use serde_urlencoded::from_bytes;
use std::collections::HashMap;
use std::future::Future;
use std::io::{Error, ErrorKind};
use std::net::SocketAddr;
@@ -36,12 +31,6 @@ use tokio::time::{Duration, sleep};
use tracing::{debug, error, info, warn};
use url::Url;
#[derive(Debug, Deserialize)]
struct BucketQuery {
#[serde(rename = "bucketName")]
bucket_name: String,
}
#[derive(Debug, Deserialize)]
pub struct KeyValue {
pub key: String,
@@ -177,6 +166,7 @@ impl Operation for NotificationTarget {
let mut client_cert_val = None;
let mut client_key_val = None;
let mut qos_val = None;
let mut topic_val = String::new();
for kv in notification_body.key_values.iter() {
if !allowed_keys.contains(kv.key.as_str()) {
@@ -190,6 +180,16 @@ impl Operation for NotificationTarget {
if kv.key == "endpoint" {
endpoint_val = Some(kv.value.clone());
}
if target_type == NOTIFY_MQTT_SUB_SYS {
if kv.key == rustfs_config::MQTT_BROKER {
endpoint_val = Some(kv.value.clone());
}
if kv.key == rustfs_config::MQTT_TOPIC {
topic_val = kv.value.clone();
}
}
if kv.key == "queue_dir" {
queue_dir_val = Some(kv.value.clone());
}
@@ -236,12 +236,15 @@ impl Operation for NotificationTarget {
}
if target_type == NOTIFY_MQTT_SUB_SYS {
let endpoint = endpoint_val.ok_or_else(|| s3_error!(InvalidArgument, "endpoint is required"))?;
let url = Url::parse(&endpoint).map_err(|e| s3_error!(InvalidArgument, "invalid endpoint url: {}", e))?;
match url.scheme() {
"tcp" | "ssl" | "ws" | "wss" | "mqtt" | "mqtts" => {}
_ => return Err(s3_error!(InvalidArgument, "unsupported broker url scheme")),
let endpoint = endpoint_val.ok_or_else(|| s3_error!(InvalidArgument, "broker endpoint is required"))?;
if topic_val.is_empty() {
return Err(s3_error!(InvalidArgument, "topic is required"));
}
// Check MQTT Broker availability
if let Err(e) = check_mqtt_broker_available(&endpoint, &topic_val).await {
return Err(s3_error!(InvalidArgument, "MQTT Broker unavailable: {}", e));
}
if let Some(queue_dir) = queue_dir_val {
validate_queue_dir(&queue_dir).await?;
if let Some(qos) = qos_val {
@@ -420,113 +423,3 @@ fn extract_target_params<'a>(params: &'a Params<'_, '_>) -> S3Result<(&'a str, &
let target_name = extract_param(params, "target_name")?;
Ok((target_type, target_name))
}
/// Set notification rules for buckets
pub struct SetBucketNotification {}
#[async_trait::async_trait]
impl Operation for SetBucketNotification {
async fn call(&self, req: S3Request<Body>, _params: Params<'_, '_>) -> S3Result<S3Response<(StatusCode, Body)>> {
// 1. Analyze query parameters
let query: BucketQuery = from_bytes(req.uri.query().unwrap_or("").as_bytes())
.map_err(|e| s3_error!(InvalidArgument, "invalid query parameters: {}", e))?;
// 2. Permission verification
let Some(input_cred) = &req.credentials else {
return Err(s3_error!(InvalidRequest, "credentials not found"));
};
let (_cred, _owner) =
check_key_valid(get_session_token(&req.uri, &req.headers).unwrap_or_default(), &input_cred.access_key).await?;
// 3. Get notification system instance
let Some(ns) = rustfs_notify::global::notification_system() else {
return Err(s3_error!(InternalError, "notification system not initialized"));
};
// 4. The parsing request body is BucketNotificationConfig
let mut input = req.input;
let body = input.store_all_unlimited().await.map_err(|e| {
warn!("failed to read request body: {:?}", e);
s3_error!(InvalidRequest, "failed to read request body")
})?;
let config: BucketNotificationConfig = serde_json::from_slice(&body)
.map_err(|e| s3_error!(InvalidArgument, "invalid json body for bucket notification config: {}", e))?;
// 5. Load bucket notification configuration
info!("Loading notification config for bucket '{}'", &query.bucket_name);
ns.load_bucket_notification_config(&query.bucket_name, &config)
.await
.map_err(|e| {
error!("failed to load bucket notification config: {}", e);
S3Error::with_message(S3ErrorCode::InternalError, format!("failed to load bucket notification config: {e}"))
})?;
let mut header = HeaderMap::new();
header.insert(CONTENT_TYPE, "application/json".parse().unwrap());
header.insert(CONTENT_LENGTH, "0".parse().unwrap());
Ok(S3Response::with_headers((StatusCode::OK, Body::empty()), header))
}
}
/// Get notification rules for buckets
#[derive(Serialize)]
struct BucketRulesResponse {
rules: HashMap<EventName, PatternRules>,
}
pub struct GetBucketNotification {}
#[async_trait::async_trait]
impl Operation for GetBucketNotification {
async fn call(&self, req: S3Request<Body>, _params: Params<'_, '_>) -> S3Result<S3Response<(StatusCode, Body)>> {
let query: BucketQuery = from_bytes(req.uri.query().unwrap_or("").as_bytes())
.map_err(|e| s3_error!(InvalidArgument, "invalid query parameters: {}", e))?;
let Some(input_cred) = &req.credentials else {
return Err(s3_error!(InvalidRequest, "credentials not found"));
};
let (_cred, _owner) =
check_key_valid(get_session_token(&req.uri, &req.headers).unwrap_or_default(), &input_cred.access_key).await?;
let Some(ns) = rustfs_notify::global::notification_system() else {
return Err(s3_error!(InternalError, "notification system not initialized"));
};
let rules_map = ns.notifier.get_rules_map(&query.bucket_name);
let response = BucketRulesResponse {
rules: rules_map.unwrap_or_default().inner().clone(),
};
let data = serde_json::to_vec(&response)
.map_err(|e| S3Error::with_message(S3ErrorCode::InternalError, format!("failed to serialize rules: {e}")))?;
let mut header = HeaderMap::new();
header.insert(CONTENT_TYPE, "application/json".parse().unwrap());
Ok(S3Response::with_headers((StatusCode::OK, Body::from(data)), header))
}
}
/// Remove all notification rules for a bucket
pub struct RemoveBucketNotification {}
#[async_trait::async_trait]
impl Operation for RemoveBucketNotification {
async fn call(&self, req: S3Request<Body>, _params: Params<'_, '_>) -> S3Result<S3Response<(StatusCode, Body)>> {
let query: BucketQuery = from_bytes(req.uri.query().unwrap_or("").as_bytes())
.map_err(|e| s3_error!(InvalidArgument, "invalid query parameters: {}", e))?;
let Some(input_cred) = &req.credentials else {
return Err(s3_error!(InvalidRequest, "credentials not found"));
};
let (_cred, _owner) =
check_key_valid(get_session_token(&req.uri, &req.headers).unwrap_or_default(), &input_cred.access_key).await?;
let Some(ns) = rustfs_notify::global::notification_system() else {
return Err(s3_error!(InternalError, "notification system not initialized"));
};
info!("Removing notification config for bucket '{}'", &query.bucket_name);
ns.remove_bucket_notification_config(&query.bucket_name).await;
let mut header = HeaderMap::new();
header.insert(CONTENT_TYPE, "application/json".parse().unwrap());
header.insert(CONTENT_LENGTH, "0".parse().unwrap());
Ok(S3Response::with_headers((StatusCode::OK, Body::empty()), header))
}
}

View File

@@ -23,10 +23,7 @@ pub mod utils;
use handlers::{
GetReplicationMetricsHandler, HealthCheckHandler, ListRemoteTargetHandler, RemoveRemoteTargetHandler, SetRemoteTargetHandler,
bucket_meta,
event::{
GetBucketNotification, ListNotificationTargets, NotificationTarget, RemoveBucketNotification, RemoveNotificationTarget,
SetBucketNotification,
},
event::{ListNotificationTargets, ListTargetsArns, NotificationTarget, RemoveNotificationTarget},
group, kms, kms_dynamic, kms_keys, policies, pools, rebalance,
service_account::{AddServiceAccount, DeleteServiceAccount, InfoServiceAccount, ListServiceAccount, UpdateServiceAccount},
sts, tier, user,
@@ -519,25 +516,7 @@ fn register_user_route(r: &mut S3Router<AdminOperation>) -> std::io::Result<()>
r.insert(
Method::GET,
format!("{}{}", ADMIN_PREFIX, "/v3/target/arns").as_str(),
AdminOperation(&ListNotificationTargets {}),
)?;
r.insert(
Method::POST,
format!("{}{}", ADMIN_PREFIX, "/v3/target-set-bucket").as_str(),
AdminOperation(&SetBucketNotification {}),
)?;
r.insert(
Method::POST,
format!("{}{}", ADMIN_PREFIX, "/v3/target-get-bucket").as_str(),
AdminOperation(&GetBucketNotification {}),
)?;
r.insert(
Method::POST,
format!("{}{}", ADMIN_PREFIX, "/v3/target-remove-bucket").as_str(),
AdminOperation(&RemoveBucketNotification {}),
AdminOperation(&ListTargetsArns {}),
)?;
Ok(())

View File

@@ -92,6 +92,10 @@ where
T: Operation,
{
fn is_match(&self, method: &Method, uri: &Uri, headers: &HeaderMap, _: &mut Extensions) -> bool {
if method == Method::GET && uri.path() == "/health" {
return true;
}
// AssumeRole
if method == Method::POST && uri.path() == "/" {
if let Some(val) = headers.get(header::CONTENT_TYPE) {
@@ -104,36 +108,13 @@ where
uri.path().starts_with(ADMIN_PREFIX) || uri.path().starts_with(RPC_PREFIX) || uri.path().starts_with(CONSOLE_PREFIX)
}
async fn call(&self, req: S3Request<Body>) -> S3Result<S3Response<Body>> {
if self.console_enabled && req.uri.path().starts_with(CONSOLE_PREFIX) {
if let Some(console_router) = &self.console_router {
let mut console_router = console_router.clone();
let req = convert_request(req);
let result = console_router.call(req).await;
return match result {
Ok(resp) => Ok(convert_response(resp)),
Err(e) => Err(s3_error!(InternalError, "{}", e)),
};
}
return Err(s3_error!(InternalError, "console is not enabled"));
}
let uri = format!("{}|{}", &req.method, req.uri.path());
// warn!("get uri {}", &uri);
if let Ok(mat) = self.router.at(&uri) {
let op: &T = mat.value;
let mut resp = op.call(req, mat.params).await?;
resp.status = Some(resp.output.0);
return Ok(resp.map_output(|x| x.1));
}
return Err(s3_error!(NotImplemented));
}
// check_access before call
async fn check_access(&self, req: &mut S3Request<Body>) -> S3Result<()> {
// Allow unauthenticated access to health check
if req.method == Method::GET && req.uri.path() == "/health" {
return Ok(());
}
// Allow unauthenticated access to console static files if console is enabled
if self.console_enabled && req.uri.path().starts_with(CONSOLE_PREFIX) {
return Ok(());
}
@@ -156,6 +137,31 @@ where
None => Err(s3_error!(AccessDenied, "Signature is required")),
}
}
async fn call(&self, req: S3Request<Body>) -> S3Result<S3Response<Body>> {
if self.console_enabled && req.uri.path().starts_with(CONSOLE_PREFIX) {
if let Some(console_router) = &self.console_router {
let mut console_router = console_router.clone();
let req = convert_request(req);
let result = console_router.call(req).await;
return match result {
Ok(resp) => Ok(convert_response(resp)),
Err(e) => Err(s3_error!(InternalError, "{}", e)),
};
}
return Err(s3_error!(InternalError, "console is not enabled"));
}
let uri = format!("{}|{}", &req.method, req.uri.path());
if let Ok(mat) = self.router.at(&uri) {
let op: &T = mat.value;
let mut resp = op.call(req, mat.params).await?;
resp.status = Some(resp.output.0);
return Ok(resp.map_output(|x| x.1));
}
Err(s3_error!(NotImplemented))
}
}
#[async_trait::async_trait]

View File

@@ -18,6 +18,11 @@ use rustfs_config::DEFAULT_DELIMITER;
use rustfs_ecstore::config::GLOBAL_SERVER_CONFIG;
use tracing::{error, info, warn};
/// Start the audit system.
/// This function checks if the audit subsystem is configured in the global server configuration.
/// If configured, it initializes and starts the audit system.
/// If not configured, it skips the initialization.
/// It also handles cases where the audit system is already running or if the global configuration is not loaded.
pub(crate) async fn start_audit_system() -> AuditResult<()> {
info!(
target: "rustfs::main::start_audit_system",
@@ -94,6 +99,10 @@ pub(crate) async fn start_audit_system() -> AuditResult<()> {
}
}
/// Stop the audit system.
/// This function checks if the audit system is initialized and running.
/// If it is running, it prepares to stop the system, stops it, and records the stop time.
/// If the system is already stopped or not initialized, it logs a warning and returns.
pub(crate) async fn stop_audit_system() -> AuditResult<()> {
if let Some(system) = audit_system() {
let state = system.get_state().await;

View File

@@ -12,136 +12,112 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use super::access::authorize_request;
use super::options::del_opts;
use super::options::extract_metadata;
use super::options::put_opts;
use crate::auth::get_condition_values;
use crate::error::ApiError;
use crate::storage::access::ReqInfo;
use crate::storage::options::copy_dst_opts;
use crate::storage::options::copy_src_opts;
use crate::storage::options::get_complete_multipart_upload_opts;
use crate::storage::options::{extract_metadata_from_mime_with_object_name, get_opts, parse_copy_source_range};
use bytes::Bytes;
use chrono::DateTime;
use chrono::Utc;
use datafusion::arrow::csv::WriterBuilder as CsvWriterBuilder;
use datafusion::arrow::json::WriterBuilder as JsonWriterBuilder;
use datafusion::arrow::json::writer::JsonArray;
use http::StatusCode;
use rustfs_ecstore::bucket::metadata_sys::get_replication_config;
use rustfs_ecstore::bucket::object_lock::objectlock_sys::BucketObjectLockSys;
use rustfs_ecstore::bucket::replication::DeletedObjectReplicationInfo;
use rustfs_ecstore::bucket::replication::REPLICATE_INCOMING_DELETE;
use rustfs_ecstore::bucket::replication::ReplicationConfigurationExt;
use rustfs_ecstore::bucket::replication::check_replicate_delete;
use rustfs_ecstore::bucket::replication::get_must_replicate_options;
use rustfs_ecstore::bucket::replication::must_replicate;
use rustfs_ecstore::bucket::replication::schedule_replication;
use rustfs_ecstore::bucket::replication::schedule_replication_delete;
use rustfs_ecstore::bucket::versioning::VersioningApi;
use rustfs_ecstore::disk::error::DiskError;
use rustfs_ecstore::disk::error_reduce::is_all_buckets_not_found;
use rustfs_ecstore::error::is_err_bucket_not_found;
use rustfs_ecstore::error::is_err_object_not_found;
use rustfs_ecstore::error::is_err_version_not_found;
use rustfs_ecstore::set_disk::MAX_PARTS_COUNT;
use rustfs_ecstore::store_api::ObjectInfo;
use rustfs_filemeta::ReplicationStatusType;
use rustfs_filemeta::ReplicationType;
use rustfs_filemeta::VersionPurgeStatusType;
use rustfs_s3select_api::object_store::bytes_stream;
use rustfs_s3select_api::query::Context;
use rustfs_s3select_api::query::Query;
use rustfs_s3select_query::get_global_db;
// use rustfs_ecstore::store_api::RESERVED_METADATA_PREFIX;
use crate::storage::{
access::{ReqInfo, authorize_request},
options::{
copy_dst_opts, copy_src_opts, del_opts, extract_metadata, extract_metadata_from_mime_with_object_name,
get_complete_multipart_upload_opts, get_opts, parse_copy_source_range, put_opts,
},
};
use base64::{Engine, engine::general_purpose::STANDARD as BASE64_STANDARD};
use bytes::Bytes;
use chrono::{DateTime, Utc};
use datafusion::arrow::{
csv::WriterBuilder as CsvWriterBuilder, json::WriterBuilder as JsonWriterBuilder, json::writer::JsonArray,
};
use futures::StreamExt;
use http::HeaderMap;
use rustfs_ecstore::bucket::lifecycle::bucket_lifecycle_ops::validate_transition_tier;
use rustfs_ecstore::bucket::lifecycle::lifecycle::Lifecycle;
use rustfs_ecstore::bucket::metadata::BUCKET_LIFECYCLE_CONFIG;
use rustfs_ecstore::bucket::metadata::BUCKET_NOTIFICATION_CONFIG;
use rustfs_ecstore::bucket::metadata::BUCKET_POLICY_CONFIG;
use rustfs_ecstore::bucket::metadata::BUCKET_REPLICATION_CONFIG;
use rustfs_ecstore::bucket::metadata::BUCKET_SSECONFIG;
use rustfs_ecstore::bucket::metadata::BUCKET_TAGGING_CONFIG;
use rustfs_ecstore::bucket::metadata::BUCKET_VERSIONING_CONFIG;
use rustfs_ecstore::bucket::metadata::OBJECT_LOCK_CONFIG;
use rustfs_ecstore::bucket::metadata_sys;
use rustfs_ecstore::bucket::policy_sys::PolicySys;
use rustfs_ecstore::bucket::tagging::decode_tags;
use rustfs_ecstore::bucket::tagging::encode_tags;
use rustfs_ecstore::bucket::utils::serialize;
use rustfs_ecstore::bucket::versioning_sys::BucketVersioningSys;
use rustfs_ecstore::client::object_api_utils::format_etag;
use rustfs_ecstore::compress::MIN_COMPRESSIBLE_SIZE;
use rustfs_ecstore::compress::is_compressible;
use rustfs_ecstore::error::StorageError;
use rustfs_ecstore::new_object_layer_fn;
use rustfs_ecstore::set_disk::{DEFAULT_READ_BUFFER_SIZE, is_valid_storage_class};
use rustfs_ecstore::store_api::BucketOptions;
use rustfs_ecstore::store_api::CompletePart;
use rustfs_ecstore::store_api::DeleteBucketOptions;
use rustfs_ecstore::store_api::HTTPRangeSpec;
use rustfs_ecstore::store_api::MakeBucketOptions;
use rustfs_ecstore::store_api::MultipartUploadResult;
use rustfs_ecstore::store_api::ObjectIO;
use rustfs_ecstore::store_api::ObjectOptions;
use rustfs_ecstore::store_api::ObjectToDelete;
use rustfs_ecstore::store_api::PutObjReader;
use rustfs_ecstore::store_api::StorageAPI;
use rustfs_filemeta::fileinfo::ObjectPartInfo;
use rustfs_kms::DataKey;
use rustfs_kms::service_manager::get_global_encryption_service;
use rustfs_kms::types::{EncryptionMetadata, ObjectEncryptionContext};
use http::{HeaderMap, StatusCode};
use rustfs_ecstore::{
bucket::{
lifecycle::{bucket_lifecycle_ops::validate_transition_tier, lifecycle::Lifecycle},
metadata::{
BUCKET_LIFECYCLE_CONFIG, BUCKET_NOTIFICATION_CONFIG, BUCKET_POLICY_CONFIG, BUCKET_REPLICATION_CONFIG,
BUCKET_SSECONFIG, BUCKET_TAGGING_CONFIG, BUCKET_VERSIONING_CONFIG, OBJECT_LOCK_CONFIG,
},
metadata_sys,
metadata_sys::get_replication_config,
object_lock::objectlock_sys::BucketObjectLockSys,
policy_sys::PolicySys,
replication::{
DeletedObjectReplicationInfo, REPLICATE_INCOMING_DELETE, ReplicationConfigurationExt, check_replicate_delete,
get_must_replicate_options, must_replicate, schedule_replication, schedule_replication_delete,
},
tagging::{decode_tags, encode_tags},
utils::serialize,
versioning::VersioningApi,
versioning_sys::BucketVersioningSys,
},
client::object_api_utils::format_etag,
compress::{MIN_COMPRESSIBLE_SIZE, is_compressible},
disk::{error::DiskError, error_reduce::is_all_buckets_not_found},
error::{StorageError, is_err_bucket_not_found, is_err_object_not_found, is_err_version_not_found},
new_object_layer_fn,
set_disk::{DEFAULT_READ_BUFFER_SIZE, MAX_PARTS_COUNT, is_valid_storage_class},
store_api::{
BucketOptions,
CompletePart,
DeleteBucketOptions,
HTTPRangeSpec,
MakeBucketOptions,
MultipartUploadResult,
ObjectIO,
ObjectInfo,
ObjectOptions,
ObjectToDelete,
PutObjReader,
StorageAPI,
// RESERVED_METADATA_PREFIX,
},
};
use rustfs_filemeta::{ReplicationStatusType, ReplicationType, VersionPurgeStatusType, fileinfo::ObjectPartInfo};
use rustfs_kms::{
DataKey,
service_manager::get_global_encryption_service,
types::{EncryptionMetadata, ObjectEncryptionContext},
};
use rustfs_notify::global::notifier_instance;
use rustfs_policy::auth;
use rustfs_policy::policy::action::Action;
use rustfs_policy::policy::action::S3Action;
use rustfs_policy::policy::{BucketPolicy, BucketPolicyArgs, Validator};
use rustfs_rio::CompressReader;
use rustfs_rio::EtagReader;
use rustfs_rio::HashReader;
use rustfs_rio::Reader;
use rustfs_rio::WarpReader;
use rustfs_rio::{DecryptReader, EncryptReader, HardLimitReader};
use rustfs_targets::EventName;
use rustfs_targets::arn::{TargetID, TargetIDError};
use rustfs_utils::CompressionAlgorithm;
use rustfs_utils::http::AMZ_BUCKET_REPLICATION_STATUS;
use rustfs_utils::http::headers::RESERVED_METADATA_PREFIX_LOWER;
use rustfs_utils::http::headers::{AMZ_DECODED_CONTENT_LENGTH, AMZ_OBJECT_TAGGING};
use rustfs_utils::path::is_dir_object;
use rustfs_utils::path::path_join_buf;
use rustfs_policy::{
auth,
policy::{
action::{Action, S3Action},
{BucketPolicy, BucketPolicyArgs, Validator},
},
};
use rustfs_rio::{CompressReader, DecryptReader, EncryptReader, EtagReader, HardLimitReader, HashReader, Reader, WarpReader};
use rustfs_s3select_api::{
object_store::bytes_stream,
query::{Context, Query},
};
use rustfs_s3select_query::get_global_db;
use rustfs_targets::{
EventName,
arn::{TargetID, TargetIDError},
};
use rustfs_utils::{
CompressionAlgorithm,
http::{
AMZ_BUCKET_REPLICATION_STATUS,
headers::{AMZ_DECODED_CONTENT_LENGTH, AMZ_OBJECT_TAGGING, RESERVED_METADATA_PREFIX_LOWER},
},
path::{is_dir_object, path_join_buf},
};
use rustfs_zip::CompressionFormat;
use s3s::S3;
use s3s::S3Error;
use s3s::S3ErrorCode;
use s3s::S3Result;
use s3s::dto::*;
use s3s::s3_error;
use s3s::{S3Request, S3Response};
use std::collections::HashMap;
use std::fmt::Debug;
use std::path::Path;
use std::str::FromStr;
use std::sync::Arc;
use std::sync::LazyLock;
use time::OffsetDateTime;
use time::format_description::well_known::Rfc3339;
use tokio::io::AsyncRead;
use tokio::sync::mpsc;
use s3s::{S3, S3Error, S3ErrorCode, S3Request, S3Response, S3Result, dto::*, s3_error};
use std::{
collections::HashMap,
fmt::Debug,
path::Path,
str::FromStr,
sync::{Arc, LazyLock},
};
use time::{OffsetDateTime, format_description::well_known::Rfc3339};
use tokio::{io::AsyncRead, sync::mpsc};
use tokio_stream::wrappers::ReceiverStream;
use tokio_tar::Archive;
use tokio_util::io::ReaderStream;
use tokio_util::io::StreamReader;
use tracing::debug;
use tracing::error;
use tracing::info;
use tracing::warn;
use tokio_util::io::{ReaderStream, StreamReader};
use tracing::{debug, error, info, warn};
use uuid::Uuid;
macro_rules! try_ {

View File

@@ -58,7 +58,7 @@ export RUSTFS_EXTERNAL_ADDRESS=":9000"
#export RUSTFS_OBS_METER_INTERVAL=1 # Sampling interval in seconds
#export RUSTFS_OBS_SERVICE_NAME=rustfs # Service name
#export RUSTFS_OBS_SERVICE_VERSION=0.1.0 # Service version
export RUSTFS_OBS_ENVIRONMENT=production # Environment name
export RUSTFS_OBS_ENVIRONMENT=develop # Environment name
export RUSTFS_OBS_LOGGER_LEVEL=info # Log level, supports trace, debug, info, warn, error
export RUSTFS_OBS_LOCAL_LOGGING_ENABLED=true # Whether to enable local logging
export RUSTFS_OBS_LOG_DIRECTORY="$current_dir/deploy/logs" # Log directory
@@ -123,6 +123,10 @@ export RUSTFS_COMPRESSION_ENABLED=true # Whether to enable compression
#export RUSTFS_REGION="us-east-1"
export RUSTFS_ENABLE_SCANNER=false
export RUSTFS_ENABLE_HEAL=false
# Event message configuration
#export RUSTFS_EVENT_CONFIG="./deploy/config/event.example.toml"