add: monitor_and_connect_endpoints

This commit is contained in:
weisd
2024-09-19 18:06:44 +08:00
parent c71e86b8d1
commit bc9868f926
5 changed files with 163 additions and 82 deletions

View File

@@ -1,7 +1,7 @@
use super::{endpoint::Endpoint, error::DiskError, format::FormatV3};
use super::{
DeleteOptions, DiskAPI, FileInfoVersions, FileReader, FileWriter, MetaCacheEntry, ReadMultipleReq, ReadMultipleResp,
ReadOptions, RenameDataResp, VolumeInfo, WalkDirOptions,
DeleteOptions, DiskAPI, DiskLocation, FileInfoVersions, FileReader, FileWriter, MetaCacheEntry, ReadMultipleReq,
ReadMultipleResp, ReadOptions, RenameDataResp, VolumeInfo, WalkDirOptions,
};
use crate::disk::{LocalFileReader, LocalFileWriter, STORAGE_FORMAT_FILE};
use crate::{
@@ -452,6 +452,9 @@ impl DiskAPI for LocalDisk {
fn is_local(&self) -> bool {
true
}
async fn is_online(&self) -> bool {
true
}
async fn close(&self) -> Result<()> {
Ok(())
}
@@ -459,6 +462,14 @@ impl DiskAPI for LocalDisk {
self.root.clone()
}
fn get_location(&self) -> DiskLocation {
DiskLocation {
pool_idx: self.endpoint.pool_idx,
set_idx: self.endpoint.set_idx,
disk_idx: self.endpoint.pool_idx,
}
}
async fn get_disk_id(&self) -> Result<Option<Uuid>> {
warn!("local get_disk_id");
// TODO: check format file

View File

@@ -45,10 +45,12 @@ pub async fn new_disk(ep: &endpoint::Endpoint, opt: &DiskOption) -> Result<DiskS
#[async_trait::async_trait]
pub trait DiskAPI: Debug + Send + Sync + 'static {
fn is_local(&self) -> bool;
async fn is_online(&self) -> bool;
fn path(&self) -> PathBuf;
async fn close(&self) -> Result<()>;
async fn get_disk_id(&self) -> Result<Option<Uuid>>;
async fn set_disk_id(&self, id: Option<Uuid>) -> Result<()>;
fn get_location(&self) -> DiskLocation;
async fn delete(&self, volume: &str, path: &str, opt: DeleteOptions) -> Result<()>;
async fn read_all(&self, volume: &str, path: &str) -> Result<Bytes>;
@@ -95,6 +97,18 @@ pub trait DiskAPI: Debug + Send + Sync + 'static {
async fn read_multiple(&self, req: ReadMultipleReq) -> Result<Vec<ReadMultipleResp>>;
}
pub struct DiskLocation {
pub pool_idx: Option<usize>,
pub set_idx: Option<usize>,
pub disk_idx: Option<usize>,
}
impl DiskLocation {
pub fn valid(&self) -> bool {
self.pool_idx.is_some() && self.set_idx.is_some() && self.disk_idx.is_some()
}
}
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct FileInfoVersions {
// Name of the volume.

View File

@@ -28,9 +28,9 @@ use crate::{
};
use super::{
endpoint::Endpoint, DeleteOptions, DiskAPI, DiskOption, FileInfoVersions, FileReader, FileWriter, MetaCacheEntry,
ReadMultipleReq, ReadMultipleResp, ReadOptions, RemoteFileReader, RemoteFileWriter, RenameDataResp, VolumeInfo,
WalkDirOptions,
endpoint::Endpoint, DeleteOptions, DiskAPI, DiskLocation, DiskOption, FileInfoVersions, FileReader, FileWriter,
MetaCacheEntry, ReadMultipleReq, ReadMultipleResp, ReadOptions, RemoteFileReader, RemoteFileWriter, RenameDataResp,
VolumeInfo, WalkDirOptions,
};
#[derive(Debug)]
@@ -39,6 +39,7 @@ pub struct RemoteDisk {
channel: Arc<RwLock<Option<Channel>>>,
url: url::Url,
pub root: PathBuf,
endpoint: Endpoint,
}
impl RemoteDisk {
@@ -50,6 +51,7 @@ impl RemoteDisk {
url: ep.url.clone(),
root,
id: Mutex::new(None),
endpoint: ep.clone(),
})
}
@@ -98,6 +100,13 @@ impl DiskAPI for RemoteDisk {
fn is_local(&self) -> bool {
false
}
async fn is_online(&self) -> bool {
// TODO: 连接状态
if let Ok(_) = self.get_client_v2().await {
return true;
}
false
}
async fn close(&self) -> Result<()> {
Ok(())
}
@@ -105,6 +114,14 @@ impl DiskAPI for RemoteDisk {
self.root.clone()
}
fn get_location(&self) -> DiskLocation {
DiskLocation {
pool_idx: self.endpoint.pool_idx,
set_idx: self.endpoint.set_idx,
disk_idx: self.endpoint.pool_idx,
}
}
async fn get_disk_id(&self) -> Result<Option<Uuid>> {
Ok(self.id.lock().await.clone())
}
@@ -253,11 +270,11 @@ impl DiskAPI for RemoteDisk {
});
let response = client.walk_dir(request).await?.into_inner();
if !response.success {
return Err(Error::from_string(response.error_info.unwrap_or("".to_string())));
}
let entries = response
.meta_cache_entry
.into_iter()
@@ -288,7 +305,7 @@ impl DiskAPI for RemoteDisk {
});
let response = client.rename_data(request).await?.into_inner();
if !response.success {
return Err(Error::from_string(response.error_info.unwrap_or("".to_string())));
}
@@ -363,7 +380,7 @@ impl DiskAPI for RemoteDisk {
});
let response = client.stat_volume(request).await?.into_inner();
if !response.success {
return Err(Error::from_string(response.error_info.unwrap_or("".to_string())));
}

View File

@@ -1,9 +1,4 @@
use std::collections::HashMap;
use futures::future::join_all;
use http::HeaderMap;
use tracing::warn;
use uuid::Uuid;
use std::{collections::HashMap, sync::Arc, time::Duration};
use crate::{
disk::{
@@ -21,13 +16,19 @@ use crate::{
},
utils::hash,
};
use futures::future::join_all;
use http::HeaderMap;
use tokio::sync::RwLock;
use tokio_util::sync::CancellationToken;
use tracing::{debug, warn};
use uuid::Uuid;
#[derive(Debug)]
pub struct Sets {
pub id: Uuid,
// pub sets: Vec<Objects>,
// pub disk_set: Vec<Vec<Option<DiskStore>>>, // [set_count_idx][set_drive_count_idx] = disk_idx
pub disk_set: Vec<SetDisks>, // [set_count_idx][set_drive_count_idx] = disk_idx
pub disk_set: Vec<Arc<SetDisks>>, // [set_count_idx][set_drive_count_idx] = disk_idx
pub pool_idx: usize,
pub endpoints: PoolEndpoints,
pub format: FormatV3,
@@ -35,6 +36,7 @@ pub struct Sets {
pub set_count: usize,
pub set_drive_count: usize,
pub distribution_algo: DistributionAlgoVersion,
ctx: CancellationToken,
}
impl Sets {
@@ -44,7 +46,7 @@ impl Sets {
fm: &FormatV3,
pool_idx: usize,
partiy_count: usize,
) -> Result<Self> {
) -> Result<Arc<Self>> {
let set_count = fm.erasure.sets.len();
let set_drive_count = fm.erasure.sets[0].len();
@@ -52,9 +54,14 @@ impl Sets {
for i in 0..set_count {
let mut set_drive = Vec::with_capacity(set_drive_count);
let mut set_endpoints = Vec::with_capacity(set_drive_count);
for j in 0..set_drive_count {
let idx = i * set_drive_count + j;
let mut disk = disks[idx].clone();
let endpoint = endpoints.endpoints.as_ref().get(idx).cloned();
set_endpoints.push(endpoint);
if disk.is_none() {
warn!("sets new set_drive {}-{} is none", i, j);
set_drive.push(None);
@@ -89,17 +96,18 @@ impl Sets {
warn!("sets new set_drive {:?}", &set_drive);
let set_disks = SetDisks {
disks: set_drive,
disks: RwLock::new(set_drive),
set_drive_count,
parity_count: partiy_count,
set_index: i,
pool_index: pool_idx,
set_endpoints,
};
disk_set.push(set_disks);
disk_set.push(Arc::new(set_disks));
}
let sets = Self {
let sets = Arc::new(Self {
id: fm.id,
// sets: todo!(),
disk_set,
@@ -110,15 +118,54 @@ impl Sets {
set_count,
set_drive_count,
distribution_algo: fm.erasure.distribution_algo.clone(),
};
ctx: CancellationToken::new(),
});
let asets = sets.clone();
tokio::spawn(async move { asets.monitor_and_connect_endpoints().await });
Ok(sets)
}
pub fn get_disks(&self, set_idx: usize) -> SetDisks {
pub async fn monitor_and_connect_endpoints(&self) {
tokio::time::sleep(Duration::from_secs(5)).await;
self.connect_disks().await;
let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(15 * 3));
let cloned_token = self.ctx.clone();
loop {
tokio::select! {
_= interval.tick()=>{
debug!("tick...");
self.connect_disks().await;
interval.reset();
},
_ = cloned_token.cancelled() => {
warn!("ctx cancelled");
break;
}
}
}
warn!("monitor_and_connect_endpoints exit");
}
async fn connect_disks(&self) {
debug!("start connect_disks ...");
for set in self.disk_set.iter() {
set.connect_disks().await;
}
}
pub fn get_disks(&self, set_idx: usize) -> Arc<SetDisks> {
self.disk_set[set_idx].clone()
}
pub fn get_disks_by_key(&self, key: &str) -> SetDisks {
pub fn get_disks_by_key(&self, key: &str) -> Arc<SetDisks> {
self.get_disks(self.get_hashed_set_index(key))
}

View File

@@ -1,8 +1,6 @@
use crate::{
bucket_meta::BucketMetadata,
disk::{
error::DiskError, new_disk, DeleteOptions, DiskOption, DiskStore, WalkDirOptions, BUCKET_META_PREFIX, RUSTFS_META_BUCKET,
},
disk::{error::DiskError, new_disk, DiskOption, DiskStore, WalkDirOptions, BUCKET_META_PREFIX, RUSTFS_META_BUCKET},
endpoints::{EndpointServerPools, SetupType},
error::{Error, Result},
peer::S3PeerSys,
@@ -25,7 +23,7 @@ use std::{
};
use time::OffsetDateTime;
use tokio::{fs, sync::RwLock};
use tracing::{debug, info, warn};
use tracing::{debug, info};
use uuid::Uuid;
use lazy_static::lazy_static;
@@ -150,7 +148,7 @@ pub struct ECStore {
pub id: uuid::Uuid,
// pub disks: Vec<DiskStore>,
pub disk_map: HashMap<usize, Vec<Option<DiskStore>>>,
pub pools: Vec<Sets>,
pub pools: Vec<Arc<Sets>>,
pub peer_sys: S3PeerSys,
// pub local_disks: Vec<DiskStore>,
}
@@ -223,7 +221,6 @@ impl ECStore {
}
let sets = Sets::new(disks.clone(), pool_eps, &fm, i, partiy_count).await?;
pools.push(sets);
disk_map.insert(i, disks);
@@ -285,62 +282,54 @@ impl ECStore {
for sets in self.pools.iter() {
for set in sets.disk_set.iter() {
for disk in set.disks.iter() {
if disk.is_none() {
continue;
}
let disk = disk.as_ref().unwrap();
let opts = opts.clone();
// let mut wr = &mut wr;
futures.push(disk.walk_dir(opts));
// tokio::spawn(async move { disk.walk_dir(opts, wr).await });
}
futures.push(set.walk_dir(&opts));
}
}
let results = join_all(futures).await;
let mut errs = Vec::new();
// let mut errs = Vec::new();
let mut ress = Vec::new();
let mut uniq = HashSet::new();
for res in results {
match res {
Ok(entrys) => {
for entry in entrys {
if !uniq.contains(&entry.name) {
uniq.insert(entry.name.clone());
// TODO: 过滤
if opts.limit > 0 && ress.len() as i32 >= opts.limit {
return Ok(ress);
}
for (disks_ress, _disks_errs) in results {
for (_i, disks_res) in disks_ress.iter().enumerate() {
if disks_res.is_none() {
// TODO handle errs
continue;
}
let entrys = disks_res.as_ref().unwrap();
if entry.is_object() {
let fi = entry.to_fileinfo(&opts.bucket)?;
if fi.is_some() {
ress.push(fi.unwrap().into_object_info(&opts.bucket, &entry.name, false));
}
continue;
}
for entry in entrys {
if !uniq.contains(&entry.name) {
uniq.insert(entry.name.clone());
// TODO: 过滤
if opts.limit > 0 && ress.len() as i32 >= opts.limit {
return Ok(ress);
}
if entry.is_dir() {
ress.push(ObjectInfo {
is_dir: true,
bucket: opts.bucket.clone(),
name: entry.name,
..Default::default()
});
if entry.is_object() {
let fi = entry.to_fileinfo(&opts.bucket)?;
if fi.is_some() {
ress.push(fi.unwrap().into_object_info(&opts.bucket, &entry.name, false));
}
continue;
}
if entry.is_dir() {
ress.push(ObjectInfo {
is_dir: true,
bucket: opts.bucket.clone(),
name: entry.name.clone(),
..Default::default()
});
}
}
errs.push(None);
}
Err(e) => errs.push(Some(e)),
}
}
warn!("list_merged errs {:?}", errs);
// warn!("list_merged errs {:?}", errs);
Ok(ress)
}
@@ -349,21 +338,24 @@ impl ECStore {
let mut futures = Vec::new();
for sets in self.pools.iter() {
for set in sets.disk_set.iter() {
for disk in set.disks.iter() {
if disk.is_none() {
continue;
}
futures.push(set.delete_all(bucket, prefix));
// let disks = set.disks.read().await;
// let dd = disks.clone();
// for disk in dd {
// if disk.is_none() {
// continue;
// }
let disk = disk.as_ref().unwrap();
futures.push(disk.delete(
bucket,
prefix,
DeleteOptions {
recursive: true,
immediate: false,
},
));
}
// // let disk = disk.as_ref().unwrap().clone();
// // futures.push(disk.delete(
// // bucket,
// // prefix,
// // DeleteOptions {
// // recursive: true,
// // immediate: false,
// // },
// // ));
// }
}
}
let results = join_all(futures).await;