diff --git a/Cargo.lock b/Cargo.lock index 18efd2c8..624ea269 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -764,6 +764,7 @@ dependencies = [ "flatbuffers", "lazy_static", "lock", + "madmin", "protos", "rmp-serde", "serde", @@ -1677,6 +1678,7 @@ dependencies = [ "common", "psutil", "serde", + "time", ] [[package]] diff --git a/common/workers/src/workers.rs b/common/workers/src/workers.rs index 5c03447f..85be5efb 100644 --- a/common/workers/src/workers.rs +++ b/common/workers/src/workers.rs @@ -81,7 +81,7 @@ mod tests { sleep(Duration::from_secs(1)).await; workers.wait().await; if workers.available().await != workers.limit { - assert!(false); + unreachable!(); } } } diff --git a/crypto/src/jwt/tests.rs b/crypto/src/jwt/tests.rs index abfe0020..627e8802 100644 --- a/crypto/src/jwt/tests.rs +++ b/crypto/src/jwt/tests.rs @@ -10,7 +10,10 @@ fn test() { "bbb": "bbb" }); - let jwt_token = encode(b"aaaa", &claims).unwrap(); - let new_claims = decode(&jwt_token, b"aaaa").unwrap(); - assert_eq!(new_claims.claims, claims); + let jwt_token = encode(b"aaaa", &claims).unwrap_or_default(); + let new_claims = match decode(&jwt_token, b"aaaa") { + Ok(res) => Some(res.claims), + Err(_errr) => None, + }; + assert_eq!(new_claims, Some(claims)); } diff --git a/e2e_test/Cargo.toml b/e2e_test/Cargo.toml index 771f97e8..6700f01b 100644 --- a/e2e_test/Cargo.toml +++ b/e2e_test/Cargo.toml @@ -20,4 +20,5 @@ serde_json.workspace = true tonic = { version = "0.12.3", features = ["gzip"] } tokio = { workspace = true } tower.workspace = true -url.workspace = true \ No newline at end of file +url.workspace = true +madmin.workspace =true \ No newline at end of file diff --git a/e2e_test/src/reliant/node_interact_test.rs b/e2e_test/src/reliant/node_interact_test.rs index db456ad2..16b333a3 100644 --- a/e2e_test/src/reliant/node_interact_test.rs +++ b/e2e_test/src/reliant/node_interact_test.rs @@ -1,6 +1,6 @@ #![cfg(test)] -use ecstore::{disk::VolumeInfo, store_api::StorageInfo}; +use ecstore::disk::VolumeInfo; use protos::{ models::{PingBody, PingBodyBuilder}, node_service_time_out_client, @@ -118,7 +118,7 @@ async fn storage_info() -> Result<(), Box> { let info = response.storage_info; let mut buf = Deserializer::new(Cursor::new(info)); - let storage_info: StorageInfo = Deserialize::deserialize(&mut buf).unwrap(); + let storage_info: madmin::StorageInfo = Deserialize::deserialize(&mut buf).unwrap(); println!("{:?}", storage_info); Ok(()) } diff --git a/ecstore/src/admin_server_info.rs b/ecstore/src/admin_server_info.rs index bee9d996..6ecadff2 100644 --- a/ecstore/src/admin_server_info.rs +++ b/ecstore/src/admin_server_info.rs @@ -15,12 +15,7 @@ use protos::{ use serde::{Deserialize, Serialize}; use tonic::Request; -use crate::{ - disk::endpoint::Endpoint, - global::GLOBAL_Endpoints, - new_object_layer_fn, - store_api::{StorageAPI, StorageDisk}, -}; +use crate::{disk::endpoint::Endpoint, global::GLOBAL_Endpoints, new_object_layer_fn, store_api::StorageAPI}; pub const ITEM_OFFLINE: &str = "offline"; pub const ITEM_INITIALIZING: &str = "initializing"; @@ -44,7 +39,7 @@ pub struct ServerProperties { pub version: String, pub commit_id: String, pub network: HashMap, - pub disks: Vec, + pub disks: Vec, pub pool_number: i32, pub pool_numbers: Vec, pub mem_stats: MemStats, diff --git a/ecstore/src/bitrot.rs b/ecstore/src/bitrot.rs index 738fed60..e26d039b 100644 --- a/ecstore/src/bitrot.rs +++ b/ecstore/src/bitrot.rs @@ -169,6 +169,7 @@ pub async fn new_bitrot_writer( pub type BitrotReader = Box; +#[allow(clippy::too_many_arguments)] pub fn new_bitrot_reader( disk: DiskStore, data: &[u8], @@ -691,16 +692,16 @@ mod test { let ep = Endpoint::try_from(temp_dir.as_str())?; let opt = DiskOption::default(); let disk = new_disk(&ep, &opt).await?; - let _ = disk.make_volume(volume).await?; + disk.make_volume(volume).await?; let mut writer = new_bitrot_writer(disk.clone(), "", volume, file_path, 35, algo.clone(), 10).await?; - let _ = writer.write(b"aaaaaaaaaa").await?; - let _ = writer.write(b"aaaaaaaaaa").await?; - let _ = writer.write(b"aaaaaaaaaa").await?; - let _ = writer.write(b"aaaaa").await?; + writer.write(b"aaaaaaaaaa").await?; + writer.write(b"aaaaaaaaaa").await?; + writer.write(b"aaaaaaaaaa").await?; + writer.write(b"aaaaa").await?; let sum = bitrot_writer_sum(&writer); - let _ = writer.close().await?; + writer.close().await?; let mut reader = new_bitrot_reader(disk, b"", volume, file_path, 35, algo, &sum, 10); let read_len = 10; diff --git a/ecstore/src/bucket/policy/resource.rs b/ecstore/src/bucket/policy/resource.rs index 409cd9cb..b9c520f5 100644 --- a/ecstore/src/bucket/policy/resource.rs +++ b/ecstore/src/bucket/policy/resource.rs @@ -169,6 +169,7 @@ impl<'de> Deserialize<'de> for Resource { { struct Visitor; + #[allow(clippy::needless_lifetimes)] impl<'de> serde::de::Visitor<'de> for Visitor { type Value = Resource; diff --git a/ecstore/src/config/common.rs b/ecstore/src/config/common.rs index 6a47e489..f25f557c 100644 --- a/ecstore/src/config/common.rs +++ b/ecstore/src/config/common.rs @@ -105,9 +105,9 @@ pub async fn read_config_without_migrate(api: Arc) -> Result res, Err(err) => { if is_not_found(&err) { - warn!("config not found init start"); + warn!("config not found, start to init"); let cfg = new_and_save_server_config(api).await?; - warn!("config not found init done"); + warn!("config init done"); return Ok(cfg); } else { error!("read config err {:?}", &err); diff --git a/ecstore/src/disk/error.rs b/ecstore/src/disk/error.rs index 0902e5d0..36e49f46 100644 --- a/ecstore/src/disk/error.rs +++ b/ecstore/src/disk/error.rs @@ -139,13 +139,12 @@ impl DiskError { } pub fn count_errs(&self, errs: &[Option]) -> usize { - return errs - .iter() + errs.iter() .filter(|&err| match err { None => false, Some(e) => self.is(e), }) - .count(); + .count() } pub fn quorum_unformatted_disks(errs: &[Option]) -> bool { diff --git a/ecstore/src/disk/local.rs b/ecstore/src/disk/local.rs index 6117df74..4ad6af74 100644 --- a/ecstore/src/disk/local.rs +++ b/ecstore/src/disk/local.rs @@ -1291,7 +1291,7 @@ impl DiskAPI for LocalDisk { Ok(res) => res, Err(e) => { if !DiskError::VolumeNotFound.is(&e) && !is_err_file_not_found(&e) { - error!("list_dir err {:?}", &e); + info!("list_dir err {:?}", &e); } if opts.report_notfound && is_err_file_not_found(&e) { diff --git a/ecstore/src/disk/mod.rs b/ecstore/src/disk/mod.rs index ab57ce45..dd1fc6c0 100644 --- a/ecstore/src/disk/mod.rs +++ b/ecstore/src/disk/mod.rs @@ -994,6 +994,7 @@ impl BufferWriter { pub fn new(inner: Vec) -> Self { Self { inner } } + #[allow(clippy::should_implement_trait)] pub fn as_ref(&self) -> &[u8] { self.inner.as_ref() } @@ -1038,6 +1039,10 @@ impl Writer for LocalFileWriter { } } +type NodeClient = NodeServiceClient< + InterceptedService) -> Result, Status> + Send + Sync + 'static>>, +>; + #[derive(Debug)] pub struct RemoteFileWriter { pub root: PathBuf, @@ -1049,15 +1054,7 @@ pub struct RemoteFileWriter { } impl RemoteFileWriter { - pub async fn new( - root: PathBuf, - volume: String, - path: String, - is_append: bool, - mut client: NodeServiceClient< - InterceptedService) -> Result, Status> + Send + Sync + 'static>>, - >, - ) -> Result { + pub async fn new(root: PathBuf, volume: String, path: String, is_append: bool, mut client: NodeClient) -> Result { let (tx, rx) = mpsc::channel(128); let in_stream = ReceiverStream::new(rx); @@ -1247,14 +1244,7 @@ pub struct RemoteFileReader { } impl RemoteFileReader { - pub async fn new( - root: PathBuf, - volume: String, - path: String, - mut client: NodeServiceClient< - InterceptedService) -> Result, Status> + Send + Sync + 'static>>, - >, - ) -> Result { + pub async fn new(root: PathBuf, volume: String, path: String, mut client: NodeClient) -> Result { let (tx, rx) = mpsc::channel(128); let in_stream = ReceiverStream::new(rx); diff --git a/ecstore/src/endpoints.rs b/ecstore/src/endpoints.rs index 73dd58d7..334c74d6 100644 --- a/ecstore/src/endpoints.rs +++ b/ecstore/src/endpoints.rs @@ -1,9 +1,11 @@ +use tracing::warn; + use crate::{ disk::endpoint::{Endpoint, EndpointType}, disks_layout::DisksLayout, error::{Error, Result}, global::global_rustfs_port, - utils::net, + utils::net::{self, XHost}, }; use std::{ collections::{hash_map::Entry, HashMap, HashSet}, @@ -530,20 +532,30 @@ impl EndpointServerPools { nodes } - pub fn hosts_sorted(&self) -> Vec<()> { + pub fn hosts_sorted(&self) -> Vec> { let (mut peers, local) = self.peers(); + let mut ret = vec![None; peers.len()]; + peers.sort(); - for peer in peers.iter() { + for (i, peer) in peers.iter().enumerate() { if &local == peer { continue; } - // FIXME:TODO + let host = match XHost::try_from(peer.clone()) { + Ok(res) => res, + Err(err) => { + warn!("Xhost parse failed {:?}", err); + continue; + } + }; + + ret[i] = Some(host); } - unimplemented!() + ret } pub fn peers(&self) -> (Vec, String) { let mut local = None; @@ -554,12 +566,8 @@ impl EndpointServerPools { continue; } let host = endpoint.host_port(); - if endpoint.is_local { - if endpoint.url.port() == Some(global_rustfs_port()) { - if local.is_none() { - local = Some(host.clone()); - } - } + if endpoint.is_local && endpoint.url.port() == Some(global_rustfs_port()) && local.is_none() { + local = Some(host.clone()); } set.insert(host); @@ -570,6 +578,28 @@ impl EndpointServerPools { (hosts, local.unwrap_or_default()) } + + pub fn find_grid_hosts_from_peer(&self, host: &XHost) -> Option { + for ep in self.0.iter() { + for endpoint in ep.endpoints.0.iter() { + if endpoint.is_local { + continue; + } + let xhost = match XHost::try_from(endpoint.host_port()) { + Ok(res) => res, + Err(_) => { + continue; + } + }; + + if xhost.to_string() == host.to_string() { + return Some(endpoint.grid_host()); + } + } + } + + None + } } #[cfg(test)] diff --git a/ecstore/src/erasure.rs b/ecstore/src/erasure.rs index 9691a763..bd559e49 100644 --- a/ecstore/src/erasure.rs +++ b/ecstore/src/erasure.rs @@ -387,7 +387,7 @@ impl Erasure { // 算出每个分片大小 pub fn shard_size(&self, data_size: usize) -> usize { - (data_size + self.data_shards - 1) / self.data_shards + data_size.div_ceil(self.data_shards) } // returns final erasure size from original size. pub fn shard_file_size(&self, total_size: usize) -> usize { @@ -397,7 +397,7 @@ impl Erasure { let num_shards = total_size / self.block_size; let last_block_size = total_size % self.block_size; - let last_shard_size = (last_block_size + self.data_shards - 1) / self.data_shards; + let last_shard_size = last_block_size.div_ceil(self.data_shards); num_shards * self.shard_size(self.block_size) + last_shard_size // // 因为写入的时候ec需要补全,所以最后一个长度应该也是一样的 diff --git a/ecstore/src/file_meta.rs b/ecstore/src/file_meta.rs index 3a774289..3afae055 100644 --- a/ecstore/src/file_meta.rs +++ b/ecstore/src/file_meta.rs @@ -250,7 +250,7 @@ impl FileMeta { // TODO: use old buf let meta_buf = ver.marshal_msg()?; - let pre_mod_time = self.versions[idx].header.mod_time.clone(); + let pre_mod_time = self.versions[idx].header.mod_time; self.versions[idx].header = ver.header(); self.versions[idx].meta = meta_buf; @@ -824,7 +824,7 @@ impl TryFrom for FileMetaVersion { } } -#[derive(Serialize, Deserialize, Debug, PartialEq, Default, Clone, Eq, Ord, Hash)] +#[derive(Serialize, Deserialize, Debug, PartialEq, Default, Clone, Eq, Hash)] pub struct FileMetaVersionHeader { pub version_id: Option, pub mod_time: Option, @@ -974,26 +974,33 @@ impl FileMetaVersionHeader { impl PartialOrd for FileMetaVersionHeader { fn partial_cmp(&self, other: &Self) -> Option { - match self.mod_time.partial_cmp(&other.mod_time) { - Some(core::cmp::Ordering::Equal) => {} + Some(self.cmp(other)) + } +} + +impl Ord for FileMetaVersionHeader { + fn cmp(&self, other: &Self) -> Ordering { + match self.mod_time.cmp(&other.mod_time) { + core::cmp::Ordering::Equal => {} ord => return ord, } - match self.version_type.partial_cmp(&other.version_type) { - Some(core::cmp::Ordering::Equal) => {} + match self.version_type.cmp(&other.version_type) { + core::cmp::Ordering::Equal => {} ord => return ord, } - match self.signature.partial_cmp(&other.signature) { - Some(core::cmp::Ordering::Equal) => {} + match self.signature.cmp(&other.signature) { + core::cmp::Ordering::Equal => {} ord => return ord, } - match self.version_id.partial_cmp(&other.version_id) { - Some(core::cmp::Ordering::Equal) => {} + match self.version_id.cmp(&other.version_id) { + core::cmp::Ordering::Equal => {} ord => return ord, } - self.flags.partial_cmp(&other.flags) + self.flags.cmp(&other.flags) } } + impl From for FileMetaVersionHeader { fn from(value: FileMetaVersion) -> Self { let flags = { diff --git a/ecstore/src/global.rs b/ecstore/src/global.rs index 63e07edb..f5a817c4 100644 --- a/ecstore/src/global.rs +++ b/ecstore/src/global.rs @@ -39,7 +39,7 @@ lazy_static! { pub fn global_rustfs_port() -> u16 { if let Some(p) = GLOBAL_RUSTFS_PORT.get() { - p.clone() + *p } else { DEFAULT_PORT } @@ -65,8 +65,16 @@ pub fn set_global_endpoints(eps: Vec) { .expect("GLOBAL_Endpoints set faild") } +pub fn get_global_endpoints() -> EndpointServerPools { + if let Some(eps) = GLOBAL_Endpoints.get() { + eps.clone() + } else { + EndpointServerPools::default() + } +} + pub fn new_object_layer_fn() -> Option> { - GLOBAL_OBJECT_API.get().map(|ec| ec.clone()) + GLOBAL_OBJECT_API.get().cloned() } pub async fn set_object_layer(o: Arc) { diff --git a/ecstore/src/heal/data_scanner.rs b/ecstore/src/heal/data_scanner.rs index 721a09f8..0f579093 100644 --- a/ecstore/src/heal/data_scanner.rs +++ b/ecstore/src/heal/data_scanner.rs @@ -117,10 +117,8 @@ async fn run_data_scanner() { .map_or(Vec::new(), |buf| buf); let mut buf_t = Deserializer::new(Cursor::new(buf)); - let mut cycle_info: CurrentScannerCycle = match Deserialize::deserialize(&mut buf_t) { - Ok(info) => info, - Err(_) => CurrentScannerCycle::default(), - }; + + let mut cycle_info: CurrentScannerCycle = Deserialize::deserialize(&mut buf_t).unwrap_or_default(); loop { let stop_fn = ScannerMetrics::log(ScannerMetric::ScanCycle); diff --git a/ecstore/src/heal/heal_commands.rs b/ecstore/src/heal/heal_commands.rs index 2943f1e5..0ce85910 100644 --- a/ecstore/src/heal/heal_commands.rs +++ b/ecstore/src/heal/heal_commands.rs @@ -17,7 +17,7 @@ use crate::{ global::GLOBAL_BackgroundHealState, heal::heal_ops::HEALING_TRACKER_FILENAME, new_object_layer_fn, - store_api::{BucketInfo, StorageAPI, StorageDisk}, + store_api::{BucketInfo, StorageAPI}, utils::fs::read_file, }; @@ -131,35 +131,6 @@ impl Default for HealStartSuccess { pub type HealStopSuccess = HealStartSuccess; -#[derive(Clone, Debug, Default, Serialize, Deserialize)] -pub struct HealingDisk { - pub id: String, - pub heal_id: String, - pub pool_index: Option, - pub set_index: Option, - pub disk_index: Option, - pub endpoint: String, - pub path: String, - pub started: Option, - pub last_update: Option, - pub retry_attempts: u64, - pub objects_total_count: u64, - pub objects_total_size: u64, - pub items_healed: u64, - pub items_failed: u64, - pub item_skipped: u64, - pub bytes_done: u64, - pub bytes_failed: u64, - pub bytes_skipped: u64, - pub objects_healed: u64, - pub objects_failed: u64, - pub bucket: String, - pub object: String, - pub queue_buckets: Vec, - pub healed_buckets: Vec, - pub finished: bool, -} - #[derive(Debug, Default, Deserialize, Serialize)] pub struct HealingTracker { #[serde(skip_serializing, skip_deserializing)] @@ -375,10 +346,10 @@ impl HealingTracker { }); } - pub async fn to_healing_disk(&self) -> HealingDisk { + pub async fn to_healing_disk(&self) -> madmin::HealingDisk { let _ = self.mu.read().await; - HealingDisk { + madmin::HealingDisk { id: self.id.clone(), heal_id: self.heal_id.clone(), pool_index: self.pool_index, @@ -516,7 +487,7 @@ pub struct SetStatus { pub heal_status: String, pub heal_priority: String, pub total_objects: usize, - pub disks: Vec, + pub disks: Vec, } #[derive(Debug, Default, Serialize, Deserialize)] diff --git a/ecstore/src/heal/heal_ops.rs b/ecstore/src/heal/heal_ops.rs index 73d7f82f..53b5551a 100644 --- a/ecstore/src/heal/heal_ops.rs +++ b/ecstore/src/heal/heal_ops.rs @@ -3,8 +3,7 @@ use super::{ data_scanner::HEAL_DELETE_DANGLING, error::ERR_SKIP_FILE, heal_commands::{ - HealItemType, HealOpts, HealResultItem, HealScanMode, HealStopSuccess, HealingDisk, HealingTracker, - HEAL_ITEM_BUCKET_METADATA, + HealItemType, HealOpts, HealResultItem, HealScanMode, HealStopSuccess, HealingTracker, HEAL_ITEM_BUCKET_METADATA, }, }; use crate::store_api::StorageAPI; @@ -664,7 +663,7 @@ impl AllHealState { self.heal_status.write().await.insert(tracker.id.clone(), tracker.clone()); } - pub async fn get_local_healing_disks(&self) -> HashMap { + pub async fn get_local_healing_disks(&self) -> HashMap { let _ = self.mu.read().await; let mut dst = HashMap::new(); diff --git a/ecstore/src/notification_sys.rs b/ecstore/src/notification_sys.rs index 4ea46a1d..a36d1462 100644 --- a/ecstore/src/notification_sys.rs +++ b/ecstore/src/notification_sys.rs @@ -2,8 +2,12 @@ use std::sync::OnceLock; use crate::endpoints::EndpointServerPools; use crate::error::{Error, Result}; +use crate::global::get_global_endpoints; use crate::peer_rest_client::PeerRestClient; +use crate::StorageAPI; +use futures::future::join_all; use lazy_static::lazy_static; +use tracing::error; lazy_static! { pub static ref GLOBAL_NotificationSys: OnceLock = OnceLock::new(); @@ -16,9 +20,13 @@ pub async fn new_global_notification_sys(eps: EndpointServerPools) -> Result<()> Ok(()) } +pub fn get_global_notification_sys() -> Option<&'static NotificationSys> { + GLOBAL_NotificationSys.get() +} + pub struct NotificationSys { - pub peer_clients: Vec, - pub all_peer_clients: Vec, + peer_clients: Vec>, + all_peer_clients: Vec>, } impl NotificationSys { @@ -50,4 +58,71 @@ impl NotificationSys { pub async fn delete_user(&self) -> Vec { unimplemented!() } + + pub async fn storage_info(&self, api: &S) -> madmin::StorageInfo { + let mut futures = Vec::with_capacity(self.peer_clients.len()); + + for client in self.peer_clients.iter() { + futures.push(async move { + if let Some(client) = client { + match client.local_storage_info().await { + Ok(info) => Some(info), + Err(_) => Some(madmin::StorageInfo { + disks: get_offline_disks(&client.host.to_string(), &get_global_endpoints()), + ..Default::default() + }), + } + } else { + None + } + }); + } + + let mut replies = join_all(futures).await; + + replies.push(Some(api.local_storage_info().await)); + + let mut disks = Vec::new(); + for info in replies.into_iter().flatten() { + disks.extend(info.disks); + } + + let backend = api.backend_info().await; + madmin::StorageInfo { disks, backend } + } + + pub async fn reload_pool_meta(&self) { + let mut futures = Vec::with_capacity(self.peer_clients.len()); + for client in self.peer_clients.iter().flatten() { + futures.push(client.reload_pool_meta()); + } + + let results = join_all(futures).await; + for result in results { + if let Err(err) = result { + error!("notification reload_pool_meta err {:?}", err); + } + } + } +} + +fn get_offline_disks(offline_host: &str, endpoints: &EndpointServerPools) -> Vec { + let mut offline_disks = Vec::new(); + + for pool in endpoints.as_ref() { + for ep in pool.endpoints.as_ref() { + if (offline_host.is_empty() && ep.is_local) || offline_host == ep.host_port() { + offline_disks.push(madmin::Disk { + endpoint: ep.to_string(), + state: madmin::ItemState::Offline.to_string().to_owned(), + pool_index: ep.pool_idx, + set_index: ep.set_idx, + disk_index: ep.disk_idx, + ..Default::default() + }); + } + } + } + + offline_disks } diff --git a/ecstore/src/peer.rs b/ecstore/src/peer.rs index 61737fc0..a5217ba9 100644 --- a/ecstore/src/peer.rs +++ b/ecstore/src/peer.rs @@ -110,7 +110,7 @@ impl S3PeerSys { let mut futures = Vec::new(); let heal_bucket_results = Arc::new(RwLock::new(vec![HealResultItem::default(); self.clients.len()])); for (idx, client) in self.clients.iter().enumerate() { - let opts_clone = opts.clone(); + let opts_clone = opts; let heal_bucket_results_clone = heal_bucket_results.clone(); futures.push(async move { match client.heal_bucket(bucket, &opts_clone).await { diff --git a/ecstore/src/peer_rest_client.rs b/ecstore/src/peer_rest_client.rs index d60b537d..76aaee69 100644 --- a/ecstore/src/peer_rest_client.rs +++ b/ecstore/src/peer_rest_client.rs @@ -6,7 +6,7 @@ use crate::{ global::is_dist_erasure, heal::heal_commands::BgHealState, metrics_realtime::{CollectMetricsOpts, MetricType}, - store_api::StorageInfo, + utils::net::XHost, }; use common::error::{Error, Result}; use madmin::{ @@ -29,35 +29,53 @@ use protos::{ use rmp_serde::{Deserializer, Serializer}; use serde::{Deserialize, Serialize as _}; use tonic::Request; +use tracing::warn; pub const PEER_RESTSIGNAL: &str = "signal"; pub const PEER_RESTSUB_SYS: &str = "sub-sys"; pub const PEER_RESTDRY_RUN: &str = "dry-run"; +#[derive(Debug, Clone)] pub struct PeerRestClient { - addr: String, + pub host: XHost, + pub grid_host: String, } impl PeerRestClient { - pub fn new(url: url::Url) -> Self { - Self { - addr: format!("{}://{}:{}", url.scheme(), url.host_str().unwrap(), url.port().unwrap()), - } + pub fn new(host: XHost, grid_host: String) -> Self { + Self { host, grid_host } } - pub async fn new_clients(_eps: EndpointServerPools) -> (Vec, Vec) { + pub async fn new_clients(eps: EndpointServerPools) -> (Vec>, Vec>) { if !is_dist_erasure().await { return (Vec::new(), Vec::new()); } - // FIXME:TODO + let eps = eps.clone(); + let hosts = eps.hosts_sorted(); + let mut remote = vec![None; hosts.len()]; + let mut all = Vec::with_capacity(hosts.len()); + for (i, hs_host) in hosts.iter().enumerate() { + if let Some(host) = hs_host { + if let Some(grid_host) = eps.find_grid_hosts_from_peer(host) { + let client = PeerRestClient::new(host.clone(), grid_host); - todo!() + all[i] = Some(client.clone()); + remote.push(Some(client)); + } + } + } + + if all.len() != remote.len() + 1 { + warn!("Expected number of all hosts ({}) to be remote +1 ({})", all.len(), remote.len()); + } + + (remote, all) } } impl PeerRestClient { - pub async fn local_storage_info(&self) -> Result { - let mut client = node_service_time_out_client(&self.addr) + pub async fn local_storage_info(&self) -> Result { + let mut client = node_service_time_out_client(&self.grid_host) .await .map_err(|err| Error::msg(err.to_string()))?; let request = Request::new(LocalStorageInfoRequest { metrics: true }); @@ -72,13 +90,13 @@ impl PeerRestClient { let data = response.storage_info; let mut buf = Deserializer::new(Cursor::new(data)); - let storage_info: StorageInfo = Deserialize::deserialize(&mut buf).unwrap(); + let storage_info: madmin::StorageInfo = Deserialize::deserialize(&mut buf).unwrap(); Ok(storage_info) } pub async fn server_info(&self) -> Result { - let mut client = node_service_time_out_client(&self.addr) + let mut client = node_service_time_out_client(&self.grid_host) .await .map_err(|err| Error::msg(err.to_string()))?; let request = Request::new(ServerInfoRequest { metrics: true }); @@ -99,7 +117,7 @@ impl PeerRestClient { } pub async fn get_cpus(&self) -> Result { - let mut client = node_service_time_out_client(&self.addr) + let mut client = node_service_time_out_client(&self.grid_host) .await .map_err(|err| Error::msg(err.to_string()))?; let request = Request::new(GetCpusRequest {}); @@ -120,7 +138,7 @@ impl PeerRestClient { } pub async fn get_net_info(&self) -> Result { - let mut client = node_service_time_out_client(&self.addr) + let mut client = node_service_time_out_client(&self.grid_host) .await .map_err(|err| Error::msg(err.to_string()))?; let request = Request::new(GetNetInfoRequest {}); @@ -141,7 +159,7 @@ impl PeerRestClient { } pub async fn get_partitions(&self) -> Result { - let mut client = node_service_time_out_client(&self.addr) + let mut client = node_service_time_out_client(&self.grid_host) .await .map_err(|err| Error::msg(err.to_string()))?; let request = Request::new(GetPartitionsRequest {}); @@ -162,7 +180,7 @@ impl PeerRestClient { } pub async fn get_os_info(&self) -> Result { - let mut client = node_service_time_out_client(&self.addr) + let mut client = node_service_time_out_client(&self.grid_host) .await .map_err(|err| Error::msg(err.to_string()))?; let request = Request::new(GetOsInfoRequest {}); @@ -183,7 +201,7 @@ impl PeerRestClient { } pub async fn get_se_linux_info(&self) -> Result { - let mut client = node_service_time_out_client(&self.addr) + let mut client = node_service_time_out_client(&self.grid_host) .await .map_err(|err| Error::msg(err.to_string()))?; let request = Request::new(GetSeLinuxInfoRequest {}); @@ -204,7 +222,7 @@ impl PeerRestClient { } pub async fn get_sys_config(&self) -> Result { - let mut client = node_service_time_out_client(&self.addr) + let mut client = node_service_time_out_client(&self.grid_host) .await .map_err(|err| Error::msg(err.to_string()))?; let request = Request::new(GetSysConfigRequest {}); @@ -225,7 +243,7 @@ impl PeerRestClient { } pub async fn get_sys_errors(&self) -> Result { - let mut client = node_service_time_out_client(&self.addr) + let mut client = node_service_time_out_client(&self.grid_host) .await .map_err(|err| Error::msg(err.to_string()))?; let request = Request::new(GetSysErrorsRequest {}); @@ -246,7 +264,7 @@ impl PeerRestClient { } pub async fn get_mem_info(&self) -> Result { - let mut client = node_service_time_out_client(&self.addr) + let mut client = node_service_time_out_client(&self.grid_host) .await .map_err(|err| Error::msg(err.to_string()))?; let request = Request::new(GetMemInfoRequest {}); @@ -267,7 +285,7 @@ impl PeerRestClient { } pub async fn get_metrics(&self, t: MetricType, opts: &CollectMetricsOpts) -> Result { - let mut client = node_service_time_out_client(&self.addr) + let mut client = node_service_time_out_client(&self.grid_host) .await .map_err(|err| Error::msg(err.to_string()))?; let mut buf_t = Vec::new(); @@ -295,7 +313,7 @@ impl PeerRestClient { } pub async fn get_proc_info(&self) -> Result { - let mut client = node_service_time_out_client(&self.addr) + let mut client = node_service_time_out_client(&self.grid_host) .await .map_err(|err| Error::msg(err.to_string()))?; let request = Request::new(GetProcInfoRequest {}); @@ -316,7 +334,7 @@ impl PeerRestClient { } pub async fn start_profiling(&self, profiler: &str) -> Result<()> { - let mut client = node_service_time_out_client(&self.addr) + let mut client = node_service_time_out_client(&self.grid_host) .await .map_err(|err| Error::msg(err.to_string()))?; let request = Request::new(StartProfilingRequest { @@ -350,7 +368,7 @@ impl PeerRestClient { } pub async fn load_bucket_metadata(&self, bucket: &str) -> Result<()> { - let mut client = node_service_time_out_client(&self.addr) + let mut client = node_service_time_out_client(&self.grid_host) .await .map_err(|err| Error::msg(err.to_string()))?; let request = Request::new(LoadBucketMetadataRequest { @@ -368,7 +386,7 @@ impl PeerRestClient { } pub async fn delete_bucket_metadata(&self, bucket: &str) -> Result<()> { - let mut client = node_service_time_out_client(&self.addr) + let mut client = node_service_time_out_client(&self.grid_host) .await .map_err(|err| Error::msg(err.to_string()))?; let request = Request::new(DeleteBucketMetadataRequest { @@ -386,7 +404,7 @@ impl PeerRestClient { } pub async fn delete_policy(&self, policy: &str) -> Result<()> { - let mut client = node_service_time_out_client(&self.addr) + let mut client = node_service_time_out_client(&self.grid_host) .await .map_err(|err| Error::msg(err.to_string()))?; let request = Request::new(DeletePolicyRequest { @@ -404,7 +422,7 @@ impl PeerRestClient { } pub async fn load_policy(&self, policy: &str) -> Result<()> { - let mut client = node_service_time_out_client(&self.addr) + let mut client = node_service_time_out_client(&self.grid_host) .await .map_err(|err| Error::msg(err.to_string()))?; let request = Request::new(LoadPolicyRequest { @@ -422,7 +440,7 @@ impl PeerRestClient { } pub async fn load_policy_mapping(&self, user_or_group: &str, user_type: u64, is_group: bool) -> Result<()> { - let mut client = node_service_time_out_client(&self.addr) + let mut client = node_service_time_out_client(&self.grid_host) .await .map_err(|err| Error::msg(err.to_string()))?; let request = Request::new(LoadPolicyMappingRequest { @@ -442,7 +460,7 @@ impl PeerRestClient { } pub async fn delete_user(&self, access_key: &str) -> Result<()> { - let mut client = node_service_time_out_client(&self.addr) + let mut client = node_service_time_out_client(&self.grid_host) .await .map_err(|err| Error::msg(err.to_string()))?; let request = Request::new(DeleteUserRequest { @@ -460,7 +478,7 @@ impl PeerRestClient { } pub async fn delete_service_account(&self, access_key: &str) -> Result<()> { - let mut client = node_service_time_out_client(&self.addr) + let mut client = node_service_time_out_client(&self.grid_host) .await .map_err(|err| Error::msg(err.to_string()))?; let request = Request::new(DeleteServiceAccountRequest { @@ -478,7 +496,7 @@ impl PeerRestClient { } pub async fn load_user(&self, access_key: &str, temp: bool) -> Result<()> { - let mut client = node_service_time_out_client(&self.addr) + let mut client = node_service_time_out_client(&self.grid_host) .await .map_err(|err| Error::msg(err.to_string()))?; let request = Request::new(LoadUserRequest { @@ -497,7 +515,7 @@ impl PeerRestClient { } pub async fn load_service_account(&self, access_key: &str) -> Result<()> { - let mut client = node_service_time_out_client(&self.addr) + let mut client = node_service_time_out_client(&self.grid_host) .await .map_err(|err| Error::msg(err.to_string()))?; let request = Request::new(LoadServiceAccountRequest { @@ -515,7 +533,7 @@ impl PeerRestClient { } pub async fn load_group(&self, group: &str) -> Result<()> { - let mut client = node_service_time_out_client(&self.addr) + let mut client = node_service_time_out_client(&self.grid_host) .await .map_err(|err| Error::msg(err.to_string()))?; let request = Request::new(LoadGroupRequest { @@ -533,7 +551,7 @@ impl PeerRestClient { } pub async fn reload_site_replication_config(&self) -> Result<()> { - let mut client = node_service_time_out_client(&self.addr) + let mut client = node_service_time_out_client(&self.grid_host) .await .map_err(|err| Error::msg(err.to_string()))?; let request = Request::new(ReloadSiteReplicationConfigRequest {}); @@ -549,7 +567,7 @@ impl PeerRestClient { } pub async fn signal_service(&self, sig: u64, sub_sys: &str, dry_run: bool, _exec_at: SystemTime) -> Result<()> { - let mut client = node_service_time_out_client(&self.addr) + let mut client = node_service_time_out_client(&self.grid_host) .await .map_err(|err| Error::msg(err.to_string()))?; let mut vars = HashMap::new(); @@ -571,7 +589,7 @@ impl PeerRestClient { } pub async fn background_heal_status(&self) -> Result { - let mut client = node_service_time_out_client(&self.addr) + let mut client = node_service_time_out_client(&self.grid_host) .await .map_err(|err| Error::msg(err.to_string()))?; let request = Request::new(BackgroundHealStatusRequest {}); @@ -592,21 +610,21 @@ impl PeerRestClient { } pub async fn get_metacache_listing(&self) -> Result<()> { - let mut _client = node_service_time_out_client(&self.addr) + let mut _client = node_service_time_out_client(&self.grid_host) .await .map_err(|err| Error::msg(err.to_string()))?; todo!() } pub async fn update_metacache_listing(&self) -> Result<()> { - let mut _client = node_service_time_out_client(&self.addr) + let mut _client = node_service_time_out_client(&self.grid_host) .await .map_err(|err| Error::msg(err.to_string()))?; todo!() } pub async fn reload_pool_meta(&self) -> Result<()> { - let mut client = node_service_time_out_client(&self.addr) + let mut client = node_service_time_out_client(&self.grid_host) .await .map_err(|err| Error::msg(err.to_string()))?; let request = Request::new(ReloadPoolMetaRequest {}); @@ -623,7 +641,7 @@ impl PeerRestClient { } pub async fn stop_rebalance(&self) -> Result<()> { - let mut client = node_service_time_out_client(&self.addr) + let mut client = node_service_time_out_client(&self.grid_host) .await .map_err(|err| Error::msg(err.to_string()))?; let request = Request::new(StopRebalanceRequest {}); @@ -640,7 +658,7 @@ impl PeerRestClient { } pub async fn load_rebalance_meta(&self, start_rebalance: bool) -> Result<()> { - let mut client = node_service_time_out_client(&self.addr) + let mut client = node_service_time_out_client(&self.grid_host) .await .map_err(|err| Error::msg(err.to_string()))?; let request = Request::new(LoadRebalanceMetaRequest { start_rebalance }); @@ -657,7 +675,7 @@ impl PeerRestClient { } pub async fn load_transition_tier_config(&self) -> Result<()> { - let mut client = node_service_time_out_client(&self.addr) + let mut client = node_service_time_out_client(&self.grid_host) .await .map_err(|err| Error::msg(err.to_string()))?; let request = Request::new(LoadTransitionTierConfigRequest {}); diff --git a/ecstore/src/pools.rs b/ecstore/src/pools.rs index 8712308e..d6a43f83 100644 --- a/ecstore/src/pools.rs +++ b/ecstore/src/pools.rs @@ -5,13 +5,15 @@ use crate::disk::{BUCKET_META_PREFIX, RUSTFS_META_BUCKET}; use crate::error::{Error, Result}; use crate::heal::heal_commands::HealOpts; use crate::new_object_layer_fn; -use crate::store_api::{BucketOptions, MakeBucketOptions, StorageAPI, StorageDisk, StorageInfo}; +use crate::notification_sys::get_global_notification_sys; +use crate::store_api::{BucketOptions, MakeBucketOptions, StorageAPI}; use crate::store_err::{is_err_bucket_exists, StorageError}; use crate::utils::path::{path_join, SLASH_SEPARATOR}; use crate::{sets::Sets, store::ECStore}; use byteorder::{ByteOrder, LittleEndian, WriteBytesExt}; use rmp_serde::{Deserializer, Serializer}; use serde::{Deserialize, Serialize}; +use std::fmt::Display; use std::io::{Cursor, Write}; use std::path::PathBuf; use std::sync::Arc; @@ -270,7 +272,7 @@ impl PoolMeta { } } -fn path2_bucket_object(name: &String) -> (String, String) { +fn path2_bucket_object(name: &str) -> (String, String) { path2_bucket_object_with_base_path("", name) } @@ -374,11 +376,13 @@ pub struct DecomBucketInfo { pub prefix: String, } -impl ToString for DecomBucketInfo { - fn to_string(&self) -> String { - path_join(&[PathBuf::from(self.name.clone()), PathBuf::from(self.prefix.clone())]) - .to_string_lossy() - .to_string() +impl Display for DecomBucketInfo { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "{}", + path_join(&[PathBuf::from(self.name.clone()), PathBuf::from(self.prefix.clone())]).to_string_lossy() + ) } } @@ -531,7 +535,9 @@ impl ECStore { let mut pool_meta = self.pool_meta.write().await; if pool_meta.decommission_failed(idx) { pool_meta.save(self.pools.clone()).await?; - // FIXME: globalNotificationSys.ReloadPoolMeta(ctx) + if let Some(notification_sys) = get_global_notification_sys() { + notification_sys.reload_pool_meta().await; + } } Ok(()) @@ -545,7 +551,9 @@ impl ECStore { let mut pool_meta = self.pool_meta.write().await; if pool_meta.decommission_complete(idx) { pool_meta.save(self.pools.clone()).await?; - // FIXME: globalNotificationSys.ReloadPoolMeta(ctx) + if let Some(notification_sys) = get_global_notification_sys() { + notification_sys.reload_pool_meta().await; + } } Ok(()) @@ -641,7 +649,9 @@ impl ECStore { pool_meta.save(self.pools.clone()).await?; - // FIXME: globalNotificationSys.ReloadPoolMeta(ctx) + if let Some(notification_sys) = get_global_notification_sys() { + notification_sys.reload_pool_meta().await; + } Ok(()) } @@ -670,7 +680,7 @@ impl ECStore { } } -fn get_total_usable_capacity(disks: &[StorageDisk], info: &StorageInfo) -> usize { +fn get_total_usable_capacity(disks: &[madmin::Disk], info: &madmin::StorageInfo) -> usize { let mut capacity = 0; for disk in disks.iter() { if disk.pool_index < 0 || info.backend.standard_sc_data.len() <= disk.pool_index as usize { @@ -683,7 +693,7 @@ fn get_total_usable_capacity(disks: &[StorageDisk], info: &StorageInfo) -> usize capacity } -fn get_total_usable_capacity_free(disks: &[StorageDisk], info: &StorageInfo) -> usize { +fn get_total_usable_capacity_free(disks: &[madmin::Disk], info: &madmin::StorageInfo) -> usize { let mut capacity = 0; for disk in disks.iter() { if disk.pool_index < 0 || info.backend.standard_sc_data.len() <= disk.pool_index as usize { diff --git a/ecstore/src/set_disk.rs b/ecstore/src/set_disk.rs index e43fc478..152f2063 100644 --- a/ecstore/src/set_disk.rs +++ b/ecstore/src/set_disk.rs @@ -38,10 +38,9 @@ use crate::{ }, quorum::{object_op_ignored_errs, reduce_read_quorum_errs, reduce_write_quorum_errs, QuorumError}, store_api::{ - BackendByte, BackendInfo, BucketInfo, BucketOptions, CompletePart, DeleteBucketOptions, DeletedObject, FileInfo, - GetObjectReader, HTTPRangeSpec, ListMultipartsInfo, ListObjectsV2Info, MakeBucketOptions, MultipartInfo, - MultipartUploadResult, ObjectIO, ObjectInfo, ObjectOptions, ObjectPartInfo, ObjectToDelete, PartInfo, PutObjReader, - RawFileInfo, StorageAPI, StorageDisk, StorageInfo, DEFAULT_BITROT_ALGO, + BucketInfo, BucketOptions, CompletePart, DeleteBucketOptions, DeletedObject, FileInfo, GetObjectReader, HTTPRangeSpec, + ListMultipartsInfo, ListObjectsV2Info, MakeBucketOptions, MultipartInfo, MultipartUploadResult, ObjectIO, ObjectInfo, + ObjectOptions, ObjectPartInfo, ObjectToDelete, PartInfo, PutObjReader, RawFileInfo, StorageAPI, DEFAULT_BITROT_ALGO, }, store_err::{to_object_err, StorageError}, store_init::{load_format_erasure, ErasureError}, @@ -3596,15 +3595,15 @@ impl ObjectIO for SetDisks { #[async_trait::async_trait] impl StorageAPI for SetDisks { - async fn backend_info(&self) -> BackendInfo { + async fn backend_info(&self) -> madmin::BackendInfo { unimplemented!() } - async fn storage_info(&self) -> StorageInfo { + async fn storage_info(&self) -> madmin::StorageInfo { let disks = self.get_disks_internal().await; get_storage_info(&disks, &self.set_endpoints).await } - async fn local_storage_info(&self) -> StorageInfo { + async fn local_storage_info(&self) -> madmin::StorageInfo { let disks = self.get_disks_internal().await; let mut local_disks: Vec>> = Vec::new(); @@ -4947,13 +4946,13 @@ pub fn should_heal_object_on_disk( (false, err.clone()) } -async fn get_disks_info(disks: &[Option], eps: &[Endpoint]) -> Vec { +async fn get_disks_info(disks: &[Option], eps: &[Endpoint]) -> Vec { let mut ret = Vec::new(); for (i, pool) in disks.iter().enumerate() { if let Some(disk) = pool { match disk.disk_info(&DiskInfoOptions::default()).await { - Ok(res) => ret.push(StorageDisk { + Ok(res) => ret.push(madmin::Disk { endpoint: eps[i].to_string(), local: eps[i].is_local, pool_index: eps[i].pool_idx, @@ -4984,7 +4983,7 @@ async fn get_disks_info(disks: &[Option], eps: &[Endpoint]) -> Vec ret.push(StorageDisk { + Err(err) => ret.push(madmin::Disk { state: err.to_string(), endpoint: eps[i].to_string(), local: eps[i].is_local, @@ -4995,7 +4994,7 @@ async fn get_disks_info(disks: &[Option], eps: &[Endpoint]) -> Vec], eps: &[Endpoint]) -> Vec>, eps: &Vec) -> StorageInfo { +async fn get_storage_info(disks: &Vec>, eps: &Vec) -> madmin::StorageInfo { let mut disks = get_disks_info(disks, eps).await; disks.sort_by(|a, b| a.total_space.cmp(&b.total_space)); - StorageInfo { + madmin::StorageInfo { disks, - backend: BackendInfo { - backend_type: BackendByte::Erasure, + backend: madmin::BackendInfo { + backend_type: madmin::BackendByte::Erasure, ..Default::default() }, } diff --git a/ecstore/src/sets.rs b/ecstore/src/sets.rs index 50537374..000fe458 100644 --- a/ecstore/src/sets.rs +++ b/ecstore/src/sets.rs @@ -23,9 +23,9 @@ use crate::{ }, set_disk::SetDisks, store_api::{ - BackendInfo, BucketInfo, BucketOptions, CompletePart, DeleteBucketOptions, DeletedObject, GetObjectReader, HTTPRangeSpec, + BucketInfo, BucketOptions, CompletePart, DeleteBucketOptions, DeletedObject, GetObjectReader, HTTPRangeSpec, ListMultipartsInfo, ListObjectVersionsInfo, ListObjectsV2Info, MakeBucketOptions, MultipartInfo, MultipartUploadResult, - ObjectIO, ObjectInfo, ObjectOptions, ObjectToDelete, PartInfo, PutObjReader, StorageAPI, StorageInfo, + ObjectIO, ObjectInfo, ObjectOptions, ObjectToDelete, PartInfo, PutObjReader, StorageAPI, }, store_init::{check_format_erasure_values, get_format_erasure_in_quorum, load_format_erasure_all, save_format_file}, utils::hash, @@ -294,10 +294,10 @@ impl ObjectIO for Sets { #[async_trait::async_trait] impl StorageAPI for Sets { - async fn backend_info(&self) -> BackendInfo { + async fn backend_info(&self) -> madmin::BackendInfo { unimplemented!() } - async fn storage_info(&self) -> StorageInfo { + async fn storage_info(&self) -> madmin::StorageInfo { let mut futures = Vec::with_capacity(self.disk_set.len()); for set in self.disk_set.iter() { @@ -312,12 +312,12 @@ impl StorageAPI for Sets { disks.extend_from_slice(&res.disks); } - StorageInfo { + madmin::StorageInfo { disks, ..Default::default() } } - async fn local_storage_info(&self) -> StorageInfo { + async fn local_storage_info(&self) -> madmin::StorageInfo { let mut futures = Vec::with_capacity(self.disk_set.len()); for set in self.disk_set.iter() { @@ -331,7 +331,7 @@ impl StorageAPI for Sets { for res in results.into_iter() { disks.extend_from_slice(&res.disks); } - StorageInfo { + madmin::StorageInfo { disks, ..Default::default() } diff --git a/ecstore/src/store.rs b/ecstore/src/store.rs index 2dcc5dc6..148837f5 100644 --- a/ecstore/src/store.rs +++ b/ecstore/src/store.rs @@ -15,10 +15,9 @@ use crate::heal::data_usage_cache::{DataUsageCache, DataUsageCacheInfo}; use crate::heal::heal_commands::{HealOpts, HealResultItem, HealScanMode, HEAL_ITEM_METADATA}; use crate::heal::heal_ops::{HealEntryFn, HealSequence}; use crate::new_object_layer_fn; +use crate::notification_sys::get_global_notification_sys; use crate::pools::PoolMeta; -use crate::store_api::{ - BackendByte, BackendDisks, BackendInfo, ListMultipartsInfo, ListObjectVersionsInfo, MultipartInfo, ObjectIO, StorageInfo, -}; +use crate::store_api::{ListMultipartsInfo, ListObjectVersionsInfo, MultipartInfo, ObjectIO}; use crate::store_err::{ is_err_bucket_exists, is_err_invalid_upload_id, is_err_object_not_found, is_err_read_quorum, is_err_version_not_found, to_object_err, StorageError, @@ -1120,7 +1119,7 @@ lazy_static! { #[async_trait::async_trait] impl StorageAPI for ECStore { - async fn backend_info(&self) -> BackendInfo { + async fn backend_info(&self) -> madmin::BackendInfo { let (standard_sc_parity, rr_sc_parity) = { if let Some(sc) = GLOBAL_StorageClass.get() { let sc_parity = sc @@ -1151,10 +1150,10 @@ impl StorageAPI for ECStore { drives_per_set.push(*set_count); } - BackendInfo { - backend_type: BackendByte::Erasure, - online_disks: BackendDisks::new(), - offline_disks: BackendDisks::new(), + madmin::BackendInfo { + backend_type: madmin::BackendByte::Erasure, + online_disks: madmin::BackendDisks::new(), + offline_disks: madmin::BackendDisks::new(), standard_sc_data, standard_sc_parity, rr_sc_data, @@ -1164,11 +1163,14 @@ impl StorageAPI for ECStore { ..Default::default() } } - async fn storage_info(&self) -> StorageInfo { - // FIXME: globalNotificationSys.StorageInfo - unimplemented!() + async fn storage_info(&self) -> madmin::StorageInfo { + let Some(notification_sy) = get_global_notification_sys() else { + return madmin::StorageInfo::default(); + }; + + notification_sy.storage_info(self).await } - async fn local_storage_info(&self) -> StorageInfo { + async fn local_storage_info(&self) -> madmin::StorageInfo { let mut futures = Vec::with_capacity(self.pools.len()); for pool in self.pools.iter() { @@ -1184,7 +1186,7 @@ impl StorageAPI for ECStore { } let backend = self.backend_info().await; - StorageInfo { backend, disks } + madmin::StorageInfo { backend, disks } } async fn list_bucket(&self, opts: &BucketOptions) -> Result> { diff --git a/ecstore/src/store_api.rs b/ecstore/src/store_api.rs index acd8d1dc..715d2181 100644 --- a/ecstore/src/store_api.rs +++ b/ecstore/src/store_api.rs @@ -1,4 +1,3 @@ -use crate::heal::heal_commands::HealingDisk; use crate::heal::heal_ops::HealSequence; use crate::{ disk::DiskStore, @@ -9,7 +8,6 @@ use crate::{ }; use futures::StreamExt; use http::HeaderMap; -use madmin::info_commands::DiskMetrics; use rmp_serde::Serializer; use s3s::{dto::StreamingBlob, Body}; use serde::{Deserialize, Serialize}; @@ -333,7 +331,7 @@ impl ErasureInfo { // 算出每个分片大小 pub fn shard_size(&self, data_size: usize) -> usize { - (data_size + self.data_blocks - 1) / self.data_blocks + data_size.div_ceil(self.data_blocks) } // returns final erasure size from original size. @@ -344,7 +342,7 @@ impl ErasureInfo { let num_shards = total_size / self.block_size; let last_block_size = total_size % self.block_size; - let last_shard_size = (last_block_size + self.data_blocks - 1) / self.data_blocks; + let last_shard_size = last_block_size.div_ceil(self.data_blocks); num_shards * self.shard_size(self.block_size) + last_shard_size // // 因为写入的时候ec需要补全,所以最后一个长度应该也是一样的 @@ -804,84 +802,6 @@ pub struct DeletedObject { // pub replication_state: ReplicationState, } -#[derive(Debug, Default, Serialize, Deserialize)] -pub enum BackendByte { - #[default] - Unknown, - FS, - Erasure, -} - -#[derive(Serialize, Deserialize, Debug, Default, Clone)] -pub struct StorageDisk { - pub endpoint: String, - pub root_disk: bool, - pub drive_path: String, - pub healing: bool, - pub scanning: bool, - pub state: String, - pub uuid: String, - pub major: u32, - pub minor: u32, - pub model: Option, - pub total_space: u64, - pub used_space: u64, - pub available_space: u64, - pub read_throughput: f64, - pub write_throughput: f64, - pub read_latency: f64, - pub write_latency: f64, - pub utilization: f64, - pub metrics: Option, - pub heal_info: Option, - pub used_inodes: u64, - pub free_inodes: u64, - pub local: bool, - pub pool_index: i32, - pub set_index: i32, - pub disk_index: i32, -} - -#[derive(Debug, Default, Serialize, Deserialize)] -pub struct StorageInfo { - pub disks: Vec, - pub backend: BackendInfo, -} - -#[derive(Debug, Default, Serialize, Deserialize)] -pub struct BackendDisks(HashMap); - -impl BackendDisks { - pub fn new() -> Self { - Self(HashMap::new()) - } - pub fn sum(&self) -> usize { - self.0.values().sum() - } -} - -#[derive(Debug, Default, Serialize, Deserialize)] -#[serde(rename_all = "PascalCase", default)] -pub struct BackendInfo { - pub backend_type: BackendByte, - pub online_disks: BackendDisks, - pub offline_disks: BackendDisks, - #[serde(rename = "StandardSCData")] - pub standard_sc_data: Vec, - #[serde(rename = "StandardSCParities")] - pub standard_sc_parities: Vec, - #[serde(rename = "StandardSCParity")] - pub standard_sc_parity: Option, - #[serde(rename = "RRSCData")] - pub rr_sc_data: Vec, - #[serde(rename = "RRSCParities")] - pub rr_sc_parities: Vec, - #[serde(rename = "RRSCParity")] - pub rr_sc_parity: Option, - pub total_sets: Vec, - pub drives_per_set: Vec, -} - pub struct ListObjectVersionsInfo { pub is_truncated: bool, pub next_marker: String, @@ -906,14 +826,15 @@ pub trait ObjectIO: Send + Sync + 'static { } #[async_trait::async_trait] +#[allow(clippy::too_many_arguments)] pub trait StorageAPI: ObjectIO { // NewNSLock TODO: // Shutdown TODO: // NSScanner TODO: - async fn backend_info(&self) -> BackendInfo; - async fn storage_info(&self) -> StorageInfo; - async fn local_storage_info(&self) -> StorageInfo; + async fn backend_info(&self) -> madmin::BackendInfo; + async fn storage_info(&self) -> madmin::StorageInfo; + async fn local_storage_info(&self) -> madmin::StorageInfo; async fn make_bucket(&self, bucket: &str, opts: &MakeBucketOptions) -> Result<()>; async fn get_bucket_info(&self, bucket: &str, opts: &BucketOptions) -> Result; @@ -953,7 +874,7 @@ pub trait StorageAPI: ObjectIO { objects: Vec, opts: ObjectOptions, ) -> Result<(Vec, Vec>)>; - #[warn(clippy::too_many_arguments)] + // TransitionObject TODO: // RestoreTransitionedObject TODO: diff --git a/ecstore/src/store_err.rs b/ecstore/src/store_err.rs index 5ee591dd..e0dabb87 100644 --- a/ecstore/src/store_err.rs +++ b/ecstore/src/store_err.rs @@ -208,7 +208,7 @@ pub fn is_err_object_not_found(err: &Error) -> bool { fn test_storage_error() { let e1 = Error::new(StorageError::BucketExists("ss".into())); let e2 = Error::new(StorageError::ObjectNotFound("ss".into(), "sdf".to_owned())); - assert_eq!(is_err_bucket_exists(&e1), true); - assert_eq!(is_err_object_not_found(&e1), false); - assert_eq!(is_err_object_not_found(&e2), true); + assert!(is_err_bucket_exists(&e1)); + assert!(!is_err_object_not_found(&e1)); + assert!(is_err_object_not_found(&e2)); } diff --git a/ecstore/src/utils/net.rs b/ecstore/src/utils/net.rs index 52d6d015..c2de72e1 100644 --- a/ecstore/src/utils/net.rs +++ b/ecstore/src/utils/net.rs @@ -2,6 +2,7 @@ use crate::error::{Error, Result}; use lazy_static::lazy_static; use std::{ collections::HashSet, + fmt::Display, net::{IpAddr, SocketAddr, TcpListener, ToSocketAddrs}, }; @@ -105,27 +106,38 @@ pub(crate) fn must_get_local_ips() -> Result> { } } +#[derive(Debug, Clone)] pub struct XHost { pub name: String, pub port: u16, pub is_port_set: bool, } -impl ToString for XHost { - fn to_string(&self) -> String { +impl Display for XHost { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { if !self.is_port_set { - self.name.clone() + write!(f, "{}", self.name) + } else if self.name.contains(':') { + write!(f, "[{}]:{}", self.name, self.port) } else { - join_host_port(&self.name, self.port) + write!(f, "{}:{}", self.name, self.port) } } } -fn join_host_port(host: &str, port: u16) -> String { - if host.contains(':') { - format!("[{}]:{}", host, port) - } else { - format!("{}:{}", host, port) +impl TryFrom for XHost { + type Error = std::io::Error; + + fn try_from(value: String) -> std::result::Result { + if let Some(addr) = value.to_socket_addrs()?.next() { + Ok(Self { + name: addr.ip().to_string(), + port: addr.port(), + is_port_set: addr.port() > 0, + }) + } else { + Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "value invalid")) + } } } diff --git a/ecstore/src/utils/os/unix.rs b/ecstore/src/utils/os/unix.rs index 17ccd06f..bee85715 100644 --- a/ecstore/src/utils/os/unix.rs +++ b/ecstore/src/utils/os/unix.rs @@ -77,6 +77,6 @@ pub fn same_disk(disk1: &str, disk2: &str) -> Result { Ok(stat1.st_dev == stat2.st_dev) } -pub fn get_drive_stats(major: u32, minor: u32) -> Result { +pub fn get_drive_stats(_major: u32, _minor: u32) -> Result { Ok(IOStats::default()) } diff --git a/madmin/Cargo.toml b/madmin/Cargo.toml index 77235f0f..83b54093 100644 --- a/madmin/Cargo.toml +++ b/madmin/Cargo.toml @@ -11,3 +11,4 @@ chrono.workspace = true common.workspace = true psutil = "3.3.0" serde.workspace = true +time.workspace =true diff --git a/madmin/src/info_commands.rs b/madmin/src/info_commands.rs index 17f75e18..ec8f0f27 100644 --- a/madmin/src/info_commands.rs +++ b/madmin/src/info_commands.rs @@ -1,9 +1,36 @@ -use std::collections::HashMap; +use std::{collections::HashMap, time::SystemTime}; use serde::{Deserialize, Serialize}; +use time::OffsetDateTime; use crate::metrics::TimedAction; +#[derive(Debug, PartialEq, Clone, Copy)] +pub enum ItemState { + Offline, + Initializing, + Online, +} + +impl ItemState { + pub fn to_string(&self) -> &str { + match self { + ItemState::Offline => "offline", + ItemState::Initializing => "initializing", + ItemState::Online => "online", + } + } + + pub fn from_string(s: &str) -> Option { + match s { + "offline" => Some(ItemState::Offline), + "initializing" => Some(ItemState::Initializing), + "online" => Some(ItemState::Online), + _ => None, + } + } +} + #[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)] pub struct DiskMetrics { pub last_minute: HashMap, @@ -14,3 +41,110 @@ pub struct DiskMetrics { pub total_writes: u64, pub total_deletes: u64, } + +#[derive(Serialize, Deserialize, Debug, Default, Clone)] +pub struct Disk { + pub endpoint: String, + pub root_disk: bool, + pub drive_path: String, + pub healing: bool, + pub scanning: bool, + pub state: String, + pub uuid: String, + pub major: u32, + pub minor: u32, + pub model: Option, + pub total_space: u64, + pub used_space: u64, + pub available_space: u64, + pub read_throughput: f64, + pub write_throughput: f64, + pub read_latency: f64, + pub write_latency: f64, + pub utilization: f64, + pub metrics: Option, + pub heal_info: Option, + pub used_inodes: u64, + pub free_inodes: u64, + pub local: bool, + pub pool_index: i32, + pub set_index: i32, + pub disk_index: i32, +} + +#[derive(Clone, Debug, Default, Serialize, Deserialize)] +pub struct HealingDisk { + pub id: String, + pub heal_id: String, + pub pool_index: Option, + pub set_index: Option, + pub disk_index: Option, + pub endpoint: String, + pub path: String, + pub started: Option, + pub last_update: Option, + pub retry_attempts: u64, + pub objects_total_count: u64, + pub objects_total_size: u64, + pub items_healed: u64, + pub items_failed: u64, + pub item_skipped: u64, + pub bytes_done: u64, + pub bytes_failed: u64, + pub bytes_skipped: u64, + pub objects_healed: u64, + pub objects_failed: u64, + pub bucket: String, + pub object: String, + pub queue_buckets: Vec, + pub healed_buckets: Vec, + pub finished: bool, +} + +#[derive(Debug, Default, Serialize, Deserialize)] +pub enum BackendByte { + #[default] + Unknown, + FS, + Erasure, +} + +#[derive(Debug, Default, Serialize, Deserialize)] +pub struct StorageInfo { + pub disks: Vec, + pub backend: BackendInfo, +} + +#[derive(Debug, Default, Serialize, Deserialize)] +pub struct BackendDisks(HashMap); + +impl BackendDisks { + pub fn new() -> Self { + Self(HashMap::new()) + } + pub fn sum(&self) -> usize { + self.0.values().sum() + } +} + +#[derive(Debug, Default, Serialize, Deserialize)] +#[serde(rename_all = "PascalCase", default)] +pub struct BackendInfo { + pub backend_type: BackendByte, + pub online_disks: BackendDisks, + pub offline_disks: BackendDisks, + #[serde(rename = "StandardSCData")] + pub standard_sc_data: Vec, + #[serde(rename = "StandardSCParities")] + pub standard_sc_parities: Vec, + #[serde(rename = "StandardSCParity")] + pub standard_sc_parity: Option, + #[serde(rename = "RRSCData")] + pub rr_sc_data: Vec, + #[serde(rename = "RRSCParities")] + pub rr_sc_parities: Vec, + #[serde(rename = "RRSCParity")] + pub rr_sc_parity: Option, + pub total_sets: Vec, + pub drives_per_set: Vec, +} diff --git a/madmin/src/lib.rs b/madmin/src/lib.rs index 908eb69d..fd416c42 100644 --- a/madmin/src/lib.rs +++ b/madmin/src/lib.rs @@ -2,3 +2,5 @@ pub mod health; pub mod info_commands; pub mod metrics; pub mod net; + +pub use info_commands::*; diff --git a/madmin/src/metrics.rs b/madmin/src/metrics.rs index fee5b365..f8c6948c 100644 --- a/madmin/src/metrics.rs +++ b/madmin/src/metrics.rs @@ -168,7 +168,7 @@ impl ScannerMetrics { self.last_minute.ilm.entry(k.clone()).or_default().merge(v); } - self.active_paths.extend(other.active_paths.clone().into_iter()); + self.active_paths.extend(other.active_paths.clone()); self.active_paths.sort(); } @@ -606,14 +606,14 @@ pub struct RealtimeMetrics { impl RealtimeMetrics { pub fn merge(&mut self, other: Self) { if !other.errors.is_empty() { - self.errors.extend(other.errors.into_iter()); + self.errors.extend(other.errors); } for (k, v) in other.by_host.into_iter() { *self.by_host.entry(k).or_default() = v; } - self.hosts.extend(other.hosts.into_iter()); + self.hosts.extend(other.hosts); self.aggregated.merge(&other.aggregated); self.hosts.sort(); diff --git a/rustfs/src/admin/handlers.rs b/rustfs/src/admin/handlers.rs index fcec96c9..a91b6f7a 100644 --- a/rustfs/src/admin/handlers.rs +++ b/rustfs/src/admin/handlers.rs @@ -10,6 +10,7 @@ use ecstore::global::GLOBAL_ALlHealState; use ecstore::heal::heal_commands::HealOpts; use ecstore::heal::heal_ops::new_heal_sequence; use ecstore::metrics_realtime::{collect_local_metrics, CollectMetricsOpts, MetricType}; +use ecstore::new_object_layer_fn; use ecstore::peer::is_reserved_or_invalid_bucket; use ecstore::store::is_valid_object_prefix; use ecstore::store_api::StorageAPI; @@ -17,7 +18,6 @@ use ecstore::utils::path::path_join; use ecstore::utils::time::parse_duration; use ecstore::utils::xml; use ecstore::GLOBAL_Endpoints; -use ecstore::{new_object_layer_fn, store_api::BackendInfo}; use futures::{Stream, StreamExt}; use http::Uri; use hyper::StatusCode; @@ -37,7 +37,6 @@ use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; use std::time::Duration as std_Duration; -use std::u64; use time::{Duration, OffsetDateTime}; use tokio::sync::mpsc::{self}; use tokio::time::interval; @@ -128,7 +127,7 @@ impl Operation for AssumeRoleHandle { #[serde(rename_all = "PascalCase", default)] pub struct AccountInfo { pub account_name: String, - pub server: BackendInfo, + pub server: madmin::BackendInfo, pub policy: BucketPolicy, } @@ -224,7 +223,18 @@ impl Operation for StorageInfoHandler { async fn call(&self, _req: S3Request, _params: Params<'_, '_>) -> S3Result> { warn!("handle StorageInfoHandler"); - return Err(s3_error!(NotImplemented)); + let Some(store) = new_object_layer_fn() else { + return Err(S3Error::with_message(S3ErrorCode::InternalError, "Not init".to_string())); + }; + + // TODO:getAggregatedBackgroundHealState + + let info = store.storage_info().await; + + let output = serde_json::to_string(&info) + .map_err(|_e| S3Error::with_message(S3ErrorCode::InternalError, "parse accountInfo failed"))?; + + Ok(S3Response::new((StatusCode::OK, Body::from(output)))) } } @@ -492,15 +502,11 @@ fn extract_heal_init_params(body: &Bytes, uri: &Uri, params: Params<'_, '_>) -> hip.client_token = value.to_string(); } } - if key == "forceStart" { - if parts.next().is_some() { - hip.force_start = true; - } + if key == "forceStart" && parts.next().is_some() { + hip.force_start = true; } - if key == "forceStop" { - if parts.next().is_some() { - hip.force_stop = true; - } + if key == "forceStop" && parts.next().is_some() { + hip.force_stop = true; } } } diff --git a/rustfs/src/config/mod.rs b/rustfs/src/config/mod.rs index f038b3b5..c87957cd 100644 --- a/rustfs/src/config/mod.rs +++ b/rustfs/src/config/mod.rs @@ -4,10 +4,6 @@ use ecstore::global::DEFAULT_PORT; shadow_rs::shadow!(build); -/// Default port that a rustfs server listens on. -/// -/// Used if no port is specified. - pub const DEFAULT_ACCESS_KEY: &str = "rustfsadmin"; pub const DEFAULT_SECRET_KEY: &str = "rustfsadmin"; diff --git a/rustfs/src/main.rs b/rustfs/src/main.rs index e2ba46dc..e0225424 100644 --- a/rustfs/src/main.rs +++ b/rustfs/src/main.rs @@ -8,7 +8,6 @@ use common::{ error::{Error, Result}, globals::set_global_addr, }; -use ecstore::global::set_global_rustfs_port; use ecstore::heal::background_heal_ops::init_auto_heal; use ecstore::utils::net::{self, get_available_port}; use ecstore::{ @@ -18,6 +17,7 @@ use ecstore::{ store::{init_local_disks, ECStore}, update_erasure_type, }; +use ecstore::{global::set_global_rustfs_port, notification_sys::new_global_notification_sys}; use grpc::make_server; use hyper_util::{ rt::{TokioExecutor, TokioIo}, @@ -212,10 +212,10 @@ async fn run(opt: config::Opt) -> Result<()> { })?; warn!(" init store success!"); - // new_global_notification_sys(endpoint_pools.clone()).await.map_err(|err| { - // error!("new_global_notification_sys faild {:?}", &err); - // Error::from_string(err.to_string()) - // })?; + new_global_notification_sys(endpoint_pools.clone()).await.map_err(|err| { + error!("new_global_notification_sys faild {:?}", &err); + Error::from_string(err.to_string()) + })?; // init scanner init_data_scanner().await; diff --git a/rustfs/src/storage/ecfs.rs b/rustfs/src/storage/ecfs.rs index cd1da10e..b3a40ac8 100644 --- a/rustfs/src/storage/ecfs.rs +++ b/rustfs/src/storage/ecfs.rs @@ -591,8 +591,6 @@ impl S3 for FS { .await .map_err(to_s3_error)?; - error!("ObjectOptions {:?}", opts); - let obj_info = store .put_object(&bucket, &key, &mut reader, &opts) .await