From bc9868f926782746be8cfc47dec2a78d5c564e35 Mon Sep 17 00:00:00 2001 From: weisd Date: Thu, 19 Sep 2024 18:06:44 +0800 Subject: [PATCH] add: monitor_and_connect_endpoints --- ecstore/src/disk/local.rs | 15 ++++- ecstore/src/disk/mod.rs | 14 +++++ ecstore/src/disk/remote.rs | 31 ++++++++--- ecstore/src/sets.rs | 75 ++++++++++++++++++++----- ecstore/src/store.rs | 110 +++++++++++++++++-------------------- 5 files changed, 163 insertions(+), 82 deletions(-) diff --git a/ecstore/src/disk/local.rs b/ecstore/src/disk/local.rs index e0a1679a..816ed6aa 100644 --- a/ecstore/src/disk/local.rs +++ b/ecstore/src/disk/local.rs @@ -1,7 +1,7 @@ use super::{endpoint::Endpoint, error::DiskError, format::FormatV3}; use super::{ - DeleteOptions, DiskAPI, FileInfoVersions, FileReader, FileWriter, MetaCacheEntry, ReadMultipleReq, ReadMultipleResp, - ReadOptions, RenameDataResp, VolumeInfo, WalkDirOptions, + DeleteOptions, DiskAPI, DiskLocation, FileInfoVersions, FileReader, FileWriter, MetaCacheEntry, ReadMultipleReq, + ReadMultipleResp, ReadOptions, RenameDataResp, VolumeInfo, WalkDirOptions, }; use crate::disk::{LocalFileReader, LocalFileWriter, STORAGE_FORMAT_FILE}; use crate::{ @@ -452,6 +452,9 @@ impl DiskAPI for LocalDisk { fn is_local(&self) -> bool { true } + async fn is_online(&self) -> bool { + true + } async fn close(&self) -> Result<()> { Ok(()) } @@ -459,6 +462,14 @@ impl DiskAPI for LocalDisk { self.root.clone() } + fn get_location(&self) -> DiskLocation { + DiskLocation { + pool_idx: self.endpoint.pool_idx, + set_idx: self.endpoint.set_idx, + disk_idx: self.endpoint.pool_idx, + } + } + async fn get_disk_id(&self) -> Result> { warn!("local get_disk_id"); // TODO: check format file diff --git a/ecstore/src/disk/mod.rs b/ecstore/src/disk/mod.rs index 487edd40..8c089c8e 100644 --- a/ecstore/src/disk/mod.rs +++ b/ecstore/src/disk/mod.rs @@ -45,10 +45,12 @@ pub async fn new_disk(ep: &endpoint::Endpoint, opt: &DiskOption) -> Result bool; + async fn is_online(&self) -> bool; fn path(&self) -> PathBuf; async fn close(&self) -> Result<()>; async fn get_disk_id(&self) -> Result>; async fn set_disk_id(&self, id: Option) -> Result<()>; + fn get_location(&self) -> DiskLocation; async fn delete(&self, volume: &str, path: &str, opt: DeleteOptions) -> Result<()>; async fn read_all(&self, volume: &str, path: &str) -> Result; @@ -95,6 +97,18 @@ pub trait DiskAPI: Debug + Send + Sync + 'static { async fn read_multiple(&self, req: ReadMultipleReq) -> Result>; } +pub struct DiskLocation { + pub pool_idx: Option, + pub set_idx: Option, + pub disk_idx: Option, +} + +impl DiskLocation { + pub fn valid(&self) -> bool { + self.pool_idx.is_some() && self.set_idx.is_some() && self.disk_idx.is_some() + } +} + #[derive(Debug, Default, Clone, Serialize, Deserialize)] pub struct FileInfoVersions { // Name of the volume. diff --git a/ecstore/src/disk/remote.rs b/ecstore/src/disk/remote.rs index d9e47a90..f8f4eff7 100644 --- a/ecstore/src/disk/remote.rs +++ b/ecstore/src/disk/remote.rs @@ -28,9 +28,9 @@ use crate::{ }; use super::{ - endpoint::Endpoint, DeleteOptions, DiskAPI, DiskOption, FileInfoVersions, FileReader, FileWriter, MetaCacheEntry, - ReadMultipleReq, ReadMultipleResp, ReadOptions, RemoteFileReader, RemoteFileWriter, RenameDataResp, VolumeInfo, - WalkDirOptions, + endpoint::Endpoint, DeleteOptions, DiskAPI, DiskLocation, DiskOption, FileInfoVersions, FileReader, FileWriter, + MetaCacheEntry, ReadMultipleReq, ReadMultipleResp, ReadOptions, RemoteFileReader, RemoteFileWriter, RenameDataResp, + VolumeInfo, WalkDirOptions, }; #[derive(Debug)] @@ -39,6 +39,7 @@ pub struct RemoteDisk { channel: Arc>>, url: url::Url, pub root: PathBuf, + endpoint: Endpoint, } impl RemoteDisk { @@ -50,6 +51,7 @@ impl RemoteDisk { url: ep.url.clone(), root, id: Mutex::new(None), + endpoint: ep.clone(), }) } @@ -98,6 +100,13 @@ impl DiskAPI for RemoteDisk { fn is_local(&self) -> bool { false } + async fn is_online(&self) -> bool { + // TODO: 连接状态 + if let Ok(_) = self.get_client_v2().await { + return true; + } + false + } async fn close(&self) -> Result<()> { Ok(()) } @@ -105,6 +114,14 @@ impl DiskAPI for RemoteDisk { self.root.clone() } + fn get_location(&self) -> DiskLocation { + DiskLocation { + pool_idx: self.endpoint.pool_idx, + set_idx: self.endpoint.set_idx, + disk_idx: self.endpoint.pool_idx, + } + } + async fn get_disk_id(&self) -> Result> { Ok(self.id.lock().await.clone()) } @@ -253,11 +270,11 @@ impl DiskAPI for RemoteDisk { }); let response = client.walk_dir(request).await?.into_inner(); - + if !response.success { return Err(Error::from_string(response.error_info.unwrap_or("".to_string()))); } - + let entries = response .meta_cache_entry .into_iter() @@ -288,7 +305,7 @@ impl DiskAPI for RemoteDisk { }); let response = client.rename_data(request).await?.into_inner(); - + if !response.success { return Err(Error::from_string(response.error_info.unwrap_or("".to_string()))); } @@ -363,7 +380,7 @@ impl DiskAPI for RemoteDisk { }); let response = client.stat_volume(request).await?.into_inner(); - + if !response.success { return Err(Error::from_string(response.error_info.unwrap_or("".to_string()))); } diff --git a/ecstore/src/sets.rs b/ecstore/src/sets.rs index c9e049d5..dc6dd3dd 100644 --- a/ecstore/src/sets.rs +++ b/ecstore/src/sets.rs @@ -1,9 +1,4 @@ -use std::collections::HashMap; - -use futures::future::join_all; -use http::HeaderMap; -use tracing::warn; -use uuid::Uuid; +use std::{collections::HashMap, sync::Arc, time::Duration}; use crate::{ disk::{ @@ -21,13 +16,19 @@ use crate::{ }, utils::hash, }; +use futures::future::join_all; +use http::HeaderMap; +use tokio::sync::RwLock; +use tokio_util::sync::CancellationToken; +use tracing::{debug, warn}; +use uuid::Uuid; #[derive(Debug)] pub struct Sets { pub id: Uuid, // pub sets: Vec, // pub disk_set: Vec>>, // [set_count_idx][set_drive_count_idx] = disk_idx - pub disk_set: Vec, // [set_count_idx][set_drive_count_idx] = disk_idx + pub disk_set: Vec>, // [set_count_idx][set_drive_count_idx] = disk_idx pub pool_idx: usize, pub endpoints: PoolEndpoints, pub format: FormatV3, @@ -35,6 +36,7 @@ pub struct Sets { pub set_count: usize, pub set_drive_count: usize, pub distribution_algo: DistributionAlgoVersion, + ctx: CancellationToken, } impl Sets { @@ -44,7 +46,7 @@ impl Sets { fm: &FormatV3, pool_idx: usize, partiy_count: usize, - ) -> Result { + ) -> Result> { let set_count = fm.erasure.sets.len(); let set_drive_count = fm.erasure.sets[0].len(); @@ -52,9 +54,14 @@ impl Sets { for i in 0..set_count { let mut set_drive = Vec::with_capacity(set_drive_count); + let mut set_endpoints = Vec::with_capacity(set_drive_count); for j in 0..set_drive_count { let idx = i * set_drive_count + j; let mut disk = disks[idx].clone(); + + let endpoint = endpoints.endpoints.as_ref().get(idx).cloned(); + set_endpoints.push(endpoint); + if disk.is_none() { warn!("sets new set_drive {}-{} is none", i, j); set_drive.push(None); @@ -89,17 +96,18 @@ impl Sets { warn!("sets new set_drive {:?}", &set_drive); let set_disks = SetDisks { - disks: set_drive, + disks: RwLock::new(set_drive), set_drive_count, parity_count: partiy_count, set_index: i, pool_index: pool_idx, + set_endpoints, }; - disk_set.push(set_disks); + disk_set.push(Arc::new(set_disks)); } - let sets = Self { + let sets = Arc::new(Self { id: fm.id, // sets: todo!(), disk_set, @@ -110,15 +118,54 @@ impl Sets { set_count, set_drive_count, distribution_algo: fm.erasure.distribution_algo.clone(), - }; + ctx: CancellationToken::new(), + }); + + let asets = sets.clone(); + + tokio::spawn(async move { asets.monitor_and_connect_endpoints().await }); Ok(sets) } - pub fn get_disks(&self, set_idx: usize) -> SetDisks { + + pub async fn monitor_and_connect_endpoints(&self) { + tokio::time::sleep(Duration::from_secs(5)).await; + + self.connect_disks().await; + + let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(15 * 3)); + let cloned_token = self.ctx.clone(); + loop { + tokio::select! { + _= interval.tick()=>{ + debug!("tick..."); + self.connect_disks().await; + + interval.reset(); + }, + + _ = cloned_token.cancelled() => { + warn!("ctx cancelled"); + break; + } + } + } + + warn!("monitor_and_connect_endpoints exit"); + } + + async fn connect_disks(&self) { + debug!("start connect_disks ..."); + for set in self.disk_set.iter() { + set.connect_disks().await; + } + } + + pub fn get_disks(&self, set_idx: usize) -> Arc { self.disk_set[set_idx].clone() } - pub fn get_disks_by_key(&self, key: &str) -> SetDisks { + pub fn get_disks_by_key(&self, key: &str) -> Arc { self.get_disks(self.get_hashed_set_index(key)) } diff --git a/ecstore/src/store.rs b/ecstore/src/store.rs index 44473c3b..d3a51059 100644 --- a/ecstore/src/store.rs +++ b/ecstore/src/store.rs @@ -1,8 +1,6 @@ use crate::{ bucket_meta::BucketMetadata, - disk::{ - error::DiskError, new_disk, DeleteOptions, DiskOption, DiskStore, WalkDirOptions, BUCKET_META_PREFIX, RUSTFS_META_BUCKET, - }, + disk::{error::DiskError, new_disk, DiskOption, DiskStore, WalkDirOptions, BUCKET_META_PREFIX, RUSTFS_META_BUCKET}, endpoints::{EndpointServerPools, SetupType}, error::{Error, Result}, peer::S3PeerSys, @@ -25,7 +23,7 @@ use std::{ }; use time::OffsetDateTime; use tokio::{fs, sync::RwLock}; -use tracing::{debug, info, warn}; +use tracing::{debug, info}; use uuid::Uuid; use lazy_static::lazy_static; @@ -150,7 +148,7 @@ pub struct ECStore { pub id: uuid::Uuid, // pub disks: Vec, pub disk_map: HashMap>>, - pub pools: Vec, + pub pools: Vec>, pub peer_sys: S3PeerSys, // pub local_disks: Vec, } @@ -223,7 +221,6 @@ impl ECStore { } let sets = Sets::new(disks.clone(), pool_eps, &fm, i, partiy_count).await?; - pools.push(sets); disk_map.insert(i, disks); @@ -285,62 +282,54 @@ impl ECStore { for sets in self.pools.iter() { for set in sets.disk_set.iter() { - for disk in set.disks.iter() { - if disk.is_none() { - continue; - } - - let disk = disk.as_ref().unwrap(); - let opts = opts.clone(); - // let mut wr = &mut wr; - futures.push(disk.walk_dir(opts)); - // tokio::spawn(async move { disk.walk_dir(opts, wr).await }); - } + futures.push(set.walk_dir(&opts)); } } let results = join_all(futures).await; - let mut errs = Vec::new(); + // let mut errs = Vec::new(); let mut ress = Vec::new(); let mut uniq = HashSet::new(); - for res in results { - match res { - Ok(entrys) => { - for entry in entrys { - if !uniq.contains(&entry.name) { - uniq.insert(entry.name.clone()); - // TODO: 过滤 - if opts.limit > 0 && ress.len() as i32 >= opts.limit { - return Ok(ress); - } + for (disks_ress, _disks_errs) in results { + for (_i, disks_res) in disks_ress.iter().enumerate() { + if disks_res.is_none() { + // TODO handle errs + continue; + } + let entrys = disks_res.as_ref().unwrap(); - if entry.is_object() { - let fi = entry.to_fileinfo(&opts.bucket)?; - if fi.is_some() { - ress.push(fi.unwrap().into_object_info(&opts.bucket, &entry.name, false)); - } - continue; - } + for entry in entrys { + if !uniq.contains(&entry.name) { + uniq.insert(entry.name.clone()); + // TODO: 过滤 + if opts.limit > 0 && ress.len() as i32 >= opts.limit { + return Ok(ress); + } - if entry.is_dir() { - ress.push(ObjectInfo { - is_dir: true, - bucket: opts.bucket.clone(), - name: entry.name, - ..Default::default() - }); + if entry.is_object() { + let fi = entry.to_fileinfo(&opts.bucket)?; + if fi.is_some() { + ress.push(fi.unwrap().into_object_info(&opts.bucket, &entry.name, false)); } + continue; + } + + if entry.is_dir() { + ress.push(ObjectInfo { + is_dir: true, + bucket: opts.bucket.clone(), + name: entry.name.clone(), + ..Default::default() + }); } } - errs.push(None); } - Err(e) => errs.push(Some(e)), } } - warn!("list_merged errs {:?}", errs); + // warn!("list_merged errs {:?}", errs); Ok(ress) } @@ -349,21 +338,24 @@ impl ECStore { let mut futures = Vec::new(); for sets in self.pools.iter() { for set in sets.disk_set.iter() { - for disk in set.disks.iter() { - if disk.is_none() { - continue; - } + futures.push(set.delete_all(bucket, prefix)); + // let disks = set.disks.read().await; + // let dd = disks.clone(); + // for disk in dd { + // if disk.is_none() { + // continue; + // } - let disk = disk.as_ref().unwrap(); - futures.push(disk.delete( - bucket, - prefix, - DeleteOptions { - recursive: true, - immediate: false, - }, - )); - } + // // let disk = disk.as_ref().unwrap().clone(); + // // futures.push(disk.delete( + // // bucket, + // // prefix, + // // DeleteOptions { + // // recursive: true, + // // immediate: false, + // // }, + // // )); + // } } } let results = join_all(futures).await;