add iam notification (#604)

move tonic service to rustfs
This commit is contained in:
weisd
2025-09-30 17:32:23 +08:00
committed by GitHub
parent f1dd3a982e
commit 7622b37f7b
11 changed files with 806 additions and 64 deletions

4
Cargo.lock generated
View File

@@ -6204,7 +6204,9 @@ dependencies = [
"clap",
"const-str",
"datafusion",
"flatbuffers",
"futures",
"futures-util",
"http 1.3.1",
"http-body 1.0.1",
"hyper 1.7.0",
@@ -6219,6 +6221,7 @@ dependencies = [
"pin-project-lite",
"pprof",
"reqwest",
"rmp-serde",
"rust-embed",
"rustfs-ahm",
"rustfs-appauth",
@@ -6229,6 +6232,7 @@ dependencies = [
"rustfs-filemeta",
"rustfs-iam",
"rustfs-kms",
"rustfs-lock",
"rustfs-madmin",
"rustfs-notify",
"rustfs-obs",

View File

@@ -16,11 +16,17 @@ use crate::StorageAPI;
use crate::admin_server_info::get_commit_id;
use crate::error::{Error, Result};
use crate::global::{GLOBAL_BOOT_TIME, get_global_endpoints};
use crate::metrics_realtime::{CollectMetricsOpts, MetricType};
use crate::rpc::PeerRestClient;
use crate::{endpoints::EndpointServerPools, new_object_layer_fn};
use futures::future::join_all;
use lazy_static::lazy_static;
use rustfs_madmin::health::{Cpus, MemInfo, OsInfo, Partitions, ProcInfo, SysConfig, SysErrors, SysService};
use rustfs_madmin::metrics::RealtimeMetrics;
use rustfs_madmin::net::NetInfo;
use rustfs_madmin::{ItemState, ServerProperties};
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
use std::sync::OnceLock;
use std::time::SystemTime;
use tracing::{error, warn};
@@ -62,21 +68,122 @@ pub struct NotificationPeerErr {
}
impl NotificationSys {
pub fn rest_client_from_hash(&self, _s: &str) -> Option<PeerRestClient> {
None
}
pub async fn delete_policy(&self) -> Vec<NotificationPeerErr> {
unimplemented!()
}
pub async fn load_policy(&self) -> Vec<NotificationPeerErr> {
unimplemented!()
pub fn rest_client_from_hash(&self, s: &str) -> Option<PeerRestClient> {
if self.all_peer_clients.is_empty() {
return None;
}
let mut hasher = DefaultHasher::new();
s.hash(&mut hasher);
let idx = (hasher.finish() as usize) % self.all_peer_clients.len();
self.all_peer_clients[idx].clone()
}
pub async fn load_policy_mapping(&self) -> Vec<NotificationPeerErr> {
unimplemented!()
pub async fn delete_policy(&self, policy_name: &str) -> Vec<NotificationPeerErr> {
let mut futures = Vec::with_capacity(self.peer_clients.len());
for client in self.peer_clients.iter() {
let policy = policy_name.to_string();
futures.push(async move {
if let Some(client) = client {
match client.delete_policy(&policy).await {
Ok(_) => NotificationPeerErr {
host: client.host.to_string(),
err: None,
},
Err(e) => NotificationPeerErr {
host: client.host.to_string(),
err: Some(e),
},
}
} else {
NotificationPeerErr {
host: "".to_string(),
err: Some(Error::other("peer is not reachable")),
}
}
});
}
join_all(futures).await
}
pub async fn delete_user(&self) -> Vec<NotificationPeerErr> {
unimplemented!()
pub async fn load_policy(&self, policy_name: &str) -> Vec<NotificationPeerErr> {
let mut futures = Vec::with_capacity(self.peer_clients.len());
for client in self.peer_clients.iter() {
let policy = policy_name.to_string();
futures.push(async move {
if let Some(client) = client {
match client.load_policy(&policy).await {
Ok(_) => NotificationPeerErr {
host: client.host.to_string(),
err: None,
},
Err(e) => NotificationPeerErr {
host: client.host.to_string(),
err: Some(e),
},
}
} else {
NotificationPeerErr {
host: "".to_string(),
err: Some(Error::other("peer is not reachable")),
}
}
});
}
join_all(futures).await
}
pub async fn load_policy_mapping(&self, user_or_group: &str, user_type: u64, is_group: bool) -> Vec<NotificationPeerErr> {
let mut futures = Vec::with_capacity(self.peer_clients.len());
for client in self.peer_clients.iter() {
let uog = user_or_group.to_string();
futures.push(async move {
if let Some(client) = client {
match client.load_policy_mapping(&uog, user_type, is_group).await {
Ok(_) => NotificationPeerErr {
host: client.host.to_string(),
err: None,
},
Err(e) => NotificationPeerErr {
host: client.host.to_string(),
err: Some(e),
},
}
} else {
NotificationPeerErr {
host: "".to_string(),
err: Some(Error::other("peer is not reachable")),
}
}
});
}
join_all(futures).await
}
pub async fn delete_user(&self, access_key: &str) -> Vec<NotificationPeerErr> {
let mut futures = Vec::with_capacity(self.peer_clients.len());
for client in self.peer_clients.iter() {
let ak = access_key.to_string();
futures.push(async move {
if let Some(client) = client {
match client.delete_user(&ak).await {
Ok(_) => NotificationPeerErr {
host: client.host.to_string(),
err: None,
},
Err(e) => NotificationPeerErr {
host: client.host.to_string(),
err: Some(e),
},
}
} else {
NotificationPeerErr {
host: "".to_string(),
err: Some(Error::other("peer is not reachable")),
}
}
});
}
join_all(futures).await
}
pub async fn storage_info<S: StorageAPI>(&self, api: &S) -> rustfs_madmin::StorageInfo {
@@ -140,6 +247,114 @@ impl NotificationSys {
join_all(futures).await
}
pub async fn load_user(&self, access_key: &str, temp: bool) -> Vec<NotificationPeerErr> {
let mut futures = Vec::with_capacity(self.peer_clients.len());
for client in self.peer_clients.iter() {
let ak = access_key.to_string();
futures.push(async move {
if let Some(client) = client {
match client.load_user(&ak, temp).await {
Ok(_) => NotificationPeerErr {
host: client.host.to_string(),
err: None,
},
Err(e) => NotificationPeerErr {
host: client.host.to_string(),
err: Some(e),
},
}
} else {
NotificationPeerErr {
host: "".to_string(),
err: Some(Error::other("peer is not reachable")),
}
}
});
}
join_all(futures).await
}
pub async fn load_group(&self, group: &str) -> Vec<NotificationPeerErr> {
let mut futures = Vec::with_capacity(self.peer_clients.len());
for client in self.peer_clients.iter() {
let gname = group.to_string();
futures.push(async move {
if let Some(client) = client {
match client.load_group(&gname).await {
Ok(_) => NotificationPeerErr {
host: client.host.to_string(),
err: None,
},
Err(e) => NotificationPeerErr {
host: client.host.to_string(),
err: Some(e),
},
}
} else {
NotificationPeerErr {
host: "".to_string(),
err: Some(Error::other("peer is not reachable")),
}
}
});
}
join_all(futures).await
}
pub async fn delete_service_account(&self, access_key: &str) -> Vec<NotificationPeerErr> {
let mut futures = Vec::with_capacity(self.peer_clients.len());
for client in self.peer_clients.iter() {
let ak = access_key.to_string();
futures.push(async move {
if let Some(client) = client {
match client.delete_service_account(&ak).await {
Ok(_) => NotificationPeerErr {
host: client.host.to_string(),
err: None,
},
Err(e) => NotificationPeerErr {
host: client.host.to_string(),
err: Some(e),
},
}
} else {
NotificationPeerErr {
host: "".to_string(),
err: Some(Error::other("peer is not reachable")),
}
}
});
}
join_all(futures).await
}
pub async fn load_service_account(&self, access_key: &str) -> Vec<NotificationPeerErr> {
let mut futures = Vec::with_capacity(self.peer_clients.len());
for client in self.peer_clients.iter() {
let ak = access_key.to_string();
futures.push(async move {
if let Some(client) = client {
match client.load_service_account(&ak).await {
Ok(_) => NotificationPeerErr {
host: client.host.to_string(),
err: None,
},
Err(e) => NotificationPeerErr {
host: client.host.to_string(),
err: Some(e),
},
}
} else {
NotificationPeerErr {
host: "".to_string(),
err: Some(Error::other("peer is not reachable")),
}
}
});
}
join_all(futures).await
}
pub async fn reload_pool_meta(&self) {
let mut futures = Vec::with_capacity(self.peer_clients.len());
for client in self.peer_clients.iter().flatten() {
@@ -202,6 +417,281 @@ impl NotificationSys {
let _ = store.stop_rebalance().await;
warn!("notification stop_rebalance stop_rebalance done");
}
pub async fn load_bucket_metadata(&self, bucket: &str) -> Vec<NotificationPeerErr> {
let mut futures = Vec::with_capacity(self.peer_clients.len());
for client in self.peer_clients.iter() {
let b = bucket.to_string();
futures.push(async move {
if let Some(client) = client {
match client.load_bucket_metadata(&b).await {
Ok(_) => NotificationPeerErr {
host: client.host.to_string(),
err: None,
},
Err(e) => NotificationPeerErr {
host: client.host.to_string(),
err: Some(e),
},
}
} else {
NotificationPeerErr {
host: "".to_string(),
err: Some(Error::other("peer is not reachable")),
}
}
});
}
join_all(futures).await
}
pub async fn delete_bucket_metadata(&self, bucket: &str) -> Vec<NotificationPeerErr> {
let mut futures = Vec::with_capacity(self.peer_clients.len());
for client in self.peer_clients.iter() {
let b = bucket.to_string();
futures.push(async move {
if let Some(client) = client {
match client.delete_bucket_metadata(&b).await {
Ok(_) => NotificationPeerErr {
host: client.host.to_string(),
err: None,
},
Err(e) => NotificationPeerErr {
host: client.host.to_string(),
err: Some(e),
},
}
} else {
NotificationPeerErr {
host: "".to_string(),
err: Some(Error::other("peer is not reachable")),
}
}
});
}
join_all(futures).await
}
pub async fn start_profiling(&self, profiler: &str) -> Vec<NotificationPeerErr> {
let mut futures = Vec::with_capacity(self.peer_clients.len());
for client in self.peer_clients.iter() {
let pf = profiler.to_string();
futures.push(async move {
if let Some(client) = client {
match client.start_profiling(&pf).await {
Ok(_) => NotificationPeerErr {
host: client.host.to_string(),
err: None,
},
Err(e) => NotificationPeerErr {
host: client.host.to_string(),
err: Some(e),
},
}
} else {
NotificationPeerErr {
host: "".to_string(),
err: Some(Error::other("peer is not reachable")),
}
}
});
}
join_all(futures).await
}
pub async fn get_cpus(&self) -> Vec<Cpus> {
let mut futures = Vec::with_capacity(self.peer_clients.len());
for client in self.peer_clients.iter().cloned() {
futures.push(async move {
if let Some(client) = client {
client.get_cpus().await.unwrap_or_default()
} else {
Cpus::default()
}
});
}
join_all(futures).await
}
pub async fn get_net_info(&self) -> Vec<NetInfo> {
let mut futures = Vec::with_capacity(self.peer_clients.len());
for client in self.peer_clients.iter().cloned() {
futures.push(async move {
if let Some(client) = client {
client.get_net_info().await.unwrap_or_default()
} else {
NetInfo::default()
}
});
}
join_all(futures).await
}
pub async fn get_partitions(&self) -> Vec<Partitions> {
let mut futures = Vec::with_capacity(self.peer_clients.len());
for client in self.peer_clients.iter().cloned() {
futures.push(async move {
if let Some(client) = client {
client.get_partitions().await.unwrap_or_default()
} else {
Partitions::default()
}
});
}
join_all(futures).await
}
pub async fn get_os_info(&self) -> Vec<OsInfo> {
let mut futures = Vec::with_capacity(self.peer_clients.len());
for client in self.peer_clients.iter().cloned() {
futures.push(async move {
if let Some(client) = client {
client.get_os_info().await.unwrap_or_default()
} else {
OsInfo::default()
}
});
}
join_all(futures).await
}
pub async fn get_sys_services(&self) -> Vec<SysService> {
let mut futures = Vec::with_capacity(self.peer_clients.len());
for client in self.peer_clients.iter().cloned() {
futures.push(async move {
if let Some(client) = client {
client.get_se_linux_info().await.unwrap_or_default()
} else {
SysService::default()
}
});
}
join_all(futures).await
}
pub async fn get_sys_config(&self) -> Vec<SysConfig> {
let mut futures = Vec::with_capacity(self.peer_clients.len());
for client in self.peer_clients.iter().cloned() {
futures.push(async move {
if let Some(client) = client {
client.get_sys_config().await.unwrap_or_default()
} else {
SysConfig::default()
}
});
}
join_all(futures).await
}
pub async fn get_sys_errors(&self) -> Vec<SysErrors> {
let mut futures = Vec::with_capacity(self.peer_clients.len());
for client in self.peer_clients.iter().cloned() {
futures.push(async move {
if let Some(client) = client {
client.get_sys_errors().await.unwrap_or_default()
} else {
SysErrors::default()
}
});
}
join_all(futures).await
}
pub async fn get_mem_info(&self) -> Vec<MemInfo> {
let mut futures = Vec::with_capacity(self.peer_clients.len());
for client in self.peer_clients.iter().cloned() {
futures.push(async move {
if let Some(client) = client {
client.get_mem_info().await.unwrap_or_default()
} else {
MemInfo::default()
}
});
}
join_all(futures).await
}
pub async fn get_proc_info(&self) -> Vec<ProcInfo> {
let mut futures = Vec::with_capacity(self.peer_clients.len());
for client in self.peer_clients.iter().cloned() {
futures.push(async move {
if let Some(client) = client {
client.get_proc_info().await.unwrap_or_default()
} else {
ProcInfo::default()
}
});
}
join_all(futures).await
}
pub async fn get_metrics(&self, t: MetricType, opts: &CollectMetricsOpts) -> Vec<RealtimeMetrics> {
let mut futures = Vec::with_capacity(self.peer_clients.len());
for client in self.peer_clients.iter().cloned() {
let t_clone = t;
let opts_clone = opts;
futures.push(async move {
if let Some(client) = client {
client.get_metrics(t_clone, opts_clone).await.unwrap_or_default()
} else {
RealtimeMetrics::default()
}
});
}
join_all(futures).await
}
pub async fn reload_site_replication_config(&self) -> Vec<NotificationPeerErr> {
let mut futures = Vec::with_capacity(self.peer_clients.len());
for client in self.peer_clients.iter() {
futures.push(async move {
if let Some(client) = client {
match client.reload_site_replication_config().await {
Ok(_) => NotificationPeerErr {
host: client.host.to_string(),
err: None,
},
Err(e) => NotificationPeerErr {
host: client.host.to_string(),
err: Some(e),
},
}
} else {
NotificationPeerErr {
host: "".to_string(),
err: Some(Error::other("peer is not reachable")),
}
}
});
}
join_all(futures).await
}
pub async fn load_transition_tier_config(&self) -> Vec<NotificationPeerErr> {
let mut futures = Vec::with_capacity(self.peer_clients.len());
for client in self.peer_clients.iter() {
futures.push(async move {
if let Some(client) = client {
match client.load_transition_tier_config().await {
Ok(_) => NotificationPeerErr {
host: client.host.to_string(),
err: None,
},
Err(e) => NotificationPeerErr {
host: client.host.to_string(),
err: Some(e),
},
}
} else {
NotificationPeerErr {
host: "".to_string(),
err: Some(Error::other("peer is not reachable")),
}
}
});
}
join_all(futures).await
}
}
fn get_offline_disks(offline_host: &str, endpoints: &EndpointServerPools) -> Vec<rustfs_madmin::Disk> {

View File

@@ -16,10 +16,8 @@ mod http_auth;
mod peer_rest_client;
mod peer_s3_client;
mod remote_disk;
mod tonic_service;
pub use http_auth::{build_auth_headers, verify_rpc_signature};
pub use peer_rest_client::PeerRestClient;
pub use peer_s3_client::{LocalPeerS3Client, PeerS3Client, RemotePeerS3Client, S3PeerSys};
pub use remote_disk::RemoteDisk;
pub use tonic_service::{NodeService, make_server};

View File

@@ -43,3 +43,7 @@ pub async fn init_iam_sys(ecstore: Arc<ECStore>) -> Result<()> {
pub fn get() -> Result<Arc<IamSys<ObjectStore>>> {
IAM_SYS.get().map(Arc::clone).ok_or(Error::IamSysNotInitialized)
}
pub fn get_global_iam_sys() -> Option<Arc<IamSys<ObjectStore>>> {
IAM_SYS.get().cloned()
}

View File

@@ -23,6 +23,7 @@ use time::OffsetDateTime;
#[async_trait::async_trait]
pub trait Store: Clone + Send + Sync + 'static {
fn has_watcher(&self) -> bool;
async fn save_iam_config<Item: Serialize + Send>(&self, item: Item, path: impl AsRef<str> + Send) -> Result<()>;
async fn load_iam_config<Item: DeserializeOwned>(&self, path: impl AsRef<str> + Send) -> Result<Item>;
async fn delete_iam_config(&self, path: impl AsRef<str> + Send) -> Result<()>;
@@ -89,6 +90,24 @@ impl UserType {
UserType::None => "",
}
}
pub fn to_u64(&self) -> u64 {
match self {
UserType::Svc => 1,
UserType::Sts => 2,
UserType::Reg => 3,
UserType::None => 0,
}
}
pub fn from_u64(u64: u64) -> Option<Self> {
match u64 {
1 => Some(UserType::Svc),
2 => Some(UserType::Sts),
3 => Some(UserType::Reg),
0 => Some(UserType::None),
_ => None,
}
}
}
#[derive(Serialize, Deserialize, Clone)]

View File

@@ -380,6 +380,9 @@ impl ObjectStore {
#[async_trait::async_trait]
impl Store for ObjectStore {
fn has_watcher(&self) -> bool {
false
}
async fn load_iam_config<Item: DeserializeOwned>(&self, path: impl AsRef<str> + Send) -> Result<Item> {
let mut data = read_config(self.object_api.clone(), path.as_ref()).await?;

View File

@@ -25,6 +25,7 @@ use crate::store::Store;
use crate::store::UserType;
use crate::utils::extract_claims;
use rustfs_ecstore::global::get_global_action_cred;
use rustfs_ecstore::notification_sys::get_global_notification_sys;
use rustfs_madmin::AddOrUpdateUserReq;
use rustfs_madmin::GroupDesc;
use rustfs_policy::arn::ARN;
@@ -41,6 +42,7 @@ use serde_json::json;
use std::collections::HashMap;
use std::sync::Arc;
use time::OffsetDateTime;
use tracing::warn;
pub const MAX_SVCSESSION_POLICY_SIZE: usize = 4096;
@@ -63,6 +65,9 @@ impl<T: Store> IamSys<T> {
roles_map: HashMap::new(),
}
}
pub fn has_watcher(&self) -> bool {
self.store.api.has_watcher()
}
pub async fn load_group(&self, name: &str) -> Result<()> {
self.store.group_notification_handler(name).await
@@ -104,8 +109,17 @@ impl<T: Store> IamSys<T> {
self.store.delete_policy(name, notify).await?;
if notify {
// TODO: implement notification
if !notify || self.has_watcher() {
return Ok(());
}
if let Some(notification_sys) = get_global_notification_sys() {
let resp = notification_sys.delete_policy(name).await;
for r in resp {
if let Some(err) = r.err {
warn!("notify delete_policy failed: {}", err);
}
}
}
Ok(())
@@ -142,9 +156,20 @@ impl<T: Store> IamSys<T> {
}
pub async fn set_policy(&self, name: &str, policy: Policy) -> Result<OffsetDateTime> {
self.store.set_policy(name, policy).await
let updated_at = self.store.set_policy(name, policy).await?;
// TODO: notification
if !self.has_watcher() {
if let Some(notification_sys) = get_global_notification_sys() {
let resp = notification_sys.load_policy(name).await;
for r in resp {
if let Some(err) = r.err {
warn!("notify load_policy failed: {}", err);
}
}
}
}
Ok(updated_at)
}
pub async fn get_role_policy(&self, arn_str: &str) -> Result<(ARN, String)> {
@@ -159,9 +184,51 @@ impl<T: Store> IamSys<T> {
Ok((arn, policy.clone()))
}
pub async fn delete_user(&self, name: &str, _notify: bool) -> Result<()> {
self.store.delete_user(name, UserType::Reg).await
// TODO: notification
pub async fn delete_user(&self, name: &str, notify: bool) -> Result<()> {
self.store.delete_user(name, UserType::Reg).await?;
if notify && !self.has_watcher() {
if let Some(notification_sys) = get_global_notification_sys() {
let resp = notification_sys.delete_user(name).await;
for r in resp {
if let Some(err) = r.err {
warn!("notify delete_user failed: {}", err);
}
}
}
}
Ok(())
}
async fn notify_for_user(&self, name: &str, is_temp: bool) {
if self.has_watcher() {
return;
}
if let Some(notification_sys) = get_global_notification_sys() {
let resp = notification_sys.load_user(name, is_temp).await;
for r in resp {
if let Some(err) = r.err {
warn!("notify load_user failed: {}", err);
}
}
}
}
async fn notify_for_service_account(&self, name: &str) {
if self.has_watcher() {
return;
}
if let Some(notification_sys) = get_global_notification_sys() {
let resp = notification_sys.load_service_account(name).await;
for r in resp {
if let Some(err) = r.err {
warn!("notify load_service_account failed: {}", err);
}
}
}
}
pub async fn current_policies(&self, name: &str) -> String {
@@ -177,8 +244,11 @@ impl<T: Store> IamSys<T> {
}
pub async fn set_temp_user(&self, name: &str, cred: &Credentials, policy_name: Option<&str>) -> Result<OffsetDateTime> {
self.store.set_temp_user(name, cred, policy_name).await
// TODO: notification
let updated_at = self.store.set_temp_user(name, cred, policy_name).await?;
self.notify_for_user(&cred.access_key, true).await;
Ok(updated_at)
}
pub async fn is_temp_user(&self, name: &str) -> Result<(bool, String)> {
@@ -208,8 +278,11 @@ impl<T: Store> IamSys<T> {
}
pub async fn set_user_status(&self, name: &str, status: rustfs_madmin::AccountStatus) -> Result<OffsetDateTime> {
self.store.set_user_status(name, status).await
// TODO: notification
let updated_at = self.store.set_user_status(name, status).await?;
self.notify_for_user(name, false).await;
Ok(updated_at)
}
pub async fn new_service_account(
@@ -294,14 +367,17 @@ impl<T: Store> IamSys<T> {
let create_at = self.store.add_service_account(cred.clone()).await?;
self.notify_for_service_account(&cred.access_key).await;
Ok((cred, create_at))
// TODO: notification
}
pub async fn update_service_account(&self, name: &str, opts: UpdateServiceAccountOpts) -> Result<OffsetDateTime> {
self.store.update_service_account(name, opts).await
let updated_at = self.store.update_service_account(name, opts).await?;
// TODO: notification
self.notify_for_service_account(name).await;
Ok(updated_at)
}
pub async fn list_service_accounts(&self, access_key: &str) -> Result<Vec<Credentials>> {
@@ -424,7 +500,7 @@ impl<T: Store> IamSys<T> {
extract_jwt_claims(&u)
}
pub async fn delete_service_account(&self, access_key: &str, _notify: bool) -> Result<()> {
pub async fn delete_service_account(&self, access_key: &str, notify: bool) -> Result<()> {
let Some(u) = self.store.get_user(access_key).await else {
return Ok(());
};
@@ -433,9 +509,35 @@ impl<T: Store> IamSys<T> {
return Ok(());
}
self.store.delete_user(access_key, UserType::Svc).await
self.store.delete_user(access_key, UserType::Svc).await?;
// TODO: notification
if notify && !self.has_watcher() {
if let Some(notification_sys) = get_global_notification_sys() {
let resp = notification_sys.delete_service_account(access_key).await;
for r in resp {
if let Some(err) = r.err {
warn!("notify delete_service_account failed: {}", err);
}
}
}
}
Ok(())
}
async fn notify_for_group(&self, group: &str) {
if self.has_watcher() {
return;
}
if let Some(notification_sys) = get_global_notification_sys() {
let resp = notification_sys.load_group(group).await;
for r in resp {
if let Some(err) = r.err {
warn!("notify load_group failed: {}", err);
}
}
}
}
pub async fn create_user(&self, access_key: &str, args: &AddOrUpdateUserReq) -> Result<OffsetDateTime> {
@@ -451,8 +553,11 @@ impl<T: Store> IamSys<T> {
return Err(IamError::InvalidSecretKeyLength);
}
self.store.add_user(access_key, args).await
// TODO: notification
let updated_at = self.store.add_user(access_key, args).await?;
self.notify_for_user(access_key, false).await;
Ok(updated_at)
}
pub async fn set_user_secret_key(&self, access_key: &str, secret_key: &str) -> Result<()> {
@@ -495,18 +600,27 @@ impl<T: Store> IamSys<T> {
if contains_reserved_chars(group) {
return Err(IamError::GroupNameContainsReservedChars);
}
self.store.add_users_to_group(group, users).await
// TODO: notification
let updated_at = self.store.add_users_to_group(group, users).await?;
self.notify_for_group(group).await;
Ok(updated_at)
}
pub async fn remove_users_from_group(&self, group: &str, users: Vec<String>) -> Result<OffsetDateTime> {
self.store.remove_users_from_group(group, users).await
// TODO: notification
let updated_at = self.store.remove_users_from_group(group, users).await?;
self.notify_for_group(group).await;
Ok(updated_at)
}
pub async fn set_group_status(&self, group: &str, enable: bool) -> Result<OffsetDateTime> {
self.store.set_group_status(group, enable).await
// TODO: notification
let updated_at = self.store.set_group_status(group, enable).await?;
self.notify_for_group(group).await;
Ok(updated_at)
}
pub async fn get_group_description(&self, group: &str) -> Result<GroupDesc> {
self.store.get_group_description(group).await
@@ -517,8 +631,20 @@ impl<T: Store> IamSys<T> {
}
pub async fn policy_db_set(&self, name: &str, user_type: UserType, is_group: bool, policy: &str) -> Result<OffsetDateTime> {
self.store.policy_db_set(name, user_type, is_group, policy).await
// TODO: notification
let updated_at = self.store.policy_db_set(name, user_type, is_group, policy).await?;
if !self.has_watcher() {
if let Some(notification_sys) = get_global_notification_sys() {
let resp = notification_sys.load_policy_mapping(name, user_type.to_u64(), is_group).await;
for r in resp {
if let Some(err) = r.err {
warn!("notify load_policy failed: {}", err);
}
}
}
}
Ok(updated_at)
}
pub async fn policy_db_get(&self, name: &str, groups: &Option<Vec<String>>) -> Result<Vec<String>> {

View File

@@ -117,6 +117,10 @@ url = { workspace = true }
urlencoding = { workspace = true }
uuid = { workspace = true }
zip = { workspace = true }
futures-util.workspace = true
rmp-serde.workspace = true
flatbuffers.workspace = true
rustfs-lock.workspace = true
[target.'cfg(any(target_os = "macos", target_os = "freebsd", target_os = "netbsd", target_os = "openbsd"))'.dependencies]
sysctl = { workspace = true }

View File

@@ -18,6 +18,7 @@ use crate::auth::IAMAuth;
use crate::config;
use crate::server::{ServiceState, ServiceStateManager, hybrid::hybrid, layer::RedirectLayer};
use crate::storage;
use crate::storage::tonic_service::make_server;
use bytes::Bytes;
use http::{HeaderMap, Request as HttpRequest, Response};
use hyper_util::{
@@ -27,7 +28,6 @@ use hyper_util::{
service::TowerToHyperService,
};
use rustfs_config::{DEFAULT_ACCESS_KEY, DEFAULT_SECRET_KEY, MI_B, RUSTFS_TLS_CERT, RUSTFS_TLS_KEY};
use rustfs_ecstore::rpc::make_server;
use rustfs_obs::SystemObserver;
use rustfs_protos::proto_gen::node_service::node_service_server::NodeServiceServer;
use rustfs_utils::net::parse_and_resolve_address;

View File

@@ -16,3 +16,4 @@ pub mod access;
pub mod ecfs;
// pub mod error;
pub mod options;
pub mod tonic_service;

View File

@@ -15,7 +15,9 @@
use std::{collections::HashMap, io::Cursor, pin::Pin, sync::Arc};
// use common::error::Error as EcsError;
use crate::{
use futures::Stream;
use futures_util::future::join_all;
use rustfs_ecstore::{
admin_server_info::get_local_server_property,
bucket::{metadata::load_bucket_metadata, metadata_sys},
disk::{
@@ -28,14 +30,13 @@ use crate::{
store::{all_local_disk_path, find_local_disk},
store_api::{BucketOptions, DeleteBucketOptions, MakeBucketOptions, StorageAPI},
};
use futures::Stream;
use futures_util::future::join_all;
use rustfs_common::{globals::GLOBAL_Local_Node_Name, heal_channel::HealOpts};
use bytes::Bytes;
use rmp_serde::{Deserializer, Serializer};
use rustfs_filemeta::{FileInfo, MetacacheReader};
use rustfs_iam::{get_global_iam_sys, store::UserType};
use rustfs_lock::{LockClient, LockRequest};
use rustfs_madmin::health::{
get_cpus, get_mem_info, get_os_info, get_partitions, get_proc_info, get_sys_config, get_sys_errors, get_sys_services,
@@ -1917,14 +1918,24 @@ impl Node for NodeService {
}));
}
let Some(_store) = new_object_layer_fn() else {
let Some(iam_sys) = get_global_iam_sys() else {
return Ok(tonic::Response::new(DeletePolicyResponse {
success: false,
error_info: Some("errServerNotInitialized".to_string()),
}));
};
todo!()
let resp = iam_sys.delete_policy(&policy, false).await;
if let Err(err) = resp {
return Ok(tonic::Response::new(DeletePolicyResponse {
success: false,
error_info: Some(err.to_string()),
}));
}
Ok(tonic::Response::new(DeletePolicyResponse {
success: true,
error_info: None,
}))
}
async fn load_policy(&self, request: Request<LoadPolicyRequest>) -> Result<Response<LoadPolicyResponse>, Status> {
@@ -1936,13 +1947,24 @@ impl Node for NodeService {
error_info: Some("policy name is missing".to_string()),
}));
}
let Some(_store) = new_object_layer_fn() else {
let Some(iam_sys) = get_global_iam_sys() else {
return Ok(tonic::Response::new(LoadPolicyResponse {
success: false,
error_info: Some("errServerNotInitialized".to_string()),
}));
};
todo!()
let resp = iam_sys.load_policy(&policy).await;
if let Err(err) = resp {
return Ok(tonic::Response::new(LoadPolicyResponse {
success: false,
error_info: Some(err.to_string()),
}));
}
Ok(tonic::Response::new(LoadPolicyResponse {
success: true,
error_info: None,
}))
}
async fn load_policy_mapping(
@@ -1957,15 +1979,30 @@ impl Node for NodeService {
error_info: Some("user_or_group name is missing".to_string()),
}));
}
let _user_type = request.user_type;
let _is_group = request.is_group;
let Some(_store) = new_object_layer_fn() else {
let Some(user_type) = UserType::from_u64(request.user_type) else {
return Ok(tonic::Response::new(LoadPolicyMappingResponse {
success: false,
error_info: Some("invalid user type".to_string()),
}));
};
let is_group = request.is_group;
let Some(iam_sys) = get_global_iam_sys() else {
return Ok(tonic::Response::new(LoadPolicyMappingResponse {
success: false,
error_info: Some("errServerNotInitialized".to_string()),
}));
};
todo!()
let resp = iam_sys.load_policy_mapping(&user_or_group, user_type, is_group).await;
if let Err(err) = resp {
return Ok(tonic::Response::new(LoadPolicyMappingResponse {
success: false,
error_info: Some(err.to_string()),
}));
}
Ok(tonic::Response::new(LoadPolicyMappingResponse {
success: true,
error_info: None,
}))
}
async fn delete_user(&self, request: Request<DeleteUserRequest>) -> Result<Response<DeleteUserResponse>, Status> {
@@ -1977,14 +2014,24 @@ impl Node for NodeService {
error_info: Some("access_key name is missing".to_string()),
}));
}
let Some(_store) = new_object_layer_fn() else {
let Some(iam_sys) = get_global_iam_sys() else {
return Ok(tonic::Response::new(DeleteUserResponse {
success: false,
error_info: Some("errServerNotInitialized".to_string()),
}));
};
todo!()
let resp = iam_sys.delete_user(&access_key, false).await;
if let Err(err) = resp {
return Ok(tonic::Response::new(DeleteUserResponse {
success: false,
error_info: Some(err.to_string()),
}));
}
Ok(tonic::Response::new(DeleteUserResponse {
success: true,
error_info: None,
}))
}
async fn delete_service_account(
@@ -1999,19 +2046,29 @@ impl Node for NodeService {
error_info: Some("access_key name is missing".to_string()),
}));
}
let Some(_store) = new_object_layer_fn() else {
let Some(iam_sys) = get_global_iam_sys() else {
return Ok(tonic::Response::new(DeleteServiceAccountResponse {
success: false,
error_info: Some("errServerNotInitialized".to_string()),
}));
};
todo!()
let resp = iam_sys.delete_service_account(&access_key, false).await;
if let Err(err) = resp {
return Ok(tonic::Response::new(DeleteServiceAccountResponse {
success: false,
error_info: Some(err.to_string()),
}));
}
Ok(tonic::Response::new(DeleteServiceAccountResponse {
success: true,
error_info: None,
}))
}
async fn load_user(&self, request: Request<LoadUserRequest>) -> Result<Response<LoadUserResponse>, Status> {
let request = request.into_inner();
let access_key = request.access_key;
let _temp = request.temp;
let temp = request.temp;
if access_key.is_empty() {
return Ok(tonic::Response::new(LoadUserResponse {
success: false,
@@ -2019,14 +2076,27 @@ impl Node for NodeService {
}));
}
let Some(_store) = new_object_layer_fn() else {
let Some(iam_sys) = get_global_iam_sys() else {
return Ok(tonic::Response::new(LoadUserResponse {
success: false,
error_info: Some("errServerNotInitialized".to_string()),
}));
};
todo!()
let user_type = if temp { UserType::Sts } else { UserType::Reg };
let resp = iam_sys.load_user(&access_key, user_type).await;
if let Err(err) = resp {
return Ok(tonic::Response::new(LoadUserResponse {
success: false,
error_info: Some(err.to_string()),
}));
}
Ok(tonic::Response::new(LoadUserResponse {
success: true,
error_info: None,
}))
}
async fn load_service_account(
@@ -2042,13 +2112,25 @@ impl Node for NodeService {
}));
}
let Some(_store) = new_object_layer_fn() else {
let Some(iam_sys) = get_global_iam_sys() else {
return Ok(tonic::Response::new(LoadServiceAccountResponse {
success: false,
error_info: Some("errServerNotInitialized".to_string()),
}));
};
todo!()
let resp = iam_sys.load_service_account(&access_key).await;
if let Err(err) = resp {
return Ok(tonic::Response::new(LoadServiceAccountResponse {
success: false,
error_info: Some(err.to_string()),
}));
}
Ok(tonic::Response::new(LoadServiceAccountResponse {
success: true,
error_info: None,
}))
}
async fn load_group(&self, request: Request<LoadGroupRequest>) -> Result<Response<LoadGroupResponse>, Status> {
@@ -2061,13 +2143,24 @@ impl Node for NodeService {
}));
}
let Some(_store) = new_object_layer_fn() else {
let Some(iam_sys) = get_global_iam_sys() else {
return Ok(tonic::Response::new(LoadGroupResponse {
success: false,
error_info: Some("errServerNotInitialized".to_string()),
}));
};
todo!()
let resp = iam_sys.load_group(&group).await;
if let Err(err) = resp {
return Ok(tonic::Response::new(LoadGroupResponse {
success: false,
error_info: Some(err.to_string()),
}));
}
Ok(tonic::Response::new(LoadGroupResponse {
success: true,
error_info: None,
}))
}
async fn reload_site_replication_config(