Mirror of https://github.com/rustfs/rustfs.git, synced 2026-01-17 09:40:32 +00:00

Compare commits: 6 commits, 1.0.0-alph...chore/appe
| Author | SHA1 | Date |
|---|---|---|
| | ad99019749 | |
| | aac9b1edb7 | |
| | 5689311cff | |
| | 007d9c0b21 | |
| | 626c7ed34a | |
| | 0e680eae31 | |
Cargo.lock (generated, 1238 lines changed): file diff suppressed because it is too large.
Cargo.toml (65 lines changed)
@@ -99,10 +99,12 @@ async-recursion = "1.1.1"
 async-trait = "0.1.89"
 async-compression = { version = "0.4.19" }
 atomic_enum = "0.3.0"
-aws-config = { version = "1.8.6" }
-aws-sdk-s3 = { version = "1.106.0", default-features = false, features = ["sigv4a", "rustls", "rt-tokio"] }
-axum = "0.8.4"
-axum-extra = "0.10.1"
+aws-config = { version = "1.8.8" }
+aws-credential-types = { version = "1.2.8" }
+aws-smithy-types = { version = "1.3.3" }
+aws-sdk-s3 = { version = "1.108.0", default-features = false, features = ["sigv4a", "rustls", "rt-tokio"] }
+axum = "0.8.6"
+axum-extra = "0.10.3"
 axum-server = { version = "0.7.2", features = ["tls-rustls-no-provider"], default-features = false }
 base64-simd = "0.8.0"
 base64 = "0.22.1"
@@ -111,6 +113,7 @@ bytes = { version = "1.10.1", features = ["serde"] }
 bytesize = "2.1.0"
 byteorder = "1.5.0"
 cfg-if = "1.0.3"
 convert_case = "0.8.0"
+crc-fast = "1.3.0"
 chacha20poly1305 = { version = "0.10.1" }
 chrono = { version = "0.4.42", features = ["serde"] }
@@ -119,18 +122,18 @@ const-str = { version = "0.7.0", features = ["std", "proc"] }
 crc32fast = "1.5.0"
 criterion = { version = "0.7", features = ["html_reports"] }
 crossbeam-queue = "0.3.12"
 dashmap = "6.1.0"
-datafusion = "50.0.0"
+datafusion = "50.1.0"
 derive_builder = "0.20.2"
 enumset = "1.1.10"
 flatbuffers = "25.9.23"
-flate2 = "1.1.2"
-flexi_logger = { version = "0.31.4", features = ["trc", "dont_minimize_extra_stacks", "compress", "kv"] }
+flate2 = "1.1.4"
+flexi_logger = { version = "0.31.7", features = ["trc", "dont_minimize_extra_stacks", "compress", "kv"] }
 form_urlencoded = "1.2.2"
 futures = "0.3.31"
 futures-core = "0.3.31"
 futures-util = "0.3.31"
 glob = "0.3.3"
 hashbrown = { version = "0.16.0", features = ["serde", "rayon"] }
 hex-simd = "0.8.0"
 highway = { version = "1.3.0" }
 hickory-resolver = { version = "0.25.2", features = ["tls-ring"] }
@@ -146,8 +149,9 @@ http = "1.3.1"
 http-body = "1.0.1"
 humantime = "2.3.0"
 ipnetwork = { version = "0.21.1", features = ["serde"] }
-jsonwebtoken = "9.3.1"
+jsonwebtoken = { version = "10.0.0", features = ["rust_crypto"] }
 lazy_static = "1.5.0"
 libc = "0.2.177"
 libsystemd = { version = "0.7.2" }
 local-ip-address = "0.6.5"
 lz4 = "1.28.1"
@@ -158,39 +162,39 @@ mime_guess = "2.0.5"
 moka = { version = "0.12.11", features = ["future"] }
 netif = "0.1.6"
 nix = { version = "0.30.1", features = ["fs"] }
-nu-ansi-term = "0.50.1"
+nu-ansi-term = "0.50.3"
 num_cpus = { version = "1.17.0" }
 nvml-wrapper = "0.11.0"
 object_store = "0.12.4"
 once_cell = "1.21.3"
-opentelemetry = { version = "0.30.0" }
-opentelemetry-appender-tracing = { version = "0.30.1", features = [
+opentelemetry = { version = "0.31.0" }
+opentelemetry-appender-tracing = { version = "0.31.1", features = [
     "experimental_use_tracing_span_context",
     "experimental_metadata_attributes",
     "spec_unstable_logs_enabled"
 ] }
-opentelemetry_sdk = { version = "0.30.0" }
-opentelemetry-stdout = { version = "0.30.0" }
-opentelemetry-otlp = { version = "0.30.0", default-features = false, features = [
+opentelemetry_sdk = { version = "0.31.0" }
+opentelemetry-stdout = { version = "0.31.0" }
+opentelemetry-otlp = { version = "0.31.0", default-features = false, features = [
     "grpc-tonic", "gzip-tonic", "trace", "metrics", "logs", "internal-logs"
 ] }
-opentelemetry-semantic-conventions = { version = "0.30.0", features = [
+opentelemetry-semantic-conventions = { version = "0.31.0", features = [
    "semconv_experimental",
 ] }
-parking_lot = "0.12.4"
+parking_lot = "0.12.5"
 path-absolutize = "3.1.1"
 path-clean = "1.0.1"
 blake3 = { version = "1.8.2" }
 pbkdf2 = "0.12.2"
 percent-encoding = "2.3.2"
 pin-project-lite = "0.2.16"
 prost = "0.14.1"
 pretty_assertions = "1.4.1"
 quick-xml = "0.38.3"
 rand = "0.9.2"
 rayon = "1.11.0"
 rdkafka = { version = "0.38.0", features = ["tokio"] }
 reed-solomon-simd = { version = "3.0.1" }
-regex = { version = "1.11.2" }
+regex = { version = "1.12.1" }
 reqwest = { version = "0.12.23", default-features = false, features = [
     "rustls-tls-webpki-roots",
     "charset",
@@ -200,38 +204,39 @@ reqwest = { version = "0.12.23", default-features = false, features = [
     "json",
     "blocking",
 ] }
-rmcp = { version = "0.6.4" }
+rmcp = { version = "0.8.1" }
 rmp = "0.8.14"
 rmp-serde = "1.3.0"
 rsa = "0.9.8"
 rumqttc = { version = "0.25.0" }
 rust-embed = { version = "8.7.2" }
 rustfs-rsc = "2025.506.1"
 rustc-hash = { version = "2.1.1" }
 rustls = { version = "0.23.32", features = ["ring", "logging", "std", "tls12"], default-features = false }
 rustls-pki-types = "1.12.0"
 rustls-pemfile = "2.2.0"
-s3s = { version = "0.12.0-rc.1", features = ["minio"] }
+s3s = { version = "0.12.0-rc.2", features = ["minio"] }
 schemars = "1.0.4"
-serde = { version = "1.0.226", features = ["derive"] }
+serde = { version = "1.0.228", features = ["derive"] }
 serde_json = { version = "1.0.145", features = ["raw_value"] }
 serde_urlencoded = "0.7.1"
 serial_test = "3.2.0"
 sha1 = "0.10.6"
 sha2 = "0.10.9"
-shadow-rs = { version = "1.3.0", default-features = false }
+shadow-rs = { version = "1.4.0", default-features = false }
 siphasher = "1.0.1"
 smallvec = { version = "1.15.1", features = ["serde"] }
 smartstring = "1.0.1"
 snafu = "0.8.9"
 snap = "1.1.1"
 socket2 = "0.6.0"
 starshard = { version = "0.5.0", features = ["rayon", "async", "serde"] }
 strum = { version = "0.27.2", features = ["derive"] }
-sysinfo = "0.37.0"
+sysinfo = "0.37.1"
 sysctl = "0.7.1"
 tempfile = "3.23.0"
 temp-env = "0.3.6"
 test-case = "3.3.1"
-thiserror = "2.0.16"
+thiserror = "2.0.17"
 time = { version = "0.3.44", features = [
     "std",
     "parsing",
@@ -240,7 +245,7 @@ time = { version = "0.3.44", features = [
     "serde",
 ] }
 tokio = { version = "1.47.1", features = ["fs", "rt-multi-thread"] }
-tokio-rustls = { version = "0.26.3", default-features = false, features = ["logging", "tls12", "ring"] }
+tokio-rustls = { version = "0.26.4", default-features = false, features = ["logging", "tls12", "ring"] }
 tokio-stream = { version = "0.1.17" }
 tokio-tar = "0.3.1"
 tokio-test = "0.4.4"
@@ -253,7 +258,7 @@ tower-http = { version = "0.6.6", features = ["cors"] }
 tracing = "0.1.41"
 tracing-core = "0.1.34"
 tracing-error = "0.2.1"
-tracing-opentelemetry = "0.31.0"
+tracing-opentelemetry = "0.32.0"
 tracing-subscriber = { version = "0.3.20", features = ["env-filter", "time"] }
 transform-stream = "0.3.1"
 url = "2.5.7"
@@ -266,10 +271,10 @@ uuid = { version = "1.18.1", features = [
 vaultrs = { version = "0.7.4" }
 walkdir = "2.5.0"
 wildmatch = { version = "2.5.0", features = ["serde"] }
-zeroize = { version = "1.8.1", features = ["derive"] }
+zeroize = { version = "1.8.2", features = ["derive"] }
 winapi = { version = "0.3.9" }
 xxhash-rust = { version = "0.8.15", features = ["xxh64", "xxh3"] }
-zip = "5.1.1"
+zip = "6.0.0"
 zstd = "0.13.3"
@@ -225,7 +225,7 @@ async fn set_bucket_lifecycle_transition(bucket_name: &str) -> Result<(), Box<dy
         </Rule>
         <Rule>
             <ID>test-rule2</ID>
-            <Status>Desabled</Status>
+            <Status>Disabled</Status>
             <Filter>
                 <Prefix>test/</Prefix>
             </Filter>
@@ -12,9 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.

-use crate::AuditEntry;
-use crate::AuditResult;
-use crate::AuditSystem;
+use crate::{AuditEntry, AuditResult, AuditSystem};
 use once_cell::sync::OnceCell;
 use rustfs_ecstore::config::Config;
 use std::sync::Arc;
@@ -17,9 +17,11 @@ use crate::AuditRegistry;
 use crate::observability;
 use crate::{AuditError, AuditResult};
 use rustfs_ecstore::config::Config;
-use rustfs_targets::store::{Key, Store};
-use rustfs_targets::target::EntityTarget;
-use rustfs_targets::{StoreError, Target, TargetError};
+use rustfs_targets::{
+    StoreError, Target, TargetError,
+    store::{Key, Store},
+    target::EntityTarget,
+};
 use std::sync::Arc;
 use tokio::sync::{Mutex, RwLock};
 use tracing::{error, info, warn};
@@ -257,7 +259,7 @@ impl AuditSystem {
         let target_id_clone = target_id.clone();

         // Create EntityTarget for the audit log entry
-        let entity_target = rustfs_targets::target::EntityTarget {
+        let entity_target = EntityTarget {
             object_name: entry.api.name.clone().unwrap_or_default(),
             bucket_name: entry.api.bucket.clone().unwrap_or_default(),
             event_name: rustfs_targets::EventName::ObjectCreatedPut, // Default, should be derived from entry
@@ -337,7 +339,7 @@ impl AuditSystem {
         let mut success_count = 0;
         let mut errors = Vec::new();
         for entry in entries_clone {
-            let entity_target = rustfs_targets::target::EntityTarget {
+            let entity_target = EntityTarget {
                 object_name: entry.api.name.clone().unwrap_or_default(),
                 bucket_name: entry.api.bucket.clone().unwrap_or_default(),
                 event_name: rustfs_targets::EventName::ObjectCreatedPut,
@@ -388,23 +388,11 @@ async fn test_bucket_default_encryption_multipart_upload() -> Result<(), Box<dyn
         complete_multipart_response.ssekms_key_id()
     );

     // Verify: complete_multipart_upload response should contain encryption information
-    // KNOWN BUG: s3s library bug where CompleteMultipartUploadOutput encryption fields serialize as None
-    // even when properly set. Our server implementation is correct (see server logs above).
-    // TODO: Remove this workaround when s3s library is fixed
-    warn!("KNOWN BUG: s3s library - complete_multipart_upload response encryption fields return None even when set");
-
-    if complete_multipart_response.server_side_encryption().is_some() {
-        // If s3s library is fixed, verify the encryption info
-        assert_eq!(
-            complete_multipart_response.server_side_encryption(),
-            Some(&ServerSideEncryption::AwsKms),
-            "complete_multipart_upload response should contain SSE-KMS encryption information"
-        );
-    } else {
-        // Expected behavior due to s3s library bug - log and continue
-        warn!("Skipping assertion due to known s3s library bug - server logs confirm correct encryption handling");
-    }
+    assert_eq!(
+        complete_multipart_response.server_side_encryption(),
+        Some(&ServerSideEncryption::AwsKms),
+        "complete_multipart_upload response should contain SSE-KMS encryption information"
+    );

     // Step 4: Download file and verify encryption status
     info!("Downloading file and verifying encryption status");
@@ -75,7 +75,6 @@ hyper-util.workspace = true
 hyper-rustls.workspace = true
 rustls.workspace = true
 tokio = { workspace = true, features = ["io-util", "sync", "signal"] }
 tokio-stream = { workspace = true }
 tonic.workspace = true
-xxhash-rust = { workspace = true, features = ["xxh64", "xxh3"] }
 tower.workspace = true
@@ -89,8 +88,6 @@ rustfs-madmin.workspace = true
rustfs-workers.workspace = true
reqwest = { workspace = true }
aws-sdk-s3 = { workspace = true }
once_cell = { workspace = true }
rustfs-rsc = { workspace = true }
urlencoding = { workspace = true }
smallvec = { workspace = true }
shadow-rs.workspace = true
@@ -99,13 +96,11 @@ rustfs-utils = { workspace = true, features = ["full"] }
 rustfs-rio.workspace = true
 rustfs-signer.workspace = true
 rustfs-checksums.workspace = true
 futures-util.workspace = true
 async-recursion.workspace = true
-aws-credential-types = "1.2.6"
-aws-smithy-types = "1.3.2"
-parking_lot = "0.12"
-moka = { version = "0.12", features = ["future"] }
-aws-smithy-runtime-api = "1.9.0"
+aws-credential-types = { workspace = true }
+aws-smithy-types = { workspace = true }
+parking_lot = { workspace = true }
+moka = { workspace = true }

 [target.'cfg(not(windows))'.dependencies]
 nix = { workspace = true }
@@ -2312,7 +2312,11 @@ fn get_replication_action(oi1: &ObjectInfo, oi2: &HeadObjectOutput, op_type: Rep

     let size = oi1.get_actual_size().unwrap_or_default();

-    if oi1.etag != oi2.e_tag
+    // Normalize ETags by removing quotes before comparison (PR #592 compatibility)
+    let oi1_etag = oi1.etag.as_ref().map(|e| rustfs_utils::path::trim_etag(e));
+    let oi2_etag = oi2.e_tag.as_ref().map(|e| rustfs_utils::path::trim_etag(e));
+
+    if oi1_etag != oi2_etag
         || oi1.version_id.map(|v| v.to_string()) != oi2.version_id
         || size != oi2.content_length.unwrap_or_default()
         || oi1.delete_marker != oi2.delete_marker.unwrap_or_default()
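The normalization above relies on `rustfs_utils::path::trim_etag`. Judging from the tests added later in this diff, it strips the surrounding double quotes that S3 clients may or may not include around an ETag; a minimal equivalent sketch (the helper name here is hypothetical, the real one lives in rustfs-utils):

```rust
/// A minimal equivalent of `trim_etag`, consistent with the tests added
/// later in this diff: it strips surrounding double quotes so that
/// `"abc123"` (quoted) and `abc123` (unquoted) compare equal.
fn trim_etag_sketch(etag: &str) -> String {
    etag.trim_matches('"').to_string()
}

fn main() {
    assert_eq!(trim_etag_sketch("\"abc123\""), "abc123");
    assert_eq!(trim_etag_sketch("abc123"), "abc123");
    // Multipart ETags keep their `-<parts>` suffix after trimming.
    assert_eq!(trim_etag_sketch("\"098f6bcd4621d373cade4e832627b4f6-2\""), "098f6bcd4621d373cade4e832627b4f6-2");
}
```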
@@ -5668,8 +5668,11 @@ impl StorageAPI for SetDisks {
         }
         return Err(to_object_err(ERR_METHOD_NOT_ALLOWED, vec![bucket, object]));
     }*/
+    // Normalize ETags by removing quotes before comparison (PR #592 compatibility)
+    let transition_etag = rustfs_utils::path::trim_etag(&opts.transition.etag);
+    let stored_etag = rustfs_utils::path::trim_etag(&extract_etag(&fi.metadata));
     if !opts.mod_time.expect("err").unix_timestamp() == fi.mod_time.as_ref().expect("err").unix_timestamp()
-        || opts.transition.etag != extract_etag(&fi.metadata)
+        || transition_etag != stored_etag
     {
         return Err(to_object_err(Error::from(DiskError::FileNotFound), vec![bucket, object]));
     }
@@ -6598,10 +6601,13 @@ impl StorageAPI for SetDisks {
             let ext_part = &curr_fi.parts[i];
             tracing::info!(target:"rustfs_ecstore::set_disk", part_number = p.part_num, part_size = ext_part.size, part_actual_size = ext_part.actual_size, "Completing multipart part");

-            if p.etag != Some(ext_part.etag.clone()) {
+            // Normalize ETags by removing quotes before comparison (PR #592 compatibility)
+            let client_etag = p.etag.as_ref().map(|e| rustfs_utils::path::trim_etag(e));
+            let stored_etag = Some(rustfs_utils::path::trim_etag(&ext_part.etag));
+            if client_etag != stored_etag {
                 error!(
-                    "complete_multipart_upload etag err {:?}, part_id={}, bucket={}, object={}",
-                    p.etag, p.part_num, bucket, object
+                    "complete_multipart_upload etag err client={:?}, stored={:?}, part_id={}, bucket={}, object={}",
+                    p.etag, ext_part.etag, p.part_num, bucket, object
                 );
                 return Err(Error::InvalidPart(p.part_num, ext_part.etag.clone(), p.etag.clone().unwrap_or_default()));
             }
@@ -169,6 +169,9 @@ impl InlineData {
     }

     pub fn remove(&mut self, remove_keys: Vec<Uuid>) -> Result<bool> {
         let buf = self.after_version();
+        if buf.is_empty() {
+            return Ok(false);
+        }
         let mut cur = Cursor::new(buf);

         let mut fields_len = rmp::decode::read_map_len(&mut cur)? as usize;
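The new empty-buffer guard matters because rmp's decoder errors out rather than returning an empty map when handed zero bytes, so `remove` would previously have bubbled up a decode error for inline data with no payload. A small sketch of the failure mode the guard avoids (assuming rmp 0.8, as pinned in the workspace):

```rust
use std::io::Cursor;

fn main() {
    // Reading a msgpack map header from an empty buffer fails (there is no
    // marker byte to read), which `remove` would otherwise propagate as an
    // error; the guard turns this case into a clean `Ok(false)` instead.
    let empty: &[u8] = &[];
    let mut cur = Cursor::new(empty);
    assert!(rmp::decode::read_map_len(&mut cur).is_err());
}
```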
@@ -87,7 +87,7 @@ pub fn generate_jwt<T: Serialize>(claims: &T, secret: &str) -> std::result::Resu
     jsonwebtoken::encode(&header, &claims, &EncodingKey::from_secret(secret.as_bytes()))
 }

-pub fn extract_claims<T: DeserializeOwned>(
+pub fn extract_claims<T: DeserializeOwned + Clone>(
     token: &str,
     secret: &str,
 ) -> std::result::Result<jsonwebtoken::TokenData<T>, jsonwebtoken::errors::Error> {
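The extra `Clone` bound accompanies the jsonwebtoken 9 → 10 bump in the workspace Cargo.toml; this diff suggests the v10 decode path needs cloneable claims, though that is an inference from the change, not something stated here. A hedged round-trip sketch using the two wrappers as they appear in this diff (`generate_jwt` and `extract_claims` are the crate's own helpers, assumed in scope):

```rust
use serde::{Deserialize, Serialize};

// Claims must now derive Clone to satisfy `extract_claims`'s
// `DeserializeOwned + Clone` bound after the jsonwebtoken 10 upgrade.
#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
struct Claims {
    sub: String,
    exp: usize,
}

// Hypothetical round trip through the wrappers shown in this diff.
fn round_trip(secret: &str) -> Result<(), jsonwebtoken::errors::Error> {
    let claims = Claims { sub: "user-1".into(), exp: 2_000_000_000 };
    let token = generate_jwt(&claims, secret)?;
    let decoded = extract_claims::<Claims>(&token, secret)?;
    assert_eq!(decoded.claims, claims);
    Ok(())
}
```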
@@ -193,7 +193,7 @@ mod tests {
         assert_eq!(error.to_string(), "secret key length is too short");
     }

-    #[derive(Debug, Serialize, Deserialize, PartialEq)]
+    #[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
     struct Claims {
         sub: String,
         company: String,
@@ -32,14 +32,17 @@ rustfs-utils = { workspace = true, features = ["path", "sys"] }
 rustfs-targets = { workspace = true }
 async-trait = { workspace = true }
 chrono = { workspace = true, features = ["serde"] }
-dashmap = { workspace = true }
 futures = { workspace = true }
 form_urlencoded = { workspace = true }
+hashbrown = { workspace = true }
 once_cell = { workspace = true }
 quick-xml = { workspace = true, features = ["serialize", "async-tokio"] }
+rayon = { workspace = true }
 rumqttc = { workspace = true }
+rustc-hash = { workspace = true }
 serde = { workspace = true }
 serde_json = { workspace = true }
+starshard = { workspace = true }
 thiserror = { workspace = true }
 tokio = { workspace = true, features = ["rt-multi-thread", "sync", "time"] }
 tracing = { workspace = true }
@@ -13,9 +13,9 @@
 // limitations under the License.

 use chrono::{DateTime, Utc};
+use hashbrown::HashMap;
 use rustfs_targets::EventName;
 use serde::{Deserialize, Serialize};
-use std::collections::HashMap;
 use url::form_urlencoded;

 /// Represents the identity of the user who triggered the event
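These import swaps (here and in the hunks that follow) are mechanical: hashbrown's HashMap/HashSet expose the same API as their std counterparts, and the workspace enables hashbrown's serde and rayon features so the maps can be serialized and, where needed, iterated in parallel. A minimal sketch of the drop-in behavior:

```rust
use hashbrown::HashMap;

fn main() {
    // Drop-in replacement: same insert/get/iterate API as std::collections::HashMap.
    let mut map: HashMap<String, u32> = HashMap::new();
    map.insert("events".to_string(), 3);
    assert_eq!(map.get("events"), Some(&3));

    // With the "serde" feature enabled in the workspace Cargo.toml,
    // hashbrown maps serialize like std maps.
    let json = serde_json::to_string(&map).unwrap();
    assert!(json.contains("events"));
}
```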
@@ -14,6 +14,7 @@

 use crate::Event;
 use async_trait::async_trait;
+use hashbrown::HashSet;
 use rumqttc::QoS;
 use rustfs_config::notify::{ENV_NOTIFY_MQTT_KEYS, ENV_NOTIFY_WEBHOOK_KEYS, NOTIFY_MQTT_KEYS, NOTIFY_WEBHOOK_KEYS};
 use rustfs_config::{
@@ -27,7 +28,6 @@ use rustfs_targets::{
     error::TargetError,
     target::{mqtt::MQTTArgs, webhook::WebhookArgs},
 };
-use std::collections::HashSet;
 use std::time::Duration;
 use tracing::{debug, warn};
 use url::Url;
@@ -15,13 +15,13 @@
 use crate::{
     Event, error::NotificationError, notifier::EventNotifier, registry::TargetRegistry, rules::BucketNotificationConfig, stream,
 };
+use hashbrown::HashMap;
 use rustfs_ecstore::config::{Config, KVS};
 use rustfs_targets::EventName;
 use rustfs_targets::arn::TargetID;
 use rustfs_targets::store::{Key, Store};
 use rustfs_targets::target::EntityTarget;
 use rustfs_targets::{StoreError, Target};
-use std::collections::HashMap;
 use std::sync::Arc;
 use std::sync::atomic::{AtomicUsize, Ordering};
 use std::time::{Duration, Instant};
@@ -212,11 +212,6 @@ impl NotificationSystem {
             return Ok(());
         }

-        // if let Err(e) = rustfs_ecstore::config::com::save_server_config(store, &new_config).await {
-        //     error!("Failed to save config: {}", e);
-        //     return Err(NotificationError::SaveConfig(e.to_string()));
-        // }
-
         info!("Configuration updated. Reloading system...");
         self.reload_config(new_config).await
     }
@@ -13,19 +13,20 @@
 // limitations under the License.

 use crate::{error::NotificationError, event::Event, rules::RulesMap};
-use dashmap::DashMap;
+use hashbrown::HashMap;
 use rustfs_targets::EventName;
 use rustfs_targets::Target;
 use rustfs_targets::arn::TargetID;
 use rustfs_targets::target::EntityTarget;
-use std::{collections::HashMap, sync::Arc};
+use starshard::AsyncShardedHashMap;
+use std::sync::Arc;
 use tokio::sync::RwLock;
 use tracing::{debug, error, info, instrument, warn};

 /// Manages event notification to targets based on rules
 pub struct EventNotifier {
     target_list: Arc<RwLock<TargetList>>,
-    bucket_rules_map: Arc<DashMap<String, RulesMap>>,
+    bucket_rules_map: Arc<AsyncShardedHashMap<String, RulesMap, rustc_hash::FxBuildHasher>>,
 }

 impl Default for EventNotifier {
@@ -39,7 +40,7 @@ impl EventNotifier {
     pub fn new() -> Self {
         EventNotifier {
             target_list: Arc::new(RwLock::new(TargetList::new())),
-            bucket_rules_map: Arc::new(DashMap::new()),
+            bucket_rules_map: Arc::new(AsyncShardedHashMap::new(0)),
         }
     }
@@ -58,7 +59,7 @@ impl EventNotifier {
     /// This method removes all rules associated with the specified bucket name.
     /// It will log a message indicating the removal of rules.
     pub async fn remove_rules_map(&self, bucket_name: &str) {
-        if self.bucket_rules_map.remove(bucket_name).is_some() {
+        if self.bucket_rules_map.remove(&bucket_name.to_string()).await.is_some() {
             info!("Removed all notification rules for bucket: {}", bucket_name);
         }
     }
@@ -76,21 +77,21 @@ impl EventNotifier {
     /// Adds a rules map for a bucket
     pub async fn add_rules_map(&self, bucket_name: &str, rules_map: RulesMap) {
         if rules_map.is_empty() {
-            self.bucket_rules_map.remove(bucket_name);
+            self.bucket_rules_map.remove(&bucket_name.to_string()).await;
         } else {
-            self.bucket_rules_map.insert(bucket_name.to_string(), rules_map);
+            self.bucket_rules_map.insert(bucket_name.to_string(), rules_map).await;
         }
         info!("Added rules for bucket: {}", bucket_name);
     }

     /// Gets the rules map for a specific bucket.
-    pub fn get_rules_map(&self, bucket_name: &str) -> Option<RulesMap> {
-        self.bucket_rules_map.get(bucket_name).map(|r| r.clone())
+    pub async fn get_rules_map(&self, bucket_name: &str) -> Option<RulesMap> {
+        self.bucket_rules_map.get(&bucket_name.to_string()).await
     }

     /// Removes notification rules for a bucket
     pub async fn remove_notification(&self, bucket_name: &str) {
-        self.bucket_rules_map.remove(bucket_name);
+        self.bucket_rules_map.remove(&bucket_name.to_string()).await;
         info!("Removed notification rules for bucket: {}", bucket_name);
     }
@@ -113,7 +114,7 @@ impl EventNotifier {
     /// Return `true` if at least one matching notification rule exists.
     pub async fn has_subscriber(&self, bucket_name: &str, event_name: &EventName) -> bool {
         // Rules to check if the bucket exists
-        if let Some(rules_map) = self.bucket_rules_map.get(bucket_name) {
+        if let Some(rules_map) = self.bucket_rules_map.get(&bucket_name.to_string()).await {
             // A composite event (such as ObjectCreatedAll) is expanded to multiple single events.
             // We need to check whether any of these single events have the rules configured.
             rules_map.has_subscriber(event_name)
@@ -129,7 +130,7 @@ impl EventNotifier {
         let bucket_name = &event.s3.bucket.name;
         let object_key = &event.s3.object.key;
         let event_name = event.event_name;
-        if let Some(rules) = self.bucket_rules_map.get(bucket_name) {
+        if let Some(rules) = self.bucket_rules_map.get(bucket_name).await {
             let target_ids = rules.match_rules(event_name, object_key);
             if target_ids.is_empty() {
                 debug!("No matching targets for event in bucket: {}", bucket_name);
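All bucket-rules call sites now go through starshard's AsyncShardedHashMap, whose accessors are async and return owned values rather than guards, which is why every call gains `.await` and keys are passed as `&String`. Based only on the calls visible in this diff (the crate's wider API is not shown here), the access pattern looks like:

```rust
use starshard::AsyncShardedHashMap;

#[tokio::main]
async fn main() {
    // Sketch based on the calls visible in this diff: `new(0)` appears to
    // pick a default shard count, and get/insert/remove are all async.
    let map: AsyncShardedHashMap<String, Vec<u32>, rustc_hash::FxBuildHasher> =
        AsyncShardedHashMap::new(0);

    map.insert("bucket-a".to_string(), vec![1, 2]).await;

    // `get` returns an owned value (no guard to hold across await points,
    // unlike DashMap's `Ref`), matching `get_rules_map`'s direct return.
    if let Some(rules) = map.get(&"bucket-a".to_string()).await {
        assert_eq!(rules, vec![1, 2]);
    }

    // `remove` returns the removed value, mirroring HashMap::remove.
    assert!(map.remove(&"bucket-a".to_string()).await.is_some());
}
```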
@@ -15,13 +15,13 @@
 use crate::Event;
 use crate::factory::{MQTTTargetFactory, TargetFactory, WebhookTargetFactory};
 use futures::stream::{FuturesUnordered, StreamExt};
+use hashbrown::{HashMap, HashSet};
 use rustfs_config::notify::NOTIFY_ROUTE_PREFIX;
 use rustfs_config::{DEFAULT_DELIMITER, ENABLE_KEY, ENV_PREFIX};
 use rustfs_ecstore::config::{Config, KVS};
 use rustfs_targets::Target;
 use rustfs_targets::TargetError;
 use rustfs_targets::target::ChannelTargetType;
-use std::collections::{HashMap, HashSet};
 use tracing::{debug, error, info, warn};

 /// Registry for managing target factories
@@ -17,10 +17,10 @@ use super::xml_config::ParseConfigError as BucketNotificationConfigError;
 use crate::rules::NotificationConfiguration;
 use crate::rules::pattern_rules;
 use crate::rules::target_id_set;
+use hashbrown::HashMap;
 use rustfs_targets::EventName;
 use rustfs_targets::arn::TargetID;
 use serde::{Deserialize, Serialize};
-use std::collections::HashMap;
 use std::io::Read;

 /// Configuration for bucket notifications.
@@ -14,9 +14,10 @@

 use super::pattern;
 use super::target_id_set::TargetIdSet;
+use hashbrown::HashMap;
+use rayon::prelude::*;
 use rustfs_targets::arn::TargetID;
 use serde::{Deserialize, Serialize};
-use std::collections::HashMap;

 /// PatternRules - Event rule that maps object name patterns to TargetID collections.
 /// Corresponds to `event.Rules` (map[string]TargetIDSet) in the Go code
@@ -43,13 +44,19 @@ impl PatternRules {

     /// Returns all TargetIDs that match the object name.
     pub fn match_targets(&self, object_name: &str) -> TargetIdSet {
-        let mut matched_targets = TargetIdSet::new();
-        for (pattern_str, target_set) in &self.rules {
-            if pattern::match_simple(pattern_str, object_name) {
-                matched_targets.extend(target_set.iter().cloned());
-            }
-        }
-        matched_targets
+        self.rules
+            .par_iter()
+            .filter_map(|(pattern_str, target_set)| {
+                if pattern::match_simple(pattern_str, object_name) {
+                    Some(target_set.iter().cloned().collect::<TargetIdSet>())
+                } else {
+                    None
+                }
+            })
+            .reduce(TargetIdSet::new, |mut acc, set| {
+                acc.extend(set);
+                acc
+            })
     }

     pub fn is_empty(&self) -> bool {
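The sequential scan becomes a rayon parallel fold; this is what the new `rayon` feature on hashbrown in the workspace Cargo.toml enables, since `par_iter()` on a hashbrown map comes from that feature. The `reduce` merges per-thread sets exactly as the old `extend` loop did sequentially. A self-contained sketch of the same shape (using a `starts_with` predicate in place of `pattern::match_simple`, which is not shown in this diff):

```rust
use hashbrown::{HashMap, HashSet};
use rayon::prelude::*;

fn main() {
    // Same shape as `match_targets`: map each matching entry to a set,
    // then reduce the per-thread sets into one result set.
    let rules: HashMap<&str, HashSet<u32>> =
        HashMap::from_iter([("a*", HashSet::from_iter([1])), ("b*", HashSet::from_iter([2]))]);

    let matched: HashSet<u32> = rules
        .par_iter()
        .filter_map(|(pat, set)| pat.starts_with('a').then(|| set.clone()))
        .reduce(HashSet::new, |mut acc, set| {
            acc.extend(set);
            acc
        });

    assert_eq!(matched, HashSet::from_iter([1]));
}
```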
@@ -14,10 +14,10 @@

 use super::pattern_rules::PatternRules;
 use super::target_id_set::TargetIdSet;
+use hashbrown::HashMap;
 use rustfs_targets::EventName;
 use rustfs_targets::arn::TargetID;
 use serde::{Deserialize, Serialize};
-use std::collections::HashMap;

 /// RulesMap - Rule mapping organized by event name.
 /// Corresponds to `event.RulesMap` (map[Name]Rules) in the Go code
@@ -12,8 +12,8 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.

+use hashbrown::HashSet;
 use rustfs_targets::arn::TargetID;
-use std::collections::HashSet;

 /// TargetIDSet - A collection representation of TargetID.
 pub type TargetIdSet = HashSet<TargetID>;
@@ -13,10 +13,10 @@
 // limitations under the License.

 use super::pattern;
+use hashbrown::HashSet;
 use rustfs_targets::EventName;
 use rustfs_targets::arn::{ARN, ArnError, TargetIDError};
 use serde::{Deserialize, Serialize};
-use std::collections::HashSet;
 use std::io::Read;
 use thiserror::Error;
@@ -228,7 +228,7 @@ mod tests {
     use jsonwebtoken::{Algorithm, DecodingKey, Validation, decode};
     use serde::{Deserialize, Serialize};

-    #[derive(Debug, Serialize, Deserialize)]
+    #[derive(Debug, Serialize, Deserialize, Clone)]
     struct Claims {
         sub: String,
         exp: usize,
@@ -59,7 +59,7 @@ pub fn generate_jwt<T: Serialize>(claims: &T, secret: &str) -> std::result::Resu
     jsonwebtoken::encode(&header, &claims, &EncodingKey::from_secret(secret.as_bytes()))
 }

-pub fn extract_claims<T: DeserializeOwned>(
+pub fn extract_claims<T: DeserializeOwned + Clone>(
     token: &str,
     secret: &str,
 ) -> std::result::Result<jsonwebtoken::TokenData<T>, jsonwebtoken::errors::Error> {
@@ -157,7 +157,7 @@ impl QueryStateMachine {

     pub fn begin_optimize(&self) {
         // TODO record time
-        self.translate_to(Box::new(QueryState::RUNNING(RUNNING::OPTMIZING)));
+        self.translate_to(Box::new(QueryState::RUNNING(RUNNING::OPTIMIZING)));
     }

     pub fn end_optimize(&self) {
@@ -222,7 +222,7 @@ impl AsRef<str> for QueryState {
 pub enum RUNNING {
     DISPATCHING,
     ANALYZING,
-    OPTMIZING,
+    OPTIMIZING,
     SCHEDULING,
 }
@@ -231,7 +231,7 @@ impl AsRef<str> for RUNNING {
         match self {
             Self::DISPATCHING => "DISPATCHING",
             Self::ANALYZING => "ANALYZING",
-            Self::OPTMIZING => "OPTMIZING",
+            Self::OPTIMIZING => "OPTIMIZING",
             Self::SCHEDULING => "SCHEDULING",
         }
     }
@@ -63,7 +63,7 @@ mod tests {
     fn test_running_state_as_ref() {
         assert_eq!(RUNNING::DISPATCHING.as_ref(), "DISPATCHING");
         assert_eq!(RUNNING::ANALYZING.as_ref(), "ANALYZING");
-        assert_eq!(RUNNING::OPTMIZING.as_ref(), "OPTMIZING");
+        assert_eq!(RUNNING::OPTIMIZING.as_ref(), "OPTIMIZING");
         assert_eq!(RUNNING::SCHEDULING.as_ref(), "SCHEDULING");
     }
@@ -13,7 +13,7 @@ documentation = "https://docs.rs/rustfs-target/latest/rustfs_target/"

 [dependencies]
 rustfs-config = { workspace = true, features = ["notify", "constants", "audit"] }
-rustfs-utils = { workspace = true, features = ["sys"] }
+rustfs-utils = { workspace = true, features = ["sys", "notify"] }
 async-trait = { workspace = true }
 reqwest = { workspace = true }
 rumqttc = { workspace = true }
crates/targets/src/check.rs (new file, 70 lines)
@@ -0,0 +1,70 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

/// Check if an MQTT broker is available.
///
/// # Arguments
/// * `broker_url` - URL of the MQTT broker, for example `mqtt://localhost:1883`
/// * `topic` - Topic used for testing the connection
///
/// # Returns
/// * `Ok(())` - if the connection succeeds
/// * `Err(String)` - if the connection fails; contains an error message
///
/// # Example
/// ```rust,no_run
/// #[tokio::main]
/// async fn main() {
///     let result = rustfs_targets::check_mqtt_broker_available("mqtt://localhost:1883", "test/topic").await;
///     if result.is_ok() {
///         println!("MQTT Broker is available");
///     } else {
///         println!("MQTT Broker is not available: {}", result.err().unwrap());
///     }
/// }
/// ```
///
/// # Note
/// Requires the `rumqttc` and `url` dependencies in `Cargo.toml`:
/// ```toml
/// [dependencies]
/// rumqttc = "0.25.0"
/// url = "2.5.7"
/// tokio = { version = "1", features = ["full"] }
/// ```
pub async fn check_mqtt_broker_available(broker_url: &str, topic: &str) -> Result<(), String> {
    use rumqttc::{AsyncClient, MqttOptions, QoS};
    let url = rustfs_utils::parse_url(broker_url).map_err(|e| format!("Broker URL parsing failed: {e}"))?;
    let url = url.url();

    match url.scheme() {
        "tcp" | "ssl" | "ws" | "wss" | "mqtt" | "mqtts" | "tls" | "tcps" => {}
        _ => return Err("unsupported broker url scheme".to_string()),
    }

    let host = url.host_str().ok_or("Broker is missing host")?;
    let port = url.port().unwrap_or(1883);
    let mut mqtt_options = MqttOptions::new("rustfs_check", host, port);
    mqtt_options.set_keep_alive(std::time::Duration::from_secs(5));
    let (client, mut eventloop) = AsyncClient::new(mqtt_options, 1);

    // Try to connect and subscribe
    client
        .subscribe(topic, QoS::AtLeastOnce)
        .await
        .map_err(|e| format!("MQTT subscription failed: {e}"))?;
    // Wait for the event loop to receive at least one event
    match tokio::time::timeout(std::time::Duration::from_secs(3), eventloop.poll()).await {
        Ok(Ok(_)) => Ok(()),
        Ok(Err(e)) => Err(format!("MQTT connection failed: {e}")),
        Err(_) => Err("MQTT connection timeout".to_string()),
    }
}
@@ -13,11 +13,13 @@
 // limitations under the License.

 pub mod arn;
+mod check;
 pub mod error;
 mod event_name;
 pub mod store;
 pub mod target;

+pub use check::check_mqtt_broker_available;
 pub use error::{StoreError, TargetError};
 pub use event_name::EventName;
 use serde::{Deserialize, Serialize};
@@ -324,7 +324,7 @@ async fn run_mqtt_event_loop(
                 Ok(Err(e)) => Err(e),
                 Err(_) => {
                     debug!(target_id = %target_id, "MQTT poll timed out (EVENT_LOOP_POLL_TIMEOUT) while not connected or status pending.");
-                    Err(rumqttc::ConnectionError::NetworkTimeout)
+                    Err(ConnectionError::NetworkTimeout)
                 }
             }
         } else {
@@ -376,7 +376,7 @@ async fn run_mqtt_event_loop(
             connected_status.store(false, Ordering::SeqCst);
             error!(target_id = %target_id, error = %e, "Error from MQTT event loop poll");

-            if matches!(e, rumqttc::ConnectionError::NetworkTimeout) && (!initial_connection_established || !connected_status.load(Ordering::SeqCst)) {
+            if matches!(e, ConnectionError::NetworkTimeout) && (!initial_connection_established || !connected_status.load(Ordering::SeqCst)) {
                 warn!(target_id = %target_id, "Timeout during initial poll or pending state, will retry.");
                 continue;
             }
@@ -395,8 +395,8 @@ async fn run_mqtt_event_loop(
                 error!(target_id = %target_id, error = %e, "Fatal MQTT error, terminating event loop.");
                 break;
             }
-            // rumqttc's eventloop.poll() may return Err and terminate after some errors,
-            // or it will handle reconnection internally. The continue here will make select! wait again.
+            // rumqttc's eventloop.poll() may return Err and terminate after some errors,
+            // or it will handle reconnection internally. Continuing here will make select! wait again.
             // If the error is temporary and rumqttc is handling reconnection, poll() should eventually succeed or return a different error again.
             // Sleep briefly to avoid busy cycles in case of rapid failure.
             tokio::time::sleep(Duration::from_secs(1)).await;
@@ -29,14 +29,16 @@ base64-simd = { workspace = true, optional = true }
 blake3 = { workspace = true, optional = true }
 brotli = { workspace = true, optional = true }
 bytes = { workspace = true, optional = true }
-crc32fast = { workspace = true }
+crc32fast = { workspace = true, optional = true }
 flate2 = { workspace = true, optional = true }
 futures = { workspace = true, optional = true }
+hashbrown = { workspace = true, optional = true }
 hex-simd = { workspace = true, optional = true }
 highway = { workspace = true, optional = true }
 hickory-resolver = { workspace = true, optional = true }
 hmac = { workspace = true, optional = true }
 hyper = { workspace = true, optional = true }
+libc = { workspace = true, optional = true }
 local-ip-address = { workspace = true, optional = true }
 lz4 = { workspace = true, optional = true }
 md-5 = { workspace = true, optional = true }
@@ -53,7 +55,7 @@ s3s = { workspace = true, optional = true }
 serde = { workspace = true, optional = true }
 sha1 = { workspace = true, optional = true }
 sha2 = { workspace = true, optional = true }
-convert_case = "0.8.0"
+convert_case = { workspace = true, optional = true }
 siphasher = { workspace = true, optional = true }
 snap = { workspace = true, optional = true }
 sysinfo = { workspace = true, optional = true }
@@ -83,13 +85,13 @@ tls = ["dep:rustls", "dep:rustls-pemfile", "dep:rustls-pki-types"] # tls charac
 net = ["ip", "dep:url", "dep:netif", "dep:futures", "dep:transform-stream", "dep:bytes", "dep:s3s", "dep:hyper", "dep:hickory-resolver", "dep:moka", "dep:thiserror", "dep:tokio"] # network features with DNS resolver
 io = ["dep:tokio"]
 path = []
-notify = ["dep:hyper", "dep:s3s"] # file system notification features
+notify = ["dep:hyper", "dep:s3s", "dep:hashbrown", "dep:thiserror", "dep:serde", "dep:libc"] # file system notification features
 compress = ["dep:flate2", "dep:brotli", "dep:snap", "dep:lz4", "dep:zstd"]
 string = ["dep:regex", "dep:rand"]
 crypto = ["dep:base64-simd", "dep:hex-simd", "dep:hmac", "dep:hyper", "dep:sha1"]
-hash = ["dep:highway", "dep:md-5", "dep:sha2", "dep:blake3", "dep:serde", "dep:siphasher", "dep:hex-simd", "dep:base64-simd"]
+hash = ["dep:highway", "dep:md-5", "dep:sha2", "dep:blake3", "dep:serde", "dep:siphasher", "dep:hex-simd", "dep:base64-simd", "dep:crc32fast"]
 os = ["dep:nix", "dep:tempfile", "winapi"] # operating system utilities
 integration = [] # integration test features
 sys = ["dep:sysinfo"] # system information features
-http = []
-full = ["ip", "tls", "net", "io", "hash", "os", "integration", "path", "crypto", "string", "compress", "sys", "notify","http"] # all features
+http = ["dep:convert_case"]
+full = ["ip", "tls", "net", "io", "hash", "os", "integration", "path", "crypto", "string", "compress", "sys", "notify", "http"] # all features
@@ -12,9 +12,13 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.

+mod net;
+
+use hashbrown::HashMap;
 use hyper::HeaderMap;
 use s3s::{S3Request, S3Response};
-use std::collections::HashMap;
+
+pub use net::*;

 /// Extract request parameters from S3Request, mainly header information.
 #[allow(dead_code)]
crates/utils/src/notify/net.rs (new file, 533 lines)
@@ -0,0 +1,533 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use regex::Regex;
use serde::{Deserialize, Serialize};
use std::net::IpAddr;
use std::path::Path;
use std::sync::LazyLock;
use thiserror::Error;
use url::Url;

// Lazy static for the host label regex.
static HOST_LABEL_REGEX: LazyLock<Regex> = LazyLock::new(|| Regex::new(r"^[a-zA-Z0-9]([a-zA-Z0-9-]*[a-zA-Z0-9])?$").unwrap());

/// NetError represents errors that can occur in network operations.
#[derive(Error, Debug)]
pub enum NetError {
    #[error("invalid argument")]
    InvalidArgument,
    #[error("invalid hostname")]
    InvalidHost,
    #[error("missing '[' in host")]
    MissingBracket,
    #[error("parse error: {0}")]
    ParseError(String),
    #[error("unexpected scheme: {0}")]
    UnexpectedScheme(String),
    #[error("scheme appears with empty host")]
    SchemeWithEmptyHost,
}

// Host represents a network host with IP/name and port.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct Host {
    pub name: String,
    pub port: Option<u16>, // Option<u16> represents whether the port is set, similar to IsPortSet.
}

// Implementation of Host methods.
impl Host {
    // is_empty returns true if the host name is empty.
    pub fn is_empty(&self) -> bool {
        self.name.is_empty()
    }

    // equal checks if two hosts are equal by comparing their string representations.
    pub fn equal(&self, other: &Host) -> bool {
        self.to_string() == other.to_string()
    }
}

impl std::fmt::Display for Host {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self.port {
            Some(p) => write!(f, "{}:{}", self.name, p),
            None => write!(f, "{}", self.name),
        }
    }
}

// parse_host parses a string into a Host, with validation similar to Go's ParseHost.
pub fn parse_host(s: &str) -> Result<Host, NetError> {
    if s.is_empty() {
        return Err(NetError::InvalidArgument);
    }

    // is_valid_host validates the host string, checking for IP or hostname validity.
    let is_valid_host = |host: &str| -> bool {
        if host.is_empty() {
            return true;
        }
        if host.parse::<IpAddr>().is_ok() {
            return true;
        }
        if !(1..=253).contains(&host.len()) {
            return false;
        }
        for (i, label) in host.split('.').enumerate() {
            if i + 1 == host.split('.').count() && label.is_empty() {
                continue;
            }
            if !(1..=63).contains(&label.len()) || !HOST_LABEL_REGEX.is_match(label) {
                return false;
            }
        }
        true
    };

    // Split host and port, similar to net.SplitHostPort.
    let (host_str, port_str) = s.rsplit_once(':').map_or((s, ""), |(h, p)| (h, p));
    let port = if !port_str.is_empty() {
        Some(port_str.parse().map_err(|_| NetError::ParseError(port_str.to_string()))?)
    } else {
        None
    };

    // Trim IPv6 brackets if present.
    let host = trim_ipv6(host_str)?;

    // Handle IPv6 zone identifier.
    let trimmed_host = host.split('%').next().unwrap_or(&host);

    if !is_valid_host(trimmed_host) {
        return Err(NetError::InvalidHost);
    }

    Ok(Host { name: host, port })
}

// trim_ipv6 removes square brackets from IPv6 addresses, similar to Go's trimIPv6.
fn trim_ipv6(host: &str) -> Result<String, NetError> {
    if host.ends_with(']') {
        if !host.starts_with('[') {
            return Err(NetError::MissingBracket);
        }
        Ok(host[1..host.len() - 1].to_string())
    } else {
        Ok(host.to_string())
    }
}

// ParsedURL is a wrapper around url::Url for custom handling.
#[derive(Debug, Clone)]
pub struct ParsedURL(pub Url);

impl ParsedURL {
    /// is_empty returns true if the URL is empty or "about:blank".
    pub fn is_empty(&self) -> bool {
        self.0.as_str() == "" || (self.0.scheme() == "about" && self.0.path() == "blank")
    }

    /// hostname returns the hostname of the URL.
    pub fn hostname(&self) -> String {
        self.0.host_str().unwrap_or("").to_string()
    }

    /// port returns the port of the URL as a string, defaulting to "80" for http and "443" for https if not set.
    pub fn port(&self) -> String {
        match self.0.port() {
            Some(p) => p.to_string(),
            None => match self.0.scheme() {
                "http" => "80".to_string(),
                "https" => "443".to_string(),
                _ => "".to_string(),
            },
        }
    }

    /// scheme returns the scheme of the URL.
    pub fn scheme(&self) -> &str {
        self.0.scheme()
    }

    /// url returns a reference to the underlying Url.
    pub fn url(&self) -> &Url {
        &self.0
    }
}

impl std::fmt::Display for ParsedURL {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut url = self.0.clone();
        if let Some(host) = url.host_str().map(|h| h.to_string()) {
            if let Some(port) = url.port() {
                if (url.scheme() == "http" && port == 80) || (url.scheme() == "https" && port == 443) {
                    url.set_host(Some(&host)).unwrap();
                    url.set_port(None).unwrap();
                }
            }
        }
        let mut s = url.to_string();

        // If the URL ends with a slash and the path is just "/", remove the trailing slash.
        if s.ends_with('/') && url.path() == "/" {
            s.pop();
        }

        write!(f, "{}", s)
    }
}

impl serde::Serialize for ParsedURL {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        serializer.serialize_str(&self.to_string())
    }
}

impl<'de> serde::Deserialize<'de> for ParsedURL {
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: serde::Deserializer<'de>,
    {
        let s: String = serde::Deserialize::deserialize(deserializer)?;
        if s.is_empty() {
            Ok(ParsedURL(Url::parse("about:blank").unwrap()))
        } else {
            parse_url(&s).map_err(serde::de::Error::custom)
        }
    }
}

// parse_url parses a string into a ParsedURL, with host validation and path cleaning.
pub fn parse_url(s: &str) -> Result<ParsedURL, NetError> {
    if let Some(scheme_end) = s.find("://") {
        if s[scheme_end + 3..].starts_with('/') {
            let scheme = &s[..scheme_end];
            if !scheme.is_empty() {
                return Err(NetError::SchemeWithEmptyHost);
            }
        }
    }

    let mut uu = Url::parse(s).map_err(|e| NetError::ParseError(e.to_string()))?;
    if uu.host_str().is_none_or(|h| h.is_empty()) {
        if uu.scheme() != "" {
            return Err(NetError::SchemeWithEmptyHost);
        }
    } else {
        let port_str = uu.port().map(|p| p.to_string()).unwrap_or_else(|| match uu.scheme() {
            "http" => "80".to_string(),
            "https" => "443".to_string(),
            _ => "".to_string(),
        });

        if !port_str.is_empty() {
            let host_port = format!("{}:{}", uu.host_str().unwrap(), port_str);
            parse_host(&host_port)?; // Validate host.
        }
    }

    // Clean path: use Url's path segments to normalize.
    if !uu.path().is_empty() {
        // Url automatically cleans paths, but we ensure a trailing slash if the original had one.
        let mut cleaned_path = String::new();
        for comp in Path::new(uu.path()).components() {
            use std::path::Component;
            match comp {
                Component::RootDir => cleaned_path.push('/'),
                Component::Normal(s) => {
                    if !cleaned_path.ends_with('/') {
                        cleaned_path.push('/');
                    }
                    cleaned_path.push_str(&s.to_string_lossy());
                }
                _ => {}
            }
        }
        if s.ends_with('/') && !cleaned_path.ends_with('/') {
            cleaned_path.push('/');
        }
        if cleaned_path.is_empty() {
            cleaned_path.push('/');
        }
        uu.set_path(&cleaned_path);
    }

    Ok(ParsedURL(uu))
}

#[allow(dead_code)]
/// parse_http_url parses a string into a ParsedURL, ensuring the scheme is http or https.
pub fn parse_http_url(s: &str) -> Result<ParsedURL, NetError> {
    let u = parse_url(s)?;
    match u.0.scheme() {
        "http" | "https" => Ok(u),
        _ => Err(NetError::UnexpectedScheme(u.0.scheme().to_string())),
    }
}

#[allow(dead_code)]
/// is_network_or_host_down checks if an error indicates network or host down, considering timeouts.
pub fn is_network_or_host_down(err: &std::io::Error, expect_timeouts: bool) -> bool {
    if err.kind() == std::io::ErrorKind::TimedOut {
        return !expect_timeouts;
    }
    // Simplified checks based on the Go logic; adapt for Rust as needed.
    let err_str = err.to_string().to_lowercase();
    err_str.contains("connection reset by peer")
        || err_str.contains("connection timed out")
        || err_str.contains("broken pipe")
        || err_str.contains("use of closed network connection")
}

#[allow(dead_code)]
/// is_conn_reset_err checks if an error indicates a connection reset by peer.
pub fn is_conn_reset_err(err: &std::io::Error) -> bool {
    err.to_string().contains("connection reset by peer") || matches!(err.raw_os_error(), Some(libc::ECONNRESET))
}

#[allow(dead_code)]
/// is_conn_refused_err checks if an error indicates a connection refused.
pub fn is_conn_refused_err(err: &std::io::Error) -> bool {
    err.to_string().contains("connection refused") || matches!(err.raw_os_error(), Some(libc::ECONNREFUSED))
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn parse_host_with_empty_string_returns_error() {
        let result = parse_host("");
        assert!(matches!(result, Err(NetError::InvalidArgument)));
    }

    #[test]
    fn parse_host_with_valid_ipv4() {
        let result = parse_host("192.168.1.1:8080");
        assert!(result.is_ok());
        let host = result.unwrap();
        assert_eq!(host.name, "192.168.1.1");
        assert_eq!(host.port, Some(8080));
    }

    #[test]
    fn parse_host_with_valid_hostname() {
        let result = parse_host("example.com:443");
        assert!(result.is_ok());
        let host = result.unwrap();
        assert_eq!(host.name, "example.com");
        assert_eq!(host.port, Some(443));
    }

    #[test]
    fn parse_host_with_ipv6_brackets() {
        let result = parse_host("[::1]:8080");
        assert!(result.is_ok());
        let host = result.unwrap();
        assert_eq!(host.name, "::1");
        assert_eq!(host.port, Some(8080));
    }

    #[test]
    fn parse_host_with_invalid_ipv6_missing_bracket() {
        let result = parse_host("::1]:8080");
        assert!(matches!(result, Err(NetError::MissingBracket)));
    }

    #[test]
    fn parse_host_with_invalid_hostname() {
        let result = parse_host("invalid..host:80");
        assert!(matches!(result, Err(NetError::InvalidHost)));
    }

    #[test]
    fn parse_host_without_port() {
        let result = parse_host("example.com");
        assert!(result.is_ok());
        let host = result.unwrap();
        assert_eq!(host.name, "example.com");
        assert_eq!(host.port, None);
    }

    #[test]
    fn host_is_empty_when_name_is_empty() {
        let host = Host {
            name: "".to_string(),
            port: None,
        };
        assert!(host.is_empty());
    }

    #[test]
    fn host_is_not_empty_when_name_present() {
        let host = Host {
            name: "example.com".to_string(),
            port: Some(80),
        };
        assert!(!host.is_empty());
    }

    #[test]
    fn host_to_string_with_port() {
        let host = Host {
            name: "example.com".to_string(),
            port: Some(80),
        };
        assert_eq!(host.to_string(), "example.com:80");
    }

    #[test]
    fn host_to_string_without_port() {
        let host = Host {
            name: "example.com".to_string(),
            port: None,
        };
        assert_eq!(host.to_string(), "example.com");
    }

    #[test]
    fn host_equal_when_same() {
        let host1 = Host {
            name: "example.com".to_string(),
            port: Some(80),
        };
        let host2 = Host {
            name: "example.com".to_string(),
            port: Some(80),
        };
        assert!(host1.equal(&host2));
    }

    #[test]
    fn host_not_equal_when_different() {
        let host1 = Host {
            name: "example.com".to_string(),
            port: Some(80),
        };
        let host2 = Host {
            name: "example.com".to_string(),
            port: Some(443),
        };
        assert!(!host1.equal(&host2));
    }

    #[test]
    fn parse_url_with_valid_http_url() {
        let result = parse_url("http://example.com/path");
        assert!(result.is_ok());
        let parsed = result.unwrap();
        assert_eq!(parsed.hostname(), "example.com");
        assert_eq!(parsed.port(), "80");
    }

    #[test]
    fn parse_url_with_valid_https_url() {
        let result = parse_url("https://example.com:443/path");
        assert!(result.is_ok());
        let parsed = result.unwrap();
        assert_eq!(parsed.hostname(), "example.com");
        assert_eq!(parsed.port(), "443");
    }

    #[test]
    fn parse_url_with_scheme_but_empty_host() {
        let result = parse_url("http:///path");
        assert!(matches!(result, Err(NetError::SchemeWithEmptyHost)));
    }

    #[test]
    fn parse_url_with_invalid_host() {
        let result = parse_url("http://invalid..host/path");
        assert!(matches!(result, Err(NetError::InvalidHost)));
    }

    #[test]
    fn parse_url_with_path_cleaning() {
        let result = parse_url("http://example.com//path/../path/");
        assert!(result.is_ok());
        let parsed = result.unwrap();
        assert_eq!(parsed.0.path(), "/path/");
    }

    #[test]
    fn parse_http_url_with_http_scheme() {
        let result = parse_http_url("http://example.com");
        assert!(result.is_ok());
    }

    #[test]
    fn parse_http_url_with_https_scheme() {
        let result = parse_http_url("https://example.com");
        assert!(result.is_ok());
    }

    #[test]
    fn parse_http_url_with_invalid_scheme() {
        let result = parse_http_url("ftp://example.com");
        assert!(matches!(result, Err(NetError::UnexpectedScheme(_))));
    }

    #[test]
    fn parsed_url_is_empty_when_url_is_empty() {
        let url = ParsedURL(Url::parse("about:blank").unwrap());
        assert!(url.is_empty());
    }

    #[test]
    fn parsed_url_hostname() {
        let url = ParsedURL(Url::parse("http://example.com:8080").unwrap());
        assert_eq!(url.hostname(), "example.com");
    }

    #[test]
    fn parsed_url_port() {
        let url = ParsedURL(Url::parse("http://example.com:8080").unwrap());
        assert_eq!(url.port(), "8080");
    }

    #[test]
    fn parsed_url_to_string_removes_default_ports() {
        let url = ParsedURL(Url::parse("http://example.com:80").unwrap());
        assert_eq!(url.to_string(), "http://example.com");
    }

    #[test]
    fn is_network_or_host_down_with_timeout() {
        let err = std::io::Error::new(std::io::ErrorKind::TimedOut, "timeout");
        assert!(is_network_or_host_down(&err, false));
    }

    #[test]
    fn is_network_or_host_down_with_expected_timeout() {
        let err = std::io::Error::new(std::io::ErrorKind::TimedOut, "timeout");
        assert!(!is_network_or_host_down(&err, true));
    }

    #[test]
    fn is_conn_reset_err_with_reset_message() {
        let err = std::io::Error::other("connection reset by peer");
        assert!(is_conn_reset_err(&err));
    }

    #[test]
    fn is_conn_refused_err_with_refused_message() {
        let err = std::io::Error::other("connection refused");
        assert!(is_conn_refused_err(&err));
    }
}
@@ -276,6 +276,27 @@ pub fn trim_etag(etag: &str) -> String {
mod tests {
    use super::*;

    #[test]
    fn test_trim_etag() {
        // Test with quoted ETag
        assert_eq!(trim_etag("\"abc123\""), "abc123");

        // Test with unquoted ETag
        assert_eq!(trim_etag("abc123"), "abc123");

        // Test with empty string
        assert_eq!(trim_etag(""), "");

        // Test with only quotes
        assert_eq!(trim_etag("\"\""), "");

        // Test with MD5 hash
        assert_eq!(trim_etag("\"2c7ab85a893283e98c931e9511add182\""), "2c7ab85a893283e98c931e9511add182");

        // Test with multipart ETag format
        assert_eq!(trim_etag("\"098f6bcd4621d373cade4e832627b4f6-2\""), "098f6bcd4621d373cade4e832627b4f6-2");
    }

    #[test]
    fn test_base_dir_from_prefix() {
        let a = "da/";
0
deploy/data/dev/.gitkeep
Normal file
0
deploy/data/pro/.gitkeep
Normal file
@@ -30,7 +30,7 @@ services:
      - "9000:9000" # S3 API port
      - "9001:9001" # Console port
    environment:
      - RUSTFS_VOLUMES=/data/rustfs0,/data/rustfs1,/data/rustfs2,/data/rustfs3
      - RUSTFS_VOLUMES=/data/rustfs{0..3} # Define 4 storage volumes
      - RUSTFS_ADDRESS=0.0.0.0:9000
      - RUSTFS_CONSOLE_ADDRESS=0.0.0.0:9001
      - RUSTFS_CONSOLE_ENABLE=true
@@ -43,12 +43,9 @@ services:
      - RUSTFS_TLS_PATH=/opt/tls
      - RUSTFS_OBS_ENDPOINT=http://otel-collector:4317
    volumes:
      - rustfs_data_0:/data/rustfs0
      - rustfs_data_1:/data/rustfs1
      - rustfs_data_2:/data/rustfs2
      - rustfs_data_3:/data/rustfs3
      - logs_data:/app/logs
      - .docker/tls/:/opt/tls # TLS configuration, you should create tls directory and put your tls files in it and then specify the path here
      - deploy/data/pro:/data
      - deploy/logs:/app/logs
      - deploy/data/certs/:/opt/tls # TLS configuration, you should create tls directory and put your tls files in it and then specify the path here
    networks:
      - rustfs-network
    restart: unless-stopped
@@ -78,7 +75,7 @@ services:
      - "9010:9000" # S3 API port
      - "9011:9001" # Console port
    environment:
      - RUSTFS_VOLUMES=/data/rustfs0,/data/rustfs1
      - RUSTFS_VOLUMES=/data/rustfs{1..4}
      - RUSTFS_ADDRESS=0.0.0.0:9000
      - RUSTFS_CONSOLE_ADDRESS=0.0.0.0:9001
      - RUSTFS_CONSOLE_ENABLE=true
@@ -90,7 +87,7 @@ services:
      - RUSTFS_LOG_LEVEL=debug
    volumes:
      - .:/app # Mount source code to /app for development
      - rustfs_dev_data:/data
      - deploy/data/dev:/data
    networks:
      - rustfs-network
    restart: unless-stopped
@@ -98,7 +95,7 @@ services:
      test:
        [
          "CMD",
          "sh", "-c",
          "sh", "-c",
          "curl -f http://localhost:9000/health && curl -f http://localhost:9001/health"
        ]
      interval: 30s
@@ -239,5 +236,5 @@ volumes:
    driver: local
  redis_data:
    driver: local
  logs_data:
  logs:
    driver: local
60
docs/examples/README.md
Normal file
@@ -0,0 +1,60 @@
# RustFS Deployment Examples

This directory contains practical deployment examples and configurations for RustFS.

## Available Examples

### [MNMD (Multi-Node Multi-Drive)](./mnmd/)

Complete Docker Compose example for deploying RustFS in a 4-node, 4-drive-per-node configuration.

**Features:**
- Proper disk indexing (1..4) to avoid VolumeNotFound errors
- Startup coordination via `wait-and-start.sh` script
- Service discovery using Docker service names
- Health checks with alternatives for different base images
- Comprehensive documentation and verification checklist

**Use Case:** Production-ready multi-node deployment for high availability and performance.

**Quick Start:**
```bash
cd docs/examples/mnmd
docker-compose up -d
```

**See also:**
- [MNMD README](./mnmd/README.md) - Detailed usage guide
- [MNMD CHECKLIST](./mnmd/CHECKLIST.md) - Step-by-step verification

## Other Deployment Examples

For additional deployment examples, see:
- [`examples/`](/examples/) - Root-level examples directory with:
  - `docker-quickstart.sh` - Quick start script for basic deployments
  - `enhanced-docker-deployment.sh` - Advanced deployment scenarios
  - `docker-comprehensive.yml` - Docker Compose with multiple profiles
- [`.docker/compose/`](/.docker/compose/) - Docker Compose configurations:
  - `docker-compose.cluster.yaml` - Basic cluster setup
  - `docker-compose.observability.yaml` - Observability stack integration

## Related Documentation

- [Console & Endpoint Service Separation](../console-separation.md)
- [Environment Variables](../ENVIRONMENT_VARIABLES.md)
- [Performance Testing](../PERFORMANCE_TESTING.md)

## Contributing

When adding new examples:
1. Create a dedicated subdirectory under `docs/examples/`
2. Include a comprehensive README.md
3. Provide working configuration files
4. Add verification steps or checklists
5. Document common issues and troubleshooting

## Support

For issues or questions:
- GitHub Issues: https://github.com/rustfs/rustfs/issues
- Documentation: https://rustfs.io
329
docs/examples/mnmd/CHECKLIST.md
Normal file
@@ -0,0 +1,329 @@
# MNMD Deployment Checklist

This checklist provides step-by-step verification for deploying RustFS in MNMD (Multi-Node Multi-Drive) mode using Docker.

## Pre-Deployment Checks

### 1. System Requirements

- [ ] Docker Engine 20.10+ installed
- [ ] Docker Compose 2.0+ installed
- [ ] At least 8GB RAM available
- [ ] At least 40GB disk space available (for 4 nodes × 4 volumes)

Verify with:

```bash
docker --version
docker-compose --version
free -h
df -h
```

### 2. File System Checks

- [ ] Using XFS, ext4, or another suitable filesystem (not NFS for production)
- [ ] File system supports extended attributes

Verify with:

```bash
df -T | grep -E '(xfs|ext4)'
```

### 3. Permissions and SELinux

- [ ] Current user is in `docker` group or can run `sudo docker`
- [ ] SELinux is properly configured (if enabled)

Verify with:

```bash
groups | grep docker
getenforce # If enabled, should show "Permissive" or "Enforcing" with proper policies
```

### 4. Network Configuration

- [ ] Ports 9000-9031 are available
- [ ] No firewall blocking Docker bridge network

Verify with:

```bash
# Check if ports are free
netstat -tuln | grep -E ':(9000|9001|9010|9011|9020|9021|9030|9031)'
# Should return nothing if ports are free
```

### 5. Files Present

- [ ] `docker-compose.yml` exists in current directory

Verify with:

```bash
cd docs/examples/mnmd
ls -la
chmod +x wait-and-start.sh # If needed
```

## Deployment Steps

### 1. Start the Cluster

- [ ] Navigate to the example directory
- [ ] Pull the latest RustFS image
- [ ] Start the cluster

```bash
cd docs/examples/mnmd
docker-compose pull
docker-compose up -d
```

### 2. Monitor Startup

- [ ] Watch container logs during startup
- [ ] Verify no VolumeNotFound errors
- [ ] Check that peer discovery completes

```bash
# Watch all logs
docker-compose logs -f

# Watch specific node
docker-compose logs -f rustfs-node1

# Look for successful startup messages
docker-compose logs | grep -i "ready\|listening\|started"
```

### 3. Verify Container Status

- [ ] All 4 containers are running
- [ ] All 4 containers show as healthy

```bash
docker-compose ps

# Expected output: 4 containers in "Up" state with "healthy" status
```

### 4. Check Health Endpoints

- [ ] API health endpoints respond on all nodes
- [ ] Console health endpoints respond on all nodes

```bash
# Test API endpoints
curl http://localhost:9000/health
curl http://localhost:9010/health
curl http://localhost:9020/health
curl http://localhost:9030/health

# Test Console endpoints
curl http://localhost:9001/health
curl http://localhost:9011/health
curl http://localhost:9021/health
curl http://localhost:9031/health

# All should return successful health status
```

## Post-Deployment Verification

### 1. In-Container Checks

- [ ] Data directories exist
- [ ] Directories have correct permissions
- [ ] RustFS process is running

```bash
# Check node1
docker exec rustfs-node1 ls -la /data/
docker exec rustfs-node1 ps aux | grep rustfs

# Verify all 4 data directories exist
docker exec rustfs-node1 ls -d /data/rustfs{1..4}
```

### 2. DNS and Network Validation

- [ ] Service names resolve correctly
- [ ] Inter-node connectivity works

```bash
# DNS resolution test
docker exec rustfs-node1 nslookup rustfs-node2
docker exec rustfs-node1 nslookup rustfs-node3
docker exec rustfs-node1 nslookup rustfs-node4

# Connectivity test (using nc if available)
docker exec rustfs-node1 nc -zv rustfs-node2 9000
docker exec rustfs-node1 nc -zv rustfs-node3 9000
docker exec rustfs-node1 nc -zv rustfs-node4 9000

# Or using telnet/curl
docker exec rustfs-node1 curl -v http://rustfs-node2:9000/health
```

### 3. Volume Configuration Validation

- [ ] RUSTFS_VOLUMES environment variable is correct
- [ ] All 16 endpoints are configured (4 nodes × 4 drives)

```bash
# Check environment variable
docker exec rustfs-node1 env | grep RUSTFS_VOLUMES

# Expected output:
# RUSTFS_VOLUMES=http://rustfs-node{1...4}:9000/data/rustfs{1...4}
```
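As an optional cross-check (a sketch; the shell's two-dot `{1..4}` form is used here only to emulate RustFS's three-dot `{1...4}` pattern), you can enumerate the 16 endpoints the pattern should expand to:

```bash
# Print the 16 endpoints RUSTFS_VOLUMES should expand to, then count them.
for node in 1 2 3 4; do
  for drive in 1 2 3 4; do
    echo "http://rustfs-node${node}:9000/data/rustfs${drive}"
  done
done | wc -l   # expect: 16
```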
### 4. Cluster Functionality

- [ ] Can list buckets via API
- [ ] Can create a bucket
- [ ] Can upload an object
- [ ] Can download an object

```bash
# Configure AWS CLI or s3cmd
export AWS_ACCESS_KEY_ID=rustfsadmin
export AWS_SECRET_ACCESS_KEY=rustfsadmin

# Using AWS CLI (if installed)
aws --endpoint-url http://localhost:9000 s3 mb s3://test-bucket
aws --endpoint-url http://localhost:9000 s3 ls
echo "test content" > test.txt
aws --endpoint-url http://localhost:9000 s3 cp test.txt s3://test-bucket/
aws --endpoint-url http://localhost:9000 s3 ls s3://test-bucket/
aws --endpoint-url http://localhost:9000 s3 cp s3://test-bucket/test.txt downloaded.txt
cat downloaded.txt

# Or using curl (the S3 API expects SigV4-signed requests, so plain basic
# auth will not authenticate; curl 7.75+ can sign requests itself)
curl -X PUT http://localhost:9000/test-bucket \
  --aws-sigv4 "aws:amz:us-east-1:s3" \
  --user rustfsadmin:rustfsadmin
```

### 5. Healthcheck Verification

- [ ] Docker reports all services as healthy
- [ ] Healthcheck scripts work in containers

```bash
# Check Docker health status
docker inspect rustfs-node1 --format='{{.State.Health.Status}}'
docker inspect rustfs-node2 --format='{{.State.Health.Status}}'
docker inspect rustfs-node3 --format='{{.State.Health.Status}}'
docker inspect rustfs-node4 --format='{{.State.Health.Status}}'

# All should return "healthy"

# Test healthcheck command manually
docker exec rustfs-node1 nc -z localhost 9000
echo $? # Should be 0
```

## Troubleshooting Checks

### If VolumeNotFound Error Occurs

- [ ] Verify volume indexing starts at 1, not 0
- [ ] Check that RUSTFS_VOLUMES matches mounted paths
- [ ] Ensure all /data/rustfs{1..4} directories exist

```bash
# Check mounted volumes
docker inspect rustfs-node1 | jq '.[].Mounts'

# Verify directories in container
docker exec rustfs-node1 ls -la /data/
```

### If Healthcheck Fails

- [ ] Check if `nc` is available in the image
- [ ] Try alternative healthcheck (curl/wget)
- [ ] Increase `start_period` in docker-compose.yml

```bash
# Check if nc is available
docker exec rustfs-node1 which nc

# Test healthcheck manually
docker exec rustfs-node1 nc -z localhost 9000

# Check logs for errors
docker-compose logs rustfs-node1 | grep -i error
```

### If Startup Takes Too Long

- [ ] Check peer discovery timeout in logs
- [ ] Verify network connectivity between nodes
- [ ] Consider increasing timeout in wait-and-start.sh

```bash
# Check startup logs
docker-compose logs rustfs-node1 | grep -i "waiting\|peer\|timeout"

# Check network
docker network inspect mnmd_rustfs-mnmd
```

### If Containers Crash or Restart

- [ ] Review container logs
- [ ] Check resource usage (CPU/Memory)
- [ ] Verify no port conflicts

```bash
# View last crash logs
docker-compose logs --tail=100 rustfs-node1

# Check resource usage
docker stats --no-stream

# Check restart count
docker-compose ps
```

## Cleanup Checklist

When done testing:

- [ ] Stop the cluster: `docker-compose down`
- [ ] Remove volumes (optional, destroys data): `docker-compose down -v`
- [ ] Clean up dangling images: `docker image prune`
- [ ] Verify ports are released: `netstat -tuln | grep -E ':(9000|9001|9010|9011|9020|9021|9030|9031)'`

## Production Deployment Additional Checks

Before deploying to production:

- [ ] Change default credentials (RUSTFS_ACCESS_KEY, RUSTFS_SECRET_KEY)
- [ ] Configure TLS certificates
- [ ] Set up proper logging and monitoring
- [ ] Configure backups for volumes
- [ ] Review and adjust resource limits
- [ ] Set up external load balancer (if needed)
- [ ] Document disaster recovery procedures
- [ ] Test failover scenarios
- [ ] Verify data persistence after container restart

## Summary

This checklist ensures:

- ✓ Correct disk indexing (1..4 instead of 0..3)
- ✓ Proper startup coordination via wait-and-start.sh
- ✓ Service discovery via Docker service names
- ✓ Health checks function correctly
- ✓ All 16 endpoints (4 nodes × 4 drives) are operational
- ✓ No VolumeNotFound errors occur

For more details, see [README.md](./README.md) in this directory.
268
docs/examples/mnmd/README.md
Normal file
@@ -0,0 +1,268 @@
# RustFS MNMD (Multi-Node Multi-Drive) Docker Example

This directory contains a complete, ready-to-use MNMD deployment example for RustFS with 4 nodes and 4 drives per node (4x4 configuration).

## Overview

This example addresses common deployment issues including:

- **VolumeNotFound errors** - Fixed by using correct disk indexing (`/data/rustfs{1...4}` instead of `/data/rustfs{0...3}`)
- **Startup race conditions** - Solved with a simple `sleep` command in each service
- **Service discovery** - Uses Docker service names (`rustfs-node{1..4}`) instead of hard-coded IPs
- **Health checks** - Implements proper health monitoring with `nc` (with alternatives documented)

## Quick Start

From this directory (`docs/examples/mnmd`), run:

```bash
# Start the cluster
docker-compose up -d

# Check the status
docker-compose ps

# View logs
docker-compose logs -f

# Test the deployment
curl http://localhost:9000/health
curl http://localhost:9001/health

# Run comprehensive tests
./test-deployment.sh

# Stop the cluster
docker-compose down

# Clean up volumes (WARNING: deletes all data)
docker-compose down -v
```

## Configuration Details

### Volume Configuration

The example uses the following volume configuration:

```bash
RUSTFS_VOLUMES=http://rustfs-node{1...4}:9000/data/rustfs{1...4}
```

This expands to 16 endpoints (4 nodes × 4 drives):

- Node 1: `/data/rustfs1`, `/data/rustfs2`, `/data/rustfs3`, `/data/rustfs4`
- Node 2: `/data/rustfs1`, `/data/rustfs2`, `/data/rustfs3`, `/data/rustfs4`
- Node 3: `/data/rustfs1`, `/data/rustfs2`, `/data/rustfs3`, `/data/rustfs4`
- Node 4: `/data/rustfs1`, `/data/rustfs2`, `/data/rustfs3`, `/data/rustfs4`

**Important:** Disk indexing starts at 1 to match the mounted paths (`/data/rustfs1..4`).

### Port Mappings

| Node  | API Port | Console Port |
|-------|----------|--------------|
| node1 | 9000     | 9001         |
| node2 | 9010     | 9011         |
| node3 | 9020     | 9021         |
| node4 | 9030     | 9031         |

### Startup Coordination

To prevent race conditions during startup where nodes might not find each other, a simple `sleep 3` command is added to each service's command. This provides a brief delay, allowing the network and other services to initialize before RustFS starts. For more complex scenarios, a more robust health-check dependency or an external entrypoint script might be required.
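One possible shape for such an entrypoint script (an illustrative sketch of a `wait-and-start.sh`; this example's compose file uses the plain `sleep`, and the sketch assumes `getent` is available in the image) is:

```bash
#!/bin/sh
# wait-and-start.sh (illustrative sketch): block until every peer's service
# name resolves in Docker's embedded DNS, then start RustFS. Waiting on DNS
# rather than on open ports avoids the deadlock where every node waits for
# the others to start listening first.
set -e

PEERS="rustfs-node1 rustfs-node2 rustfs-node3 rustfs-node4"
TIMEOUT=60

for peer in $PEERS; do
  waited=0
  until getent hosts "$peer" >/dev/null 2>&1; do
    waited=$((waited + 1))
    if [ "$waited" -ge "$TIMEOUT" ]; then
      echo "timed out waiting for DNS of $peer" >&2
      exit 1
    fi
    sleep 1
  done
done

exec "${RUSTFS_CMD:-rustfs}"
```
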
### Health Checks

Default health check using `nc` (netcat):

```yaml
healthcheck:
  test: [ "CMD-SHELL", "nc -z localhost 9000 || exit 1" ]
  interval: 10s
  timeout: 5s
  retries: 3
  start_period: 30s
```

#### Alternative Health Checks

If your base image lacks `nc`, use one of these alternatives:

**Using curl:**

```yaml
healthcheck:
  test: [ "CMD-SHELL", "curl -f http://localhost:9000/health || exit 1" ]
  interval: 10s
  timeout: 5s
  retries: 3
  start_period: 30s
```

**Using wget:**

```yaml
healthcheck:
  test: [ "CMD-SHELL", "wget --no-verbose --tries=1 --spider http://localhost:9000/health || exit 1" ]
  interval: 10s
  timeout: 5s
  retries: 3
  start_period: 30s
```

### Brace Expansion Alternatives

If your Docker Compose runtime doesn't support brace expansion (`{1...4}`), replace it with explicit endpoints:

```yaml
environment:
  - RUSTFS_VOLUMES=http://rustfs-node1:9000/data/rustfs1,http://rustfs-node1:9000/data/rustfs2,http://rustfs-node1:9000/data/rustfs3,http://rustfs-node1:9000/data/rustfs4,http://rustfs-node2:9000/data/rustfs1,http://rustfs-node2:9000/data/rustfs2,http://rustfs-node2:9000/data/rustfs3,http://rustfs-node2:9000/data/rustfs4,http://rustfs-node3:9000/data/rustfs1,http://rustfs-node3:9000/data/rustfs2,http://rustfs-node3:9000/data/rustfs3,http://rustfs-node3:9000/data/rustfs4,http://rustfs-node4:9000/data/rustfs1,http://rustfs-node4:9000/data/rustfs2,http://rustfs-node4:9000/data/rustfs3,http://rustfs-node4:9000/data/rustfs4
```
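If you'd rather not hand-maintain that long list, a shell sketch (using the shell's two-dot brace form, which varies the leftmost range slowest and so matches the node-major order above) can generate it:

```bash
# Build the explicit 16-endpoint list as one comma-separated string.
# Shell brace expansion yields node1/rustfs1..4, then node2/rustfs1..4, etc.
printf -v volumes '%s,' http://rustfs-node{1..4}:9000/data/rustfs{1..4}
echo "RUSTFS_VOLUMES=${volumes%,}"
```
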
## Using RUSTFS_CMD

The `RUSTFS_CMD` environment variable provides a fallback when no command is specified:

```yaml
environment:
  - RUSTFS_CMD=rustfs # Default fallback command
```

This allows the entrypoint to execute the correct command when Docker doesn't provide one.
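A minimal entrypoint sketch that honors this fallback (hypothetical; the actual image's entrypoint may differ):

```bash
#!/bin/sh
# docker-entrypoint.sh (illustrative sketch): run whatever command Docker
# passed in; fall back to $RUSTFS_CMD (default "rustfs") when none was given.
if [ "$#" -gt 0 ]; then
  exec "$@"
fi
exec "${RUSTFS_CMD:-rustfs}"
```
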
## Testing the Deployment

After starting the cluster, verify it's working:

### Automated Testing

Use the provided test script for comprehensive validation:

```bash
./test-deployment.sh
```

This script tests:

- Container status (4/4 running)
- Health checks (4/4 healthy)
- API endpoints (4 ports)
- Console endpoints (4 ports)
- Inter-node connectivity
- Data directory existence

### Manual Testing

For manual verification:

```bash
# 1. Check all containers are healthy
docker-compose ps

# 2. Test API endpoints
for port in 9000 9010 9020 9030; do
  echo "Testing port $port..."
  curl -s http://localhost:${port}/health | jq '.'
done

# 3. Test console endpoints
for port in 9001 9011 9021 9031; do
  echo "Testing console port $port..."
  curl -s http://localhost:${port}/health | jq '.'
done

# 4. Check inter-node connectivity
docker exec rustfs-node1 nc -zv rustfs-node2 9000
docker exec rustfs-node1 nc -zv rustfs-node3 9000
docker exec rustfs-node1 nc -zv rustfs-node4 9000
```

## Troubleshooting

### VolumeNotFound Error

**Symptom:** Error message about `/data/rustfs0` not found.

**Solution:** This example uses `/data/rustfs{1...4}` indexing to match the mounted Docker volumes. Ensure your `RUSTFS_VOLUMES` configuration starts at index 1, not 0.

### Health Check Failures

**Symptom:** Containers show as unhealthy.

**Solutions:**

1. Check if `nc` is available: `docker exec rustfs-node1 which nc`
2. Use alternative health checks (curl/wget) as documented above
3. Increase `start_period` if nodes need more time to initialize

### Startup Timeouts

**Symptom:** Services time out waiting for peers.

**Solutions:**

1. Check logs: `docker-compose logs rustfs-node1`
2. Verify network connectivity: `docker-compose exec rustfs-node1 ping rustfs-node2`
3. Consider increasing the `sleep` duration in the `docker-compose.yml` `command` directive if a longer delay is needed.

### Permission Issues

**Symptom:** Cannot create directories or write data.

**Solution:** Ensure volumes have correct permissions or set `RUSTFS_UID` and `RUSTFS_GID` environment variables.

## Advanced Configuration

### Custom Credentials

Replace default credentials in production:

```yaml
environment:
  - RUSTFS_ACCESS_KEY=your_access_key
  - RUSTFS_SECRET_KEY=your_secret_key
```

### TLS Configuration

Add TLS certificates:

```yaml
volumes:
  - ./certs:/opt/tls:ro
environment:
  - RUSTFS_TLS_PATH=/opt/tls
```

### Resource Limits

Add resource constraints:

```yaml
deploy:
  resources:
    limits:
      cpus: '2'
      memory: 4G
    reservations:
      cpus: '1'
      memory: 2G
```

## See Also

- [CHECKLIST.md](./CHECKLIST.md) - Step-by-step verification guide
- [../../console-separation.md](../../console-separation.md) - Console & endpoint service separation guide
- [../../../examples/docker-comprehensive.yml](../../../examples/docker-comprehensive.yml) - More deployment examples
- [Issue #618](https://github.com/rustfs/rustfs/issues/618) - Original VolumeNotFound issue

## References

- RustFS Documentation: https://rustfs.io
- Docker Compose Documentation: https://docs.docker.com/compose/
182
docs/examples/mnmd/docker-compose.yml
Normal file
@@ -0,0 +1,182 @@
# Copyright 2024 RustFS Team
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# MNMD (Multi-Node Multi-Drive) Docker Compose Example
# 4 nodes x 4 drives configuration
# This example demonstrates a complete, ready-to-use MNMD deployment
# addressing startup coordination and VolumeNotFound issues.

services:
  rustfs-node1:
    image: rustfs/rustfs:latest
    container_name: rustfs-node1
    hostname: rustfs-node1
    environment:
      # Use service names and correct disk indexing (1..4 to match mounted paths)
      - RUSTFS_VOLUMES=http://rustfs-node{1...4}:9000/data/rustfs{1...4}
      - RUSTFS_ADDRESS=0.0.0.0:9000
      - RUSTFS_CONSOLE_ENABLE=true
      - RUSTFS_CONSOLE_ADDRESS=0.0.0.0:9001
      - RUSTFS_ACCESS_KEY=rustfsadmin
      - RUSTFS_SECRET_KEY=rustfsadmin
      - RUSTFS_CMD=rustfs
    ports:
      - "9000:9000" # API endpoint
      - "9001:9001" # Console
    volumes:
      - node1-data1:/data/rustfs1
      - node1-data2:/data/rustfs2
      - node1-data3:/data/rustfs3
      - node1-data4:/data/rustfs4
    command: [ "sh", "-c", "sleep 3 && rustfs" ]
    healthcheck:
      test:
        [
          "CMD",
          "sh", "-c",
          "curl -f http://localhost:9000/health && curl -f http://localhost:9001/health"
        ]
      interval: 10s
      timeout: 5s
      retries: 3
      start_period: 30s
    networks:
      - rustfs-mnmd

  rustfs-node2:
    image: rustfs/rustfs:latest
    container_name: rustfs-node2
    hostname: rustfs-node2
    environment:
      - RUSTFS_VOLUMES=http://rustfs-node{1...4}:9000/data/rustfs{1...4}
      - RUSTFS_ADDRESS=0.0.0.0:9000
      - RUSTFS_CONSOLE_ENABLE=true
      - RUSTFS_CONSOLE_ADDRESS=0.0.0.0:9001
      - RUSTFS_ACCESS_KEY=rustfsadmin
      - RUSTFS_SECRET_KEY=rustfsadmin
      - RUSTFS_CMD=rustfs
    ports:
      - "9010:9000" # API endpoint
      - "9011:9001" # Console
    volumes:
      - node2-data1:/data/rustfs1
      - node2-data2:/data/rustfs2
      - node2-data3:/data/rustfs3
      - node2-data4:/data/rustfs4
    command: [ "sh", "-c", "sleep 3 && rustfs" ]
    healthcheck:
      test:
        [
          "CMD",
          "sh", "-c",
          "curl -f http://localhost:9000/health && curl -f http://localhost:9001/health"
        ]
      interval: 10s
      timeout: 5s
      retries: 3
      start_period: 30s
    networks:
      - rustfs-mnmd

  rustfs-node3:
    image: rustfs/rustfs:latest
    container_name: rustfs-node3
    hostname: rustfs-node3
    environment:
      - RUSTFS_VOLUMES=http://rustfs-node{1...4}:9000/data/rustfs{1...4}
      - RUSTFS_ADDRESS=0.0.0.0:9000
      - RUSTFS_CONSOLE_ENABLE=true
      - RUSTFS_CONSOLE_ADDRESS=0.0.0.0:9001
      - RUSTFS_ACCESS_KEY=rustfsadmin
      - RUSTFS_SECRET_KEY=rustfsadmin
      - RUSTFS_CMD=rustfs
    ports:
      - "9020:9000" # API endpoint
      - "9021:9001" # Console
    volumes:
      - node3-data1:/data/rustfs1
      - node3-data2:/data/rustfs2
      - node3-data3:/data/rustfs3
      - node3-data4:/data/rustfs4
    command: [ "sh", "-c", "sleep 3 && rustfs" ]
    healthcheck:
      test:
        [
          "CMD",
          "sh", "-c",
          "curl -f http://localhost:9000/health && curl -f http://localhost:9001/health"
        ]
      interval: 10s
      timeout: 5s
      retries: 3
      start_period: 30s
    networks:
      - rustfs-mnmd

  rustfs-node4:
    image: rustfs/rustfs:latest
    container_name: rustfs-node4
    hostname: rustfs-node4
    environment:
      - RUSTFS_VOLUMES=http://rustfs-node{1...4}:9000/data/rustfs{1...4}
      - RUSTFS_ADDRESS=0.0.0.0:9000
      - RUSTFS_CONSOLE_ENABLE=true
      - RUSTFS_CONSOLE_ADDRESS=0.0.0.0:9001
      - RUSTFS_ACCESS_KEY=rustfsadmin
      - RUSTFS_SECRET_KEY=rustfsadmin
      - RUSTFS_CMD=rustfs
    ports:
      - "9030:9000" # API endpoint
      - "9031:9001" # Console
    volumes:
      - node4-data1:/data/rustfs1
      - node4-data2:/data/rustfs2
      - node4-data3:/data/rustfs3
      - node4-data4:/data/rustfs4
    command: [ "sh", "-c", "sleep 3 && rustfs" ]
    healthcheck:
      test:
        [
          "CMD",
          "sh", "-c",
          "curl -f http://localhost:9000/health && curl -f http://localhost:9001/health"
        ]
      interval: 10s
      timeout: 5s
      retries: 3
      start_period: 30s
    networks:
      - rustfs-mnmd

networks:
  rustfs-mnmd:
    driver: bridge

volumes:
  node1-data1:
  node1-data2:
  node1-data3:
  node1-data4:
  node2-data1:
  node2-data2:
  node2-data3:
  node2-data4:
  node3-data1:
  node3-data2:
  node3-data3:
  node3-data4:
  node4-data1:
  node4-data2:
  node4-data3:
  node4-data4:
172
docs/examples/mnmd/test-deployment.sh
Executable file
@@ -0,0 +1,172 @@
#!/bin/bash
# Copyright 2024 RustFS Team
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# test-deployment.sh - Quick test script for MNMD deployment
# Usage: ./test-deployment.sh

set -e

# Colors for output
GREEN='\033[0;32m'
RED='\033[0;31m'
YELLOW='\033[1;33m'
NC='\033[0m' # No Color

echo "========================================="
echo "RustFS MNMD Deployment Test"
echo "========================================="
echo ""

# Test 1: Check if all containers are running
echo "Test 1: Checking container status..."
RUNNING=$(docker-compose ps | grep -c "Up" || true) # grep -c prints the count (0 included) even when it exits nonzero
if [ "$RUNNING" -eq 4 ]; then
|
||||
echo -e "${GREEN}✓ All 4 containers are running${NC}"
|
||||
else
|
||||
echo -e "${RED}✗ Only $RUNNING/4 containers are running${NC}"
|
||||
docker-compose ps
|
||||
exit 1
|
||||
fi
|
||||
echo ""
|
||||
|
||||
# Test 2: Check health status
|
||||
echo "Test 2: Checking health status..."
|
||||
HEALTHY=0
|
||||
for node in rustfs-node1 rustfs-node2 rustfs-node3 rustfs-node4; do
|
||||
STATUS=$(docker inspect "$node" --format='{{.State.Health.Status}}' 2>/dev/null || echo "unknown")
|
||||
if [ "$STATUS" = "healthy" ]; then
|
||||
echo -e " ${GREEN}✓ $node is healthy${NC}"
|
||||
HEALTHY=$((HEALTHY + 1))
|
||||
elif [ "$STATUS" = "starting" ]; then
|
||||
echo -e " ${YELLOW}⚠ $node is starting (wait a moment)${NC}"
|
||||
else
|
||||
echo -e " ${RED}✗ $node status: $STATUS${NC}"
|
||||
fi
|
||||
done
|
||||
|
||||
if [ "$HEALTHY" -eq 4 ]; then
|
||||
echo -e "${GREEN}✓ All containers are healthy${NC}"
|
||||
elif [ "$HEALTHY" -gt 0 ]; then
|
||||
echo -e "${YELLOW}⚠ $HEALTHY/4 containers are healthy (some may still be starting)${NC}"
|
||||
else
|
||||
echo -e "${RED}✗ No containers are healthy${NC}"
|
||||
exit 1
|
||||
fi
|
||||
echo ""
|
||||
|
||||
# Test 3: Check API endpoints
|
||||
echo "Test 3: Testing API endpoints..."
|
||||
PORTS=(9000 9010 9020 9030)
|
||||
API_SUCCESS=0
|
||||
for port in "${PORTS[@]}"; do
|
||||
if curl -sf http://localhost:${port}/health >/dev/null 2>&1; then
|
||||
echo -e " ${GREEN}✓ API on port $port is responding${NC}"
|
||||
API_SUCCESS=$((API_SUCCESS + 1))
|
||||
else
|
||||
echo -e " ${RED}✗ API on port $port is not responding${NC}"
|
||||
fi
|
||||
done
|
||||
|
||||
if [ "$API_SUCCESS" -eq 4 ]; then
|
||||
echo -e "${GREEN}✓ All API endpoints are working${NC}"
|
||||
else
|
||||
echo -e "${YELLOW}⚠ $API_SUCCESS/4 API endpoints are working${NC}"
|
||||
fi
|
||||
echo ""
|
||||
|
||||
# Test 4: Check Console endpoints
|
||||
echo "Test 4: Testing Console endpoints..."
|
||||
CONSOLE_PORTS=(9001 9011 9021 9031)
|
||||
CONSOLE_SUCCESS=0
|
||||
for port in "${CONSOLE_PORTS[@]}"; do
|
||||
if curl -sf http://localhost:${port}/health >/dev/null 2>&1; then
|
||||
echo -e " ${GREEN}✓ Console on port $port is responding${NC}"
|
||||
CONSOLE_SUCCESS=$((CONSOLE_SUCCESS + 1))
|
||||
else
|
||||
echo -e " ${RED}✗ Console on port $port is not responding${NC}"
|
||||
fi
|
||||
done
|
||||
|
||||
if [ "$CONSOLE_SUCCESS" -eq 4 ]; then
|
||||
echo -e "${GREEN}✓ All Console endpoints are working${NC}"
|
||||
else
|
||||
echo -e "${YELLOW}⚠ $CONSOLE_SUCCESS/4 Console endpoints are working${NC}"
|
||||
fi
|
||||
echo ""
|
||||
|
||||
# Test 5: Check inter-node connectivity
|
||||
echo "Test 5: Testing inter-node connectivity..."
|
||||
CONN_SUCCESS=0
|
||||
for node in rustfs-node2 rustfs-node3 rustfs-node4; do
|
||||
if docker exec rustfs-node1 nc -z "$node" 9000 2>/dev/null; then
|
||||
echo -e " ${GREEN}✓ node1 → $node connection OK${NC}"
|
||||
CONN_SUCCESS=$((CONN_SUCCESS + 1))
|
||||
else
|
||||
echo -e " ${RED}✗ node1 → $node connection failed${NC}"
|
||||
fi
|
||||
done
|
||||
|
||||
if [ "$CONN_SUCCESS" -eq 3 ]; then
|
||||
echo -e "${GREEN}✓ All inter-node connections are working${NC}"
|
||||
else
|
||||
echo -e "${YELLOW}⚠ $CONN_SUCCESS/3 inter-node connections are working${NC}"
|
||||
fi
|
||||
echo ""
|
||||
|
||||
# Test 6: Verify data directories
|
||||
echo "Test 6: Verifying data directories..."
|
||||
DIR_SUCCESS=0
|
||||
for i in {1..4}; do
|
||||
if docker exec rustfs-node1 test -d "/data/rustfs${i}"; then
|
||||
DIR_SUCCESS=$((DIR_SUCCESS + 1))
|
||||
else
|
||||
echo -e " ${RED}✗ /data/rustfs${i} not found in node1${NC}"
|
||||
fi
|
||||
done
|
||||
|
||||
if [ "$DIR_SUCCESS" -eq 4 ]; then
|
||||
echo -e "${GREEN}✓ All data directories exist${NC}"
|
||||
else
|
||||
echo -e "${RED}✗ Only $DIR_SUCCESS/4 data directories exist${NC}"
|
||||
fi
|
||||
echo ""
|
||||
|
||||
# Summary
|
||||
echo "========================================="
|
||||
echo "Test Summary"
|
||||
echo "========================================="
|
||||
echo "Containers running: $RUNNING/4"
|
||||
echo "Healthy containers: $HEALTHY/4"
|
||||
echo "API endpoints: $API_SUCCESS/4"
|
||||
echo "Console endpoints: $CONSOLE_SUCCESS/4"
|
||||
echo "Inter-node connections: $CONN_SUCCESS/3"
|
||||
echo "Data directories: $DIR_SUCCESS/4"
|
||||
echo ""
|
||||
|
||||
TOTAL=$((RUNNING + HEALTHY + API_SUCCESS + CONSOLE_SUCCESS + CONN_SUCCESS + DIR_SUCCESS))
|
||||
MAX_SCORE=23
|
||||
|
||||
if [ "$TOTAL" -eq "$MAX_SCORE" ]; then
|
||||
echo -e "${GREEN}✓ All tests passed! Deployment is working correctly.${NC}"
|
||||
exit 0
|
||||
elif [ "$TOTAL" -ge 20 ]; then
|
||||
echo -e "${YELLOW}⚠ Most tests passed. Some components may still be starting up.${NC}"
|
||||
echo " Try running this script again in a few moments."
|
||||
exit 0
|
||||
else
|
||||
echo -e "${RED}✗ Some tests failed. Check the output above and logs for details.${NC}"
|
||||
echo " Run 'docker-compose logs' for more information."
|
||||
exit 1
|
||||
fi
|
||||
@@ -78,7 +78,6 @@ matchit = { workspace = true }
md5.workspace = true
mime_guess = { workspace = true }
opentelemetry = { workspace = true }
percent-encoding = { workspace = true }
pin-project-lite.workspace = true
reqwest = { workspace = true }
rustls = { workspace = true }

@@ -12,21 +12,16 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#![allow(dead_code)]

use crate::admin::router::Operation;
use crate::auth::{check_key_valid, get_session_token};
use http::{HeaderMap, StatusCode};
use matchit::Params;
use rustfs_config::notify::{NOTIFY_MQTT_SUB_SYS, NOTIFY_WEBHOOK_SUB_SYS};
use rustfs_config::{ENABLE_KEY, EnableState};
use rustfs_notify::rules::{BucketNotificationConfig, PatternRules};
use rustfs_targets::EventName;
use rustfs_targets::check_mqtt_broker_available;
use s3s::header::CONTENT_LENGTH;
use s3s::{Body, S3Error, S3ErrorCode, S3Request, S3Response, S3Result, header::CONTENT_TYPE, s3_error};
use serde::{Deserialize, Serialize};
use serde_urlencoded::from_bytes;
use std::collections::HashMap;
use std::future::Future;
use std::io::{Error, ErrorKind};
use std::net::SocketAddr;
@@ -36,12 +31,6 @@ use tokio::time::{Duration, sleep};
use tracing::{debug, error, info, warn};
use url::Url;

#[derive(Debug, Deserialize)]
struct BucketQuery {
    #[serde(rename = "bucketName")]
    bucket_name: String,
}

#[derive(Debug, Deserialize)]
pub struct KeyValue {
    pub key: String,
@@ -177,6 +166,7 @@ impl Operation for NotificationTarget {
        let mut client_cert_val = None;
        let mut client_key_val = None;
        let mut qos_val = None;
        let mut topic_val = String::new();

        for kv in notification_body.key_values.iter() {
            if !allowed_keys.contains(kv.key.as_str()) {
@@ -190,6 +180,16 @@ impl Operation for NotificationTarget {
            if kv.key == "endpoint" {
                endpoint_val = Some(kv.value.clone());
            }

            if target_type == NOTIFY_MQTT_SUB_SYS {
                if kv.key == rustfs_config::MQTT_BROKER {
                    endpoint_val = Some(kv.value.clone());
                }
                if kv.key == rustfs_config::MQTT_TOPIC {
                    topic_val = kv.value.clone();
                }
            }

            if kv.key == "queue_dir" {
                queue_dir_val = Some(kv.value.clone());
            }
@@ -236,12 +236,15 @@ impl Operation for NotificationTarget {
        }

        if target_type == NOTIFY_MQTT_SUB_SYS {
            let endpoint = endpoint_val.ok_or_else(|| s3_error!(InvalidArgument, "endpoint is required"))?;
            let url = Url::parse(&endpoint).map_err(|e| s3_error!(InvalidArgument, "invalid endpoint url: {}", e))?;
            match url.scheme() {
                "tcp" | "ssl" | "ws" | "wss" | "mqtt" | "mqtts" => {}
                _ => return Err(s3_error!(InvalidArgument, "unsupported broker url scheme")),
            let endpoint = endpoint_val.ok_or_else(|| s3_error!(InvalidArgument, "broker endpoint is required"))?;
            if topic_val.is_empty() {
                return Err(s3_error!(InvalidArgument, "topic is required"));
            }
            // Check MQTT Broker availability
            if let Err(e) = check_mqtt_broker_available(&endpoint, &topic_val).await {
                return Err(s3_error!(InvalidArgument, "MQTT Broker unavailable: {}", e));
            }

            if let Some(queue_dir) = queue_dir_val {
                validate_queue_dir(&queue_dir).await?;
            if let Some(qos) = qos_val {
@@ -420,113 +423,3 @@ fn extract_target_params<'a>(params: &'a Params<'_, '_>) -> S3Result<(&'a str, &
    let target_name = extract_param(params, "target_name")?;
    Ok((target_type, target_name))
}

/// Set notification rules for buckets
pub struct SetBucketNotification {}
#[async_trait::async_trait]
impl Operation for SetBucketNotification {
    async fn call(&self, req: S3Request<Body>, _params: Params<'_, '_>) -> S3Result<S3Response<(StatusCode, Body)>> {
        // 1. Parse query parameters
        let query: BucketQuery = from_bytes(req.uri.query().unwrap_or("").as_bytes())
            .map_err(|e| s3_error!(InvalidArgument, "invalid query parameters: {}", e))?;

        // 2. Permission verification
        let Some(input_cred) = &req.credentials else {
            return Err(s3_error!(InvalidRequest, "credentials not found"));
        };
        let (_cred, _owner) =
            check_key_valid(get_session_token(&req.uri, &req.headers).unwrap_or_default(), &input_cred.access_key).await?;

        // 3. Get notification system instance
        let Some(ns) = rustfs_notify::global::notification_system() else {
            return Err(s3_error!(InternalError, "notification system not initialized"));
        };

        // 4. Parse the request body as a BucketNotificationConfig
        let mut input = req.input;
        let body = input.store_all_unlimited().await.map_err(|e| {
            warn!("failed to read request body: {:?}", e);
            s3_error!(InvalidRequest, "failed to read request body")
        })?;
        let config: BucketNotificationConfig = serde_json::from_slice(&body)
            .map_err(|e| s3_error!(InvalidArgument, "invalid json body for bucket notification config: {}", e))?;

        // 5. Load bucket notification configuration
        info!("Loading notification config for bucket '{}'", &query.bucket_name);
        ns.load_bucket_notification_config(&query.bucket_name, &config)
            .await
            .map_err(|e| {
                error!("failed to load bucket notification config: {}", e);
                S3Error::with_message(S3ErrorCode::InternalError, format!("failed to load bucket notification config: {e}"))
            })?;

        let mut header = HeaderMap::new();
        header.insert(CONTENT_TYPE, "application/json".parse().unwrap());
        header.insert(CONTENT_LENGTH, "0".parse().unwrap());
        Ok(S3Response::with_headers((StatusCode::OK, Body::empty()), header))
    }
}

/// Get notification rules for buckets
#[derive(Serialize)]
struct BucketRulesResponse {
    rules: HashMap<EventName, PatternRules>,
}
pub struct GetBucketNotification {}
#[async_trait::async_trait]
impl Operation for GetBucketNotification {
    async fn call(&self, req: S3Request<Body>, _params: Params<'_, '_>) -> S3Result<S3Response<(StatusCode, Body)>> {
        let query: BucketQuery = from_bytes(req.uri.query().unwrap_or("").as_bytes())
            .map_err(|e| s3_error!(InvalidArgument, "invalid query parameters: {}", e))?;

        let Some(input_cred) = &req.credentials else {
            return Err(s3_error!(InvalidRequest, "credentials not found"));
        };
        let (_cred, _owner) =
            check_key_valid(get_session_token(&req.uri, &req.headers).unwrap_or_default(), &input_cred.access_key).await?;

        let Some(ns) = rustfs_notify::global::notification_system() else {
            return Err(s3_error!(InternalError, "notification system not initialized"));
        };

        let rules_map = ns.notifier.get_rules_map(&query.bucket_name);
        let response = BucketRulesResponse {
            rules: rules_map.unwrap_or_default().inner().clone(),
        };

        let data = serde_json::to_vec(&response)
            .map_err(|e| S3Error::with_message(S3ErrorCode::InternalError, format!("failed to serialize rules: {e}")))?;

        let mut header = HeaderMap::new();
        header.insert(CONTENT_TYPE, "application/json".parse().unwrap());
        Ok(S3Response::with_headers((StatusCode::OK, Body::from(data)), header))
    }
}

/// Remove all notification rules for a bucket
pub struct RemoveBucketNotification {}
#[async_trait::async_trait]
impl Operation for RemoveBucketNotification {
    async fn call(&self, req: S3Request<Body>, _params: Params<'_, '_>) -> S3Result<S3Response<(StatusCode, Body)>> {
        let query: BucketQuery = from_bytes(req.uri.query().unwrap_or("").as_bytes())
            .map_err(|e| s3_error!(InvalidArgument, "invalid query parameters: {}", e))?;

        let Some(input_cred) = &req.credentials else {
            return Err(s3_error!(InvalidRequest, "credentials not found"));
        };
        let (_cred, _owner) =
            check_key_valid(get_session_token(&req.uri, &req.headers).unwrap_or_default(), &input_cred.access_key).await?;

        let Some(ns) = rustfs_notify::global::notification_system() else {
            return Err(s3_error!(InternalError, "notification system not initialized"));
        };

        info!("Removing notification config for bucket '{}'", &query.bucket_name);
        ns.remove_bucket_notification_config(&query.bucket_name).await;

        let mut header = HeaderMap::new();
        header.insert(CONTENT_TYPE, "application/json".parse().unwrap());
        header.insert(CONTENT_LENGTH, "0".parse().unwrap());
        Ok(S3Response::with_headers((StatusCode::OK, Body::empty()), header))
    }
}

@@ -23,10 +23,7 @@ pub mod utils;
use handlers::{
    GetReplicationMetricsHandler, HealthCheckHandler, ListRemoteTargetHandler, RemoveRemoteTargetHandler, SetRemoteTargetHandler,
    bucket_meta,
    event::{
        GetBucketNotification, ListNotificationTargets, NotificationTarget, RemoveBucketNotification, RemoveNotificationTarget,
        SetBucketNotification,
    },
    event::{ListNotificationTargets, ListTargetsArns, NotificationTarget, RemoveNotificationTarget},
    group, kms, kms_dynamic, kms_keys, policies, pools, rebalance,
    service_account::{AddServiceAccount, DeleteServiceAccount, InfoServiceAccount, ListServiceAccount, UpdateServiceAccount},
    sts, tier, user,
@@ -519,25 +516,7 @@ fn register_user_route(r: &mut S3Router<AdminOperation>) -> std::io::Result<()>
    r.insert(
        Method::GET,
        format!("{}{}", ADMIN_PREFIX, "/v3/target/arns").as_str(),
        AdminOperation(&ListNotificationTargets {}),
    )?;

    r.insert(
        Method::POST,
        format!("{}{}", ADMIN_PREFIX, "/v3/target-set-bucket").as_str(),
        AdminOperation(&SetBucketNotification {}),
    )?;

    r.insert(
        Method::POST,
        format!("{}{}", ADMIN_PREFIX, "/v3/target-get-bucket").as_str(),
        AdminOperation(&GetBucketNotification {}),
    )?;

    r.insert(
        Method::POST,
        format!("{}{}", ADMIN_PREFIX, "/v3/target-remove-bucket").as_str(),
        AdminOperation(&RemoveBucketNotification {}),
        AdminOperation(&ListTargetsArns {}),
    )?;

    Ok(())

@@ -92,6 +92,10 @@ where
    T: Operation,
{
    fn is_match(&self, method: &Method, uri: &Uri, headers: &HeaderMap, _: &mut Extensions) -> bool {
        if method == Method::GET && uri.path() == "/health" {
            return true;
        }

        // AssumeRole
        if method == Method::POST && uri.path() == "/" {
            if let Some(val) = headers.get(header::CONTENT_TYPE) {
@@ -104,36 +108,13 @@ where
        uri.path().starts_with(ADMIN_PREFIX) || uri.path().starts_with(RPC_PREFIX) || uri.path().starts_with(CONSOLE_PREFIX)
    }

    async fn call(&self, req: S3Request<Body>) -> S3Result<S3Response<Body>> {
        if self.console_enabled && req.uri.path().starts_with(CONSOLE_PREFIX) {
            if let Some(console_router) = &self.console_router {
                let mut console_router = console_router.clone();
                let req = convert_request(req);
                let result = console_router.call(req).await;
                return match result {
                    Ok(resp) => Ok(convert_response(resp)),
                    Err(e) => Err(s3_error!(InternalError, "{}", e)),
                };
            }
            return Err(s3_error!(InternalError, "console is not enabled"));
        }

        let uri = format!("{}|{}", &req.method, req.uri.path());

        // warn!("get uri {}", &uri);

        if let Ok(mat) = self.router.at(&uri) {
            let op: &T = mat.value;
            let mut resp = op.call(req, mat.params).await?;
            resp.status = Some(resp.output.0);
            return Ok(resp.map_output(|x| x.1));
        }

        return Err(s3_error!(NotImplemented));
    }

    // check_access before call
    async fn check_access(&self, req: &mut S3Request<Body>) -> S3Result<()> {
        // Allow unauthenticated access to health check
        if req.method == Method::GET && req.uri.path() == "/health" {
            return Ok(());
        }
        // Allow unauthenticated access to console static files if console is enabled
        if self.console_enabled && req.uri.path().starts_with(CONSOLE_PREFIX) {
            return Ok(());
        }
@@ -156,6 +137,31 @@ where
            None => Err(s3_error!(AccessDenied, "Signature is required")),
        }
    }

    async fn call(&self, req: S3Request<Body>) -> S3Result<S3Response<Body>> {
        if self.console_enabled && req.uri.path().starts_with(CONSOLE_PREFIX) {
            if let Some(console_router) = &self.console_router {
                let mut console_router = console_router.clone();
                let req = convert_request(req);
                let result = console_router.call(req).await;
                return match result {
                    Ok(resp) => Ok(convert_response(resp)),
                    Err(e) => Err(s3_error!(InternalError, "{}", e)),
                };
            }
            return Err(s3_error!(InternalError, "console is not enabled"));
        }

        let uri = format!("{}|{}", &req.method, req.uri.path());
        if let Ok(mat) = self.router.at(&uri) {
            let op: &T = mat.value;
            let mut resp = op.call(req, mat.params).await?;
            resp.status = Some(resp.output.0);
            return Ok(resp.map_output(|x| x.1));
        }

        Err(s3_error!(NotImplemented))
    }
}

#[async_trait::async_trait]

@@ -18,6 +18,11 @@ use rustfs_config::DEFAULT_DELIMITER;
use rustfs_ecstore::config::GLOBAL_SERVER_CONFIG;
use tracing::{error, info, warn};

/// Start the audit system.
/// This function checks if the audit subsystem is configured in the global server configuration.
/// If configured, it initializes and starts the audit system.
/// If not configured, it skips the initialization.
/// It also handles cases where the audit system is already running or if the global configuration is not loaded.
pub(crate) async fn start_audit_system() -> AuditResult<()> {
    info!(
        target: "rustfs::main::start_audit_system",
@@ -94,6 +99,10 @@ pub(crate) async fn start_audit_system() -> AuditResult<()> {
    }
}

/// Stop the audit system.
/// This function checks if the audit system is initialized and running.
/// If it is running, it prepares to stop the system, stops it, and records the stop time.
/// If the system is already stopped or not initialized, it logs a warning and returns.
pub(crate) async fn stop_audit_system() -> AuditResult<()> {
    if let Some(system) = audit_system() {
        let state = system.get_state().await;

@@ -12,136 +12,112 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use super::access::authorize_request;
|
||||
use super::options::del_opts;
|
||||
use super::options::extract_metadata;
|
||||
use super::options::put_opts;
|
||||
use crate::auth::get_condition_values;
|
||||
use crate::error::ApiError;
|
||||
use crate::storage::access::ReqInfo;
|
||||
use crate::storage::options::copy_dst_opts;
|
||||
use crate::storage::options::copy_src_opts;
|
||||
use crate::storage::options::get_complete_multipart_upload_opts;
|
||||
use crate::storage::options::{extract_metadata_from_mime_with_object_name, get_opts, parse_copy_source_range};
|
||||
use bytes::Bytes;
|
||||
use chrono::DateTime;
|
||||
use chrono::Utc;
|
||||
use datafusion::arrow::csv::WriterBuilder as CsvWriterBuilder;
|
||||
use datafusion::arrow::json::WriterBuilder as JsonWriterBuilder;
|
||||
use datafusion::arrow::json::writer::JsonArray;
|
||||
use http::StatusCode;
|
||||
use rustfs_ecstore::bucket::metadata_sys::get_replication_config;
|
||||
use rustfs_ecstore::bucket::object_lock::objectlock_sys::BucketObjectLockSys;
|
||||
use rustfs_ecstore::bucket::replication::DeletedObjectReplicationInfo;
|
||||
use rustfs_ecstore::bucket::replication::REPLICATE_INCOMING_DELETE;
|
||||
use rustfs_ecstore::bucket::replication::ReplicationConfigurationExt;
|
||||
use rustfs_ecstore::bucket::replication::check_replicate_delete;
|
||||
use rustfs_ecstore::bucket::replication::get_must_replicate_options;
|
||||
use rustfs_ecstore::bucket::replication::must_replicate;
|
||||
use rustfs_ecstore::bucket::replication::schedule_replication;
|
||||
use rustfs_ecstore::bucket::replication::schedule_replication_delete;
|
||||
use rustfs_ecstore::bucket::versioning::VersioningApi;
|
||||
use rustfs_ecstore::disk::error::DiskError;
|
||||
use rustfs_ecstore::disk::error_reduce::is_all_buckets_not_found;
|
||||
use rustfs_ecstore::error::is_err_bucket_not_found;
|
||||
use rustfs_ecstore::error::is_err_object_not_found;
|
||||
use rustfs_ecstore::error::is_err_version_not_found;
|
||||
use rustfs_ecstore::set_disk::MAX_PARTS_COUNT;
|
||||
use rustfs_ecstore::store_api::ObjectInfo;
|
||||
use rustfs_filemeta::ReplicationStatusType;
|
||||
use rustfs_filemeta::ReplicationType;
|
||||
use rustfs_filemeta::VersionPurgeStatusType;
|
||||
use rustfs_s3select_api::object_store::bytes_stream;
|
||||
use rustfs_s3select_api::query::Context;
|
||||
use rustfs_s3select_api::query::Query;
|
||||
use rustfs_s3select_query::get_global_db;
|
||||
|
||||
// use rustfs_ecstore::store_api::RESERVED_METADATA_PREFIX;
use crate::storage::{
    access::{ReqInfo, authorize_request},
    options::{
        copy_dst_opts, copy_src_opts, del_opts, extract_metadata, extract_metadata_from_mime_with_object_name,
        get_complete_multipart_upload_opts, get_opts, parse_copy_source_range, put_opts,
    },
};
use base64::{Engine, engine::general_purpose::STANDARD as BASE64_STANDARD};
use bytes::Bytes;
use chrono::{DateTime, Utc};
use datafusion::arrow::{
    csv::WriterBuilder as CsvWriterBuilder, json::WriterBuilder as JsonWriterBuilder, json::writer::JsonArray,
};
use futures::StreamExt;
use http::HeaderMap;
use rustfs_ecstore::bucket::lifecycle::bucket_lifecycle_ops::validate_transition_tier;
use rustfs_ecstore::bucket::lifecycle::lifecycle::Lifecycle;
use rustfs_ecstore::bucket::metadata::BUCKET_LIFECYCLE_CONFIG;
use rustfs_ecstore::bucket::metadata::BUCKET_NOTIFICATION_CONFIG;
use rustfs_ecstore::bucket::metadata::BUCKET_POLICY_CONFIG;
use rustfs_ecstore::bucket::metadata::BUCKET_REPLICATION_CONFIG;
use rustfs_ecstore::bucket::metadata::BUCKET_SSECONFIG;
use rustfs_ecstore::bucket::metadata::BUCKET_TAGGING_CONFIG;
use rustfs_ecstore::bucket::metadata::BUCKET_VERSIONING_CONFIG;
use rustfs_ecstore::bucket::metadata::OBJECT_LOCK_CONFIG;
use rustfs_ecstore::bucket::metadata_sys;
use rustfs_ecstore::bucket::policy_sys::PolicySys;
use rustfs_ecstore::bucket::tagging::decode_tags;
use rustfs_ecstore::bucket::tagging::encode_tags;
use rustfs_ecstore::bucket::utils::serialize;
use rustfs_ecstore::bucket::versioning_sys::BucketVersioningSys;
use rustfs_ecstore::client::object_api_utils::format_etag;
use rustfs_ecstore::compress::MIN_COMPRESSIBLE_SIZE;
use rustfs_ecstore::compress::is_compressible;
use rustfs_ecstore::error::StorageError;
use rustfs_ecstore::new_object_layer_fn;
use rustfs_ecstore::set_disk::{DEFAULT_READ_BUFFER_SIZE, is_valid_storage_class};
use rustfs_ecstore::store_api::BucketOptions;
use rustfs_ecstore::store_api::CompletePart;
use rustfs_ecstore::store_api::DeleteBucketOptions;
use rustfs_ecstore::store_api::HTTPRangeSpec;
use rustfs_ecstore::store_api::MakeBucketOptions;
use rustfs_ecstore::store_api::MultipartUploadResult;
use rustfs_ecstore::store_api::ObjectIO;
use rustfs_ecstore::store_api::ObjectOptions;
use rustfs_ecstore::store_api::ObjectToDelete;
use rustfs_ecstore::store_api::PutObjReader;
use rustfs_ecstore::store_api::StorageAPI;
use rustfs_filemeta::fileinfo::ObjectPartInfo;
use rustfs_kms::DataKey;
use rustfs_kms::service_manager::get_global_encryption_service;
use rustfs_kms::types::{EncryptionMetadata, ObjectEncryptionContext};
use http::{HeaderMap, StatusCode};
use rustfs_ecstore::{
    bucket::{
        lifecycle::{bucket_lifecycle_ops::validate_transition_tier, lifecycle::Lifecycle},
        metadata::{
            BUCKET_LIFECYCLE_CONFIG, BUCKET_NOTIFICATION_CONFIG, BUCKET_POLICY_CONFIG, BUCKET_REPLICATION_CONFIG,
            BUCKET_SSECONFIG, BUCKET_TAGGING_CONFIG, BUCKET_VERSIONING_CONFIG, OBJECT_LOCK_CONFIG,
        },
        metadata_sys,
        metadata_sys::get_replication_config,
        object_lock::objectlock_sys::BucketObjectLockSys,
        policy_sys::PolicySys,
        replication::{
            DeletedObjectReplicationInfo, REPLICATE_INCOMING_DELETE, ReplicationConfigurationExt, check_replicate_delete,
            get_must_replicate_options, must_replicate, schedule_replication, schedule_replication_delete,
        },
        tagging::{decode_tags, encode_tags},
        utils::serialize,
        versioning::VersioningApi,
        versioning_sys::BucketVersioningSys,
    },
    client::object_api_utils::format_etag,
    compress::{MIN_COMPRESSIBLE_SIZE, is_compressible},
    disk::{error::DiskError, error_reduce::is_all_buckets_not_found},
    error::{StorageError, is_err_bucket_not_found, is_err_object_not_found, is_err_version_not_found},
    new_object_layer_fn,
    set_disk::{DEFAULT_READ_BUFFER_SIZE, MAX_PARTS_COUNT, is_valid_storage_class},
    store_api::{
        BucketOptions,
        CompletePart,
        DeleteBucketOptions,
        HTTPRangeSpec,
        MakeBucketOptions,
        MultipartUploadResult,
        ObjectIO,
        ObjectInfo,
        ObjectOptions,
        ObjectToDelete,
        PutObjReader,
        StorageAPI,
        // RESERVED_METADATA_PREFIX,
    },
};
|
||||
use rustfs_filemeta::{ReplicationStatusType, ReplicationType, VersionPurgeStatusType, fileinfo::ObjectPartInfo};
use rustfs_kms::{
    DataKey,
    service_manager::get_global_encryption_service,
    types::{EncryptionMetadata, ObjectEncryptionContext},
};
use rustfs_notify::global::notifier_instance;
use rustfs_policy::auth;
use rustfs_policy::policy::action::Action;
use rustfs_policy::policy::action::S3Action;
use rustfs_policy::policy::{BucketPolicy, BucketPolicyArgs, Validator};
use rustfs_rio::CompressReader;
use rustfs_rio::EtagReader;
use rustfs_rio::HashReader;
use rustfs_rio::Reader;
use rustfs_rio::WarpReader;
use rustfs_rio::{DecryptReader, EncryptReader, HardLimitReader};
use rustfs_targets::EventName;
use rustfs_targets::arn::{TargetID, TargetIDError};
use rustfs_utils::CompressionAlgorithm;
use rustfs_utils::http::AMZ_BUCKET_REPLICATION_STATUS;
use rustfs_utils::http::headers::RESERVED_METADATA_PREFIX_LOWER;
use rustfs_utils::http::headers::{AMZ_DECODED_CONTENT_LENGTH, AMZ_OBJECT_TAGGING};
use rustfs_utils::path::is_dir_object;
use rustfs_utils::path::path_join_buf;
use rustfs_policy::{
    auth,
    policy::{
        action::{Action, S3Action},
        {BucketPolicy, BucketPolicyArgs, Validator},
    },
};
use rustfs_rio::{CompressReader, DecryptReader, EncryptReader, EtagReader, HardLimitReader, HashReader, Reader, WarpReader};
use rustfs_s3select_api::{
    object_store::bytes_stream,
    query::{Context, Query},
};
use rustfs_s3select_query::get_global_db;
use rustfs_targets::{
    EventName,
    arn::{TargetID, TargetIDError},
};
use rustfs_utils::{
    CompressionAlgorithm,
    http::{
        AMZ_BUCKET_REPLICATION_STATUS,
        headers::{AMZ_DECODED_CONTENT_LENGTH, AMZ_OBJECT_TAGGING, RESERVED_METADATA_PREFIX_LOWER},
    },
    path::{is_dir_object, path_join_buf},
};
use rustfs_zip::CompressionFormat;
use s3s::S3;
use s3s::S3Error;
use s3s::S3ErrorCode;
use s3s::S3Result;
use s3s::dto::*;
use s3s::s3_error;
use s3s::{S3Request, S3Response};
use std::collections::HashMap;
use std::fmt::Debug;
use std::path::Path;
use std::str::FromStr;
use std::sync::Arc;
use std::sync::LazyLock;
use time::OffsetDateTime;
use time::format_description::well_known::Rfc3339;
use tokio::io::AsyncRead;
use tokio::sync::mpsc;
use s3s::{S3, S3Error, S3ErrorCode, S3Request, S3Response, S3Result, dto::*, s3_error};
use std::{
    collections::HashMap,
    fmt::Debug,
    path::Path,
    str::FromStr,
    sync::{Arc, LazyLock},
};
use time::{OffsetDateTime, format_description::well_known::Rfc3339};
use tokio::{io::AsyncRead, sync::mpsc};
use tokio_stream::wrappers::ReceiverStream;
use tokio_tar::Archive;
use tokio_util::io::ReaderStream;
use tokio_util::io::StreamReader;
use tracing::debug;
use tracing::error;
use tracing::info;
use tracing::warn;
use tokio_util::io::{ReaderStream, StreamReader};
use tracing::{debug, error, info, warn};
use uuid::Uuid;

macro_rules! try_ {

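The bulk of the change above is mechanical: repeated single-path `use` statements are collapsed into Rust's nested import groups, which bring in exactly the same items. A minimal sketch of the two equivalent forms, using std paths chosen purely for illustration:

// Before: one item per `use` line.
// use std::sync::Arc;
// use std::sync::LazyLock;
// use std::collections::HashMap;

// After: the same three items through one nested group.
use std::{collections::HashMap, sync::{Arc, LazyLock}};

fn main() {
    // The grouped form resolves to identical paths at compile time.
    let cache: HashMap<&str, Arc<str>> = HashMap::new();
    let counter = LazyLock::new(|| 0u32);
    println!("{} entries, counter = {}", cache.len(), *counter);
}

Grouping keeps the import block shorter and, with rustfmt sorting the braces, later additions tend to produce smaller diffs.
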
@@ -58,7 +58,7 @@ export RUSTFS_EXTERNAL_ADDRESS=":9000"
#export RUSTFS_OBS_METER_INTERVAL=1 # Sampling interval in seconds
#export RUSTFS_OBS_SERVICE_NAME=rustfs # Service name
#export RUSTFS_OBS_SERVICE_VERSION=0.1.0 # Service version
export RUSTFS_OBS_ENVIRONMENT=production # Environment name
export RUSTFS_OBS_ENVIRONMENT=develop # Environment name
export RUSTFS_OBS_LOGGER_LEVEL=info # Log level, supports trace, debug, info, warn, error
export RUSTFS_OBS_LOCAL_LOGGING_ENABLED=true # Whether to enable local logging
export RUSTFS_OBS_LOG_DIRECTORY="$current_dir/deploy/logs" # Log directory
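
RUSTFS_OBS_LOGGER_LEVEL takes one of the five standard levels listed above. A generic sketch of how such a value can be parsed with the tracing ecosystem (this illustrates the pattern only, not RustFS's actual wiring, and assumes tracing and tracing-subscriber as dependencies):

use std::{env, str::FromStr};
use tracing::Level;

fn main() {
    // Accepts trace|debug|info|warn|error; falls back to info when
    // the variable is unset or does not parse.
    let level = env::var("RUSTFS_OBS_LOGGER_LEVEL")
        .ok()
        .and_then(|v| Level::from_str(&v).ok())
        .unwrap_or(Level::INFO);
    tracing_subscriber::fmt().with_max_level(level).init();
    tracing::info!("logger initialised at {level}");
}
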
@@ -123,6 +123,10 @@ export RUSTFS_COMPRESSION_ENABLED=true # Whether to enable compression

#export RUSTFS_REGION="us-east-1"

export RUSTFS_ENABLE_SCANNER=false

export RUSTFS_ENABLE_HEAL=false

# Event message configuration
#export RUSTFS_EVENT_CONFIG="./deploy/config/event.example.toml"
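
The two new toggles added by this hunk are plain boolean environment variables. A minimal sketch of reading them with the defaults shown above (the env_flag helper is hypothetical, for illustration, not a RustFS API):

use std::env;

// Hypothetical helper: "true"/"1" (any case) enables the flag;
// anything else, or an unset variable, yields the default.
fn env_flag(name: &str, default: bool) -> bool {
    env::var(name)
        .map(|v| matches!(v.to_ascii_lowercase().as_str(), "true" | "1"))
        .unwrap_or(default)
}

fn main() {
    let scanner = env_flag("RUSTFS_ENABLE_SCANNER", false);
    let heal = env_flag("RUSTFS_ENABLE_HEAL", false);
    println!("scanner={scanner} heal={heal}");
}
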