diff --git a/Cargo.lock b/Cargo.lock index 2adf399b..3e7dd830 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6204,7 +6204,9 @@ dependencies = [ "clap", "const-str", "datafusion", + "flatbuffers", "futures", + "futures-util", "http 1.3.1", "http-body 1.0.1", "hyper 1.7.0", @@ -6219,6 +6221,7 @@ dependencies = [ "pin-project-lite", "pprof", "reqwest", + "rmp-serde", "rust-embed", "rustfs-ahm", "rustfs-appauth", @@ -6229,6 +6232,7 @@ dependencies = [ "rustfs-filemeta", "rustfs-iam", "rustfs-kms", + "rustfs-lock", "rustfs-madmin", "rustfs-notify", "rustfs-obs", diff --git a/crates/ecstore/src/notification_sys.rs b/crates/ecstore/src/notification_sys.rs index 7e629588..a4c45001 100644 --- a/crates/ecstore/src/notification_sys.rs +++ b/crates/ecstore/src/notification_sys.rs @@ -16,11 +16,17 @@ use crate::StorageAPI; use crate::admin_server_info::get_commit_id; use crate::error::{Error, Result}; use crate::global::{GLOBAL_BOOT_TIME, get_global_endpoints}; +use crate::metrics_realtime::{CollectMetricsOpts, MetricType}; use crate::rpc::PeerRestClient; use crate::{endpoints::EndpointServerPools, new_object_layer_fn}; use futures::future::join_all; use lazy_static::lazy_static; +use rustfs_madmin::health::{Cpus, MemInfo, OsInfo, Partitions, ProcInfo, SysConfig, SysErrors, SysService}; +use rustfs_madmin::metrics::RealtimeMetrics; +use rustfs_madmin::net::NetInfo; use rustfs_madmin::{ItemState, ServerProperties}; +use std::collections::hash_map::DefaultHasher; +use std::hash::{Hash, Hasher}; use std::sync::OnceLock; use std::time::SystemTime; use tracing::{error, warn}; @@ -62,21 +68,122 @@ pub struct NotificationPeerErr { } impl NotificationSys { - pub fn rest_client_from_hash(&self, _s: &str) -> Option { - None - } - pub async fn delete_policy(&self) -> Vec { - unimplemented!() - } - pub async fn load_policy(&self) -> Vec { - unimplemented!() + pub fn rest_client_from_hash(&self, s: &str) -> Option { + if self.all_peer_clients.is_empty() { + return None; + } + let mut hasher = DefaultHasher::new(); + s.hash(&mut hasher); + let idx = (hasher.finish() as usize) % self.all_peer_clients.len(); + self.all_peer_clients[idx].clone() } - pub async fn load_policy_mapping(&self) -> Vec { - unimplemented!() + pub async fn delete_policy(&self, policy_name: &str) -> Vec { + let mut futures = Vec::with_capacity(self.peer_clients.len()); + for client in self.peer_clients.iter() { + let policy = policy_name.to_string(); + futures.push(async move { + if let Some(client) = client { + match client.delete_policy(&policy).await { + Ok(_) => NotificationPeerErr { + host: client.host.to_string(), + err: None, + }, + Err(e) => NotificationPeerErr { + host: client.host.to_string(), + err: Some(e), + }, + } + } else { + NotificationPeerErr { + host: "".to_string(), + err: Some(Error::other("peer is not reachable")), + } + } + }); + } + join_all(futures).await } - pub async fn delete_user(&self) -> Vec { - unimplemented!() + + pub async fn load_policy(&self, policy_name: &str) -> Vec { + let mut futures = Vec::with_capacity(self.peer_clients.len()); + for client in self.peer_clients.iter() { + let policy = policy_name.to_string(); + futures.push(async move { + if let Some(client) = client { + match client.load_policy(&policy).await { + Ok(_) => NotificationPeerErr { + host: client.host.to_string(), + err: None, + }, + Err(e) => NotificationPeerErr { + host: client.host.to_string(), + err: Some(e), + }, + } + } else { + NotificationPeerErr { + host: "".to_string(), + err: Some(Error::other("peer is not reachable")), + } + } + }); + } + join_all(futures).await + } + + pub async fn load_policy_mapping(&self, user_or_group: &str, user_type: u64, is_group: bool) -> Vec { + let mut futures = Vec::with_capacity(self.peer_clients.len()); + for client in self.peer_clients.iter() { + let uog = user_or_group.to_string(); + futures.push(async move { + if let Some(client) = client { + match client.load_policy_mapping(&uog, user_type, is_group).await { + Ok(_) => NotificationPeerErr { + host: client.host.to_string(), + err: None, + }, + Err(e) => NotificationPeerErr { + host: client.host.to_string(), + err: Some(e), + }, + } + } else { + NotificationPeerErr { + host: "".to_string(), + err: Some(Error::other("peer is not reachable")), + } + } + }); + } + join_all(futures).await + } + + pub async fn delete_user(&self, access_key: &str) -> Vec { + let mut futures = Vec::with_capacity(self.peer_clients.len()); + for client in self.peer_clients.iter() { + let ak = access_key.to_string(); + futures.push(async move { + if let Some(client) = client { + match client.delete_user(&ak).await { + Ok(_) => NotificationPeerErr { + host: client.host.to_string(), + err: None, + }, + Err(e) => NotificationPeerErr { + host: client.host.to_string(), + err: Some(e), + }, + } + } else { + NotificationPeerErr { + host: "".to_string(), + err: Some(Error::other("peer is not reachable")), + } + } + }); + } + join_all(futures).await } pub async fn storage_info(&self, api: &S) -> rustfs_madmin::StorageInfo { @@ -140,6 +247,114 @@ impl NotificationSys { join_all(futures).await } + pub async fn load_user(&self, access_key: &str, temp: bool) -> Vec { + let mut futures = Vec::with_capacity(self.peer_clients.len()); + for client in self.peer_clients.iter() { + let ak = access_key.to_string(); + futures.push(async move { + if let Some(client) = client { + match client.load_user(&ak, temp).await { + Ok(_) => NotificationPeerErr { + host: client.host.to_string(), + err: None, + }, + Err(e) => NotificationPeerErr { + host: client.host.to_string(), + err: Some(e), + }, + } + } else { + NotificationPeerErr { + host: "".to_string(), + err: Some(Error::other("peer is not reachable")), + } + } + }); + } + join_all(futures).await + } + + pub async fn load_group(&self, group: &str) -> Vec { + let mut futures = Vec::with_capacity(self.peer_clients.len()); + for client in self.peer_clients.iter() { + let gname = group.to_string(); + futures.push(async move { + if let Some(client) = client { + match client.load_group(&gname).await { + Ok(_) => NotificationPeerErr { + host: client.host.to_string(), + err: None, + }, + Err(e) => NotificationPeerErr { + host: client.host.to_string(), + err: Some(e), + }, + } + } else { + NotificationPeerErr { + host: "".to_string(), + err: Some(Error::other("peer is not reachable")), + } + } + }); + } + join_all(futures).await + } + + pub async fn delete_service_account(&self, access_key: &str) -> Vec { + let mut futures = Vec::with_capacity(self.peer_clients.len()); + for client in self.peer_clients.iter() { + let ak = access_key.to_string(); + futures.push(async move { + if let Some(client) = client { + match client.delete_service_account(&ak).await { + Ok(_) => NotificationPeerErr { + host: client.host.to_string(), + err: None, + }, + Err(e) => NotificationPeerErr { + host: client.host.to_string(), + err: Some(e), + }, + } + } else { + NotificationPeerErr { + host: "".to_string(), + err: Some(Error::other("peer is not reachable")), + } + } + }); + } + join_all(futures).await + } + + pub async fn load_service_account(&self, access_key: &str) -> Vec { + let mut futures = Vec::with_capacity(self.peer_clients.len()); + for client in self.peer_clients.iter() { + let ak = access_key.to_string(); + futures.push(async move { + if let Some(client) = client { + match client.load_service_account(&ak).await { + Ok(_) => NotificationPeerErr { + host: client.host.to_string(), + err: None, + }, + Err(e) => NotificationPeerErr { + host: client.host.to_string(), + err: Some(e), + }, + } + } else { + NotificationPeerErr { + host: "".to_string(), + err: Some(Error::other("peer is not reachable")), + } + } + }); + } + join_all(futures).await + } + pub async fn reload_pool_meta(&self) { let mut futures = Vec::with_capacity(self.peer_clients.len()); for client in self.peer_clients.iter().flatten() { @@ -202,6 +417,281 @@ impl NotificationSys { let _ = store.stop_rebalance().await; warn!("notification stop_rebalance stop_rebalance done"); } + + pub async fn load_bucket_metadata(&self, bucket: &str) -> Vec { + let mut futures = Vec::with_capacity(self.peer_clients.len()); + for client in self.peer_clients.iter() { + let b = bucket.to_string(); + futures.push(async move { + if let Some(client) = client { + match client.load_bucket_metadata(&b).await { + Ok(_) => NotificationPeerErr { + host: client.host.to_string(), + err: None, + }, + Err(e) => NotificationPeerErr { + host: client.host.to_string(), + err: Some(e), + }, + } + } else { + NotificationPeerErr { + host: "".to_string(), + err: Some(Error::other("peer is not reachable")), + } + } + }); + } + join_all(futures).await + } + + pub async fn delete_bucket_metadata(&self, bucket: &str) -> Vec { + let mut futures = Vec::with_capacity(self.peer_clients.len()); + for client in self.peer_clients.iter() { + let b = bucket.to_string(); + futures.push(async move { + if let Some(client) = client { + match client.delete_bucket_metadata(&b).await { + Ok(_) => NotificationPeerErr { + host: client.host.to_string(), + err: None, + }, + Err(e) => NotificationPeerErr { + host: client.host.to_string(), + err: Some(e), + }, + } + } else { + NotificationPeerErr { + host: "".to_string(), + err: Some(Error::other("peer is not reachable")), + } + } + }); + } + join_all(futures).await + } + + pub async fn start_profiling(&self, profiler: &str) -> Vec { + let mut futures = Vec::with_capacity(self.peer_clients.len()); + for client in self.peer_clients.iter() { + let pf = profiler.to_string(); + futures.push(async move { + if let Some(client) = client { + match client.start_profiling(&pf).await { + Ok(_) => NotificationPeerErr { + host: client.host.to_string(), + err: None, + }, + Err(e) => NotificationPeerErr { + host: client.host.to_string(), + err: Some(e), + }, + } + } else { + NotificationPeerErr { + host: "".to_string(), + err: Some(Error::other("peer is not reachable")), + } + } + }); + } + join_all(futures).await + } + + pub async fn get_cpus(&self) -> Vec { + let mut futures = Vec::with_capacity(self.peer_clients.len()); + for client in self.peer_clients.iter().cloned() { + futures.push(async move { + if let Some(client) = client { + client.get_cpus().await.unwrap_or_default() + } else { + Cpus::default() + } + }); + } + join_all(futures).await + } + + pub async fn get_net_info(&self) -> Vec { + let mut futures = Vec::with_capacity(self.peer_clients.len()); + for client in self.peer_clients.iter().cloned() { + futures.push(async move { + if let Some(client) = client { + client.get_net_info().await.unwrap_or_default() + } else { + NetInfo::default() + } + }); + } + join_all(futures).await + } + + pub async fn get_partitions(&self) -> Vec { + let mut futures = Vec::with_capacity(self.peer_clients.len()); + for client in self.peer_clients.iter().cloned() { + futures.push(async move { + if let Some(client) = client { + client.get_partitions().await.unwrap_or_default() + } else { + Partitions::default() + } + }); + } + join_all(futures).await + } + + pub async fn get_os_info(&self) -> Vec { + let mut futures = Vec::with_capacity(self.peer_clients.len()); + for client in self.peer_clients.iter().cloned() { + futures.push(async move { + if let Some(client) = client { + client.get_os_info().await.unwrap_or_default() + } else { + OsInfo::default() + } + }); + } + join_all(futures).await + } + + pub async fn get_sys_services(&self) -> Vec { + let mut futures = Vec::with_capacity(self.peer_clients.len()); + for client in self.peer_clients.iter().cloned() { + futures.push(async move { + if let Some(client) = client { + client.get_se_linux_info().await.unwrap_or_default() + } else { + SysService::default() + } + }); + } + join_all(futures).await + } + + pub async fn get_sys_config(&self) -> Vec { + let mut futures = Vec::with_capacity(self.peer_clients.len()); + for client in self.peer_clients.iter().cloned() { + futures.push(async move { + if let Some(client) = client { + client.get_sys_config().await.unwrap_or_default() + } else { + SysConfig::default() + } + }); + } + join_all(futures).await + } + + pub async fn get_sys_errors(&self) -> Vec { + let mut futures = Vec::with_capacity(self.peer_clients.len()); + for client in self.peer_clients.iter().cloned() { + futures.push(async move { + if let Some(client) = client { + client.get_sys_errors().await.unwrap_or_default() + } else { + SysErrors::default() + } + }); + } + join_all(futures).await + } + + pub async fn get_mem_info(&self) -> Vec { + let mut futures = Vec::with_capacity(self.peer_clients.len()); + for client in self.peer_clients.iter().cloned() { + futures.push(async move { + if let Some(client) = client { + client.get_mem_info().await.unwrap_or_default() + } else { + MemInfo::default() + } + }); + } + join_all(futures).await + } + + pub async fn get_proc_info(&self) -> Vec { + let mut futures = Vec::with_capacity(self.peer_clients.len()); + for client in self.peer_clients.iter().cloned() { + futures.push(async move { + if let Some(client) = client { + client.get_proc_info().await.unwrap_or_default() + } else { + ProcInfo::default() + } + }); + } + join_all(futures).await + } + + pub async fn get_metrics(&self, t: MetricType, opts: &CollectMetricsOpts) -> Vec { + let mut futures = Vec::with_capacity(self.peer_clients.len()); + for client in self.peer_clients.iter().cloned() { + let t_clone = t; + let opts_clone = opts; + futures.push(async move { + if let Some(client) = client { + client.get_metrics(t_clone, opts_clone).await.unwrap_or_default() + } else { + RealtimeMetrics::default() + } + }); + } + join_all(futures).await + } + + pub async fn reload_site_replication_config(&self) -> Vec { + let mut futures = Vec::with_capacity(self.peer_clients.len()); + for client in self.peer_clients.iter() { + futures.push(async move { + if let Some(client) = client { + match client.reload_site_replication_config().await { + Ok(_) => NotificationPeerErr { + host: client.host.to_string(), + err: None, + }, + Err(e) => NotificationPeerErr { + host: client.host.to_string(), + err: Some(e), + }, + } + } else { + NotificationPeerErr { + host: "".to_string(), + err: Some(Error::other("peer is not reachable")), + } + } + }); + } + join_all(futures).await + } + + pub async fn load_transition_tier_config(&self) -> Vec { + let mut futures = Vec::with_capacity(self.peer_clients.len()); + for client in self.peer_clients.iter() { + futures.push(async move { + if let Some(client) = client { + match client.load_transition_tier_config().await { + Ok(_) => NotificationPeerErr { + host: client.host.to_string(), + err: None, + }, + Err(e) => NotificationPeerErr { + host: client.host.to_string(), + err: Some(e), + }, + } + } else { + NotificationPeerErr { + host: "".to_string(), + err: Some(Error::other("peer is not reachable")), + } + } + }); + } + join_all(futures).await + } } fn get_offline_disks(offline_host: &str, endpoints: &EndpointServerPools) -> Vec { diff --git a/crates/ecstore/src/rpc/mod.rs b/crates/ecstore/src/rpc/mod.rs index 53d3a1b7..4d140209 100644 --- a/crates/ecstore/src/rpc/mod.rs +++ b/crates/ecstore/src/rpc/mod.rs @@ -16,10 +16,8 @@ mod http_auth; mod peer_rest_client; mod peer_s3_client; mod remote_disk; -mod tonic_service; pub use http_auth::{build_auth_headers, verify_rpc_signature}; pub use peer_rest_client::PeerRestClient; pub use peer_s3_client::{LocalPeerS3Client, PeerS3Client, RemotePeerS3Client, S3PeerSys}; pub use remote_disk::RemoteDisk; -pub use tonic_service::{NodeService, make_server}; diff --git a/crates/iam/src/lib.rs b/crates/iam/src/lib.rs index c3ca5656..ebefb72f 100644 --- a/crates/iam/src/lib.rs +++ b/crates/iam/src/lib.rs @@ -43,3 +43,7 @@ pub async fn init_iam_sys(ecstore: Arc) -> Result<()> { pub fn get() -> Result>> { IAM_SYS.get().map(Arc::clone).ok_or(Error::IamSysNotInitialized) } + +pub fn get_global_iam_sys() -> Option>> { + IAM_SYS.get().cloned() +} diff --git a/crates/iam/src/store.rs b/crates/iam/src/store.rs index 0620445f..9d26bde6 100644 --- a/crates/iam/src/store.rs +++ b/crates/iam/src/store.rs @@ -23,6 +23,7 @@ use time::OffsetDateTime; #[async_trait::async_trait] pub trait Store: Clone + Send + Sync + 'static { + fn has_watcher(&self) -> bool; async fn save_iam_config(&self, item: Item, path: impl AsRef + Send) -> Result<()>; async fn load_iam_config(&self, path: impl AsRef + Send) -> Result; async fn delete_iam_config(&self, path: impl AsRef + Send) -> Result<()>; @@ -89,6 +90,24 @@ impl UserType { UserType::None => "", } } + pub fn to_u64(&self) -> u64 { + match self { + UserType::Svc => 1, + UserType::Sts => 2, + UserType::Reg => 3, + UserType::None => 0, + } + } + + pub fn from_u64(u64: u64) -> Option { + match u64 { + 1 => Some(UserType::Svc), + 2 => Some(UserType::Sts), + 3 => Some(UserType::Reg), + 0 => Some(UserType::None), + _ => None, + } + } } #[derive(Serialize, Deserialize, Clone)] diff --git a/crates/iam/src/store/object.rs b/crates/iam/src/store/object.rs index 9d630f31..d936c4cc 100644 --- a/crates/iam/src/store/object.rs +++ b/crates/iam/src/store/object.rs @@ -380,6 +380,9 @@ impl ObjectStore { #[async_trait::async_trait] impl Store for ObjectStore { + fn has_watcher(&self) -> bool { + false + } async fn load_iam_config(&self, path: impl AsRef + Send) -> Result { let mut data = read_config(self.object_api.clone(), path.as_ref()).await?; diff --git a/crates/iam/src/sys.rs b/crates/iam/src/sys.rs index abcdc13a..8aa0bc30 100644 --- a/crates/iam/src/sys.rs +++ b/crates/iam/src/sys.rs @@ -25,6 +25,7 @@ use crate::store::Store; use crate::store::UserType; use crate::utils::extract_claims; use rustfs_ecstore::global::get_global_action_cred; +use rustfs_ecstore::notification_sys::get_global_notification_sys; use rustfs_madmin::AddOrUpdateUserReq; use rustfs_madmin::GroupDesc; use rustfs_policy::arn::ARN; @@ -41,6 +42,7 @@ use serde_json::json; use std::collections::HashMap; use std::sync::Arc; use time::OffsetDateTime; +use tracing::warn; pub const MAX_SVCSESSION_POLICY_SIZE: usize = 4096; @@ -63,6 +65,9 @@ impl IamSys { roles_map: HashMap::new(), } } + pub fn has_watcher(&self) -> bool { + self.store.api.has_watcher() + } pub async fn load_group(&self, name: &str) -> Result<()> { self.store.group_notification_handler(name).await @@ -104,8 +109,17 @@ impl IamSys { self.store.delete_policy(name, notify).await?; - if notify { - // TODO: implement notification + if !notify || self.has_watcher() { + return Ok(()); + } + + if let Some(notification_sys) = get_global_notification_sys() { + let resp = notification_sys.delete_policy(name).await; + for r in resp { + if let Some(err) = r.err { + warn!("notify delete_policy failed: {}", err); + } + } } Ok(()) @@ -142,9 +156,20 @@ impl IamSys { } pub async fn set_policy(&self, name: &str, policy: Policy) -> Result { - self.store.set_policy(name, policy).await + let updated_at = self.store.set_policy(name, policy).await?; - // TODO: notification + if !self.has_watcher() { + if let Some(notification_sys) = get_global_notification_sys() { + let resp = notification_sys.load_policy(name).await; + for r in resp { + if let Some(err) = r.err { + warn!("notify load_policy failed: {}", err); + } + } + } + } + + Ok(updated_at) } pub async fn get_role_policy(&self, arn_str: &str) -> Result<(ARN, String)> { @@ -159,9 +184,51 @@ impl IamSys { Ok((arn, policy.clone())) } - pub async fn delete_user(&self, name: &str, _notify: bool) -> Result<()> { - self.store.delete_user(name, UserType::Reg).await - // TODO: notification + pub async fn delete_user(&self, name: &str, notify: bool) -> Result<()> { + self.store.delete_user(name, UserType::Reg).await?; + + if notify && !self.has_watcher() { + if let Some(notification_sys) = get_global_notification_sys() { + let resp = notification_sys.delete_user(name).await; + for r in resp { + if let Some(err) = r.err { + warn!("notify delete_user failed: {}", err); + } + } + } + } + + Ok(()) + } + + async fn notify_for_user(&self, name: &str, is_temp: bool) { + if self.has_watcher() { + return; + } + + if let Some(notification_sys) = get_global_notification_sys() { + let resp = notification_sys.load_user(name, is_temp).await; + for r in resp { + if let Some(err) = r.err { + warn!("notify load_user failed: {}", err); + } + } + } + } + + async fn notify_for_service_account(&self, name: &str) { + if self.has_watcher() { + return; + } + + if let Some(notification_sys) = get_global_notification_sys() { + let resp = notification_sys.load_service_account(name).await; + for r in resp { + if let Some(err) = r.err { + warn!("notify load_service_account failed: {}", err); + } + } + } } pub async fn current_policies(&self, name: &str) -> String { @@ -177,8 +244,11 @@ impl IamSys { } pub async fn set_temp_user(&self, name: &str, cred: &Credentials, policy_name: Option<&str>) -> Result { - self.store.set_temp_user(name, cred, policy_name).await - // TODO: notification + let updated_at = self.store.set_temp_user(name, cred, policy_name).await?; + + self.notify_for_user(&cred.access_key, true).await; + + Ok(updated_at) } pub async fn is_temp_user(&self, name: &str) -> Result<(bool, String)> { @@ -208,8 +278,11 @@ impl IamSys { } pub async fn set_user_status(&self, name: &str, status: rustfs_madmin::AccountStatus) -> Result { - self.store.set_user_status(name, status).await - // TODO: notification + let updated_at = self.store.set_user_status(name, status).await?; + + self.notify_for_user(name, false).await; + + Ok(updated_at) } pub async fn new_service_account( @@ -294,14 +367,17 @@ impl IamSys { let create_at = self.store.add_service_account(cred.clone()).await?; + self.notify_for_service_account(&cred.access_key).await; + Ok((cred, create_at)) - // TODO: notification } pub async fn update_service_account(&self, name: &str, opts: UpdateServiceAccountOpts) -> Result { - self.store.update_service_account(name, opts).await + let updated_at = self.store.update_service_account(name, opts).await?; - // TODO: notification + self.notify_for_service_account(name).await; + + Ok(updated_at) } pub async fn list_service_accounts(&self, access_key: &str) -> Result> { @@ -424,7 +500,7 @@ impl IamSys { extract_jwt_claims(&u) } - pub async fn delete_service_account(&self, access_key: &str, _notify: bool) -> Result<()> { + pub async fn delete_service_account(&self, access_key: &str, notify: bool) -> Result<()> { let Some(u) = self.store.get_user(access_key).await else { return Ok(()); }; @@ -433,9 +509,35 @@ impl IamSys { return Ok(()); } - self.store.delete_user(access_key, UserType::Svc).await + self.store.delete_user(access_key, UserType::Svc).await?; - // TODO: notification + if notify && !self.has_watcher() { + if let Some(notification_sys) = get_global_notification_sys() { + let resp = notification_sys.delete_service_account(access_key).await; + for r in resp { + if let Some(err) = r.err { + warn!("notify delete_service_account failed: {}", err); + } + } + } + } + + Ok(()) + } + + async fn notify_for_group(&self, group: &str) { + if self.has_watcher() { + return; + } + + if let Some(notification_sys) = get_global_notification_sys() { + let resp = notification_sys.load_group(group).await; + for r in resp { + if let Some(err) = r.err { + warn!("notify load_group failed: {}", err); + } + } + } } pub async fn create_user(&self, access_key: &str, args: &AddOrUpdateUserReq) -> Result { @@ -451,8 +553,11 @@ impl IamSys { return Err(IamError::InvalidSecretKeyLength); } - self.store.add_user(access_key, args).await - // TODO: notification + let updated_at = self.store.add_user(access_key, args).await?; + + self.notify_for_user(access_key, false).await; + + Ok(updated_at) } pub async fn set_user_secret_key(&self, access_key: &str, secret_key: &str) -> Result<()> { @@ -495,18 +600,27 @@ impl IamSys { if contains_reserved_chars(group) { return Err(IamError::GroupNameContainsReservedChars); } - self.store.add_users_to_group(group, users).await - // TODO: notification + let updated_at = self.store.add_users_to_group(group, users).await?; + + self.notify_for_group(group).await; + + Ok(updated_at) } pub async fn remove_users_from_group(&self, group: &str, users: Vec) -> Result { - self.store.remove_users_from_group(group, users).await - // TODO: notification + let updated_at = self.store.remove_users_from_group(group, users).await?; + + self.notify_for_group(group).await; + + Ok(updated_at) } pub async fn set_group_status(&self, group: &str, enable: bool) -> Result { - self.store.set_group_status(group, enable).await - // TODO: notification + let updated_at = self.store.set_group_status(group, enable).await?; + + self.notify_for_group(group).await; + + Ok(updated_at) } pub async fn get_group_description(&self, group: &str) -> Result { self.store.get_group_description(group).await @@ -517,8 +631,20 @@ impl IamSys { } pub async fn policy_db_set(&self, name: &str, user_type: UserType, is_group: bool, policy: &str) -> Result { - self.store.policy_db_set(name, user_type, is_group, policy).await - // TODO: notification + let updated_at = self.store.policy_db_set(name, user_type, is_group, policy).await?; + + if !self.has_watcher() { + if let Some(notification_sys) = get_global_notification_sys() { + let resp = notification_sys.load_policy_mapping(name, user_type.to_u64(), is_group).await; + for r in resp { + if let Some(err) = r.err { + warn!("notify load_policy failed: {}", err); + } + } + } + } + + Ok(updated_at) } pub async fn policy_db_get(&self, name: &str, groups: &Option>) -> Result> { diff --git a/rustfs/Cargo.toml b/rustfs/Cargo.toml index 4705a94a..b6547458 100644 --- a/rustfs/Cargo.toml +++ b/rustfs/Cargo.toml @@ -117,6 +117,10 @@ url = { workspace = true } urlencoding = { workspace = true } uuid = { workspace = true } zip = { workspace = true } +futures-util.workspace = true +rmp-serde.workspace = true +flatbuffers.workspace = true +rustfs-lock.workspace = true [target.'cfg(any(target_os = "macos", target_os = "freebsd", target_os = "netbsd", target_os = "openbsd"))'.dependencies] sysctl = { workspace = true } diff --git a/rustfs/src/server/http.rs b/rustfs/src/server/http.rs index cb6512a0..a916c45d 100644 --- a/rustfs/src/server/http.rs +++ b/rustfs/src/server/http.rs @@ -18,6 +18,7 @@ use crate::auth::IAMAuth; use crate::config; use crate::server::{ServiceState, ServiceStateManager, hybrid::hybrid, layer::RedirectLayer}; use crate::storage; +use crate::storage::tonic_service::make_server; use bytes::Bytes; use http::{HeaderMap, Request as HttpRequest, Response}; use hyper_util::{ @@ -27,7 +28,6 @@ use hyper_util::{ service::TowerToHyperService, }; use rustfs_config::{DEFAULT_ACCESS_KEY, DEFAULT_SECRET_KEY, MI_B, RUSTFS_TLS_CERT, RUSTFS_TLS_KEY}; -use rustfs_ecstore::rpc::make_server; use rustfs_obs::SystemObserver; use rustfs_protos::proto_gen::node_service::node_service_server::NodeServiceServer; use rustfs_utils::net::parse_and_resolve_address; diff --git a/rustfs/src/storage/mod.rs b/rustfs/src/storage/mod.rs index 7f4ab203..86c5b977 100644 --- a/rustfs/src/storage/mod.rs +++ b/rustfs/src/storage/mod.rs @@ -16,3 +16,4 @@ pub mod access; pub mod ecfs; // pub mod error; pub mod options; +pub mod tonic_service; diff --git a/crates/ecstore/src/rpc/tonic_service.rs b/rustfs/src/storage/tonic_service.rs similarity index 97% rename from crates/ecstore/src/rpc/tonic_service.rs rename to rustfs/src/storage/tonic_service.rs index 522ffb84..a6f3f8c1 100644 --- a/crates/ecstore/src/rpc/tonic_service.rs +++ b/rustfs/src/storage/tonic_service.rs @@ -15,7 +15,9 @@ use std::{collections::HashMap, io::Cursor, pin::Pin, sync::Arc}; // use common::error::Error as EcsError; -use crate::{ +use futures::Stream; +use futures_util::future::join_all; +use rustfs_ecstore::{ admin_server_info::get_local_server_property, bucket::{metadata::load_bucket_metadata, metadata_sys}, disk::{ @@ -28,14 +30,13 @@ use crate::{ store::{all_local_disk_path, find_local_disk}, store_api::{BucketOptions, DeleteBucketOptions, MakeBucketOptions, StorageAPI}, }; -use futures::Stream; -use futures_util::future::join_all; use rustfs_common::{globals::GLOBAL_Local_Node_Name, heal_channel::HealOpts}; use bytes::Bytes; use rmp_serde::{Deserializer, Serializer}; use rustfs_filemeta::{FileInfo, MetacacheReader}; +use rustfs_iam::{get_global_iam_sys, store::UserType}; use rustfs_lock::{LockClient, LockRequest}; use rustfs_madmin::health::{ get_cpus, get_mem_info, get_os_info, get_partitions, get_proc_info, get_sys_config, get_sys_errors, get_sys_services, @@ -1917,14 +1918,24 @@ impl Node for NodeService { })); } - let Some(_store) = new_object_layer_fn() else { + let Some(iam_sys) = get_global_iam_sys() else { return Ok(tonic::Response::new(DeletePolicyResponse { success: false, error_info: Some("errServerNotInitialized".to_string()), })); }; - todo!() + let resp = iam_sys.delete_policy(&policy, false).await; + if let Err(err) = resp { + return Ok(tonic::Response::new(DeletePolicyResponse { + success: false, + error_info: Some(err.to_string()), + })); + } + Ok(tonic::Response::new(DeletePolicyResponse { + success: true, + error_info: None, + })) } async fn load_policy(&self, request: Request) -> Result, Status> { @@ -1936,13 +1947,24 @@ impl Node for NodeService { error_info: Some("policy name is missing".to_string()), })); } - let Some(_store) = new_object_layer_fn() else { + let Some(iam_sys) = get_global_iam_sys() else { return Ok(tonic::Response::new(LoadPolicyResponse { success: false, error_info: Some("errServerNotInitialized".to_string()), })); }; - todo!() + + let resp = iam_sys.load_policy(&policy).await; + if let Err(err) = resp { + return Ok(tonic::Response::new(LoadPolicyResponse { + success: false, + error_info: Some(err.to_string()), + })); + } + Ok(tonic::Response::new(LoadPolicyResponse { + success: true, + error_info: None, + })) } async fn load_policy_mapping( @@ -1957,15 +1979,30 @@ impl Node for NodeService { error_info: Some("user_or_group name is missing".to_string()), })); } - let _user_type = request.user_type; - let _is_group = request.is_group; - let Some(_store) = new_object_layer_fn() else { + let Some(user_type) = UserType::from_u64(request.user_type) else { + return Ok(tonic::Response::new(LoadPolicyMappingResponse { + success: false, + error_info: Some("invalid user type".to_string()), + })); + }; + let is_group = request.is_group; + let Some(iam_sys) = get_global_iam_sys() else { return Ok(tonic::Response::new(LoadPolicyMappingResponse { success: false, error_info: Some("errServerNotInitialized".to_string()), })); }; - todo!() + let resp = iam_sys.load_policy_mapping(&user_or_group, user_type, is_group).await; + if let Err(err) = resp { + return Ok(tonic::Response::new(LoadPolicyMappingResponse { + success: false, + error_info: Some(err.to_string()), + })); + } + Ok(tonic::Response::new(LoadPolicyMappingResponse { + success: true, + error_info: None, + })) } async fn delete_user(&self, request: Request) -> Result, Status> { @@ -1977,14 +2014,24 @@ impl Node for NodeService { error_info: Some("access_key name is missing".to_string()), })); } - let Some(_store) = new_object_layer_fn() else { + let Some(iam_sys) = get_global_iam_sys() else { return Ok(tonic::Response::new(DeleteUserResponse { success: false, error_info: Some("errServerNotInitialized".to_string()), })); }; - todo!() + let resp = iam_sys.delete_user(&access_key, false).await; + if let Err(err) = resp { + return Ok(tonic::Response::new(DeleteUserResponse { + success: false, + error_info: Some(err.to_string()), + })); + } + Ok(tonic::Response::new(DeleteUserResponse { + success: true, + error_info: None, + })) } async fn delete_service_account( @@ -1999,19 +2046,29 @@ impl Node for NodeService { error_info: Some("access_key name is missing".to_string()), })); } - let Some(_store) = new_object_layer_fn() else { + let Some(iam_sys) = get_global_iam_sys() else { return Ok(tonic::Response::new(DeleteServiceAccountResponse { success: false, error_info: Some("errServerNotInitialized".to_string()), })); }; - todo!() + let resp = iam_sys.delete_service_account(&access_key, false).await; + if let Err(err) = resp { + return Ok(tonic::Response::new(DeleteServiceAccountResponse { + success: false, + error_info: Some(err.to_string()), + })); + } + Ok(tonic::Response::new(DeleteServiceAccountResponse { + success: true, + error_info: None, + })) } async fn load_user(&self, request: Request) -> Result, Status> { let request = request.into_inner(); let access_key = request.access_key; - let _temp = request.temp; + let temp = request.temp; if access_key.is_empty() { return Ok(tonic::Response::new(LoadUserResponse { success: false, @@ -2019,14 +2076,27 @@ impl Node for NodeService { })); } - let Some(_store) = new_object_layer_fn() else { + let Some(iam_sys) = get_global_iam_sys() else { return Ok(tonic::Response::new(LoadUserResponse { success: false, error_info: Some("errServerNotInitialized".to_string()), })); }; - todo!() + let user_type = if temp { UserType::Sts } else { UserType::Reg }; + + let resp = iam_sys.load_user(&access_key, user_type).await; + if let Err(err) = resp { + return Ok(tonic::Response::new(LoadUserResponse { + success: false, + error_info: Some(err.to_string()), + })); + } + + Ok(tonic::Response::new(LoadUserResponse { + success: true, + error_info: None, + })) } async fn load_service_account( @@ -2042,13 +2112,25 @@ impl Node for NodeService { })); } - let Some(_store) = new_object_layer_fn() else { + let Some(iam_sys) = get_global_iam_sys() else { return Ok(tonic::Response::new(LoadServiceAccountResponse { success: false, error_info: Some("errServerNotInitialized".to_string()), })); }; - todo!() + + let resp = iam_sys.load_service_account(&access_key).await; + if let Err(err) = resp { + return Ok(tonic::Response::new(LoadServiceAccountResponse { + success: false, + error_info: Some(err.to_string()), + })); + } + + Ok(tonic::Response::new(LoadServiceAccountResponse { + success: true, + error_info: None, + })) } async fn load_group(&self, request: Request) -> Result, Status> { @@ -2061,13 +2143,24 @@ impl Node for NodeService { })); } - let Some(_store) = new_object_layer_fn() else { + let Some(iam_sys) = get_global_iam_sys() else { return Ok(tonic::Response::new(LoadGroupResponse { success: false, error_info: Some("errServerNotInitialized".to_string()), })); }; - todo!() + + let resp = iam_sys.load_group(&group).await; + if let Err(err) = resp { + return Ok(tonic::Response::new(LoadGroupResponse { + success: false, + error_info: Some(err.to_string()), + })); + } + Ok(tonic::Response::new(LoadGroupResponse { + success: true, + error_info: None, + })) } async fn reload_site_replication_config(