mirror of
https://github.com/rustfs/rustfs.git
synced 2026-01-17 01:30:33 +00:00
Compare commits
4 Commits
1.0.0-alph
...
multipartc
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d2a83505fa | ||
|
|
51584986e1 | ||
|
|
93090adf7c | ||
|
|
d4817a4bea |
114
Cargo.lock
generated
114
Cargo.lock
generated
@@ -2980,27 +2980,6 @@ dependencies = [
|
||||
"syn 2.0.110",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "derive_more"
|
||||
version = "2.0.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "093242cf7570c207c83073cf82f79706fe7b8317e98620a47d5be7c3d8497678"
|
||||
dependencies = [
|
||||
"derive_more-impl",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "derive_more-impl"
|
||||
version = "2.0.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "bda628edc44c4bb645fbe0f758797143e4e07926f7ebf4e9bdfbd3d2ce621df3"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn 2.0.110",
|
||||
"unicode-xid",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "diff"
|
||||
version = "0.1.13"
|
||||
@@ -3677,15 +3656,16 @@ checksum = "0cc23270f6e1808e30a928bdc84dea0b9b4136a8bc82338574f23baf47bbd280"
|
||||
|
||||
[[package]]
|
||||
name = "google-cloud-auth"
|
||||
version = "1.1.0"
|
||||
version = "1.1.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f084abe65c3bdde490bf4a4eebc7e103637fb3ac4b101d09508dd5bf12ce82b1"
|
||||
checksum = "5628e0c17140a50dd4d75d37465bf190d26a6c67909519c2e3cf87a9e45d5cf6"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"base64 0.22.1",
|
||||
"bon",
|
||||
"google-cloud-gax",
|
||||
"http 1.3.1",
|
||||
"jsonwebtoken",
|
||||
"reqwest",
|
||||
"rustc_version",
|
||||
"rustls 0.23.35",
|
||||
@@ -3699,9 +3679,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "google-cloud-gax"
|
||||
version = "1.2.0"
|
||||
version = "1.3.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a36f8701dac18c2cbedf588d15ee4ce5c018da531c2e74e9ad043cbd32b0fccb"
|
||||
checksum = "3c5aa07295f49565ee1ae52e0799e42bd67284396e042734f078b8737a816047"
|
||||
dependencies = [
|
||||
"base64 0.22.1",
|
||||
"bytes",
|
||||
@@ -3719,9 +3699,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "google-cloud-gax-internal"
|
||||
version = "0.7.3"
|
||||
version = "0.7.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "152076418386463ff650deabbef392672626d62e97639bb5194e11f8984b4835"
|
||||
checksum = "6dbd41e77921bbf75ed32acc8a648f087b3227ca88c62ddd9b37b43230c91554"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"google-cloud-auth",
|
||||
@@ -3730,6 +3710,7 @@ dependencies = [
|
||||
"google-cloud-wkt",
|
||||
"http 1.3.1",
|
||||
"http-body-util",
|
||||
"hyper 1.7.0",
|
||||
"opentelemetry-semantic-conventions",
|
||||
"percent-encoding",
|
||||
"prost 0.14.1",
|
||||
@@ -3740,6 +3721,7 @@ dependencies = [
|
||||
"serde_json",
|
||||
"thiserror 2.0.17",
|
||||
"tokio",
|
||||
"tokio-stream",
|
||||
"tonic",
|
||||
"tonic-prost",
|
||||
"tracing",
|
||||
@@ -3814,9 +3796,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "google-cloud-storage"
|
||||
version = "1.2.0"
|
||||
version = "1.3.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "166471b8f807470024f4533d7229bc98696161b5e1f254d1a485b60c6c42baf3"
|
||||
checksum = "12e6a4d24384f8ffae6d295d55095b25d21c9099855c5a96a0edf6777178b35b"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"base64 0.22.1",
|
||||
@@ -5077,18 +5059,6 @@ dependencies = [
|
||||
"portable-atomic",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "metrics-exporter-opentelemetry"
|
||||
version = "0.1.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "85831c590c74bde49aac410386630e60f216a7d9473726708da160c5303ef803"
|
||||
dependencies = [
|
||||
"derive_more",
|
||||
"metrics",
|
||||
"opentelemetry 0.30.0",
|
||||
"opentelemetry_sdk 0.30.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "mimalloc"
|
||||
version = "0.1.48"
|
||||
@@ -5552,20 +5522,6 @@ version = "0.1.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d05e27ee213611ffe7d6348b942e8f942b37114c00cc03cec254295a4a17852e"
|
||||
|
||||
[[package]]
|
||||
name = "opentelemetry"
|
||||
version = "0.30.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "aaf416e4cb72756655126f7dd7bb0af49c674f4c1b9903e80c009e0c37e552e6"
|
||||
dependencies = [
|
||||
"futures-core",
|
||||
"futures-sink",
|
||||
"js-sys",
|
||||
"pin-project-lite",
|
||||
"thiserror 2.0.17",
|
||||
"tracing",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "opentelemetry"
|
||||
version = "0.31.0"
|
||||
@@ -5586,7 +5542,7 @@ version = "0.31.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ef6a1ac5ca3accf562b8c306fa8483c85f4390f768185ab775f242f7fe8fdcc2"
|
||||
dependencies = [
|
||||
"opentelemetry 0.31.0",
|
||||
"opentelemetry",
|
||||
"tracing",
|
||||
"tracing-core",
|
||||
"tracing-log",
|
||||
@@ -5603,7 +5559,7 @@ dependencies = [
|
||||
"async-trait",
|
||||
"bytes",
|
||||
"http 1.3.1",
|
||||
"opentelemetry 0.31.0",
|
||||
"opentelemetry",
|
||||
"reqwest",
|
||||
]
|
||||
|
||||
@@ -5614,10 +5570,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "7a2366db2dca4d2ad033cad11e6ee42844fd727007af5ad04a1730f4cb8163bf"
|
||||
dependencies = [
|
||||
"http 1.3.1",
|
||||
"opentelemetry 0.31.0",
|
||||
"opentelemetry",
|
||||
"opentelemetry-http",
|
||||
"opentelemetry-proto",
|
||||
"opentelemetry_sdk 0.31.0",
|
||||
"opentelemetry_sdk",
|
||||
"prost 0.14.1",
|
||||
"reqwest",
|
||||
"thiserror 2.0.17",
|
||||
@@ -5631,8 +5587,8 @@ version = "0.31.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a7175df06de5eaee9909d4805a3d07e28bb752c34cab57fa9cff549da596b30f"
|
||||
dependencies = [
|
||||
"opentelemetry 0.31.0",
|
||||
"opentelemetry_sdk 0.31.0",
|
||||
"opentelemetry",
|
||||
"opentelemetry_sdk",
|
||||
"prost 0.14.1",
|
||||
"tonic",
|
||||
"tonic-prost",
|
||||
@@ -5651,21 +5607,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "bc8887887e169414f637b18751487cce4e095be787d23fad13c454e2fb1b3811"
|
||||
dependencies = [
|
||||
"chrono",
|
||||
"opentelemetry 0.31.0",
|
||||
"opentelemetry_sdk 0.31.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "opentelemetry_sdk"
|
||||
version = "0.30.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "11f644aa9e5e31d11896e024305d7e3c98a88884d9f8919dbf37a9991bc47a4b"
|
||||
dependencies = [
|
||||
"futures-channel",
|
||||
"futures-executor",
|
||||
"futures-util",
|
||||
"opentelemetry 0.30.0",
|
||||
"thiserror 2.0.17",
|
||||
"opentelemetry",
|
||||
"opentelemetry_sdk",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -5677,7 +5620,7 @@ dependencies = [
|
||||
"futures-channel",
|
||||
"futures-executor",
|
||||
"futures-util",
|
||||
"opentelemetry 0.31.0",
|
||||
"opentelemetry",
|
||||
"percent-encoding",
|
||||
"rand 0.9.2",
|
||||
"thiserror 2.0.17",
|
||||
@@ -6424,9 +6367,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "quick-xml"
|
||||
version = "0.38.3"
|
||||
version = "0.38.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "42a232e7487fc2ef313d96dde7948e7a3c05101870d8985e4fd8d26aedd27b89"
|
||||
checksum = "b66c2058c55a409d601666cffe35f04333cf1013010882cec174a7467cd4e21c"
|
||||
dependencies = [
|
||||
"memchr",
|
||||
"serde",
|
||||
@@ -7221,7 +7164,7 @@ dependencies = [
|
||||
"parking_lot",
|
||||
"path-absolutize",
|
||||
"pin-project-lite",
|
||||
"quick-xml 0.38.3",
|
||||
"quick-xml 0.38.4",
|
||||
"rand 0.10.0-rc.5",
|
||||
"reed-solomon-simd",
|
||||
"regex",
|
||||
@@ -7402,7 +7345,7 @@ dependencies = [
|
||||
"form_urlencoded",
|
||||
"futures",
|
||||
"hashbrown 0.16.0",
|
||||
"quick-xml 0.38.3",
|
||||
"quick-xml 0.38.4",
|
||||
"rayon",
|
||||
"rumqttc",
|
||||
"rustc-hash",
|
||||
@@ -7427,15 +7370,14 @@ version = "0.0.5"
|
||||
dependencies = [
|
||||
"flexi_logger",
|
||||
"metrics",
|
||||
"metrics-exporter-opentelemetry",
|
||||
"nu-ansi-term",
|
||||
"nvml-wrapper",
|
||||
"opentelemetry 0.31.0",
|
||||
"opentelemetry",
|
||||
"opentelemetry-appender-tracing",
|
||||
"opentelemetry-otlp",
|
||||
"opentelemetry-semantic-conventions",
|
||||
"opentelemetry-stdout",
|
||||
"opentelemetry_sdk 0.31.0",
|
||||
"opentelemetry_sdk",
|
||||
"rustfs-config",
|
||||
"rustfs-utils",
|
||||
"serde",
|
||||
@@ -9421,8 +9363,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1e6e5658463dd88089aba75c7791e1d3120633b1bfde22478b28f625a9bb1b8e"
|
||||
dependencies = [
|
||||
"js-sys",
|
||||
"opentelemetry 0.31.0",
|
||||
"opentelemetry_sdk 0.31.0",
|
||||
"opentelemetry",
|
||||
"opentelemetry_sdk",
|
||||
"rustversion",
|
||||
"smallvec",
|
||||
"thiserror 2.0.17",
|
||||
|
||||
@@ -128,7 +128,7 @@ byteorder = "1.5.0"
|
||||
flatbuffers = "25.9.23"
|
||||
form_urlencoded = "1.2.2"
|
||||
prost = "0.14.1"
|
||||
quick-xml = "0.38.3"
|
||||
quick-xml = "0.38.4"
|
||||
rmcp = { version = "0.8.5" }
|
||||
rmp = { version = "0.8.14" }
|
||||
rmp-serde = { version = "1.3.0" }
|
||||
@@ -142,7 +142,7 @@ aes-gcm = { version = "0.11.0-rc.2", features = ["rand_core"] }
|
||||
argon2 = { version = "0.6.0-rc.2", features = ["std"] }
|
||||
blake3 = { version = "1.8.2" }
|
||||
chacha20poly1305 = { version = "0.11.0-rc.2" }
|
||||
crc-fast = "1.3.0"
|
||||
crc-fast = "1.6.0"
|
||||
crc32c = "0.6.8"
|
||||
crc32fast = "1.5.0"
|
||||
crc64fast-nvme = "1.2.0"
|
||||
@@ -188,8 +188,8 @@ faster-hex = "0.10.0"
|
||||
flate2 = "1.1.5"
|
||||
flexi_logger = { version = "0.31.7", features = ["trc", "dont_minimize_extra_stacks", "compress", "kv", "json"] }
|
||||
glob = "0.3.3"
|
||||
google-cloud-storage = "1.2.0"
|
||||
google-cloud-auth = "1.1.0"
|
||||
google-cloud-storage = "1.3.0"
|
||||
google-cloud-auth = "1.1.1"
|
||||
hashbrown = { version = "0.16.0", features = ["serde", "rayon"] }
|
||||
heed = { version = "0.22.0" }
|
||||
hex-simd = "0.8.0"
|
||||
@@ -204,7 +204,6 @@ matchit = "0.9.0"
|
||||
md-5 = "0.11.0-rc.3"
|
||||
md5 = "0.8.0"
|
||||
metrics = "0.24.2"
|
||||
metrics-exporter-opentelemetry = "0.1.2"
|
||||
mime_guess = "2.0.5"
|
||||
moka = { version = "0.12.11", features = ["future"] }
|
||||
netif = "0.1.6"
|
||||
|
||||
@@ -19,7 +19,7 @@ use rustfs_filemeta::{MetaCacheEntries, MetaCacheEntry, MetacacheReader, is_io_e
|
||||
use std::{future::Future, pin::Pin, sync::Arc};
|
||||
use tokio::spawn;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{error, warn};
|
||||
use tracing::{error, info, warn};
|
||||
|
||||
pub type AgreedFn = Box<dyn Fn(MetaCacheEntry) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + 'static>;
|
||||
pub type PartialFn =
|
||||
@@ -99,7 +99,7 @@ pub async fn list_path_raw(rx: CancellationToken, opts: ListPathRawOptions) -> d
|
||||
match disk.walk_dir(wakl_opts, &mut wr).await {
|
||||
Ok(_res) => {}
|
||||
Err(err) => {
|
||||
error!("walk dir err {:?}", &err);
|
||||
info!("walk dir err {:?}", &err);
|
||||
need_fallback = true;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -5272,6 +5272,7 @@ impl StorageAPI for SetDisks {
|
||||
if err == DiskError::DiskNotFound {
|
||||
None
|
||||
} else if err == DiskError::FileNotFound {
|
||||
warn!("list_multipart_uploads: FileNotFound");
|
||||
return Ok(ListMultipartsInfo {
|
||||
key_marker: key_marker.to_owned(),
|
||||
max_uploads,
|
||||
|
||||
@@ -88,6 +88,8 @@ pub struct ECStore {
|
||||
pub pool_meta: RwLock<PoolMeta>,
|
||||
pub rebalance_meta: RwLock<Option<RebalanceMeta>>,
|
||||
pub decommission_cancelers: Vec<Option<usize>>,
|
||||
// mpCache: cache for MultipartUploadResult, key is "bucket/object"
|
||||
pub mp_cache: Arc<RwLock<HashMap<String, MultipartUploadResult>>>,
|
||||
}
|
||||
|
||||
// impl Clone for ECStore {
|
||||
@@ -240,6 +242,7 @@ impl ECStore {
|
||||
pool_meta: RwLock::new(pool_meta),
|
||||
rebalance_meta: RwLock::new(None),
|
||||
decommission_cancelers,
|
||||
mp_cache: Arc::new(RwLock::new(HashMap::new())),
|
||||
});
|
||||
|
||||
// Only set it when the global deployment ID is not yet configured
|
||||
@@ -1763,8 +1766,58 @@ impl StorageAPI for ECStore {
|
||||
) -> Result<ListMultipartsInfo> {
|
||||
check_list_multipart_args(bucket, prefix, &key_marker, &upload_id_marker, &delimiter)?;
|
||||
|
||||
// Return from cache if prefix is empty (list all multipart uploads for the bucket)
|
||||
if prefix.is_empty() {
|
||||
// TODO: return from cache
|
||||
let cache = self.mp_cache.read().await;
|
||||
let mut cached_uploads = Vec::new();
|
||||
let bucket_prefix = format!("{}/", bucket);
|
||||
|
||||
for (key, result) in cache.iter() {
|
||||
if key.starts_with(&bucket_prefix) {
|
||||
let object = key.strip_prefix(&bucket_prefix).unwrap_or("");
|
||||
if let Some(key_marker_val) = &key_marker {
|
||||
if object < key_marker_val.as_str() {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
if let Some(upload_id_marker_val) = &upload_id_marker {
|
||||
if object == key_marker.as_deref().unwrap_or("") && result.upload_id < *upload_id_marker_val {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
cached_uploads.push(MultipartInfo {
|
||||
bucket: bucket.to_owned(),
|
||||
object: decode_dir_object(object),
|
||||
upload_id: result.upload_id.clone(),
|
||||
initiated: None,
|
||||
user_defined: HashMap::new(),
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
// Sort by object name and upload_id
|
||||
cached_uploads.sort_by(|a, b| match a.object.cmp(&b.object) {
|
||||
Ordering::Equal => a.upload_id.cmp(&b.upload_id),
|
||||
other => other,
|
||||
});
|
||||
|
||||
// Apply max_uploads limit
|
||||
if cached_uploads.len() > max_uploads {
|
||||
cached_uploads.truncate(max_uploads);
|
||||
}
|
||||
|
||||
if !cached_uploads.is_empty() {
|
||||
return Ok(ListMultipartsInfo {
|
||||
key_marker,
|
||||
upload_id_marker,
|
||||
max_uploads,
|
||||
uploads: cached_uploads,
|
||||
prefix: prefix.to_owned(),
|
||||
delimiter: delimiter.to_owned(),
|
||||
..Default::default()
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
if self.single_pool() {
|
||||
@@ -1807,8 +1860,15 @@ impl StorageAPI for ECStore {
|
||||
async fn new_multipart_upload(&self, bucket: &str, object: &str, opts: &ObjectOptions) -> Result<MultipartUploadResult> {
|
||||
check_new_multipart_args(bucket, object)?;
|
||||
|
||||
let encoded_object = encode_dir_object(object);
|
||||
let cache_key = format!("{}/{}", bucket, encoded_object);
|
||||
|
||||
if self.single_pool() {
|
||||
return self.pools[0].new_multipart_upload(bucket, object, opts).await;
|
||||
let result = self.pools[0].new_multipart_upload(bucket, &encoded_object, opts).await?;
|
||||
// Cache the result
|
||||
let mut cache = self.mp_cache.write().await;
|
||||
cache.insert(cache_key, result.clone());
|
||||
return Ok(result);
|
||||
}
|
||||
|
||||
for (idx, pool) in self.pools.iter().enumerate() {
|
||||
@@ -1816,14 +1876,18 @@ impl StorageAPI for ECStore {
|
||||
continue;
|
||||
}
|
||||
let res = pool
|
||||
.list_multipart_uploads(bucket, object, None, None, None, MAX_UPLOADS_LIST)
|
||||
.list_multipart_uploads(bucket, &encoded_object, None, None, None, MAX_UPLOADS_LIST)
|
||||
.await?;
|
||||
|
||||
if !res.uploads.is_empty() {
|
||||
return self.pools[idx].new_multipart_upload(bucket, object, opts).await;
|
||||
let result = self.pools[idx].new_multipart_upload(bucket, &encoded_object, opts).await?;
|
||||
// Cache the result
|
||||
let mut cache = self.mp_cache.write().await;
|
||||
cache.insert(cache_key, result.clone());
|
||||
return Ok(result);
|
||||
}
|
||||
}
|
||||
let idx = self.get_pool_idx(bucket, object, -1).await?;
|
||||
let idx = self.get_pool_idx(bucket, &encoded_object, -1).await?;
|
||||
if opts.data_movement && idx == opts.src_pool_idx {
|
||||
return Err(StorageError::DataMovementOverwriteErr(
|
||||
bucket.to_owned(),
|
||||
@@ -1832,7 +1896,11 @@ impl StorageAPI for ECStore {
|
||||
));
|
||||
}
|
||||
|
||||
self.pools[idx].new_multipart_upload(bucket, object, opts).await
|
||||
let result = self.pools[idx].new_multipart_upload(bucket, &encoded_object, opts).await?;
|
||||
// Cache the result
|
||||
let mut cache = self.mp_cache.write().await;
|
||||
cache.insert(cache_key, result.clone());
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
#[instrument(skip(self))]
|
||||
@@ -1983,8 +2051,19 @@ impl StorageAPI for ECStore {
|
||||
|
||||
// TODO: defer DeleteUploadID
|
||||
|
||||
let encoded_object = encode_dir_object(object);
|
||||
let cache_key = format!("{}/{}", bucket, encoded_object);
|
||||
|
||||
if self.single_pool() {
|
||||
return self.pools[0].abort_multipart_upload(bucket, object, upload_id, opts).await;
|
||||
let result = self.pools[0]
|
||||
.abort_multipart_upload(bucket, &encoded_object, upload_id, opts)
|
||||
.await;
|
||||
// Remove from cache on success
|
||||
if result.is_ok() {
|
||||
let mut cache = self.mp_cache.write().await;
|
||||
cache.remove(&cache_key);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
for pool in self.pools.iter() {
|
||||
@@ -1992,8 +2071,13 @@ impl StorageAPI for ECStore {
|
||||
continue;
|
||||
}
|
||||
|
||||
let err = match pool.abort_multipart_upload(bucket, object, upload_id, opts).await {
|
||||
Ok(_) => return Ok(()),
|
||||
let err = match pool.abort_multipart_upload(bucket, &encoded_object, upload_id, opts).await {
|
||||
Ok(_) => {
|
||||
// Remove from cache on success
|
||||
let mut cache = self.mp_cache.write().await;
|
||||
cache.remove(&cache_key);
|
||||
return Ok(());
|
||||
}
|
||||
Err(err) => {
|
||||
//
|
||||
if is_err_invalid_upload_id(&err) { None } else { Some(err) }
|
||||
@@ -2019,11 +2103,20 @@ impl StorageAPI for ECStore {
|
||||
) -> Result<ObjectInfo> {
|
||||
check_complete_multipart_args(bucket, object, upload_id)?;
|
||||
|
||||
let encoded_object = encode_dir_object(object);
|
||||
let cache_key = format!("{}/{}", bucket, encoded_object);
|
||||
|
||||
if self.single_pool() {
|
||||
return self.pools[0]
|
||||
let result = self.pools[0]
|
||||
.clone()
|
||||
.complete_multipart_upload(bucket, object, upload_id, uploaded_parts, opts)
|
||||
.complete_multipart_upload(bucket, &encoded_object, upload_id, uploaded_parts, opts)
|
||||
.await;
|
||||
// Remove from cache on success
|
||||
if result.is_ok() {
|
||||
let mut cache = self.mp_cache.write().await;
|
||||
cache.remove(&cache_key);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
for pool in self.pools.iter() {
|
||||
@@ -2033,10 +2126,15 @@ impl StorageAPI for ECStore {
|
||||
|
||||
let pool = pool.clone();
|
||||
let err = match pool
|
||||
.complete_multipart_upload(bucket, object, upload_id, uploaded_parts.clone(), opts)
|
||||
.complete_multipart_upload(bucket, &encoded_object, upload_id, uploaded_parts.clone(), opts)
|
||||
.await
|
||||
{
|
||||
Ok(res) => return Ok(res),
|
||||
Ok(res) => {
|
||||
// Remove from cache on success
|
||||
let mut cache = self.mp_cache.write().await;
|
||||
cache.remove(&cache_key);
|
||||
return Ok(res);
|
||||
}
|
||||
Err(err) => {
|
||||
//
|
||||
if is_err_invalid_upload_id(&err) { None } else { Some(err) }
|
||||
|
||||
@@ -140,7 +140,7 @@ impl NotificationSystem {
|
||||
info!("Initializing target: {}", target.id());
|
||||
// Initialize the target
|
||||
if let Err(e) = target.init().await {
|
||||
error!("Target {} Initialization failed:{}", target.id(), e);
|
||||
warn!("Target {} Initialization failed:{}", target.id(), e);
|
||||
continue;
|
||||
}
|
||||
debug!("Target {} initialized successfully,enabled:{}", target_id, target.is_enabled());
|
||||
@@ -422,7 +422,7 @@ impl NotificationSystem {
|
||||
if !e.to_string().contains("ARN not found") {
|
||||
return Err(NotificationError::BucketNotification(e.to_string()));
|
||||
} else {
|
||||
error!("{}", e);
|
||||
error!("config validate failed, err: {}", e);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -38,7 +38,6 @@ rustfs-config = { workspace = true, features = ["constants", "observability"] }
|
||||
rustfs-utils = { workspace = true, features = ["ip", "path"] }
|
||||
flexi_logger = { workspace = true }
|
||||
metrics = { workspace = true }
|
||||
metrics-exporter-opentelemetry = { workspace = true }
|
||||
nu-ansi-term = { workspace = true }
|
||||
nvml-wrapper = { workspace = true, optional = true }
|
||||
opentelemetry = { workspace = true }
|
||||
|
||||
75
crates/obs/src/error.rs
Normal file
75
crates/obs/src/error.rs
Normal file
@@ -0,0 +1,75 @@
|
||||
// Copyright 2024 RustFS Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use crate::OtelGuard;
|
||||
use std::sync::{Arc, Mutex};
|
||||
use tokio::sync::SetError;
|
||||
|
||||
/// Error type for global guard operations
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum GlobalError {
|
||||
/// Occurs when attempting to set a global recorder (e.g., via [`crate::Recorder::install_global`] or [`metrics::set_global_recorder`])
|
||||
/// but a global recorder is already initialized.
|
||||
///
|
||||
/// [`crate::Recorder::install_global`]: crate::Recorder::install_global
|
||||
/// [`metrics::set_global_recorder`]: https://docs.rs/metrics/latest/metrics/fn.set_global_recorder.html
|
||||
#[error("Failed to set a global recorder: {0}")]
|
||||
SetRecorder(#[from] metrics::SetRecorderError<crate::Recorder>),
|
||||
#[error("Failed to set global guard: {0}")]
|
||||
SetError(#[from] SetError<Arc<Mutex<OtelGuard>>>),
|
||||
#[error("Global guard not initialized")]
|
||||
NotInitialized,
|
||||
#[error("Global system metrics err: {0}")]
|
||||
MetricsError(String),
|
||||
#[error("Failed to get current PID: {0}")]
|
||||
PidError(String),
|
||||
#[error("Process with PID {0} not found")]
|
||||
ProcessNotFound(u32),
|
||||
#[error("Failed to get physical core count")]
|
||||
CoreCountError,
|
||||
#[error("GPU initialization failed: {0}")]
|
||||
GpuInitError(String),
|
||||
#[error("GPU device not found: {0}")]
|
||||
GpuDeviceError(String),
|
||||
#[error("Failed to send log: {0}")]
|
||||
SendFailed(&'static str),
|
||||
#[error("Operation timed out: {0}")]
|
||||
Timeout(&'static str),
|
||||
#[error("Telemetry initialization failed: {0}")]
|
||||
TelemetryError(#[from] TelemetryError),
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum TelemetryError {
|
||||
#[error("Span exporter build failed: {0}")]
|
||||
BuildSpanExporter(String),
|
||||
#[error("Metric exporter build failed: {0}")]
|
||||
BuildMetricExporter(String),
|
||||
#[error("Log exporter build failed: {0}")]
|
||||
BuildLogExporter(String),
|
||||
#[error("Install metrics recorder failed: {0}")]
|
||||
InstallMetricsRecorder(String),
|
||||
#[error("Tracing subscriber init failed: {0}")]
|
||||
SubscriberInit(String),
|
||||
#[error("I/O error: {0}")]
|
||||
Io(String),
|
||||
#[error("Set permissions failed: {0}")]
|
||||
SetPermissions(String),
|
||||
}
|
||||
|
||||
impl From<std::io::Error> for TelemetryError {
|
||||
fn from(e: std::io::Error) -> Self {
|
||||
TelemetryError::Io(e.to_string())
|
||||
}
|
||||
}
|
||||
@@ -12,47 +12,20 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use crate::{AppConfig, OtelGuard, SystemObserver, TelemetryError, telemetry::init_telemetry};
|
||||
use crate::{AppConfig, GlobalError, OtelGuard, SystemObserver, telemetry::init_telemetry};
|
||||
use std::sync::{Arc, Mutex};
|
||||
use tokio::sync::{OnceCell, SetError};
|
||||
use tokio::sync::OnceCell;
|
||||
use tracing::{error, info};
|
||||
|
||||
/// Global guard for OpenTelemetry tracing
|
||||
static GLOBAL_GUARD: OnceCell<Arc<Mutex<OtelGuard>>> = OnceCell::const_new();
|
||||
|
||||
/// Flag indicating if observability is enabled
|
||||
pub(crate) static IS_OBSERVABILITY_ENABLED: OnceCell<bool> = OnceCell::const_new();
|
||||
/// Flag indicating if observability metric is enabled
|
||||
pub(crate) static OBSERVABILITY_METRIC_ENABLED: OnceCell<bool> = OnceCell::const_new();
|
||||
|
||||
/// Check whether Observability is enabled
|
||||
pub fn is_observability_enabled() -> bool {
|
||||
IS_OBSERVABILITY_ENABLED.get().copied().unwrap_or(false)
|
||||
}
|
||||
|
||||
/// Error type for global guard operations
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum GlobalError {
|
||||
#[error("Failed to set global guard: {0}")]
|
||||
SetError(#[from] SetError<Arc<Mutex<OtelGuard>>>),
|
||||
#[error("Global guard not initialized")]
|
||||
NotInitialized,
|
||||
#[error("Global system metrics err: {0}")]
|
||||
MetricsError(String),
|
||||
#[error("Failed to get current PID: {0}")]
|
||||
PidError(String),
|
||||
#[error("Process with PID {0} not found")]
|
||||
ProcessNotFound(u32),
|
||||
#[error("Failed to get physical core count")]
|
||||
CoreCountError,
|
||||
#[error("GPU initialization failed: {0}")]
|
||||
GpuInitError(String),
|
||||
#[error("GPU device not found: {0}")]
|
||||
GpuDeviceError(String),
|
||||
#[error("Failed to send log: {0}")]
|
||||
SendFailed(&'static str),
|
||||
#[error("Operation timed out: {0}")]
|
||||
Timeout(&'static str),
|
||||
#[error("Telemetry initialization failed: {0}")]
|
||||
TelemetryError(#[from] TelemetryError),
|
||||
/// Check whether Observability metric is enabled
|
||||
pub fn observability_metric_enabled() -> bool {
|
||||
OBSERVABILITY_METRIC_ENABLED.get().copied().unwrap_or(false)
|
||||
}
|
||||
|
||||
/// Initialize the observability module
|
||||
|
||||
@@ -54,13 +54,17 @@
|
||||
/// # }
|
||||
/// ```
|
||||
mod config;
|
||||
mod error;
|
||||
mod global;
|
||||
mod metrics;
|
||||
mod recorder;
|
||||
mod system;
|
||||
mod telemetry;
|
||||
|
||||
pub use config::*;
|
||||
pub use error::*;
|
||||
pub use global::*;
|
||||
pub use metrics::*;
|
||||
pub use recorder::*;
|
||||
pub use system::SystemObserver;
|
||||
pub use telemetry::{OtelGuard, TelemetryError};
|
||||
pub use telemetry::OtelGuard;
|
||||
|
||||
323
crates/obs/src/recorder.rs
Normal file
323
crates/obs/src/recorder.rs
Normal file
@@ -0,0 +1,323 @@
|
||||
// Copyright 2024 RustFS Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use crate::GlobalError;
|
||||
use metrics::{Counter, CounterFn, Gauge, GaugeFn, Histogram, HistogramFn, Key, KeyName, Metadata, SharedString, Unit};
|
||||
use opentelemetry::{
|
||||
InstrumentationScope, InstrumentationScopeBuilder, KeyValue, global,
|
||||
metrics::{Meter, MeterProvider},
|
||||
};
|
||||
use opentelemetry_sdk::metrics::{MeterProviderBuilder, SdkMeterProvider};
|
||||
use std::{
|
||||
borrow::Cow,
|
||||
collections::HashMap,
|
||||
ops::Deref,
|
||||
sync::{
|
||||
Arc, Mutex,
|
||||
atomic::{AtomicU64, Ordering},
|
||||
},
|
||||
};
|
||||
|
||||
/// A builder for constructing a [`Recorder`].
|
||||
#[derive(Debug)]
|
||||
pub struct Builder {
|
||||
builder: MeterProviderBuilder,
|
||||
scope: InstrumentationScopeBuilder,
|
||||
}
|
||||
|
||||
impl Builder {
|
||||
/// Runs the closure (`f`) to modify the [`MeterProviderBuilder`] to build a
|
||||
/// [`MeterProvider`](MeterProvider).
|
||||
pub fn with_meter_provider(mut self, f: impl FnOnce(MeterProviderBuilder) -> MeterProviderBuilder) -> Self {
|
||||
self.builder = f(self.builder);
|
||||
self
|
||||
}
|
||||
|
||||
/// Modify the [`InstrumentationScope`] to provide additional metadata from the
|
||||
/// closure (`f`).
|
||||
pub fn with_instrumentation_scope(
|
||||
mut self,
|
||||
f: impl FnOnce(InstrumentationScopeBuilder) -> InstrumentationScopeBuilder,
|
||||
) -> Self {
|
||||
self.scope = f(self.scope);
|
||||
self
|
||||
}
|
||||
|
||||
/// Consumes the builder and builds a new [`Recorder`] and returns
|
||||
/// a [`SdkMeterProvider`].
|
||||
///
|
||||
/// A [`SdkMeterProvider`] is provided so you have the responsibility to
|
||||
/// do whatever you need to do with it.
|
||||
///
|
||||
/// This will not install the recorder as the global recorder for
|
||||
/// the [`metrics`] crate, use [`Builder::install`]. This will not install a meter
|
||||
/// provider to [`global`], use [`Builder::install_global`].
|
||||
pub fn build(self) -> (SdkMeterProvider, Recorder) {
|
||||
let provider = self.builder.build();
|
||||
let meter = provider.meter_with_scope(self.scope.build());
|
||||
|
||||
(
|
||||
provider,
|
||||
Recorder {
|
||||
meter,
|
||||
metrics_metadata: Arc::new(Mutex::new(HashMap::new())),
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
/// Builds a [`Recorder`] and sets it as the global recorder for the [`metrics`]
|
||||
/// crate.
|
||||
///
|
||||
/// This method will not call [`global::set_meter_provider`] for OpenTelemetry and
|
||||
/// will be returned as the first element in the return's type tuple.
|
||||
pub fn install(self) -> Result<(SdkMeterProvider, Recorder), GlobalError> {
|
||||
let (provider, recorder) = self.build();
|
||||
metrics::set_global_recorder(recorder.clone())?;
|
||||
|
||||
Ok((provider, recorder))
|
||||
}
|
||||
|
||||
/// Builds the [`Recorder`] to record metrics to OpenTelemetry, set the global
|
||||
/// recorder for the [`metrics`] crate, and calls [`global::set_meter_provider`]
|
||||
/// to set the constructed [`SdkMeterProvider`].
|
||||
pub fn install_global(self) -> Result<Recorder, GlobalError> {
|
||||
let (provider, recorder) = self.install()?;
|
||||
global::set_meter_provider(provider);
|
||||
|
||||
Ok(recorder)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct MetricMetadata {
|
||||
unit: Option<Unit>,
|
||||
description: SharedString,
|
||||
}
|
||||
|
||||
/// A standard recorder that implements [`metrics::Recorder`].
|
||||
///
|
||||
/// This instance implements <code>[`Deref`]\<Target = [`Meter`]\></code>, so
|
||||
/// you can still interact with the SDK's initialized [`Meter`] instance.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct Recorder {
|
||||
meter: Meter,
|
||||
metrics_metadata: Arc<Mutex<HashMap<KeyName, MetricMetadata>>>,
|
||||
}
|
||||
|
||||
impl Recorder {
|
||||
/// Creates a new [`Builder`] with a given name for instrumentation.
|
||||
pub fn builder<S: Into<Cow<'static, str>>>(name: S) -> Builder {
|
||||
Builder {
|
||||
builder: MeterProviderBuilder::default(),
|
||||
scope: InstrumentationScope::builder(name.into()),
|
||||
}
|
||||
}
|
||||
|
||||
/// Creates a [`Recorder`] with an already established [`Meter`].
|
||||
pub fn with_meter(meter: Meter) -> Self {
|
||||
Recorder {
|
||||
meter,
|
||||
metrics_metadata: Arc::new(Mutex::new(HashMap::new())),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Deref for Recorder {
|
||||
type Target = Meter;
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
&self.meter
|
||||
}
|
||||
}
|
||||
|
||||
impl metrics::Recorder for Recorder {
|
||||
fn describe_counter(&self, key: KeyName, unit: Option<Unit>, description: SharedString) {
|
||||
let mut metrics_metadata = self.metrics_metadata.lock().unwrap();
|
||||
metrics_metadata.insert(key, MetricMetadata { unit, description });
|
||||
}
|
||||
|
||||
fn describe_gauge(&self, key: KeyName, unit: Option<Unit>, description: SharedString) {
|
||||
let mut metrics_metadata = self.metrics_metadata.lock().unwrap();
|
||||
metrics_metadata.insert(key, MetricMetadata { unit, description });
|
||||
}
|
||||
|
||||
fn describe_histogram(&self, key: KeyName, unit: Option<Unit>, description: SharedString) {
|
||||
let mut metrics_metadata = self.metrics_metadata.lock().unwrap();
|
||||
metrics_metadata.insert(key, MetricMetadata { unit, description });
|
||||
}
|
||||
|
||||
fn register_counter(&self, key: &Key, _metadata: &Metadata<'_>) -> Counter {
|
||||
let mut builder = self.meter.u64_counter(key.name().to_owned());
|
||||
if let Some(metadata) = self.metrics_metadata.lock().unwrap().remove(key.name()) {
|
||||
if let Some(unit) = metadata.unit {
|
||||
builder = builder.with_unit(unit.as_canonical_label());
|
||||
}
|
||||
builder = builder.with_description(metadata.description.to_string());
|
||||
}
|
||||
|
||||
let counter = builder.build();
|
||||
let labels = key
|
||||
.labels()
|
||||
.map(|label| KeyValue::new(label.key().to_owned(), label.value().to_owned()))
|
||||
.collect();
|
||||
|
||||
Counter::from_arc(Arc::new(WrappedCounter {
|
||||
counter,
|
||||
labels,
|
||||
value: AtomicU64::new(0),
|
||||
}))
|
||||
}
|
||||
|
||||
fn register_gauge(&self, key: &Key, _metadata: &Metadata<'_>) -> Gauge {
|
||||
let mut builder = self.meter.f64_gauge(key.name().to_owned());
|
||||
if let Some(metadata) = self.metrics_metadata.lock().unwrap().remove(key.name()) {
|
||||
if let Some(unit) = metadata.unit {
|
||||
builder = builder.with_unit(unit.as_canonical_label());
|
||||
}
|
||||
builder = builder.with_description(metadata.description.to_string());
|
||||
}
|
||||
|
||||
let gauge = builder.build();
|
||||
let labels = key
|
||||
.labels()
|
||||
.map(|label| KeyValue::new(label.key().to_owned(), label.value().to_owned()))
|
||||
.collect();
|
||||
|
||||
Gauge::from_arc(Arc::new(WrappedGauge {
|
||||
gauge,
|
||||
labels,
|
||||
value: AtomicU64::new(0),
|
||||
}))
|
||||
}
|
||||
|
||||
fn register_histogram(&self, key: &Key, _metadata: &Metadata<'_>) -> Histogram {
|
||||
let mut builder = self.meter.f64_histogram(key.name().to_owned());
|
||||
if let Some(metadata) = self.metrics_metadata.lock().unwrap().remove(key.name()) {
|
||||
if let Some(unit) = metadata.unit {
|
||||
builder = builder.with_unit(unit.as_canonical_label());
|
||||
}
|
||||
builder = builder.with_description(metadata.description.to_string());
|
||||
}
|
||||
|
||||
let histogram = builder.build();
|
||||
let labels = key
|
||||
.labels()
|
||||
.map(|label| KeyValue::new(label.key().to_owned(), label.value().to_owned()))
|
||||
.collect();
|
||||
|
||||
Histogram::from_arc(Arc::new(WrappedHistogram { histogram, labels }))
|
||||
}
|
||||
}
|
||||
|
||||
/// Adapter that forwards [`CounterFn`] operations to an OpenTelemetry counter.
struct WrappedCounter {
    counter: opentelemetry::metrics::Counter<u64>,
    // Attributes attached to every `add` call.
    labels: Vec<KeyValue>,
    // Running total, needed to convert `absolute` values into deltas.
    value: AtomicU64,
}
|
||||
|
||||
impl CounterFn for WrappedCounter {
|
||||
fn increment(&self, value: u64) {
|
||||
self.value.fetch_add(value, Ordering::Relaxed);
|
||||
self.counter.add(value, &self.labels);
|
||||
}
|
||||
|
||||
fn absolute(&self, value: u64) {
|
||||
let prev = self.value.swap(value, Ordering::Relaxed);
|
||||
let diff = value.saturating_sub(prev);
|
||||
self.counter.add(diff, &self.labels);
|
||||
}
|
||||
}
|
||||
|
||||
/// Adapter that forwards [`GaugeFn`] operations to an OpenTelemetry gauge.
struct WrappedGauge {
    gauge: opentelemetry::metrics::Gauge<f64>,
    // Attributes attached to every `record` call.
    labels: Vec<KeyValue>,
    // Current gauge value, stored as `f64::to_bits` so it fits in an atomic.
    value: AtomicU64,
}
|
||||
|
||||
impl GaugeFn for WrappedGauge {
|
||||
fn increment(&self, value: f64) {
|
||||
let mut current = self.value.load(Ordering::Relaxed);
|
||||
let mut new = f64::from_bits(current) + value;
|
||||
while let Err(val) = self
|
||||
.value
|
||||
.compare_exchange(current, new.to_bits(), Ordering::AcqRel, Ordering::Relaxed)
|
||||
{
|
||||
current = val;
|
||||
new = f64::from_bits(current) + value;
|
||||
}
|
||||
|
||||
self.gauge.record(new, &self.labels);
|
||||
}
|
||||
|
||||
fn decrement(&self, value: f64) {
|
||||
let mut current = self.value.load(Ordering::Relaxed);
|
||||
let mut new = f64::from_bits(current) - value;
|
||||
while let Err(val) = self
|
||||
.value
|
||||
.compare_exchange(current, new.to_bits(), Ordering::AcqRel, Ordering::Relaxed)
|
||||
{
|
||||
current = val;
|
||||
new = f64::from_bits(current) - value;
|
||||
}
|
||||
|
||||
self.gauge.record(new, &self.labels);
|
||||
}
|
||||
|
||||
fn set(&self, value: f64) {
|
||||
self.value.store(value.to_bits(), Ordering::Relaxed);
|
||||
self.gauge.record(value, &self.labels);
|
||||
}
|
||||
}
|
||||
|
||||
/// Adapter that forwards [`HistogramFn`] operations to an OpenTelemetry histogram.
struct WrappedHistogram {
    histogram: opentelemetry::metrics::Histogram<f64>,
    // Attributes attached to every `record` call.
    labels: Vec<KeyValue>,
}
|
||||
|
||||
impl HistogramFn for WrappedHistogram {
    /// Records a single observation with the pre-computed attribute set.
    fn record(&self, value: f64) {
        self.histogram.record(value, &self.labels);
    }

    /// Records the same observation `count` times; the underlying OTel
    /// histogram API takes one sample per call, so each is recorded in turn.
    fn record_many(&self, value: f64, count: usize) {
        (0..count).for_each(|_| self.histogram.record(value, &self.labels));
    }
}
|
||||
|
||||
#[cfg(test)]
mod tests {
    use super::*;
    use opentelemetry_sdk::metrics::Temporality;

    // End-to-end smoke test: build a stdout-exporting provider/recorder pair,
    // install both globally, emit one counter increment, and force a flush so
    // the export path actually runs.
    #[test]
    fn standard_usage() {
        let exporter = opentelemetry_stdout::MetricExporterBuilder::default()
            .with_temporality(Temporality::Cumulative)
            .build();

        let (provider, recorder) = Recorder::builder("my-app")
            .with_meter_provider(|builder| builder.with_periodic_exporter(exporter))
            .build();

        // Install the provider for OpenTelemetry and the recorder for `metrics`.
        global::set_meter_provider(provider.clone());
        metrics::set_global_recorder(recorder).unwrap();

        let counter = metrics::counter!("my-counter");
        counter.increment(1);

        // Flush synchronously so the recorded increment is exported.
        provider.force_flush().unwrap();
    }
}
|
||||
@@ -12,7 +12,7 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use crate::{GlobalError, is_observability_enabled};
|
||||
use crate::{GlobalError, observability_metric_enabled};
|
||||
use opentelemetry::{global::meter, metrics::Meter};
|
||||
use sysinfo::Pid;
|
||||
|
||||
@@ -29,7 +29,7 @@ impl SystemObserver {
|
||||
/// This function will create a new `Collector` instance and start collecting metrics.
|
||||
/// It will run indefinitely until the process is terminated.
|
||||
pub async fn init_process_observer() -> Result<(), GlobalError> {
|
||||
if is_observability_enabled() {
|
||||
if observability_metric_enabled() {
|
||||
let meter = meter("system");
|
||||
let pid = sysinfo::get_current_pid().map_err(|e| GlobalError::PidError(e.to_string()))?;
|
||||
return SystemObserver::init_process_observer_for_pid(meter, pid).await;
|
||||
|
||||
@@ -13,45 +13,44 @@
|
||||
// limitations under the License.
|
||||
|
||||
use crate::config::OtelConfig;
|
||||
use crate::global::IS_OBSERVABILITY_ENABLED;
|
||||
use crate::global::OBSERVABILITY_METRIC_ENABLED;
|
||||
use crate::{Recorder, TelemetryError};
|
||||
use flexi_logger::{DeferredNow, Record, WriteMode, WriteMode::AsyncWith, style};
|
||||
use metrics::counter;
|
||||
use nu_ansi_term::Color;
|
||||
use opentelemetry::trace::TracerProvider;
|
||||
use opentelemetry::{KeyValue, global};
|
||||
use opentelemetry::{KeyValue, global, trace::TracerProvider};
|
||||
use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge;
|
||||
use opentelemetry_otlp::{Compression, Protocol, WithExportConfig, WithHttpConfig};
|
||||
use opentelemetry_sdk::{
|
||||
Resource,
|
||||
logs::SdkLoggerProvider,
|
||||
metrics::{MeterProviderBuilder, PeriodicReader, SdkMeterProvider},
|
||||
metrics::{PeriodicReader, SdkMeterProvider},
|
||||
trace::{RandomIdGenerator, Sampler, SdkTracerProvider},
|
||||
};
|
||||
use opentelemetry_semantic_conventions::{
|
||||
SCHEMA_URL,
|
||||
attribute::{DEPLOYMENT_ENVIRONMENT_NAME, NETWORK_LOCAL_ADDRESS, SERVICE_VERSION as OTEL_SERVICE_VERSION},
|
||||
};
|
||||
use rustfs_config::observability::{ENV_OBS_LOG_FLUSH_MS, ENV_OBS_LOG_MESSAGE_CAPA, ENV_OBS_LOG_POOL_CAPA};
|
||||
use rustfs_config::{
|
||||
APP_NAME, DEFAULT_LOG_KEEP_FILES, DEFAULT_LOG_LEVEL, DEFAULT_OBS_LOG_STDOUT_ENABLED, ENVIRONMENT, METER_INTERVAL,
|
||||
SAMPLE_RATIO, SERVICE_VERSION,
|
||||
observability::{
|
||||
DEFAULT_OBS_ENVIRONMENT_PRODUCTION, DEFAULT_OBS_LOG_FLUSH_MS, DEFAULT_OBS_LOG_MESSAGE_CAPA, DEFAULT_OBS_LOG_POOL_CAPA,
|
||||
ENV_OBS_LOG_DIRECTORY,
|
||||
ENV_OBS_LOG_DIRECTORY, ENV_OBS_LOG_FLUSH_MS, ENV_OBS_LOG_MESSAGE_CAPA, ENV_OBS_LOG_POOL_CAPA,
|
||||
},
|
||||
};
|
||||
use rustfs_utils::{get_env_u64, get_env_usize, get_local_ip_with_default};
|
||||
use smallvec::SmallVec;
|
||||
use std::borrow::Cow;
|
||||
use std::io::IsTerminal;
|
||||
use std::time::Duration;
|
||||
use std::{env, fs};
|
||||
use std::{borrow::Cow, env, fs, io::IsTerminal, time::Duration};
|
||||
use tracing::info;
|
||||
use tracing_error::ErrorLayer;
|
||||
use tracing_opentelemetry::{MetricsLayer, OpenTelemetryLayer};
|
||||
use tracing_subscriber::fmt::format::FmtSpan;
|
||||
use tracing_subscriber::fmt::time::LocalTime;
|
||||
use tracing_subscriber::{EnvFilter, Layer, layer::SubscriberExt, util::SubscriberInitExt};
|
||||
use tracing_subscriber::{
|
||||
EnvFilter, Layer,
|
||||
fmt::{format::FmtSpan, time::LocalTime},
|
||||
layer::SubscriberExt,
|
||||
util::SubscriberInitExt,
|
||||
};
|
||||
|
||||
/// A guard object that manages the lifecycle of OpenTelemetry components.
|
||||
///
|
||||
@@ -69,9 +68,7 @@ pub struct OtelGuard {
|
||||
tracer_provider: Option<SdkTracerProvider>,
|
||||
meter_provider: Option<SdkMeterProvider>,
|
||||
logger_provider: Option<SdkLoggerProvider>,
|
||||
// Add a flexi_logger handle to keep the logging alive
|
||||
flexi_logger_handles: Option<flexi_logger::LoggerHandle>,
|
||||
// WorkerGuard for writing tracing files
|
||||
tracing_guard: Option<tracing_appender::non_blocking::WorkerGuard>,
|
||||
}
|
||||
|
||||
@@ -112,35 +109,12 @@ impl Drop for OtelGuard {
|
||||
}
|
||||
|
||||
if let Some(guard) = self.tracing_guard.take() {
|
||||
// The guard will be dropped here, flushing any remaining logs
|
||||
drop(guard);
|
||||
println!("Tracing guard dropped, flushing logs.");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum TelemetryError {
|
||||
BuildSpanExporter(String),
|
||||
BuildMetricExporter(String),
|
||||
BuildLogExporter(String),
|
||||
InstallMetricsRecorder(String),
|
||||
SubscriberInit(String),
|
||||
}
|
||||
|
||||
impl std::fmt::Display for TelemetryError {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
match self {
|
||||
TelemetryError::BuildSpanExporter(e) => write!(f, "Span exporter build failed: {e}"),
|
||||
TelemetryError::BuildMetricExporter(e) => write!(f, "Metric exporter build failed: {e}"),
|
||||
TelemetryError::BuildLogExporter(e) => write!(f, "Log exporter build failed: {e}"),
|
||||
TelemetryError::InstallMetricsRecorder(e) => write!(f, "Install metrics recorder failed: {e}"),
|
||||
TelemetryError::SubscriberInit(e) => write!(f, "Tracing subscriber init failed: {e}"),
|
||||
}
|
||||
}
|
||||
}
|
||||
impl std::error::Error for TelemetryError {}
|
||||
|
||||
/// create OpenTelemetry Resource
|
||||
fn resource(config: &OtelConfig) -> Resource {
|
||||
Resource::builder()
|
||||
@@ -170,16 +144,16 @@ fn create_periodic_reader(interval: u64) -> PeriodicReader<opentelemetry_stdout:
|
||||
}
|
||||
|
||||
// Read the AsyncWith parameter from the environment variable
|
||||
fn get_env_async_with() -> Option<WriteMode> {
|
||||
fn get_env_async_with() -> WriteMode {
|
||||
let pool_capa = get_env_usize(ENV_OBS_LOG_POOL_CAPA, DEFAULT_OBS_LOG_POOL_CAPA);
|
||||
let message_capa = get_env_usize(ENV_OBS_LOG_MESSAGE_CAPA, DEFAULT_OBS_LOG_MESSAGE_CAPA);
|
||||
let flush_ms = get_env_u64(ENV_OBS_LOG_FLUSH_MS, DEFAULT_OBS_LOG_FLUSH_MS);
|
||||
|
||||
Some(AsyncWith {
|
||||
AsyncWith {
|
||||
pool_capa,
|
||||
message_capa,
|
||||
flush_interval: Duration::from_millis(flush_ms),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
fn build_env_filter(logger_level: &str, default_level: Option<&str>) -> EnvFilter {
|
||||
@@ -200,12 +174,9 @@ fn build_env_filter(logger_level: &str, default_level: Option<&str>) -> EnvFilte
|
||||
fn format_with_color(w: &mut dyn std::io::Write, now: &mut DeferredNow, record: &Record) -> Result<(), std::io::Error> {
|
||||
let level = record.level();
|
||||
let level_style = style(level);
|
||||
|
||||
// Get the current thread information
|
||||
let binding = std::thread::current();
|
||||
let thread_name = binding.name().unwrap_or("unnamed");
|
||||
let thread_id = format!("{:?}", std::thread::current().id());
|
||||
|
||||
writeln!(
|
||||
w,
|
||||
"[{}] {} [{}] [{}:{}] [{}:{}] {}",
|
||||
@@ -224,12 +195,9 @@ fn format_with_color(w: &mut dyn std::io::Write, now: &mut DeferredNow, record:
|
||||
#[inline(never)]
|
||||
fn format_for_file(w: &mut dyn std::io::Write, now: &mut DeferredNow, record: &Record) -> Result<(), std::io::Error> {
|
||||
let level = record.level();
|
||||
|
||||
// Get the current thread information
|
||||
let binding = std::thread::current();
|
||||
let thread_name = binding.name().unwrap_or("unnamed");
|
||||
let thread_id = format!("{:?}", std::thread::current().id());
|
||||
|
||||
writeln!(
|
||||
w,
|
||||
"[{}] {} [{}] [{}:{}] [{}:{}] {}",
|
||||
@@ -247,9 +215,7 @@ fn format_for_file(w: &mut dyn std::io::Write, now: &mut DeferredNow, record: &R
|
||||
/// stdout + span information (fix: retain WorkerGuard to avoid releasing after initialization)
|
||||
fn init_stdout_logging(_config: &OtelConfig, logger_level: &str, is_production: bool) -> OtelGuard {
|
||||
let env_filter = build_env_filter(logger_level, None);
|
||||
|
||||
let (nb, guard) = tracing_appender::non_blocking(std::io::stdout());
|
||||
|
||||
let enable_color = std::io::stdout().is_terminal();
|
||||
let fmt_layer = tracing_subscriber::fmt::layer()
|
||||
.with_timer(LocalTime::rfc_3339())
|
||||
@@ -264,17 +230,15 @@ fn init_stdout_logging(_config: &OtelConfig, logger_level: &str, is_production:
|
||||
.with_current_span(true)
|
||||
.with_span_list(true)
|
||||
.with_span_events(if is_production { FmtSpan::CLOSE } else { FmtSpan::FULL });
|
||||
|
||||
tracing_subscriber::registry()
|
||||
.with(env_filter)
|
||||
.with(ErrorLayer::default())
|
||||
.with(fmt_layer)
|
||||
.init();
|
||||
|
||||
IS_OBSERVABILITY_ENABLED.set(false).ok();
|
||||
OBSERVABILITY_METRIC_ENABLED.set(false).ok();
|
||||
counter!("rustfs.start.total").increment(1);
|
||||
info!("Init stdout logging (level: {})", logger_level);
|
||||
|
||||
OtelGuard {
|
||||
tracer_provider: None,
|
||||
meter_provider: None,
|
||||
@@ -285,27 +249,49 @@ fn init_stdout_logging(_config: &OtelConfig, logger_level: &str, is_production:
|
||||
}
|
||||
|
||||
/// File rolling log (size switching + number retained)
|
||||
fn init_file_logging(config: &OtelConfig, logger_level: &str, is_production: bool) -> OtelGuard {
|
||||
use flexi_logger::{
|
||||
Age, Cleanup, Criterion, FileSpec, LogSpecification, Naming,
|
||||
WriteMode::{AsyncWith, BufferAndFlush},
|
||||
};
|
||||
fn init_file_logging(config: &OtelConfig, logger_level: &str, is_production: bool) -> Result<OtelGuard, TelemetryError> {
|
||||
use flexi_logger::{Age, Cleanup, Criterion, FileSpec, LogSpecification, Naming};
|
||||
|
||||
let service_name = config.service_name.as_deref().unwrap_or(APP_NAME);
|
||||
let default_log_directory = rustfs_utils::dirs::get_log_directory_to_string(ENV_OBS_LOG_DIRECTORY);
|
||||
let log_directory = config.log_directory.as_deref().unwrap_or(default_log_directory.as_str());
|
||||
let log_filename = config.log_filename.as_deref().unwrap_or(service_name);
|
||||
let keep_files = config.log_keep_files.unwrap_or(DEFAULT_LOG_KEEP_FILES);
|
||||
|
||||
if let Err(e) = fs::create_dir_all(log_directory) {
|
||||
eprintln!("ERROR: create log dir '{}': {e}", log_directory);
|
||||
return Err(TelemetryError::Io(e.to_string()));
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
{
|
||||
use std::fs::Permissions;
|
||||
use std::os::unix::fs::PermissionsExt;
|
||||
let _ = fs::set_permissions(log_directory, Permissions::from_mode(0o755));
|
||||
let desired: u32 = 0o755;
|
||||
match fs::metadata(log_directory) {
|
||||
Ok(meta) => {
|
||||
let current = meta.permissions().mode() & 0o777;
|
||||
// Only tighten to 0755 if existing permissions are looser than target, avoid loosening
|
||||
if (current & !desired) != 0 {
|
||||
if let Err(e) = fs::set_permissions(log_directory, Permissions::from_mode(desired)) {
|
||||
return Err(TelemetryError::SetPermissions(format!(
|
||||
"dir='{}', want={:#o}, have={:#o}, err={}",
|
||||
log_directory, desired, current, e
|
||||
)));
|
||||
}
|
||||
// Second verification
|
||||
if let Ok(meta2) = fs::metadata(log_directory) {
|
||||
let after = meta2.permissions().mode() & 0o777;
|
||||
if after != desired {
|
||||
return Err(TelemetryError::SetPermissions(format!(
|
||||
"dir='{}', want={:#o}, after={:#o}",
|
||||
log_directory, desired, after
|
||||
)));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
return Err(TelemetryError::Io(format!("stat '{}' failed: {}", log_directory, e)));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// parsing level
|
||||
@@ -345,25 +331,10 @@ fn init_file_logging(config: &OtelConfig, logger_level: &str, is_production: boo
|
||||
};
|
||||
|
||||
// write mode
|
||||
let write_mode = get_env_async_with().unwrap_or(if is_production {
|
||||
AsyncWith {
|
||||
pool_capa: DEFAULT_OBS_LOG_POOL_CAPA,
|
||||
message_capa: DEFAULT_OBS_LOG_MESSAGE_CAPA,
|
||||
flush_interval: Duration::from_millis(DEFAULT_OBS_LOG_FLUSH_MS),
|
||||
}
|
||||
} else {
|
||||
BufferAndFlush
|
||||
});
|
||||
|
||||
let write_mode = get_env_async_with();
|
||||
// Build
|
||||
let mut builder = flexi_logger::Logger::try_with_env_or_str(logger_level)
|
||||
.unwrap_or_else(|e| {
|
||||
if !is_production {
|
||||
eprintln!("WARNING: Invalid logger configuration '{logger_level}': {e:?}");
|
||||
eprintln!("Falling back to default configuration with level: {DEFAULT_LOG_LEVEL}");
|
||||
}
|
||||
flexi_logger::Logger::with(log_spec.clone())
|
||||
})
|
||||
.unwrap_or(flexi_logger::Logger::with(log_spec.clone()))
|
||||
.format_for_stderr(format_with_color)
|
||||
.format_for_stdout(format_with_color)
|
||||
.format_for_files(format_for_file)
|
||||
@@ -379,7 +350,7 @@ fn init_file_logging(config: &OtelConfig, logger_level: &str, is_production: boo
|
||||
.use_utc();
|
||||
|
||||
// Optional copy to stdout (for local observation)
|
||||
if config.log_stdout_enabled.unwrap_or(DEFAULT_OBS_LOG_STDOUT_ENABLED) {
|
||||
if config.log_stdout_enabled.unwrap_or(DEFAULT_OBS_LOG_STDOUT_ENABLED) || !is_production {
|
||||
builder = builder.duplicate_to_stdout(flexi_logger::Duplicate::All);
|
||||
} else {
|
||||
builder = builder.duplicate_to_stdout(flexi_logger::Duplicate::None);
|
||||
@@ -393,20 +364,20 @@ fn init_file_logging(config: &OtelConfig, logger_level: &str, is_production: boo
|
||||
}
|
||||
};
|
||||
|
||||
IS_OBSERVABILITY_ENABLED.set(false).ok();
|
||||
OBSERVABILITY_METRIC_ENABLED.set(false).ok();
|
||||
counter!("rustfs.start.total").increment(1);
|
||||
info!(
|
||||
"Init file logging at '{}', roll size {:?}MB, keep {}",
|
||||
log_directory, config.log_rotation_size_mb, keep_files
|
||||
);
|
||||
|
||||
OtelGuard {
|
||||
Ok(OtelGuard {
|
||||
tracer_provider: None,
|
||||
meter_provider: None,
|
||||
logger_provider: None,
|
||||
flexi_logger_handles: handle,
|
||||
tracing_guard: None,
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
/// Observability (HTTP export, supports three sub-endpoints; if not, fallback to unified endpoint)
|
||||
@@ -464,24 +435,26 @@ fn init_observability_http(config: &OtelConfig, logger_level: &str, is_productio
|
||||
.build()
|
||||
.map_err(|e| TelemetryError::BuildMetricExporter(e.to_string()))?;
|
||||
let meter_interval = config.meter_interval.unwrap_or(METER_INTERVAL);
|
||||
let mut builder = MeterProviderBuilder::default().with_resource(res.clone());
|
||||
builder = builder.with_reader(
|
||||
PeriodicReader::builder(exporter)
|
||||
.with_interval(Duration::from_secs(meter_interval))
|
||||
.build(),
|
||||
);
|
||||
if use_stdout {
|
||||
builder = builder.with_reader(create_periodic_reader(meter_interval));
|
||||
}
|
||||
|
||||
let provider = builder.build();
|
||||
let (provider, recorder) = Recorder::builder(service_name.clone())
|
||||
.with_meter_provider(|b| {
|
||||
let b = b.with_resource(res.clone()).with_reader(
|
||||
PeriodicReader::builder(exporter)
|
||||
.with_interval(Duration::from_secs(meter_interval))
|
||||
.build(),
|
||||
);
|
||||
if use_stdout {
|
||||
b.with_reader(create_periodic_reader(meter_interval))
|
||||
} else {
|
||||
b
|
||||
}
|
||||
})
|
||||
.build();
|
||||
global::set_meter_provider(provider.clone());
|
||||
metrics::set_global_recorder(recorder).map_err(|e| TelemetryError::InstallMetricsRecorder(e.to_string()))?;
|
||||
provider
|
||||
};
|
||||
|
||||
// metrics crate -> OTel
|
||||
let _ = metrics_exporter_opentelemetry::Recorder::builder(service_name.clone()).install_global();
|
||||
|
||||
// Logger(HTTP)
|
||||
let logger_provider = {
|
||||
let exporter = opentelemetry_otlp::LogExporter::builder()
|
||||
@@ -536,7 +509,7 @@ fn init_observability_http(config: &OtelConfig, logger_level: &str, is_productio
|
||||
.with(MetricsLayer::new(meter_provider.clone()))
|
||||
.init();
|
||||
|
||||
IS_OBSERVABILITY_ENABLED.set(true).ok();
|
||||
OBSERVABILITY_METRIC_ENABLED.set(true).ok();
|
||||
counter!("rustfs.start.total").increment(1);
|
||||
info!(
|
||||
"Init observability (HTTP): trace='{}', metric='{}', log='{}'",
|
||||
@@ -571,7 +544,7 @@ pub(crate) fn init_telemetry(config: &OtelConfig) -> Result<OtelGuard, Telemetry
|
||||
// Rule 2: The user has explicitly customized the log directory (determined by whether ENV_OBS_LOG_DIRECTORY is set)
|
||||
let user_set_log_dir = env::var(ENV_OBS_LOG_DIRECTORY).is_ok();
|
||||
if user_set_log_dir {
|
||||
return Ok(init_file_logging(config, logger_level, is_production));
|
||||
return init_file_logging(config, logger_level, is_production);
|
||||
}
|
||||
|
||||
// Rule 1: Default stdout (error level)
|
||||
|
||||
@@ -52,8 +52,10 @@ spec:
|
||||
image: "{{ .Values.image.repository }}:{{ .Values.image.tag }}"
|
||||
command: ["/usr/bin/rustfs"]
|
||||
imagePullPolicy: {{ .Values.image.pullPolicy }}
|
||||
{{- if .Values.securityContext }}
|
||||
securityContext:
|
||||
runAsUser: 1000
|
||||
{{- toYaml .Values.securityContext | nindent 12 }}
|
||||
{{- end }}
|
||||
ports:
|
||||
- containerPort: {{ .Values.service.ep_port }}
|
||||
name: endpoint
|
||||
|
||||
@@ -59,13 +59,12 @@ podSecurityContext:
|
||||
# fsGroup: 2000
|
||||
|
||||
securityContext:
|
||||
{}
|
||||
# capabilities:
|
||||
# drop:
|
||||
# - ALL
|
||||
# readOnlyRootFilesystem: true
|
||||
# runAsNonRoot: true
|
||||
# runAsUser: 1000
|
||||
capabilities:
|
||||
drop:
|
||||
- ALL
|
||||
readOnlyRootFilesystem: true
|
||||
runAsNonRoot: true
|
||||
runAsUser: 1000
|
||||
|
||||
service:
|
||||
type: NodePort
|
||||
|
||||
@@ -26,7 +26,7 @@ use tracing::{error, info, warn};
|
||||
pub(crate) async fn start_audit_system() -> AuditResult<()> {
|
||||
info!(
|
||||
target: "rustfs::main::start_audit_system",
|
||||
"Step 1: Initializing the audit system..."
|
||||
"Initializing the audit system..."
|
||||
);
|
||||
|
||||
// 1. Get the global configuration loaded by ecstore
|
||||
@@ -39,7 +39,7 @@ pub(crate) async fn start_audit_system() -> AuditResult<()> {
|
||||
config.clone()
|
||||
}
|
||||
None => {
|
||||
error!(
|
||||
warn!(
|
||||
target: "rustfs::main::start_audit_system",
|
||||
"Audit system initialization failed: Global server configuration not loaded."
|
||||
);
|
||||
|
||||
@@ -14,7 +14,7 @@
|
||||
|
||||
use rustfs_config::DEFAULT_DELIMITER;
|
||||
use rustfs_ecstore::config::GLOBAL_SERVER_CONFIG;
|
||||
use tracing::{error, info, instrument};
|
||||
use tracing::{error, info, instrument, warn};
|
||||
|
||||
/// Shuts down the event notifier system gracefully
|
||||
pub(crate) async fn shutdown_event_notifier() {
|
||||
@@ -28,7 +28,7 @@ pub(crate) async fn shutdown_event_notifier() {
|
||||
let system = match rustfs_notify::notification_system() {
|
||||
Some(sys) => sys,
|
||||
None => {
|
||||
error!("Event notifier system is not initialized.");
|
||||
info!("Event notifier system is not initialized.");
|
||||
return;
|
||||
}
|
||||
};
|
||||
@@ -49,7 +49,7 @@ pub(crate) async fn init_event_notifier() {
|
||||
let server_config = match GLOBAL_SERVER_CONFIG.get() {
|
||||
Some(config) => config.clone(), // Clone the config to pass ownership
|
||||
None => {
|
||||
error!("Event notifier initialization failed: Global server config not loaded.");
|
||||
warn!("Event notifier initialization failed: Global server config not loaded.");
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
@@ -3247,6 +3247,7 @@ impl S3 for FS {
|
||||
Ok(S3Response::new(output))
|
||||
}
|
||||
|
||||
#[instrument(level = "debug", skip(self, req))]
|
||||
async fn list_multipart_uploads(
|
||||
&self,
|
||||
req: S3Request<ListMultipartUploadsInput>,
|
||||
@@ -3275,6 +3276,11 @@ impl S3 for FS {
|
||||
}
|
||||
}
|
||||
|
||||
warn!(
|
||||
"List multipart uploads with bucket={}, prefix={}, delimiter={:?}, key_marker={:?}, upload_id_marker={:?}, max_uploads={}",
|
||||
bucket, prefix, delimiter, key_marker, upload_id_marker, max_uploads
|
||||
);
|
||||
|
||||
let result = store
|
||||
.list_multipart_uploads(&bucket, &prefix, delimiter, key_marker, upload_id_marker, max_uploads)
|
||||
.await
|
||||
|
||||
@@ -28,8 +28,8 @@ fi
|
||||
current_dir=$(pwd)
|
||||
echo "Current directory: $current_dir"
|
||||
|
||||
# mkdir -p ./target/volume/test
|
||||
mkdir -p ./target/volume/test{1..4}
|
||||
mkdir -p ./target/volume/test
|
||||
# mkdir -p ./target/volume/test{1..4}
|
||||
|
||||
|
||||
if [ -z "$RUST_LOG" ]; then
|
||||
@@ -41,8 +41,8 @@ fi
|
||||
|
||||
# export RUSTFS_STORAGE_CLASS_INLINE_BLOCK="512 KB"
|
||||
|
||||
export RUSTFS_VOLUMES="./target/volume/test{1...4}"
|
||||
# export RUSTFS_VOLUMES="./target/volume/test"
|
||||
# export RUSTFS_VOLUMES="./target/volume/test{1...4}"
|
||||
export RUSTFS_VOLUMES="./target/volume/test"
|
||||
export RUSTFS_ADDRESS=":9000"
|
||||
export RUSTFS_CONSOLE_ENABLE=true
|
||||
export RUSTFS_CONSOLE_ADDRESS=":9001"
|
||||
@@ -77,7 +77,7 @@ export RUSTFS_OBS_LOG_FLUSH_MS=300
|
||||
#tokio runtime
|
||||
export RUSTFS_RUNTIME_WORKER_THREADS=16
|
||||
export RUSTFS_RUNTIME_MAX_BLOCKING_THREADS=1024
|
||||
export RUSTFS_RUNTIME_THREAD_PRINT_ENABLED=true
|
||||
export RUSTFS_RUNTIME_THREAD_PRINT_ENABLED=false
|
||||
# shellcheck disable=SC2125
|
||||
export RUSTFS_RUNTIME_THREAD_STACK_SIZE=1024*1024
|
||||
export RUSTFS_RUNTIME_THREAD_KEEP_ALIVE=60
|
||||
|
||||
Reference in New Issue
Block a user