Merge pull request #146 from rustfs/feat/notification_sys

Feat/notification sys
Author: weisd
Date: 2024-12-04 11:02:09 +08:00
Committed by: GitHub
38 changed files with 507 additions and 328 deletions

Cargo.lock (generated, 2 lines changed)

@@ -764,6 +764,7 @@ dependencies = [
"flatbuffers",
"lazy_static",
"lock",
"madmin",
"protos",
"rmp-serde",
"serde",
@@ -1677,6 +1678,7 @@ dependencies = [
"common",
"psutil",
"serde",
"time",
]
[[package]]


@@ -81,7 +81,7 @@ mod tests {
sleep(Duration::from_secs(1)).await;
workers.wait().await;
if workers.available().await != workers.limit {
assert!(false);
unreachable!();
}
}
}


@@ -10,7 +10,10 @@ fn test() {
"bbb": "bbb"
});
let jwt_token = encode(b"aaaa", &claims).unwrap();
let new_claims = decode(&jwt_token, b"aaaa").unwrap();
assert_eq!(new_claims.claims, claims);
let jwt_token = encode(b"aaaa", &claims).unwrap_or_default();
let new_claims = match decode(&jwt_token, b"aaaa") {
Ok(res) => Some(res.claims),
Err(_errr) => None,
};
assert_eq!(new_claims, Some(claims));
}
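Note: the rewritten test trades unwrap() for an explicit match that maps the decode Result into an Option before asserting. A minimal, self-contained sketch of that pattern, with the jwt encode/decode helpers stubbed out (decode_stub is illustrative only):

fn decode_stub(valid: bool) -> Result<&'static str, &'static str> {
    if valid { Ok("claims") } else { Err("bad token") }
}

fn main() {
    // Same shape as the test above: map Ok into Some, Err into None.
    let new_claims = match decode_stub(true) {
        Ok(res) => Some(res),
        Err(_err) => None,
    };
    // The match is equivalent to Result::ok on the decode result.
    assert_eq!(new_claims, decode_stub(true).ok());
    assert_eq!(decode_stub(false).ok(), None);
}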


@@ -20,4 +20,5 @@ serde_json.workspace = true
tonic = { version = "0.12.3", features = ["gzip"] }
tokio = { workspace = true }
tower.workspace = true
url.workspace = true
url.workspace = true
madmin.workspace =true


@@ -1,6 +1,6 @@
#![cfg(test)]
use ecstore::{disk::VolumeInfo, store_api::StorageInfo};
use ecstore::disk::VolumeInfo;
use protos::{
models::{PingBody, PingBodyBuilder},
node_service_time_out_client,
@@ -118,7 +118,7 @@ async fn storage_info() -> Result<(), Box<dyn Error>> {
let info = response.storage_info;
let mut buf = Deserializer::new(Cursor::new(info));
let storage_info: StorageInfo = Deserialize::deserialize(&mut buf).unwrap();
let storage_info: madmin::StorageInfo = Deserialize::deserialize(&mut buf).unwrap();
println!("{:?}", storage_info);
Ok(())
}


@@ -15,12 +15,7 @@ use protos::{
use serde::{Deserialize, Serialize};
use tonic::Request;
use crate::{
disk::endpoint::Endpoint,
global::GLOBAL_Endpoints,
new_object_layer_fn,
store_api::{StorageAPI, StorageDisk},
};
use crate::{disk::endpoint::Endpoint, global::GLOBAL_Endpoints, new_object_layer_fn, store_api::StorageAPI};
pub const ITEM_OFFLINE: &str = "offline";
pub const ITEM_INITIALIZING: &str = "initializing";
@@ -44,7 +39,7 @@ pub struct ServerProperties {
pub version: String,
pub commit_id: String,
pub network: HashMap<String, String>,
pub disks: Vec<StorageDisk>,
pub disks: Vec<madmin::Disk>,
pub pool_number: i32,
pub pool_numbers: Vec<i32>,
pub mem_stats: MemStats,


@@ -169,6 +169,7 @@ pub async fn new_bitrot_writer(
pub type BitrotReader = Box<dyn ReadAt + Send>;
#[allow(clippy::too_many_arguments)]
pub fn new_bitrot_reader(
disk: DiskStore,
data: &[u8],
@@ -691,16 +692,16 @@ mod test {
let ep = Endpoint::try_from(temp_dir.as_str())?;
let opt = DiskOption::default();
let disk = new_disk(&ep, &opt).await?;
let _ = disk.make_volume(volume).await?;
disk.make_volume(volume).await?;
let mut writer = new_bitrot_writer(disk.clone(), "", volume, file_path, 35, algo.clone(), 10).await?;
let _ = writer.write(b"aaaaaaaaaa").await?;
let _ = writer.write(b"aaaaaaaaaa").await?;
let _ = writer.write(b"aaaaaaaaaa").await?;
let _ = writer.write(b"aaaaa").await?;
writer.write(b"aaaaaaaaaa").await?;
writer.write(b"aaaaaaaaaa").await?;
writer.write(b"aaaaaaaaaa").await?;
writer.write(b"aaaaa").await?;
let sum = bitrot_writer_sum(&writer);
let _ = writer.close().await?;
writer.close().await?;
let mut reader = new_bitrot_reader(disk, b"", volume, file_path, 35, algo, &sum, 10);
let read_len = 10;


@@ -169,6 +169,7 @@ impl<'de> Deserialize<'de> for Resource {
{
struct Visitor;
#[allow(clippy::needless_lifetimes)]
impl<'de> serde::de::Visitor<'de> for Visitor {
type Value = Resource;


@@ -105,9 +105,9 @@ pub async fn read_config_without_migrate<S: StorageAPI>(api: Arc<S>) -> Result<C
Ok(res) => res,
Err(err) => {
if is_not_found(&err) {
warn!("config not found init start");
warn!("config not found, start to init");
let cfg = new_and_save_server_config(api).await?;
warn!("config not found init done");
warn!("config init done");
return Ok(cfg);
} else {
error!("read config err {:?}", &err);


@@ -139,13 +139,12 @@ impl DiskError {
}
pub fn count_errs(&self, errs: &[Option<Error>]) -> usize {
return errs
.iter()
errs.iter()
.filter(|&err| match err {
None => false,
Some(e) => self.is(e),
})
.count();
.count()
}
pub fn quorum_unformatted_disks(errs: &[Option<Error>]) -> bool {


@@ -1291,7 +1291,7 @@ impl DiskAPI for LocalDisk {
Ok(res) => res,
Err(e) => {
if !DiskError::VolumeNotFound.is(&e) && !is_err_file_not_found(&e) {
error!("list_dir err {:?}", &e);
info!("list_dir err {:?}", &e);
}
if opts.report_notfound && is_err_file_not_found(&e) {


@@ -994,6 +994,7 @@ impl BufferWriter {
pub fn new(inner: Vec<u8>) -> Self {
Self { inner }
}
#[allow(clippy::should_implement_trait)]
pub fn as_ref(&self) -> &[u8] {
self.inner.as_ref()
}
@@ -1038,6 +1039,10 @@ impl Writer for LocalFileWriter {
}
}
type NodeClient = NodeServiceClient<
InterceptedService<Channel, Box<dyn Fn(Request<()>) -> Result<Request<()>, Status> + Send + Sync + 'static>>,
>;
#[derive(Debug)]
pub struct RemoteFileWriter {
pub root: PathBuf,
@@ -1049,15 +1054,7 @@ pub struct RemoteFileWriter {
}
impl RemoteFileWriter {
pub async fn new(
root: PathBuf,
volume: String,
path: String,
is_append: bool,
mut client: NodeServiceClient<
InterceptedService<Channel, Box<dyn Fn(Request<()>) -> Result<Request<()>, Status> + Send + Sync + 'static>>,
>,
) -> Result<Self> {
pub async fn new(root: PathBuf, volume: String, path: String, is_append: bool, mut client: NodeClient) -> Result<Self> {
let (tx, rx) = mpsc::channel(128);
let in_stream = ReceiverStream::new(rx);
@@ -1247,14 +1244,7 @@ pub struct RemoteFileReader {
}
impl RemoteFileReader {
pub async fn new(
root: PathBuf,
volume: String,
path: String,
mut client: NodeServiceClient<
InterceptedService<Channel, Box<dyn Fn(Request<()>) -> Result<Request<()>, Status> + Send + Sync + 'static>>,
>,
) -> Result<Self> {
pub async fn new(root: PathBuf, volume: String, path: String, mut client: NodeClient) -> Result<Self> {
let (tx, rx) = mpsc::channel(128);
let in_stream = ReceiverStream::new(rx);
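Note: the repeated InterceptedService generic is collapsed into the NodeClient type alias so both constructors fit on a single line. A standalone sketch of the alias-a-long-generic pattern, using stand-in types rather than the real tonic client:

// Name the long generic once, then reuse the alias in every signature.
type Interceptor = Box<dyn Fn(u32) -> Result<u32, String> + Send + Sync + 'static>;

struct Client<I> {
    interceptor: I,
}

type NodeClient = Client<Interceptor>;

fn connect(interceptor: Interceptor) -> NodeClient {
    Client { interceptor }
}

fn main() {
    let client: NodeClient = connect(Box::new(|req| Ok(req)));
    let _ = (client.interceptor)(7);
}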


@@ -1,9 +1,11 @@
use tracing::warn;
use crate::{
disk::endpoint::{Endpoint, EndpointType},
disks_layout::DisksLayout,
error::{Error, Result},
global::global_rustfs_port,
utils::net,
utils::net::{self, XHost},
};
use std::{
collections::{hash_map::Entry, HashMap, HashSet},
@@ -530,20 +532,30 @@ impl EndpointServerPools {
nodes
}
pub fn hosts_sorted(&self) -> Vec<()> {
pub fn hosts_sorted(&self) -> Vec<Option<XHost>> {
let (mut peers, local) = self.peers();
let mut ret = vec![None; peers.len()];
peers.sort();
for peer in peers.iter() {
for (i, peer) in peers.iter().enumerate() {
if &local == peer {
continue;
}
// FIXME:TODO
let host = match XHost::try_from(peer.clone()) {
Ok(res) => res,
Err(err) => {
warn!("Xhost parse failed {:?}", err);
continue;
}
};
ret[i] = Some(host);
}
unimplemented!()
ret
}
pub fn peers(&self) -> (Vec<String>, String) {
let mut local = None;
@@ -554,12 +566,8 @@ impl EndpointServerPools {
continue;
}
let host = endpoint.host_port();
if endpoint.is_local {
if endpoint.url.port() == Some(global_rustfs_port()) {
if local.is_none() {
local = Some(host.clone());
}
}
if endpoint.is_local && endpoint.url.port() == Some(global_rustfs_port()) && local.is_none() {
local = Some(host.clone());
}
set.insert(host);
@@ -570,6 +578,28 @@ impl EndpointServerPools {
(hosts, local.unwrap_or_default())
}
pub fn find_grid_hosts_from_peer(&self, host: &XHost) -> Option<String> {
for ep in self.0.iter() {
for endpoint in ep.endpoints.0.iter() {
if endpoint.is_local {
continue;
}
let xhost = match XHost::try_from(endpoint.host_port()) {
Ok(res) => res,
Err(_) => {
continue;
}
};
if xhost.to_string() == host.to_string() {
return Some(endpoint.grid_host());
}
}
}
None
}
}
#[cfg(test)]
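Note: hosts_sorted now returns Vec<Option<XHost>>, keeping a None slot for the local peer instead of hitting unimplemented!(). A simplified, self-contained sketch of that indexing pattern (PeerHost stands in for XHost, and host parsing is omitted):

#[derive(Clone, Debug)]
struct PeerHost(String);

fn hosts_sorted(mut peers: Vec<String>, local: &str) -> Vec<Option<PeerHost>> {
    peers.sort();
    let mut ret = vec![None; peers.len()];
    for (i, peer) in peers.iter().enumerate() {
        if peer.as_str() == local {
            continue; // the local peer keeps its None slot
        }
        ret[i] = Some(PeerHost(peer.clone()));
    }
    ret
}

fn main() {
    let out = hosts_sorted(vec!["b:9000".into(), "a:9000".into()], "a:9000");
    assert!(out[0].is_none()); // local entry
    assert!(out[1].is_some()); // remote entry
}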


@@ -387,7 +387,7 @@ impl Erasure {
// Compute the size of each shard
pub fn shard_size(&self, data_size: usize) -> usize {
(data_size + self.data_shards - 1) / self.data_shards
data_size.div_ceil(self.data_shards)
}
// returns final erasure size from original size.
pub fn shard_file_size(&self, total_size: usize) -> usize {
@@ -397,7 +397,7 @@ impl Erasure {
let num_shards = total_size / self.block_size;
let last_block_size = total_size % self.block_size;
let last_shard_size = (last_block_size + self.data_shards - 1) / self.data_shards;
let last_shard_size = last_block_size.div_ceil(self.data_shards);
num_shards * self.shard_size(self.block_size) + last_shard_size
// // Because EC pads data on write, the last shard should end up the same length as the others
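Note: the clippy-driven change above swaps the manual round-up expression for usize::div_ceil. A minimal sketch (assuming Rust 1.73+ where div_ceil is stable) showing the two forms agree:

fn main() {
    let data_shards = 4usize;
    for data_size in [0usize, 1, 4, 35, 1024, 1025] {
        // Manual "round up" division, as in the old code.
        let manual = (data_size + data_shards - 1) / data_shards;
        assert_eq!(manual, data_size.div_ceil(data_shards));
    }
    println!("div_ceil matches the manual rounding-up expression");
}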


@@ -250,7 +250,7 @@ impl FileMeta {
// TODO: use old buf
let meta_buf = ver.marshal_msg()?;
let pre_mod_time = self.versions[idx].header.mod_time.clone();
let pre_mod_time = self.versions[idx].header.mod_time;
self.versions[idx].header = ver.header();
self.versions[idx].meta = meta_buf;
@@ -824,7 +824,7 @@ impl TryFrom<FileMetaShallowVersion> for FileMetaVersion {
}
}
#[derive(Serialize, Deserialize, Debug, PartialEq, Default, Clone, Eq, Ord, Hash)]
#[derive(Serialize, Deserialize, Debug, PartialEq, Default, Clone, Eq, Hash)]
pub struct FileMetaVersionHeader {
pub version_id: Option<Uuid>,
pub mod_time: Option<OffsetDateTime>,
@@ -974,26 +974,33 @@ impl FileMetaVersionHeader {
impl PartialOrd for FileMetaVersionHeader {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
match self.mod_time.partial_cmp(&other.mod_time) {
Some(core::cmp::Ordering::Equal) => {}
Some(self.cmp(other))
}
}
impl Ord for FileMetaVersionHeader {
fn cmp(&self, other: &Self) -> Ordering {
match self.mod_time.cmp(&other.mod_time) {
core::cmp::Ordering::Equal => {}
ord => return ord,
}
match self.version_type.partial_cmp(&other.version_type) {
Some(core::cmp::Ordering::Equal) => {}
match self.version_type.cmp(&other.version_type) {
core::cmp::Ordering::Equal => {}
ord => return ord,
}
match self.signature.partial_cmp(&other.signature) {
Some(core::cmp::Ordering::Equal) => {}
match self.signature.cmp(&other.signature) {
core::cmp::Ordering::Equal => {}
ord => return ord,
}
match self.version_id.partial_cmp(&other.version_id) {
Some(core::cmp::Ordering::Equal) => {}
match self.version_id.cmp(&other.version_id) {
core::cmp::Ordering::Equal => {}
ord => return ord,
}
self.flags.partial_cmp(&other.flags)
self.flags.cmp(&other.flags)
}
}
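Note: FileMetaVersionHeader now implements Ord directly and lets PartialOrd delegate to it, so the two orderings cannot drift apart. A self-contained sketch of that delegation idiom with an illustrative struct, not the real header type:

use std::cmp::Ordering;

#[derive(PartialEq, Eq)]
struct Header {
    mod_time: Option<u64>,
    version_id: Option<u64>,
}

impl Ord for Header {
    fn cmp(&self, other: &Self) -> Ordering {
        // Compare field by field, returning early on the first difference.
        match self.mod_time.cmp(&other.mod_time) {
            Ordering::Equal => {}
            ord => return ord,
        }
        self.version_id.cmp(&other.version_id)
    }
}

impl PartialOrd for Header {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        // Delegate so PartialOrd always agrees with Ord.
        Some(self.cmp(other))
    }
}

fn main() {
    let a = Header { mod_time: Some(1), version_id: None };
    let b = Header { mod_time: Some(2), version_id: None };
    assert!(a < b);
}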
impl From<FileMetaVersion> for FileMetaVersionHeader {
fn from(value: FileMetaVersion) -> Self {
let flags = {


@@ -39,7 +39,7 @@ lazy_static! {
pub fn global_rustfs_port() -> u16 {
if let Some(p) = GLOBAL_RUSTFS_PORT.get() {
p.clone()
*p
} else {
DEFAULT_PORT
}
@@ -65,8 +65,16 @@ pub fn set_global_endpoints(eps: Vec<PoolEndpoints>) {
.expect("GLOBAL_Endpoints set faild")
}
pub fn get_global_endpoints() -> EndpointServerPools {
if let Some(eps) = GLOBAL_Endpoints.get() {
eps.clone()
} else {
EndpointServerPools::default()
}
}
pub fn new_object_layer_fn() -> Option<Arc<ECStore>> {
GLOBAL_OBJECT_API.get().map(|ec| ec.clone())
GLOBAL_OBJECT_API.get().cloned()
}
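Note: the getters above read OnceLock-backed globals and fall back to a default when the cell is unset, copying Copy values instead of cloning them. A minimal sketch of the pattern:

use std::sync::OnceLock;

static GLOBAL_PORT: OnceLock<u16> = OnceLock::new();
const DEFAULT_PORT: u16 = 9000;

fn global_port() -> u16 {
    if let Some(p) = GLOBAL_PORT.get() {
        *p // Copy type: dereference instead of clone()
    } else {
        DEFAULT_PORT
    }
}

fn main() {
    assert_eq!(global_port(), DEFAULT_PORT);
    GLOBAL_PORT.set(7000).expect("port should only be set once");
    assert_eq!(global_port(), 7000);
}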
pub async fn set_object_layer(o: Arc<ECStore>) {


@@ -117,10 +117,8 @@ async fn run_data_scanner() {
.map_or(Vec::new(), |buf| buf);
let mut buf_t = Deserializer::new(Cursor::new(buf));
let mut cycle_info: CurrentScannerCycle = match Deserialize::deserialize(&mut buf_t) {
Ok(info) => info,
Err(_) => CurrentScannerCycle::default(),
};
let mut cycle_info: CurrentScannerCycle = Deserialize::deserialize(&mut buf_t).unwrap_or_default();
loop {
let stop_fn = ScannerMetrics::log(ScannerMetric::ScanCycle);


@@ -17,7 +17,7 @@ use crate::{
global::GLOBAL_BackgroundHealState,
heal::heal_ops::HEALING_TRACKER_FILENAME,
new_object_layer_fn,
store_api::{BucketInfo, StorageAPI, StorageDisk},
store_api::{BucketInfo, StorageAPI},
utils::fs::read_file,
};
@@ -131,35 +131,6 @@ impl Default for HealStartSuccess {
pub type HealStopSuccess = HealStartSuccess;
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
pub struct HealingDisk {
pub id: String,
pub heal_id: String,
pub pool_index: Option<usize>,
pub set_index: Option<usize>,
pub disk_index: Option<usize>,
pub endpoint: String,
pub path: String,
pub started: Option<OffsetDateTime>,
pub last_update: Option<SystemTime>,
pub retry_attempts: u64,
pub objects_total_count: u64,
pub objects_total_size: u64,
pub items_healed: u64,
pub items_failed: u64,
pub item_skipped: u64,
pub bytes_done: u64,
pub bytes_failed: u64,
pub bytes_skipped: u64,
pub objects_healed: u64,
pub objects_failed: u64,
pub bucket: String,
pub object: String,
pub queue_buckets: Vec<String>,
pub healed_buckets: Vec<String>,
pub finished: bool,
}
#[derive(Debug, Default, Deserialize, Serialize)]
pub struct HealingTracker {
#[serde(skip_serializing, skip_deserializing)]
@@ -375,10 +346,10 @@ impl HealingTracker {
});
}
pub async fn to_healing_disk(&self) -> HealingDisk {
pub async fn to_healing_disk(&self) -> madmin::HealingDisk {
let _ = self.mu.read().await;
HealingDisk {
madmin::HealingDisk {
id: self.id.clone(),
heal_id: self.heal_id.clone(),
pool_index: self.pool_index,
@@ -516,7 +487,7 @@ pub struct SetStatus {
pub heal_status: String,
pub heal_priority: String,
pub total_objects: usize,
pub disks: Vec<StorageDisk>,
pub disks: Vec<madmin::Disk>,
}
#[derive(Debug, Default, Serialize, Deserialize)]


@@ -3,8 +3,7 @@ use super::{
data_scanner::HEAL_DELETE_DANGLING,
error::ERR_SKIP_FILE,
heal_commands::{
HealItemType, HealOpts, HealResultItem, HealScanMode, HealStopSuccess, HealingDisk, HealingTracker,
HEAL_ITEM_BUCKET_METADATA,
HealItemType, HealOpts, HealResultItem, HealScanMode, HealStopSuccess, HealingTracker, HEAL_ITEM_BUCKET_METADATA,
},
};
use crate::store_api::StorageAPI;
@@ -664,7 +663,7 @@ impl AllHealState {
self.heal_status.write().await.insert(tracker.id.clone(), tracker.clone());
}
pub async fn get_local_healing_disks(&self) -> HashMap<String, HealingDisk> {
pub async fn get_local_healing_disks(&self) -> HashMap<String, madmin::HealingDisk> {
let _ = self.mu.read().await;
let mut dst = HashMap::new();


@@ -2,8 +2,12 @@ use std::sync::OnceLock;
use crate::endpoints::EndpointServerPools;
use crate::error::{Error, Result};
use crate::global::get_global_endpoints;
use crate::peer_rest_client::PeerRestClient;
use crate::StorageAPI;
use futures::future::join_all;
use lazy_static::lazy_static;
use tracing::error;
lazy_static! {
pub static ref GLOBAL_NotificationSys: OnceLock<NotificationSys> = OnceLock::new();
@@ -16,9 +20,13 @@ pub async fn new_global_notification_sys(eps: EndpointServerPools) -> Result<()>
Ok(())
}
pub fn get_global_notification_sys() -> Option<&'static NotificationSys> {
GLOBAL_NotificationSys.get()
}
pub struct NotificationSys {
pub peer_clients: Vec<PeerRestClient>,
pub all_peer_clients: Vec<PeerRestClient>,
peer_clients: Vec<Option<PeerRestClient>>,
all_peer_clients: Vec<Option<PeerRestClient>>,
}
impl NotificationSys {
@@ -50,4 +58,71 @@ impl NotificationSys {
pub async fn delete_user(&self) -> Vec<NotificationPeerErr> {
unimplemented!()
}
pub async fn storage_info<S: StorageAPI>(&self, api: &S) -> madmin::StorageInfo {
let mut futures = Vec::with_capacity(self.peer_clients.len());
for client in self.peer_clients.iter() {
futures.push(async move {
if let Some(client) = client {
match client.local_storage_info().await {
Ok(info) => Some(info),
Err(_) => Some(madmin::StorageInfo {
disks: get_offline_disks(&client.host.to_string(), &get_global_endpoints()),
..Default::default()
}),
}
} else {
None
}
});
}
let mut replies = join_all(futures).await;
replies.push(Some(api.local_storage_info().await));
let mut disks = Vec::new();
for info in replies.into_iter().flatten() {
disks.extend(info.disks);
}
let backend = api.backend_info().await;
madmin::StorageInfo { disks, backend }
}
pub async fn reload_pool_meta(&self) {
let mut futures = Vec::with_capacity(self.peer_clients.len());
for client in self.peer_clients.iter().flatten() {
futures.push(client.reload_pool_meta());
}
let results = join_all(futures).await;
for result in results {
if let Err(err) = result {
error!("notification reload_pool_meta err {:?}", err);
}
}
}
}
fn get_offline_disks(offline_host: &str, endpoints: &EndpointServerPools) -> Vec<madmin::Disk> {
let mut offline_disks = Vec::new();
for pool in endpoints.as_ref() {
for ep in pool.endpoints.as_ref() {
if (offline_host.is_empty() && ep.is_local) || offline_host == ep.host_port() {
offline_disks.push(madmin::Disk {
endpoint: ep.to_string(),
state: madmin::ItemState::Offline.to_string().to_owned(),
pool_index: ep.pool_idx,
set_index: ep.set_idx,
disk_index: ep.disk_idx,
..Default::default()
});
}
}
}
offline_disks
}
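Note: storage_info fans out to every peer with join_all, substitutes an offline-disk placeholder when a peer call fails (as get_offline_disks does above), and then flattens the replies. A simplified sketch of that fan-out/fallback shape, assuming only the futures crate; Peer and the disk values are illustrative:

use futures::{executor::block_on, future::join_all};

struct Peer {
    healthy: bool,
}

impl Peer {
    async fn local_storage_info(&self) -> Result<Vec<&'static str>, &'static str> {
        if self.healthy { Ok(vec!["disk-ok"]) } else { Err("peer offline") }
    }
}

fn main() {
    let peers = vec![Peer { healthy: true }, Peer { healthy: false }];
    // One future per peer; failures are replaced by a placeholder reply.
    let futures = peers.iter().map(|p| async move {
        match p.local_storage_info().await {
            Ok(disks) => Some(disks),
            Err(_) => Some(vec!["disk-offline"]),
        }
    });
    let replies = block_on(join_all(futures));
    // Flatten Option<Vec<_>> replies into one disk list.
    let disks: Vec<_> = replies.into_iter().flatten().flatten().collect();
    assert_eq!(disks, vec!["disk-ok", "disk-offline"]);
}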


@@ -110,7 +110,7 @@ impl S3PeerSys {
let mut futures = Vec::new();
let heal_bucket_results = Arc::new(RwLock::new(vec![HealResultItem::default(); self.clients.len()]));
for (idx, client) in self.clients.iter().enumerate() {
let opts_clone = opts.clone();
let opts_clone = opts;
let heal_bucket_results_clone = heal_bucket_results.clone();
futures.push(async move {
match client.heal_bucket(bucket, &opts_clone).await {


@@ -6,7 +6,7 @@ use crate::{
global::is_dist_erasure,
heal::heal_commands::BgHealState,
metrics_realtime::{CollectMetricsOpts, MetricType},
store_api::StorageInfo,
utils::net::XHost,
};
use common::error::{Error, Result};
use madmin::{
@@ -29,35 +29,53 @@ use protos::{
use rmp_serde::{Deserializer, Serializer};
use serde::{Deserialize, Serialize as _};
use tonic::Request;
use tracing::warn;
pub const PEER_RESTSIGNAL: &str = "signal";
pub const PEER_RESTSUB_SYS: &str = "sub-sys";
pub const PEER_RESTDRY_RUN: &str = "dry-run";
#[derive(Debug, Clone)]
pub struct PeerRestClient {
addr: String,
pub host: XHost,
pub grid_host: String,
}
impl PeerRestClient {
pub fn new(url: url::Url) -> Self {
Self {
addr: format!("{}://{}:{}", url.scheme(), url.host_str().unwrap(), url.port().unwrap()),
}
pub fn new(host: XHost, grid_host: String) -> Self {
Self { host, grid_host }
}
pub async fn new_clients(_eps: EndpointServerPools) -> (Vec<Self>, Vec<Self>) {
pub async fn new_clients(eps: EndpointServerPools) -> (Vec<Option<Self>>, Vec<Option<Self>>) {
if !is_dist_erasure().await {
return (Vec::new(), Vec::new());
}
// FIXME:TODO
let eps = eps.clone();
let hosts = eps.hosts_sorted();
let mut remote = vec![None; hosts.len()];
let mut all = Vec::with_capacity(hosts.len());
for (i, hs_host) in hosts.iter().enumerate() {
if let Some(host) = hs_host {
if let Some(grid_host) = eps.find_grid_hosts_from_peer(host) {
let client = PeerRestClient::new(host.clone(), grid_host);
todo!()
all[i] = Some(client.clone());
remote.push(Some(client));
}
}
}
if all.len() != remote.len() + 1 {
warn!("Expected number of all hosts ({}) to be remote +1 ({})", all.len(), remote.len());
}
(remote, all)
}
}
impl PeerRestClient {
pub async fn local_storage_info(&self) -> Result<StorageInfo> {
let mut client = node_service_time_out_client(&self.addr)
pub async fn local_storage_info(&self) -> Result<madmin::StorageInfo> {
let mut client = node_service_time_out_client(&self.grid_host)
.await
.map_err(|err| Error::msg(err.to_string()))?;
let request = Request::new(LocalStorageInfoRequest { metrics: true });
@@ -72,13 +90,13 @@ impl PeerRestClient {
let data = response.storage_info;
let mut buf = Deserializer::new(Cursor::new(data));
let storage_info: StorageInfo = Deserialize::deserialize(&mut buf).unwrap();
let storage_info: madmin::StorageInfo = Deserialize::deserialize(&mut buf).unwrap();
Ok(storage_info)
}
pub async fn server_info(&self) -> Result<ServerProperties> {
let mut client = node_service_time_out_client(&self.addr)
let mut client = node_service_time_out_client(&self.grid_host)
.await
.map_err(|err| Error::msg(err.to_string()))?;
let request = Request::new(ServerInfoRequest { metrics: true });
@@ -99,7 +117,7 @@ impl PeerRestClient {
}
pub async fn get_cpus(&self) -> Result<Cpus> {
let mut client = node_service_time_out_client(&self.addr)
let mut client = node_service_time_out_client(&self.grid_host)
.await
.map_err(|err| Error::msg(err.to_string()))?;
let request = Request::new(GetCpusRequest {});
@@ -120,7 +138,7 @@ impl PeerRestClient {
}
pub async fn get_net_info(&self) -> Result<NetInfo> {
let mut client = node_service_time_out_client(&self.addr)
let mut client = node_service_time_out_client(&self.grid_host)
.await
.map_err(|err| Error::msg(err.to_string()))?;
let request = Request::new(GetNetInfoRequest {});
@@ -141,7 +159,7 @@ impl PeerRestClient {
}
pub async fn get_partitions(&self) -> Result<Partitions> {
let mut client = node_service_time_out_client(&self.addr)
let mut client = node_service_time_out_client(&self.grid_host)
.await
.map_err(|err| Error::msg(err.to_string()))?;
let request = Request::new(GetPartitionsRequest {});
@@ -162,7 +180,7 @@ impl PeerRestClient {
}
pub async fn get_os_info(&self) -> Result<OsInfo> {
let mut client = node_service_time_out_client(&self.addr)
let mut client = node_service_time_out_client(&self.grid_host)
.await
.map_err(|err| Error::msg(err.to_string()))?;
let request = Request::new(GetOsInfoRequest {});
@@ -183,7 +201,7 @@ impl PeerRestClient {
}
pub async fn get_se_linux_info(&self) -> Result<SysService> {
let mut client = node_service_time_out_client(&self.addr)
let mut client = node_service_time_out_client(&self.grid_host)
.await
.map_err(|err| Error::msg(err.to_string()))?;
let request = Request::new(GetSeLinuxInfoRequest {});
@@ -204,7 +222,7 @@ impl PeerRestClient {
}
pub async fn get_sys_config(&self) -> Result<SysConfig> {
let mut client = node_service_time_out_client(&self.addr)
let mut client = node_service_time_out_client(&self.grid_host)
.await
.map_err(|err| Error::msg(err.to_string()))?;
let request = Request::new(GetSysConfigRequest {});
@@ -225,7 +243,7 @@ impl PeerRestClient {
}
pub async fn get_sys_errors(&self) -> Result<SysErrors> {
let mut client = node_service_time_out_client(&self.addr)
let mut client = node_service_time_out_client(&self.grid_host)
.await
.map_err(|err| Error::msg(err.to_string()))?;
let request = Request::new(GetSysErrorsRequest {});
@@ -246,7 +264,7 @@ impl PeerRestClient {
}
pub async fn get_mem_info(&self) -> Result<MemInfo> {
let mut client = node_service_time_out_client(&self.addr)
let mut client = node_service_time_out_client(&self.grid_host)
.await
.map_err(|err| Error::msg(err.to_string()))?;
let request = Request::new(GetMemInfoRequest {});
@@ -267,7 +285,7 @@ impl PeerRestClient {
}
pub async fn get_metrics(&self, t: MetricType, opts: &CollectMetricsOpts) -> Result<RealtimeMetrics> {
let mut client = node_service_time_out_client(&self.addr)
let mut client = node_service_time_out_client(&self.grid_host)
.await
.map_err(|err| Error::msg(err.to_string()))?;
let mut buf_t = Vec::new();
@@ -295,7 +313,7 @@ impl PeerRestClient {
}
pub async fn get_proc_info(&self) -> Result<ProcInfo> {
let mut client = node_service_time_out_client(&self.addr)
let mut client = node_service_time_out_client(&self.grid_host)
.await
.map_err(|err| Error::msg(err.to_string()))?;
let request = Request::new(GetProcInfoRequest {});
@@ -316,7 +334,7 @@ impl PeerRestClient {
}
pub async fn start_profiling(&self, profiler: &str) -> Result<()> {
let mut client = node_service_time_out_client(&self.addr)
let mut client = node_service_time_out_client(&self.grid_host)
.await
.map_err(|err| Error::msg(err.to_string()))?;
let request = Request::new(StartProfilingRequest {
@@ -350,7 +368,7 @@ impl PeerRestClient {
}
pub async fn load_bucket_metadata(&self, bucket: &str) -> Result<()> {
let mut client = node_service_time_out_client(&self.addr)
let mut client = node_service_time_out_client(&self.grid_host)
.await
.map_err(|err| Error::msg(err.to_string()))?;
let request = Request::new(LoadBucketMetadataRequest {
@@ -368,7 +386,7 @@ impl PeerRestClient {
}
pub async fn delete_bucket_metadata(&self, bucket: &str) -> Result<()> {
let mut client = node_service_time_out_client(&self.addr)
let mut client = node_service_time_out_client(&self.grid_host)
.await
.map_err(|err| Error::msg(err.to_string()))?;
let request = Request::new(DeleteBucketMetadataRequest {
@@ -386,7 +404,7 @@ impl PeerRestClient {
}
pub async fn delete_policy(&self, policy: &str) -> Result<()> {
let mut client = node_service_time_out_client(&self.addr)
let mut client = node_service_time_out_client(&self.grid_host)
.await
.map_err(|err| Error::msg(err.to_string()))?;
let request = Request::new(DeletePolicyRequest {
@@ -404,7 +422,7 @@ impl PeerRestClient {
}
pub async fn load_policy(&self, policy: &str) -> Result<()> {
let mut client = node_service_time_out_client(&self.addr)
let mut client = node_service_time_out_client(&self.grid_host)
.await
.map_err(|err| Error::msg(err.to_string()))?;
let request = Request::new(LoadPolicyRequest {
@@ -422,7 +440,7 @@ impl PeerRestClient {
}
pub async fn load_policy_mapping(&self, user_or_group: &str, user_type: u64, is_group: bool) -> Result<()> {
let mut client = node_service_time_out_client(&self.addr)
let mut client = node_service_time_out_client(&self.grid_host)
.await
.map_err(|err| Error::msg(err.to_string()))?;
let request = Request::new(LoadPolicyMappingRequest {
@@ -442,7 +460,7 @@ impl PeerRestClient {
}
pub async fn delete_user(&self, access_key: &str) -> Result<()> {
let mut client = node_service_time_out_client(&self.addr)
let mut client = node_service_time_out_client(&self.grid_host)
.await
.map_err(|err| Error::msg(err.to_string()))?;
let request = Request::new(DeleteUserRequest {
@@ -460,7 +478,7 @@ impl PeerRestClient {
}
pub async fn delete_service_account(&self, access_key: &str) -> Result<()> {
let mut client = node_service_time_out_client(&self.addr)
let mut client = node_service_time_out_client(&self.grid_host)
.await
.map_err(|err| Error::msg(err.to_string()))?;
let request = Request::new(DeleteServiceAccountRequest {
@@ -478,7 +496,7 @@ impl PeerRestClient {
}
pub async fn load_user(&self, access_key: &str, temp: bool) -> Result<()> {
let mut client = node_service_time_out_client(&self.addr)
let mut client = node_service_time_out_client(&self.grid_host)
.await
.map_err(|err| Error::msg(err.to_string()))?;
let request = Request::new(LoadUserRequest {
@@ -497,7 +515,7 @@ impl PeerRestClient {
}
pub async fn load_service_account(&self, access_key: &str) -> Result<()> {
let mut client = node_service_time_out_client(&self.addr)
let mut client = node_service_time_out_client(&self.grid_host)
.await
.map_err(|err| Error::msg(err.to_string()))?;
let request = Request::new(LoadServiceAccountRequest {
@@ -515,7 +533,7 @@ impl PeerRestClient {
}
pub async fn load_group(&self, group: &str) -> Result<()> {
let mut client = node_service_time_out_client(&self.addr)
let mut client = node_service_time_out_client(&self.grid_host)
.await
.map_err(|err| Error::msg(err.to_string()))?;
let request = Request::new(LoadGroupRequest {
@@ -533,7 +551,7 @@ impl PeerRestClient {
}
pub async fn reload_site_replication_config(&self) -> Result<()> {
let mut client = node_service_time_out_client(&self.addr)
let mut client = node_service_time_out_client(&self.grid_host)
.await
.map_err(|err| Error::msg(err.to_string()))?;
let request = Request::new(ReloadSiteReplicationConfigRequest {});
@@ -549,7 +567,7 @@ impl PeerRestClient {
}
pub async fn signal_service(&self, sig: u64, sub_sys: &str, dry_run: bool, _exec_at: SystemTime) -> Result<()> {
let mut client = node_service_time_out_client(&self.addr)
let mut client = node_service_time_out_client(&self.grid_host)
.await
.map_err(|err| Error::msg(err.to_string()))?;
let mut vars = HashMap::new();
@@ -571,7 +589,7 @@ impl PeerRestClient {
}
pub async fn background_heal_status(&self) -> Result<BgHealState> {
let mut client = node_service_time_out_client(&self.addr)
let mut client = node_service_time_out_client(&self.grid_host)
.await
.map_err(|err| Error::msg(err.to_string()))?;
let request = Request::new(BackgroundHealStatusRequest {});
@@ -592,21 +610,21 @@ impl PeerRestClient {
}
pub async fn get_metacache_listing(&self) -> Result<()> {
let mut _client = node_service_time_out_client(&self.addr)
let mut _client = node_service_time_out_client(&self.grid_host)
.await
.map_err(|err| Error::msg(err.to_string()))?;
todo!()
}
pub async fn update_metacache_listing(&self) -> Result<()> {
let mut _client = node_service_time_out_client(&self.addr)
let mut _client = node_service_time_out_client(&self.grid_host)
.await
.map_err(|err| Error::msg(err.to_string()))?;
todo!()
}
pub async fn reload_pool_meta(&self) -> Result<()> {
let mut client = node_service_time_out_client(&self.addr)
let mut client = node_service_time_out_client(&self.grid_host)
.await
.map_err(|err| Error::msg(err.to_string()))?;
let request = Request::new(ReloadPoolMetaRequest {});
@@ -623,7 +641,7 @@ impl PeerRestClient {
}
pub async fn stop_rebalance(&self) -> Result<()> {
let mut client = node_service_time_out_client(&self.addr)
let mut client = node_service_time_out_client(&self.grid_host)
.await
.map_err(|err| Error::msg(err.to_string()))?;
let request = Request::new(StopRebalanceRequest {});
@@ -640,7 +658,7 @@ impl PeerRestClient {
}
pub async fn load_rebalance_meta(&self, start_rebalance: bool) -> Result<()> {
let mut client = node_service_time_out_client(&self.addr)
let mut client = node_service_time_out_client(&self.grid_host)
.await
.map_err(|err| Error::msg(err.to_string()))?;
let request = Request::new(LoadRebalanceMetaRequest { start_rebalance });
@@ -657,7 +675,7 @@ impl PeerRestClient {
}
pub async fn load_transition_tier_config(&self) -> Result<()> {
let mut client = node_service_time_out_client(&self.addr)
let mut client = node_service_time_out_client(&self.grid_host)
.await
.map_err(|err| Error::msg(err.to_string()))?;
let request = Request::new(LoadTransitionTierConfigRequest {});


@@ -5,13 +5,15 @@ use crate::disk::{BUCKET_META_PREFIX, RUSTFS_META_BUCKET};
use crate::error::{Error, Result};
use crate::heal::heal_commands::HealOpts;
use crate::new_object_layer_fn;
use crate::store_api::{BucketOptions, MakeBucketOptions, StorageAPI, StorageDisk, StorageInfo};
use crate::notification_sys::get_global_notification_sys;
use crate::store_api::{BucketOptions, MakeBucketOptions, StorageAPI};
use crate::store_err::{is_err_bucket_exists, StorageError};
use crate::utils::path::{path_join, SLASH_SEPARATOR};
use crate::{sets::Sets, store::ECStore};
use byteorder::{ByteOrder, LittleEndian, WriteBytesExt};
use rmp_serde::{Deserializer, Serializer};
use serde::{Deserialize, Serialize};
use std::fmt::Display;
use std::io::{Cursor, Write};
use std::path::PathBuf;
use std::sync::Arc;
@@ -270,7 +272,7 @@ impl PoolMeta {
}
}
fn path2_bucket_object(name: &String) -> (String, String) {
fn path2_bucket_object(name: &str) -> (String, String) {
path2_bucket_object_with_base_path("", name)
}
@@ -374,11 +376,13 @@ pub struct DecomBucketInfo {
pub prefix: String,
}
impl ToString for DecomBucketInfo {
fn to_string(&self) -> String {
path_join(&[PathBuf::from(self.name.clone()), PathBuf::from(self.prefix.clone())])
.to_string_lossy()
.to_string()
impl Display for DecomBucketInfo {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"{}",
path_join(&[PathBuf::from(self.name.clone()), PathBuf::from(self.prefix.clone())]).to_string_lossy()
)
}
}
@@ -531,7 +535,9 @@ impl ECStore {
let mut pool_meta = self.pool_meta.write().await;
if pool_meta.decommission_failed(idx) {
pool_meta.save(self.pools.clone()).await?;
// FIXME: globalNotificationSys.ReloadPoolMeta(ctx)
if let Some(notification_sys) = get_global_notification_sys() {
notification_sys.reload_pool_meta().await;
}
}
Ok(())
@@ -545,7 +551,9 @@ impl ECStore {
let mut pool_meta = self.pool_meta.write().await;
if pool_meta.decommission_complete(idx) {
pool_meta.save(self.pools.clone()).await?;
// FIXME: globalNotificationSys.ReloadPoolMeta(ctx)
if let Some(notification_sys) = get_global_notification_sys() {
notification_sys.reload_pool_meta().await;
}
}
Ok(())
@@ -641,7 +649,9 @@ impl ECStore {
pool_meta.save(self.pools.clone()).await?;
// FIXME: globalNotificationSys.ReloadPoolMeta(ctx)
if let Some(notification_sys) = get_global_notification_sys() {
notification_sys.reload_pool_meta().await;
}
Ok(())
}
@@ -670,7 +680,7 @@ impl ECStore {
}
}
fn get_total_usable_capacity(disks: &[StorageDisk], info: &StorageInfo) -> usize {
fn get_total_usable_capacity(disks: &[madmin::Disk], info: &madmin::StorageInfo) -> usize {
let mut capacity = 0;
for disk in disks.iter() {
if disk.pool_index < 0 || info.backend.standard_sc_data.len() <= disk.pool_index as usize {
@@ -683,7 +693,7 @@ fn get_total_usable_capacity(disks: &[StorageDisk], info: &StorageInfo) -> usize
capacity
}
fn get_total_usable_capacity_free(disks: &[StorageDisk], info: &StorageInfo) -> usize {
fn get_total_usable_capacity_free(disks: &[madmin::Disk], info: &madmin::StorageInfo) -> usize {
let mut capacity = 0;
for disk in disks.iter() {
if disk.pool_index < 0 || info.backend.standard_sc_data.len() <= disk.pool_index as usize {


@@ -38,10 +38,9 @@ use crate::{
},
quorum::{object_op_ignored_errs, reduce_read_quorum_errs, reduce_write_quorum_errs, QuorumError},
store_api::{
BackendByte, BackendInfo, BucketInfo, BucketOptions, CompletePart, DeleteBucketOptions, DeletedObject, FileInfo,
GetObjectReader, HTTPRangeSpec, ListMultipartsInfo, ListObjectsV2Info, MakeBucketOptions, MultipartInfo,
MultipartUploadResult, ObjectIO, ObjectInfo, ObjectOptions, ObjectPartInfo, ObjectToDelete, PartInfo, PutObjReader,
RawFileInfo, StorageAPI, StorageDisk, StorageInfo, DEFAULT_BITROT_ALGO,
BucketInfo, BucketOptions, CompletePart, DeleteBucketOptions, DeletedObject, FileInfo, GetObjectReader, HTTPRangeSpec,
ListMultipartsInfo, ListObjectsV2Info, MakeBucketOptions, MultipartInfo, MultipartUploadResult, ObjectIO, ObjectInfo,
ObjectOptions, ObjectPartInfo, ObjectToDelete, PartInfo, PutObjReader, RawFileInfo, StorageAPI, DEFAULT_BITROT_ALGO,
},
store_err::{to_object_err, StorageError},
store_init::{load_format_erasure, ErasureError},
@@ -3596,15 +3595,15 @@ impl ObjectIO for SetDisks {
#[async_trait::async_trait]
impl StorageAPI for SetDisks {
async fn backend_info(&self) -> BackendInfo {
async fn backend_info(&self) -> madmin::BackendInfo {
unimplemented!()
}
async fn storage_info(&self) -> StorageInfo {
async fn storage_info(&self) -> madmin::StorageInfo {
let disks = self.get_disks_internal().await;
get_storage_info(&disks, &self.set_endpoints).await
}
async fn local_storage_info(&self) -> StorageInfo {
async fn local_storage_info(&self) -> madmin::StorageInfo {
let disks = self.get_disks_internal().await;
let mut local_disks: Vec<Option<Arc<crate::disk::Disk>>> = Vec::new();
@@ -4947,13 +4946,13 @@ pub fn should_heal_object_on_disk(
(false, err.clone())
}
async fn get_disks_info(disks: &[Option<DiskStore>], eps: &[Endpoint]) -> Vec<StorageDisk> {
async fn get_disks_info(disks: &[Option<DiskStore>], eps: &[Endpoint]) -> Vec<madmin::Disk> {
let mut ret = Vec::new();
for (i, pool) in disks.iter().enumerate() {
if let Some(disk) = pool {
match disk.disk_info(&DiskInfoOptions::default()).await {
Ok(res) => ret.push(StorageDisk {
Ok(res) => ret.push(madmin::Disk {
endpoint: eps[i].to_string(),
local: eps[i].is_local,
pool_index: eps[i].pool_idx,
@@ -4984,7 +4983,7 @@ async fn get_disks_info(disks: &[Option<DiskStore>], eps: &[Endpoint]) -> Vec<St
free_inodes: res.free_inodes,
..Default::default()
}),
Err(err) => ret.push(StorageDisk {
Err(err) => ret.push(madmin::Disk {
state: err.to_string(),
endpoint: eps[i].to_string(),
local: eps[i].is_local,
@@ -4995,7 +4994,7 @@ async fn get_disks_info(disks: &[Option<DiskStore>], eps: &[Endpoint]) -> Vec<St
}),
}
} else {
ret.push(StorageDisk {
ret.push(madmin::Disk {
endpoint: eps[i].to_string(),
local: eps[i].is_local,
pool_index: eps[i].pool_idx,
@@ -5009,14 +5008,14 @@ async fn get_disks_info(disks: &[Option<DiskStore>], eps: &[Endpoint]) -> Vec<St
ret
}
async fn get_storage_info(disks: &Vec<Option<DiskStore>>, eps: &Vec<Endpoint>) -> StorageInfo {
async fn get_storage_info(disks: &Vec<Option<DiskStore>>, eps: &Vec<Endpoint>) -> madmin::StorageInfo {
let mut disks = get_disks_info(disks, eps).await;
disks.sort_by(|a, b| a.total_space.cmp(&b.total_space));
StorageInfo {
madmin::StorageInfo {
disks,
backend: BackendInfo {
backend_type: BackendByte::Erasure,
backend: madmin::BackendInfo {
backend_type: madmin::BackendByte::Erasure,
..Default::default()
},
}


@@ -23,9 +23,9 @@ use crate::{
},
set_disk::SetDisks,
store_api::{
BackendInfo, BucketInfo, BucketOptions, CompletePart, DeleteBucketOptions, DeletedObject, GetObjectReader, HTTPRangeSpec,
BucketInfo, BucketOptions, CompletePart, DeleteBucketOptions, DeletedObject, GetObjectReader, HTTPRangeSpec,
ListMultipartsInfo, ListObjectVersionsInfo, ListObjectsV2Info, MakeBucketOptions, MultipartInfo, MultipartUploadResult,
ObjectIO, ObjectInfo, ObjectOptions, ObjectToDelete, PartInfo, PutObjReader, StorageAPI, StorageInfo,
ObjectIO, ObjectInfo, ObjectOptions, ObjectToDelete, PartInfo, PutObjReader, StorageAPI,
},
store_init::{check_format_erasure_values, get_format_erasure_in_quorum, load_format_erasure_all, save_format_file},
utils::hash,
@@ -294,10 +294,10 @@ impl ObjectIO for Sets {
#[async_trait::async_trait]
impl StorageAPI for Sets {
async fn backend_info(&self) -> BackendInfo {
async fn backend_info(&self) -> madmin::BackendInfo {
unimplemented!()
}
async fn storage_info(&self) -> StorageInfo {
async fn storage_info(&self) -> madmin::StorageInfo {
let mut futures = Vec::with_capacity(self.disk_set.len());
for set in self.disk_set.iter() {
@@ -312,12 +312,12 @@ impl StorageAPI for Sets {
disks.extend_from_slice(&res.disks);
}
StorageInfo {
madmin::StorageInfo {
disks,
..Default::default()
}
}
async fn local_storage_info(&self) -> StorageInfo {
async fn local_storage_info(&self) -> madmin::StorageInfo {
let mut futures = Vec::with_capacity(self.disk_set.len());
for set in self.disk_set.iter() {
@@ -331,7 +331,7 @@ impl StorageAPI for Sets {
for res in results.into_iter() {
disks.extend_from_slice(&res.disks);
}
StorageInfo {
madmin::StorageInfo {
disks,
..Default::default()
}


@@ -15,10 +15,9 @@ use crate::heal::data_usage_cache::{DataUsageCache, DataUsageCacheInfo};
use crate::heal::heal_commands::{HealOpts, HealResultItem, HealScanMode, HEAL_ITEM_METADATA};
use crate::heal::heal_ops::{HealEntryFn, HealSequence};
use crate::new_object_layer_fn;
use crate::notification_sys::get_global_notification_sys;
use crate::pools::PoolMeta;
use crate::store_api::{
BackendByte, BackendDisks, BackendInfo, ListMultipartsInfo, ListObjectVersionsInfo, MultipartInfo, ObjectIO, StorageInfo,
};
use crate::store_api::{ListMultipartsInfo, ListObjectVersionsInfo, MultipartInfo, ObjectIO};
use crate::store_err::{
is_err_bucket_exists, is_err_invalid_upload_id, is_err_object_not_found, is_err_read_quorum, is_err_version_not_found,
to_object_err, StorageError,
@@ -1120,7 +1119,7 @@ lazy_static! {
#[async_trait::async_trait]
impl StorageAPI for ECStore {
async fn backend_info(&self) -> BackendInfo {
async fn backend_info(&self) -> madmin::BackendInfo {
let (standard_sc_parity, rr_sc_parity) = {
if let Some(sc) = GLOBAL_StorageClass.get() {
let sc_parity = sc
@@ -1151,10 +1150,10 @@ impl StorageAPI for ECStore {
drives_per_set.push(*set_count);
}
BackendInfo {
backend_type: BackendByte::Erasure,
online_disks: BackendDisks::new(),
offline_disks: BackendDisks::new(),
madmin::BackendInfo {
backend_type: madmin::BackendByte::Erasure,
online_disks: madmin::BackendDisks::new(),
offline_disks: madmin::BackendDisks::new(),
standard_sc_data,
standard_sc_parity,
rr_sc_data,
@@ -1164,11 +1163,14 @@ impl StorageAPI for ECStore {
..Default::default()
}
}
async fn storage_info(&self) -> StorageInfo {
// FIXME: globalNotificationSys.StorageInfo
unimplemented!()
async fn storage_info(&self) -> madmin::StorageInfo {
let Some(notification_sy) = get_global_notification_sys() else {
return madmin::StorageInfo::default();
};
notification_sy.storage_info(self).await
}
async fn local_storage_info(&self) -> StorageInfo {
async fn local_storage_info(&self) -> madmin::StorageInfo {
let mut futures = Vec::with_capacity(self.pools.len());
for pool in self.pools.iter() {
@@ -1184,7 +1186,7 @@ impl StorageAPI for ECStore {
}
let backend = self.backend_info().await;
StorageInfo { backend, disks }
madmin::StorageInfo { backend, disks }
}
async fn list_bucket(&self, opts: &BucketOptions) -> Result<Vec<BucketInfo>> {


@@ -1,4 +1,3 @@
use crate::heal::heal_commands::HealingDisk;
use crate::heal::heal_ops::HealSequence;
use crate::{
disk::DiskStore,
@@ -9,7 +8,6 @@ use crate::{
};
use futures::StreamExt;
use http::HeaderMap;
use madmin::info_commands::DiskMetrics;
use rmp_serde::Serializer;
use s3s::{dto::StreamingBlob, Body};
use serde::{Deserialize, Serialize};
@@ -333,7 +331,7 @@ impl ErasureInfo {
// Compute the size of each shard
pub fn shard_size(&self, data_size: usize) -> usize {
(data_size + self.data_blocks - 1) / self.data_blocks
data_size.div_ceil(self.data_blocks)
}
// returns final erasure size from original size.
@@ -344,7 +342,7 @@ impl ErasureInfo {
let num_shards = total_size / self.block_size;
let last_block_size = total_size % self.block_size;
let last_shard_size = (last_block_size + self.data_blocks - 1) / self.data_blocks;
let last_shard_size = last_block_size.div_ceil(self.data_blocks);
num_shards * self.shard_size(self.block_size) + last_shard_size
// // Because EC pads data on write, the last shard should end up the same length as the others
@@ -804,84 +802,6 @@ pub struct DeletedObject {
// pub replication_state: ReplicationState,
}
#[derive(Debug, Default, Serialize, Deserialize)]
pub enum BackendByte {
#[default]
Unknown,
FS,
Erasure,
}
#[derive(Serialize, Deserialize, Debug, Default, Clone)]
pub struct StorageDisk {
pub endpoint: String,
pub root_disk: bool,
pub drive_path: String,
pub healing: bool,
pub scanning: bool,
pub state: String,
pub uuid: String,
pub major: u32,
pub minor: u32,
pub model: Option<String>,
pub total_space: u64,
pub used_space: u64,
pub available_space: u64,
pub read_throughput: f64,
pub write_throughput: f64,
pub read_latency: f64,
pub write_latency: f64,
pub utilization: f64,
pub metrics: Option<DiskMetrics>,
pub heal_info: Option<HealingDisk>,
pub used_inodes: u64,
pub free_inodes: u64,
pub local: bool,
pub pool_index: i32,
pub set_index: i32,
pub disk_index: i32,
}
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct StorageInfo {
pub disks: Vec<StorageDisk>,
pub backend: BackendInfo,
}
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct BackendDisks(HashMap<String, usize>);
impl BackendDisks {
pub fn new() -> Self {
Self(HashMap::new())
}
pub fn sum(&self) -> usize {
self.0.values().sum()
}
}
#[derive(Debug, Default, Serialize, Deserialize)]
#[serde(rename_all = "PascalCase", default)]
pub struct BackendInfo {
pub backend_type: BackendByte,
pub online_disks: BackendDisks,
pub offline_disks: BackendDisks,
#[serde(rename = "StandardSCData")]
pub standard_sc_data: Vec<usize>,
#[serde(rename = "StandardSCParities")]
pub standard_sc_parities: Vec<usize>,
#[serde(rename = "StandardSCParity")]
pub standard_sc_parity: Option<usize>,
#[serde(rename = "RRSCData")]
pub rr_sc_data: Vec<usize>,
#[serde(rename = "RRSCParities")]
pub rr_sc_parities: Vec<usize>,
#[serde(rename = "RRSCParity")]
pub rr_sc_parity: Option<usize>,
pub total_sets: Vec<usize>,
pub drives_per_set: Vec<usize>,
}
pub struct ListObjectVersionsInfo {
pub is_truncated: bool,
pub next_marker: String,
@@ -906,14 +826,15 @@ pub trait ObjectIO: Send + Sync + 'static {
}
#[async_trait::async_trait]
#[allow(clippy::too_many_arguments)]
pub trait StorageAPI: ObjectIO {
// NewNSLock TODO:
// Shutdown TODO:
// NSScanner TODO:
async fn backend_info(&self) -> BackendInfo;
async fn storage_info(&self) -> StorageInfo;
async fn local_storage_info(&self) -> StorageInfo;
async fn backend_info(&self) -> madmin::BackendInfo;
async fn storage_info(&self) -> madmin::StorageInfo;
async fn local_storage_info(&self) -> madmin::StorageInfo;
async fn make_bucket(&self, bucket: &str, opts: &MakeBucketOptions) -> Result<()>;
async fn get_bucket_info(&self, bucket: &str, opts: &BucketOptions) -> Result<BucketInfo>;
@@ -953,7 +874,7 @@ pub trait StorageAPI: ObjectIO {
objects: Vec<ObjectToDelete>,
opts: ObjectOptions,
) -> Result<(Vec<DeletedObject>, Vec<Option<Error>>)>;
#[warn(clippy::too_many_arguments)]
// TransitionObject TODO:
// RestoreTransitionedObject TODO:


@@ -208,7 +208,7 @@ pub fn is_err_object_not_found(err: &Error) -> bool {
fn test_storage_error() {
let e1 = Error::new(StorageError::BucketExists("ss".into()));
let e2 = Error::new(StorageError::ObjectNotFound("ss".into(), "sdf".to_owned()));
assert_eq!(is_err_bucket_exists(&e1), true);
assert_eq!(is_err_object_not_found(&e1), false);
assert_eq!(is_err_object_not_found(&e2), true);
assert!(is_err_bucket_exists(&e1));
assert!(!is_err_object_not_found(&e1));
assert!(is_err_object_not_found(&e2));
}


@@ -2,6 +2,7 @@ use crate::error::{Error, Result};
use lazy_static::lazy_static;
use std::{
collections::HashSet,
fmt::Display,
net::{IpAddr, SocketAddr, TcpListener, ToSocketAddrs},
};
@@ -105,27 +106,38 @@ pub(crate) fn must_get_local_ips() -> Result<Vec<IpAddr>> {
}
}
#[derive(Debug, Clone)]
pub struct XHost {
pub name: String,
pub port: u16,
pub is_port_set: bool,
}
impl ToString for XHost {
fn to_string(&self) -> String {
impl Display for XHost {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
if !self.is_port_set {
self.name.clone()
write!(f, "{}", self.name)
} else if self.name.contains(':') {
write!(f, "[{}]:{}", self.name, self.port)
} else {
join_host_port(&self.name, self.port)
write!(f, "{}:{}", self.name, self.port)
}
}
}
fn join_host_port(host: &str, port: u16) -> String {
if host.contains(':') {
format!("[{}]:{}", host, port)
} else {
format!("{}:{}", host, port)
impl TryFrom<String> for XHost {
type Error = std::io::Error;
fn try_from(value: String) -> std::result::Result<Self, Self::Error> {
if let Some(addr) = value.to_socket_addrs()?.next() {
Ok(Self {
name: addr.ip().to_string(),
port: addr.port(),
is_port_set: addr.port() > 0,
})
} else {
Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "value invalid"))
}
}
}
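Note: XHost moves from ToString to Display, with the old join_host_port logic folded into fmt, and TryFrom<String> replaces the ad-hoc parsing. A minimal, self-contained check of the host:port formatting rule (names containing ':' are treated as IPv6 literals and bracketed); Host here is a stand-in for XHost:

use std::fmt::{self, Display};

struct Host {
    name: String,
    port: u16,
    is_port_set: bool,
}

impl Display for Host {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        if !self.is_port_set {
            write!(f, "{}", self.name)
        } else if self.name.contains(':') {
            write!(f, "[{}]:{}", self.name, self.port) // IPv6-style name
        } else {
            write!(f, "{}:{}", self.name, self.port)
        }
    }
}

fn main() {
    let v4 = Host { name: "10.0.0.1".into(), port: 9000, is_port_set: true };
    let v6 = Host { name: "::1".into(), port: 9000, is_port_set: true };
    assert_eq!(v4.to_string(), "10.0.0.1:9000");
    assert_eq!(v6.to_string(), "[::1]:9000");
}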


@@ -77,6 +77,6 @@ pub fn same_disk(disk1: &str, disk2: &str) -> Result<bool> {
Ok(stat1.st_dev == stat2.st_dev)
}
pub fn get_drive_stats(major: u32, minor: u32) -> Result<IOStats> {
pub fn get_drive_stats(_major: u32, _minor: u32) -> Result<IOStats> {
Ok(IOStats::default())
}


@@ -11,3 +11,4 @@ chrono.workspace = true
common.workspace = true
psutil = "3.3.0"
serde.workspace = true
time.workspace =true


@@ -1,9 +1,36 @@
use std::collections::HashMap;
use std::{collections::HashMap, time::SystemTime};
use serde::{Deserialize, Serialize};
use time::OffsetDateTime;
use crate::metrics::TimedAction;
#[derive(Debug, PartialEq, Clone, Copy)]
pub enum ItemState {
Offline,
Initializing,
Online,
}
impl ItemState {
pub fn to_string(&self) -> &str {
match self {
ItemState::Offline => "offline",
ItemState::Initializing => "initializing",
ItemState::Online => "online",
}
}
pub fn from_string(s: &str) -> Option<ItemState> {
match s {
"offline" => Some(ItemState::Offline),
"initializing" => Some(ItemState::Initializing),
"online" => Some(ItemState::Online),
_ => None,
}
}
}
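Note: a quick round-trip over the ItemState helpers added above; the enum is re-declared here so the snippet stands alone:

#[derive(Debug, PartialEq, Clone, Copy)]
enum ItemState {
    Offline,
    Initializing,
    Online,
}

impl ItemState {
    fn to_string(&self) -> &'static str {
        match self {
            ItemState::Offline => "offline",
            ItemState::Initializing => "initializing",
            ItemState::Online => "online",
        }
    }
    fn from_string(s: &str) -> Option<ItemState> {
        match s {
            "offline" => Some(ItemState::Offline),
            "initializing" => Some(ItemState::Initializing),
            "online" => Some(ItemState::Online),
            _ => None,
        }
    }
}

fn main() {
    // Every state survives a string round-trip.
    for state in [ItemState::Offline, ItemState::Initializing, ItemState::Online] {
        assert_eq!(ItemState::from_string(state.to_string()), Some(state));
    }
}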
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
pub struct DiskMetrics {
pub last_minute: HashMap<String, TimedAction>,
@@ -14,3 +41,110 @@ pub struct DiskMetrics {
pub total_writes: u64,
pub total_deletes: u64,
}
#[derive(Serialize, Deserialize, Debug, Default, Clone)]
pub struct Disk {
pub endpoint: String,
pub root_disk: bool,
pub drive_path: String,
pub healing: bool,
pub scanning: bool,
pub state: String,
pub uuid: String,
pub major: u32,
pub minor: u32,
pub model: Option<String>,
pub total_space: u64,
pub used_space: u64,
pub available_space: u64,
pub read_throughput: f64,
pub write_throughput: f64,
pub read_latency: f64,
pub write_latency: f64,
pub utilization: f64,
pub metrics: Option<DiskMetrics>,
pub heal_info: Option<HealingDisk>,
pub used_inodes: u64,
pub free_inodes: u64,
pub local: bool,
pub pool_index: i32,
pub set_index: i32,
pub disk_index: i32,
}
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
pub struct HealingDisk {
pub id: String,
pub heal_id: String,
pub pool_index: Option<usize>,
pub set_index: Option<usize>,
pub disk_index: Option<usize>,
pub endpoint: String,
pub path: String,
pub started: Option<OffsetDateTime>,
pub last_update: Option<SystemTime>,
pub retry_attempts: u64,
pub objects_total_count: u64,
pub objects_total_size: u64,
pub items_healed: u64,
pub items_failed: u64,
pub item_skipped: u64,
pub bytes_done: u64,
pub bytes_failed: u64,
pub bytes_skipped: u64,
pub objects_healed: u64,
pub objects_failed: u64,
pub bucket: String,
pub object: String,
pub queue_buckets: Vec<String>,
pub healed_buckets: Vec<String>,
pub finished: bool,
}
#[derive(Debug, Default, Serialize, Deserialize)]
pub enum BackendByte {
#[default]
Unknown,
FS,
Erasure,
}
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct StorageInfo {
pub disks: Vec<Disk>,
pub backend: BackendInfo,
}
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct BackendDisks(HashMap<String, usize>);
impl BackendDisks {
pub fn new() -> Self {
Self(HashMap::new())
}
pub fn sum(&self) -> usize {
self.0.values().sum()
}
}
#[derive(Debug, Default, Serialize, Deserialize)]
#[serde(rename_all = "PascalCase", default)]
pub struct BackendInfo {
pub backend_type: BackendByte,
pub online_disks: BackendDisks,
pub offline_disks: BackendDisks,
#[serde(rename = "StandardSCData")]
pub standard_sc_data: Vec<usize>,
#[serde(rename = "StandardSCParities")]
pub standard_sc_parities: Vec<usize>,
#[serde(rename = "StandardSCParity")]
pub standard_sc_parity: Option<usize>,
#[serde(rename = "RRSCData")]
pub rr_sc_data: Vec<usize>,
#[serde(rename = "RRSCParities")]
pub rr_sc_parities: Vec<usize>,
#[serde(rename = "RRSCParity")]
pub rr_sc_parity: Option<usize>,
pub total_sets: Vec<usize>,
pub drives_per_set: Vec<usize>,
}


@@ -2,3 +2,5 @@ pub mod health;
pub mod info_commands;
pub mod metrics;
pub mod net;
pub use info_commands::*;


@@ -168,7 +168,7 @@ impl ScannerMetrics {
self.last_minute.ilm.entry(k.clone()).or_default().merge(v);
}
self.active_paths.extend(other.active_paths.clone().into_iter());
self.active_paths.extend(other.active_paths.clone());
self.active_paths.sort();
}
@@ -606,14 +606,14 @@ pub struct RealtimeMetrics {
impl RealtimeMetrics {
pub fn merge(&mut self, other: Self) {
if !other.errors.is_empty() {
self.errors.extend(other.errors.into_iter());
self.errors.extend(other.errors);
}
for (k, v) in other.by_host.into_iter() {
*self.by_host.entry(k).or_default() = v;
}
self.hosts.extend(other.hosts.into_iter());
self.hosts.extend(other.hosts);
self.aggregated.merge(&other.aggregated);
self.hosts.sort();


@@ -10,6 +10,7 @@ use ecstore::global::GLOBAL_ALlHealState;
use ecstore::heal::heal_commands::HealOpts;
use ecstore::heal::heal_ops::new_heal_sequence;
use ecstore::metrics_realtime::{collect_local_metrics, CollectMetricsOpts, MetricType};
use ecstore::new_object_layer_fn;
use ecstore::peer::is_reserved_or_invalid_bucket;
use ecstore::store::is_valid_object_prefix;
use ecstore::store_api::StorageAPI;
@@ -17,7 +18,6 @@ use ecstore::utils::path::path_join;
use ecstore::utils::time::parse_duration;
use ecstore::utils::xml;
use ecstore::GLOBAL_Endpoints;
use ecstore::{new_object_layer_fn, store_api::BackendInfo};
use futures::{Stream, StreamExt};
use http::Uri;
use hyper::StatusCode;
@@ -37,7 +37,6 @@ use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Duration as std_Duration;
use std::u64;
use time::{Duration, OffsetDateTime};
use tokio::sync::mpsc::{self};
use tokio::time::interval;
@@ -128,7 +127,7 @@ impl Operation for AssumeRoleHandle {
#[serde(rename_all = "PascalCase", default)]
pub struct AccountInfo {
pub account_name: String,
pub server: BackendInfo,
pub server: madmin::BackendInfo,
pub policy: BucketPolicy,
}
@@ -224,7 +223,18 @@ impl Operation for StorageInfoHandler {
async fn call(&self, _req: S3Request<Body>, _params: Params<'_, '_>) -> S3Result<S3Response<(StatusCode, Body)>> {
warn!("handle StorageInfoHandler");
return Err(s3_error!(NotImplemented));
let Some(store) = new_object_layer_fn() else {
return Err(S3Error::with_message(S3ErrorCode::InternalError, "Not init".to_string()));
};
// TODO:getAggregatedBackgroundHealState
let info = store.storage_info().await;
let output = serde_json::to_string(&info)
.map_err(|_e| S3Error::with_message(S3ErrorCode::InternalError, "parse accountInfo failed"))?;
Ok(S3Response::new((StatusCode::OK, Body::from(output))))
}
}
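Note: StorageInfoHandler now serializes the aggregated storage info to JSON and returns it with a 200 status instead of NotImplemented. A minimal sketch of that flow, assuming serde (with the derive feature) and serde_json; StorageInfo here is a stand-in for madmin::StorageInfo:

use serde::Serialize;

#[derive(Serialize)]
struct StorageInfo {
    disks: Vec<String>,
}

fn handle_storage_info() -> Result<(u16, String), String> {
    // Fetch (here: fabricate) the info, serialize it, return it as the body.
    let info = StorageInfo { disks: vec!["disk-1".into()] };
    let output = serde_json::to_string(&info).map_err(|e| e.to_string())?;
    Ok((200, output))
}

fn main() {
    let (status, body) = handle_storage_info().expect("serialization should succeed");
    assert_eq!(status, 200);
    assert!(body.contains("disk-1"));
}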
@@ -492,15 +502,11 @@ fn extract_heal_init_params(body: &Bytes, uri: &Uri, params: Params<'_, '_>) ->
hip.client_token = value.to_string();
}
}
if key == "forceStart" {
if parts.next().is_some() {
hip.force_start = true;
}
if key == "forceStart" && parts.next().is_some() {
hip.force_start = true;
}
if key == "forceStop" {
if parts.next().is_some() {
hip.force_stop = true;
}
if key == "forceStop" && parts.next().is_some() {
hip.force_stop = true;
}
}
}


@@ -4,10 +4,6 @@ use ecstore::global::DEFAULT_PORT;
shadow_rs::shadow!(build);
/// Default port that a rustfs server listens on.
///
/// Used if no port is specified.
pub const DEFAULT_ACCESS_KEY: &str = "rustfsadmin";
pub const DEFAULT_SECRET_KEY: &str = "rustfsadmin";


@@ -8,7 +8,6 @@ use common::{
error::{Error, Result},
globals::set_global_addr,
};
use ecstore::global::set_global_rustfs_port;
use ecstore::heal::background_heal_ops::init_auto_heal;
use ecstore::utils::net::{self, get_available_port};
use ecstore::{
@@ -18,6 +17,7 @@ use ecstore::{
store::{init_local_disks, ECStore},
update_erasure_type,
};
use ecstore::{global::set_global_rustfs_port, notification_sys::new_global_notification_sys};
use grpc::make_server;
use hyper_util::{
rt::{TokioExecutor, TokioIo},
@@ -212,10 +212,10 @@ async fn run(opt: config::Opt) -> Result<()> {
})?;
warn!(" init store success!");
// new_global_notification_sys(endpoint_pools.clone()).await.map_err(|err| {
// error!("new_global_notification_sys faild {:?}", &err);
// Error::from_string(err.to_string())
// })?;
new_global_notification_sys(endpoint_pools.clone()).await.map_err(|err| {
error!("new_global_notification_sys faild {:?}", &err);
Error::from_string(err.to_string())
})?;
// init scanner
init_data_scanner().await;


@@ -591,8 +591,6 @@ impl S3 for FS {
.await
.map_err(to_s3_error)?;
error!("ObjectOptions {:?}", opts);
let obj_info = store
.put_object(&bucket, &key, &mut reader, &opts)
.await