feat: add IAM export/import

- Add complete IAM configuration export/import functionality with ZIP format
- Add new API endpoints for IAM backup and restore operations
This commit is contained in:
weisd
2025-06-30 14:15:51 +08:00
committed by weisd
parent 8218070ebc
commit e9610f1fb7
11 changed files with 1060 additions and 11 deletions

2
Cargo.lock generated
View File

@@ -8221,6 +8221,7 @@ dependencies = [
"protos",
"query",
"regex",
"reqwest",
"rmp-serde",
"rust-embed",
"rustfs-config",
@@ -8254,6 +8255,7 @@ dependencies = [
"transform-stream",
"urlencoding",
"uuid",
"zip",
]
[[package]]

View File

@@ -195,7 +195,7 @@ s3s-policy = { git = "https://github.com/Nugine/s3s.git", rev = "4733cdfb27b2713
scopeguard = "1.2.0"
shadow-rs = { version = "1.1.1", default-features = false }
serde = { version = "1.0.219", features = ["derive"] }
serde_json = "1.0.140"
serde_json ={version = "1.0.140", features = ["raw_value"]}
serde-xml-rs = "0.8.1"
serde_urlencoded = "0.7.1"
sha1 = "0.10.6"
@@ -247,6 +247,7 @@ uuid = { version = "1.17.0", features = [
wildmatch = { version = "2.4.0", features = ["serde"] }
winapi = { version = "0.3.9" }
xxhash-rust = { version = "0.8.15", features = ["xxh64", "xxh3"] }
zip = "2.2.0"
[profile.wasm-dev]
inherits = "dev"

View File

@@ -17,7 +17,7 @@ async-compression = { version = "0.4.0", features = [
"xz",
] }
async_zip = { version = "0.0.17", features = ["tokio"] }
zip = "2.2.0"
zip = { workspace = true }
tokio = { workspace = true, features = ["full"] }
tokio-stream = { workspace = true }
tokio-tar = { workspace = true }

View File

@@ -5,6 +5,7 @@ use crate::error::{Error, Result};
use crate::manager::IamCache;
use crate::manager::extract_jwt_claims;
use crate::manager::get_default_policyes;
use crate::store::GroupInfo;
use crate::store::MappedPolicy;
use crate::store::Store;
use crate::store::UserType;
@@ -59,6 +60,10 @@ impl<T: Store> IamSys<T> {
self.store.group_notification_handler(name).await
}
pub async fn load_groups(&self, m: &mut HashMap<String, GroupInfo>) -> Result<()> {
self.store.api.load_groups(m).await
}
pub async fn load_policy(&self, name: &str) -> Result<()> {
self.store.policy_notification_handler(name).await
}
@@ -73,6 +78,11 @@ impl<T: Store> IamSys<T> {
self.store.user_notification_handler(name, user_type).await
}
pub async fn load_users(&self, user_type: UserType, m: &mut HashMap<String, UserIdentity>) -> Result<()> {
self.store.api.load_users(user_type, m).await?;
Ok(())
}
pub async fn load_service_account(&self, name: &str) -> Result<()> {
self.store.user_notification_handler(name, UserType::Svc).await
}
@@ -106,6 +116,15 @@ impl<T: Store> IamSys<T> {
})
}
pub async fn load_mapped_policys(
&self,
user_type: UserType,
is_group: bool,
m: &mut HashMap<String, MappedPolicy>,
) -> Result<()> {
self.store.api.load_mapped_policys(user_type, is_group, m).await
}
pub async fn list_polices(&self, bucket_name: &str) -> Result<HashMap<String, Policy>> {
self.store.list_polices(bucket_name).await
}

View File

@@ -1,4 +1,5 @@
use serde::{Deserialize, Serialize};
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use serde_json::value::RawValue;
use std::collections::HashMap;
use time::OffsetDateTime;
@@ -262,6 +263,194 @@ pub struct AccountAccess {
pub write: bool,
}
/// SRSessionPolicy - represents a session policy to be replicated.
#[derive(Debug, Clone)]
pub struct SRSessionPolicy(Option<Box<RawValue>>);
impl SRSessionPolicy {
pub fn new() -> Self {
SRSessionPolicy(None)
}
pub fn from_json(json: &str) -> Result<Self, serde_json::Error> {
if json == "null" {
Ok(SRSessionPolicy(None))
} else {
let raw_value = serde_json::from_str(json)?;
Ok(SRSessionPolicy(Some(raw_value)))
}
}
pub fn is_null(&self) -> bool {
self.0.is_none()
}
pub fn as_str(&self) -> Option<&str> {
self.0.as_ref().map(|v| v.get())
}
}
impl Default for SRSessionPolicy {
fn default() -> Self {
Self::new()
}
}
impl PartialEq for SRSessionPolicy {
fn eq(&self, other: &Self) -> bool {
self.0.as_ref().map(|v| v.get()) == other.0.as_ref().map(|v| v.get())
}
}
impl Serialize for SRSessionPolicy {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
match &self.0 {
Some(raw_value) => raw_value.serialize(serializer),
None => serializer.serialize_none(),
}
}
}
impl<'de> Deserialize<'de> for SRSessionPolicy {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
let raw_value: Option<Box<RawValue>> = Option::deserialize(deserializer)?;
Ok(SRSessionPolicy(raw_value))
}
}
/// SRSvcAccCreate - create operation
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SRSvcAccCreate {
pub parent: String,
#[serde(rename = "accessKey")]
pub access_key: String,
#[serde(rename = "secretKey")]
pub secret_key: String,
pub groups: Vec<String>,
pub claims: HashMap<String, serde_json::Value>,
#[serde(rename = "sessionPolicy")]
pub session_policy: SRSessionPolicy,
pub status: String,
pub name: String,
pub description: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub expiration: Option<OffsetDateTime>,
#[serde(rename = "apiVersion", skip_serializing_if = "Option::is_none")]
pub api_version: Option<String>,
}
/// ImportIAMResult - represents the structure iam import response
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct ImportIAMResult {
/// Skipped entries while import
/// This could be due to groups, policies etc missing for
/// imported entries. We dont fail hard in this case and
pub skipped: IAMEntities,
/// Removed entries - this mostly happens for policies
/// where empty might be getting imported and that's invalid
pub removed: IAMEntities,
/// Newly added entries
pub added: IAMEntities,
/// Failed entries while import. This would have details of
/// failed entities with respective errors
pub failed: IAMErrEntities,
}
/// IAMEntities - represents different IAM entities
#[derive(Default, Debug, Clone, Serialize, Deserialize)]
pub struct IAMEntities {
/// List of policy names
pub policies: Vec<String>,
/// List of user names
pub users: Vec<String>,
/// List of group names
pub groups: Vec<String>,
/// List of Service Account names
#[serde(rename = "serviceAccounts")]
pub service_accounts: Vec<String>,
/// List of user policies, each entry in map represents list of policies
/// applicable to the user
#[serde(rename = "userPolicies")]
pub user_policies: Vec<HashMap<String, Vec<String>>>,
/// List of group policies, each entry in map represents list of policies
/// applicable to the group
#[serde(rename = "groupPolicies")]
pub group_policies: Vec<HashMap<String, Vec<String>>>,
/// List of STS policies, each entry in map represents list of policies
/// applicable to the STS
#[serde(rename = "stsPolicies")]
pub sts_policies: Vec<HashMap<String, Vec<String>>>,
}
/// IAMErrEntities - represents errored out IAM entries while import with error
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct IAMErrEntities {
/// List of errored out policies with errors
pub policies: Vec<IAMErrEntity>,
/// List of errored out users with errors
pub users: Vec<IAMErrEntity>,
/// List of errored out groups with errors
pub groups: Vec<IAMErrEntity>,
/// List of errored out service accounts with errors
#[serde(rename = "serviceAccounts")]
pub service_accounts: Vec<IAMErrEntity>,
/// List of errored out user policies with errors
#[serde(rename = "userPolicies")]
pub user_policies: Vec<IAMErrPolicyEntity>,
/// List of errored out group policies with errors
#[serde(rename = "groupPolicies")]
pub group_policies: Vec<IAMErrPolicyEntity>,
/// List of errored out STS policies with errors
#[serde(rename = "stsPolicies")]
pub sts_policies: Vec<IAMErrPolicyEntity>,
}
/// IAMErrEntity - represents an errored IAM entity with error details
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IAMErrEntity {
pub name: String,
pub error: String,
}
/// IAMErrPolicyEntity - represents an errored policy entity with error details
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IAMErrPolicyEntity {
pub name: String,
pub policies: Vec<String>,
pub error: String,
}
#[cfg(test)]
mod tests {
use super::*;

View File

@@ -56,6 +56,7 @@ pin-project-lite.workspace = true
protos.workspace = true
query = { workspace = true }
regex = { workspace = true }
reqwest = { workspace = true }
rmp-serde.workspace = true
rustfs-config = { workspace = true, features = ["constants", "notify"] }
rustfs-notify = { workspace = true }
@@ -98,6 +99,7 @@ rustfs-rio.workspace = true
base64 = { workspace = true }
hmac = { workspace = true }
sha2 = { workspace = true }
zip = { workspace = true }
[target.'cfg(target_os = "linux")'.dependencies]
libsystemd.workspace = true

View File

@@ -4,17 +4,26 @@ use crate::{
};
use ecstore::global::get_global_action_cred;
use http::{HeaderMap, StatusCode};
use madmin::{AccountStatus, AddOrUpdateUserReq};
use iam::store::UserType;
use madmin::{AccountStatus, AddOrUpdateUserReq, SRSessionPolicy, SRSvcAccCreate};
use matchit::Params;
use policy::policy::{
Args,
action::{Action, AdminAction},
};
use s3s::{Body, S3Error, S3ErrorCode, S3Request, S3Response, S3Result, header::CONTENT_TYPE, s3_error};
use rustfs_utils::path::path_join_buf;
use s3s::{
Body, S3Error, S3ErrorCode, S3Request, S3Response, S3Result,
header::{CONTENT_DISPOSITION, CONTENT_LENGTH, CONTENT_TYPE},
s3_error,
};
use serde::Deserialize;
use serde_urlencoded::from_bytes;
use std::{collections::HashMap, str::from_utf8};
use std::io::{Read as _, Write};
use std::{collections::HashMap, io::Cursor, str::from_utf8};
use tracing::warn;
use zip::{ZipArchive, ZipWriter, result::ZipError, write::SimpleFileOptions};
#[derive(Debug, Deserialize, Default)]
pub struct AddUserQuery {
@@ -340,3 +349,333 @@ impl Operation for GetUserInfo {
Ok(S3Response::with_headers((StatusCode::OK, Body::from(data)), header))
}
}
const ALL_POLICIES_FILE: &str = "policies.json";
const ALL_USERS_FILE: &str = "users.json";
const ALL_GROUPS_FILE: &str = "groups.json";
const ALL_SVC_ACCTS_FILE: &str = "svcaccts.json";
const USER_POLICY_MAPPINGS_FILE: &str = "user_mappings.json";
const GROUP_POLICY_MAPPINGS_FILE: &str = "group_mappings.json";
const STS_USER_POLICY_MAPPINGS_FILE: &str = "stsuser_mappings.json";
const IAM_ASSETS_DIR: &str = "iam-assets";
const IAM_EXPORT_FILES: &[&str] = &[
ALL_POLICIES_FILE,
ALL_USERS_FILE,
ALL_GROUPS_FILE,
ALL_SVC_ACCTS_FILE,
USER_POLICY_MAPPINGS_FILE,
GROUP_POLICY_MAPPINGS_FILE,
STS_USER_POLICY_MAPPINGS_FILE,
];
pub struct ExportIam {}
#[async_trait::async_trait]
impl Operation for ExportIam {
async fn call(&self, req: S3Request<Body>, _params: Params<'_, '_>) -> S3Result<S3Response<(StatusCode, Body)>> {
let Some(input_cred) = req.credentials else {
return Err(s3_error!(InvalidRequest, "get cred failed"));
};
let (_cred, _owner) =
check_key_valid(get_session_token(&req.uri, &req.headers).unwrap_or_default(), &input_cred.access_key).await?;
let Ok(iam_store) = iam::get() else { return Err(s3_error!(InvalidRequest, "iam not init")) };
let mut zip_writer = ZipWriter::new(Cursor::new(Vec::new()));
let options = SimpleFileOptions::default();
for &file in IAM_EXPORT_FILES {
let file_path = path_join_buf(&[IAM_ASSETS_DIR, file]);
match file {
ALL_POLICIES_FILE => {
let policies: HashMap<String, policy::policy::Policy> = iam_store
.list_polices("")
.await
.map_err(|e| S3Error::with_message(S3ErrorCode::InternalError, e.to_string()))?;
let json_str = serde_json::to_vec(&policies)
.map_err(|e| S3Error::with_message(S3ErrorCode::InternalError, e.to_string()))?;
zip_writer
.start_file(file_path, options)
.map_err(|e| S3Error::with_message(S3ErrorCode::InternalError, e.to_string()))?;
zip_writer
.write_all(&json_str)
.map_err(|e| S3Error::with_message(S3ErrorCode::InternalError, e.to_string()))?;
}
ALL_USERS_FILE => {
let mut users = HashMap::new();
iam_store
.load_users(UserType::Reg, &mut users)
.await
.map_err(|e| S3Error::with_message(S3ErrorCode::InternalError, e.to_string()))?;
let users: HashMap<String, AddOrUpdateUserReq> = users
.into_iter()
.map(|(k, v)| {
(
k,
AddOrUpdateUserReq {
secret_key: v.credentials.secret_key,
status: {
if v.credentials.status == "off" {
AccountStatus::Disabled
} else {
AccountStatus::Enabled
}
},
policy: None,
},
)
})
.collect::<HashMap<String, AddOrUpdateUserReq>>();
let json_str = serde_json::to_vec(&users)
.map_err(|e| S3Error::with_message(S3ErrorCode::InternalError, e.to_string()))?;
zip_writer
.start_file(file_path, options)
.map_err(|e| S3Error::with_message(S3ErrorCode::InternalError, e.to_string()))?;
zip_writer
.write_all(&json_str)
.map_err(|e| S3Error::with_message(S3ErrorCode::InternalError, e.to_string()))?;
}
ALL_GROUPS_FILE => {
let mut groups = HashMap::new();
iam_store
.load_groups(&mut groups)
.await
.map_err(|e| S3Error::with_message(S3ErrorCode::InternalError, e.to_string()))?;
let json_str = serde_json::to_vec(&groups)
.map_err(|e| S3Error::with_message(S3ErrorCode::InternalError, e.to_string()))?;
zip_writer
.start_file(file_path, options)
.map_err(|e| S3Error::with_message(S3ErrorCode::InternalError, e.to_string()))?;
zip_writer
.write_all(&json_str)
.map_err(|e| S3Error::with_message(S3ErrorCode::InternalError, e.to_string()))?;
}
ALL_SVC_ACCTS_FILE => {
let mut service_accounts = HashMap::new();
iam_store
.load_users(UserType::Svc, &mut service_accounts)
.await
.map_err(|e| S3Error::with_message(S3ErrorCode::InternalError, e.to_string()))?;
let mut svc_accts: HashMap<String, SRSvcAccCreate> = HashMap::new();
for (k, acc) in service_accounts {
if k == "siteReplicatorSvcAcc" {
continue;
}
let claims = iam_store
.get_claims_for_svc_acc(&acc.credentials.access_key)
.await
.map_err(|e| S3Error::with_message(S3ErrorCode::InternalError, e.to_string()))?;
let (sa, police) = iam_store
.get_service_account(&acc.credentials.access_key)
.await
.map_err(|e| S3Error::with_message(S3ErrorCode::InternalError, e.to_string()))?;
let police_json = if let Some(police) = police {
serde_json::to_string(&police)
.map_err(|e| S3Error::with_message(S3ErrorCode::InternalError, e.to_string()))?
} else {
"null".to_string()
};
let svc_acc_create_req = SRSvcAccCreate {
parent: acc.credentials.parent_user,
access_key: k.clone(),
secret_key: acc.credentials.secret_key,
groups: acc.credentials.groups.unwrap_or_default(),
claims,
session_policy: SRSessionPolicy::from_json(&police_json).unwrap_or_default(),
status: acc.credentials.status,
name: sa.name.unwrap_or_default(),
description: sa.description.unwrap_or_default(),
expiration: sa.expiration,
api_version: None,
};
svc_accts.insert(k.clone(), svc_acc_create_req);
}
let json_str = serde_json::to_vec(&svc_accts)
.map_err(|e| S3Error::with_message(S3ErrorCode::InternalError, e.to_string()))?;
zip_writer
.start_file(file_path, options)
.map_err(|e| S3Error::with_message(S3ErrorCode::InternalError, e.to_string()))?;
zip_writer
.write_all(&json_str)
.map_err(|e| S3Error::with_message(S3ErrorCode::InternalError, e.to_string()))?;
}
USER_POLICY_MAPPINGS_FILE => {
let mut user_policy_mappings = HashMap::new();
iam_store
.load_mapped_policys(UserType::Reg, false, &mut user_policy_mappings)
.await
.map_err(|e| S3Error::with_message(S3ErrorCode::InternalError, e.to_string()))?;
let json_str = serde_json::to_vec(&user_policy_mappings)
.map_err(|e| S3Error::with_message(S3ErrorCode::InternalError, e.to_string()))?;
zip_writer
.start_file(file_path, options)
.map_err(|e| S3Error::with_message(S3ErrorCode::InternalError, e.to_string()))?;
zip_writer
.write_all(&json_str)
.map_err(|e| S3Error::with_message(S3ErrorCode::InternalError, e.to_string()))?;
}
GROUP_POLICY_MAPPINGS_FILE => {
let mut group_policy_mappings = HashMap::new();
iam_store
.load_mapped_policys(UserType::Reg, true, &mut group_policy_mappings)
.await
.map_err(|e| S3Error::with_message(S3ErrorCode::InternalError, e.to_string()))?;
let json_str = serde_json::to_vec(&group_policy_mappings)
.map_err(|e| S3Error::with_message(S3ErrorCode::InternalError, e.to_string()))?;
zip_writer
.start_file(file_path, options)
.map_err(|e| S3Error::with_message(S3ErrorCode::InternalError, e.to_string()))?;
zip_writer
.write_all(&json_str)
.map_err(|e| S3Error::with_message(S3ErrorCode::InternalError, e.to_string()))?;
}
STS_USER_POLICY_MAPPINGS_FILE => {
let mut sts_user_policy_mappings = HashMap::new();
iam_store
.load_mapped_policys(UserType::Sts, false, &mut sts_user_policy_mappings)
.await
.map_err(|e| S3Error::with_message(S3ErrorCode::InternalError, e.to_string()))?;
let json_str = serde_json::to_vec(&sts_user_policy_mappings)
.map_err(|e| S3Error::with_message(S3ErrorCode::InternalError, e.to_string()))?;
zip_writer
.start_file(file_path, options)
.map_err(|e| S3Error::with_message(S3ErrorCode::InternalError, e.to_string()))?;
zip_writer
.write_all(&json_str)
.map_err(|e| S3Error::with_message(S3ErrorCode::InternalError, e.to_string()))?;
}
_ => continue,
}
}
let zip_bytes = zip_writer
.finish()
.map_err(|e| S3Error::with_message(S3ErrorCode::InternalError, e.to_string()))?;
let mut header = HeaderMap::new();
header.insert(CONTENT_TYPE, "application/zip".parse().unwrap());
header.insert(CONTENT_DISPOSITION, "attachment; filename=iam-assets.zip".parse().unwrap());
header.insert(CONTENT_LENGTH, zip_bytes.get_ref().len().to_string().parse().unwrap());
Ok(S3Response::with_headers((StatusCode::OK, Body::from(zip_bytes.into_inner())), header))
}
}
pub struct ImportIam {}
#[async_trait::async_trait]
impl Operation for ImportIam {
async fn call(&self, req: S3Request<Body>, _params: Params<'_, '_>) -> S3Result<S3Response<(StatusCode, Body)>> {
let Some(input_cred) = req.credentials else {
return Err(s3_error!(InvalidRequest, "get cred failed"));
};
let (_cred, _owner) =
check_key_valid(get_session_token(&req.uri, &req.headers).unwrap_or_default(), &input_cred.access_key).await?;
let mut input = req.input;
let body = match input.store_all_unlimited().await {
Ok(b) => b,
Err(e) => {
warn!("get body failed, e: {:?}", e);
return Err(s3_error!(InvalidRequest, "get body failed"));
}
};
let mut zip_reader =
ZipArchive::new(Cursor::new(body)).map_err(|e| S3Error::with_message(S3ErrorCode::InternalError, e.to_string()))?;
let Ok(iam_store) = iam::get() else { return Err(s3_error!(InvalidRequest, "iam not init")) };
{
let file_path = path_join_buf(&[IAM_ASSETS_DIR, ALL_POLICIES_FILE]);
let file_content = match zip_reader.by_name(file_path.as_str()) {
Err(ZipError::FileNotFound) => None,
Err(_) => return Err(s3_error!(InvalidRequest, "get file failed")),
Ok(file) => {
let mut file = file;
let mut file_content = Vec::new();
file.read_to_end(&mut file_content)
.map_err(|e| S3Error::with_message(S3ErrorCode::InternalError, e.to_string()))?;
Some(file_content)
}
};
if let Some(file_content) = file_content {
let policies: HashMap<String, policy::policy::Policy> = serde_json::from_slice(&file_content)
.map_err(|e| S3Error::with_message(S3ErrorCode::InternalError, e.to_string()))?;
for (name, policy) in policies {
if policy.id.is_empty() {
iam_store
.delete_policy(&name, true)
.await
.map_err(|e| S3Error::with_message(S3ErrorCode::InternalError, e.to_string()))?;
continue;
}
iam_store
.set_policy(&name, policy)
.await
.map_err(|e| S3Error::with_message(S3ErrorCode::InternalError, e.to_string()))?;
}
}
}
let Some(sys_cred) = get_global_action_cred() else {
return Err(s3_error!(InvalidRequest, "get sys cred failed"));
};
{
let file_path = path_join_buf(&[IAM_ASSETS_DIR, ALL_USERS_FILE]);
let file_content = match zip_reader.by_name(file_path.as_str()) {
Err(ZipError::FileNotFound) => None,
Err(_) => return Err(s3_error!(InvalidRequest, "get file failed")),
Ok(file) => {
let mut file = file;
let mut file_content = Vec::new();
file.read_to_end(&mut file_content)
.map_err(|e| S3Error::with_message(S3ErrorCode::InternalError, e.to_string()))?;
Some(file_content)
}
};
if let Some(file_content) = file_content {
let users: HashMap<String, AddOrUpdateUserReq> = serde_json::from_slice(&file_content)
.map_err(|e| S3Error::with_message(S3ErrorCode::InternalError, e.to_string()))?;
for (ak, req) in users {
if ak == sys_cred.access_key {
return Err(s3_error!(InvalidArgument, "can't create user with system access key"));
}
if let Some(u) = iam_store.get_user(&ak).await {
if u.credentials.is_temp() || u.credentials.is_service_account() {
return Err(s3_error!(InvalidArgument, "can't create user with system access key"));
}
} else if has_space_be(&ak) {
return Err(s3_error!(InvalidArgument, "has space be"));
}
iam_store
.create_user(&ak, &req)
.await
.map_err(|e| S3Error::with_message(S3ErrorCode::InternalError, e.to_string()))?;
}
}
}
let mut header = HeaderMap::new();
header.insert(CONTENT_TYPE, "application/json".parse().unwrap());
Ok(S3Response::with_headers((StatusCode::OK, Body::empty()), header))
}
}

View File

@@ -229,6 +229,18 @@ fn register_user_route(r: &mut S3Router<AdminOperation>) -> std::io::Result<()>
AdminOperation(&AddServiceAccount {}),
)?;
r.insert(
Method::GET,
format!("{}{}", ADMIN_PREFIX, "/v3/export-iam").as_str(),
AdminOperation(&user::ExportIam {}),
)?;
r.insert(
Method::PUT,
format!("{}{}", ADMIN_PREFIX, "/v3/import-iam").as_str(),
AdminOperation(&user::ImportIam {}),
)?;
r.insert(
Method::GET,
format!("{}{}", RUSTFS_ADMIN_PREFIX, "/v3/list-remote-targets").as_str(),

View File

@@ -100,15 +100,20 @@ impl Config {
serde_json::to_string(self).unwrap_or_default()
}
pub(crate) fn version(&self) -> String {
pub(crate) fn version_info(&self) -> String {
format!(
"RELEASE.{} (rust {} {})",
"RELEASE.{}@{} (rust {} {})",
self.release.date.clone(),
self.release.version.clone().trim_start_matches('@'),
build::RUST_VERSION,
build::BUILD_TARGET
)
}
pub(crate) fn version(&self) -> String {
self.release.version.clone()
}
pub(crate) fn license(&self) -> String {
format!("{} {}", self.license.name.clone(), self.license.url.clone())
}

View File

@@ -10,6 +10,7 @@ mod logging;
mod server;
mod service;
mod storage;
mod update_checker;
use crate::auth::IAMAuth;
use crate::console::{CONSOLE_CONFIG, init_console_cfg};
@@ -94,7 +95,7 @@ fn print_server_info() {
info!("RustFS Object Storage Server");
info!("Copyright: 2024-{} RustFS, Inc", current_year);
info!("License: {}", cfg.license());
info!("Version: {}", cfg.version());
info!("Version: {}", cfg.version_info());
info!("Docs: {}", cfg.doc());
}
@@ -585,10 +586,40 @@ async fn run(opt: config::Opt) -> Result<()> {
print_server_info();
init_bucket_replication_pool().await;
init_console_cfg(local_ip, server_port);
print_server_info();
// Async update check (optional)
tokio::spawn(async {
use crate::update_checker::{UpdateCheckError, check_updates};
match check_updates().await {
Ok(result) => {
if result.update_available {
if let Some(latest) = &result.latest_version {
info!(
"🚀 New version available: {} -> {} (current: {})",
result.current_version, latest.version, result.current_version
);
if let Some(notes) = &latest.release_notes {
info!("📝 Release notes: {}", notes);
}
if let Some(url) = &latest.download_url {
info!("🔗 Download URL: {}", url);
}
}
} else {
debug!("✅ Current version is up to date: {}", result.current_version);
}
}
Err(UpdateCheckError::HttpError(e)) => {
debug!("Version check network error (this is normal): {}", e);
}
Err(e) => {
debug!("Version check failed (this is normal): {}", e);
}
}
});
if opt.console_enable {
debug!("console is enabled");
let access_key = opt.access_key.clone();

View File

@@ -0,0 +1,449 @@
use serde::{Deserialize, Serialize};
use std::time::Duration;
use thiserror::Error;
use tracing::{debug, error, info, warn};
/// Update check related errors
#[derive(Error, Debug)]
pub enum UpdateCheckError {
#[error("HTTP request failed: {0}")]
HttpError(#[from] reqwest::Error),
#[error("Version parsing failed: {0}")]
VersionParseError(String),
#[error("Invalid version response: {0}")]
InvalidResponse(String),
}
/// Version information structure
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct VersionInfo {
/// Version number
pub version: String,
/// Release date
pub release_date: Option<String>,
/// Release notes
pub release_notes: Option<String>,
/// Download URL
pub download_url: Option<String>,
}
/// Update check result
#[allow(dead_code)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct UpdateCheckResult {
/// Whether update is available
pub update_available: bool,
/// Current version
pub current_version: String,
/// Latest version information
pub latest_version: Option<VersionInfo>,
/// Check time
pub check_time: chrono::DateTime<chrono::Utc>,
}
/// Version checker
pub struct VersionChecker {
/// HTTP client
client: reqwest::Client,
/// Version server URL
version_url: String,
/// Request timeout
timeout: Duration,
}
impl Default for VersionChecker {
fn default() -> Self {
Self::new()
}
}
impl VersionChecker {
/// Create a new version checker
pub fn new() -> Self {
let client = reqwest::Client::builder()
.timeout(Duration::from_secs(10))
.user_agent(format!("RustFS/{}", get_current_version()))
.build()
.unwrap_or_else(|_| reqwest::Client::new());
Self {
client,
version_url: "https://version.rustfs.com".to_string(),
timeout: Duration::from_secs(10),
}
}
/// Create version checker with custom configuration
#[allow(dead_code)]
pub fn with_config(url: String, timeout: Duration) -> Self {
let client = reqwest::Client::builder()
.timeout(timeout)
.user_agent(format!("RustFS/{}", get_current_version()))
.build()
.unwrap_or_else(|_| reqwest::Client::new());
Self {
client,
version_url: url,
timeout,
}
}
/// Check for updates
pub async fn check_for_updates(&self) -> Result<UpdateCheckResult, UpdateCheckError> {
let current_version = get_current_version();
debug!("Checking for updates, current version: {}", current_version);
// Send HTTP GET request to get latest version information
let response = self.client.get(&self.version_url).timeout(self.timeout).send().await?;
if !response.status().is_success() {
let status = response.status();
let error_text = response.text().await.unwrap_or_default();
error!("Version check request failed, status code: {}, response: {}", status, error_text);
return Err(UpdateCheckError::InvalidResponse(format!(
"HTTP status code: {}, response: {}",
status, error_text
)));
}
// Parse response
let response_bytes = response.bytes().await?;
let version_info: VersionInfo = match serde_json::from_slice(&response_bytes) {
Ok(v) => v,
Err(e) => {
let error_text = String::from_utf8_lossy(&response_bytes);
error!("Version check request failed, response: {}", e);
return Err(UpdateCheckError::InvalidResponse(format!(
"JSON parsing failed: {}, response: {}",
e, error_text
)));
}
};
debug!("Retrieved latest version information: {:?}", version_info);
// Compare versions
let update_available = self.is_newer_version(&current_version, &version_info.version)?;
let result = UpdateCheckResult {
update_available,
current_version,
latest_version: Some(version_info),
check_time: chrono::Utc::now(),
};
if result.update_available {
info!(
"New version available: {} -> {}",
result.current_version,
result.latest_version.as_ref().unwrap().version
);
} else {
info!("Current version is up to date: {}", result.current_version);
}
Ok(result)
}
/// Compare version numbers to determine if there's an update
fn is_newer_version(&self, current: &str, latest: &str) -> Result<bool, UpdateCheckError> {
// Clean version numbers, remove prefixes like "v", "RELEASE.", etc.
let current_clean = self.clean_version(current);
let latest_clean = self.clean_version(latest);
debug!("Version comparison: current='{}' vs latest='{}'", current_clean, latest_clean);
// If versions are the same, no update is needed
if current_clean == latest_clean {
return Ok(false);
}
// Try semantic version comparison
match self.compare_semantic_versions(&current_clean, &latest_clean) {
Ok(is_newer) => Ok(is_newer),
Err(_) => {
// If semantic version comparison fails, use string comparison
warn!("Semantic version comparison failed, using string comparison");
Ok(latest_clean > current_clean)
}
}
}
/// Clean version string
fn clean_version(&self, version: &str) -> String {
version
.trim()
.trim_start_matches('v')
.trim_start_matches("RELEASE.")
.trim_start_matches('@')
.to_string()
}
/// Semantic version comparison
fn compare_semantic_versions(&self, current: &str, latest: &str) -> Result<bool, UpdateCheckError> {
let current_parts = self.parse_version_parts(current)?;
let latest_parts = self.parse_version_parts(latest)?;
// Compare major version
if latest_parts.0 > current_parts.0 {
return Ok(true);
} else if latest_parts.0 < current_parts.0 {
return Ok(false);
}
// Compare minor version
if latest_parts.1 > current_parts.1 {
return Ok(true);
} else if latest_parts.1 < current_parts.1 {
return Ok(false);
}
// Compare patch version
Ok(latest_parts.2 > current_parts.2)
}
/// Parse version parts (major, minor, patch)
fn parse_version_parts(&self, version: &str) -> Result<(u32, u32, u32), UpdateCheckError> {
let parts: Vec<&str> = version.split('.').collect();
if parts.len() < 3 {
return Err(UpdateCheckError::VersionParseError(format!("Invalid version format: {}", version)));
}
let major = parts[0]
.parse::<u32>()
.map_err(|_| UpdateCheckError::VersionParseError(format!("Cannot parse major version: {}", parts[0])))?;
let minor = parts[1]
.parse::<u32>()
.map_err(|_| UpdateCheckError::VersionParseError(format!("Cannot parse minor version: {}", parts[1])))?;
// Patch version may contain other characters, only take numeric part
let patch_str = parts[2].chars().take_while(|c| c.is_numeric()).collect::<String>();
let patch = patch_str
.parse::<u32>()
.map_err(|_| UpdateCheckError::VersionParseError(format!("Cannot parse patch version: {}", parts[2])))?;
Ok((major, minor, patch))
}
}
/// Get current version number
pub fn get_current_version() -> String {
use crate::console::CONSOLE_CONFIG;
if let Some(config) = CONSOLE_CONFIG.get() {
// Extract version from configuration
let version_str = config.version();
// Extract version part, removing extra information
if let Some(release_part) = version_str.split_whitespace().next() {
if release_part.starts_with("RELEASE.") {
release_part.trim_start_matches("RELEASE.").to_string()
} else {
release_part.to_string()
}
} else {
rustfs_config::VERSION.to_string()
}
} else {
// If configuration is not initialized, use constant version
rustfs_config::VERSION.to_string()
}
}
/// Convenience function for async update checking
pub async fn check_updates() -> Result<UpdateCheckResult, UpdateCheckError> {
let checker = VersionChecker::new();
checker.check_for_updates().await
}
/// Update check with custom URL
#[allow(dead_code)]
pub async fn check_updates_with_url(url: String) -> Result<UpdateCheckResult, UpdateCheckError> {
let checker = VersionChecker::with_config(url, Duration::from_secs(10));
checker.check_for_updates().await
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_clean_version() {
let checker = VersionChecker::new();
assert_eq!(checker.clean_version("v1.0.0"), "1.0.0");
assert_eq!(checker.clean_version("RELEASE.1.0.0"), "1.0.0");
assert_eq!(checker.clean_version("@1.0.0"), "1.0.0");
assert_eq!(checker.clean_version("1.0.0"), "1.0.0");
}
#[test]
fn test_parse_version_parts() {
let checker = VersionChecker::new();
assert_eq!(checker.parse_version_parts("1.0.0").unwrap(), (1, 0, 0));
assert_eq!(checker.parse_version_parts("2.1.3").unwrap(), (2, 1, 3));
assert_eq!(checker.parse_version_parts("1.0.0-beta").unwrap(), (1, 0, 0));
}
#[test]
fn test_version_comparison() {
let checker = VersionChecker::new();
// Test semantic version comparison
assert!(checker.is_newer_version("1.0.0", "1.0.1").unwrap());
assert!(checker.is_newer_version("1.0.0", "1.1.0").unwrap());
assert!(checker.is_newer_version("1.0.0", "2.0.0").unwrap());
assert!(!checker.is_newer_version("1.0.1", "1.0.0").unwrap());
assert!(!checker.is_newer_version("1.0.0", "1.0.0").unwrap());
}
#[tokio::test]
async fn test_get_current_version() {
let version = get_current_version();
assert!(!version.is_empty());
println!("Current version: {}", version);
}
#[test]
fn test_update_check_result() {
use chrono::Utc;
// Test creating UpdateCheckResult with update available
let version_info = VersionInfo {
version: "1.2.0".to_string(),
release_date: Some("2024-01-15T10:00:00Z".to_string()),
release_notes: Some("Bug fixes and new features".to_string()),
download_url: Some("https://github.com/rustfs/rustfs/releases/tag/v1.2.0".to_string()),
};
let check_time = Utc::now();
let result = UpdateCheckResult {
update_available: true,
current_version: "1.1.0".to_string(),
latest_version: Some(version_info.clone()),
check_time,
};
println!("result: {:?}", serde_json::to_string(&result).unwrap());
// Test fields
assert!(result.update_available);
assert_eq!(result.current_version, "1.1.0");
assert!(result.latest_version.is_some());
assert_eq!(result.check_time, check_time);
// Test latest version info
if let Some(latest) = &result.latest_version {
assert_eq!(latest.version, "1.2.0");
assert_eq!(latest.release_date, Some("2024-01-15T10:00:00Z".to_string()));
assert_eq!(latest.release_notes, Some("Bug fixes and new features".to_string()));
assert_eq!(
latest.download_url,
Some("https://github.com/rustfs/rustfs/releases/tag/v1.2.0".to_string())
);
}
// Test Clone functionality
let cloned_result = result.clone();
assert_eq!(cloned_result.update_available, result.update_available);
assert_eq!(cloned_result.current_version, result.current_version);
assert_eq!(cloned_result.check_time, result.check_time);
// Test Debug functionality (should not panic)
let debug_output = format!("{:?}", result);
assert!(debug_output.contains("UpdateCheckResult"));
assert!(debug_output.contains("1.1.0"));
assert!(debug_output.contains("1.2.0"));
// Test creating UpdateCheckResult with no update available
let no_update_result = UpdateCheckResult {
update_available: false,
current_version: "1.2.0".to_string(),
latest_version: Some(VersionInfo {
version: "1.2.0".to_string(),
release_date: Some("2024-01-15T10:00:00Z".to_string()),
release_notes: None,
download_url: None,
}),
check_time: Utc::now(),
};
assert!(!no_update_result.update_available);
assert_eq!(no_update_result.current_version, "1.2.0");
// Test creating UpdateCheckResult with None latest_version (error case)
let error_result = UpdateCheckResult {
update_available: false,
current_version: "1.1.0".to_string(),
latest_version: None,
check_time: Utc::now(),
};
assert!(!error_result.update_available);
assert!(error_result.latest_version.is_none());
println!("✅ UpdateCheckResult tests passed");
}
#[test]
fn test_version_info() {
// Test VersionInfo structure
let version_info = VersionInfo {
version: "2.0.0".to_string(),
release_date: Some("2024-02-01T12:00:00Z".to_string()),
release_notes: Some("Major release with breaking changes".to_string()),
download_url: Some("https://github.com/rustfs/rustfs/releases/tag/v2.0.0".to_string()),
};
// Test fields
assert_eq!(version_info.version, "2.0.0");
assert_eq!(version_info.release_date, Some("2024-02-01T12:00:00Z".to_string()));
assert_eq!(version_info.release_notes, Some("Major release with breaking changes".to_string()));
assert_eq!(
version_info.download_url,
Some("https://github.com/rustfs/rustfs/releases/tag/v2.0.0".to_string())
);
// Test Clone functionality
let cloned_info = version_info.clone();
assert_eq!(cloned_info.version, version_info.version);
assert_eq!(cloned_info.release_date, version_info.release_date);
assert_eq!(cloned_info.release_notes, version_info.release_notes);
assert_eq!(cloned_info.download_url, version_info.download_url);
// Test Debug functionality
let debug_output = format!("{:?}", version_info);
assert!(debug_output.contains("VersionInfo"));
assert!(debug_output.contains("2.0.0"));
// Test minimal VersionInfo with only version
let minimal_info = VersionInfo {
version: "1.0.0".to_string(),
release_date: None,
release_notes: None,
download_url: None,
};
assert_eq!(minimal_info.version, "1.0.0");
assert!(minimal_info.release_date.is_none());
assert!(minimal_info.release_notes.is_none());
assert!(minimal_info.download_url.is_none());
// Test JSON serialization/deserialization
let json_string = serde_json::to_string(&version_info).unwrap();
println!("json_string: {}", json_string);
assert!(json_string.contains("2.0.0"));
assert!(json_string.contains("Major release"));
let deserialized: VersionInfo = serde_json::from_str(&json_string).unwrap();
assert_eq!(deserialized.version, version_info.version);
assert_eq!(deserialized.release_notes, version_info.release_notes);
println!("✅ VersionInfo tests passed");
}
}