diff --git a/Cargo.lock b/Cargo.lock index b62f3925..b9050a57 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1243,6 +1243,7 @@ dependencies = [ "itertools 0.14.0", "jsonwebtoken", "log", + "madmin", "rand", "serde", "serde_json", @@ -2508,7 +2509,7 @@ checksum = "f3cb5ba0dc43242ce17de99c180e96db90b235b8a9fdc9543c96d2209116bd9f" [[package]] name = "s3s" version = "0.11.0-dev" -source = "git+https://github.com/Nugine/s3s.git?rev=3291c0ca0284971569499cbe75bd69ef7bde8321#3291c0ca0284971569499cbe75bd69ef7bde8321" +source = "git+https://github.com/Nugine/s3s.git?rev=05efc3f87e9f5d1a6c9760c79cf1f70dcdd100c8#05efc3f87e9f5d1a6c9760c79cf1f70dcdd100c8" dependencies = [ "arrayvec", "async-trait", @@ -2555,7 +2556,7 @@ dependencies = [ [[package]] name = "s3s-policy" version = "0.11.0-dev" -source = "git+https://github.com/Nugine/s3s.git?rev=3291c0ca0284971569499cbe75bd69ef7bde8321#3291c0ca0284971569499cbe75bd69ef7bde8321" +source = "git+https://github.com/Nugine/s3s.git?rev=05efc3f87e9f5d1a6c9760c79cf1f70dcdd100c8#05efc3f87e9f5d1a6c9760c79cf1f70dcdd100c8" dependencies = [ "indexmap 2.6.0", "serde", diff --git a/Cargo.toml b/Cargo.toml index 0a866530..6777770a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -65,10 +65,10 @@ protos = { path = "./common/protos" } rand = "0.8.5" rmp = "0.8.14" rmp-serde = "1.3.0" -s3s = { git = "https://github.com/Nugine/s3s.git", rev = "3291c0ca0284971569499cbe75bd69ef7bde8321", default-features = true, features = [ +s3s = { git = "https://github.com/Nugine/s3s.git", rev = "05efc3f87e9f5d1a6c9760c79cf1f70dcdd100c8", default-features = true, features = [ "tower", ] } -s3s-policy = { git = "https://github.com/Nugine/s3s.git", rev = "3291c0ca0284971569499cbe75bd69ef7bde8321" } +s3s-policy = { git = "https://github.com/Nugine/s3s.git", rev = "05efc3f87e9f5d1a6c9760c79cf1f70dcdd100c8" } serde = { version = "1.0.217", features = ["derive"] } serde_json = "1.0.134" tempfile = "3.13.0" diff --git a/ecstore/src/bucket/tagging/mod.rs b/ecstore/src/bucket/tagging/mod.rs index a7cc219c..f50b85a7 100644 --- a/ecstore/src/bucket/tagging/mod.rs +++ b/ecstore/src/bucket/tagging/mod.rs @@ -7,9 +7,13 @@ pub fn decode_tags(tags: &str) -> Vec { let mut list = Vec::new(); for (k, v) in values { + if k.is_empty() || v.is_empty() { + continue; + } + list.push(Tag { - key: k.to_string(), - value: v.to_string(), + key: Some(k.to_string()), + value: Some(v.to_string()), }); } @@ -20,7 +24,9 @@ pub fn encode_tags(tags: Vec) -> String { let mut encoded = form_urlencoded::Serializer::new(String::new()); for tag in tags.iter() { - encoded.append_pair(tag.key.as_str(), tag.value.as_str()); + if let (Some(k), Some(v)) = (tag.key.as_ref(), tag.value.as_ref()) { + encoded.append_pair(k.as_str(), v.as_str()); + } } encoded.finish() diff --git a/ecstore/src/cache_value/metacache_set.rs b/ecstore/src/cache_value/metacache_set.rs index 1f553565..263b7fa0 100644 --- a/ecstore/src/cache_value/metacache_set.rs +++ b/ecstore/src/cache_value/metacache_set.rs @@ -54,7 +54,7 @@ impl Clone for ListPathRawOptions { } pub async fn list_path_raw(mut rx: B_Receiver, opts: ListPathRawOptions) -> Result<()> { - info!("list_path_raw"); + // println!("list_path_raw {},{}", &opts.bucket, &opts.path); if opts.disks.is_empty() { info!("list_path_raw 0 drives provided"); return Err(Error::from_string("list_path_raw: 0 drives provided")); @@ -283,11 +283,9 @@ pub async fn list_path_raw(mut rx: B_Receiver, opts: ListPathRawOptions) - } } - info!("read entry should heal: {}", current.name); if let Some(partial_fn) = opts.partial.as_ref() { partial_fn(MetaCacheEntries(top_entries), &errs).await; } - // break; } Ok(()) }); diff --git a/ecstore/src/store_list_objects.rs b/ecstore/src/store_list_objects.rs index 6bc16998..5d5bfa1e 100644 --- a/ecstore/src/store_list_objects.rs +++ b/ecstore/src/store_list_objects.rs @@ -1,3 +1,5 @@ +use crate::bucket::metadata_sys::get_versioning_config; +use crate::bucket::versioning::VersioningApi; use crate::cache_value::metacache_set::{list_path_raw, ListPathRawOptions}; use crate::disk::error::{is_all_not_found, is_all_volume_not_found, is_err_eof, DiskError}; use crate::disk::{ @@ -9,10 +11,10 @@ use crate::file_meta::merge_file_meta_versions; use crate::peer::is_reserved_or_invalid_bucket; use crate::set_disk::SetDisks; use crate::store::check_list_objs_args; -use crate::store_api::{ListObjectVersionsInfo, ListObjectsInfo, ObjectInfo, ObjectOptions}; +use crate::store_api::{FileInfo, ListObjectVersionsInfo, ListObjectsInfo, ObjectInfo, ObjectOptions}; use crate::store_err::{is_err_bucket_not_found, to_object_err, StorageError}; use crate::utils::path::{self, base_dir_from_prefix, SLASH_SEPARATOR}; -use crate::StorageAPI; +use crate::{bucket, StorageAPI}; use crate::{store::ECStore, store_api::ListObjectsV2Info}; use futures::future::join_all; use rand::seq::SliceRandom; @@ -22,7 +24,7 @@ use std::io::ErrorKind; use std::sync::Arc; use tokio::sync::broadcast::{self, Receiver as B_Receiver}; use tokio::sync::mpsc::{self, Receiver, Sender}; -use tracing::error; +use tracing::{error, warn}; use uuid::Uuid; const MAX_OBJECT_LIST: i32 = 1000; @@ -245,7 +247,7 @@ impl ECStore { ..Default::default() }; - // warn!("list_objects_generic opts {:?}", &opts); + warn!("list_objects_generic opts {:?}", &opts); // use get if !opts.prefix.is_empty() && opts.limit == 1 && opts.marker.is_none() { @@ -669,6 +671,287 @@ impl ECStore { Ok(Vec::new()) } + + pub async fn walk( + self: Arc, + rx: B_Receiver, + bucket: &str, + prefix: &str, + result: Sender, + opts: WalkOptions, + ) -> Result<()> { + check_list_objs_args(bucket, prefix, &None)?; + + let mut futures = Vec::new(); + let mut inputs = Vec::new(); + + for eset in self.pools.iter() { + for set in eset.disk_set.iter() { + let (mut disks, infos, _) = set.get_online_disks_with_healing_and_info(true).await; + let rx = rx.resubscribe(); + let opts = opts.clone(); + + let (sender, list_out_rx) = mpsc::channel::(1); + inputs.push(list_out_rx); + futures.push(async move { + let mut ask_disks = get_list_quorum(&opts.ask_disks, set.set_drive_count as i32); + if ask_disks == -1 { + let new_disks = get_quorum_disks(&disks, &infos, (disks.len() + 1) / 2); + if !new_disks.is_empty() { + disks = new_disks; + } else { + ask_disks = get_list_quorum("strict", set.set_drive_count as i32); + } + } + + if set.set_drive_count == 4 || ask_disks > disks.len() as i32 { + ask_disks = disks.len() as i32; + } + + let fallback_disks = { + if ask_disks > 0 && disks.len() > ask_disks as usize { + let mut rand = thread_rng(); + disks.shuffle(&mut rand); + disks.split_off(ask_disks as usize) + } else { + Vec::new() + } + }; + + let listing_quorum = ((ask_disks + 1) / 2) as usize; + + let resolver = MetadataResolutionParams { + dir_quorum: listing_quorum, + obj_quorum: listing_quorum, + bucket: bucket.to_owned(), + ..Default::default() + }; + + let path = base_dir_from_prefix(prefix); + + let mut filter_prefix = { + prefix + .trim_start_matches(&path) + .trim_start_matches(SLASH_SEPARATOR) + .trim_end_matches(SLASH_SEPARATOR) + .to_owned() + }; + + if filter_prefix == path { + filter_prefix = "".to_owned(); + } + + // let (sender, rx1) = mpsc::channel(100); + + let tx1 = sender.clone(); + let tx2 = sender.clone(); + + list_path_raw( + rx.resubscribe(), + ListPathRawOptions { + disks: disks.iter().cloned().map(Some).collect(), + fallback_disks: fallback_disks.iter().cloned().map(Some).collect(), + bucket: bucket.to_owned(), + path, + recursice: true, + filter_prefix: Some(filter_prefix), + forward_to: opts.marker.clone(), + min_disks: listing_quorum, + per_disk_limit: opts.limit as i32, + agreed: Some(Box::new(move |entry: MetaCacheEntry| { + Box::pin({ + let value = tx1.clone(); + async move { + if entry.is_dir() { + return; + } + if let Err(err) = value.send(entry).await { + error!("list_path send fail {:?}", err); + } + } + }) + })), + partial: Some(Box::new(move |entries: MetaCacheEntries, _: &[Option]| { + Box::pin({ + let value = tx2.clone(); + let resolver = resolver.clone(); + async move { + if let Ok(Some(entry)) = entries.resolve(resolver) { + if let Err(err) = value.send(entry).await { + error!("list_path send fail {:?}", err); + } + } + } + }) + })), + finished: None, + ..Default::default() + }, + ) + .await + }); + } + } + + let (merge_tx, mut merge_rx) = mpsc::channel::(100); + + let bucket = bucket.to_owned(); + + let vcf = match get_versioning_config(&bucket).await { + Ok((res, _)) => Some(res), + Err(_) => None, + }; + + tokio::spawn(async move { + let mut sent_err = false; + while let Some(entry) = merge_rx.recv().await { + if opts.latest_only { + let fi = match entry.to_fileinfo(&bucket) { + Ok(res) => res, + Err(err) => { + if !sent_err { + let item = ObjectInfoOrErr { + item: None, + err: Some(err), + }; + + if let Err(err) = result.send(item).await { + error!("walk result send err {:?}", err); + } + + sent_err = true; + + return; + } + + continue; + } + }; + + if let Some(fiter) = opts.filter { + if fiter(&fi) { + let item = ObjectInfoOrErr { + item: Some(fi.to_object_info(&bucket, &fi.name, { + if let Some(v) = &vcf { + v.versioned(&fi.name) + } else { + false + } + })), + err: None, + }; + + if let Err(err) = result.send(item).await { + error!("walk result send err {:?}", err); + } + } + } else { + let item = ObjectInfoOrErr { + item: Some(fi.to_object_info(&bucket, &fi.name, { + if let Some(v) = &vcf { + v.versioned(&fi.name) + } else { + false + } + })), + err: None, + }; + + if let Err(err) = result.send(item).await { + error!("walk result send err {:?}", err); + } + } + continue; + } + + let fvs = match entry.file_info_versions(&bucket) { + Ok(res) => res, + Err(err) => { + let item = ObjectInfoOrErr { + item: None, + err: Some(err), + }; + + if let Err(err) = result.send(item).await { + error!("walk result send err {:?}", err); + } + return; + } + }; + + if opts.versions_sort == WalkVersionsSortOrder::Ascending { + //TODO: SORT + } + + for fi in fvs.versions.iter() { + if let Some(fiter) = opts.filter { + if fiter(fi) { + let item = ObjectInfoOrErr { + item: Some(fi.to_object_info(&bucket, &fi.name, { + if let Some(v) = &vcf { + v.versioned(&fi.name) + } else { + false + } + })), + err: None, + }; + + if let Err(err) = result.send(item).await { + error!("walk result send err {:?}", err); + } + } + } else { + let item = ObjectInfoOrErr { + item: Some(fi.to_object_info(&bucket, &fi.name, { + if let Some(v) = &vcf { + v.versioned(&fi.name) + } else { + false + } + })), + err: None, + }; + + if let Err(err) = result.send(item).await { + error!("walk result send err {:?}", err); + } + } + } + } + }); + + tokio::spawn(async move { merge_entry_channels(rx, inputs, merge_tx, 1).await }); + + join_all(futures).await; + + Ok(()) + } +} + +type WalkFilter = fn(&FileInfo) -> bool; + +#[derive(Clone, Default)] +pub struct WalkOptions { + pub filter: Option, // return WalkFilter returns 'true/false' + pub marker: Option, // set to skip until this object + pub latest_only: bool, // returns only latest versions for all matching objects + pub ask_disks: String, // dictates how many disks are being listed + pub versions_sort: WalkVersionsSortOrder, // sort order for versions of the same object; default: Ascending order in ModTime + pub limit: usize, // maximum number of items, 0 means no limit +} + +#[derive(Clone, Default, PartialEq, Eq)] +pub enum WalkVersionsSortOrder { + #[default] + Ascending, + Descending, +} + +#[derive(Debug)] +pub struct ObjectInfoOrErr { + pub item: Option, + pub err: Option, } async fn gather_results( @@ -1092,260 +1375,294 @@ fn calc_common_counter(infos: &[DiskInfo], read_quorum: usize) -> u64 { // list_path_raw -// #[cfg(test)] -// mod test { -// use std::sync::Arc; +#[cfg(test)] +mod test { + use std::sync::Arc; -// use crate::cache_value::metacache_set::list_path_raw; -// use crate::cache_value::metacache_set::ListPathRawOptions; -// use crate::disk::endpoint::Endpoint; -// use crate::disk::error::is_err_eof; -// use crate::disk::format::FormatV3; -// use crate::disk::new_disk; -// use crate::disk::DiskAPI; -// use crate::disk::DiskOption; -// use crate::disk::MetaCacheEntries; -// use crate::disk::MetaCacheEntry; -// use crate::disk::WalkDirOptions; -// use crate::endpoints::EndpointServerPools; -// use crate::error::Error; -// use crate::metacache::writer::MetacacheReader; -// use crate::set_disk::SetDisks; -// use crate::store::ECStore; -// use crate::store_list_objects::ListPathOptions; -// use futures::future::join_all; -// use lock::namespace_lock::NsLockMap; -// use tokio::sync::broadcast; -// use tokio::sync::mpsc; -// use tokio::sync::RwLock; -// use uuid::Uuid; + use crate::cache_value::metacache_set::list_path_raw; + use crate::cache_value::metacache_set::ListPathRawOptions; + use crate::disk::endpoint::Endpoint; + use crate::disk::error::is_err_eof; + use crate::disk::format::FormatV3; + use crate::disk::new_disk; + use crate::disk::DiskAPI; + use crate::disk::DiskOption; + use crate::disk::MetaCacheEntries; + use crate::disk::MetaCacheEntry; + use crate::disk::WalkDirOptions; + use crate::endpoints::EndpointServerPools; + use crate::error::Error; + use crate::metacache::writer::MetacacheReader; + use crate::set_disk::SetDisks; + use crate::store::ECStore; + use crate::store_list_objects::ListPathOptions; + use crate::store_list_objects::WalkOptions; + use crate::store_list_objects::WalkVersionsSortOrder; + use futures::future::join_all; + use lock::namespace_lock::NsLockMap; + use tokio::sync::broadcast; + use tokio::sync::mpsc; + use tokio::sync::RwLock; + use uuid::Uuid; -// #[tokio::test] -// async fn test_walk_dir() { -// let mut ep = Endpoint::try_from("/Users/weisd/project/weisd/s3-rustfs/target/volume/test").unwrap(); -// ep.pool_idx = 0; -// ep.set_idx = 0; -// ep.disk_idx = 0; -// ep.is_local = true; + // #[tokio::test] + // async fn test_walk_dir() { + // let mut ep = Endpoint::try_from("/Users/weisd/project/weisd/s3-rustfs/target/volume/test").unwrap(); + // ep.pool_idx = 0; + // ep.set_idx = 0; + // ep.disk_idx = 0; + // ep.is_local = true; -// let disk = new_disk(&ep, &DiskOption::default()).await.expect("init disk fail"); + // let disk = new_disk(&ep, &DiskOption::default()).await.expect("init disk fail"); -// // let disk = match LocalDisk::new(&ep, false).await { -// // Ok(res) => res, -// // Err(err) => { -// // println!("LocalDisk::new err {:?}", err); -// // return; -// // } -// // }; + // // let disk = match LocalDisk::new(&ep, false).await { + // // Ok(res) => res, + // // Err(err) => { + // // println!("LocalDisk::new err {:?}", err); + // // return; + // // } + // // }; -// let (rd, mut wr) = tokio::io::duplex(64); + // let (rd, mut wr) = tokio::io::duplex(64); -// let job = tokio::spawn(async move { -// let opts = WalkDirOptions { -// bucket: "dada".to_owned(), -// base_dir: "".to_owned(), -// recursive: true, -// ..Default::default() -// }; + // let job = tokio::spawn(async move { + // let opts = WalkDirOptions { + // bucket: "dada".to_owned(), + // base_dir: "".to_owned(), + // recursive: true, + // ..Default::default() + // }; -// println!("walk opts {:?}", opts); -// if let Err(err) = disk.walk_dir(opts, &mut wr).await { -// println!("walk_dir err {:?}", err); -// } -// }); + // println!("walk opts {:?}", opts); + // if let Err(err) = disk.walk_dir(opts, &mut wr).await { + // println!("walk_dir err {:?}", err); + // } + // }); -// let job2 = tokio::spawn(async move { -// let mut mrd = MetacacheReader::new(rd); + // let job2 = tokio::spawn(async move { + // let mut mrd = MetacacheReader::new(rd); -// loop { -// match mrd.peek().await { -// Ok(res) => { -// if let Some(info) = res { -// println!("info {:?}", info.name) -// } else { -// break; -// } -// } -// Err(err) => { -// if is_err_eof(&err) { -// break; -// } + // loop { + // match mrd.peek().await { + // Ok(res) => { + // if let Some(info) = res { + // println!("info {:?}", info.name) + // } else { + // break; + // } + // } + // Err(err) => { + // if is_err_eof(&err) { + // break; + // } -// println!("get err {:?}", err); -// break; -// } -// } -// } -// }); -// join_all(vec![job, job2]).await; -// } + // println!("get err {:?}", err); + // break; + // } + // } + // } + // }); + // join_all(vec![job, job2]).await; + // } -// #[tokio::test] -// async fn test_list_path_raw() { -// let mut ep = Endpoint::try_from("/Users/weisd/project/weisd/s3-rustfs/target/volume/test").unwrap(); -// ep.pool_idx = 0; -// ep.set_idx = 0; -// ep.disk_idx = 0; -// ep.is_local = true; + // #[tokio::test] + // async fn test_list_path_raw() { + // let mut ep = Endpoint::try_from("/Users/weisd/project/weisd/s3-rustfs/target/volume/test").unwrap(); + // ep.pool_idx = 0; + // ep.set_idx = 0; + // ep.disk_idx = 0; + // ep.is_local = true; -// let disk = new_disk(&ep, &DiskOption::default()).await.expect("init disk fail"); + // let disk = new_disk(&ep, &DiskOption::default()).await.expect("init disk fail"); -// // let disk = match LocalDisk::new(&ep, false).await { -// // Ok(res) => res, -// // Err(err) => { -// // println!("LocalDisk::new err {:?}", err); -// // return; -// // } -// // }; + // // let disk = match LocalDisk::new(&ep, false).await { + // // Ok(res) => res, + // // Err(err) => { + // // println!("LocalDisk::new err {:?}", err); + // // return; + // // } + // // }; -// let (_, rx) = broadcast::channel(1); -// let bucket = "dada".to_owned(); -// let forward_to = None; -// let disks = vec![Some(disk)]; -// let fallback_disks = Vec::new(); + // let (_, rx) = broadcast::channel(1); + // let bucket = "dada".to_owned(); + // let forward_to = None; + // let disks = vec![Some(disk)]; + // let fallback_disks = Vec::new(); -// list_path_raw( -// rx, -// ListPathRawOptions { -// disks, -// fallback_disks, -// bucket, -// path: "".to_owned(), -// recursice: true, -// forward_to, -// min_disks: 1, -// report_not_found: false, -// agreed: Some(Box::new(move |entry: MetaCacheEntry| { -// Box::pin(async move { println!("get entry: {}", entry.name) }) -// })), -// partial: Some(Box::new(move |entries: MetaCacheEntries, _: &[Option]| { -// Box::pin(async move { println!("get entries: {:?}", entries) }) -// })), -// finished: None, -// ..Default::default() -// }, -// ) -// .await -// .unwrap(); -// } + // list_path_raw( + // rx, + // ListPathRawOptions { + // disks, + // fallback_disks, + // bucket, + // path: "".to_owned(), + // recursice: true, + // forward_to, + // min_disks: 1, + // report_not_found: false, + // agreed: Some(Box::new(move |entry: MetaCacheEntry| { + // Box::pin(async move { println!("get entry: {}", entry.name) }) + // })), + // partial: Some(Box::new(move |entries: MetaCacheEntries, _: &[Option]| { + // Box::pin(async move { println!("get entries: {:?}", entries) }) + // })), + // finished: None, + // ..Default::default() + // }, + // ) + // .await + // .unwrap(); + // } -// #[tokio::test] -// async fn test_set_list_path() { -// let mut ep = Endpoint::try_from("/Users/weisd/project/weisd/s3-rustfs/target/volume/test").unwrap(); -// ep.pool_idx = 0; -// ep.set_idx = 0; -// ep.disk_idx = 0; -// ep.is_local = true; + // #[tokio::test] + // async fn test_set_list_path() { + // let mut ep = Endpoint::try_from("/Users/weisd/project/weisd/s3-rustfs/target/volume/test").unwrap(); + // ep.pool_idx = 0; + // ep.set_idx = 0; + // ep.disk_idx = 0; + // ep.is_local = true; -// let disk = new_disk(&ep, &DiskOption::default()).await.expect("init disk fail"); -// let _ = disk.set_disk_id(Some(Uuid::new_v4())).await; + // let disk = new_disk(&ep, &DiskOption::default()).await.expect("init disk fail"); + // let _ = disk.set_disk_id(Some(Uuid::new_v4())).await; -// let set = SetDisks { -// lockers: Vec::new(), -// locker_owner: String::new(), -// ns_mutex: Arc::new(RwLock::new(NsLockMap::new(false))), -// disks: RwLock::new(vec![Some(disk)]), -// set_endpoints: Vec::new(), -// set_drive_count: 1, -// default_parity_count: 0, -// set_index: 0, -// pool_index: 0, -// format: FormatV3::new(1, 1), -// }; + // let set = SetDisks { + // lockers: Vec::new(), + // locker_owner: String::new(), + // ns_mutex: Arc::new(RwLock::new(NsLockMap::new(false))), + // disks: RwLock::new(vec![Some(disk)]), + // set_endpoints: Vec::new(), + // set_drive_count: 1, + // default_parity_count: 0, + // set_index: 0, + // pool_index: 0, + // format: FormatV3::new(1, 1), + // }; -// let (_tx, rx) = broadcast::channel(1); + // let (_tx, rx) = broadcast::channel(1); -// let bucket = "dada".to_owned(); + // let bucket = "dada".to_owned(); -// let opts = ListPathOptions { -// bucket, -// recursive: true, -// ..Default::default() -// }; + // let opts = ListPathOptions { + // bucket, + // recursive: true, + // ..Default::default() + // }; -// let (sender, mut recv) = mpsc::channel(10); + // let (sender, mut recv) = mpsc::channel(10); -// set.list_path(rx, opts, sender).await.unwrap(); + // set.list_path(rx, opts, sender).await.unwrap(); -// while let Some(entry) = recv.recv().await { -// println!("get entry {:?}", entry.name) -// } -// } + // while let Some(entry) = recv.recv().await { + // println!("get entry {:?}", entry.name) + // } + // } -// #[tokio::test] -// async fn test_list_merged() { -// let server_address = "localhost:9000"; + // #[tokio::test] + //walk() { + // let server_address = "localhost:9000"; -// let (endpoint_pools, _setup_type) = EndpointServerPools::from_volumes( -// server_address, -// vec!["/Users/weisd/project/weisd/s3-rustfs/target/volume/test".to_string()], -// ) -// .unwrap(); + // let (endpoint_pools, _setup_type) = EndpointServerPools::from_volumes( + // server_address, + // vec!["/Users/weisd/project/weisd/s3-rustfs/target/volume/test".to_string()], + // ) + // .unwrap(); -// let store = ECStore::new(server_address.to_string(), endpoint_pools.clone()) -// .await -// .unwrap(); + // let store = ECStore::new(server_address.to_string(), endpoint_pools.clone()) + // .await + // .unwrap(); -// let (_tx, rx) = broadcast::channel(1); + // let (_tx, rx) = broadcast::channel(1); -// let bucket = "dada".to_owned(); -// let opts = ListPathOptions { -// bucket, -// recursive: true, -// ..Default::default() -// }; + // let bucket = "dada".to_owned(); + // let opts = ListPathOptions { + // bucket, + // recursive: true, + // ..Default::default() + // }; -// let (sender, mut recv) = mpsc::channel(10); + // let (sender, mut recv) = mpsc::channel(10); -// store.list_merged(rx, opts, sender).await.unwrap(); + // store.list_merged(rx, opts, sender).await.unwrap(); -// while let Some(entry) = recv.recv().await { -// println!("get entry {:?}", entry.name) -// } -// } + // while let Some(entry) = recv.recv().await { + // println!("get entry {:?}", entry.name) + // } + // } -// #[tokio::test] -// async fn test_list_path() { -// let server_address = "localhost:9000"; + // #[tokio::test] + // async fn test_list_path() { + // let server_address = "localhost:9000"; -// let (endpoint_pools, _setup_type) = EndpointServerPools::from_volumes( -// server_address, -// vec!["/Users/weisd/project/weisd/s3-rustfs/target/volume/test".to_string()], -// ) -// .unwrap(); + // let (endpoint_pools, _setup_type) = EndpointServerPools::from_volumes( + // server_address, + // vec!["/Users/weisd/project/weisd/s3-rustfs/target/volume/test".to_string()], + // ) + // .unwrap(); -// let store = ECStore::new(server_address.to_string(), endpoint_pools.clone()) -// .await -// .unwrap(); + // let store = ECStore::new(server_address.to_string(), endpoint_pools.clone()) + // .await + // .unwrap(); -// let bucket = "dada".to_owned(); -// let opts = ListPathOptions { -// bucket, -// recursive: true, -// limit: 100, + // let bucket = "dada".to_owned(); + // let opts = ListPathOptions { + // bucket, + // recursive: true, + // limit: 100, -// ..Default::default() -// }; + // ..Default::default() + // }; -// let ret = store.list_path(&opts).await.unwrap(); -// println!("ret {:?}", ret); -// } + // let ret = store.list_path(&opts).await.unwrap(); + // println!("ret {:?}", ret); + // } -// // #[tokio::test] -// // async fn test_list_objects_v2() { -// // let server_address = "localhost:9000"; + // #[tokio::test] + // async fn test_list_objects_v2() { + // let server_address = "localhost:9000"; -// // let (endpoint_pools, _setup_type) = EndpointServerPools::from_volumes( -// // server_address, -// // vec!["/Users/weisd/project/weisd/s3-rustfs/target/volume/test".to_string()], -// // ) -// // .unwrap(); + // let (endpoint_pools, _setup_type) = EndpointServerPools::from_volumes( + // server_address, + // vec!["/Users/weisd/project/weisd/s3-rustfs/target/volume/test".to_string()], + // ) + // .unwrap(); -// // let store = ECStore::new(server_address.to_string(), endpoint_pools.clone()) -// // .await -// // .unwrap(); + // let store = ECStore::new(server_address.to_string(), endpoint_pools.clone()) + // .await + // .unwrap(); -// // let ret = store.list_objects_v2("data", "", "", "", 100, false, "").await.unwrap(); -// // println!("ret {:?}", ret); -// // } -// } + // let ret = store.list_objects_v2("data", "", "", "", 100, false, "").await.unwrap(); + // println!("ret {:?}", ret); + // } + + #[tokio::test] + async fn test_walk() { + let server_address = "localhost:9000"; + + let (endpoint_pools, _setup_type) = EndpointServerPools::from_volumes( + server_address, + vec!["/Users/weisd/project/weisd/s3-rustfs/target/volume/test".to_string()], + ) + .unwrap(); + + let store = ECStore::new(server_address.to_string(), endpoint_pools.clone()) + .await + .unwrap(); + + ECStore::init(store.clone()).await.unwrap(); + + let (_tx, rx) = broadcast::channel(1); + + let bucket = ".rustfs.sys"; + let prefix = "config/iam/sts/"; + + let (sender, mut recv) = mpsc::channel(10); + + let opts = WalkOptions::default(); + + store.walk(rx, bucket, prefix, sender, opts).await.unwrap(); + + while let Some(entry) = recv.recv().await { + println!("get entry {:?}", entry) + } + } +} diff --git a/iam/Cargo.toml b/iam/Cargo.toml index cc9e7ceb..4b6628fc 100644 --- a/iam/Cargo.toml +++ b/iam/Cargo.toml @@ -28,6 +28,7 @@ rand.workspace = true base64-simd = "0.8.0" jsonwebtoken = "9.3.0" tracing.workspace = true +madmin.workspace = true [dev-dependencies] test-case.workspace = true diff --git a/iam/src/auth.rs b/iam/src/auth.rs index 679dd2bb..79d8eaa3 100644 --- a/iam/src/auth.rs +++ b/iam/src/auth.rs @@ -1,15 +1,25 @@ mod credentials; pub use credentials::Credentials; -pub use credentials::CredentialsBuilder; +pub use credentials::*; use serde::{Deserialize, Serialize}; use time::OffsetDateTime; -#[derive(Serialize, Deserialize, Clone)] +#[derive(Serialize, Deserialize, Clone, Debug, Default)] pub struct UserIdentity { pub version: i64, pub credentials: Credentials, - pub update_at: OffsetDateTime, + pub update_at: Option, +} + +impl UserIdentity { + pub fn new(credentials: Credentials) -> Self { + UserIdentity { + version: 1, + credentials, + update_at: Some(OffsetDateTime::now_utc()), + } + } } impl From for UserIdentity { @@ -17,7 +27,7 @@ impl From for UserIdentity { UserIdentity { version: 1, credentials: value, - update_at: OffsetDateTime::now_utc(), + update_at: Some(OffsetDateTime::now_utc()), } } } diff --git a/iam/src/auth/credentials.rs b/iam/src/auth/credentials.rs index e5d47140..ce33d810 100644 --- a/iam/src/auth/credentials.rs +++ b/iam/src/auth/credentials.rs @@ -1,6 +1,8 @@ use crate::policy::{Policy, Validator}; use crate::service_type::ServiceType; +use crate::utils::extract_claims; use crate::{utils, Error}; +use serde::de::DeserializeOwned; use serde::{Deserialize, Serialize}; use serde_json::{json, Value}; use std::cell::LazyCell; @@ -13,8 +15,8 @@ const ACCESS_KEY_MAX_LEN: usize = 20; const SECRET_KEY_MIN_LEN: usize = 8; const SECRET_KEY_MAX_LEN: usize = 40; -const ACCOUNT_ON: &str = "on"; -const ACCOUNT_OFF: &str = "off"; +pub const ACCOUNT_ON: &str = "on"; +pub const ACCOUNT_OFF: &str = "off"; #[cfg_attr(test, derive(PartialEq, Eq, Debug))] struct CredentialHeader { @@ -89,7 +91,7 @@ impl TryFrom<&str> for CredentialHeader { } } -#[derive(Serialize, Deserialize, Clone, Default)] +#[derive(Serialize, Deserialize, Clone, Default, Debug)] pub struct Credentials { pub access_key: String, pub secret_key: String, @@ -98,7 +100,7 @@ pub struct Credentials { pub status: String, pub parent_user: String, pub groups: Option>, - pub claims: Option>>, + pub claims: Option>, pub name: Option, pub description: Option, } @@ -109,44 +111,6 @@ impl Credentials { Self::check_key_value(header) } - pub fn get_new_credentials_with_metadata( - claims: &T, - token_secret: &str, - exp: Option, - ) -> crate::Result { - let ak = utils::gen_access_key(20).unwrap_or_default(); - let sk = utils::gen_secret_key(32).unwrap_or_default(); - - Self::create_new_credentials_with_metadata(&ak, &sk, claims, token_secret, exp) - } - - pub fn create_new_credentials_with_metadata( - ak: &str, - sk: &str, - claims: &T, - token_secret: &str, - exp: Option, - ) -> crate::Result { - if ak.len() < ACCESS_KEY_MIN_LEN || ak.len() > ACCESS_KEY_MAX_LEN { - return Err(Error::InvalidAccessKeyLength); - } - - if sk.len() < SECRET_KEY_MIN_LEN || sk.len() > SECRET_KEY_MAX_LEN { - return Err(Error::InvalidAccessKeyLength); - } - - let token = utils::generate_jwt(claims, token_secret).map_err(Error::JWTError)?; - - Ok(Self { - access_key: ak.to_owned(), - secret_key: sk.to_owned(), - session_token: token, - status: ACCOUNT_ON.to_owned(), - expiration: exp.map(|v| OffsetDateTime::now_utc().saturating_add(Duration::seconds(v as i64))), - ..Default::default() - }) - } - pub fn check_key_value(_header: CredentialHeader) -> crate::Result { todo!() } @@ -186,6 +150,50 @@ impl Credentials { } } +pub fn get_new_credentials_with_metadata( + claims: &T, + token_secret: &str, + exp: Option, +) -> crate::Result { + let ak = utils::gen_access_key(20).unwrap_or_default(); + let sk = utils::gen_secret_key(32).unwrap_or_default(); + + create_new_credentials_with_metadata(&ak, &sk, claims, token_secret, exp) +} + +pub fn create_new_credentials_with_metadata( + ak: &str, + sk: &str, + claims: &T, + token_secret: &str, + exp: Option, +) -> crate::Result { + if ak.len() < ACCESS_KEY_MIN_LEN || ak.len() > ACCESS_KEY_MAX_LEN { + return Err(Error::InvalidAccessKeyLength); + } + + if sk.len() < SECRET_KEY_MIN_LEN || sk.len() > SECRET_KEY_MAX_LEN { + return Err(Error::InvalidAccessKeyLength); + } + + let token = utils::generate_jwt(claims, token_secret).map_err(Error::JWTError)?; + + Ok(Credentials { + access_key: ak.to_owned(), + secret_key: sk.to_owned(), + session_token: token, + status: ACCOUNT_ON.to_owned(), + expiration: exp.map(|v| OffsetDateTime::now_utc().saturating_add(Duration::seconds(v as i64))), + ..Default::default() + }) +} + +pub fn get_claims_from_token_with_secret(token: &str, secret: &str) -> crate::Result { + let ms = extract_claims::(token, secret).map_err(Error::JWTError)?; + // TODO SessionPolicyName + Ok(ms.claims) +} + #[derive(Default)] pub struct CredentialsBuilder { session_policy: Option, diff --git a/iam/src/error.rs b/iam/src/error.rs index 70117183..25ae2af2 100644 --- a/iam/src/error.rs +++ b/iam/src/error.rs @@ -37,6 +37,17 @@ pub enum Error { #[error("jwt err {0}")] JWTError(jsonwebtoken::errors::Error), + + #[error("no access key")] + NoAccessKey, + + #[error("invalid token")] + InvalidToken, + + #[error("invalid access_key")] + InvalidAccessKey, + #[error("action not allowed")] + IAMActionNotAllowed, } pub type Result = std::result::Result; diff --git a/iam/src/lib.rs b/iam/src/lib.rs index 419e4510..81683b9b 100644 --- a/iam/src/lib.rs +++ b/iam/src/lib.rs @@ -3,7 +3,10 @@ use ecstore::store::ECStore; use log::debug; use manager::IamCache; use policy::{Args, Policy}; -use std::sync::{Arc, OnceLock}; +use std::{ + collections::HashMap, + sync::{Arc, OnceLock}, +}; use store::object::ObjectStore; use time::OffsetDateTime; @@ -85,6 +88,24 @@ pub async fn add_service_account(cred: Credentials) -> crate::Result crate::Result> { +pub async fn check_key(ak: &str) -> crate::Result<(Option, bool)> { + if let Some(sys_cred) = get_global_action_cred() { + if sys_cred.access_key == ak { + return Ok((Some(UserIdentity::new(sys_cred)), true)); + } + } get()?.check_key(ak).await } + +pub async fn list_users() -> crate::Result> { + get()?.get_users().await +} + +pub async fn get_user(ak: &str) -> crate::Result<(Option, bool)> { + get()?.check_key(ak).await +} + +pub async fn create_user(ak: &str, sk: &str, status: &str) -> crate::Result { + get()?.add_user(ak, sk, status).await + // notify +} diff --git a/iam/src/manager.rs b/iam/src/manager.rs index efe57c77..386893e1 100644 --- a/iam/src/manager.rs +++ b/iam/src/manager.rs @@ -8,7 +8,7 @@ use std::{ }; use ecstore::store_err::is_err_object_not_found; -use log::debug; +use log::{debug, warn}; use time::OffsetDateTime; use tokio::{ select, @@ -20,7 +20,7 @@ use tokio::{ use crate::{ arn::ARN, - auth::{Credentials, UserIdentity}, + auth::{self, Credentials, UserIdentity}, cache::Cache, format::Format, handler::Handler, @@ -188,7 +188,7 @@ where OffsetDateTime::now_utc(), ); - Ok(user_entiry.update_at) + Ok(user_entiry.update_at.unwrap_or(OffsetDateTime::now_utc())) } pub async fn is_allowed<'a>(&self, args: Args<'a>) -> bool { @@ -209,7 +209,7 @@ where Ok((u.clone(), None)) } - pub async fn check_key(&self, ak: &str) -> crate::Result> { + pub async fn check_key(&self, ak: &str) -> crate::Result<(Option, bool)> { let user = self .cache .users @@ -219,8 +219,14 @@ where .or_else(|| self.cache.sts_accounts.load().get(ak).cloned()); match user { - Some(u) if u.credentials.is_valid() => Ok(Some(u)), - _ => Ok(None), + Some(u) => { + if u.credentials.is_valid() { + Ok((Some(u), true)) + } else { + Ok((Some(u), false)) + } + } + _ => Ok((None, false)), } } pub async fn policy_db_get(&self, name: &str, _groups: Option>) -> crate::Result> { @@ -255,7 +261,7 @@ where self.api.save_iam_config(&user_entiry, path).await?; Cache::add_or_update( - &self.cache.users, + &self.cache.sts_accounts, &user_entiry.credentials.access_key, &user_entiry, OffsetDateTime::now_utc(), @@ -263,4 +269,84 @@ where Ok(()) } + + // returns all users (not STS or service accounts) + pub async fn get_users(&self) -> crate::Result> { + let mut m = HashMap::new(); + + let users = self.cache.users.load(); + let policies = self.cache.user_policies.load(); + let group_members = self.cache.user_group_memeberships.load(); + + for (k, v) in users.iter() { + warn!("k: {}, v: {:?}", k, v.credentials); + + if v.credentials.is_temp() || v.credentials.is_service_account() { + continue; + } + + let mut u = madmin::UserInfo { + status: if v.credentials.is_valid() { + madmin::AccountStatus::Enabled + } else { + madmin::AccountStatus::Disabled + }, + updated_at: v.update_at, + ..Default::default() + }; + + if let Some(p) = policies.get(k) { + u.policy_name = Some(p.policies.clone()); + u.updated_at = Some(p.update_at); + } + + if let Some(members) = group_members.get(k) { + u.member_of = Some(members.iter().cloned().collect()); + } + + m.insert(k.clone(), u); + } + + Ok(m) + } + + pub async fn add_user(&self, access_key: &str, secret_key: &str, status: &str) -> crate::Result { + let status = { + match status { + "disabled" => auth::ACCOUNT_ON, + auth::ACCOUNT_ON => auth::ACCOUNT_ON, + _ => auth::ACCOUNT_OFF, + } + }; + + let users = self.cache.users.load(); + if let Some(x) = users.get(access_key) { + if x.credentials.is_temp() { + return Err(crate::Error::IAMActionNotAllowed); + } + } + + let user_entiry = UserIdentity::from(Credentials { + access_key: access_key.to_string(), + secret_key: secret_key.to_string(), + status: status.to_string(), + ..Default::default() + }); + let path = format!( + "config/iam/{}{}/identity.json", + UserType::Reg.prefix(), + user_entiry.credentials.access_key + ); + debug!("save object: {path:?}"); + self.api.save_iam_config(&user_entiry, path).await?; + + Cache::add_or_update( + &self.cache.users, + &user_entiry.credentials.access_key, + &user_entiry, + OffsetDateTime::now_utc(), + ); + + Ok(user_entiry.update_at.unwrap_or(OffsetDateTime::now_utc())) + } } diff --git a/iam/src/store/object.rs b/iam/src/store/object.rs index 9290395f..fd4fadc2 100644 --- a/iam/src/store/object.rs +++ b/iam/src/store/object.rs @@ -4,12 +4,14 @@ use ecstore::{ config::error::is_err_config_not_found, store::ECStore, store_api::{ObjectIO, ObjectInfo, ObjectOptions, PutObjReader}, - utils::path::dir, - StorageAPI, + store_list_objects::{ObjectInfoOrErr, WalkOptions}, + utils::path::{dir, SLASH_SEPARATOR}, }; use futures::future::try_join_all; use log::{debug, warn}; use serde::{de::DeserializeOwned, Serialize}; +use tokio::sync::broadcast; +use tokio::sync::mpsc::{self}; use super::Store; use crate::{ @@ -31,45 +33,54 @@ impl ObjectStore { Self { object_api } } - async fn list_iam_config_items(&self, prefix: &str, items: &[&str]) -> crate::Result> { - debug!("list iam config items, prefix: {prefix}"); + async fn list_iam_config_items(&self, prefix: &str) -> crate::Result> { + debug!("list iam config items, prefix: {}", &prefix); // todo, 实现walk,使用walk - let mut futures = Vec::with_capacity(items.len()); - for item in items { - let prefix = format!("{}{}", prefix, item); - futures.push(async move { - // let items = self - // .object_api - // .clone() - // .list_path(&ListPathOptions { - // bucket: Self::BUCKET_NAME.into(), - // prefix: prefix.clone(), - // ..Default::default() - // }) - // .await; + let (ctx_tx, ctx_rx) = broadcast::channel(1); - let items = self - .object_api - .clone() - .list_objects_v2(Self::BUCKET_NAME, &prefix.clone(), None, None, 0, false, None) - .await; + // let prefix = format!("{}{}", prefix, item); - match items { - Ok(items) => Result::<_, crate::Error>::Ok(items.prefixes), - Err(e) => { - if is_err_config_not_found(&e) { - Result::<_, crate::Error>::Ok(vec![]) - } else { - Err(Error::StringError(format!("list {prefix} failed, err: {e:?}"))) - } - } - } - }); + let ctx_rx = ctx_rx.resubscribe(); + + let (tx, mut rx) = mpsc::channel::(100); + let store = self.object_api.clone(); + + let path = prefix.to_owned(); + tokio::spawn(async move { store.walk(ctx_rx, Self::BUCKET_NAME, &path, tx, WalkOptions::default()).await }); + + let mut ret = Vec::new(); + + while let Some(v) = rx.recv().await { + if let Some(err) = v.err { + warn!("list_iam_config_items {:?}", err); + let _ = ctx_tx.send(true); + + return Err(Error::EcstoreError(err)); + } + + if let Some(item) = v.item { + let name = item.name.trim_start_matches(prefix).trim_end_matches(SLASH_SEPARATOR); + ret.push(name.to_owned()); + } } + + Ok(ret) + + // match items { + // Ok(items) => Result::<_, crate::Error>::Ok(items.prefixes), + // Err(e) => { + // if is_err_config_not_found(&e) { + // Result::<_, crate::Error>::Ok(vec![]) + // } else { + // Err(Error::StringError(format!("list {prefix} failed, err: {e:?}"))) + // } + // } + // } + // TODO: FIXME: - Ok(try_join_all(futures).await?.into_iter().flat_map(|x| x.into_iter()).collect()) + // Ok(try_join_all(futures).await?.into_iter().flat_map(|x| x.into_iter()).collect()) } async fn load_policy(&self, name: &str) -> crate::Result { @@ -152,7 +163,7 @@ impl Store for ObjectStore { } async fn load_policy_docs(&self) -> crate::Result> { - let paths = self.list_iam_config_items("config/iam/", &["policies/"]).await?; + let paths = self.list_iam_config_items("config/iam/policies/").await?; let mut result = Self::get_default_policyes(); for path in paths { @@ -174,7 +185,9 @@ impl Store for ObjectStore { } async fn load_users(&self, user_type: UserType) -> crate::Result> { - let paths = self.list_iam_config_items("config/iam/", &[user_type.prefix()]).await?; + let paths = self + .list_iam_config_items(format!("{}{}", "config/iam/", user_type.prefix()).as_str()) + .await?; let mut result = HashMap::new(); for path in paths { @@ -202,21 +215,18 @@ impl Store for ObjectStore { /// load all and make a new cache. async fn load_all(&self, cache: &Cache) -> crate::Result<()> { - let items = self - .list_iam_config_items( - "config/iam/", - &[ - "policydb/", - "policies/", - "groups/", - "policydb/users/", - "policydb/groups/", - "service-accounts/", - "policydb/sts-users/", - "sts/", - ], - ) - .await?; + let _items = &[ + "policydb/", + "policies/", + "groups/", + "policydb/users/", + "policydb/groups/", + "service-accounts/", + "policydb/sts-users/", + "sts/", + ]; + + let items = self.list_iam_config_items("config/iam/").await?; debug!("all iam items: {items:?}"); let (policy_docs, users, user_policies, sts_policies, sts_accounts) = ( diff --git a/iam/src/utils.rs b/iam/src/utils.rs index f3cd2260..1297ef87 100644 --- a/iam/src/utils.rs +++ b/iam/src/utils.rs @@ -1,7 +1,7 @@ use crate::Error; -use jsonwebtoken::{encode, Algorithm, EncodingKey, Header}; +use jsonwebtoken::{encode, Algorithm, DecodingKey, EncodingKey, Header}; use rand::{Rng, RngCore}; -use serde::Serialize; +use serde::{de::DeserializeOwned, Deserialize, Serialize}; pub fn gen_access_key(length: usize) -> crate::Result { const ALPHA_NUMERIC_TABLE: [char; 36] = [ @@ -45,6 +45,17 @@ pub fn generate_jwt(claims: &T, secret: &str) -> Result( + token: &str, + secret: &str, +) -> Result, jsonwebtoken::errors::Error> { + jsonwebtoken::decode::( + token, + &DecodingKey::from_secret(secret.as_bytes()), + &jsonwebtoken::Validation::new(Algorithm::HS512), + ) +} + #[cfg(test)] mod tests { use super::{gen_access_key, gen_secret_key}; diff --git a/madmin/src/lib.rs b/madmin/src/lib.rs index 4f8f29ed..2a9572f9 100644 --- a/madmin/src/lib.rs +++ b/madmin/src/lib.rs @@ -5,6 +5,8 @@ pub mod metrics; pub mod net; pub mod service_commands; pub mod trace; +pub mod user; pub mod utils; pub use info_commands::*; +pub use user::*; diff --git a/madmin/src/user.rs b/madmin/src/user.rs new file mode 100644 index 00000000..5e635e92 --- /dev/null +++ b/madmin/src/user.rs @@ -0,0 +1,52 @@ +use serde::{Deserialize, Serialize}; +use time::OffsetDateTime; + +#[derive(Debug, Serialize, Deserialize, Default)] +pub enum AccountStatus { + #[serde(rename = "enabled")] + Enabled, + #[serde(rename = "disabled")] + #[default] + Disabled, +} + +#[derive(Debug, Serialize, Deserialize)] +pub enum UserAuthType { + #[serde(rename = "builtin")] + Builtin, + #[serde(rename = "ldap")] + Ldap, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct UserAuthInfo { + #[serde(rename = "type")] + pub auth_type: UserAuthType, + + #[serde(rename = "authServer", skip_serializing_if = "Option::is_none")] + pub auth_server: Option, + + #[serde(rename = "authServerUserID", skip_serializing_if = "Option::is_none")] + pub auth_server_user_id: Option, +} + +#[derive(Debug, Serialize, Deserialize, Default)] +pub struct UserInfo { + #[serde(rename = "userAuthInfo", skip_serializing_if = "Option::is_none")] + pub auth_info: Option, + + #[serde(rename = "secretKey", skip_serializing_if = "Option::is_none")] + pub secret_key: Option, + + #[serde(rename = "policyName", skip_serializing_if = "Option::is_none")] + pub policy_name: Option, + + #[serde(rename = "status")] + pub status: AccountStatus, + + #[serde(rename = "memberOf", skip_serializing_if = "Option::is_none")] + pub member_of: Option>, + + #[serde(rename = "updatedAt")] + pub updated_at: Option, +} diff --git a/rustfs/src/admin/handlers.rs b/rustfs/src/admin/handlers.rs index ec0280e4..d4d95d92 100644 --- a/rustfs/src/admin/handlers.rs +++ b/rustfs/src/admin/handlers.rs @@ -20,9 +20,10 @@ use ecstore::utils::path::path_join; use ecstore::utils::xml; use ecstore::GLOBAL_Endpoints; use futures::{Stream, StreamExt}; -use http::Uri; +use http::{HeaderMap, Uri}; use hyper::StatusCode; -use iam::get_global_action_cred; +use iam::auth::{create_new_credentials_with_metadata, get_claims_from_token_with_secret}; +use iam::{auth, get_global_action_cred}; use madmin::metrics::RealtimeMetrics; use madmin::utils::parse_duration; use matchit::Params; @@ -92,11 +93,21 @@ pub struct AssumeRoleRequest { // pub parent_user: String, // } -#[derive(Debug, Serialize, Default)] +#[derive(Debug, Serialize, Deserialize, Default)] pub struct STSClaims { - parent: String, - exp: usize, - access_key: String, + pub parent: String, + pub exp: usize, + pub access_key: String, +} + +impl STSClaims { + pub fn to_map(&self) -> HashMap { + let mut m = HashMap::new(); + m.insert("parent".to_string(), self.parent.clone()); + m.insert("exp".to_string(), self.exp.to_string()); + m.insert("access_key".to_string(), self.access_key.clone()); + m + } } fn get_token_signing_key() -> Option { @@ -107,6 +118,98 @@ fn get_token_signing_key() -> Option { } } +pub async fn check_key_valid(token: Option, ak: &str) -> S3Result<(auth::Credentials, bool)> { + let Some(mut cred) = get_global_action_cred() else { + return Err(s3_error!(InternalError, "action cred not init")); + }; + + let sys_cred = cred.clone(); + + if cred.access_key != ak { + let Ok(iam_store) = iam::get() else { return Err(s3_error!(InternalError, "iam not init")) }; + + match iam_store + .check_key(ak) + .await + .map_err(|_e| s3_error!(InternalError, "check key failed"))? + { + (Some(u), true) => { + cred = u.credentials; + } + (Some(u), false) => { + if u.credentials.status == "off" { + return Err(s3_error!(InvalidRequest, "ErrAccessKeyDisabled")); + } + + return Err(s3_error!(InvalidRequest, "check key failed")); + } + _ => { + return Err(s3_error!(InvalidRequest, "check key failed")); + } + } + } + + if let Some(st) = token { + let claims = check_claims_from_token(&st, &cred) + .map_err(|e| S3Error::with_message(S3ErrorCode::InternalError, format!("check claims failed {}", e)))?; + cred.claims = Some(claims.to_map()); + } + + let owner = sys_cred.access_key == cred.access_key || cred.parent_user == sys_cred.access_key; + + // permitRootAccess + // SessionPolicyName + Ok((cred, owner)) +} + +pub fn check_claims_from_token(token: &str, cred: &auth::Credentials) -> S3Result { + if !token.is_empty() && cred.access_key.is_empty() { + return Err(s3_error!(InvalidRequest, "no access key")); + } + + if token.is_empty() && cred.is_temp() && !cred.is_service_account() { + return Err(s3_error!(InvalidRequest, "invalid token")); + } + + if !token.is_empty() && !cred.is_temp() { + return Err(s3_error!(InvalidRequest, "invalid token")); + } + + if !cred.is_service_account() && cred.is_temp() && token != cred.session_token { + return Err(s3_error!(InvalidRequest, "invalid token")); + } + + if cred.is_temp() || cred.is_expired() { + return Err(s3_error!(InvalidRequest, "invalid access key")); + } + + let Some(sys_cred) = get_global_action_cred() else { + return Err(s3_error!(InternalError, "action cred not init")); + }; + + let mut secret = sys_cred.secret_key; + + let mut token = token; + + if cred.is_service_account() { + token = cred.session_token.as_str(); + secret = cred.secret_key.clone(); + } + + if !token.is_empty() { + let claims: STSClaims = + get_claims_from_token_with_secret(token, &secret).map_err(|_e| s3_error!(InvalidRequest, "invalid token"))?; + return Ok(claims); + } + + Ok(STSClaims::default()) +} + +pub fn get_session_token(hds: &HeaderMap) -> Option { + hds.get("x-amz-security-token") + .map(|v| v.to_str().unwrap_or_default().to_string()) +} + pub struct AssumeRoleHandle {} #[async_trait::async_trait] impl Operation for AssumeRoleHandle { @@ -164,7 +267,7 @@ impl Operation for AssumeRoleHandle { claims.access_key = ak.clone(); - let mut cred = match iam::auth::Credentials::create_new_credentials_with_metadata(&ak, &sk, &claims, &secret, Some(exp)) { + let mut cred = match create_new_credentials_with_metadata(&ak, &sk, &claims, &secret, Some(exp)) { Ok(res) => res, Err(_er) => return Err(s3_error!(InvalidRequest, "")), }; diff --git a/rustfs/src/admin/handlers/user.rs b/rustfs/src/admin/handlers/user.rs index 3c961fa8..63c24895 100644 --- a/rustfs/src/admin/handlers/user.rs +++ b/rustfs/src/admin/handlers/user.rs @@ -1,13 +1,67 @@ +use futures::TryFutureExt; use http::StatusCode; +use iam::get_global_action_cred; use matchit::Params; -use s3s::{s3_error, Body, S3Request, S3Response, S3Result}; +use s3s::{s3_error, Body, S3Error, S3ErrorCode, S3Request, S3Response, S3Result}; +use serde::Deserialize; +use serde_urlencoded::from_bytes; +use tracing::warn; -use crate::admin::router::Operation; +use crate::admin::{ + handlers::{check_key_valid, get_session_token}, + router::Operation, +}; + +#[derive(Debug, Deserialize, Default)] +pub struct AddUserQuery { + #[serde(rename = "accessKey")] + pub access_key: Option, +} pub struct AddUser {} #[async_trait::async_trait] impl Operation for AddUser { - async fn call(&self, _req: S3Request, _params: Params<'_, '_>) -> S3Result> { + async fn call(&self, req: S3Request, _params: Params<'_, '_>) -> S3Result> { + warn!("handle AddUser"); + let query = { + if let Some(query) = req.uri.query() { + let input: AddUserQuery = + from_bytes(query.as_bytes()).map_err(|_e| s3_error!(InvalidArgument, "get body failed"))?; + input + } else { + AddUserQuery::default() + } + }; + + let Some(input_cred) = req.credentials else { + return Err(s3_error!(InvalidRequest, "get cred failed")); + }; + + let ak = query.access_key.as_deref().unwrap_or_default(); + + if let Some(sys_cred) = get_global_action_cred() { + if sys_cred.access_key == ak { + return Err(s3_error!(InvalidArgument, "can't create user with system access key")); + } + } + + if let (Some(user), true) = iam::get_user(ak) + .await + .map_err(|e| S3Error::with_message(S3ErrorCode::InternalError, format!("marshal users err {}", e)))? + { + if user.credentials.is_temp() || user.credentials.is_service_account() { + return Err(s3_error!(InvalidArgument, "can't create user with service account access key")); + } + } + + let token = get_session_token(&req.headers); + + let (cred, _) = check_key_valid(token, &input_cred.access_key).await?; + + if (cred.is_temp() || cred.is_service_account()) && cred.parent_user == input_cred.access_key { + return Err(s3_error!(InvalidArgument, "can't create user with service account access key")); + } + return Err(s3_error!(NotImplemented)); } } @@ -19,3 +73,35 @@ impl Operation for SetUserStatus { return Err(s3_error!(NotImplemented)); } } + +pub struct ListUsers {} +#[async_trait::async_trait] +impl Operation for ListUsers { + async fn call(&self, _req: S3Request, _params: Params<'_, '_>) -> S3Result> { + warn!("handle ListUsers"); + let users = iam::list_users() + .await + .map_err(|e| S3Error::with_message(S3ErrorCode::InternalError, e.to_string()))?; + + let body = serde_json::to_string(&users) + .map_err(|e| S3Error::with_message(S3ErrorCode::InternalError, format!("marshal users err {}", e)))?; + Ok(S3Response::new((StatusCode::OK, Body::from(body)))) + } +} + +pub struct RemoveUser {} +#[async_trait::async_trait] +impl Operation for RemoveUser { + async fn call(&self, _req: S3Request, _params: Params<'_, '_>) -> S3Result> { + warn!("handle RemoveUser"); + return Err(s3_error!(NotImplemented)); + } +} + +pub struct GetUserInfo {} +#[async_trait::async_trait] +impl Operation for GetUserInfo { + async fn call(&self, _req: S3Request, _params: Params<'_, '_>) -> S3Result> { + return Err(s3_error!(NotImplemented)); + } +} diff --git a/rustfs/src/admin/mod.rs b/rustfs/src/admin/mod.rs index 713eab9e..26d002f3 100644 --- a/rustfs/src/admin/mod.rs +++ b/rustfs/src/admin/mod.rs @@ -126,13 +126,32 @@ fn regist_user_route(r: &mut S3Router) -> Result<()> { format!("{}{}", ADMIN_PREFIX, "/v3/accountinfo").as_str(), AdminOperation(&handlers::AccountInfoHandler {}), )?; + r.insert( Method::GET, + format!("{}{}", ADMIN_PREFIX, "/v3/list-users").as_str(), + AdminOperation(&user::ListUsers {}), + )?; + + r.insert( + Method::GET, + format!("{}{}", ADMIN_PREFIX, "/v3/user-info").as_str(), + AdminOperation(&user::GetUserInfo {}), + )?; + + r.insert( + Method::DELETE, + format!("{}{}", ADMIN_PREFIX, "/v3/remove-user").as_str(), + AdminOperation(&user::RemoveUser {}), + )?; + + r.insert( + Method::PUT, format!("{}{}", ADMIN_PREFIX, "/v3/add-user").as_str(), AdminOperation(&user::AddUser {}), )?; r.insert( - Method::GET, + Method::PUT, format!("{}{}", ADMIN_PREFIX, "/v3/set-user-status").as_str(), AdminOperation(&user::SetUserStatus {}), )?; diff --git a/rustfs/src/auth.rs b/rustfs/src/auth.rs index f76fdfcb..ee6678c7 100644 --- a/rustfs/src/auth.rs +++ b/rustfs/src/auth.rs @@ -1,4 +1,5 @@ use iam::cache::CacheInner; +use log::warn; use s3s::auth::S3Auth; use s3s::auth::SecretKey; use s3s::auth::SimpleAuth; @@ -27,9 +28,15 @@ impl S3Auth for IAMAuth { return Ok(key); } + warn!("Failed to get secret key from simple auth"); + if let Ok(iam_store) = iam::get() { let c = CacheInner::from(&iam_store.cache); + warn!("Failed to get secret key from simple auth, try cache {}", access_key); + warn!("users {:?}", c.users.values()); + warn!("sts_accounts {:?}", c.sts_accounts.values()); if let Some(id) = c.get_user(access_key) { + warn!("get cred {:?}", id.credentials); return Ok(SecretKey::from(id.credentials.secret_key.clone())); } }