mirror of
https://github.com/rustfs/rustfs.git
synced 2026-01-17 09:40:32 +00:00
init bucketmetadata_sys
This commit is contained in:
5
ecstore/src/bucket/error.rs
Normal file
5
ecstore/src/bucket/error.rs
Normal file
@@ -0,0 +1,5 @@
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum BucketMetadataError {
|
||||
#[error("tagging not found")]
|
||||
TaggingNotFound,
|
||||
}
|
||||
@@ -13,18 +13,30 @@ use time::OffsetDateTime;
|
||||
use tracing::error;
|
||||
|
||||
use crate::bucket::tags;
|
||||
use crate::config;
|
||||
use crate::config::common::{read_config, save_config};
|
||||
use crate::config::error::ConfigError;
|
||||
use crate::error::{Error, Result};
|
||||
|
||||
use crate::disk::BUCKET_META_PREFIX;
|
||||
use crate::store::ECStore;
|
||||
use crate::store_api::StorageAPI;
|
||||
|
||||
type TypeConfigFile = &'static str;
|
||||
|
||||
pub const BUCKET_METADATA_FILE: &str = ".metadata.bin";
|
||||
pub const BUCKET_METADATA_FORMAT: u16 = 1;
|
||||
pub const BUCKET_METADATA_VERSION: u16 = 1;
|
||||
|
||||
pub const BUCKET_POLICY_CONFIG: &str = "policy.json";
|
||||
pub const BUCKET_NOTIFICATION_CONFIG: &str = "notification.xml";
|
||||
pub const BUCKET_LIFECYCLE_CONFIG: &str = "lifecycle.xml";
|
||||
pub const BUCKET_SSECONFIG: &str = "bucket-encryption.xml";
|
||||
pub const BUCKET_TAGGING_CONFIG: &str = "tagging.xml";
|
||||
pub const BUCKET_QUOTA_CONFIG_FILE: &str = "quota.json";
|
||||
pub const OBJECT_LOCK_CONFIG: &str = "object-lock.xml";
|
||||
pub const BUCKET_VERSIONING_CONFIG: &str = "versioning.xml";
|
||||
pub const BUCKET_REPLICATION_CONFIG: &str = "replication.xml";
|
||||
pub const BUCKET_TARGETS_FILE: &str = "bucket-targets.json";
|
||||
|
||||
#[derive(Debug, Deserialize, Serialize, Clone)]
|
||||
#[serde(rename_all = "PascalCase", default)]
|
||||
pub struct BucketMetadata {
|
||||
@@ -170,7 +182,57 @@ impl BucketMetadata {
|
||||
}
|
||||
}
|
||||
|
||||
async fn save(&mut self, api: &ECStore) -> Result<()> {
|
||||
pub fn update_config(&mut self, config_file: &str, data: Vec<u8>) -> Result<OffsetDateTime> {
|
||||
let updated = OffsetDateTime::now_utc();
|
||||
|
||||
match config_file {
|
||||
BUCKET_POLICY_CONFIG => {
|
||||
self.policy_config_json = data;
|
||||
self.policy_config_updated_at = updated;
|
||||
}
|
||||
BUCKET_NOTIFICATION_CONFIG => {
|
||||
self.notification_config_xml = data;
|
||||
self.notification_config_updated_at = updated;
|
||||
}
|
||||
BUCKET_LIFECYCLE_CONFIG => {
|
||||
self.lifecycle_config_xml = data;
|
||||
self.lifecycle_config_updated_at = updated;
|
||||
}
|
||||
BUCKET_SSECONFIG => {
|
||||
self.encryption_config_xml = data;
|
||||
self.encryption_config_updated_at = updated;
|
||||
}
|
||||
BUCKET_TAGGING_CONFIG => {
|
||||
self.tagging_config_xml = data;
|
||||
self.tagging_config_updated_at = updated;
|
||||
}
|
||||
BUCKET_QUOTA_CONFIG_FILE => {
|
||||
self.quota_config_json = data;
|
||||
self.quota_config_updated_at = updated;
|
||||
}
|
||||
OBJECT_LOCK_CONFIG => {
|
||||
self.object_lock_config_xml = data;
|
||||
self.object_lock_config_updated_at = updated;
|
||||
}
|
||||
BUCKET_VERSIONING_CONFIG => {
|
||||
self.versioning_config_xml = data;
|
||||
self.versioning_config_updated_at = updated;
|
||||
}
|
||||
BUCKET_REPLICATION_CONFIG => {
|
||||
self.replication_config_xml = data;
|
||||
self.replication_config_updated_at = updated;
|
||||
}
|
||||
BUCKET_TARGETS_FILE => {
|
||||
self.tagging_config_xml = data;
|
||||
self.tagging_config_updated_at = updated;
|
||||
}
|
||||
_ => return Err(Error::msg(format!("config file not found : {}", config_file))),
|
||||
}
|
||||
|
||||
Ok(updated)
|
||||
}
|
||||
|
||||
pub async fn save(&mut self, api: &ECStore) -> Result<()> {
|
||||
self.parse_all_configs(api)?;
|
||||
|
||||
let mut buf: Vec<u8> = vec![0; 4];
|
||||
@@ -201,14 +263,12 @@ pub async fn load_bucket_metadata(api: &ECStore, bucket: &str) -> Result<BucketM
|
||||
load_bucket_metadata_parse(api, bucket, true).await
|
||||
}
|
||||
|
||||
async fn load_bucket_metadata_parse(api: &ECStore, bucket: &str, parse: bool) -> Result<BucketMetadata> {
|
||||
pub async fn load_bucket_metadata_parse(api: &ECStore, bucket: &str, parse: bool) -> Result<BucketMetadata> {
|
||||
let mut bm = match read_bucket_metadata(api, bucket).await {
|
||||
Ok(res) => res,
|
||||
Err(err) => {
|
||||
if let Some(e) = err.downcast_ref::<ConfigError>() {
|
||||
if !ConfigError::is_not_found(&e) {
|
||||
return Err(err);
|
||||
}
|
||||
if !config::error::is_not_found(&err) {
|
||||
return Err(err);
|
||||
}
|
||||
|
||||
BucketMetadata::new(bucket)
|
||||
|
||||
@@ -1,7 +1,15 @@
|
||||
use std::{collections::HashMap, sync::Arc};
|
||||
|
||||
use crate::bucket::error::BucketMetadataError;
|
||||
use crate::bucket::metadata::load_bucket_metadata_parse;
|
||||
use crate::bucket::utils::is_meta_bucketname;
|
||||
use crate::config;
|
||||
use crate::config::error::ConfigError;
|
||||
use crate::disk::error::DiskError;
|
||||
use crate::error::{Error, Result};
|
||||
use crate::global::{is_dist_erasure, is_erasure, new_object_layer_fn};
|
||||
use crate::store::ECStore;
|
||||
use futures::future::join_all;
|
||||
use lazy_static::lazy_static;
|
||||
use time::OffsetDateTime;
|
||||
use tokio::sync::RwLock;
|
||||
@@ -10,41 +18,132 @@ use super::metadata::{load_bucket_metadata, BucketMetadata};
|
||||
use super::tags;
|
||||
|
||||
lazy_static! {
|
||||
pub static ref GLOBAL_BucketMetadataSys: Arc<Option<BucketMetadataSys>> = Arc::new(Some(BucketMetadataSys::new()));
|
||||
static ref GLOBAL_BucketMetadataSys: Arc<BucketMetadataSys> = Arc::new(BucketMetadataSys::new());
|
||||
}
|
||||
|
||||
pub fn get_bucket_metadata_sys() -> Arc<Option<BucketMetadataSys>> {
|
||||
pub fn get_bucket_metadata_sys() -> Arc<BucketMetadataSys> {
|
||||
GLOBAL_BucketMetadataSys.clone()
|
||||
}
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
pub struct BucketMetadataSys {
|
||||
metadata_map: RwLock<HashMap<String, BucketMetadata>>,
|
||||
api: Option<Arc<ECStore>>,
|
||||
api: Option<ECStore>,
|
||||
initialized: RwLock<bool>,
|
||||
}
|
||||
|
||||
impl BucketMetadataSys {
|
||||
fn new() -> Self {
|
||||
Self { ..Default::default() }
|
||||
Self::default()
|
||||
}
|
||||
|
||||
pub fn init(&mut self, api: Arc<ECStore>, buckets: Vec<String>) {
|
||||
self.api = Some(api);
|
||||
pub async fn init(&mut self, api: Option<ECStore>, buckets: Vec<&str>) -> Result<()> {
|
||||
if api.is_none() {
|
||||
return Err(Error::msg("errServerNotInitialized"));
|
||||
}
|
||||
self.api = api;
|
||||
|
||||
let _ = self.init_internal(buckets).await;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
async fn init_internal(&self, buckets: Vec<&str>) -> Result<()> {
|
||||
if self.api.is_none() {
|
||||
return Err(Error::msg("errServerNotInitialized"));
|
||||
}
|
||||
let mut futures = Vec::new();
|
||||
let mut errs = Vec::new();
|
||||
let mut ress = Vec::new();
|
||||
|
||||
for &bucket in buckets.iter() {
|
||||
futures.push(load_bucket_metadata(self.api.as_ref().unwrap(), bucket));
|
||||
}
|
||||
|
||||
let results = join_all(futures).await;
|
||||
|
||||
for res in results {
|
||||
match res {
|
||||
Ok(entrys) => {
|
||||
ress.push(Some(entrys));
|
||||
errs.push(None);
|
||||
}
|
||||
Err(e) => {
|
||||
ress.push(None);
|
||||
errs.push(Some(e));
|
||||
}
|
||||
}
|
||||
}
|
||||
unimplemented!()
|
||||
}
|
||||
|
||||
async fn concurrent_load(&self, buckets: Vec<&str>) -> Result<Vec<&str>> {
|
||||
unimplemented!()
|
||||
}
|
||||
|
||||
pub async fn get(&self, bucket: &str) -> Result<BucketMetadata> {
|
||||
if is_meta_bucketname(bucket) {
|
||||
return Err(Error::new(ConfigError::NotFound));
|
||||
}
|
||||
|
||||
let map = self.metadata_map.read().await;
|
||||
if let Some(bm) = map.get(bucket) {
|
||||
Ok(bm.clone())
|
||||
} else {
|
||||
Err(Error::new(ConfigError::NotFound))
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn set(&self, bucket: &str, bm: BucketMetadata) {
|
||||
if !is_meta_bucketname(bucket) {
|
||||
let mut map = self.metadata_map.write().await;
|
||||
map.insert(bucket.to_string(), bm);
|
||||
}
|
||||
}
|
||||
|
||||
async fn reset(&mut self) {
|
||||
let mut map = self.metadata_map.write().await;
|
||||
map.clear();
|
||||
}
|
||||
|
||||
pub async fn get_config(&self, bucket: String) -> Result<(BucketMetadata, bool)> {
|
||||
pub async fn update(&mut self, bucket: &str, config_file: &str, data: Vec<u8>) -> Result<(OffsetDateTime)> {
|
||||
self.update_and_parse(bucket, config_file, data, true).await
|
||||
}
|
||||
|
||||
async fn update_and_parse(&mut self, bucket: &str, config_file: &str, data: Vec<u8>, parse: bool) -> Result<OffsetDateTime> {
|
||||
let layer = new_object_layer_fn();
|
||||
let lock = layer.read().await;
|
||||
let store = match lock.as_ref() {
|
||||
Some(s) => s,
|
||||
None => return Err(Error::msg("errServerNotInitialized")),
|
||||
};
|
||||
|
||||
if is_meta_bucketname(&bucket) {
|
||||
return Err(Error::msg("errInvalidArgument"));
|
||||
}
|
||||
|
||||
let mut bm = match load_bucket_metadata_parse(store, &bucket, parse).await {
|
||||
Ok(res) => res,
|
||||
Err(err) => {
|
||||
if !is_erasure().await && !is_dist_erasure().await && DiskError::VolumeNotFound.is(&err) {
|
||||
BucketMetadata::new(&bucket)
|
||||
} else {
|
||||
return Err(err);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
let updated = bm.update_config(config_file, data)?;
|
||||
|
||||
bm.save(store).await?;
|
||||
|
||||
Ok(updated)
|
||||
}
|
||||
|
||||
pub async fn get_config(&self, bucket: &str) -> Result<(BucketMetadata, bool)> {
|
||||
if let Some(api) = self.api.as_ref() {
|
||||
let has_bm = {
|
||||
let map = self.metadata_map.read().await;
|
||||
if let Some(bm) = map.get(&bucket) {
|
||||
if let Some(bm) = map.get(&bucket.to_string()) {
|
||||
Some(bm.clone())
|
||||
} else {
|
||||
None
|
||||
@@ -54,7 +153,7 @@ impl BucketMetadataSys {
|
||||
if let Some(bm) = has_bm {
|
||||
return Ok((bm, false));
|
||||
} else {
|
||||
let bm = match load_bucket_metadata(&api, bucket.as_str()).await {
|
||||
let bm = match load_bucket_metadata(&api, bucket).await {
|
||||
Ok(res) => res,
|
||||
Err(err) => {
|
||||
if *self.initialized.read().await {
|
||||
@@ -67,7 +166,7 @@ impl BucketMetadataSys {
|
||||
|
||||
let mut map = self.metadata_map.write().await;
|
||||
|
||||
map.insert(bucket, bm.clone());
|
||||
map.insert(bucket.to_string(), bm.clone());
|
||||
|
||||
Ok((bm, true))
|
||||
}
|
||||
@@ -76,7 +175,22 @@ impl BucketMetadataSys {
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn get_tagging_config(&self, bucket: String) -> Result<(tags::Tags, Option<OffsetDateTime>)> {
|
||||
unimplemented!()
|
||||
pub async fn get_tagging_config(&self, bucket: &str) -> Result<(tags::Tags, OffsetDateTime)> {
|
||||
let bm = match self.get_config(bucket).await {
|
||||
Ok((res, _)) => res,
|
||||
Err(err) => {
|
||||
if config::error::is_not_found(&err) {
|
||||
return Err(Error::new(BucketMetadataError::TaggingNotFound));
|
||||
} else {
|
||||
return Err(err);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
if let Some(config) = bm.tagging_config {
|
||||
Ok((config, bm.tagging_config_updated_at))
|
||||
} else {
|
||||
Err(Error::new(BucketMetadataError::TaggingNotFound))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
mod encryption;
|
||||
mod error;
|
||||
mod event;
|
||||
mod lifecycle;
|
||||
pub mod metadata;
|
||||
@@ -10,5 +11,6 @@ mod replication;
|
||||
mod tags;
|
||||
mod target;
|
||||
mod versioning;
|
||||
pub mod utils;
|
||||
|
||||
pub use metadata_sys::get_bucket_metadata_sys;
|
||||
|
||||
5
ecstore/src/bucket/utils.rs
Normal file
5
ecstore/src/bucket/utils.rs
Normal file
@@ -0,0 +1,5 @@
|
||||
use crate::disk::RUSTFS_META_BUCKET;
|
||||
|
||||
pub fn is_meta_bucketname(name: &str) -> bool {
|
||||
name.starts_with(RUSTFS_META_BUCKET)
|
||||
}
|
||||
@@ -1,3 +1,5 @@
|
||||
use crate::error::Error;
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum ConfigError {
|
||||
#[error("config not found")]
|
||||
@@ -13,3 +15,11 @@ impl ConfigError {
|
||||
matches!(self, Self::NotFound)
|
||||
}
|
||||
}
|
||||
|
||||
pub fn is_not_found(err: &Error) -> bool {
|
||||
if let Some(e) = err.downcast_ref::<ConfigError>() {
|
||||
ConfigError::is_not_found(&e)
|
||||
} else {
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
@@ -10,7 +10,7 @@ use std::{
|
||||
};
|
||||
|
||||
/// enum for setup type.
|
||||
#[derive(PartialEq, Eq, Debug)]
|
||||
#[derive(PartialEq, Eq, Debug, Clone)]
|
||||
pub enum SetupType {
|
||||
/// starts with unknown setup type.
|
||||
Unknown,
|
||||
@@ -111,6 +111,7 @@ impl Endpoints {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
/// a temporary type to holds the list of endpoints
|
||||
struct PoolEndpointList {
|
||||
inner: Vec<Endpoints>,
|
||||
@@ -439,6 +440,10 @@ impl EndpointServerPools {
|
||||
Ok((ret, pool_eps.setup_type))
|
||||
}
|
||||
|
||||
pub fn es_count(&self) -> usize {
|
||||
self.0.iter().map(|v| v.set_count).count()
|
||||
}
|
||||
|
||||
/// add pool endpoints
|
||||
pub fn add(&mut self, eps: PoolEndpoints) -> Result<()> {
|
||||
let mut exits = HashSet::new();
|
||||
|
||||
62
ecstore/src/global.rs
Normal file
62
ecstore/src/global.rs
Normal file
@@ -0,0 +1,62 @@
|
||||
use crate::error::Result;
|
||||
use lazy_static::lazy_static;
|
||||
use std::{collections::HashMap, sync::Arc};
|
||||
use tokio::{fs, sync::RwLock};
|
||||
|
||||
use crate::{
|
||||
disk::{new_disk, DiskOption, DiskStore},
|
||||
endpoints::{EndpointServerPools, SetupType},
|
||||
store::ECStore,
|
||||
};
|
||||
|
||||
lazy_static! {
|
||||
pub static ref GLOBAL_OBJECT_API: Arc<RwLock<Option<ECStore>>> = Arc::new(RwLock::new(None));
|
||||
pub static ref GLOBAL_LOCAL_DISK: Arc<RwLock<Vec<Option<DiskStore>>>> = Arc::new(RwLock::new(Vec::new()));
|
||||
}
|
||||
|
||||
pub fn new_object_layer_fn() -> Arc<RwLock<Option<ECStore>>> {
|
||||
GLOBAL_OBJECT_API.clone()
|
||||
}
|
||||
|
||||
pub async fn set_object_layer(o: ECStore) {
|
||||
let mut global_object_api = GLOBAL_OBJECT_API.write().await;
|
||||
*global_object_api = Some(o);
|
||||
}
|
||||
|
||||
lazy_static! {
|
||||
static ref GLOBAL_IsErasure: RwLock<bool> = RwLock::new(false);
|
||||
static ref GLOBAL_IsDistErasure: RwLock<bool> = RwLock::new(false);
|
||||
static ref GLOBAL_IsErasureSD: RwLock<bool> = RwLock::new(false);
|
||||
}
|
||||
|
||||
pub async fn is_dist_erasure() -> bool {
|
||||
let lock = GLOBAL_IsDistErasure.read().await;
|
||||
*lock == true
|
||||
}
|
||||
|
||||
pub async fn is_erasure() -> bool {
|
||||
let lock = GLOBAL_IsErasure.read().await;
|
||||
*lock == true
|
||||
}
|
||||
|
||||
pub async fn update_erasure_type(setup_type: SetupType) {
|
||||
let mut is_erasure = GLOBAL_IsErasure.write().await;
|
||||
*is_erasure = setup_type == SetupType::Erasure;
|
||||
|
||||
let mut is_dist_erasure = GLOBAL_IsDistErasure.write().await;
|
||||
*is_dist_erasure = setup_type == SetupType::DistErasure;
|
||||
|
||||
if *is_dist_erasure {
|
||||
*is_erasure = true
|
||||
}
|
||||
|
||||
let mut is_erasure_sd = GLOBAL_IsErasureSD.write().await;
|
||||
*is_erasure_sd = setup_type == SetupType::ErasureSD;
|
||||
}
|
||||
|
||||
type TypeLocalDiskSetDrives = Vec<Vec<Vec<Option<DiskStore>>>>;
|
||||
|
||||
lazy_static! {
|
||||
pub static ref GLOBAL_LOCAL_DISK_MAP: Arc<RwLock<HashMap<String, Option<DiskStore>>>> = Arc::new(RwLock::new(HashMap::new()));
|
||||
pub static ref GLOBAL_LOCAL_DISK_SET_DRIVES: Arc<RwLock<TypeLocalDiskSetDrives>> = Arc::new(RwLock::new(Vec::new()));
|
||||
}
|
||||
@@ -7,6 +7,7 @@ pub mod endpoints;
|
||||
pub mod erasure;
|
||||
pub mod error;
|
||||
mod file_meta;
|
||||
mod global;
|
||||
pub mod peer;
|
||||
mod quorum;
|
||||
pub mod set_disk;
|
||||
@@ -18,3 +19,6 @@ mod store_init;
|
||||
mod utils;
|
||||
|
||||
pub mod bucket;
|
||||
|
||||
pub use global::new_object_layer_fn;
|
||||
pub use global::update_erasure_type;
|
||||
|
||||
@@ -15,8 +15,8 @@ use crate::{
|
||||
},
|
||||
endpoints::PoolEndpoints,
|
||||
error::{Error, Result},
|
||||
global::{is_dist_erasure, GLOBAL_LOCAL_DISK_SET_DRIVES},
|
||||
set_disk::SetDisks,
|
||||
store::{GLOBAL_IsDistErasure, GLOBAL_LOCAL_DISK_SET_DRIVES},
|
||||
store_api::{
|
||||
BucketInfo, BucketOptions, CompletePart, DeleteBucketOptions, DeletedObject, GetObjectReader, HTTPRangeSpec,
|
||||
ListObjectsV2Info, MakeBucketOptions, MultipartUploadResult, ObjectIO, ObjectInfo, ObjectOptions, ObjectToDelete,
|
||||
@@ -94,7 +94,7 @@ impl Sets {
|
||||
continue;
|
||||
}
|
||||
|
||||
if disk.as_ref().unwrap().is_local() && *GLOBAL_IsDistErasure.read().await {
|
||||
if disk.as_ref().unwrap().is_local() && is_dist_erasure().await {
|
||||
let local_disk = {
|
||||
let local_set_drives = GLOBAL_LOCAL_DISK_SET_DRIVES.read().await;
|
||||
local_set_drives[pool_idx][i][j].clone()
|
||||
@@ -124,7 +124,7 @@ impl Sets {
|
||||
let set_disks = SetDisks {
|
||||
lockers: lockers[i].clone(),
|
||||
locker_owner: GLOBAL_Local_Node_Name.read().await.to_string(),
|
||||
ns_mutex: Arc::new(RwLock::new(NsLockMap::new(*GLOBAL_IsDistErasure.read().await))),
|
||||
ns_mutex: Arc::new(RwLock::new(NsLockMap::new(is_dist_erasure().await))),
|
||||
disks: RwLock::new(set_drive),
|
||||
set_drive_count,
|
||||
default_parity_count: partiy_count,
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
#![allow(clippy::map_entry)]
|
||||
use crate::disk::endpoint::EndpointType;
|
||||
use crate::global::{is_dist_erasure, set_object_layer, GLOBAL_LOCAL_DISK_MAP, GLOBAL_LOCAL_DISK_SET_DRIVES};
|
||||
use crate::store_api::ObjectIO;
|
||||
use crate::{
|
||||
bucket::metadata::BucketMetadata,
|
||||
@@ -27,129 +28,12 @@ use std::{
|
||||
time::Duration,
|
||||
};
|
||||
use time::OffsetDateTime;
|
||||
use tokio::fs;
|
||||
use tokio::sync::Semaphore;
|
||||
use tokio::{fs, sync::RwLock};
|
||||
|
||||
use tracing::{debug, info};
|
||||
use uuid::Uuid;
|
||||
|
||||
use lazy_static::lazy_static;
|
||||
|
||||
lazy_static! {
|
||||
pub static ref GLOBAL_IsErasure: RwLock<bool> = RwLock::new(false);
|
||||
pub static ref GLOBAL_IsDistErasure: RwLock<bool> = RwLock::new(false);
|
||||
pub static ref GLOBAL_IsErasureSD: RwLock<bool> = RwLock::new(false);
|
||||
}
|
||||
|
||||
pub async fn update_erasure_type(setup_type: SetupType) {
|
||||
let mut is_erasure = GLOBAL_IsErasure.write().await;
|
||||
*is_erasure = setup_type == SetupType::Erasure;
|
||||
|
||||
let mut is_dist_erasure = GLOBAL_IsDistErasure.write().await;
|
||||
*is_dist_erasure = setup_type == SetupType::DistErasure;
|
||||
|
||||
if *is_dist_erasure {
|
||||
*is_erasure = true
|
||||
}
|
||||
|
||||
let mut is_erasure_sd = GLOBAL_IsErasureSD.write().await;
|
||||
*is_erasure_sd = setup_type == SetupType::ErasureSD;
|
||||
}
|
||||
|
||||
type TypeLocalDiskSetDrives = Vec<Vec<Vec<Option<DiskStore>>>>;
|
||||
|
||||
lazy_static! {
|
||||
pub static ref GLOBAL_LOCAL_DISK_MAP: Arc<RwLock<HashMap<String, Option<DiskStore>>>> = Arc::new(RwLock::new(HashMap::new()));
|
||||
pub static ref GLOBAL_LOCAL_DISK_SET_DRIVES: Arc<RwLock<TypeLocalDiskSetDrives>> = Arc::new(RwLock::new(Vec::new()));
|
||||
}
|
||||
|
||||
pub async fn find_local_disk(disk_path: &String) -> Option<DiskStore> {
|
||||
let disk_path = match fs::canonicalize(disk_path).await {
|
||||
Ok(disk_path) => disk_path,
|
||||
Err(_) => return None,
|
||||
};
|
||||
|
||||
let disk_map = GLOBAL_LOCAL_DISK_MAP.read().await;
|
||||
|
||||
let path = disk_path.to_string_lossy().to_string();
|
||||
if disk_map.contains_key(&path) {
|
||||
let a = disk_map[&path].as_ref().cloned();
|
||||
|
||||
return a;
|
||||
}
|
||||
None
|
||||
}
|
||||
|
||||
pub async fn all_local_disk_path() -> Vec<String> {
|
||||
let disk_map = GLOBAL_LOCAL_DISK_MAP.read().await;
|
||||
disk_map.keys().cloned().collect()
|
||||
}
|
||||
|
||||
pub async fn all_local_disk() -> Vec<DiskStore> {
|
||||
let disk_map = GLOBAL_LOCAL_DISK_MAP.read().await;
|
||||
disk_map
|
||||
.values()
|
||||
.filter(|v| v.is_some())
|
||||
.map(|v| v.as_ref().unwrap().clone())
|
||||
.collect()
|
||||
}
|
||||
|
||||
// init_local_disks 初始化本地磁盘,server启动前必须初始化成功
|
||||
pub async fn init_local_disks(endpoint_pools: EndpointServerPools) -> Result<()> {
|
||||
let opt = &DiskOption {
|
||||
cleanup: true,
|
||||
health_check: true,
|
||||
};
|
||||
|
||||
let mut global_set_drives = GLOBAL_LOCAL_DISK_SET_DRIVES.write().await;
|
||||
for pool_eps in endpoint_pools.as_ref().iter() {
|
||||
let mut set_count_drives = Vec::with_capacity(pool_eps.set_count);
|
||||
for _ in 0..pool_eps.set_count {
|
||||
set_count_drives.push(vec![None; pool_eps.drives_per_set]);
|
||||
}
|
||||
|
||||
global_set_drives.push(set_count_drives);
|
||||
}
|
||||
|
||||
let mut global_local_disk_map = GLOBAL_LOCAL_DISK_MAP.write().await;
|
||||
|
||||
for pool_eps in endpoint_pools.as_ref().iter() {
|
||||
let mut set_drives = HashMap::new();
|
||||
for ep in pool_eps.endpoints.as_ref().iter() {
|
||||
if !ep.is_local {
|
||||
continue;
|
||||
}
|
||||
|
||||
let disk = new_disk(ep, opt).await?;
|
||||
|
||||
let path = disk.path().to_string_lossy().to_string();
|
||||
|
||||
global_local_disk_map.insert(path, Some(disk.clone()));
|
||||
|
||||
set_drives.insert(ep.disk_idx, Some(disk.clone()));
|
||||
|
||||
if ep.pool_idx.is_some() && ep.set_idx.is_some() && ep.disk_idx.is_some() {
|
||||
global_set_drives[ep.pool_idx.unwrap()][ep.set_idx.unwrap()][ep.disk_idx.unwrap()] = Some(disk.clone());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
lazy_static! {
|
||||
pub static ref GLOBAL_OBJECT_API: Arc<RwLock<Option<ECStore>>> = Arc::new(RwLock::new(None));
|
||||
pub static ref GLOBAL_LOCAL_DISK: Arc<RwLock<Vec<Option<DiskStore>>>> = Arc::new(RwLock::new(Vec::new()));
|
||||
}
|
||||
|
||||
pub fn new_object_layer_fn() -> Arc<RwLock<Option<ECStore>>> {
|
||||
GLOBAL_OBJECT_API.clone()
|
||||
}
|
||||
|
||||
async fn set_object_layer(o: ECStore) {
|
||||
let mut global_object_api = GLOBAL_OBJECT_API.write().await;
|
||||
*global_object_api = Some(o);
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct ECStore {
|
||||
pub id: uuid::Uuid,
|
||||
@@ -244,7 +128,7 @@ impl ECStore {
|
||||
}
|
||||
|
||||
// 替换本地磁盘
|
||||
if !*GLOBAL_IsDistErasure.read().await {
|
||||
if !is_dist_erasure().await {
|
||||
let mut global_local_disk_map = GLOBAL_LOCAL_DISK_MAP.write().await;
|
||||
for disk in local_disks {
|
||||
let path = disk.path().to_string_lossy().to_string();
|
||||
@@ -403,6 +287,80 @@ impl ECStore {
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn find_local_disk(disk_path: &String) -> Option<DiskStore> {
|
||||
let disk_path = match fs::canonicalize(disk_path).await {
|
||||
Ok(disk_path) => disk_path,
|
||||
Err(_) => return None,
|
||||
};
|
||||
|
||||
let disk_map = GLOBAL_LOCAL_DISK_MAP.read().await;
|
||||
|
||||
let path = disk_path.to_string_lossy().to_string();
|
||||
if disk_map.contains_key(&path) {
|
||||
let a = disk_map[&path].as_ref().cloned();
|
||||
|
||||
return a;
|
||||
}
|
||||
None
|
||||
}
|
||||
|
||||
pub async fn all_local_disk_path() -> Vec<String> {
|
||||
let disk_map = GLOBAL_LOCAL_DISK_MAP.read().await;
|
||||
disk_map.keys().cloned().collect()
|
||||
}
|
||||
|
||||
pub async fn all_local_disk() -> Vec<DiskStore> {
|
||||
let disk_map = GLOBAL_LOCAL_DISK_MAP.read().await;
|
||||
disk_map
|
||||
.values()
|
||||
.filter(|v| v.is_some())
|
||||
.map(|v| v.as_ref().unwrap().clone())
|
||||
.collect()
|
||||
}
|
||||
|
||||
// init_local_disks 初始化本地磁盘,server启动前必须初始化成功
|
||||
pub async fn init_local_disks(endpoint_pools: EndpointServerPools) -> Result<()> {
|
||||
let opt = &DiskOption {
|
||||
cleanup: true,
|
||||
health_check: true,
|
||||
};
|
||||
|
||||
let mut global_set_drives = GLOBAL_LOCAL_DISK_SET_DRIVES.write().await;
|
||||
for pool_eps in endpoint_pools.as_ref().iter() {
|
||||
let mut set_count_drives = Vec::with_capacity(pool_eps.set_count);
|
||||
for _ in 0..pool_eps.set_count {
|
||||
set_count_drives.push(vec![None; pool_eps.drives_per_set]);
|
||||
}
|
||||
|
||||
global_set_drives.push(set_count_drives);
|
||||
}
|
||||
|
||||
let mut global_local_disk_map = GLOBAL_LOCAL_DISK_MAP.write().await;
|
||||
|
||||
for pool_eps in endpoint_pools.as_ref().iter() {
|
||||
let mut set_drives = HashMap::new();
|
||||
for ep in pool_eps.endpoints.as_ref().iter() {
|
||||
if !ep.is_local {
|
||||
continue;
|
||||
}
|
||||
|
||||
let disk = new_disk(ep, opt).await?;
|
||||
|
||||
let path = disk.path().to_string_lossy().to_string();
|
||||
|
||||
global_local_disk_map.insert(path, Some(disk.clone()));
|
||||
|
||||
set_drives.insert(ep.disk_idx, Some(disk.clone()));
|
||||
|
||||
if ep.pool_idx.is_some() && ep.set_idx.is_some() && ep.disk_idx.is_some() {
|
||||
global_set_drives[ep.pool_idx.unwrap()][ep.set_idx.unwrap()][ep.disk_idx.unwrap()] = Some(disk.clone());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn internal_get_pool_info_existing_with_opts(
|
||||
pools: &[Arc<Sets>],
|
||||
bucket: &str,
|
||||
|
||||
@@ -7,7 +7,8 @@ use clap::Parser;
|
||||
use common::error::{Error, Result};
|
||||
use ecstore::{
|
||||
endpoints::EndpointServerPools,
|
||||
store::{init_local_disks, update_erasure_type, ECStore},
|
||||
store::{init_local_disks, ECStore},
|
||||
update_erasure_type,
|
||||
};
|
||||
use grpc::make_server;
|
||||
use hyper_util::{
|
||||
@@ -91,7 +92,6 @@ async fn run(opt: config::Opt) -> Result<()> {
|
||||
.map_err(|err| Error::from_string(err.to_string()))?;
|
||||
|
||||
update_erasure_type(setup_type).await;
|
||||
|
||||
// 初始化本地磁盘
|
||||
init_local_disks(endpoint_pools.clone())
|
||||
.await
|
||||
|
||||
@@ -4,7 +4,7 @@ use ecstore::bucket::get_bucket_metadata_sys;
|
||||
use ecstore::bucket_meta::BucketMetadata;
|
||||
use ecstore::disk::error::DiskError;
|
||||
use ecstore::disk::RUSTFS_META_BUCKET;
|
||||
use ecstore::store::new_object_layer_fn;
|
||||
use ecstore::new_object_layer_fn;
|
||||
use ecstore::store_api::BucketOptions;
|
||||
use ecstore::store_api::CompletePart;
|
||||
use ecstore::store_api::DeleteBucketOptions;
|
||||
|
||||
Reference in New Issue
Block a user