mirror of
https://github.com/rustfs/rustfs.git
synced 2026-03-17 14:24:08 +00:00
feat: add one-shot legacy config migration
This commit is contained in:
@@ -18,15 +18,145 @@ use crate::bucket::metadata::BUCKET_METADATA_FILE;
|
||||
use crate::disk::{BUCKET_META_PREFIX, MIGRATING_META_BUCKET, RUSTFS_META_BUCKET};
|
||||
use crate::store_api::{BucketOptions, ObjectOptions, PutObjReader, StorageAPI};
|
||||
use http::HeaderMap;
|
||||
use rustfs_policy::auth::UserIdentity;
|
||||
use rustfs_policy::policy::PolicyDoc;
|
||||
use rustfs_utils::path::SLASH_SEPARATOR;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::sync::Arc;
|
||||
use time::OffsetDateTime;
|
||||
use tracing::{debug, info, warn};
|
||||
|
||||
/// IAM config prefix under meta bucket (e.g. config/iam/).
|
||||
const IAM_CONFIG_PREFIX: &str = "config/iam";
|
||||
const IAM_FORMAT_FILE_PATH: &str = "config/iam/format.json";
|
||||
const IAM_USERS_PREFIX: &str = "config/iam/users/";
|
||||
const IAM_SERVICE_ACCOUNTS_PREFIX: &str = "config/iam/service-accounts/";
|
||||
const IAM_STS_PREFIX: &str = "config/iam/sts/";
|
||||
const IAM_GROUPS_PREFIX: &str = "config/iam/groups/";
|
||||
const IAM_POLICIES_PREFIX: &str = "config/iam/policies/";
|
||||
const IAM_POLICY_DB_PREFIX: &str = "config/iam/policydb/";
|
||||
const REPLICATION_META_DIR: &str = ".replication";
|
||||
const RESYNC_META_FILE: &str = "resync.bin";
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
struct CompatIamFormat {
|
||||
#[serde(default)]
|
||||
version: i64,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
struct CompatGroupInfo {
|
||||
#[serde(default)]
|
||||
version: i64,
|
||||
#[serde(default = "default_group_status")]
|
||||
status: String,
|
||||
#[serde(default)]
|
||||
members: Vec<String>,
|
||||
#[serde(
|
||||
rename = "updatedAt",
|
||||
alias = "update_at",
|
||||
default,
|
||||
with = "rustfs_policy::serde_datetime::option"
|
||||
)]
|
||||
update_at: Option<OffsetDateTime>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
struct CompatMappedPolicy {
|
||||
#[serde(default)]
|
||||
version: i64,
|
||||
#[serde(rename = "policy", alias = "policies", default)]
|
||||
policy: String,
|
||||
#[serde(
|
||||
rename = "updatedAt",
|
||||
alias = "update_at",
|
||||
default,
|
||||
with = "rustfs_policy::serde_datetime::option"
|
||||
)]
|
||||
update_at: Option<OffsetDateTime>,
|
||||
}
|
||||
|
||||
fn default_group_status() -> String {
|
||||
"enabled".to_string()
|
||||
}
|
||||
|
||||
fn normalize_iam_config_blob(path: &str, data: &[u8]) -> std::result::Result<Option<Vec<u8>>, String> {
|
||||
if path == IAM_FORMAT_FILE_PATH {
|
||||
let mut format: CompatIamFormat =
|
||||
serde_json::from_slice(data).map_err(|err| format!("parse IAM format failed: {err}"))?;
|
||||
if format.version <= 0 {
|
||||
format.version = 1;
|
||||
}
|
||||
return serde_json::to_vec(&format)
|
||||
.map(Some)
|
||||
.map_err(|err| format!("serialize IAM format failed: {err}"));
|
||||
}
|
||||
|
||||
if is_identity_path(path) {
|
||||
let mut identity: UserIdentity =
|
||||
serde_json::from_slice(data).map_err(|err| format!("parse IAM identity failed: {err}"))?;
|
||||
if identity.update_at.is_none() {
|
||||
identity.update_at = Some(OffsetDateTime::now_utc());
|
||||
}
|
||||
return serde_json::to_vec(&identity)
|
||||
.map(Some)
|
||||
.map_err(|err| format!("serialize IAM identity failed: {err}"));
|
||||
}
|
||||
|
||||
if is_group_path(path) {
|
||||
let mut group: CompatGroupInfo = serde_json::from_slice(data).map_err(|err| format!("parse IAM group failed: {err}"))?;
|
||||
if group.update_at.is_none() {
|
||||
group.update_at = Some(OffsetDateTime::now_utc());
|
||||
}
|
||||
return serde_json::to_vec(&group)
|
||||
.map(Some)
|
||||
.map_err(|err| format!("serialize IAM group failed: {err}"));
|
||||
}
|
||||
|
||||
if is_policy_doc_path(path) {
|
||||
let mut doc = PolicyDoc::try_from(data.to_vec()).map_err(|err| format!("parse IAM policy doc failed: {err}"))?;
|
||||
if doc.create_date.is_none() {
|
||||
doc.create_date = doc.update_date;
|
||||
}
|
||||
if doc.update_date.is_none() {
|
||||
doc.update_date = doc.create_date;
|
||||
}
|
||||
return serde_json::to_vec(&doc)
|
||||
.map(Some)
|
||||
.map_err(|err| format!("serialize IAM policy doc failed: {err}"));
|
||||
}
|
||||
|
||||
if is_policy_mapping_path(path) {
|
||||
let mut mapped: CompatMappedPolicy =
|
||||
serde_json::from_slice(data).map_err(|err| format!("parse IAM policy mapping failed: {err}"))?;
|
||||
if mapped.update_at.is_none() {
|
||||
mapped.update_at = Some(OffsetDateTime::now_utc());
|
||||
}
|
||||
return serde_json::to_vec(&mapped)
|
||||
.map(Some)
|
||||
.map_err(|err| format!("serialize IAM policy mapping failed: {err}"));
|
||||
}
|
||||
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
fn is_identity_path(path: &str) -> bool {
|
||||
(path.starts_with(IAM_USERS_PREFIX) || path.starts_with(IAM_SERVICE_ACCOUNTS_PREFIX) || path.starts_with(IAM_STS_PREFIX))
|
||||
&& path.ends_with("/identity.json")
|
||||
}
|
||||
|
||||
fn is_group_path(path: &str) -> bool {
|
||||
path.starts_with(IAM_GROUPS_PREFIX) && path.ends_with("/members.json")
|
||||
}
|
||||
|
||||
fn is_policy_doc_path(path: &str) -> bool {
|
||||
path.starts_with(IAM_POLICIES_PREFIX) && path.ends_with("/policy.json")
|
||||
}
|
||||
|
||||
fn is_policy_mapping_path(path: &str) -> bool {
|
||||
path.starts_with(IAM_POLICY_DB_PREFIX) && path.ends_with(".json")
|
||||
}
|
||||
|
||||
/// Migrates bucket metadata from legacy format to RustFS.
|
||||
/// Uses list_bucket (from disk volumes) to get bucket names, since list_objects_v2 on the legacy
|
||||
/// meta bucket may not work (legacy format differs from object layer expectations).
|
||||
@@ -178,6 +308,17 @@ pub async fn try_migrate_iam_config<S: StorageAPI>(store: Arc<S>) {
|
||||
continue;
|
||||
}
|
||||
};
|
||||
let data = match normalize_iam_config_blob(path, &data) {
|
||||
Ok(Some(normalized)) => normalized,
|
||||
Ok(None) => {
|
||||
debug!("skip unsupported IAM config path during migration: {path}");
|
||||
continue;
|
||||
}
|
||||
Err(e) => {
|
||||
warn!("skip IAM config migration due to incompatible format, path: {path}, err: {e}");
|
||||
continue;
|
||||
}
|
||||
};
|
||||
if let Err(e) = store
|
||||
.put_object(RUSTFS_META_BUCKET, path, &mut PutObjReader::from_vec(data), &opts)
|
||||
.await
|
||||
@@ -199,3 +340,28 @@ pub async fn try_migrate_iam_config<S: StorageAPI>(store: Arc<S>) {
|
||||
info!("IAM migration complete: {} object(s) migrated", total_migrated);
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::normalize_iam_config_blob;
|
||||
|
||||
#[test]
|
||||
fn test_normalize_policy_mapping_legacy_timestamp_and_fields() {
|
||||
let path = "config/iam/policydb/users/alice.json";
|
||||
let input = r#"{"version":1,"policies":"readwrite","update_at":"2026-03-09 02:22:44.998954 +00:00:00"}"#;
|
||||
|
||||
let output = normalize_iam_config_blob(path, input.as_bytes())
|
||||
.expect("normalize should succeed")
|
||||
.expect("path should be supported");
|
||||
|
||||
let v: serde_json::Value = serde_json::from_slice(&output).expect("output should be valid JSON");
|
||||
assert_eq!(v.get("policy").and_then(|x| x.as_str()), Some("readwrite"));
|
||||
assert!(v.get("policies").is_none(), "legacy field should be normalized");
|
||||
|
||||
let updated_at = v
|
||||
.get("updatedAt")
|
||||
.and_then(|x| x.as_str())
|
||||
.expect("updatedAt should exist as string");
|
||||
assert!(updated_at.contains('T'), "updatedAt should be RFC3339-like");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -13,7 +13,7 @@
|
||||
// limitations under the License.
|
||||
|
||||
use crate::config::{Config, GLOBAL_STORAGE_CLASS, storageclass};
|
||||
use crate::disk::RUSTFS_META_BUCKET;
|
||||
use crate::disk::{MIGRATING_META_BUCKET, RUSTFS_META_BUCKET};
|
||||
use crate::error::{Error, Result};
|
||||
use crate::store_api::{ObjectInfo, ObjectOptions, PutObjReader, StorageAPI};
|
||||
use http::HeaderMap;
|
||||
@@ -22,7 +22,7 @@ use rustfs_utils::path::SLASH_SEPARATOR;
|
||||
use std::collections::HashSet;
|
||||
use std::sync::Arc;
|
||||
use std::sync::LazyLock;
|
||||
use tracing::{error, warn};
|
||||
use tracing::{debug, error, info, warn};
|
||||
|
||||
pub const CONFIG_PREFIX: &str = "config";
|
||||
const CONFIG_FILE: &str = "config.json";
|
||||
@@ -132,6 +132,81 @@ fn get_config_file() -> String {
|
||||
format!("{CONFIG_PREFIX}{SLASH_SEPARATOR}{CONFIG_FILE}")
|
||||
}
|
||||
|
||||
fn normalize_server_config_blob(data: &[u8]) -> Result<Vec<u8>> {
|
||||
let cfg = Config::unmarshal(data)?;
|
||||
cfg.marshal()
|
||||
}
|
||||
|
||||
fn is_object_not_found(err: &Error) -> bool {
|
||||
*err == Error::FileNotFound || matches!(err, Error::ObjectNotFound(_, _))
|
||||
}
|
||||
|
||||
pub async fn try_migrate_server_config<S: StorageAPI>(api: Arc<S>) {
|
||||
let config_file = get_config_file();
|
||||
match api
|
||||
.get_object_info(RUSTFS_META_BUCKET, &config_file, &ObjectOptions::default())
|
||||
.await
|
||||
{
|
||||
Ok(_) => {
|
||||
debug!("server config already exists in RustFS metadata bucket, skip migration");
|
||||
return;
|
||||
}
|
||||
Err(err) if is_object_not_found(&err) => {}
|
||||
Err(err) => {
|
||||
warn!("check target server config failed, skip migration: {:?}", err);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
let opts = ObjectOptions {
|
||||
max_parity: true,
|
||||
no_lock: true,
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
let mut rd = match api
|
||||
.get_object_reader(MIGRATING_META_BUCKET, &config_file, None, HeaderMap::new(), &opts)
|
||||
.await
|
||||
{
|
||||
Ok(v) => v,
|
||||
Err(err) => {
|
||||
if !is_object_not_found(&err) {
|
||||
warn!("read legacy server config failed: {:?}", err);
|
||||
}
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
let data = match rd.read_all().await {
|
||||
Ok(v) if !v.is_empty() => v,
|
||||
Ok(_) => {
|
||||
debug!("legacy server config is empty, skip migration");
|
||||
return;
|
||||
}
|
||||
Err(err) => {
|
||||
warn!("read legacy server config body failed: {:?}", err);
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
let normalized = match normalize_server_config_blob(&data) {
|
||||
Ok(v) => v,
|
||||
Err(err) => {
|
||||
warn!("legacy server config format is incompatible, skip migration: {:?}", err);
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
match save_config(api, &config_file, normalized).await {
|
||||
Ok(()) => {
|
||||
info!("Migrated compatible server config from legacy metadata bucket");
|
||||
}
|
||||
Err(err) => {
|
||||
warn!("write migrated server config failed: {:?}", err);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Handle the situation where the configuration file does not exist, create and save a new configuration
|
||||
async fn handle_missing_config<S: StorageAPI>(api: Arc<S>, context: &str) -> Result<Config> {
|
||||
warn!("Configuration not found ({}): Start initializing new configuration", context);
|
||||
@@ -228,3 +303,27 @@ async fn apply_dynamic_config_for_sub_sys<S: StorageAPI>(cfg: &mut Config, api:
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::normalize_server_config_blob;
|
||||
use crate::config::Config;
|
||||
|
||||
#[test]
|
||||
fn test_normalize_server_config_accepts_legacy_hidden_if_empty_alias() {
|
||||
let input = r#"{"storage_class":{"_":[{"key":"standard","value":"EC:2","hiddenIfEmpty":true}]}}"#;
|
||||
let normalized = normalize_server_config_blob(input.as_bytes()).expect("normalize should succeed");
|
||||
let cfg = Config::unmarshal(&normalized).expect("normalized config should be readable");
|
||||
let kvs = cfg.get_value("storage_class", "_").expect("storage_class should exist");
|
||||
assert!(kvs.0[0].hidden_if_empty);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_normalize_server_config_accepts_missing_hidden_if_empty() {
|
||||
let input = r#"{"storage_class":{"_":[{"key":"standard","value":"EC:2"}]}}"#;
|
||||
let normalized = normalize_server_config_blob(input.as_bytes()).expect("normalize should succeed");
|
||||
let cfg = Config::unmarshal(&normalized).expect("normalized config should be readable");
|
||||
let kvs = cfg.get_value("storage_class", "_").expect("storage_class should exist");
|
||||
assert!(!kvs.0[0].hidden_if_empty);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -75,10 +75,15 @@ pub async fn init_global_config_sys(api: Arc<ECStore>) -> Result<()> {
|
||||
GLOBAL_CONFIG_SYS.init(api).await
|
||||
}
|
||||
|
||||
pub async fn try_migrate_server_config(api: Arc<ECStore>) {
|
||||
com::try_migrate_server_config(api).await
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize, Serialize, Clone)]
|
||||
pub struct KV {
|
||||
pub key: String,
|
||||
pub value: String,
|
||||
#[serde(default, alias = "hiddenIfEmpty")]
|
||||
pub hidden_if_empty: bool,
|
||||
}
|
||||
|
||||
|
||||
@@ -332,6 +332,7 @@ async fn run(config: config::Config) -> Result<()> {
|
||||
})?;
|
||||
|
||||
ecconfig::init();
|
||||
ecconfig::try_migrate_server_config(store.clone()).await;
|
||||
|
||||
// // Initialize global configuration system
|
||||
let mut retry_count = 0;
|
||||
|
||||
Reference in New Issue
Block a user