merge dada/admin

This commit is contained in:
weisd
2025-02-24 23:40:09 +08:00
12 changed files with 201 additions and 279 deletions

109
Cargo.lock generated
View File

@@ -2999,7 +2999,7 @@ dependencies = [
"crypto",
"ecstore",
"futures",
"ipnetwork 0.21.1",
"ipnetwork",
"itertools",
"jsonwebtoken",
"lazy_static",
@@ -3258,15 +3258,6 @@ version = "2.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "469fb0b9cefa57e3ef31275ee7cacb78f2fdca44e4765491884a2b119d4eb130"
[[package]]
name = "ipnetwork"
version = "0.20.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bf466541e9d546596ee94f9f69590f89473455f88372423e0008fc1a7daf100e"
dependencies = [
"serde",
]
[[package]]
name = "ipnetwork"
version = "0.21.1"
@@ -3888,12 +3879,6 @@ dependencies = [
"memoffset",
]
[[package]]
name = "no-std-net"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "43794a0ace135be66a25d3ae77d41b91615fb68ae937f904090203e81f755b65"
[[package]]
name = "nodrop"
version = "0.1.14"
@@ -4611,97 +4596,6 @@ version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "953ec861398dccce10c670dfeaf3ec4911ca479e9c02154b3a215178c5f566f2"
[[package]]
name = "pnet"
version = "0.35.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "682396b533413cc2e009fbb48aadf93619a149d3e57defba19ff50ce0201bd0d"
dependencies = [
"ipnetwork 0.20.0",
"pnet_base",
"pnet_datalink",
"pnet_packet",
"pnet_sys",
"pnet_transport",
]
[[package]]
name = "pnet_base"
version = "0.35.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ffc190d4067df16af3aba49b3b74c469e611cad6314676eaf1157f31aa0fb2f7"
dependencies = [
"no-std-net",
]
[[package]]
name = "pnet_datalink"
version = "0.35.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e79e70ec0be163102a332e1d2d5586d362ad76b01cec86f830241f2b6452a7b7"
dependencies = [
"ipnetwork 0.20.0",
"libc",
"pnet_base",
"pnet_sys",
"winapi",
]
[[package]]
name = "pnet_macros"
version = "0.35.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "13325ac86ee1a80a480b0bc8e3d30c25d133616112bb16e86f712dcf8a71c863"
dependencies = [
"proc-macro2",
"quote",
"regex",
"syn 2.0.98",
]
[[package]]
name = "pnet_macros_support"
version = "0.35.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eed67a952585d509dd0003049b1fc56b982ac665c8299b124b90ea2bdb3134ab"
dependencies = [
"pnet_base",
]
[[package]]
name = "pnet_packet"
version = "0.35.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4c96ebadfab635fcc23036ba30a7d33a80c39e8461b8bd7dc7bb186acb96560f"
dependencies = [
"glob",
"pnet_base",
"pnet_macros",
"pnet_macros_support",
]
[[package]]
name = "pnet_sys"
version = "0.35.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7d4643d3d4db6b08741050c2f3afa9a892c4244c085a72fcda93c9c2c9a00f4b"
dependencies = [
"libc",
"winapi",
]
[[package]]
name = "pnet_transport"
version = "0.35.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5f604d98bc2a6591cf719b58d3203fd882bdd6bf1db696c4ac97978e9f4776bf"
dependencies = [
"libc",
"pnet_base",
"pnet_packet",
"pnet_sys",
]
[[package]]
name = "png"
version = "0.17.16"
@@ -5454,7 +5348,6 @@ dependencies = [
"mime_guess",
"netif",
"pin-project-lite",
"pnet",
"prost",
"prost-build",
"prost-types",

View File

@@ -8,6 +8,7 @@ use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use serde_json::{json, Value};
use std::collections::HashMap;
use time::macros::offset;
use time::OffsetDateTime;
const ACCESS_KEY_MIN_LEN: usize = 3;
@@ -212,10 +213,19 @@ pub fn create_new_credentials_with_metadata(
return Err(Error::new(IamError::InvalidAccessKeyLength));
}
if token_secret.is_empty() {
return Ok(Credentials {
access_key: ak.to_owned(),
secret_key: sk.to_owned(),
status: ACCOUNT_OFF.to_owned(),
..Default::default()
});
}
let expiration = {
if let Some(v) = claims.get("exp") {
if let Some(expiry) = v.as_i64() {
Some(OffsetDateTime::from_unix_timestamp(expiry)?.to_offset(OffsetDateTime::now_utc().offset()))
Some(OffsetDateTime::from_unix_timestamp(expiry)?.to_offset(offset!(+8)))
} else {
None
}

View File

@@ -4,6 +4,7 @@ use crate::{
cache::{Cache, CacheEntity},
error::{is_err_no_such_group, is_err_no_such_policy, is_err_no_such_user, Error as IamError},
format::Format,
get_global_action_cred,
policy::{Policy, PolicyDoc, DEFAULT_POLICIES},
store::{object::IAM_CONFIG_PREFIX, GroupInfo, MappedPolicy, Store, UserType},
sys::{
@@ -1577,8 +1578,27 @@ fn set_default_canned_policies(policies: &mut HashMap<String, PolicyDoc>) {
}
}
pub fn get_token_signing_key() -> Option<String> {
if let Some(s) = get_global_action_cred() {
Some(s.secret_key.clone())
} else {
None
}
}
pub fn extract_jwt_claims(u: &UserIdentity) -> Result<HashMap<String, Value>> {
get_claims_from_token_with_secret(&u.credentials.session_token, &u.credentials.secret_key)
let Some(sys_key) = get_token_signing_key() else {
return Err(Error::msg("global active sk not init"));
};
let keys = vec![&sys_key, &u.credentials.secret_key];
for key in keys {
if let Ok(claims) = get_claims_from_token_with_secret(&u.credentials.session_token, key) {
return Ok(claims);
}
}
Err(Error::msg("unable to extract claims"))
}
fn filter_policies(cache: &Cache, policy_name: &str, bucket_name: &str) -> (String, Policy) {

View File

@@ -70,7 +70,6 @@ iam = { path = "../iam" }
jsonwebtoken = "9.3.0"
tower-http = { version = "0.6.2", features = ["cors"] }
include_dir = "0.7.4"
pnet = "0.35.0"
mime_guess = "2.0.5"
[build-dependencies]

View File

@@ -18,12 +18,11 @@ use ecstore::store::is_valid_object_prefix;
use ecstore::store_api::StorageAPI;
use ecstore::utils::crypto::base64_encode;
use ecstore::utils::path::path_join;
use ecstore::utils::xml;
use ecstore::GLOBAL_Endpoints;
use futures::{Stream, StreamExt};
use http::{HeaderMap, Uri};
use hyper::StatusCode;
use iam::auth::{get_claims_from_token_with_secret, get_new_credentials_with_metadata};
use iam::auth::get_claims_from_token_with_secret;
use iam::error::Error as IamError;
use iam::policy::Policy;
use iam::sys::SESSION_POLICY_NAME;
@@ -33,10 +32,7 @@ use madmin::utils::parse_duration;
use matchit::Params;
use s3s::header::CONTENT_TYPE;
use s3s::stream::{ByteStream, DynByteStream};
use s3s::{
dto::{AssumeRoleOutput, Credentials, Timestamp},
s3_error, Body, S3Error, S3Request, S3Response, S3Result,
};
use s3s::{s3_error, Body, S3Error, S3Request, S3Response, S3Result};
use s3s::{S3ErrorCode, StdError};
use serde::{Deserialize, Serialize};
use serde_json::Value;
@@ -47,7 +43,6 @@ use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Duration as std_Duration;
use time::{Duration, OffsetDateTime};
use tokio::sync::mpsc::{self};
use tokio::time::interval;
use tokio::{select, spawn};
@@ -57,32 +52,10 @@ use tracing::{error, info, warn};
pub mod group;
pub mod policy;
pub mod service_account;
pub mod sts;
pub mod trace;
pub mod user;
const ASSUME_ROLE_ACTION: &str = "AssumeRole";
const ASSUME_ROLE_VERSION: &str = "2011-06-15";
#[derive(Deserialize, Debug, Default)]
#[serde(rename_all = "PascalCase", default)]
pub struct AssumeRoleRequest {
pub action: String,
pub duration_seconds: usize,
pub version: String,
pub role_arn: String,
pub role_session_name: String,
pub policy: String,
pub external_id: String,
}
fn get_token_signing_key() -> Option<String> {
if let Some(s) = get_global_action_cred() {
Some(s.secret_key.clone())
} else {
None
}
}
// check_key_valid get auth.cred
pub async fn check_key_valid(security_token: Option<String>, ak: &str) -> S3Result<(auth::Credentials, bool)> {
let Some(mut cred) = get_global_action_cred() else {
@@ -216,114 +189,6 @@ pub fn populate_session_policy(claims: &mut HashMap<String, Value>, policy: &str
Ok(())
}
pub struct AssumeRoleHandle {}
#[async_trait::async_trait]
impl Operation for AssumeRoleHandle {
async fn call(&self, req: S3Request<Body>, _params: Params<'_, '_>) -> S3Result<S3Response<(StatusCode, Body)>> {
warn!("handle AssumeRoleHandle");
let Some(user) = req.credentials else { return Err(s3_error!(InvalidRequest, "get cred failed")) };
let session_token = get_session_token(&req.headers);
if session_token.is_some() {
return Err(s3_error!(InvalidRequest, "AccessDenied1"));
}
let (cred, _owner) = check_key_valid(session_token, &user.access_key).await?;
// // TODO: 判断权限, 不允许sts访问
if cred.is_temp() || cred.is_service_account() {
return Err(s3_error!(InvalidRequest, "AccessDenied"));
}
let mut input = req.input;
let bytes = match input.store_all_unlimited().await {
Ok(b) => b,
Err(e) => {
warn!("get body failed, e: {:?}", e);
return Err(s3_error!(InvalidRequest, "get body failed"));
}
};
let body: AssumeRoleRequest = from_bytes(&bytes).map_err(|_e| s3_error!(InvalidRequest, "get body failed"))?;
if body.action.as_str() != ASSUME_ROLE_ACTION {
return Err(s3_error!(InvalidArgument, "not suport action"));
}
if body.version.as_str() != ASSUME_ROLE_VERSION {
return Err(s3_error!(InvalidArgument, "not suport version"));
}
let mut claims = cred.claims.unwrap_or_default();
populate_session_policy(&mut claims, &body.policy)?;
let exp = {
if body.duration_seconds > 0 {
body.duration_seconds
} else {
3600
}
};
claims.insert(
"exp".to_string(),
serde_json::Value::Number(serde_json::Number::from(OffsetDateTime::now_utc().unix_timestamp() + exp as i64)),
);
claims.insert("parent".to_string(), serde_json::Value::String(cred.access_key.clone()));
// warn!("AssumeRole get cred {:?}", &user);
// warn!("AssumeRole get body {:?}", &body);
let Ok(iam_store) = iam::get() else { return Err(s3_error!(InvalidRequest, "iam not init")) };
if let Err(_err) = iam_store.policy_db_get(&cred.access_key, &cred.groups).await {
return Err(s3_error!(InvalidArgument, "invalid policy arg"));
}
let Some(secret) = get_token_signing_key() else {
return Err(s3_error!(InvalidArgument, "global active sk not init"));
};
info!("AssumeRole get claims {:?}", &claims);
let mut new_cred = get_new_credentials_with_metadata(&claims, &secret)
.map_err(|e| S3Error::with_message(S3ErrorCode::InternalError, format!("get new cred failed {}", e)))?;
new_cred.parent_user = cred.access_key.clone();
info!("AssumeRole get new_cred {:?}", &new_cred);
if let Err(_err) = iam_store.set_temp_user(&new_cred.access_key, &new_cred, None).await {
return Err(s3_error!(InternalError, "set_temp_user failed"));
}
// TODO: globalSiteReplicationSys
let resp = AssumeRoleOutput {
credentials: Some(Credentials {
access_key_id: new_cred.access_key,
expiration: Timestamp::from(
new_cred
.expiration
.unwrap_or(OffsetDateTime::now_utc().saturating_add(Duration::seconds(3600))),
),
secret_access_key: new_cred.secret_key,
session_token: new_cred.session_token,
}),
..Default::default()
};
// getAssumeRoleCredentials
let output = xml::serialize::<AssumeRoleOutput>(&resp).unwrap();
Ok(S3Response::new((StatusCode::OK, Body::from(output))))
}
}
#[derive(Debug, Serialize, Default)]
#[serde(rename_all = "PascalCase", default)]
pub struct AccountInfo {

View File

@@ -0,0 +1,139 @@
use crate::admin::{
handlers::{check_key_valid, get_session_token, populate_session_policy},
router::Operation,
};
use ecstore::utils::xml;
use http::StatusCode;
use iam::{auth::get_new_credentials_with_metadata, manager::get_token_signing_key};
use matchit::Params;
use s3s::{
dto::{AssumeRoleOutput, Credentials, Timestamp},
s3_error, Body, S3Error, S3ErrorCode, S3Request, S3Response, S3Result,
};
use serde::Deserialize;
use serde_urlencoded::from_bytes;
use time::{Duration, OffsetDateTime};
use tracing::{info, warn};
const ASSUME_ROLE_ACTION: &str = "AssumeRole";
const ASSUME_ROLE_VERSION: &str = "2011-06-15";
#[derive(Deserialize, Debug, Default)]
#[serde(rename_all = "PascalCase", default)]
pub struct AssumeRoleRequest {
pub action: String,
pub duration_seconds: usize,
pub version: String,
pub role_arn: String,
pub role_session_name: String,
pub policy: String,
pub external_id: String,
}
pub struct AssumeRoleHandle {}
#[async_trait::async_trait]
impl Operation for AssumeRoleHandle {
async fn call(&self, req: S3Request<Body>, _params: Params<'_, '_>) -> S3Result<S3Response<(StatusCode, Body)>> {
warn!("handle AssumeRoleHandle");
let Some(user) = req.credentials else { return Err(s3_error!(InvalidRequest, "get cred failed")) };
let session_token = get_session_token(&req.headers);
if session_token.is_some() {
return Err(s3_error!(InvalidRequest, "AccessDenied1"));
}
let (cred, _owner) = check_key_valid(session_token, &user.access_key).await?;
// // TODO: 判断权限, 不允许sts访问
if cred.is_temp() || cred.is_service_account() {
return Err(s3_error!(InvalidRequest, "AccessDenied"));
}
let mut input = req.input;
let bytes = match input.store_all_unlimited().await {
Ok(b) => b,
Err(e) => {
warn!("get body failed, e: {:?}", e);
return Err(s3_error!(InvalidRequest, "get body failed"));
}
};
let body: AssumeRoleRequest = from_bytes(&bytes).map_err(|_e| s3_error!(InvalidRequest, "get body failed"))?;
if body.action.as_str() != ASSUME_ROLE_ACTION {
return Err(s3_error!(InvalidArgument, "not suport action"));
}
if body.version.as_str() != ASSUME_ROLE_VERSION {
return Err(s3_error!(InvalidArgument, "not suport version"));
}
let mut claims = cred.claims.unwrap_or_default();
populate_session_policy(&mut claims, &body.policy)?;
let exp = {
if body.duration_seconds > 0 {
body.duration_seconds
} else {
3600
}
};
claims.insert(
"exp".to_string(),
serde_json::Value::Number(serde_json::Number::from(OffsetDateTime::now_utc().unix_timestamp() + exp as i64)),
);
claims.insert("parent".to_string(), serde_json::Value::String(cred.access_key.clone()));
// warn!("AssumeRole get cred {:?}", &user);
// warn!("AssumeRole get body {:?}", &body);
let Ok(iam_store) = iam::get() else { return Err(s3_error!(InvalidRequest, "iam not init")) };
if let Err(_err) = iam_store.policy_db_get(&cred.access_key, &cred.groups).await {
return Err(s3_error!(InvalidArgument, "invalid policy arg"));
}
let Some(secret) = get_token_signing_key() else {
return Err(s3_error!(InvalidArgument, "global active sk not init"));
};
info!("AssumeRole get claims {:?}", &claims);
let mut new_cred = get_new_credentials_with_metadata(&claims, &secret)
.map_err(|e| S3Error::with_message(S3ErrorCode::InternalError, format!("get new cred failed {}", e)))?;
new_cred.parent_user = cred.access_key.clone();
info!("AssumeRole get new_cred {:?}", &new_cred);
if let Err(_err) = iam_store.set_temp_user(&new_cred.access_key, &new_cred, None).await {
return Err(s3_error!(InternalError, "set_temp_user failed"));
}
// TODO: globalSiteReplicationSys
let resp = AssumeRoleOutput {
credentials: Some(Credentials {
access_key_id: new_cred.access_key,
expiration: Timestamp::from(
new_cred
.expiration
.unwrap_or(OffsetDateTime::now_utc().saturating_add(Duration::seconds(3600))),
),
secret_access_key: new_cred.secret_key,
session_token: new_cred.session_token,
}),
..Default::default()
};
// getAssumeRoleCredentials
let output = xml::serialize::<AssumeRoleOutput>(&resp).unwrap();
Ok(S3Response::new((StatusCode::OK, Body::from(output))))
}
}

View File

@@ -7,7 +7,7 @@ use common::error::Result;
use handlers::{
group, policy,
service_account::{AddServiceAccount, DeleteServiceAccount, InfoServiceAccount, ListServiceAccount, UpdateServiceAccount},
user,
sts, user,
};
use hyper::Method;
use router::{AdminOperation, S3Router};
@@ -19,7 +19,7 @@ pub fn make_admin_route() -> Result<impl S3Route> {
let mut r: S3Router<AdminOperation> = S3Router::new();
// 1
r.insert(Method::POST, "/", AdminOperation(&handlers::AssumeRoleHandle {}))?;
r.insert(Method::POST, "/", AdminOperation(&sts::AssumeRoleHandle {}))?;
regist_user_route(&mut r)?;

View File

@@ -1,4 +1,3 @@
use log::warn;
use s3s::auth::S3Auth;
use s3s::auth::SecretKey;
use s3s::auth::SimpleAuth;
@@ -27,11 +26,8 @@ impl S3Auth for IAMAuth {
return Ok(key);
}
warn!("Failed to get secret key from simple auth");
if let Ok(iam_store) = iam::get() {
if let Some(id) = iam_store.get_user(access_key).await {
warn!("get cred {:?}", id.credentials);
return Ok(SecretKey::from(id.credentials.secret_key.clone()));
}
}

View File

@@ -8,9 +8,7 @@ use axum::{
use include_dir::{include_dir, Dir};
use mime_guess::from_path;
use pnet::datalink::interfaces;
use serde::Serialize;
use std::net::SocketAddr;
static STATIC_DIR: Dir = include_dir!("$CARGO_MANIFEST_DIR/static");
@@ -111,36 +109,36 @@ async fn config_handler(axum::extract::Extension(fs_addr): axum::extract::Extens
pub async fn start_static_file_server(addrs: &str, fs_addr: &str) {
// 将字符串解析为 SocketAddr
let socket_addr: SocketAddr = fs_addr.parse().unwrap();
// let socket_addr: SocketAddr = fs_addr.parse().unwrap();
// 提取 IP 地址和端口号
let mut src_ip = socket_addr.ip();
let port = socket_addr.port();
// // 提取 IP 地址和端口号
// let mut src_ip = socket_addr.ip();
// let port = socket_addr.port();
if src_ip.to_string() == "0.0.0.0" {
for iface in interfaces() {
if iface.is_loopback() || !iface.is_up() {
continue;
}
for ip in iface.ips {
if ip.is_ipv4() {
src_ip = ip.ip();
}
}
}
}
// if src_ip.to_string() == "0.0.0.0" {
// for iface in interfaces() {
// if iface.is_loopback() || !iface.is_up() {
// continue;
// }
// for ip in iface.ips {
// if ip.is_ipv4() {
// src_ip = ip.ip();
// }
// }
// }
// }
// FIXME: TODO: protocol from config
let s3_url = format!("http://{}:{}", src_ip, port);
// // FIXME: TODO: protocol from config
// let s3_url = format!("http://{}:{}", src_ip, port);
// 创建路由
let app = Router::new()
.route("/config.json", get(config_handler).layer(axum::extract::Extension(s3_url.clone())))
.route("/config.json", get(config_handler).layer(axum::extract::Extension(fs_addr.to_owned())))
.nest_service("/", get(static_handler));
let listener = tokio::net::TcpListener::bind(addrs).await.unwrap();
println!("console running on: http://{} with s3 api {}", listener.local_addr().unwrap(), s3_url);
println!("console running on: http://{} with s3 api {}", listener.local_addr().unwrap(), fs_addr);
axum::serve(listener, app).await.unwrap();
}

View File

@@ -234,7 +234,7 @@ async fn run(opt: config::Opt) -> Result<()> {
if opt.console_enable {
info!("console is enabled");
tokio::spawn(async move {
console::start_static_file_server(&opt.console_address, server_address.as_str()).await;
console::start_static_file_server(&opt.console_address, &opt.server_endpoint).await;
});
}

1
rustfs/static/readme.md Normal file
View File

@@ -0,0 +1 @@
console static path, do not delete

View File

@@ -24,10 +24,11 @@ export RUSTFS_VOLUMES="./target/volume/test"
export RUSTFS_ADDRESS="0.0.0.0:9000"
export RUSTFS_CONSOLE_ENABLE=true
export RUSTFS_CONSOLE_ADDRESS="0.0.0.0:9002"
export RUSTFS_SERVER_ENDPOINT="http://localhost:9000"
if [ -n "$1" ]; then
export RUSTFS_VOLUMES="$1"
fi
./target/debug/rustfs
cargo run --bin rustfs