From 25bfd1bbc4e8936ed236b32fc81aa2d548192016 Mon Sep 17 00:00:00 2001 From: weisd Date: Tue, 1 Oct 2024 23:06:09 +0800 Subject: [PATCH] init bucketmetadata_sys --- ecstore/src/bucket/error.rs | 5 + ecstore/src/bucket/metadata.rs | 76 +++++++++-- ecstore/src/bucket/metadata_sys.rs | 138 ++++++++++++++++++-- ecstore/src/bucket/mod.rs | 2 + ecstore/src/bucket/utils.rs | 5 + ecstore/src/config/error.rs | 10 ++ ecstore/src/endpoints.rs | 7 +- ecstore/src/global.rs | 62 +++++++++ ecstore/src/lib.rs | 4 + ecstore/src/sets.rs | 6 +- ecstore/src/store.rs | 198 ++++++++++++----------------- rustfs/src/main.rs | 4 +- rustfs/src/storage/ecfs.rs | 2 +- 13 files changed, 372 insertions(+), 147 deletions(-) create mode 100644 ecstore/src/bucket/error.rs create mode 100644 ecstore/src/bucket/utils.rs create mode 100644 ecstore/src/global.rs diff --git a/ecstore/src/bucket/error.rs b/ecstore/src/bucket/error.rs new file mode 100644 index 00000000..781f7f11 --- /dev/null +++ b/ecstore/src/bucket/error.rs @@ -0,0 +1,5 @@ +#[derive(Debug, thiserror::Error)] +pub enum BucketMetadataError { + #[error("tagging not found")] + TaggingNotFound, +} diff --git a/ecstore/src/bucket/metadata.rs b/ecstore/src/bucket/metadata.rs index 251ff304..6d53e1b4 100644 --- a/ecstore/src/bucket/metadata.rs +++ b/ecstore/src/bucket/metadata.rs @@ -13,18 +13,30 @@ use time::OffsetDateTime; use tracing::error; use crate::bucket::tags; +use crate::config; use crate::config::common::{read_config, save_config}; -use crate::config::error::ConfigError; use crate::error::{Error, Result}; use crate::disk::BUCKET_META_PREFIX; use crate::store::ECStore; -use crate::store_api::StorageAPI; + +type TypeConfigFile = &'static str; pub const BUCKET_METADATA_FILE: &str = ".metadata.bin"; pub const BUCKET_METADATA_FORMAT: u16 = 1; pub const BUCKET_METADATA_VERSION: u16 = 1; +pub const BUCKET_POLICY_CONFIG: &str = "policy.json"; +pub const BUCKET_NOTIFICATION_CONFIG: &str = "notification.xml"; +pub const BUCKET_LIFECYCLE_CONFIG: &str = "lifecycle.xml"; +pub const BUCKET_SSECONFIG: &str = "bucket-encryption.xml"; +pub const BUCKET_TAGGING_CONFIG: &str = "tagging.xml"; +pub const BUCKET_QUOTA_CONFIG_FILE: &str = "quota.json"; +pub const OBJECT_LOCK_CONFIG: &str = "object-lock.xml"; +pub const BUCKET_VERSIONING_CONFIG: &str = "versioning.xml"; +pub const BUCKET_REPLICATION_CONFIG: &str = "replication.xml"; +pub const BUCKET_TARGETS_FILE: &str = "bucket-targets.json"; + #[derive(Debug, Deserialize, Serialize, Clone)] #[serde(rename_all = "PascalCase", default)] pub struct BucketMetadata { @@ -170,7 +182,57 @@ impl BucketMetadata { } } - async fn save(&mut self, api: &ECStore) -> Result<()> { + pub fn update_config(&mut self, config_file: &str, data: Vec) -> Result { + let updated = OffsetDateTime::now_utc(); + + match config_file { + BUCKET_POLICY_CONFIG => { + self.policy_config_json = data; + self.policy_config_updated_at = updated; + } + BUCKET_NOTIFICATION_CONFIG => { + self.notification_config_xml = data; + self.notification_config_updated_at = updated; + } + BUCKET_LIFECYCLE_CONFIG => { + self.lifecycle_config_xml = data; + self.lifecycle_config_updated_at = updated; + } + BUCKET_SSECONFIG => { + self.encryption_config_xml = data; + self.encryption_config_updated_at = updated; + } + BUCKET_TAGGING_CONFIG => { + self.tagging_config_xml = data; + self.tagging_config_updated_at = updated; + } + BUCKET_QUOTA_CONFIG_FILE => { + self.quota_config_json = data; + self.quota_config_updated_at = updated; + } + OBJECT_LOCK_CONFIG => { + self.object_lock_config_xml = data; + self.object_lock_config_updated_at = updated; + } + BUCKET_VERSIONING_CONFIG => { + self.versioning_config_xml = data; + self.versioning_config_updated_at = updated; + } + BUCKET_REPLICATION_CONFIG => { + self.replication_config_xml = data; + self.replication_config_updated_at = updated; + } + BUCKET_TARGETS_FILE => { + self.tagging_config_xml = data; + self.tagging_config_updated_at = updated; + } + _ => return Err(Error::msg(format!("config file not found : {}", config_file))), + } + + Ok(updated) + } + + pub async fn save(&mut self, api: &ECStore) -> Result<()> { self.parse_all_configs(api)?; let mut buf: Vec = vec![0; 4]; @@ -201,14 +263,12 @@ pub async fn load_bucket_metadata(api: &ECStore, bucket: &str) -> Result Result { +pub async fn load_bucket_metadata_parse(api: &ECStore, bucket: &str, parse: bool) -> Result { let mut bm = match read_bucket_metadata(api, bucket).await { Ok(res) => res, Err(err) => { - if let Some(e) = err.downcast_ref::() { - if !ConfigError::is_not_found(&e) { - return Err(err); - } + if !config::error::is_not_found(&err) { + return Err(err); } BucketMetadata::new(bucket) diff --git a/ecstore/src/bucket/metadata_sys.rs b/ecstore/src/bucket/metadata_sys.rs index 92d24a4e..a43eda5d 100644 --- a/ecstore/src/bucket/metadata_sys.rs +++ b/ecstore/src/bucket/metadata_sys.rs @@ -1,7 +1,15 @@ use std::{collections::HashMap, sync::Arc}; +use crate::bucket::error::BucketMetadataError; +use crate::bucket::metadata::load_bucket_metadata_parse; +use crate::bucket::utils::is_meta_bucketname; +use crate::config; +use crate::config::error::ConfigError; +use crate::disk::error::DiskError; use crate::error::{Error, Result}; +use crate::global::{is_dist_erasure, is_erasure, new_object_layer_fn}; use crate::store::ECStore; +use futures::future::join_all; use lazy_static::lazy_static; use time::OffsetDateTime; use tokio::sync::RwLock; @@ -10,41 +18,132 @@ use super::metadata::{load_bucket_metadata, BucketMetadata}; use super::tags; lazy_static! { - pub static ref GLOBAL_BucketMetadataSys: Arc> = Arc::new(Some(BucketMetadataSys::new())); + static ref GLOBAL_BucketMetadataSys: Arc = Arc::new(BucketMetadataSys::new()); } -pub fn get_bucket_metadata_sys() -> Arc> { +pub fn get_bucket_metadata_sys() -> Arc { GLOBAL_BucketMetadataSys.clone() } #[derive(Debug, Default)] pub struct BucketMetadataSys { metadata_map: RwLock>, - api: Option>, + api: Option, initialized: RwLock, } impl BucketMetadataSys { fn new() -> Self { - Self { ..Default::default() } + Self::default() } - pub fn init(&mut self, api: Arc, buckets: Vec) { - self.api = Some(api); + pub async fn init(&mut self, api: Option, buckets: Vec<&str>) -> Result<()> { + if api.is_none() { + return Err(Error::msg("errServerNotInitialized")); + } + self.api = api; + let _ = self.init_internal(buckets).await; + + Ok(()) + } + async fn init_internal(&self, buckets: Vec<&str>) -> Result<()> { + if self.api.is_none() { + return Err(Error::msg("errServerNotInitialized")); + } + let mut futures = Vec::new(); + let mut errs = Vec::new(); + let mut ress = Vec::new(); + + for &bucket in buckets.iter() { + futures.push(load_bucket_metadata(self.api.as_ref().unwrap(), bucket)); + } + + let results = join_all(futures).await; + + for res in results { + match res { + Ok(entrys) => { + ress.push(Some(entrys)); + errs.push(None); + } + Err(e) => { + ress.push(None); + errs.push(Some(e)); + } + } + } unimplemented!() } + async fn concurrent_load(&self, buckets: Vec<&str>) -> Result> { + unimplemented!() + } + + pub async fn get(&self, bucket: &str) -> Result { + if is_meta_bucketname(bucket) { + return Err(Error::new(ConfigError::NotFound)); + } + + let map = self.metadata_map.read().await; + if let Some(bm) = map.get(bucket) { + Ok(bm.clone()) + } else { + Err(Error::new(ConfigError::NotFound)) + } + } + + pub async fn set(&self, bucket: &str, bm: BucketMetadata) { + if !is_meta_bucketname(bucket) { + let mut map = self.metadata_map.write().await; + map.insert(bucket.to_string(), bm); + } + } + async fn reset(&mut self) { let mut map = self.metadata_map.write().await; map.clear(); } - pub async fn get_config(&self, bucket: String) -> Result<(BucketMetadata, bool)> { + pub async fn update(&mut self, bucket: &str, config_file: &str, data: Vec) -> Result<(OffsetDateTime)> { + self.update_and_parse(bucket, config_file, data, true).await + } + + async fn update_and_parse(&mut self, bucket: &str, config_file: &str, data: Vec, parse: bool) -> Result { + let layer = new_object_layer_fn(); + let lock = layer.read().await; + let store = match lock.as_ref() { + Some(s) => s, + None => return Err(Error::msg("errServerNotInitialized")), + }; + + if is_meta_bucketname(&bucket) { + return Err(Error::msg("errInvalidArgument")); + } + + let mut bm = match load_bucket_metadata_parse(store, &bucket, parse).await { + Ok(res) => res, + Err(err) => { + if !is_erasure().await && !is_dist_erasure().await && DiskError::VolumeNotFound.is(&err) { + BucketMetadata::new(&bucket) + } else { + return Err(err); + } + } + }; + + let updated = bm.update_config(config_file, data)?; + + bm.save(store).await?; + + Ok(updated) + } + + pub async fn get_config(&self, bucket: &str) -> Result<(BucketMetadata, bool)> { if let Some(api) = self.api.as_ref() { let has_bm = { let map = self.metadata_map.read().await; - if let Some(bm) = map.get(&bucket) { + if let Some(bm) = map.get(&bucket.to_string()) { Some(bm.clone()) } else { None @@ -54,7 +153,7 @@ impl BucketMetadataSys { if let Some(bm) = has_bm { return Ok((bm, false)); } else { - let bm = match load_bucket_metadata(&api, bucket.as_str()).await { + let bm = match load_bucket_metadata(&api, bucket).await { Ok(res) => res, Err(err) => { if *self.initialized.read().await { @@ -67,7 +166,7 @@ impl BucketMetadataSys { let mut map = self.metadata_map.write().await; - map.insert(bucket, bm.clone()); + map.insert(bucket.to_string(), bm.clone()); Ok((bm, true)) } @@ -76,7 +175,22 @@ impl BucketMetadataSys { } } - pub async fn get_tagging_config(&self, bucket: String) -> Result<(tags::Tags, Option)> { - unimplemented!() + pub async fn get_tagging_config(&self, bucket: &str) -> Result<(tags::Tags, OffsetDateTime)> { + let bm = match self.get_config(bucket).await { + Ok((res, _)) => res, + Err(err) => { + if config::error::is_not_found(&err) { + return Err(Error::new(BucketMetadataError::TaggingNotFound)); + } else { + return Err(err); + } + } + }; + + if let Some(config) = bm.tagging_config { + Ok((config, bm.tagging_config_updated_at)) + } else { + Err(Error::new(BucketMetadataError::TaggingNotFound)) + } } } diff --git a/ecstore/src/bucket/mod.rs b/ecstore/src/bucket/mod.rs index 48d3c5f5..b2df60cc 100644 --- a/ecstore/src/bucket/mod.rs +++ b/ecstore/src/bucket/mod.rs @@ -1,4 +1,5 @@ mod encryption; +mod error; mod event; mod lifecycle; pub mod metadata; @@ -10,5 +11,6 @@ mod replication; mod tags; mod target; mod versioning; +pub mod utils; pub use metadata_sys::get_bucket_metadata_sys; diff --git a/ecstore/src/bucket/utils.rs b/ecstore/src/bucket/utils.rs new file mode 100644 index 00000000..a8cb97b6 --- /dev/null +++ b/ecstore/src/bucket/utils.rs @@ -0,0 +1,5 @@ +use crate::disk::RUSTFS_META_BUCKET; + +pub fn is_meta_bucketname(name: &str) -> bool { + name.starts_with(RUSTFS_META_BUCKET) +} diff --git a/ecstore/src/config/error.rs b/ecstore/src/config/error.rs index 490b8c50..1628f31d 100644 --- a/ecstore/src/config/error.rs +++ b/ecstore/src/config/error.rs @@ -1,3 +1,5 @@ +use crate::error::Error; + #[derive(Debug, thiserror::Error)] pub enum ConfigError { #[error("config not found")] @@ -13,3 +15,11 @@ impl ConfigError { matches!(self, Self::NotFound) } } + +pub fn is_not_found(err: &Error) -> bool { + if let Some(e) = err.downcast_ref::() { + ConfigError::is_not_found(&e) + } else { + false + } +} diff --git a/ecstore/src/endpoints.rs b/ecstore/src/endpoints.rs index 9af5b963..ad533140 100644 --- a/ecstore/src/endpoints.rs +++ b/ecstore/src/endpoints.rs @@ -10,7 +10,7 @@ use std::{ }; /// enum for setup type. -#[derive(PartialEq, Eq, Debug)] +#[derive(PartialEq, Eq, Debug, Clone)] pub enum SetupType { /// starts with unknown setup type. Unknown, @@ -111,6 +111,7 @@ impl Endpoints { } } +#[derive(Debug)] /// a temporary type to holds the list of endpoints struct PoolEndpointList { inner: Vec, @@ -439,6 +440,10 @@ impl EndpointServerPools { Ok((ret, pool_eps.setup_type)) } + pub fn es_count(&self) -> usize { + self.0.iter().map(|v| v.set_count).count() + } + /// add pool endpoints pub fn add(&mut self, eps: PoolEndpoints) -> Result<()> { let mut exits = HashSet::new(); diff --git a/ecstore/src/global.rs b/ecstore/src/global.rs new file mode 100644 index 00000000..c7ac7b00 --- /dev/null +++ b/ecstore/src/global.rs @@ -0,0 +1,62 @@ +use crate::error::Result; +use lazy_static::lazy_static; +use std::{collections::HashMap, sync::Arc}; +use tokio::{fs, sync::RwLock}; + +use crate::{ + disk::{new_disk, DiskOption, DiskStore}, + endpoints::{EndpointServerPools, SetupType}, + store::ECStore, +}; + +lazy_static! { + pub static ref GLOBAL_OBJECT_API: Arc>> = Arc::new(RwLock::new(None)); + pub static ref GLOBAL_LOCAL_DISK: Arc>>> = Arc::new(RwLock::new(Vec::new())); +} + +pub fn new_object_layer_fn() -> Arc>> { + GLOBAL_OBJECT_API.clone() +} + +pub async fn set_object_layer(o: ECStore) { + let mut global_object_api = GLOBAL_OBJECT_API.write().await; + *global_object_api = Some(o); +} + +lazy_static! { + static ref GLOBAL_IsErasure: RwLock = RwLock::new(false); + static ref GLOBAL_IsDistErasure: RwLock = RwLock::new(false); + static ref GLOBAL_IsErasureSD: RwLock = RwLock::new(false); +} + +pub async fn is_dist_erasure() -> bool { + let lock = GLOBAL_IsDistErasure.read().await; + *lock == true +} + +pub async fn is_erasure() -> bool { + let lock = GLOBAL_IsErasure.read().await; + *lock == true +} + +pub async fn update_erasure_type(setup_type: SetupType) { + let mut is_erasure = GLOBAL_IsErasure.write().await; + *is_erasure = setup_type == SetupType::Erasure; + + let mut is_dist_erasure = GLOBAL_IsDistErasure.write().await; + *is_dist_erasure = setup_type == SetupType::DistErasure; + + if *is_dist_erasure { + *is_erasure = true + } + + let mut is_erasure_sd = GLOBAL_IsErasureSD.write().await; + *is_erasure_sd = setup_type == SetupType::ErasureSD; +} + +type TypeLocalDiskSetDrives = Vec>>>; + +lazy_static! { + pub static ref GLOBAL_LOCAL_DISK_MAP: Arc>>> = Arc::new(RwLock::new(HashMap::new())); + pub static ref GLOBAL_LOCAL_DISK_SET_DRIVES: Arc> = Arc::new(RwLock::new(Vec::new())); +} diff --git a/ecstore/src/lib.rs b/ecstore/src/lib.rs index d5881427..4210db4c 100644 --- a/ecstore/src/lib.rs +++ b/ecstore/src/lib.rs @@ -7,6 +7,7 @@ pub mod endpoints; pub mod erasure; pub mod error; mod file_meta; +mod global; pub mod peer; mod quorum; pub mod set_disk; @@ -18,3 +19,6 @@ mod store_init; mod utils; pub mod bucket; + +pub use global::new_object_layer_fn; +pub use global::update_erasure_type; diff --git a/ecstore/src/sets.rs b/ecstore/src/sets.rs index f43564b6..e3236fe0 100644 --- a/ecstore/src/sets.rs +++ b/ecstore/src/sets.rs @@ -15,8 +15,8 @@ use crate::{ }, endpoints::PoolEndpoints, error::{Error, Result}, + global::{is_dist_erasure, GLOBAL_LOCAL_DISK_SET_DRIVES}, set_disk::SetDisks, - store::{GLOBAL_IsDistErasure, GLOBAL_LOCAL_DISK_SET_DRIVES}, store_api::{ BucketInfo, BucketOptions, CompletePart, DeleteBucketOptions, DeletedObject, GetObjectReader, HTTPRangeSpec, ListObjectsV2Info, MakeBucketOptions, MultipartUploadResult, ObjectIO, ObjectInfo, ObjectOptions, ObjectToDelete, @@ -94,7 +94,7 @@ impl Sets { continue; } - if disk.as_ref().unwrap().is_local() && *GLOBAL_IsDistErasure.read().await { + if disk.as_ref().unwrap().is_local() && is_dist_erasure().await { let local_disk = { let local_set_drives = GLOBAL_LOCAL_DISK_SET_DRIVES.read().await; local_set_drives[pool_idx][i][j].clone() @@ -124,7 +124,7 @@ impl Sets { let set_disks = SetDisks { lockers: lockers[i].clone(), locker_owner: GLOBAL_Local_Node_Name.read().await.to_string(), - ns_mutex: Arc::new(RwLock::new(NsLockMap::new(*GLOBAL_IsDistErasure.read().await))), + ns_mutex: Arc::new(RwLock::new(NsLockMap::new(is_dist_erasure().await))), disks: RwLock::new(set_drive), set_drive_count, default_parity_count: partiy_count, diff --git a/ecstore/src/store.rs b/ecstore/src/store.rs index f53a8202..0e563ada 100644 --- a/ecstore/src/store.rs +++ b/ecstore/src/store.rs @@ -1,5 +1,6 @@ #![allow(clippy::map_entry)] use crate::disk::endpoint::EndpointType; +use crate::global::{is_dist_erasure, set_object_layer, GLOBAL_LOCAL_DISK_MAP, GLOBAL_LOCAL_DISK_SET_DRIVES}; use crate::store_api::ObjectIO; use crate::{ bucket::metadata::BucketMetadata, @@ -27,129 +28,12 @@ use std::{ time::Duration, }; use time::OffsetDateTime; +use tokio::fs; use tokio::sync::Semaphore; -use tokio::{fs, sync::RwLock}; + use tracing::{debug, info}; use uuid::Uuid; -use lazy_static::lazy_static; - -lazy_static! { - pub static ref GLOBAL_IsErasure: RwLock = RwLock::new(false); - pub static ref GLOBAL_IsDistErasure: RwLock = RwLock::new(false); - pub static ref GLOBAL_IsErasureSD: RwLock = RwLock::new(false); -} - -pub async fn update_erasure_type(setup_type: SetupType) { - let mut is_erasure = GLOBAL_IsErasure.write().await; - *is_erasure = setup_type == SetupType::Erasure; - - let mut is_dist_erasure = GLOBAL_IsDistErasure.write().await; - *is_dist_erasure = setup_type == SetupType::DistErasure; - - if *is_dist_erasure { - *is_erasure = true - } - - let mut is_erasure_sd = GLOBAL_IsErasureSD.write().await; - *is_erasure_sd = setup_type == SetupType::ErasureSD; -} - -type TypeLocalDiskSetDrives = Vec>>>; - -lazy_static! { - pub static ref GLOBAL_LOCAL_DISK_MAP: Arc>>> = Arc::new(RwLock::new(HashMap::new())); - pub static ref GLOBAL_LOCAL_DISK_SET_DRIVES: Arc> = Arc::new(RwLock::new(Vec::new())); -} - -pub async fn find_local_disk(disk_path: &String) -> Option { - let disk_path = match fs::canonicalize(disk_path).await { - Ok(disk_path) => disk_path, - Err(_) => return None, - }; - - let disk_map = GLOBAL_LOCAL_DISK_MAP.read().await; - - let path = disk_path.to_string_lossy().to_string(); - if disk_map.contains_key(&path) { - let a = disk_map[&path].as_ref().cloned(); - - return a; - } - None -} - -pub async fn all_local_disk_path() -> Vec { - let disk_map = GLOBAL_LOCAL_DISK_MAP.read().await; - disk_map.keys().cloned().collect() -} - -pub async fn all_local_disk() -> Vec { - let disk_map = GLOBAL_LOCAL_DISK_MAP.read().await; - disk_map - .values() - .filter(|v| v.is_some()) - .map(|v| v.as_ref().unwrap().clone()) - .collect() -} - -// init_local_disks 初始化本地磁盘,server启动前必须初始化成功 -pub async fn init_local_disks(endpoint_pools: EndpointServerPools) -> Result<()> { - let opt = &DiskOption { - cleanup: true, - health_check: true, - }; - - let mut global_set_drives = GLOBAL_LOCAL_DISK_SET_DRIVES.write().await; - for pool_eps in endpoint_pools.as_ref().iter() { - let mut set_count_drives = Vec::with_capacity(pool_eps.set_count); - for _ in 0..pool_eps.set_count { - set_count_drives.push(vec![None; pool_eps.drives_per_set]); - } - - global_set_drives.push(set_count_drives); - } - - let mut global_local_disk_map = GLOBAL_LOCAL_DISK_MAP.write().await; - - for pool_eps in endpoint_pools.as_ref().iter() { - let mut set_drives = HashMap::new(); - for ep in pool_eps.endpoints.as_ref().iter() { - if !ep.is_local { - continue; - } - - let disk = new_disk(ep, opt).await?; - - let path = disk.path().to_string_lossy().to_string(); - - global_local_disk_map.insert(path, Some(disk.clone())); - - set_drives.insert(ep.disk_idx, Some(disk.clone())); - - if ep.pool_idx.is_some() && ep.set_idx.is_some() && ep.disk_idx.is_some() { - global_set_drives[ep.pool_idx.unwrap()][ep.set_idx.unwrap()][ep.disk_idx.unwrap()] = Some(disk.clone()); - } - } - } - - Ok(()) -} - -lazy_static! { - pub static ref GLOBAL_OBJECT_API: Arc>> = Arc::new(RwLock::new(None)); - pub static ref GLOBAL_LOCAL_DISK: Arc>>> = Arc::new(RwLock::new(Vec::new())); -} - -pub fn new_object_layer_fn() -> Arc>> { - GLOBAL_OBJECT_API.clone() -} - -async fn set_object_layer(o: ECStore) { - let mut global_object_api = GLOBAL_OBJECT_API.write().await; - *global_object_api = Some(o); -} - #[derive(Debug)] pub struct ECStore { pub id: uuid::Uuid, @@ -244,7 +128,7 @@ impl ECStore { } // 替换本地磁盘 - if !*GLOBAL_IsDistErasure.read().await { + if !is_dist_erasure().await { let mut global_local_disk_map = GLOBAL_LOCAL_DISK_MAP.write().await; for disk in local_disks { let path = disk.path().to_string_lossy().to_string(); @@ -403,6 +287,80 @@ impl ECStore { } } +pub async fn find_local_disk(disk_path: &String) -> Option { + let disk_path = match fs::canonicalize(disk_path).await { + Ok(disk_path) => disk_path, + Err(_) => return None, + }; + + let disk_map = GLOBAL_LOCAL_DISK_MAP.read().await; + + let path = disk_path.to_string_lossy().to_string(); + if disk_map.contains_key(&path) { + let a = disk_map[&path].as_ref().cloned(); + + return a; + } + None +} + +pub async fn all_local_disk_path() -> Vec { + let disk_map = GLOBAL_LOCAL_DISK_MAP.read().await; + disk_map.keys().cloned().collect() +} + +pub async fn all_local_disk() -> Vec { + let disk_map = GLOBAL_LOCAL_DISK_MAP.read().await; + disk_map + .values() + .filter(|v| v.is_some()) + .map(|v| v.as_ref().unwrap().clone()) + .collect() +} + +// init_local_disks 初始化本地磁盘,server启动前必须初始化成功 +pub async fn init_local_disks(endpoint_pools: EndpointServerPools) -> Result<()> { + let opt = &DiskOption { + cleanup: true, + health_check: true, + }; + + let mut global_set_drives = GLOBAL_LOCAL_DISK_SET_DRIVES.write().await; + for pool_eps in endpoint_pools.as_ref().iter() { + let mut set_count_drives = Vec::with_capacity(pool_eps.set_count); + for _ in 0..pool_eps.set_count { + set_count_drives.push(vec![None; pool_eps.drives_per_set]); + } + + global_set_drives.push(set_count_drives); + } + + let mut global_local_disk_map = GLOBAL_LOCAL_DISK_MAP.write().await; + + for pool_eps in endpoint_pools.as_ref().iter() { + let mut set_drives = HashMap::new(); + for ep in pool_eps.endpoints.as_ref().iter() { + if !ep.is_local { + continue; + } + + let disk = new_disk(ep, opt).await?; + + let path = disk.path().to_string_lossy().to_string(); + + global_local_disk_map.insert(path, Some(disk.clone())); + + set_drives.insert(ep.disk_idx, Some(disk.clone())); + + if ep.pool_idx.is_some() && ep.set_idx.is_some() && ep.disk_idx.is_some() { + global_set_drives[ep.pool_idx.unwrap()][ep.set_idx.unwrap()][ep.disk_idx.unwrap()] = Some(disk.clone()); + } + } + } + + Ok(()) +} + async fn internal_get_pool_info_existing_with_opts( pools: &[Arc], bucket: &str, diff --git a/rustfs/src/main.rs b/rustfs/src/main.rs index 0c32c263..802832a8 100644 --- a/rustfs/src/main.rs +++ b/rustfs/src/main.rs @@ -7,7 +7,8 @@ use clap::Parser; use common::error::{Error, Result}; use ecstore::{ endpoints::EndpointServerPools, - store::{init_local_disks, update_erasure_type, ECStore}, + store::{init_local_disks, ECStore}, + update_erasure_type, }; use grpc::make_server; use hyper_util::{ @@ -91,7 +92,6 @@ async fn run(opt: config::Opt) -> Result<()> { .map_err(|err| Error::from_string(err.to_string()))?; update_erasure_type(setup_type).await; - // 初始化本地磁盘 init_local_disks(endpoint_pools.clone()) .await diff --git a/rustfs/src/storage/ecfs.rs b/rustfs/src/storage/ecfs.rs index 2ab8aedf..2c9a0325 100644 --- a/rustfs/src/storage/ecfs.rs +++ b/rustfs/src/storage/ecfs.rs @@ -4,7 +4,7 @@ use ecstore::bucket::get_bucket_metadata_sys; use ecstore::bucket_meta::BucketMetadata; use ecstore::disk::error::DiskError; use ecstore::disk::RUSTFS_META_BUCKET; -use ecstore::store::new_object_layer_fn; +use ecstore::new_object_layer_fn; use ecstore::store_api::BucketOptions; use ecstore::store_api::CompletePart; use ecstore::store_api::DeleteBucketOptions;