Merge pull request #205 from rustfs/feat/admin-api

add walk api
This commit is contained in:
weisd
2025-01-12 22:51:39 +08:00
committed by GitHub
19 changed files with 1097 additions and 348 deletions

5
Cargo.lock generated
View File

@@ -1243,6 +1243,7 @@ dependencies = [
"itertools 0.14.0",
"jsonwebtoken",
"log",
"madmin",
"rand",
"serde",
"serde_json",
@@ -2508,7 +2509,7 @@ checksum = "f3cb5ba0dc43242ce17de99c180e96db90b235b8a9fdc9543c96d2209116bd9f"
[[package]]
name = "s3s"
version = "0.11.0-dev"
source = "git+https://github.com/Nugine/s3s.git?rev=3291c0ca0284971569499cbe75bd69ef7bde8321#3291c0ca0284971569499cbe75bd69ef7bde8321"
source = "git+https://github.com/Nugine/s3s.git?rev=05efc3f87e9f5d1a6c9760c79cf1f70dcdd100c8#05efc3f87e9f5d1a6c9760c79cf1f70dcdd100c8"
dependencies = [
"arrayvec",
"async-trait",
@@ -2555,7 +2556,7 @@ dependencies = [
[[package]]
name = "s3s-policy"
version = "0.11.0-dev"
source = "git+https://github.com/Nugine/s3s.git?rev=3291c0ca0284971569499cbe75bd69ef7bde8321#3291c0ca0284971569499cbe75bd69ef7bde8321"
source = "git+https://github.com/Nugine/s3s.git?rev=05efc3f87e9f5d1a6c9760c79cf1f70dcdd100c8#05efc3f87e9f5d1a6c9760c79cf1f70dcdd100c8"
dependencies = [
"indexmap 2.6.0",
"serde",

View File

@@ -65,10 +65,10 @@ protos = { path = "./common/protos" }
rand = "0.8.5"
rmp = "0.8.14"
rmp-serde = "1.3.0"
s3s = { git = "https://github.com/Nugine/s3s.git", rev = "3291c0ca0284971569499cbe75bd69ef7bde8321", default-features = true, features = [
s3s = { git = "https://github.com/Nugine/s3s.git", rev = "05efc3f87e9f5d1a6c9760c79cf1f70dcdd100c8", default-features = true, features = [
"tower",
] }
s3s-policy = { git = "https://github.com/Nugine/s3s.git", rev = "3291c0ca0284971569499cbe75bd69ef7bde8321" }
s3s-policy = { git = "https://github.com/Nugine/s3s.git", rev = "05efc3f87e9f5d1a6c9760c79cf1f70dcdd100c8" }
serde = { version = "1.0.217", features = ["derive"] }
serde_json = "1.0.134"
tempfile = "3.13.0"

View File

@@ -7,9 +7,13 @@ pub fn decode_tags(tags: &str) -> Vec<Tag> {
let mut list = Vec::new();
for (k, v) in values {
if k.is_empty() || v.is_empty() {
continue;
}
list.push(Tag {
key: k.to_string(),
value: v.to_string(),
key: Some(k.to_string()),
value: Some(v.to_string()),
});
}
@@ -20,7 +24,9 @@ pub fn encode_tags(tags: Vec<Tag>) -> String {
let mut encoded = form_urlencoded::Serializer::new(String::new());
for tag in tags.iter() {
encoded.append_pair(tag.key.as_str(), tag.value.as_str());
if let (Some(k), Some(v)) = (tag.key.as_ref(), tag.value.as_ref()) {
encoded.append_pair(k.as_str(), v.as_str());
}
}
encoded.finish()

View File

@@ -54,7 +54,7 @@ impl Clone for ListPathRawOptions {
}
pub async fn list_path_raw(mut rx: B_Receiver<bool>, opts: ListPathRawOptions) -> Result<()> {
info!("list_path_raw");
// println!("list_path_raw {},{}", &opts.bucket, &opts.path);
if opts.disks.is_empty() {
info!("list_path_raw 0 drives provided");
return Err(Error::from_string("list_path_raw: 0 drives provided"));
@@ -283,11 +283,9 @@ pub async fn list_path_raw(mut rx: B_Receiver<bool>, opts: ListPathRawOptions) -
}
}
info!("read entry should heal: {}", current.name);
if let Some(partial_fn) = opts.partial.as_ref() {
partial_fn(MetaCacheEntries(top_entries), &errs).await;
}
// break;
}
Ok(())
});

View File

@@ -1,3 +1,5 @@
use crate::bucket::metadata_sys::get_versioning_config;
use crate::bucket::versioning::VersioningApi;
use crate::cache_value::metacache_set::{list_path_raw, ListPathRawOptions};
use crate::disk::error::{is_all_not_found, is_all_volume_not_found, is_err_eof, DiskError};
use crate::disk::{
@@ -9,10 +11,10 @@ use crate::file_meta::merge_file_meta_versions;
use crate::peer::is_reserved_or_invalid_bucket;
use crate::set_disk::SetDisks;
use crate::store::check_list_objs_args;
use crate::store_api::{ListObjectVersionsInfo, ListObjectsInfo, ObjectInfo, ObjectOptions};
use crate::store_api::{FileInfo, ListObjectVersionsInfo, ListObjectsInfo, ObjectInfo, ObjectOptions};
use crate::store_err::{is_err_bucket_not_found, to_object_err, StorageError};
use crate::utils::path::{self, base_dir_from_prefix, SLASH_SEPARATOR};
use crate::StorageAPI;
use crate::{bucket, StorageAPI};
use crate::{store::ECStore, store_api::ListObjectsV2Info};
use futures::future::join_all;
use rand::seq::SliceRandom;
@@ -22,7 +24,7 @@ use std::io::ErrorKind;
use std::sync::Arc;
use tokio::sync::broadcast::{self, Receiver as B_Receiver};
use tokio::sync::mpsc::{self, Receiver, Sender};
use tracing::error;
use tracing::{error, warn};
use uuid::Uuid;
const MAX_OBJECT_LIST: i32 = 1000;
@@ -245,7 +247,7 @@ impl ECStore {
..Default::default()
};
// warn!("list_objects_generic opts {:?}", &opts);
warn!("list_objects_generic opts {:?}", &opts);
// use get
if !opts.prefix.is_empty() && opts.limit == 1 && opts.marker.is_none() {
@@ -669,6 +671,287 @@ impl ECStore {
Ok(Vec::new())
}
/// Walks every object under `prefix` in `bucket` across all pools and
/// erasure sets, streaming matches (or a terminal error) to `result`.
///
/// One `list_path_raw` listing is started per erasure set; their per-set
/// output channels are fanned in by `merge_entry_channels`, and a separate
/// conversion task turns merged cache entries into `ObjectInfoOrErr` items on
/// `result`, so the caller can consume results while listing is in progress.
///
/// * `rx` — broadcast receiver used to signal cancellation to the listings.
/// * `result` — receives one item per matching object/version, or an item
///   with `err` set when conversion fails.
/// * `opts` — see [`WalkOptions`].
pub async fn walk(
    self: Arc<Self>,
    rx: B_Receiver<bool>,
    bucket: &str,
    prefix: &str,
    result: Sender<ObjectInfoOrErr>,
    opts: WalkOptions,
) -> Result<()> {
    check_list_objs_args(bucket, prefix, &None)?;

    // One listing future and one entry channel per erasure set.
    let mut futures = Vec::new();
    let mut inputs = Vec::new();
    for eset in self.pools.iter() {
        for set in eset.disk_set.iter() {
            let (mut disks, infos, _) = set.get_online_disks_with_healing_and_info(true).await;
            let rx = rx.resubscribe();
            let opts = opts.clone();
            // Capacity 1: backpressure from the merge stage throttles each set's listing.
            let (sender, list_out_rx) = mpsc::channel::<MetaCacheEntry>(1);
            inputs.push(list_out_rx);
            futures.push(async move {
                // Decide how many disks to ask; -1 means "unresolved", in which
                // case we try a quorum subset and finally fall back to "strict".
                let mut ask_disks = get_list_quorum(&opts.ask_disks, set.set_drive_count as i32);
                if ask_disks == -1 {
                    let new_disks = get_quorum_disks(&disks, &infos, (disks.len() + 1) / 2);
                    if !new_disks.is_empty() {
                        disks = new_disks;
                    } else {
                        ask_disks = get_list_quorum("strict", set.set_drive_count as i32);
                    }
                }
                // 4-drive sets, or asking for more disks than exist: use them all.
                if set.set_drive_count == 4 || ask_disks > disks.len() as i32 {
                    ask_disks = disks.len() as i32;
                }
                // Randomly keep `ask_disks` primaries; the remainder become fallbacks.
                let fallback_disks = {
                    if ask_disks > 0 && disks.len() > ask_disks as usize {
                        let mut rand = thread_rng();
                        disks.shuffle(&mut rand);
                        disks.split_off(ask_disks as usize)
                    } else {
                        Vec::new()
                    }
                };
                let listing_quorum = ((ask_disks + 1) / 2) as usize;
                let resolver = MetadataResolutionParams {
                    dir_quorum: listing_quorum,
                    obj_quorum: listing_quorum,
                    bucket: bucket.to_owned(),
                    ..Default::default()
                };
                // Listing root is the deepest directory inside `prefix`; any
                // remaining prefix text is applied as an entry-name filter.
                let path = base_dir_from_prefix(prefix);
                let mut filter_prefix = {
                    prefix
                        .trim_start_matches(&path)
                        .trim_start_matches(SLASH_SEPARATOR)
                        .trim_end_matches(SLASH_SEPARATOR)
                        .to_owned()
                };
                if filter_prefix == path {
                    filter_prefix = "".to_owned();
                }
                // let (sender, rx1) = mpsc::channel(100);
                let tx1 = sender.clone();
                let tx2 = sender.clone();
                list_path_raw(
                    rx.resubscribe(),
                    ListPathRawOptions {
                        disks: disks.iter().cloned().map(Some).collect(),
                        fallback_disks: fallback_disks.iter().cloned().map(Some).collect(),
                        bucket: bucket.to_owned(),
                        path,
                        recursice: true, // sic — field name is misspelled in ListPathRawOptions
                        filter_prefix: Some(filter_prefix),
                        forward_to: opts.marker.clone(),
                        min_disks: listing_quorum,
                        per_disk_limit: opts.limit as i32,
                        // All asked disks agreed on this entry: forward it directly
                        // (directories are skipped; walk emits objects only).
                        agreed: Some(Box::new(move |entry: MetaCacheEntry| {
                            Box::pin({
                                let value = tx1.clone();
                                async move {
                                    if entry.is_dir() {
                                        return;
                                    }
                                    if let Err(err) = value.send(entry).await {
                                        error!("list_path send fail {:?}", err);
                                    }
                                }
                            })
                        })),
                        // Disks disagreed: resolve the winning entry by quorum first.
                        partial: Some(Box::new(move |entries: MetaCacheEntries, _: &[Option<Error>]| {
                            Box::pin({
                                let value = tx2.clone();
                                let resolver = resolver.clone();
                                async move {
                                    if let Ok(Some(entry)) = entries.resolve(resolver) {
                                        if let Err(err) = value.send(entry).await {
                                            error!("list_path send fail {:?}", err);
                                        }
                                    }
                                }
                            })
                        })),
                        finished: None,
                        ..Default::default()
                    },
                )
                .await
            });
        }
    }

    let (merge_tx, mut merge_rx) = mpsc::channel::<MetaCacheEntry>(100);
    let bucket = bucket.to_owned();
    // Bucket versioning config decides whether emitted ObjectInfos carry
    // version information; a lookup failure is treated as "unversioned".
    let vcf = match get_versioning_config(&bucket).await {
        Ok((res, _)) => Some(res),
        Err(_) => None,
    };

    // Conversion stage: merged cache entries -> ObjectInfoOrErr on `result`.
    tokio::spawn(async move {
        let mut sent_err = false;
        while let Some(entry) = merge_rx.recv().await {
            if opts.latest_only {
                let fi = match entry.to_fileinfo(&bucket) {
                    Ok(res) => res,
                    Err(err) => {
                        // NOTE(review): the task returns right after reporting the
                        // first conversion error, so `sent_err` can never be observed
                        // as true and the `continue` below is unreachable — confirm
                        // whether a continue-on-error behavior was intended.
                        if !sent_err {
                            let item = ObjectInfoOrErr {
                                item: None,
                                err: Some(err),
                            };
                            if let Err(err) = result.send(item).await {
                                error!("walk result send err {:?}", err);
                            }
                            sent_err = true;
                            return;
                        }
                        continue;
                    }
                };
                if let Some(fiter) = opts.filter {
                    // Only emit entries the caller's filter accepts.
                    if fiter(&fi) {
                        let item = ObjectInfoOrErr {
                            item: Some(fi.to_object_info(&bucket, &fi.name, {
                                if let Some(v) = &vcf {
                                    v.versioned(&fi.name)
                                } else {
                                    false
                                }
                            })),
                            err: None,
                        };
                        if let Err(err) = result.send(item).await {
                            error!("walk result send err {:?}", err);
                        }
                    }
                } else {
                    let item = ObjectInfoOrErr {
                        item: Some(fi.to_object_info(&bucket, &fi.name, {
                            if let Some(v) = &vcf {
                                v.versioned(&fi.name)
                            } else {
                                false
                            }
                        })),
                        err: None,
                    };
                    if let Err(err) = result.send(item).await {
                        error!("walk result send err {:?}", err);
                    }
                }
                continue;
            }
            // Not latest_only: expand every version of the object.
            let fvs = match entry.file_info_versions(&bucket) {
                Ok(res) => res,
                Err(err) => {
                    let item = ObjectInfoOrErr {
                        item: None,
                        err: Some(err),
                    };
                    if let Err(err) = result.send(item).await {
                        error!("walk result send err {:?}", err);
                    }
                    return;
                }
            };
            if opts.versions_sort == WalkVersionsSortOrder::Ascending {
                // TODO: SORT — `versions_sort` is currently not applied; versions
                // are emitted in the order `file_info_versions` returns them.
            }
            for fi in fvs.versions.iter() {
                if let Some(fiter) = opts.filter {
                    if fiter(fi) {
                        let item = ObjectInfoOrErr {
                            item: Some(fi.to_object_info(&bucket, &fi.name, {
                                if let Some(v) = &vcf {
                                    v.versioned(&fi.name)
                                } else {
                                    false
                                }
                            })),
                            err: None,
                        };
                        if let Err(err) = result.send(item).await {
                            error!("walk result send err {:?}", err);
                        }
                    }
                } else {
                    let item = ObjectInfoOrErr {
                        item: Some(fi.to_object_info(&bucket, &fi.name, {
                            if let Some(v) = &vcf {
                                v.versioned(&fi.name)
                            } else {
                                false
                            }
                        })),
                        err: None,
                    };
                    if let Err(err) = result.send(item).await {
                        error!("walk result send err {:?}", err);
                    }
                }
            }
        }
    });

    // Fan-in: merge the per-set entry channels (in order) into merge_tx.
    tokio::spawn(async move { merge_entry_channels(rx, inputs, merge_tx, 1).await });

    // Drive all per-set listings to completion. NOTE(review): the individual
    // listing Results from join_all are discarded here — confirm errors are
    // fully surfaced through the agreed/partial callbacks.
    join_all(futures).await;

    Ok(())
}
}
/// Predicate applied to each `FileInfo` during `walk`; returning `true` keeps the entry.
type WalkFilter = fn(&FileInfo) -> bool;
/// Options controlling `ECStore::walk`.
#[derive(Clone, Default)]
pub struct WalkOptions {
    /// Optional predicate; only entries for which it returns `true` are emitted.
    pub filter: Option<WalkFilter>,
    /// Skip entries until this object key is reached.
    pub marker: Option<String>,
    /// Emit only the latest version of each matching object.
    pub latest_only: bool,
    /// Dictates how many disks are consulted per set (resolved via `get_list_quorum`).
    pub ask_disks: String,
    /// Sort order for versions of the same object; default: ascending by mod time.
    pub versions_sort: WalkVersionsSortOrder,
    /// Maximum number of items (applied per disk as `per_disk_limit`); 0 means no limit.
    pub limit: usize,
}
/// Sort order for multiple versions of the same object during a walk.
#[derive(Clone, Default, PartialEq, Eq)]
pub enum WalkVersionsSortOrder {
    /// Ascending by modification time (the default).
    #[default]
    Ascending,
    Descending,
}
/// A single `walk` result: either an object's info or the error that ended
/// the walk. The producer sets exactly one of the two fields.
#[derive(Debug)]
pub struct ObjectInfoOrErr {
    pub item: Option<ObjectInfo>,
    pub err: Option<Error>,
}
async fn gather_results(
@@ -1092,260 +1375,294 @@ fn calc_common_counter(infos: &[DiskInfo], read_quorum: usize) -> u64 {
// list_path_raw
// #[cfg(test)]
// mod test {
// use std::sync::Arc;
#[cfg(test)]
mod test {
use std::sync::Arc;
// use crate::cache_value::metacache_set::list_path_raw;
// use crate::cache_value::metacache_set::ListPathRawOptions;
// use crate::disk::endpoint::Endpoint;
// use crate::disk::error::is_err_eof;
// use crate::disk::format::FormatV3;
// use crate::disk::new_disk;
// use crate::disk::DiskAPI;
// use crate::disk::DiskOption;
// use crate::disk::MetaCacheEntries;
// use crate::disk::MetaCacheEntry;
// use crate::disk::WalkDirOptions;
// use crate::endpoints::EndpointServerPools;
// use crate::error::Error;
// use crate::metacache::writer::MetacacheReader;
// use crate::set_disk::SetDisks;
// use crate::store::ECStore;
// use crate::store_list_objects::ListPathOptions;
// use futures::future::join_all;
// use lock::namespace_lock::NsLockMap;
// use tokio::sync::broadcast;
// use tokio::sync::mpsc;
// use tokio::sync::RwLock;
// use uuid::Uuid;
use crate::cache_value::metacache_set::list_path_raw;
use crate::cache_value::metacache_set::ListPathRawOptions;
use crate::disk::endpoint::Endpoint;
use crate::disk::error::is_err_eof;
use crate::disk::format::FormatV3;
use crate::disk::new_disk;
use crate::disk::DiskAPI;
use crate::disk::DiskOption;
use crate::disk::MetaCacheEntries;
use crate::disk::MetaCacheEntry;
use crate::disk::WalkDirOptions;
use crate::endpoints::EndpointServerPools;
use crate::error::Error;
use crate::metacache::writer::MetacacheReader;
use crate::set_disk::SetDisks;
use crate::store::ECStore;
use crate::store_list_objects::ListPathOptions;
use crate::store_list_objects::WalkOptions;
use crate::store_list_objects::WalkVersionsSortOrder;
use futures::future::join_all;
use lock::namespace_lock::NsLockMap;
use tokio::sync::broadcast;
use tokio::sync::mpsc;
use tokio::sync::RwLock;
use uuid::Uuid;
// #[tokio::test]
// async fn test_walk_dir() {
// let mut ep = Endpoint::try_from("/Users/weisd/project/weisd/s3-rustfs/target/volume/test").unwrap();
// ep.pool_idx = 0;
// ep.set_idx = 0;
// ep.disk_idx = 0;
// ep.is_local = true;
// #[tokio::test]
// async fn test_walk_dir() {
// let mut ep = Endpoint::try_from("/Users/weisd/project/weisd/s3-rustfs/target/volume/test").unwrap();
// ep.pool_idx = 0;
// ep.set_idx = 0;
// ep.disk_idx = 0;
// ep.is_local = true;
// let disk = new_disk(&ep, &DiskOption::default()).await.expect("init disk fail");
// let disk = new_disk(&ep, &DiskOption::default()).await.expect("init disk fail");
// // let disk = match LocalDisk::new(&ep, false).await {
// // Ok(res) => res,
// // Err(err) => {
// // println!("LocalDisk::new err {:?}", err);
// // return;
// // }
// // };
// // let disk = match LocalDisk::new(&ep, false).await {
// // Ok(res) => res,
// // Err(err) => {
// // println!("LocalDisk::new err {:?}", err);
// // return;
// // }
// // };
// let (rd, mut wr) = tokio::io::duplex(64);
// let (rd, mut wr) = tokio::io::duplex(64);
// let job = tokio::spawn(async move {
// let opts = WalkDirOptions {
// bucket: "dada".to_owned(),
// base_dir: "".to_owned(),
// recursive: true,
// ..Default::default()
// };
// let job = tokio::spawn(async move {
// let opts = WalkDirOptions {
// bucket: "dada".to_owned(),
// base_dir: "".to_owned(),
// recursive: true,
// ..Default::default()
// };
// println!("walk opts {:?}", opts);
// if let Err(err) = disk.walk_dir(opts, &mut wr).await {
// println!("walk_dir err {:?}", err);
// }
// });
// println!("walk opts {:?}", opts);
// if let Err(err) = disk.walk_dir(opts, &mut wr).await {
// println!("walk_dir err {:?}", err);
// }
// });
// let job2 = tokio::spawn(async move {
// let mut mrd = MetacacheReader::new(rd);
// let job2 = tokio::spawn(async move {
// let mut mrd = MetacacheReader::new(rd);
// loop {
// match mrd.peek().await {
// Ok(res) => {
// if let Some(info) = res {
// println!("info {:?}", info.name)
// } else {
// break;
// }
// }
// Err(err) => {
// if is_err_eof(&err) {
// break;
// }
// loop {
// match mrd.peek().await {
// Ok(res) => {
// if let Some(info) = res {
// println!("info {:?}", info.name)
// } else {
// break;
// }
// }
// Err(err) => {
// if is_err_eof(&err) {
// break;
// }
// println!("get err {:?}", err);
// break;
// }
// }
// }
// });
// join_all(vec![job, job2]).await;
// }
// println!("get err {:?}", err);
// break;
// }
// }
// }
// });
// join_all(vec![job, job2]).await;
// }
// #[tokio::test]
// async fn test_list_path_raw() {
// let mut ep = Endpoint::try_from("/Users/weisd/project/weisd/s3-rustfs/target/volume/test").unwrap();
// ep.pool_idx = 0;
// ep.set_idx = 0;
// ep.disk_idx = 0;
// ep.is_local = true;
// #[tokio::test]
// async fn test_list_path_raw() {
// let mut ep = Endpoint::try_from("/Users/weisd/project/weisd/s3-rustfs/target/volume/test").unwrap();
// ep.pool_idx = 0;
// ep.set_idx = 0;
// ep.disk_idx = 0;
// ep.is_local = true;
// let disk = new_disk(&ep, &DiskOption::default()).await.expect("init disk fail");
// let disk = new_disk(&ep, &DiskOption::default()).await.expect("init disk fail");
// // let disk = match LocalDisk::new(&ep, false).await {
// // Ok(res) => res,
// // Err(err) => {
// // println!("LocalDisk::new err {:?}", err);
// // return;
// // }
// // };
// // let disk = match LocalDisk::new(&ep, false).await {
// // Ok(res) => res,
// // Err(err) => {
// // println!("LocalDisk::new err {:?}", err);
// // return;
// // }
// // };
// let (_, rx) = broadcast::channel(1);
// let bucket = "dada".to_owned();
// let forward_to = None;
// let disks = vec![Some(disk)];
// let fallback_disks = Vec::new();
// let (_, rx) = broadcast::channel(1);
// let bucket = "dada".to_owned();
// let forward_to = None;
// let disks = vec![Some(disk)];
// let fallback_disks = Vec::new();
// list_path_raw(
// rx,
// ListPathRawOptions {
// disks,
// fallback_disks,
// bucket,
// path: "".to_owned(),
// recursice: true,
// forward_to,
// min_disks: 1,
// report_not_found: false,
// agreed: Some(Box::new(move |entry: MetaCacheEntry| {
// Box::pin(async move { println!("get entry: {}", entry.name) })
// })),
// partial: Some(Box::new(move |entries: MetaCacheEntries, _: &[Option<Error>]| {
// Box::pin(async move { println!("get entries: {:?}", entries) })
// })),
// finished: None,
// ..Default::default()
// },
// )
// .await
// .unwrap();
// }
// list_path_raw(
// rx,
// ListPathRawOptions {
// disks,
// fallback_disks,
// bucket,
// path: "".to_owned(),
// recursice: true,
// forward_to,
// min_disks: 1,
// report_not_found: false,
// agreed: Some(Box::new(move |entry: MetaCacheEntry| {
// Box::pin(async move { println!("get entry: {}", entry.name) })
// })),
// partial: Some(Box::new(move |entries: MetaCacheEntries, _: &[Option<Error>]| {
// Box::pin(async move { println!("get entries: {:?}", entries) })
// })),
// finished: None,
// ..Default::default()
// },
// )
// .await
// .unwrap();
// }
// #[tokio::test]
// async fn test_set_list_path() {
// let mut ep = Endpoint::try_from("/Users/weisd/project/weisd/s3-rustfs/target/volume/test").unwrap();
// ep.pool_idx = 0;
// ep.set_idx = 0;
// ep.disk_idx = 0;
// ep.is_local = true;
// #[tokio::test]
// async fn test_set_list_path() {
// let mut ep = Endpoint::try_from("/Users/weisd/project/weisd/s3-rustfs/target/volume/test").unwrap();
// ep.pool_idx = 0;
// ep.set_idx = 0;
// ep.disk_idx = 0;
// ep.is_local = true;
// let disk = new_disk(&ep, &DiskOption::default()).await.expect("init disk fail");
// let _ = disk.set_disk_id(Some(Uuid::new_v4())).await;
// let disk = new_disk(&ep, &DiskOption::default()).await.expect("init disk fail");
// let _ = disk.set_disk_id(Some(Uuid::new_v4())).await;
// let set = SetDisks {
// lockers: Vec::new(),
// locker_owner: String::new(),
// ns_mutex: Arc::new(RwLock::new(NsLockMap::new(false))),
// disks: RwLock::new(vec![Some(disk)]),
// set_endpoints: Vec::new(),
// set_drive_count: 1,
// default_parity_count: 0,
// set_index: 0,
// pool_index: 0,
// format: FormatV3::new(1, 1),
// };
// let set = SetDisks {
// lockers: Vec::new(),
// locker_owner: String::new(),
// ns_mutex: Arc::new(RwLock::new(NsLockMap::new(false))),
// disks: RwLock::new(vec![Some(disk)]),
// set_endpoints: Vec::new(),
// set_drive_count: 1,
// default_parity_count: 0,
// set_index: 0,
// pool_index: 0,
// format: FormatV3::new(1, 1),
// };
// let (_tx, rx) = broadcast::channel(1);
// let (_tx, rx) = broadcast::channel(1);
// let bucket = "dada".to_owned();
// let bucket = "dada".to_owned();
// let opts = ListPathOptions {
// bucket,
// recursive: true,
// ..Default::default()
// };
// let opts = ListPathOptions {
// bucket,
// recursive: true,
// ..Default::default()
// };
// let (sender, mut recv) = mpsc::channel(10);
// let (sender, mut recv) = mpsc::channel(10);
// set.list_path(rx, opts, sender).await.unwrap();
// set.list_path(rx, opts, sender).await.unwrap();
// while let Some(entry) = recv.recv().await {
// println!("get entry {:?}", entry.name)
// }
// }
// while let Some(entry) = recv.recv().await {
// println!("get entry {:?}", entry.name)
// }
// }
// #[tokio::test]
// async fn test_list_merged() {
// let server_address = "localhost:9000";
// #[tokio::test]
// async fn test_list_merged() {
// let server_address = "localhost:9000";
// let (endpoint_pools, _setup_type) = EndpointServerPools::from_volumes(
// server_address,
// vec!["/Users/weisd/project/weisd/s3-rustfs/target/volume/test".to_string()],
// )
// .unwrap();
// let (endpoint_pools, _setup_type) = EndpointServerPools::from_volumes(
// server_address,
// vec!["/Users/weisd/project/weisd/s3-rustfs/target/volume/test".to_string()],
// )
// .unwrap();
// let store = ECStore::new(server_address.to_string(), endpoint_pools.clone())
// .await
// .unwrap();
// let store = ECStore::new(server_address.to_string(), endpoint_pools.clone())
// .await
// .unwrap();
// let (_tx, rx) = broadcast::channel(1);
// let (_tx, rx) = broadcast::channel(1);
// let bucket = "dada".to_owned();
// let opts = ListPathOptions {
// bucket,
// recursive: true,
// ..Default::default()
// };
// let bucket = "dada".to_owned();
// let opts = ListPathOptions {
// bucket,
// recursive: true,
// ..Default::default()
// };
// let (sender, mut recv) = mpsc::channel(10);
// let (sender, mut recv) = mpsc::channel(10);
// store.list_merged(rx, opts, sender).await.unwrap();
// store.list_merged(rx, opts, sender).await.unwrap();
// while let Some(entry) = recv.recv().await {
// println!("get entry {:?}", entry.name)
// }
// }
// while let Some(entry) = recv.recv().await {
// println!("get entry {:?}", entry.name)
// }
// }
// #[tokio::test]
// async fn test_list_path() {
// let server_address = "localhost:9000";
// #[tokio::test]
// async fn test_list_path() {
// let server_address = "localhost:9000";
// let (endpoint_pools, _setup_type) = EndpointServerPools::from_volumes(
// server_address,
// vec!["/Users/weisd/project/weisd/s3-rustfs/target/volume/test".to_string()],
// )
// .unwrap();
// let (endpoint_pools, _setup_type) = EndpointServerPools::from_volumes(
// server_address,
// vec!["/Users/weisd/project/weisd/s3-rustfs/target/volume/test".to_string()],
// )
// .unwrap();
// let store = ECStore::new(server_address.to_string(), endpoint_pools.clone())
// .await
// .unwrap();
// let store = ECStore::new(server_address.to_string(), endpoint_pools.clone())
// .await
// .unwrap();
// let bucket = "dada".to_owned();
// let opts = ListPathOptions {
// bucket,
// recursive: true,
// limit: 100,
// let bucket = "dada".to_owned();
// let opts = ListPathOptions {
// bucket,
// recursive: true,
// limit: 100,
// ..Default::default()
// };
// ..Default::default()
// };
// let ret = store.list_path(&opts).await.unwrap();
// println!("ret {:?}", ret);
// }
// let ret = store.list_path(&opts).await.unwrap();
// println!("ret {:?}", ret);
// }
// // #[tokio::test]
// // async fn test_list_objects_v2() {
// // let server_address = "localhost:9000";
// #[tokio::test]
// async fn test_list_objects_v2() {
// let server_address = "localhost:9000";
// // let (endpoint_pools, _setup_type) = EndpointServerPools::from_volumes(
// // server_address,
// // vec!["/Users/weisd/project/weisd/s3-rustfs/target/volume/test".to_string()],
// // )
// // .unwrap();
// let (endpoint_pools, _setup_type) = EndpointServerPools::from_volumes(
// server_address,
// vec!["/Users/weisd/project/weisd/s3-rustfs/target/volume/test".to_string()],
// )
// .unwrap();
// // let store = ECStore::new(server_address.to_string(), endpoint_pools.clone())
// // .await
// // .unwrap();
// let store = ECStore::new(server_address.to_string(), endpoint_pools.clone())
// .await
// .unwrap();
// // let ret = store.list_objects_v2("data", "", "", "", 100, false, "").await.unwrap();
// // println!("ret {:?}", ret);
// // }
// }
// let ret = store.list_objects_v2("data", "", "", "", 100, false, "").await.unwrap();
// println!("ret {:?}", ret);
// }
#[tokio::test]
async fn test_walk() {
    // Manual smoke test for `ECStore::walk`: it depends on a developer-local
    // volume path and a pre-populated `.rustfs.sys` bucket, so it can only
    // pass on the author's machine — TODO(review): gate with #[ignore].
    let server_address = "localhost:9000";
    let (endpoint_pools, _setup_type) = EndpointServerPools::from_volumes(
        server_address,
        vec!["/Users/weisd/project/weisd/s3-rustfs/target/volume/test".to_string()],
    )
    .unwrap();

    let store = ECStore::new(server_address.to_string(), endpoint_pools.clone())
        .await
        .unwrap();

    ECStore::init(store.clone()).await.unwrap();

    let (_tx, rx) = broadcast::channel(1);

    let bucket = ".rustfs.sys";
    let prefix = "config/iam/sts/";

    // Receive walk results on a small bounded channel and drain them to stdout.
    let (sender, mut recv) = mpsc::channel(10);

    let opts = WalkOptions::default();

    store.walk(rx, bucket, prefix, sender, opts).await.unwrap();

    while let Some(entry) = recv.recv().await {
        println!("get entry {:?}", entry)
    }
}
}

View File

@@ -28,6 +28,7 @@ rand.workspace = true
base64-simd = "0.8.0"
jsonwebtoken = "9.3.0"
tracing.workspace = true
madmin.workspace = true
[dev-dependencies]
test-case.workspace = true

View File

@@ -1,15 +1,25 @@
mod credentials;
pub use credentials::Credentials;
pub use credentials::CredentialsBuilder;
pub use credentials::*;
use serde::{Deserialize, Serialize};
use time::OffsetDateTime;
#[derive(Serialize, Deserialize, Clone)]
#[derive(Serialize, Deserialize, Clone, Debug, Default)]
pub struct UserIdentity {
pub version: i64,
pub credentials: Credentials,
pub update_at: OffsetDateTime,
pub update_at: Option<OffsetDateTime>,
}
impl UserIdentity {
    /// Wraps `credentials` in a version-1 identity stamped with the current UTC time.
    pub fn new(credentials: Credentials) -> Self {
        Self {
            version: 1,
            credentials,
            update_at: Some(OffsetDateTime::now_utc()),
        }
    }
}
impl From<Credentials> for UserIdentity {
@@ -17,7 +27,7 @@ impl From<Credentials> for UserIdentity {
UserIdentity {
version: 1,
credentials: value,
update_at: OffsetDateTime::now_utc(),
update_at: Some(OffsetDateTime::now_utc()),
}
}
}

View File

@@ -1,6 +1,8 @@
use crate::policy::{Policy, Validator};
use crate::service_type::ServiceType;
use crate::utils::extract_claims;
use crate::{utils, Error};
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use serde_json::{json, Value};
use std::cell::LazyCell;
@@ -13,8 +15,8 @@ const ACCESS_KEY_MAX_LEN: usize = 20;
const SECRET_KEY_MIN_LEN: usize = 8;
const SECRET_KEY_MAX_LEN: usize = 40;
const ACCOUNT_ON: &str = "on";
const ACCOUNT_OFF: &str = "off";
pub const ACCOUNT_ON: &str = "on";
pub const ACCOUNT_OFF: &str = "off";
#[cfg_attr(test, derive(PartialEq, Eq, Debug))]
struct CredentialHeader {
@@ -89,7 +91,7 @@ impl TryFrom<&str> for CredentialHeader {
}
}
#[derive(Serialize, Deserialize, Clone, Default)]
#[derive(Serialize, Deserialize, Clone, Default, Debug)]
pub struct Credentials {
pub access_key: String,
pub secret_key: String,
@@ -98,7 +100,7 @@ pub struct Credentials {
pub status: String,
pub parent_user: String,
pub groups: Option<Vec<String>>,
pub claims: Option<HashMap<String, Vec<String>>>,
pub claims: Option<HashMap<String, String>>,
pub name: Option<String>,
pub description: Option<String>,
}
@@ -109,44 +111,6 @@ impl Credentials {
Self::check_key_value(header)
}
pub fn get_new_credentials_with_metadata<T: Serialize>(
claims: &T,
token_secret: &str,
exp: Option<usize>,
) -> crate::Result<Self> {
let ak = utils::gen_access_key(20).unwrap_or_default();
let sk = utils::gen_secret_key(32).unwrap_or_default();
Self::create_new_credentials_with_metadata(&ak, &sk, claims, token_secret, exp)
}
pub fn create_new_credentials_with_metadata<T: Serialize>(
ak: &str,
sk: &str,
claims: &T,
token_secret: &str,
exp: Option<usize>,
) -> crate::Result<Self> {
if ak.len() < ACCESS_KEY_MIN_LEN || ak.len() > ACCESS_KEY_MAX_LEN {
return Err(Error::InvalidAccessKeyLength);
}
if sk.len() < SECRET_KEY_MIN_LEN || sk.len() > SECRET_KEY_MAX_LEN {
return Err(Error::InvalidAccessKeyLength);
}
let token = utils::generate_jwt(claims, token_secret).map_err(Error::JWTError)?;
Ok(Self {
access_key: ak.to_owned(),
secret_key: sk.to_owned(),
session_token: token,
status: ACCOUNT_ON.to_owned(),
expiration: exp.map(|v| OffsetDateTime::now_utc().saturating_add(Duration::seconds(v as i64))),
..Default::default()
})
}
pub fn check_key_value(_header: CredentialHeader) -> crate::Result<Self> {
todo!()
}
@@ -186,6 +150,50 @@ impl Credentials {
}
}
/// Generates a random 20-character access key and 32-character secret key,
/// then builds credentials embedding `claims` as the session token via
/// `create_new_credentials_with_metadata`.
pub fn get_new_credentials_with_metadata<T: Serialize>(
    claims: &T,
    token_secret: &str,
    exp: Option<usize>,
) -> crate::Result<Credentials> {
    let access_key = utils::gen_access_key(20).unwrap_or_default();
    let secret_key = utils::gen_secret_key(32).unwrap_or_default();
    create_new_credentials_with_metadata(&access_key, &secret_key, claims, token_secret, exp)
}
/// Builds a new enabled (`ACCOUNT_ON`) credential pair whose session token is
/// a JWT carrying `claims`, signed with `token_secret`.
///
/// # Errors
/// Returns `Error::InvalidAccessKeyLength` when either key length is out of
/// range, and `Error::JWTError` when token generation fails.
/// NOTE(review): the secret-key length violation also returns
/// `InvalidAccessKeyLength` — confirm whether a dedicated secret-key error
/// variant was intended.
pub fn create_new_credentials_with_metadata<T: Serialize>(
    ak: &str,
    sk: &str,
    claims: &T,
    token_secret: &str,
    exp: Option<usize>,
) -> crate::Result<Credentials> {
    if ak.len() < ACCESS_KEY_MIN_LEN || ak.len() > ACCESS_KEY_MAX_LEN {
        return Err(Error::InvalidAccessKeyLength);
    }

    if sk.len() < SECRET_KEY_MIN_LEN || sk.len() > SECRET_KEY_MAX_LEN {
        return Err(Error::InvalidAccessKeyLength);
    }

    // The claims ride inside a signed JWT used as the session token.
    let token = utils::generate_jwt(claims, token_secret).map_err(Error::JWTError)?;

    Ok(Credentials {
        access_key: ak.to_owned(),
        secret_key: sk.to_owned(),
        session_token: token,
        status: ACCOUNT_ON.to_owned(),
        // Expiration, when requested, is `exp` seconds from now.
        expiration: exp.map(|v| OffsetDateTime::now_utc().saturating_add(Duration::seconds(v as i64))),
        ..Default::default()
    })
}
pub fn get_claims_from_token_with_secret<T: DeserializeOwned>(token: &str, secret: &str) -> crate::Result<T> {
let ms = extract_claims::<T>(token, secret).map_err(Error::JWTError)?;
// TODO SessionPolicyName
Ok(ms.claims)
}
#[derive(Default)]
pub struct CredentialsBuilder {
session_policy: Option<Policy>,

View File

@@ -37,6 +37,17 @@ pub enum Error {
#[error("jwt err {0}")]
JWTError(jsonwebtoken::errors::Error),
#[error("no access key")]
NoAccessKey,
#[error("invalid token")]
InvalidToken,
#[error("invalid access_key")]
InvalidAccessKey,
#[error("action not allowed")]
IAMActionNotAllowed,
}
pub type Result<T> = std::result::Result<T, Error>;

View File

@@ -3,7 +3,10 @@ use ecstore::store::ECStore;
use log::debug;
use manager::IamCache;
use policy::{Args, Policy};
use std::sync::{Arc, OnceLock};
use std::{
collections::HashMap,
sync::{Arc, OnceLock},
};
use store::object::ObjectStore;
use time::OffsetDateTime;
@@ -85,6 +88,24 @@ pub async fn add_service_account(cred: Credentials) -> crate::Result<OffsetDateT
get()?.add_service_account(cred).await
}
pub async fn check_key(ak: &str) -> crate::Result<Option<UserIdentity>> {
/// Resolves `ak` to a stored identity, returning the identity (if any) and a
/// flag indicating whether its credentials are usable.
///
/// The global action (root) credential is matched first and always reported
/// as valid; every other key is delegated to the IAM cache.
pub async fn check_key(ak: &str) -> crate::Result<(Option<UserIdentity>, bool)> {
    match get_global_action_cred() {
        Some(sys_cred) if sys_cred.access_key == ak => Ok((Some(UserIdentity::new(sys_cred)), true)),
        _ => get()?.check_key(ak).await,
    }
}
/// Returns all regular users known to the IAM subsystem (STS and service
/// accounts are excluded by the underlying `get_users`).
pub async fn list_users() -> crate::Result<HashMap<String, madmin::UserInfo>> {
    get()?.get_users().await
}

/// Looks up `ak` in the IAM cache, returning the identity (if any) and
/// whether its credentials are valid.
/// NOTE(review): unlike `check_key`, this does not consult the global action
/// credential — confirm that is intentional.
pub async fn get_user(ak: &str) -> crate::Result<(Option<UserIdentity>, bool)> {
    get()?.check_key(ak).await
}

/// Creates (or updates) a regular user and returns its update timestamp.
pub async fn create_user(ak: &str, sk: &str, status: &str) -> crate::Result<OffsetDateTime> {
    get()?.add_user(ak, sk, status).await
    // notify
}

View File

@@ -8,7 +8,7 @@ use std::{
};
use ecstore::store_err::is_err_object_not_found;
use log::debug;
use log::{debug, warn};
use time::OffsetDateTime;
use tokio::{
select,
@@ -20,7 +20,7 @@ use tokio::{
use crate::{
arn::ARN,
auth::{Credentials, UserIdentity},
auth::{self, Credentials, UserIdentity},
cache::Cache,
format::Format,
handler::Handler,
@@ -188,7 +188,7 @@ where
OffsetDateTime::now_utc(),
);
Ok(user_entiry.update_at)
Ok(user_entiry.update_at.unwrap_or(OffsetDateTime::now_utc()))
}
pub async fn is_allowed<'a>(&self, args: Args<'a>) -> bool {
@@ -209,7 +209,7 @@ where
Ok((u.clone(), None))
}
pub async fn check_key(&self, ak: &str) -> crate::Result<Option<UserIdentity>> {
pub async fn check_key(&self, ak: &str) -> crate::Result<(Option<UserIdentity>, bool)> {
let user = self
.cache
.users
@@ -219,8 +219,14 @@ where
.or_else(|| self.cache.sts_accounts.load().get(ak).cloned());
match user {
Some(u) if u.credentials.is_valid() => Ok(Some(u)),
_ => Ok(None),
Some(u) => {
if u.credentials.is_valid() {
Ok((Some(u), true))
} else {
Ok((Some(u), false))
}
}
_ => Ok((None, false)),
}
}
pub async fn policy_db_get(&self, name: &str, _groups: Option<Vec<String>>) -> crate::Result<Vec<String>> {
@@ -255,7 +261,7 @@ where
self.api.save_iam_config(&user_entiry, path).await?;
Cache::add_or_update(
&self.cache.users,
&self.cache.sts_accounts,
&user_entiry.credentials.access_key,
&user_entiry,
OffsetDateTime::now_utc(),
@@ -263,4 +269,84 @@ where
Ok(())
}
/// Returns all regular users keyed by access key — STS (temporary) and
/// service-account credentials are excluded.
///
/// Status reflects `Credentials::is_valid()`; when a policy mapping exists
/// for the user, its policy list and update time override `updated_at`.
pub async fn get_users(&self) -> crate::Result<HashMap<String, madmin::UserInfo>> {
    let mut m = HashMap::new();

    let users = self.cache.users.load();
    let policies = self.cache.user_policies.load();
    let group_members = self.cache.user_group_memeberships.load();

    for (k, v) in users.iter() {
        // Fix: the previous `warn!` here dumped the full Credentials struct
        // (including secret key and session token) into the logs on every
        // listing — a secret leak left over from debugging; removed.
        if v.credentials.is_temp() || v.credentials.is_service_account() {
            continue;
        }

        let mut u = madmin::UserInfo {
            status: if v.credentials.is_valid() {
                madmin::AccountStatus::Enabled
            } else {
                madmin::AccountStatus::Disabled
            },
            updated_at: v.update_at,
            ..Default::default()
        };

        if let Some(p) = policies.get(k) {
            u.policy_name = Some(p.policies.clone());
            u.updated_at = Some(p.update_at);
        }

        if let Some(members) = group_members.get(k) {
            u.member_of = Some(members.iter().cloned().collect());
        }

        m.insert(k.clone(), u);
    }

    Ok(m)
}
pub async fn add_user(&self, access_key: &str, secret_key: &str, status: &str) -> crate::Result<OffsetDateTime> {
let status = {
match status {
"disabled" => auth::ACCOUNT_ON,
auth::ACCOUNT_ON => auth::ACCOUNT_ON,
_ => auth::ACCOUNT_OFF,
}
};
let users = self.cache.users.load();
if let Some(x) = users.get(access_key) {
if x.credentials.is_temp() {
return Err(crate::Error::IAMActionNotAllowed);
}
}
let user_entiry = UserIdentity::from(Credentials {
access_key: access_key.to_string(),
secret_key: secret_key.to_string(),
status: status.to_string(),
..Default::default()
});
let path = format!(
"config/iam/{}{}/identity.json",
UserType::Reg.prefix(),
user_entiry.credentials.access_key
);
debug!("save object: {path:?}");
self.api.save_iam_config(&user_entiry, path).await?;
Cache::add_or_update(
&self.cache.users,
&user_entiry.credentials.access_key,
&user_entiry,
OffsetDateTime::now_utc(),
);
Ok(user_entiry.update_at.unwrap_or(OffsetDateTime::now_utc()))
}
}

View File

@@ -4,12 +4,14 @@ use ecstore::{
config::error::is_err_config_not_found,
store::ECStore,
store_api::{ObjectIO, ObjectInfo, ObjectOptions, PutObjReader},
utils::path::dir,
StorageAPI,
store_list_objects::{ObjectInfoOrErr, WalkOptions},
utils::path::{dir, SLASH_SEPARATOR},
};
use futures::future::try_join_all;
use log::{debug, warn};
use serde::{de::DeserializeOwned, Serialize};
use tokio::sync::broadcast;
use tokio::sync::mpsc::{self};
use super::Store;
use crate::{
@@ -31,45 +33,54 @@ impl ObjectStore {
Self { object_api }
}
async fn list_iam_config_items(&self, prefix: &str, items: &[&str]) -> crate::Result<Vec<String>> {
debug!("list iam config items, prefix: {prefix}");
async fn list_iam_config_items(&self, prefix: &str) -> crate::Result<Vec<String>> {
debug!("list iam config items, prefix: {}", &prefix);
// todo, 实现walk使用walk
let mut futures = Vec::with_capacity(items.len());
for item in items {
let prefix = format!("{}{}", prefix, item);
futures.push(async move {
// let items = self
// .object_api
// .clone()
// .list_path(&ListPathOptions {
// bucket: Self::BUCKET_NAME.into(),
// prefix: prefix.clone(),
// ..Default::default()
// })
// .await;
let (ctx_tx, ctx_rx) = broadcast::channel(1);
let items = self
.object_api
.clone()
.list_objects_v2(Self::BUCKET_NAME, &prefix.clone(), None, None, 0, false, None)
.await;
// let prefix = format!("{}{}", prefix, item);
match items {
Ok(items) => Result::<_, crate::Error>::Ok(items.prefixes),
Err(e) => {
if is_err_config_not_found(&e) {
Result::<_, crate::Error>::Ok(vec![])
} else {
Err(Error::StringError(format!("list {prefix} failed, err: {e:?}")))
}
}
}
});
let ctx_rx = ctx_rx.resubscribe();
let (tx, mut rx) = mpsc::channel::<ObjectInfoOrErr>(100);
let store = self.object_api.clone();
let path = prefix.to_owned();
tokio::spawn(async move { store.walk(ctx_rx, Self::BUCKET_NAME, &path, tx, WalkOptions::default()).await });
let mut ret = Vec::new();
while let Some(v) = rx.recv().await {
if let Some(err) = v.err {
warn!("list_iam_config_items {:?}", err);
let _ = ctx_tx.send(true);
return Err(Error::EcstoreError(err));
}
if let Some(item) = v.item {
let name = item.name.trim_start_matches(prefix).trim_end_matches(SLASH_SEPARATOR);
ret.push(name.to_owned());
}
}
Ok(ret)
// match items {
// Ok(items) => Result::<_, crate::Error>::Ok(items.prefixes),
// Err(e) => {
// if is_err_config_not_found(&e) {
// Result::<_, crate::Error>::Ok(vec![])
// } else {
// Err(Error::StringError(format!("list {prefix} failed, err: {e:?}")))
// }
// }
// }
// TODO: FIXME:
Ok(try_join_all(futures).await?.into_iter().flat_map(|x| x.into_iter()).collect())
// Ok(try_join_all(futures).await?.into_iter().flat_map(|x| x.into_iter()).collect())
}
async fn load_policy(&self, name: &str) -> crate::Result<PolicyDoc> {
@@ -152,7 +163,7 @@ impl Store for ObjectStore {
}
async fn load_policy_docs(&self) -> crate::Result<HashMap<String, PolicyDoc>> {
let paths = self.list_iam_config_items("config/iam/", &["policies/"]).await?;
let paths = self.list_iam_config_items("config/iam/policies/").await?;
let mut result = Self::get_default_policyes();
for path in paths {
@@ -174,7 +185,9 @@ impl Store for ObjectStore {
}
async fn load_users(&self, user_type: UserType) -> crate::Result<HashMap<String, UserIdentity>> {
let paths = self.list_iam_config_items("config/iam/", &[user_type.prefix()]).await?;
let paths = self
.list_iam_config_items(format!("{}{}", "config/iam/", user_type.prefix()).as_str())
.await?;
let mut result = HashMap::new();
for path in paths {
@@ -202,21 +215,18 @@ impl Store for ObjectStore {
/// load all and make a new cache.
async fn load_all(&self, cache: &Cache) -> crate::Result<()> {
let items = self
.list_iam_config_items(
"config/iam/",
&[
"policydb/",
"policies/",
"groups/",
"policydb/users/",
"policydb/groups/",
"service-accounts/",
"policydb/sts-users/",
"sts/",
],
)
.await?;
let _items = &[
"policydb/",
"policies/",
"groups/",
"policydb/users/",
"policydb/groups/",
"service-accounts/",
"policydb/sts-users/",
"sts/",
];
let items = self.list_iam_config_items("config/iam/").await?;
debug!("all iam items: {items:?}");
let (policy_docs, users, user_policies, sts_policies, sts_accounts) = (

View File

@@ -1,7 +1,7 @@
use crate::Error;
use jsonwebtoken::{encode, Algorithm, EncodingKey, Header};
use jsonwebtoken::{encode, Algorithm, DecodingKey, EncodingKey, Header};
use rand::{Rng, RngCore};
use serde::Serialize;
use serde::{de::DeserializeOwned, Deserialize, Serialize};
pub fn gen_access_key(length: usize) -> crate::Result<String> {
const ALPHA_NUMERIC_TABLE: [char; 36] = [
@@ -45,6 +45,17 @@ pub fn generate_jwt<T: Serialize>(claims: &T, secret: &str) -> Result<String, js
encode(&header, &claims, &EncodingKey::from_secret(secret.as_bytes()))
}
pub fn extract_claims<T: DeserializeOwned>(
token: &str,
secret: &str,
) -> Result<jsonwebtoken::TokenData<T>, jsonwebtoken::errors::Error> {
jsonwebtoken::decode::<T>(
token,
&DecodingKey::from_secret(secret.as_bytes()),
&jsonwebtoken::Validation::new(Algorithm::HS512),
)
}
#[cfg(test)]
mod tests {
use super::{gen_access_key, gen_secret_key};

View File

@@ -5,6 +5,8 @@ pub mod metrics;
pub mod net;
pub mod service_commands;
pub mod trace;
pub mod user;
pub mod utils;
pub use info_commands::*;
pub use user::*;

52
madmin/src/user.rs Normal file
View File

@@ -0,0 +1,52 @@
use serde::{Deserialize, Serialize};
use time::OffsetDateTime;
#[derive(Debug, Serialize, Deserialize, Default)]
pub enum AccountStatus {
#[serde(rename = "enabled")]
Enabled,
#[serde(rename = "disabled")]
#[default]
Disabled,
}
#[derive(Debug, Serialize, Deserialize)]
pub enum UserAuthType {
#[serde(rename = "builtin")]
Builtin,
#[serde(rename = "ldap")]
Ldap,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct UserAuthInfo {
#[serde(rename = "type")]
pub auth_type: UserAuthType,
#[serde(rename = "authServer", skip_serializing_if = "Option::is_none")]
pub auth_server: Option<String>,
#[serde(rename = "authServerUserID", skip_serializing_if = "Option::is_none")]
pub auth_server_user_id: Option<String>,
}
#[derive(Debug, Serialize, Deserialize, Default)]
pub struct UserInfo {
#[serde(rename = "userAuthInfo", skip_serializing_if = "Option::is_none")]
pub auth_info: Option<UserAuthInfo>,
#[serde(rename = "secretKey", skip_serializing_if = "Option::is_none")]
pub secret_key: Option<String>,
#[serde(rename = "policyName", skip_serializing_if = "Option::is_none")]
pub policy_name: Option<String>,
#[serde(rename = "status")]
pub status: AccountStatus,
#[serde(rename = "memberOf", skip_serializing_if = "Option::is_none")]
pub member_of: Option<Vec<String>>,
#[serde(rename = "updatedAt")]
pub updated_at: Option<OffsetDateTime>,
}

View File

@@ -20,9 +20,10 @@ use ecstore::utils::path::path_join;
use ecstore::utils::xml;
use ecstore::GLOBAL_Endpoints;
use futures::{Stream, StreamExt};
use http::Uri;
use http::{HeaderMap, Uri};
use hyper::StatusCode;
use iam::get_global_action_cred;
use iam::auth::{create_new_credentials_with_metadata, get_claims_from_token_with_secret};
use iam::{auth, get_global_action_cred};
use madmin::metrics::RealtimeMetrics;
use madmin::utils::parse_duration;
use matchit::Params;
@@ -92,11 +93,21 @@ pub struct AssumeRoleRequest {
// pub parent_user: String,
// }
#[derive(Debug, Serialize, Default)]
#[derive(Debug, Serialize, Deserialize, Default)]
pub struct STSClaims {
parent: String,
exp: usize,
access_key: String,
pub parent: String,
pub exp: usize,
pub access_key: String,
}
impl STSClaims {
pub fn to_map(&self) -> HashMap<String, String> {
let mut m = HashMap::new();
m.insert("parent".to_string(), self.parent.clone());
m.insert("exp".to_string(), self.exp.to_string());
m.insert("access_key".to_string(), self.access_key.clone());
m
}
}
fn get_token_signing_key() -> Option<String> {
@@ -107,6 +118,98 @@ fn get_token_signing_key() -> Option<String> {
}
}
pub async fn check_key_valid(token: Option<String>, ak: &str) -> S3Result<(auth::Credentials, bool)> {
let Some(mut cred) = get_global_action_cred() else {
return Err(s3_error!(InternalError, "action cred not init"));
};
let sys_cred = cred.clone();
if cred.access_key != ak {
let Ok(iam_store) = iam::get() else { return Err(s3_error!(InternalError, "iam not init")) };
match iam_store
.check_key(ak)
.await
.map_err(|_e| s3_error!(InternalError, "check key failed"))?
{
(Some(u), true) => {
cred = u.credentials;
}
(Some(u), false) => {
if u.credentials.status == "off" {
return Err(s3_error!(InvalidRequest, "ErrAccessKeyDisabled"));
}
return Err(s3_error!(InvalidRequest, "check key failed"));
}
_ => {
return Err(s3_error!(InvalidRequest, "check key failed"));
}
}
}
if let Some(st) = token {
let claims = check_claims_from_token(&st, &cred)
.map_err(|e| S3Error::with_message(S3ErrorCode::InternalError, format!("check claims failed {}", e)))?;
cred.claims = Some(claims.to_map());
}
let owner = sys_cred.access_key == cred.access_key || cred.parent_user == sys_cred.access_key;
// permitRootAccess
// SessionPolicyName
Ok((cred, owner))
}
pub fn check_claims_from_token(token: &str, cred: &auth::Credentials) -> S3Result<STSClaims> {
if !token.is_empty() && cred.access_key.is_empty() {
return Err(s3_error!(InvalidRequest, "no access key"));
}
if token.is_empty() && cred.is_temp() && !cred.is_service_account() {
return Err(s3_error!(InvalidRequest, "invalid token"));
}
if !token.is_empty() && !cred.is_temp() {
return Err(s3_error!(InvalidRequest, "invalid token"));
}
if !cred.is_service_account() && cred.is_temp() && token != cred.session_token {
return Err(s3_error!(InvalidRequest, "invalid token"));
}
if cred.is_temp() || cred.is_expired() {
return Err(s3_error!(InvalidRequest, "invalid access key"));
}
let Some(sys_cred) = get_global_action_cred() else {
return Err(s3_error!(InternalError, "action cred not init"));
};
let mut secret = sys_cred.secret_key;
let mut token = token;
if cred.is_service_account() {
token = cred.session_token.as_str();
secret = cred.secret_key.clone();
}
if !token.is_empty() {
let claims: STSClaims =
get_claims_from_token_with_secret(token, &secret).map_err(|_e| s3_error!(InvalidRequest, "invalid token"))?;
return Ok(claims);
}
Ok(STSClaims::default())
}
pub fn get_session_token(hds: &HeaderMap) -> Option<String> {
hds.get("x-amz-security-token")
.map(|v| v.to_str().unwrap_or_default().to_string())
}
pub struct AssumeRoleHandle {}
#[async_trait::async_trait]
impl Operation for AssumeRoleHandle {
@@ -164,7 +267,7 @@ impl Operation for AssumeRoleHandle {
claims.access_key = ak.clone();
let mut cred = match iam::auth::Credentials::create_new_credentials_with_metadata(&ak, &sk, &claims, &secret, Some(exp)) {
let mut cred = match create_new_credentials_with_metadata(&ak, &sk, &claims, &secret, Some(exp)) {
Ok(res) => res,
Err(_er) => return Err(s3_error!(InvalidRequest, "")),
};

View File

@@ -1,13 +1,67 @@
use futures::TryFutureExt;
use http::StatusCode;
use iam::get_global_action_cred;
use matchit::Params;
use s3s::{s3_error, Body, S3Request, S3Response, S3Result};
use s3s::{s3_error, Body, S3Error, S3ErrorCode, S3Request, S3Response, S3Result};
use serde::Deserialize;
use serde_urlencoded::from_bytes;
use tracing::warn;
use crate::admin::router::Operation;
use crate::admin::{
handlers::{check_key_valid, get_session_token},
router::Operation,
};
#[derive(Debug, Deserialize, Default)]
pub struct AddUserQuery {
#[serde(rename = "accessKey")]
pub access_key: Option<String>,
}
pub struct AddUser {}
#[async_trait::async_trait]
impl Operation for AddUser {
async fn call(&self, _req: S3Request<Body>, _params: Params<'_, '_>) -> S3Result<S3Response<(StatusCode, Body)>> {
async fn call(&self, req: S3Request<Body>, _params: Params<'_, '_>) -> S3Result<S3Response<(StatusCode, Body)>> {
warn!("handle AddUser");
let query = {
if let Some(query) = req.uri.query() {
let input: AddUserQuery =
from_bytes(query.as_bytes()).map_err(|_e| s3_error!(InvalidArgument, "get body failed"))?;
input
} else {
AddUserQuery::default()
}
};
let Some(input_cred) = req.credentials else {
return Err(s3_error!(InvalidRequest, "get cred failed"));
};
let ak = query.access_key.as_deref().unwrap_or_default();
if let Some(sys_cred) = get_global_action_cred() {
if sys_cred.access_key == ak {
return Err(s3_error!(InvalidArgument, "can't create user with system access key"));
}
}
if let (Some(user), true) = iam::get_user(ak)
.await
.map_err(|e| S3Error::with_message(S3ErrorCode::InternalError, format!("marshal users err {}", e)))?
{
if user.credentials.is_temp() || user.credentials.is_service_account() {
return Err(s3_error!(InvalidArgument, "can't create user with service account access key"));
}
}
let token = get_session_token(&req.headers);
let (cred, _) = check_key_valid(token, &input_cred.access_key).await?;
if (cred.is_temp() || cred.is_service_account()) && cred.parent_user == input_cred.access_key {
return Err(s3_error!(InvalidArgument, "can't create user with service account access key"));
}
return Err(s3_error!(NotImplemented));
}
}
@@ -19,3 +73,35 @@ impl Operation for SetUserStatus {
return Err(s3_error!(NotImplemented));
}
}
pub struct ListUsers {}
#[async_trait::async_trait]
impl Operation for ListUsers {
async fn call(&self, _req: S3Request<Body>, _params: Params<'_, '_>) -> S3Result<S3Response<(StatusCode, Body)>> {
warn!("handle ListUsers");
let users = iam::list_users()
.await
.map_err(|e| S3Error::with_message(S3ErrorCode::InternalError, e.to_string()))?;
let body = serde_json::to_string(&users)
.map_err(|e| S3Error::with_message(S3ErrorCode::InternalError, format!("marshal users err {}", e)))?;
Ok(S3Response::new((StatusCode::OK, Body::from(body))))
}
}
pub struct RemoveUser {}
#[async_trait::async_trait]
impl Operation for RemoveUser {
async fn call(&self, _req: S3Request<Body>, _params: Params<'_, '_>) -> S3Result<S3Response<(StatusCode, Body)>> {
warn!("handle RemoveUser");
return Err(s3_error!(NotImplemented));
}
}
pub struct GetUserInfo {}
#[async_trait::async_trait]
impl Operation for GetUserInfo {
async fn call(&self, _req: S3Request<Body>, _params: Params<'_, '_>) -> S3Result<S3Response<(StatusCode, Body)>> {
return Err(s3_error!(NotImplemented));
}
}

View File

@@ -126,13 +126,32 @@ fn regist_user_route(r: &mut S3Router<AdminOperation>) -> Result<()> {
format!("{}{}", ADMIN_PREFIX, "/v3/accountinfo").as_str(),
AdminOperation(&handlers::AccountInfoHandler {}),
)?;
r.insert(
Method::GET,
format!("{}{}", ADMIN_PREFIX, "/v3/list-users").as_str(),
AdminOperation(&user::ListUsers {}),
)?;
r.insert(
Method::GET,
format!("{}{}", ADMIN_PREFIX, "/v3/user-info").as_str(),
AdminOperation(&user::GetUserInfo {}),
)?;
r.insert(
Method::DELETE,
format!("{}{}", ADMIN_PREFIX, "/v3/remove-user").as_str(),
AdminOperation(&user::RemoveUser {}),
)?;
r.insert(
Method::PUT,
format!("{}{}", ADMIN_PREFIX, "/v3/add-user").as_str(),
AdminOperation(&user::AddUser {}),
)?;
r.insert(
Method::GET,
Method::PUT,
format!("{}{}", ADMIN_PREFIX, "/v3/set-user-status").as_str(),
AdminOperation(&user::SetUserStatus {}),
)?;

View File

@@ -1,4 +1,5 @@
use iam::cache::CacheInner;
use log::warn;
use s3s::auth::S3Auth;
use s3s::auth::SecretKey;
use s3s::auth::SimpleAuth;
@@ -27,9 +28,15 @@ impl S3Auth for IAMAuth {
return Ok(key);
}
warn!("Failed to get secret key from simple auth");
if let Ok(iam_store) = iam::get() {
let c = CacheInner::from(&iam_store.cache);
warn!("Failed to get secret key from simple auth, try cache {}", access_key);
warn!("users {:?}", c.users.values());
warn!("sts_accounts {:?}", c.sts_accounts.values());
if let Some(id) = c.get_user(access_key) {
warn!("get cred {:?}", id.credentials);
return Ok(SecretKey::from(id.credentials.secret_key.clone()));
}
}