mirror of https://github.com/rustfs/rustfs.git

Commit: Update global disk (更新全局disk)
@@ -19,18 +19,29 @@ use std::{
 use time::OffsetDateTime;
 use tokio::fs::{self, File};
 use tokio::io::ErrorKind;
+use tokio::sync::Mutex;
 use tracing::{debug, warn};
 use uuid::Uuid;
 
+#[derive(Debug)]
+pub struct FormatInfo {
+    pub id: Option<Uuid>,
+    pub data: Vec<u8>,
+    pub file_info: Option<Metadata>,
+    pub last_check: Option<OffsetDateTime>,
+}
+
+impl FormatInfo {}
+
 #[derive(Debug)]
 pub struct LocalDisk {
     pub root: PathBuf,
-    pub id: Uuid,
-    pub _format_data: Vec<u8>,
-    pub _format_meta: Option<Metadata>,
-    pub _format_path: PathBuf,
     // pub format_legacy: bool, // drop
-    pub _format_last_check: Option<OffsetDateTime>,
+    pub format_info: Mutex<FormatInfo>,
+    // pub id: Mutex<Option<Uuid>>,
+    // pub format_data: Mutex<Vec<u8>>,
+    // pub format_file_info: Mutex<Option<Metadata>>,
+    // pub format_last_check: Mutex<Option<OffsetDateTime>>,
 }
 
 impl LocalDisk {
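
Note: the commit folds the four format-related fields (id, data, file metadata, last check time) into a single FormatInfo value behind one tokio Mutex, so they can only be observed or updated together. A minimal sketch of the kind of update this enables (refresh_format is illustrative, not part of the commit):

    // one lock guards all the format fields, so readers never see a torn update
    async fn refresh_format(disk: &LocalDisk, id: Uuid, data: Vec<u8>) {
        let mut info = disk.format_info.lock().await;
        info.id = Some(id);
        info.data = data;
        info.last_check = Some(OffsetDateTime::now_utc());
    }
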
@@ -48,7 +59,7 @@ impl LocalDisk {
 
         let (format_data, format_meta) = read_file_exists(&format_path).await?;
 
-        let mut id = Uuid::nil();
+        let mut id = None;
         // let mut format_legacy = false;
         let mut format_last_check = None;
 
@@ -61,19 +72,26 @@ impl LocalDisk {
                 return Err(Error::from(DiskError::InconsistentDisk));
             }
 
-            id = fm.erasure.this;
+            id = Some(fm.erasure.this);
             // format_legacy = fm.erasure.distribution_algo == DistributionAlgoVersion::V1;
             format_last_check = Some(OffsetDateTime::now_utc());
         }
 
+        let format_info = FormatInfo {
+            id,
+            data: format_data,
+            file_info: format_meta,
+            last_check: format_last_check,
+        };
+
         let disk = Self {
             root,
-            id,
-            _format_meta: format_meta,
-            _format_data: format_data,
-            _format_path: format_path,
             // format_legacy,
-            _format_last_check: format_last_check,
+            format_info: Mutex::new(format_info),
+            // // format_legacy,
+            // format_file_info: Mutex::new(format_meta),
+            // format_data: Mutex::new(format_data),
+            // format_last_check: Mutex::new(format_last_check),
         };
 
         disk.make_meta_volumes().await?;
@@ -389,15 +407,26 @@ impl DiskAPI for LocalDisk {
     fn is_local(&self) -> bool {
         true
     }
 
-    fn id(&self) -> Uuid {
-        self.id
+    async fn close(&self) -> Result<()> {
+        Ok(())
     }
 
     fn path(&self) -> PathBuf {
         self.root.clone()
     }
 
+    async fn get_disk_id(&self) -> Option<Uuid> {
+        // TODO: check format file
+        let format_info = self.format_info.lock().await;
+
+        format_info.id.clone()
+        // TODO: check whether the id from the source file is still valid
+    }
+
+    async fn set_disk_id(&self, _id: Option<Uuid>) -> Result<()> {
+        // nothing to set for a local disk
+        Ok(())
+    }
 
     #[must_use]
     async fn read_all(&self, volume: &str, path: &str) -> Result<Bytes> {
         let p = self.get_object_path(volume, path)?;
@@ -46,8 +46,10 @@ pub async fn new_disk(ep: &endpoint::Endpoint, opt: &DiskOption) -> Result<DiskS
 #[async_trait::async_trait]
 pub trait DiskAPI: Debug + Send + Sync + 'static {
     fn is_local(&self) -> bool;
-    fn id(&self) -> Uuid;
     fn path(&self) -> PathBuf;
+    async fn close(&self) -> Result<()>;
+    async fn get_disk_id(&self) -> Option<Uuid>;
+    async fn set_disk_id(&self, id: Option<Uuid>) -> Result<()>;
 
     async fn delete(&self, volume: &str, path: &str, opt: DeleteOptions) -> Result<()>;
     async fn read_all(&self, volume: &str, path: &str) -> Result<Bytes>;
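
Note: the synchronous id() accessor is replaced by async get_disk_id()/set_disk_id(), because the id now lives behind a lock on local disks and behind a shared Mutex on remote ones. A sketch of what call sites look like after the change (ensure_formatted is illustrative, assuming DiskStore dereferences to dyn DiskAPI):

    // a disk counts as formatted once a disk id can be read from it
    async fn ensure_formatted(disk: &DiskStore) -> bool {
        disk.get_disk_id().await.is_some()
    }
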
@@ -1,6 +1,7 @@
 use std::{path::PathBuf, sync::RwLock, time::Duration};
 
 use bytes::Bytes;
+use futures::lock::Mutex;
 use protos::{
     node_service_time_out_client,
     proto_gen::node_service::{
@@ -32,6 +33,7 @@ use super::{
 
 #[derive(Debug)]
 pub struct RemoteDisk {
+    id: Mutex<Option<Uuid>>,
     channel: RwLock<Option<Channel>>,
     url: url::Url,
     pub root: PathBuf,
@@ -45,6 +47,7 @@ impl RemoteDisk {
             channel: RwLock::new(None),
             url: ep.url.clone(),
             root,
+            id: Mutex::new(None),
         })
     }
 
@@ -83,15 +86,23 @@ impl DiskAPI for RemoteDisk {
     fn is_local(&self) -> bool {
         false
     }
 
-    fn id(&self) -> Uuid {
-        Uuid::nil()
+    async fn close(&self) -> Result<()> {
+        Ok(())
     }
 
     fn path(&self) -> PathBuf {
         self.root.clone()
     }
 
+    async fn get_disk_id(&self) -> Option<Uuid> {
+        self.id.lock().await.clone()
+    }
+    async fn set_disk_id(&self, id: Option<Uuid>) -> Result<()> {
+        let mut lock = self.id.lock().await;
+        *lock = id;
+
+        Ok(())
+    }
 
     async fn read_all(&self, volume: &str, path: &str) -> Result<Bytes> {
         let mut client = self.get_client();
         let request = Request::new(ReadAllRequest {
@@ -8,8 +8,9 @@ use tonic::transport::{Channel, Endpoint};
 use tonic::Request;
 use tracing::warn;
 
+use crate::store::all_local_disk;
 use crate::{
-    disk::{self, error::DiskError, DiskStore, VolumeInfo},
+    disk::{self, error::DiskError, VolumeInfo},
     endpoints::{EndpointServerPools, Node},
     error::{Error, Result},
     store_api::{BucketInfo, BucketOptions, MakeBucketOptions},
@@ -33,21 +34,20 @@ pub struct S3PeerSys {
 }
 
 impl S3PeerSys {
-    pub fn new(eps: &EndpointServerPools, local_disks: Vec<DiskStore>) -> Self {
+    pub fn new(eps: &EndpointServerPools) -> Self {
         Self {
-            clients: Self::new_clients(eps, local_disks),
+            clients: Self::new_clients(eps),
             pools_count: eps.as_ref().len(),
         }
     }
 
-    fn new_clients(eps: &EndpointServerPools, local_disks: Vec<DiskStore>) -> Vec<Client> {
+    fn new_clients(eps: &EndpointServerPools) -> Vec<Client> {
         let nodes = eps.get_nodes();
         let v: Vec<Client> = nodes
             .iter()
             .map(|e| {
                 if e.is_local {
-                    let cli: Box<dyn PeerS3Client> =
-                        Box::new(LocalPeerS3Client::new(local_disks.clone(), Some(e.clone()), Some(e.pools.clone())));
+                    let cli: Box<dyn PeerS3Client> = Box::new(LocalPeerS3Client::new(Some(e.clone()), Some(e.pools.clone())));
                     Arc::new(cli)
                 } else {
                     let cli: Box<dyn PeerS3Client> = Box::new(RemotePeerS3Client::new(Some(e.clone()), Some(e.pools.clone())));
@@ -216,15 +216,15 @@ impl S3PeerSys {
 
 #[derive(Debug)]
 pub struct LocalPeerS3Client {
-    pub local_disks: Vec<DiskStore>,
+    // pub local_disks: Vec<DiskStore>,
     // pub node: Node,
     pub pools: Option<Vec<usize>>,
 }
 
 impl LocalPeerS3Client {
-    pub fn new(local_disks: Vec<DiskStore>, _node: Option<Node>, pools: Option<Vec<usize>>) -> Self {
+    pub fn new(_node: Option<Node>, pools: Option<Vec<usize>>) -> Self {
         Self {
-            local_disks,
+            // local_disks,
             // node,
             pools,
         }
@@ -237,8 +237,10 @@ impl PeerS3Client for LocalPeerS3Client {
         self.pools.clone()
     }
     async fn list_bucket(&self, _opts: &BucketOptions) -> Result<Vec<BucketInfo>> {
-        let mut futures = Vec::with_capacity(self.local_disks.len());
-        for disk in self.local_disks.iter() {
+        let local_disks = all_local_disk().await;
+
+        let mut futures = Vec::with_capacity(local_disks.len());
+        for disk in local_disks.iter() {
             futures.push(disk.list_volumes());
         }
@@ -283,8 +285,9 @@ impl PeerS3Client for LocalPeerS3Client {
         Ok(buckets)
     }
     async fn make_bucket(&self, bucket: &str, opts: &MakeBucketOptions) -> Result<()> {
-        let mut futures = Vec::with_capacity(self.local_disks.len());
-        for disk in self.local_disks.iter() {
+        let local_disks = all_local_disk().await;
+        let mut futures = Vec::with_capacity(local_disks.len());
+        for disk in local_disks.iter() {
             futures.push(async move {
                 match disk.make_volume(bucket).await {
                     Ok(_) => Ok(()),
@@ -316,15 +319,16 @@ impl PeerS3Client for LocalPeerS3Client {
     }
 
     async fn get_bucket_info(&self, bucket: &str, _opts: &BucketOptions) -> Result<BucketInfo> {
-        let mut futures = Vec::with_capacity(self.local_disks.len());
-        for disk in self.local_disks.iter() {
+        let local_disks = all_local_disk().await;
+        let mut futures = Vec::with_capacity(local_disks.len());
+        for disk in local_disks.iter() {
             futures.push(disk.stat_volume(bucket));
         }
 
         let results = join_all(futures).await;
 
-        let mut ress = Vec::with_capacity(self.local_disks.len());
-        let mut errs = Vec::with_capacity(self.local_disks.len());
+        let mut ress = Vec::with_capacity(local_disks.len());
+        let mut errs = Vec::with_capacity(local_disks.len());
 
         for res in results {
             match res {
@@ -354,9 +358,10 @@ impl PeerS3Client for LocalPeerS3Client {
     }
 
     async fn delete_bucket(&self, bucket: &str) -> Result<()> {
-        let mut futures = Vec::with_capacity(self.local_disks.len());
+        let local_disks = all_local_disk().await;
+        let mut futures = Vec::with_capacity(local_disks.len());
 
-        for disk in self.local_disks.iter() {
+        for disk in local_disks.iter() {
             futures.push(disk.delete_volume(bucket));
         }
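
Note: every peer operation now starts by snapshotting the global registry via all_local_disk().await instead of reading an injected Vec<DiskStore>. The fan-out shape is the same in all four methods; a condensed sketch (delete_on_all is illustrative, assuming delete_volume returns Result<()>):

    use futures::future::join_all;

    async fn delete_on_all(bucket: &str) -> Vec<Result<()>> {
        // snapshot the globally registered local disks, one future per disk
        let local_disks = all_local_disk().await;
        let futures: Vec<_> = local_disks.iter().map(|d| d.delete_volume(bucket)).collect();
        join_all(futures).await
    }
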
@@ -12,6 +12,7 @@ use crate::{
     endpoints::PoolEndpoints,
     error::{Error, Result},
     set_disk::SetDisks,
+    store::{GLOBAL_IsDistErasure, GLOBAL_LOCAL_DISK_SET_DRIVES},
     store_api::{
         BucketInfo, BucketOptions, CompletePart, DeletedObject, GetObjectReader, HTTPRangeSpec, ListObjectsV2Info,
         MakeBucketOptions, MultipartUploadResult, ObjectInfo, ObjectOptions, ObjectToDelete, PartInfo, PutObjReader, StorageAPI,
@@ -35,7 +36,7 @@ pub struct Sets {
 }
 
 impl Sets {
-    pub fn new(
+    pub async fn new(
         disks: Vec<Option<DiskStore>>,
         endpoints: &PoolEndpoints,
         fm: &FormatV3,
@@ -51,11 +52,32 @@ impl Sets {
         let mut set_drive = Vec::with_capacity(set_drive_count);
         for j in 0..set_drive_count {
             let idx = i * set_drive_count + j;
-            if disks[idx].is_none() {
+            let mut disk = disks[idx].clone();
+            if disk.is_none() {
                 set_drive.push(None);
-            } else {
-                let disk = disks[idx].clone();
+                continue;
             }
 
+            if disk.as_ref().unwrap().is_local() && *GLOBAL_IsDistErasure.read().await {
+                let local_disk = {
+                    let local_set_drives = GLOBAL_LOCAL_DISK_SET_DRIVES.read().await;
+                    local_set_drives[pool_idx][i][j].clone()
+                };
+
+                if local_disk.is_none() {
+                    set_drive.push(None);
+                    continue;
+                }
+
+                let _ = disk.as_ref().unwrap().close().await;
+
+                disk = local_disk;
+            }
+
+            if let Some(_disk_id) = disk.as_ref().unwrap().get_disk_id().await {
+                set_drive.push(disk);
+            } else {
+                set_drive.push(None);
+            }
         }
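
Note: in distributed-erasure mode Sets::new now discards the disk instance it was handed, closes it, and swaps in the shared instance that init_local_disks registered in GLOBAL_LOCAL_DISK_SET_DRIVES, so every component holds the same DiskStore per drive. A hypothetical helper showing the lookup shape against the [pool][set][drive] table:

    // illustrative only; Sets::new does this inline with a short-lived read lock
    async fn shared_local_disk(pool: usize, set: usize, drive: usize) -> Option<DiskStore> {
        let drives = GLOBAL_LOCAL_DISK_SET_DRIVES.read().await;
        drives.get(pool)?.get(set)?.get(drive)?.clone()
    }
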
@@ -1,7 +1,9 @@
 use crate::{
     bucket_meta::BucketMetadata,
-    disk::{error::DiskError, DeleteOptions, DiskOption, DiskStore, WalkDirOptions, BUCKET_META_PREFIX, RUSTFS_META_BUCKET},
-    endpoints::EndpointServerPools,
+    disk::{
+        error::DiskError, new_disk, DeleteOptions, DiskOption, DiskStore, WalkDirOptions, BUCKET_META_PREFIX, RUSTFS_META_BUCKET,
+    },
+    endpoints::{EndpointServerPools, SetupType},
     error::{Error, Result},
     peer::S3PeerSys,
     sets::Sets,
@@ -20,24 +22,124 @@ use std::{
     sync::Arc,
 };
 use time::OffsetDateTime;
-use tokio::sync::Mutex;
+use tokio::{fs, sync::RwLock};
 use tracing::{debug, warn};
 use uuid::Uuid;
 
 use lazy_static::lazy_static;
 
 lazy_static! {
-    pub static ref GLOBAL_OBJECT_API: Arc<Mutex<Option<ECStore>>> = Arc::new(Mutex::new(None));
-    pub static ref GLOBAL_LOCAL_DISK: Arc<Mutex<Vec<Option<DiskStore>>>> = Arc::new(Mutex::new(Vec::new()));
+    pub static ref GLOBAL_IsErasure: RwLock<bool> = RwLock::new(false);
+    pub static ref GLOBAL_IsDistErasure: RwLock<bool> = RwLock::new(false);
+    pub static ref GLOBAL_IsErasureSD: RwLock<bool> = RwLock::new(false);
 }
 
-pub fn new_object_layer_fn() -> Arc<Mutex<Option<ECStore>>> {
-    // no explicit lock/unlock is needed here; Arc provides the necessary thread safety
+pub async fn update_erasure_type(setup_type: SetupType) {
+    let mut is_erasure = GLOBAL_IsErasure.write().await;
+    *is_erasure = setup_type == SetupType::Erasure;
+
+    let mut is_dist_erasure = GLOBAL_IsDistErasure.write().await;
+    *is_dist_erasure = setup_type == SetupType::DistErasure;
+
+    if *is_dist_erasure {
+        *is_erasure = true
+    }
+
+    let mut is_erasure_sd = GLOBAL_IsErasureSD.write().await;
+    *is_erasure_sd = setup_type == SetupType::ErasureSD;
+}
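
Note: the setup type computed at startup is cached in three RwLock<bool> globals, with DistErasure implying Erasure. Elsewhere the flags are read with a short-lived read guard, e.g. (illustrative helper):

    async fn is_dist_erasure() -> bool {
        // matches the inline reads used in Sets::new and ECStore::new
        *GLOBAL_IsDistErasure.read().await
    }
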
+lazy_static! {
+    pub static ref GLOBAL_LOCAL_DISK_MAP: Arc<RwLock<HashMap<String, Option<DiskStore>>>> = Arc::new(RwLock::new(HashMap::new()));
+    pub static ref GLOBAL_LOCAL_DISK_SET_DRIVES: Arc<RwLock<Vec<Vec<Vec<Option<DiskStore>>>>>> =
+        Arc::new(RwLock::new(Vec::new()));
+}
+
+pub async fn find_local_disk(disk_path: &String) -> Option<DiskStore> {
+    let disk_path = match fs::canonicalize(disk_path).await {
+        Ok(disk_path) => disk_path,
+        Err(_) => return None,
+    };
+
+    let disk_map = GLOBAL_LOCAL_DISK_MAP.read().await;
+
+    let path = disk_path.to_string_lossy().to_string();
+    if disk_map.contains_key(&path) {
+        let a = disk_map[&path].as_ref().cloned();
+
+        return a;
+    }
+    None
+}
+
+pub async fn all_local_disk_path() -> Vec<String> {
+    let disk_map = GLOBAL_LOCAL_DISK_MAP.read().await;
+    disk_map.keys().map(|v| v.clone()).collect()
+}
+
+pub async fn all_local_disk() -> Vec<DiskStore> {
+    let disk_map = GLOBAL_LOCAL_DISK_MAP.read().await;
+    disk_map
+        .values()
+        .filter(|v| v.is_some())
+        .map(|v| v.as_ref().unwrap().clone())
+        .collect()
+}
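
Note: the map is keyed by the canonicalized path string, so lookups canonicalize before probing, and a path that does not resolve yields None rather than an error. Usage sketch (the path literal is hypothetical):

    // resolves symlinks and relative segments the same way init_local_disks keys the map
    if let Some(disk) = find_local_disk(&"/data/disk1".to_string()).await {
        debug!("found local disk at {:?}", disk.path());
    }
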
+// init_local_disks initializes the local disks; it must succeed before the server starts
+pub async fn init_local_disks(endpoint_pools: EndpointServerPools) -> Result<()> {
+    let opt = &DiskOption {
+        cleanup: true,
+        health_check: true,
+    };
+
+    let mut global_set_drives = GLOBAL_LOCAL_DISK_SET_DRIVES.write().await;
+    for pool_eps in endpoint_pools.as_ref().iter() {
+        let mut set_count_drives = Vec::with_capacity(pool_eps.set_count);
+        for _ in 0..pool_eps.set_count {
+            set_count_drives.push(vec![None; pool_eps.drives_per_set]);
+        }
+
+        global_set_drives.push(set_count_drives);
+    }
+
+    let mut global_local_disk_map = GLOBAL_LOCAL_DISK_MAP.write().await;
+
+    for pool_eps in endpoint_pools.as_ref().iter() {
+        let mut set_drives = HashMap::new();
+        for ep in pool_eps.endpoints.as_ref().iter() {
+            if !ep.is_local {
+                continue;
+            }
+
+            let disk = new_disk(ep, opt).await?;
+
+            let path = disk.path().to_string_lossy().to_string();
+
+            global_local_disk_map.insert(path, Some(disk.clone()));
+
+            set_drives.insert(ep.disk_idx, Some(disk.clone()));
+
+            if ep.pool_idx.is_some() && ep.set_idx.is_some() && ep.disk_idx.is_some() {
+                global_set_drives[ep.pool_idx.unwrap()][ep.set_idx.unwrap()][ep.disk_idx.unwrap()] = Some(disk.clone());
+            }
+        }
+    }
+
+    Ok(())
+}
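
Note: the [pool][set][drive] table is pre-sized with None before any disk is opened, so the indexed assignment above cannot go out of bounds for any endpoint within the configured shape. The pre-sizing step in isolation (sketch):

    // one None slot per (pool, set, drive) position, filled in as disks open
    let mut table: Vec<Vec<Vec<Option<DiskStore>>>> = Vec::new();
    for pool_eps in endpoint_pools.as_ref().iter() {
        table.push(vec![vec![None; pool_eps.drives_per_set]; pool_eps.set_count]);
    }
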
+lazy_static! {
+    pub static ref GLOBAL_OBJECT_API: Arc<RwLock<Option<ECStore>>> = Arc::new(RwLock::new(None));
+    pub static ref GLOBAL_LOCAL_DISK: Arc<RwLock<Vec<Option<DiskStore>>>> = Arc::new(RwLock::new(Vec::new()));
+}
+
+pub fn new_object_layer_fn() -> Arc<RwLock<Option<ECStore>>> {
     GLOBAL_OBJECT_API.clone()
 }
 
 async fn set_object_layer(o: ECStore) {
-    let mut global_object_api = GLOBAL_OBJECT_API.lock().await;
+    let mut global_object_api = GLOBAL_OBJECT_API.write().await;
     *global_object_api = Some(o);
 }
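
Note: GLOBAL_OBJECT_API moves from a tokio Mutex to an RwLock: set_object_layer takes the single write lock at startup, while every S3 handler only needs a read guard, so concurrent requests no longer serialize on one lock. Reader side in condensed form (the expect message is illustrative; the handlers below use an explicit match):

    let layer = new_object_layer_fn();
    let guard = layer.read().await;
    let store = guard.as_ref().expect("object layer not initialized");
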
@@ -48,7 +150,7 @@ pub struct ECStore {
     pub disk_map: HashMap<usize, Vec<Option<DiskStore>>>,
     pub pools: Vec<Sets>,
     pub peer_sys: S3PeerSys,
-    pub local_disks: Vec<DiskStore>,
+    // pub local_disks: Vec<DiskStore>,
 }
 
 impl ECStore {
@@ -108,20 +210,28 @@ impl ECStore {
             }
         }
 
-        let sets = Sets::new(disks.clone(), pool_eps, &fm, i, partiy_count)?;
+        let sets = Sets::new(disks.clone(), pool_eps, &fm, i, partiy_count).await?;
 
         pools.push(sets);
 
         disk_map.insert(i, disks);
     }
 
-    let peer_sys = S3PeerSys::new(&endpoint_pools, local_disks.clone());
+    // replace the local disks registered in the global map
+    if !*GLOBAL_IsDistErasure.read().await {
+        let mut global_local_disk_map = GLOBAL_LOCAL_DISK_MAP.write().await;
+        for disk in local_disks {
+            let path = disk.path().to_string_lossy().to_string();
+            global_local_disk_map.insert(path, Some(disk.clone()));
+        }
+    }
+
+    let peer_sys = S3PeerSys::new(&endpoint_pools);
 
     let ec = ECStore {
         id: deployment_id.unwrap(),
         disk_map,
         pools,
-        local_disks,
         peer_sys,
     };
@@ -1,7 +1,9 @@
 use crate::{
-    disk::error::DiskError,
-    disk::format::{FormatErasureVersion, FormatMetaVersion, FormatV3},
-    disk::{new_disk, DiskOption, DiskStore, FORMAT_CONFIG_FILE, RUSTFS_META_BUCKET},
+    disk::{
+        error::DiskError,
+        format::{FormatErasureVersion, FormatMetaVersion, FormatV3},
+        new_disk, DiskOption, DiskStore, FORMAT_CONFIG_FILE, RUSTFS_META_BUCKET,
+    },
     endpoints::Endpoints,
     error::{Error, Result},
 };
@@ -10,6 +12,7 @@ use std::{
     collections::{hash_map::Entry, HashMap},
     fmt::Debug,
 };
 
+use tracing::warn;
 use uuid::Uuid;
@@ -188,17 +191,22 @@ pub fn default_partiy_count(drive: usize) -> usize {
 async fn read_format_file_all(disks: &[Option<DiskStore>], heal: bool) -> (Vec<Option<FormatV3>>, Vec<Option<Error>>) {
     let mut futures = Vec::with_capacity(disks.len());
 
-    for ep in disks.iter() {
-        futures.push(read_format_file(ep, heal));
+    for disk in disks.iter() {
+        futures.push(read_format_file(disk, heal));
     }
 
     let mut datas = Vec::with_capacity(disks.len());
     let mut errors = Vec::with_capacity(disks.len());
 
     let results = join_all(futures).await;
+    let mut i = 0;
     for result in results {
         match result {
             Ok(s) => {
+                if !heal {
+                    let _ = disks[i].as_ref().unwrap().set_disk_id(Some(s.erasure.this.clone())).await;
+                }
+
                 datas.push(Some(s));
                 errors.push(None);
             }
@@ -207,6 +215,8 @@ async fn read_format_file_all(disks: &[Option<DiskStore>], heal: bool) -> (Vec<O
                 errors.push(Some(e));
             }
         }
+
+        i += 1;
     }
 
     (datas, errors)
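
Note: after a successful format read (outside of heal), the disk id from the format file is pushed back into the disk via the new set_disk_id, keyed by a manually incremented index i. The counter could also come from enumerate(), sketched here just for the set_disk_id call:

    for (i, result) in results.into_iter().enumerate() {
        if let Ok(s) = result {
            if !heal {
                // same call as above; enumerate() keeps i and result in lockstep
                let _ = disks[i].as_ref().unwrap().set_disk_id(Some(s.erasure.this.clone())).await;
            }
        }
    }
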
@@ -238,8 +248,8 @@ async fn read_format_file(disk: &Option<DiskStore>, _heal: bool) -> Result<Forma
 async fn save_format_file_all(disks: &[Option<DiskStore>], formats: &[Option<FormatV3>]) -> Vec<Option<Error>> {
     let mut futures = Vec::with_capacity(disks.len());
 
-    for (i, ep) in disks.iter().enumerate() {
-        futures.push(save_format_file(ep, &formats[i]));
+    for (i, disk) in disks.iter().enumerate() {
+        futures.push(save_format_file(disk, &formats[i]));
     }
 
     let mut errors = Vec::with_capacity(disks.len());
@@ -277,9 +287,7 @@ async fn save_format_file(disk: &Option<DiskStore>, format: &Option<FormatV3>) -
     disk.rename_file(RUSTFS_META_BUCKET, tmpfile.as_str(), RUSTFS_META_BUCKET, FORMAT_CONFIG_FILE)
         .await?;
 
-    // let mut disk = disk;
-
-    // disk.set_disk_id(format.erasure.this);
+    disk.set_disk_id(Some(format.erasure.this)).await?;
 
     Ok(())
 }
@@ -1,11 +1,10 @@
 use ecstore::{
     disk::{DeleteOptions, DiskStore, ReadMultipleReq, ReadOptions, WalkDirOptions},
-    endpoints::EndpointServerPools,
     erasure::{ReadAt, Write},
     peer::{LocalPeerS3Client, PeerS3Client},
+    store::{all_local_disk_path, find_local_disk},
     store_api::{BucketOptions, FileInfo, MakeBucketOptions},
 };
 use tokio::fs;
 use tonic::{Request, Response, Status};
 use tracing::{debug, error, info};
@@ -29,28 +28,29 @@ struct NodeService {
     pub local_peer: LocalPeerS3Client,
 }
 
-pub fn make_server(endpoint_pools: EndpointServerPools) -> NodeServer<impl Node> {
-    // TODO: model this on the rustfs setup, see https://github.com/rustfs/s3-rustfs/issues/30#issuecomment-2339664516
-    let local_disks = Vec::new();
-    let local_peer = LocalPeerS3Client::new(local_disks, None, None);
+pub fn make_server() -> NodeServer<impl Node> {
+    // let local_disks = all_local_disk().await;
+    let local_peer = LocalPeerS3Client::new(None, None);
     NodeServer::new(NodeService { local_peer })
 }
 
 impl NodeService {
     async fn find_disk(&self, disk_path: &String) -> Option<DiskStore> {
-        let disk_path = match fs::canonicalize(disk_path).await {
-            Ok(disk_path) => disk_path,
-            Err(_) => return None,
-        };
-        self.local_peer.local_disks.iter().find(|&x| x.path() == disk_path).cloned()
+        find_local_disk(disk_path).await
+        // let disk_path = match fs::canonicalize(disk_path).await {
+        //     Ok(disk_path) => disk_path,
+        //     Err(_) => return None,
+        // };
+        // self.local_peer.local_disks.iter().find(|&x| x.path() == disk_path).cloned()
     }
 
-    fn all_disk(&self) -> Vec<String> {
-        self.local_peer
-            .local_disks
-            .iter()
-            .map(|disk| disk.path().to_string_lossy().to_string())
-            .collect()
+    async fn all_disk(&self) -> Vec<String> {
+        all_local_disk_path().await
+        // self.local_peer
+        //     .local_disks
+        //     .iter()
+        //     .map(|disk| disk.path().to_string_lossy().to_string())
+        //     .collect()
     }
 }
@@ -501,7 +501,7 @@ impl Node for NodeService {
         } else {
             Ok(tonic::Response::new(MakeVolumesResponse {
                 success: false,
-                error_info: Some(format!("can not find disk, all disks: {:?}", self.all_disk())),
+                error_info: Some(format!("can not find disk, all disks: {:?}", self.all_disk().await)),
             }))
         }
     }
@@ -522,7 +522,7 @@ impl Node for NodeService {
         } else {
             Ok(tonic::Response::new(MakeVolumeResponse {
                 success: false,
-                error_info: Some(format!("can not find disk, all disks: {:?}", self.all_disk())),
+                error_info: Some(format!("can not find disk, all disks: {:?}", self.all_disk().await)),
             }))
         }
     }
@@ -4,7 +4,11 @@ mod service;
 mod storage;
 
 use clap::Parser;
-use ecstore::{endpoints::EndpointServerPools, error::Result, store::ECStore};
+use ecstore::{
+    endpoints::EndpointServerPools,
+    error::Result,
+    store::{init_local_disks, update_erasure_type, ECStore},
+};
 use grpc::make_server;
 use hyper_util::{
     rt::{TokioExecutor, TokioIo},
@@ -72,7 +76,13 @@ async fn run(opt: config::Opt) -> Result<()> {
     // };
 
     // used for rpc
-    let (endpoint_pools, _) = EndpointServerPools::from_volumes(opt.address.clone().as_str(), opt.volumes.clone())?;
+    let (endpoint_pools, setup_type) = EndpointServerPools::from_volumes(opt.address.clone().as_str(), opt.volumes.clone())?;
+
+    update_erasure_type(setup_type).await;
+
+    // initialize the local disks
+    init_local_disks(endpoint_pools.clone()).await?;
 
     // Setup S3 service
     // this project uses the s3s crate to implement the S3 service
     let service = {
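
Note: startup ordering now matters: from_volumes derives the topology and setup type, update_erasure_type publishes the erasure flags, and init_local_disks registers every local disk in the global maps before ECStore::new (and later Sets::new) reads them back. Condensed (same calls as above):

    let (endpoint_pools, setup_type) =
        EndpointServerPools::from_volumes(opt.address.clone().as_str(), opt.volumes.clone())?;
    update_erasure_type(setup_type).await;           // 1. publish erasure flags
    init_local_disks(endpoint_pools.clone()).await?; // 2. register local disks
    // 3. only then build the object layer, which consults both globals
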
@@ -108,7 +118,8 @@ async fn run(opt: config::Opt) -> Result<()> {
         b.build()
     };
-    let rpc_service = make_server(endpoint_pools.clone());
+
+    let rpc_service = make_server();
 
     tokio::spawn(async move {
         let hyper_service = service.into_shared();
@@ -61,7 +61,7 @@ impl S3 for FS {
     let input = req.input;
 
     let layer = new_object_layer_fn();
-    let lock = layer.lock().await;
+    let lock = layer.read().await;
     let store = match lock.as_ref() {
         Some(s) => s,
         None => return Err(S3Error::with_message(S3ErrorCode::InternalError, format!("Not init",))),
@@ -94,7 +94,7 @@ impl S3 for FS {
     let input = req.input;
 
     let layer = new_object_layer_fn();
-    let lock = layer.lock().await;
+    let lock = layer.read().await;
     let store = match lock.as_ref() {
         Some(s) => s,
         None => return Err(S3Error::with_message(S3ErrorCode::InternalError, format!("Not init",))),
@@ -125,7 +125,7 @@ impl S3 for FS {
     let objects: Vec<ObjectToDelete> = vec![dobj];
 
     let layer = new_object_layer_fn();
-    let lock = layer.lock().await;
+    let lock = layer.read().await;
     let store = match lock.as_ref() {
         Some(s) => s,
         None => return Err(S3Error::with_message(S3ErrorCode::InternalError, format!("Not init",))),
@@ -192,7 +192,7 @@ impl S3 for FS {
         .collect();
 
     let layer = new_object_layer_fn();
-    let lock = layer.lock().await;
+    let lock = layer.read().await;
    let store = match lock.as_ref() {
         Some(s) => s,
         None => return Err(S3Error::with_message(S3ErrorCode::InternalError, format!("Not init",))),
@@ -233,7 +233,7 @@ impl S3 for FS {
     let input = req.input;
 
     let layer = new_object_layer_fn();
-    let lock = layer.lock().await;
+    let lock = layer.read().await;
     let store = match lock.as_ref() {
         Some(s) => s,
         None => return Err(S3Error::with_message(S3ErrorCode::InternalError, format!("Not init",))),
@@ -276,7 +276,7 @@ impl S3 for FS {
     let opts = &ObjectOptions::default();
 
     let layer = new_object_layer_fn();
-    let lock = layer.lock().await;
+    let lock = layer.read().await;
     let store = match lock.as_ref() {
         Some(s) => s,
         None => return Err(S3Error::with_message(S3ErrorCode::InternalError, format!("Not init",))),
@@ -306,7 +306,7 @@ impl S3 for FS {
     let input = req.input;
 
     let layer = new_object_layer_fn();
-    let lock = layer.lock().await;
+    let lock = layer.read().await;
     let store = match lock.as_ref() {
         Some(s) => s,
         None => return Err(S3Error::with_message(S3ErrorCode::InternalError, format!("Not init",))),
@@ -330,7 +330,7 @@ impl S3 for FS {
     let HeadObjectInput { bucket, key, .. } = req.input;
 
     let layer = new_object_layer_fn();
-    let lock = layer.lock().await;
+    let lock = layer.read().await;
     let store = match lock.as_ref() {
         Some(s) => s,
         None => return Err(S3Error::with_message(S3ErrorCode::InternalError, format!("Not init",))),
@@ -357,7 +357,7 @@ impl S3 for FS {
     // mc ls
 
     let layer = new_object_layer_fn();
-    let lock = layer.lock().await;
+    let lock = layer.read().await;
     let store = match lock.as_ref() {
         Some(s) => s,
         None => return Err(S3Error::with_message(S3ErrorCode::InternalError, format!("Not init",))),
@@ -412,7 +412,7 @@ impl S3 for FS {
     let delimiter = delimiter.unwrap_or_default();
 
     let layer = new_object_layer_fn();
-    let lock = layer.lock().await;
+    let lock = layer.read().await;
     let store = match lock.as_ref() {
         Some(s) => s,
         None => return Err(S3Error::with_message(S3ErrorCode::InternalError, format!("Not init",))),
@@ -499,7 +499,7 @@ impl S3 for FS {
     let reader = PutObjReader::new(body, content_length as usize);
 
     let layer = new_object_layer_fn();
-    let lock = layer.lock().await;
+    let lock = layer.read().await;
     let store = match lock.as_ref() {
         Some(s) => s,
         None => return Err(S3Error::with_message(S3ErrorCode::InternalError, format!("Not init",))),
@@ -527,7 +527,7 @@ impl S3 for FS {
     debug!("create_multipart_upload meta {:?}", &metadata);
 
     let layer = new_object_layer_fn();
-    let lock = layer.lock().await;
+    let lock = layer.read().await;
     let store = match lock.as_ref() {
         Some(s) => s,
         None => return Err(S3Error::with_message(S3ErrorCode::InternalError, format!("Not init",))),
@@ -570,7 +570,7 @@ impl S3 for FS {
     let opts = ObjectOptions::default();
 
     let layer = new_object_layer_fn();
-    let lock = layer.lock().await;
+    let lock = layer.read().await;
     let store = match lock.as_ref() {
         Some(s) => s,
         None => return Err(S3Error::with_message(S3ErrorCode::InternalError, format!("Not init",))),
@@ -632,7 +632,7 @@ impl S3 for FS {
     }
 
     let layer = new_object_layer_fn();
-    let lock = layer.lock().await;
+    let lock = layer.read().await;
     let store = match lock.as_ref() {
         Some(s) => s,
         None => return Err(S3Error::with_message(S3ErrorCode::InternalError, format!("Not init",))),
@@ -662,7 +662,7 @@ impl S3 for FS {
     } = req.input;
 
     let layer = new_object_layer_fn();
-    let lock = layer.lock().await;
+    let lock = layer.read().await;
     let store = match lock.as_ref() {
         Some(s) => s,
         None => return Err(S3Error::with_message(S3ErrorCode::InternalError, format!("Not init",))),
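
Note: the same four-line preamble (new_object_layer_fn → read().await → match on the Option) now opens every handler; only the lock method changed in this commit. If the repetition ever becomes a problem, it could be collapsed with a small macro (a sketch, not part of the commit):

    // expands to the store reference, or early-returns the "Not init" error
    macro_rules! object_store {
        ($lock:ident) => {{
            match $lock.as_ref() {
                Some(s) => s,
                None => return Err(S3Error::with_message(S3ErrorCode::InternalError, "Not init".to_string())),
            }
        }};
    }

    // usage inside a handler:
    // let layer = new_object_layer_fn();
    // let lock = layer.read().await;
    // let store = object_store!(lock);
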