From 52342f2f8eb895206fbb008acc8704580cd9e243 Mon Sep 17 00:00:00 2001
From: weisd
Date: Fri, 13 Jun 2025 18:07:40 +0800
Subject: [PATCH] feat(grpc): walk_dir http; fix(ecstore): rebalance loop

---
 Makefile                                 |  10 +
 crates/filemeta/src/error.rs             |  15 +-
 crates/rio/src/http_reader.rs            |  22 +-
 ecstore/src/cache_value/metacache_set.rs |  60 +++-
 ecstore/src/config/com.rs                |  15 +-
 ecstore/src/disk/error.rs                |   2 +-
 ecstore/src/disk/local.rs                |  15 +-
 ecstore/src/disk/remote.rs               | 122 +++++---
 ecstore/src/erasure_coding/decode.rs     |   3 +
 ecstore/src/erasure_coding/encode.rs     |   8 +
 ecstore/src/notification_sys.rs          |  13 +-
 ecstore/src/peer_rest_client.rs          |   2 +-
 ecstore/src/rebalance.rs                 | 340 ++++++++++++++---------
 ecstore/src/set_disk.rs                  |  96 +++++--
 ecstore/src/store.rs                     |  19 +-
 ecstore/src/store_list_objects.rs        |   1 +
 iam/src/manager.rs                       |  59 ++--
 rustfs/src/admin/handlers/rebalance.rs   |  67 +++--
 rustfs/src/admin/rpc.rs                  |  78 ++++++
 rustfs/src/grpc.rs                       |  27 ++
 scripts/dev_clear.sh                     |  11 +
 scripts/{dev.sh => dev_deploy.sh}        |  21 +-
 scripts/dev_rustfs.env                   |  11 +
 scripts/dev_rustfs.sh                    | 199 +++++++++++++
 24 files changed, 940 insertions(+), 276 deletions(-)
 create mode 100644 scripts/dev_clear.sh
 rename scripts/{dev.sh => dev_deploy.sh} (66%)
 create mode 100644 scripts/dev_rustfs.env
 create mode 100644 scripts/dev_rustfs.sh

diff --git a/Makefile b/Makefile
index b401a2ed..f7e69fe7 100644
--- a/Makefile
+++ b/Makefile
@@ -79,3 +79,13 @@ build: BUILD_CMD = /root/.cargo/bin/cargo build --release --bin rustfs --target-
 build:
 	$(DOCKER_CLI) build -t $(ROCKYLINUX_BUILD_IMAGE_NAME) -f $(DOCKERFILE_PATH)/Dockerfile.$(BUILD_OS) .
 	$(DOCKER_CLI) run --rm --name $(ROCKYLINUX_BUILD_CONTAINER_NAME) -v $(shell pwd):/root/s3-rustfs -it $(ROCKYLINUX_BUILD_IMAGE_NAME) $(BUILD_CMD)
+
+.PHONY: build-musl
+build-musl:
+	@echo "🔨 Building rustfs for x86_64-unknown-linux-musl..."
+	cargo build --target x86_64-unknown-linux-musl --bin rustfs -r
+
+.PHONY: deploy-dev
+deploy-dev: build-musl
+	@echo "🚀 Deploying to dev server: $${IP}"
+	./scripts/dev_deploy.sh $${IP}
diff --git a/crates/filemeta/src/error.rs b/crates/filemeta/src/error.rs
index 142436e1..8cdfb40b 100644
--- a/crates/filemeta/src/error.rs
+++ b/crates/filemeta/src/error.rs
@@ -111,7 +111,20 @@ impl Clone for Error {
 impl From<std::io::Error> for Error {
     fn from(e: std::io::Error) -> Self {
-        Error::Io(e)
+        match e.kind() {
+            std::io::ErrorKind::UnexpectedEof => Error::Unexpected,
+            _ => Error::Io(e),
+        }
+    }
+}
+
+impl From<Error> for std::io::Error {
+    fn from(e: Error) -> Self {
+        match e {
+            Error::Unexpected => std::io::Error::new(std::io::ErrorKind::UnexpectedEof, "Unexpected EOF"),
+            Error::Io(e) => e,
+            _ => std::io::Error::other(e.to_string()),
+        }
     }
 }
diff --git a/crates/rio/src/http_reader.rs b/crates/rio/src/http_reader.rs
index 240ef70c..e0cfc89c 100644
--- a/crates/rio/src/http_reader.rs
+++ b/crates/rio/src/http_reader.rs
@@ -3,6 +3,7 @@
 use futures::{Stream, StreamExt};
 use http::HeaderMap;
 use pin_project_lite::pin_project;
 use reqwest::{Client, Method, RequestBuilder};
+use std::error::Error as _;
 use std::io::{self, Error};
 use std::pin::Pin;
 use std::sync::LazyLock;
@@ -43,12 +44,18 @@ pin_project!
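// Note: the From conversions in crates/filemeta/src/error.rs above give the
// streaming walk_dir a lossless end-of-stream signal across the RPC boundary:
// io::ErrorKind::UnexpectedEof maps to Error::Unexpected and back. A minimal
// round-trip sketch of that invariant, using only the conversions as written:
//
//     use std::io::{Error as IoError, ErrorKind};
//     let fm: rustfs_filemeta::Error = IoError::new(ErrorKind::UnexpectedEof, "eof").into();
//     // fm is Error::Unexpected here
//     let io: IoError = fm.into();
//     assert_eq!(io.kind(), ErrorKind::UnexpectedEof);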
{ } impl HttpReader { - pub async fn new(url: String, method: Method, headers: HeaderMap) -> io::Result { + pub async fn new(url: String, method: Method, headers: HeaderMap, body: Option>) -> io::Result { http_log!("[HttpReader::new] url: {url}, method: {method:?}, headers: {headers:?}"); - Self::with_capacity(url, method, headers, 0).await + Self::with_capacity(url, method, headers, body, 0).await } /// Create a new HttpReader from a URL. The request is performed immediately. - pub async fn with_capacity(url: String, method: Method, headers: HeaderMap, mut read_buf_size: usize) -> io::Result { + pub async fn with_capacity( + url: String, + method: Method, + headers: HeaderMap, + body: Option>, + mut read_buf_size: usize, + ) -> io::Result { http_log!( "[HttpReader::with_capacity] url: {url}, method: {method:?}, headers: {headers:?}, buf_size: {}", read_buf_size @@ -60,12 +67,12 @@ impl HttpReader { Ok(resp) => { http_log!("[HttpReader::new] HEAD status: {}", resp.status()); if !resp.status().is_success() { - return Err(Error::other(format!("HEAD failed: status {}", resp.status()))); + return Err(Error::other(format!("HEAD failed: url: {}, status {}", url, resp.status()))); } } Err(e) => { http_log!("[HttpReader::new] HEAD error: {e}"); - return Err(Error::other(format!("HEAD request failed: {e}"))); + return Err(Error::other(e.source().map(|s| s.to_string()).unwrap_or_else(|| e.to_string()))); } } @@ -80,7 +87,10 @@ impl HttpReader { let (err_tx, err_rx) = oneshot::channel::(); tokio::spawn(async move { let client = get_http_client(); - let request: RequestBuilder = client.request(method_clone, url_clone).headers(headers_clone); + let mut request: RequestBuilder = client.request(method_clone, url_clone).headers(headers_clone); + if let Some(body) = body { + request = request.body(body); + } let response = request.send().await; match response { diff --git a/ecstore/src/cache_value/metacache_set.rs b/ecstore/src/cache_value/metacache_set.rs index a99c16cd..7f6e895c 100644 --- a/ecstore/src/cache_value/metacache_set.rs +++ b/ecstore/src/cache_value/metacache_set.rs @@ -1,7 +1,7 @@ use crate::disk::error::DiskError; use crate::disk::{self, DiskAPI, DiskStore, WalkDirOptions}; use futures::future::join_all; -use rustfs_filemeta::{MetaCacheEntries, MetaCacheEntry, MetacacheReader}; +use rustfs_filemeta::{MetaCacheEntries, MetaCacheEntry, MetacacheReader, is_io_eof}; use std::{future::Future, pin::Pin, sync::Arc}; use tokio::{spawn, sync::broadcast::Receiver as B_Receiver}; use tracing::error; @@ -50,7 +50,6 @@ impl Clone for ListPathRawOptions { } pub async fn list_path_raw(mut rx: B_Receiver, opts: ListPathRawOptions) -> disk::error::Result<()> { - // println!("list_path_raw {},{}", &opts.bucket, &opts.path); if opts.disks.is_empty() { return Err(DiskError::other("list_path_raw: 0 drives provided")); } @@ -59,12 +58,13 @@ pub async fn list_path_raw(mut rx: B_Receiver, opts: ListPathRawOptions) - let mut readers = Vec::with_capacity(opts.disks.len()); let fds = Arc::new(opts.fallback_disks.clone()); + let (cancel_tx, cancel_rx) = tokio::sync::broadcast::channel::(1); + for disk in opts.disks.iter() { let opdisk = disk.clone(); let opts_clone = opts.clone(); let fds_clone = fds.clone(); - // let (m_tx, m_rx) = mpsc::channel::(100); - // readers.push(m_rx); + let mut cancel_rx_clone = cancel_rx.resubscribe(); let (rd, mut wr) = tokio::io::duplex(64); readers.push(MetacacheReader::new(rd)); jobs.push(spawn(async move { @@ -92,7 +92,13 @@ pub async fn list_path_raw(mut rx: B_Receiver, opts: 
ListPathRawOptions) - need_fallback = true; } + if cancel_rx_clone.try_recv().is_ok() { + // warn!("list_path_raw: cancel_rx_clone.try_recv().await.is_ok()"); + return Ok(()); + } + while need_fallback { + // warn!("list_path_raw: while need_fallback start"); let disk = match fds_clone.iter().find(|d| d.is_some()) { Some(d) => { if let Some(disk) = d.clone() { @@ -130,6 +136,7 @@ pub async fn list_path_raw(mut rx: B_Receiver, opts: ListPathRawOptions) - } } + // warn!("list_path_raw: while need_fallback done"); Ok(()) })); } @@ -143,9 +150,15 @@ pub async fn list_path_raw(mut rx: B_Receiver, opts: ListPathRawOptions) - loop { let mut current = MetaCacheEntry::default(); + // warn!( + // "list_path_raw: loop start, bucket: {}, path: {}, current: {:?}", + // opts.bucket, opts.path, ¤t.name + // ); + if rx.try_recv().is_ok() { return Err(DiskError::other("canceled")); } + let mut top_entries: Vec> = vec![None; readers.len()]; let mut at_eof = 0; @@ -168,31 +181,47 @@ pub async fn list_path_raw(mut rx: B_Receiver, opts: ListPathRawOptions) - } else { // eof at_eof += 1; - + // warn!("list_path_raw: peek eof, disk: {}", i); continue; } } Err(err) => { if err == rustfs_filemeta::Error::Unexpected { at_eof += 1; + // warn!("list_path_raw: peek err eof, disk: {}", i); continue; - } else if err == rustfs_filemeta::Error::FileNotFound { + } + + // warn!("list_path_raw: peek err00, err: {:?}", err); + + if is_io_eof(&err) { + at_eof += 1; + // warn!("list_path_raw: peek eof, disk: {}", i); + continue; + } + + if err == rustfs_filemeta::Error::FileNotFound { at_eof += 1; fnf += 1; + // warn!("list_path_raw: peek fnf, disk: {}", i); continue; } else if err == rustfs_filemeta::Error::VolumeNotFound { at_eof += 1; fnf += 1; vnf += 1; + // warn!("list_path_raw: peek vnf, disk: {}", i); continue; } else { has_err += 1; errs[i] = Some(err.into()); + // warn!("list_path_raw: peek err, disk: {}", i); continue; } } }; + // warn!("list_path_raw: loop entry: {:?}, disk: {}", &entry.name, i); + // If no current, add it. 
if current.name.is_empty() { top_entries[i] = Some(entry.clone()); @@ -228,10 +257,12 @@ pub async fn list_path_raw(mut rx: B_Receiver, opts: ListPathRawOptions) - } if vnf > 0 && vnf >= (readers.len() - opts.min_disks) { + // warn!("list_path_raw: vnf > 0 && vnf >= (readers.len() - opts.min_disks) break"); return Err(DiskError::VolumeNotFound); } if fnf > 0 && fnf >= (readers.len() - opts.min_disks) { + // warn!("list_path_raw: fnf > 0 && fnf >= (readers.len() - opts.min_disks) break"); return Err(DiskError::FileNotFound); } @@ -250,6 +281,10 @@ pub async fn list_path_raw(mut rx: B_Receiver, opts: ListPathRawOptions) - _ => {} }); + error!( + "list_path_raw: has_err > 0 && has_err > opts.disks.len() - opts.min_disks break, err: {:?}", + &combined_err.join(", ") + ); return Err(DiskError::other(combined_err.join(", "))); } @@ -263,6 +298,7 @@ pub async fn list_path_raw(mut rx: B_Receiver, opts: ListPathRawOptions) - } } + // error!("list_path_raw: at_eof + has_err == readers.len() break {:?}", &errs); break; } @@ -272,12 +308,16 @@ pub async fn list_path_raw(mut rx: B_Receiver, opts: ListPathRawOptions) - } if let Some(agreed_fn) = opts.agreed.as_ref() { + // warn!("list_path_raw: agreed_fn start, current: {:?}", ¤t.name); agreed_fn(current).await; + // warn!("list_path_raw: agreed_fn done"); } continue; } + // warn!("list_path_raw: skip start, current: {:?}", ¤t.name); + for (i, r) in readers.iter_mut().enumerate() { if top_entries[i].is_some() { let _ = r.skip(1).await; @@ -291,7 +331,12 @@ pub async fn list_path_raw(mut rx: B_Receiver, opts: ListPathRawOptions) - Ok(()) }); - jobs.push(revjob); + if let Err(err) = revjob.await.map_err(std::io::Error::other)? { + error!("list_path_raw: revjob err {:?}", err); + let _ = cancel_tx.send(true); + + return Err(err); + } let results = join_all(jobs).await; for result in results { @@ -300,5 +345,6 @@ pub async fn list_path_raw(mut rx: B_Receiver, opts: ListPathRawOptions) - } } + // warn!("list_path_raw: done"); Ok(()) } diff --git a/ecstore/src/config/com.rs b/ecstore/src/config/com.rs index ac9daf56..ae5cf962 100644 --- a/ecstore/src/config/com.rs +++ b/ecstore/src/config/com.rs @@ -41,6 +41,7 @@ pub async fn read_config_with_metadata( if err == Error::FileNotFound || matches!(err, Error::ObjectNotFound(_, _)) { Error::ConfigNotFound } else { + warn!("read_config_with_metadata: err: {:?}, file: {}", err, file); err } })?; @@ -92,9 +93,19 @@ pub async fn delete_config(api: Arc, file: &str) -> Result<()> } pub async fn save_config_with_opts(api: Arc, file: &str, data: Vec, opts: &ObjectOptions) -> Result<()> { - let _ = api + warn!( + "save_config_with_opts, bucket: {}, file: {}, data len: {}", + RUSTFS_META_BUCKET, + file, + data.len() + ); + if let Err(err) = api .put_object(RUSTFS_META_BUCKET, file, &mut PutObjReader::from_vec(data), opts) - .await?; + .await + { + warn!("save_config_with_opts: err: {:?}, file: {}", err, file); + return Err(err); + } Ok(()) } diff --git a/ecstore/src/disk/error.rs b/ecstore/src/disk/error.rs index 427a8608..c3dab9a1 100644 --- a/ecstore/src/disk/error.rs +++ b/ecstore/src/disk/error.rs @@ -124,7 +124,7 @@ pub enum DiskError { #[error("erasure read quorum")] ErasureReadQuorum, - #[error("io error")] + #[error("io error {0}")] Io(io::Error), } diff --git a/ecstore/src/disk/local.rs b/ecstore/src/disk/local.rs index e13dbd54..a66d6407 100644 --- a/ecstore/src/disk/local.rs +++ b/ecstore/src/disk/local.rs @@ -773,7 +773,7 @@ impl LocalDisk { Ok(res) => res, Err(e) => { if e != DiskError::VolumeNotFound && e != 
Error::FileNotFound { - info!("scan list_dir {}, err {:?}", ¤t, &e); + warn!("scan list_dir {}, err {:?}", ¤t, &e); } if opts.report_notfound && e == Error::FileNotFound && current == &opts.base_dir { @@ -785,6 +785,7 @@ impl LocalDisk { }; if entries.is_empty() { + warn!("scan list_dir {}, entries is empty", ¤t); return Ok(()); } @@ -800,6 +801,7 @@ impl LocalDisk { let entry = item.clone(); // check limit if opts.limit > 0 && *objs_returned >= opts.limit { + warn!("scan list_dir {}, limit reached", ¤t); return Ok(()); } // check prefix @@ -843,13 +845,14 @@ impl LocalDisk { let name = decode_dir_object(format!("{}/{}", ¤t, &name).as_str()); out.write_obj(&MetaCacheEntry { - name, + name: name.clone(), metadata, ..Default::default() }) .await?; *objs_returned += 1; + // warn!("scan list_dir {}, write_obj done, name: {:?}", ¤t, &name); return Ok(()); } } @@ -870,6 +873,7 @@ impl LocalDisk { for entry in entries.iter() { if opts.limit > 0 && *objs_returned >= opts.limit { + // warn!("scan list_dir {}, limit reached 2", ¤t); return Ok(()); } @@ -945,6 +949,7 @@ impl LocalDisk { while let Some(dir) = dir_stack.pop() { if opts.limit > 0 && *objs_returned >= opts.limit { + // warn!("scan list_dir {}, limit reached 3", ¤t); return Ok(()); } @@ -965,6 +970,7 @@ impl LocalDisk { } } + // warn!("scan list_dir {}, done", ¤t); Ok(()) } } @@ -1568,6 +1574,11 @@ impl DiskAPI for LocalDisk { let mut current = opts.base_dir.clone(); self.scan_dir(&mut current, &opts, &mut out, &mut objs_returned).await?; + warn!( + "walk_dir: done, volume_dir: {:?}, base_dir: {}", + volume_dir.to_string_lossy(), + opts.base_dir + ); Ok(()) } diff --git a/ecstore/src/disk/remote.rs b/ecstore/src/disk/remote.rs index 595391e3..25fd11eb 100644 --- a/ecstore/src/disk/remote.rs +++ b/ecstore/src/disk/remote.rs @@ -2,20 +2,20 @@ use std::path::PathBuf; use bytes::Bytes; use futures::lock::Mutex; -use http::{HeaderMap, Method}; +use http::{HeaderMap, HeaderValue, Method, header::CONTENT_TYPE}; use protos::{ node_service_time_out_client, proto_gen::node_service::{ CheckPartsRequest, DeletePathsRequest, DeleteRequest, DeleteVersionRequest, DeleteVersionsRequest, DeleteVolumeRequest, DiskInfoRequest, ListDirRequest, ListVolumesRequest, MakeVolumeRequest, MakeVolumesRequest, NsScannerRequest, ReadAllRequest, ReadMultipleRequest, ReadVersionRequest, ReadXlRequest, RenameDataRequest, RenameFileRequest, - StatVolumeRequest, UpdateMetadataRequest, VerifyFileRequest, WalkDirRequest, WriteAllRequest, WriteMetadataRequest, + StatVolumeRequest, UpdateMetadataRequest, VerifyFileRequest, WriteAllRequest, WriteMetadataRequest, }, }; -use rmp_serde::Serializer; -use rustfs_filemeta::{FileInfo, MetaCacheEntry, MetacacheWriter, RawFileInfo}; + +use rustfs_filemeta::{FileInfo, RawFileInfo}; use rustfs_rio::{HttpReader, HttpWriter}; -use serde::Serialize; + use tokio::{ io::AsyncWrite, sync::mpsc::{self, Sender}, @@ -256,47 +256,55 @@ impl DiskAPI for RemoteDisk { Ok(()) } - // FIXME: TODO: use writer - #[tracing::instrument(skip(self, wr))] - async fn walk_dir(&self, opts: WalkDirOptions, wr: &mut W) -> Result<()> { - let now = std::time::SystemTime::now(); - info!("walk_dir {}/{}/{:?}", self.endpoint.to_string(), opts.bucket, opts.filter_prefix); - let mut wr = wr; - let mut out = MetacacheWriter::new(&mut wr); - let mut buf = Vec::new(); - opts.serialize(&mut Serializer::new(&mut buf))?; - let mut client = node_service_time_out_client(&self.addr) - .await - .map_err(|err| Error::other(format!("can not get client, err: {}", err)))?; - let 
request = Request::new(WalkDirRequest { - disk: self.endpoint.to_string(), - walk_dir_options: buf.into(), - }); - let mut response = client.walk_dir(request).await?.into_inner(); + // // FIXME: TODO: use writer + // #[tracing::instrument(skip(self, wr))] + // async fn walk_dir(&self, opts: WalkDirOptions, wr: &mut W) -> Result<()> { + // let now = std::time::SystemTime::now(); + // info!("walk_dir {}/{}/{:?}", self.endpoint.to_string(), opts.bucket, opts.filter_prefix); + // let mut wr = wr; + // let mut out = MetacacheWriter::new(&mut wr); + // let mut buf = Vec::new(); + // opts.serialize(&mut Serializer::new(&mut buf))?; + // let mut client = node_service_time_out_client(&self.addr) + // .await + // .map_err(|err| Error::other(format!("can not get client, err: {}", err)))?; + // let request = Request::new(WalkDirRequest { + // disk: self.endpoint.to_string(), + // walk_dir_options: buf.into(), + // }); + // let mut response = client.walk_dir(request).await?.into_inner(); - loop { - match response.next().await { - Some(Ok(resp)) => { - if !resp.success { - return Err(Error::other(resp.error_info.unwrap_or_default())); - } - let entry = serde_json::from_str::(&resp.meta_cache_entry) - .map_err(|_| Error::other(format!("Unexpected response: {:?}", response)))?; - out.write_obj(&entry).await?; - } - None => break, - _ => return Err(Error::other(format!("Unexpected response: {:?}", response))), - } - } + // loop { + // match response.next().await { + // Some(Ok(resp)) => { + // if !resp.success { + // if let Some(err) = resp.error_info { + // if err == "Unexpected EOF" { + // return Err(Error::Io(std::io::Error::new(std::io::ErrorKind::UnexpectedEof, err))); + // } else { + // return Err(Error::other(err)); + // } + // } - info!( - "walk_dir {}/{:?} done {:?}", - opts.bucket, - opts.filter_prefix, - now.elapsed().unwrap_or_default() - ); - Ok(()) - } + // return Err(Error::other("unknown error")); + // } + // let entry = serde_json::from_str::(&resp.meta_cache_entry) + // .map_err(|_| Error::other(format!("Unexpected response: {:?}", response)))?; + // out.write_obj(&entry).await?; + // } + // None => break, + // _ => return Err(Error::other(format!("Unexpected response: {:?}", response))), + // } + // } + + // info!( + // "walk_dir {}/{:?} done {:?}", + // opts.bucket, + // opts.filter_prefix, + // now.elapsed().unwrap_or_default() + // ); + // Ok(()) + // } #[tracing::instrument(skip(self))] async fn delete_version( @@ -559,6 +567,28 @@ impl DiskAPI for RemoteDisk { Ok(response.volumes) } + #[tracing::instrument(skip(self, wr))] + async fn walk_dir(&self, opts: WalkDirOptions, wr: &mut W) -> Result<()> { + info!("walk_dir {}", self.endpoint.to_string()); + + let url = format!( + "{}/rustfs/rpc/walk_dir?disk={}", + self.endpoint.grid_host(), + urlencoding::encode(self.endpoint.to_string().as_str()), + ); + + let opts = serde_json::to_vec(&opts)?; + + let mut headers = HeaderMap::new(); + headers.insert(CONTENT_TYPE, HeaderValue::from_static("application/json")); + + let mut reader = HttpReader::new(url, Method::GET, headers, Some(opts)).await?; + + tokio::io::copy(&mut reader, wr).await?; + + Ok(()) + } + #[tracing::instrument(level = "debug", skip(self))] async fn read_file(&self, volume: &str, path: &str) -> Result { info!("read_file {}/{}", volume, path); @@ -573,7 +603,7 @@ impl DiskAPI for RemoteDisk { 0 ); - Ok(Box::new(HttpReader::new(url, Method::GET, HeaderMap::new()).await?)) + Ok(Box::new(HttpReader::new(url, Method::GET, HeaderMap::new(), None).await?)) } 
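// Note: with the gRPC implementation retired above, walk_dir now rides the plain
// HTTP RPC: WalkDirOptions is serialized to JSON and sent as the request body,
// and the response body is the raw metacache stream, copied straight into the
// caller's writer. A condensed sketch of the same flow (names as in this patch):
//
//     let url = format!("{}/rustfs/rpc/walk_dir?disk={}", endpoint.grid_host(),
//                       urlencoding::encode(&endpoint.to_string()));
//     let body = serde_json::to_vec(&opts)?;
//     let mut headers = HeaderMap::new();
//     headers.insert(CONTENT_TYPE, HeaderValue::from_static("application/json"));
//     let mut reader = HttpReader::new(url, Method::GET, headers, Some(body)).await?;
//     tokio::io::copy(&mut reader, wr).await?;
//
// The serving side is the WalkDir handler registered in rustfs/src/admin/rpc.rs below.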
#[tracing::instrument(level = "debug", skip(self))] @@ -589,7 +619,7 @@ impl DiskAPI for RemoteDisk { length ); - Ok(Box::new(HttpReader::new(url, Method::GET, HeaderMap::new()).await?)) + Ok(Box::new(HttpReader::new(url, Method::GET, HeaderMap::new(), None).await?)) } #[tracing::instrument(level = "debug", skip(self))] diff --git a/ecstore/src/erasure_coding/decode.rs b/ecstore/src/erasure_coding/decode.rs index fb7aa91a..b97d2b34 100644 --- a/ecstore/src/erasure_coding/decode.rs +++ b/ecstore/src/erasure_coding/decode.rs @@ -242,12 +242,14 @@ impl Erasure { } if !reader.can_decode(&shards) { + error!("erasure decode can_decode errs: {:?}", &errs); ret_err = Some(Error::ErasureReadQuorum.into()); break; } // Decode the shards if let Err(e) = self.decode_data(&mut shards) { + error!("erasure decode decode_data err: {:?}", e); ret_err = Some(e); break; } @@ -255,6 +257,7 @@ impl Erasure { let n = match write_data_blocks(writer, &shards, self.data_shards, block_offset, block_length).await { Ok(n) => n, Err(e) => { + error!("erasure decode write_data_blocks err: {:?}", e); ret_err = Some(e); break; } diff --git a/ecstore/src/erasure_coding/encode.rs b/ecstore/src/erasure_coding/encode.rs index a8da5a1a..c9dcac1b 100644 --- a/ecstore/src/erasure_coding/encode.rs +++ b/ecstore/src/erasure_coding/encode.rs @@ -8,6 +8,7 @@ use std::sync::Arc; use std::vec; use tokio::io::AsyncRead; use tokio::sync::mpsc; +use tracing::error; pub(crate) struct MultiWriter<'a> { writers: &'a mut [Option], @@ -60,6 +61,13 @@ impl<'a> MultiWriter<'a> { } if let Some(write_err) = reduce_write_quorum_errs(&self.errs, OBJECT_OP_IGNORED_ERRS, self.write_quorum) { + error!( + "reduce_write_quorum_errs: {:?}, offline-disks={}/{}, errs={:?}", + write_err, + count_errs(&self.errs, &Error::DiskNotFound), + self.writers.len(), + self.errs + ); return Err(std::io::Error::other(format!( "Failed to write data: {} (offline-disks={}/{})", write_err, diff --git a/ecstore/src/notification_sys.rs b/ecstore/src/notification_sys.rs index ec71fc63..232de8ab 100644 --- a/ecstore/src/notification_sys.rs +++ b/ecstore/src/notification_sys.rs @@ -143,7 +143,11 @@ impl NotificationSys { #[tracing::instrument(skip(self))] pub async fn load_rebalance_meta(&self, start: bool) { let mut futures = Vec::with_capacity(self.peer_clients.len()); - for client in self.peer_clients.iter().flatten() { + for (i, client) in self.peer_clients.iter().flatten().enumerate() { + warn!( + "notification load_rebalance_meta start: {}, index: {}, client: {:?}", + start, i, client.host + ); futures.push(client.load_rebalance_meta(start)); } @@ -158,11 +162,16 @@ impl NotificationSys { } pub async fn stop_rebalance(&self) { + warn!("notification stop_rebalance start"); let Some(store) = new_object_layer_fn() else { error!("stop_rebalance: not init"); return; }; + // warn!("notification stop_rebalance load_rebalance_meta"); + // self.load_rebalance_meta(false).await; + // warn!("notification stop_rebalance load_rebalance_meta done"); + let mut futures = Vec::with_capacity(self.peer_clients.len()); for client in self.peer_clients.iter().flatten() { futures.push(client.stop_rebalance()); @@ -175,7 +184,9 @@ impl NotificationSys { } } + warn!("notification stop_rebalance stop_rebalance start"); let _ = store.stop_rebalance().await; + warn!("notification stop_rebalance stop_rebalance done"); } } diff --git a/ecstore/src/peer_rest_client.rs b/ecstore/src/peer_rest_client.rs index 041ab88d..425413c3 100644 --- a/ecstore/src/peer_rest_client.rs +++ 
b/ecstore/src/peer_rest_client.rs @@ -664,7 +664,7 @@ impl PeerRestClient { let response = client.load_rebalance_meta(request).await?.into_inner(); - warn!("load_rebalance_meta response {:?}", response); + warn!("load_rebalance_meta response {:?}, grid_host: {:?}", response, &self.grid_host); if !response.success { if let Some(msg) = response.error_info { return Err(Error::other(msg)); diff --git a/ecstore/src/rebalance.rs b/ecstore/src/rebalance.rs index 74d33903..853efed9 100644 --- a/ecstore/src/rebalance.rs +++ b/ecstore/src/rebalance.rs @@ -1,7 +1,3 @@ -use std::io::Cursor; -use std::sync::Arc; -use std::time::SystemTime; - use crate::StorageAPI; use crate::cache_value::metacache_set::{ListPathRawOptions, list_path_raw}; use crate::config::com::{read_config_with_metadata, save_config_with_opts}; @@ -19,16 +15,18 @@ use rustfs_filemeta::{FileInfo, MetaCacheEntries, MetaCacheEntry, MetadataResolu use rustfs_rio::HashReader; use rustfs_utils::path::encode_dir_object; use serde::{Deserialize, Serialize}; +use std::io::Cursor; +use std::sync::Arc; +use time::OffsetDateTime; use tokio::io::AsyncReadExt; use tokio::sync::broadcast::{self, Receiver as B_Receiver}; use tokio::time::{Duration, Instant}; use tracing::{error, info, warn}; use uuid::Uuid; -use workers::workers::Workers; const REBAL_META_FMT: u16 = 1; // Replace with actual format value const REBAL_META_VER: u16 = 1; // Replace with actual version value -const REBAL_META_NAME: &str = "rebalance_meta"; +const REBAL_META_NAME: &str = "rebalance.bin"; #[derive(Debug, Default, Clone, Serialize, Deserialize)] pub struct RebalanceStats { @@ -123,9 +121,9 @@ pub enum RebalSaveOpt { #[derive(Debug, Default, Clone, Serialize, Deserialize)] pub struct RebalanceInfo { #[serde(rename = "startTs")] - pub start_time: Option, // Time at which rebalance-start was issued + pub start_time: Option, // Time at which rebalance-start was issued #[serde(rename = "stopTs")] - pub end_time: Option, // Time at which rebalance operation completed or rebalance-stop was called + pub end_time: Option, // Time at which rebalance operation completed or rebalance-stop was called #[serde(rename = "status")] pub status: RebalStatus, // Current state of rebalance operation } @@ -137,14 +135,14 @@ pub struct DiskStat { pub available_space: u64, } -#[derive(Debug, Default, Serialize, Deserialize)] +#[derive(Debug, Default, Serialize, Deserialize, Clone)] pub struct RebalanceMeta { #[serde(skip)] pub cancel: Option>, // To be invoked on rebalance-stop #[serde(skip)] - pub last_refreshed_at: Option, + pub last_refreshed_at: Option, #[serde(rename = "stopTs")] - pub stopped_at: Option, // Time when rebalance-stop was issued + pub stopped_at: Option, // Time when rebalance-stop was issued #[serde(rename = "id")] pub id: String, // ID of the ongoing rebalance operation #[serde(rename = "pf")] @@ -164,29 +162,29 @@ impl RebalanceMeta { pub async fn load_with_opts(&mut self, store: Arc, opts: ObjectOptions) -> Result<()> { let (data, _) = read_config_with_metadata(store, REBAL_META_NAME, &opts).await?; if data.is_empty() { - warn!("rebalanceMeta: no data"); + warn!("rebalanceMeta load_with_opts: no data"); return Ok(()); } if data.len() <= 4 { - return Err(Error::other("rebalanceMeta: no data")); + return Err(Error::other("rebalanceMeta load_with_opts: no data")); } // Read header match u16::from_le_bytes([data[0], data[1]]) { REBAL_META_FMT => {} - fmt => return Err(Error::other(format!("rebalanceMeta: unknown format: {}", fmt))), + fmt => return 
Err(Error::other(format!("rebalanceMeta load_with_opts: unknown format: {}", fmt))), } match u16::from_le_bytes([data[2], data[3]]) { REBAL_META_VER => {} - ver => return Err(Error::other(format!("rebalanceMeta: unknown version: {}", ver))), + ver => return Err(Error::other(format!("rebalanceMeta load_with_opts: unknown version: {}", ver))), } let meta: Self = rmp_serde::from_read(Cursor::new(&data[4..]))?; *self = meta; - self.last_refreshed_at = Some(SystemTime::now()); + self.last_refreshed_at = Some(OffsetDateTime::now_utc()); - warn!("rebalanceMeta: loaded meta done"); + warn!("rebalanceMeta load_with_opts: loaded meta done"); Ok(()) } @@ -196,6 +194,7 @@ impl RebalanceMeta { pub async fn save_with_opts(&self, store: Arc, opts: ObjectOptions) -> Result<()> { if self.pool_stats.is_empty() { + warn!("rebalanceMeta save_with_opts: no pool stats"); return Ok(()); } @@ -218,7 +217,7 @@ impl ECStore { #[tracing::instrument(skip_all)] pub async fn load_rebalance_meta(&self) -> Result<()> { let mut meta = RebalanceMeta::new(); - warn!("rebalanceMeta: load rebalance meta"); + warn!("rebalanceMeta: store load rebalance meta"); match meta.load(self.pools[0].clone()).await { Ok(_) => { warn!("rebalanceMeta: rebalance meta loaded0"); @@ -255,9 +254,18 @@ impl ECStore { pub async fn update_rebalance_stats(&self) -> Result<()> { let mut ok = false; + let pool_stats = { + let rebalance_meta = self.rebalance_meta.read().await; + rebalance_meta.as_ref().map(|v| v.pool_stats.clone()).unwrap_or_default() + }; + + warn!("update_rebalance_stats: pool_stats: {:?}", &pool_stats); + for i in 0..self.pools.len() { - if self.find_index(i).await.is_none() { + if pool_stats.get(i).is_none() { + warn!("update_rebalance_stats: pool {} not found", i); let mut rebalance_meta = self.rebalance_meta.write().await; + warn!("update_rebalance_stats: pool {} not found, add", i); if let Some(meta) = rebalance_meta.as_mut() { meta.pool_stats.push(RebalanceStats::default()); } @@ -267,23 +275,24 @@ impl ECStore { } if ok { - let mut rebalance_meta = self.rebalance_meta.write().await; - if let Some(meta) = rebalance_meta.as_mut() { + warn!("update_rebalance_stats: save rebalance meta"); + + let rebalance_meta = self.rebalance_meta.read().await; + if let Some(meta) = rebalance_meta.as_ref() { meta.save(self.pools[0].clone()).await?; } - drop(rebalance_meta); } Ok(()) } - async fn find_index(&self, index: usize) -> Option { - if let Some(meta) = self.rebalance_meta.read().await.as_ref() { - return meta.pool_stats.get(index).map(|_v| index); - } + // async fn find_index(&self, index: usize) -> Option { + // if let Some(meta) = self.rebalance_meta.read().await.as_ref() { + // return meta.pool_stats.get(index).map(|_v| index); + // } - None - } + // None + // } #[tracing::instrument(skip(self))] pub async fn init_rebalance_meta(&self, bucktes: Vec) -> Result { @@ -310,7 +319,7 @@ impl ECStore { let mut pool_stats = Vec::with_capacity(self.pools.len()); - let now = SystemTime::now(); + let now = OffsetDateTime::now_utc(); for disk_stat in disk_stats.iter() { let mut pool_stat = RebalanceStats { @@ -369,20 +378,26 @@ impl ECStore { #[tracing::instrument(skip(self))] pub async fn next_rebal_bucket(&self, pool_index: usize) -> Result> { + warn!("next_rebal_bucket: pool_index: {}", pool_index); let rebalance_meta = self.rebalance_meta.read().await; + warn!("next_rebal_bucket: rebalance_meta: {:?}", rebalance_meta); if let Some(meta) = rebalance_meta.as_ref() { if let Some(pool_stat) = meta.pool_stats.get(pool_index) { if 
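// Note: load_with_opts above implies the on-disk layout of rebalance.bin is
// [fmt: u16 LE][ver: u16 LE][msgpack payload], with the payload starting at
// byte 4. The matching encode side, sketched under that assumption:
//
//     let mut data = Vec::new();
//     data.extend_from_slice(&REBAL_META_FMT.to_le_bytes());
//     data.extend_from_slice(&REBAL_META_VER.to_le_bytes());
//     data.extend_from_slice(&rmp_serde::to_vec(self)?); // read back via rmp_serde::from_read(&data[4..])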
pool_stat.info.status == RebalStatus::Completed || !pool_stat.participating {
+                    warn!("next_rebal_bucket: pool_index: {} completed or not participating", pool_index);
                     return Ok(None);
                 }
 
                 if pool_stat.buckets.is_empty() {
+                    warn!("next_rebal_bucket: pool_index: {} buckets is empty", pool_index);
                     return Ok(None);
                 }
 
+                warn!("next_rebal_bucket: pool_index: {} bucket: {}", pool_index, pool_stat.buckets[0]);
                 return Ok(Some(pool_stat.buckets[0].clone()));
             }
         }
 
+        warn!("next_rebal_bucket: pool_index: {} None", pool_index);
         Ok(None)
     }
 
@@ -392,18 +407,28 @@ impl ECStore {
         if let Some(meta) = rebalance_meta.as_mut() {
             if let Some(pool_stat) = meta.pool_stats.get_mut(pool_index) {
                 warn!("bucket_rebalance_done: buckets {:?}", &pool_stat.buckets);
-                if let Some(idx) = pool_stat.buckets.iter().position(|b| b.as_str() == bucket.as_str()) {
-                    warn!("bucket_rebalance_done: bucket {} rebalanced", &bucket);
-                    pool_stat.buckets.remove(idx);
-                    pool_stat.rebalanced_buckets.push(bucket);
+                // Use retain to filter out the bucket being removed
+                let mut found = false;
+                pool_stat.buckets.retain(|b| {
+                    if b.as_str() == bucket.as_str() {
+                        found = true;
+                        pool_stat.rebalanced_buckets.push(b.clone());
+                        false // drop this element
+                    } else {
+                        true // keep this element
+                    }
+                });
+
+                if found {
+                    warn!("bucket_rebalance_done: bucket {} rebalanced", &bucket);
                     return Ok(());
                 } else {
                     warn!("bucket_rebalance_done: bucket {} not found", bucket);
                 }
             }
         }
-
+        warn!("bucket_rebalance_done: bucket {} not found", bucket);
         Ok(())
     }
 
@@ -411,18 +436,28 @@ impl ECStore {
         let rebalance_meta = self.rebalance_meta.read().await;
         if let Some(ref meta) = *rebalance_meta {
             if meta.stopped_at.is_some() {
+                warn!("is_rebalance_started: rebalance stopped");
                 return false;
             }
 
+            meta.pool_stats.iter().enumerate().for_each(|(i, v)| {
+                warn!(
+                    "is_rebalance_started: pool_index: {}, participating: {:?}, status: {:?}",
+                    i, v.participating, v.info.status
+                );
+            });
+
             if meta
                 .pool_stats
                 .iter()
                 .any(|v| v.participating && v.info.status != RebalStatus::Completed)
             {
+                warn!("is_rebalance_started: rebalance started");
                 return true;
             }
         }
 
+        warn!("is_rebalance_started: rebalance not started");
         false
     }
 
@@ -462,6 +497,7 @@ impl ECStore {
         {
             let mut rebalance_meta = self.rebalance_meta.write().await;
+
             if let Some(meta) = rebalance_meta.as_mut() {
                 meta.cancel = Some(tx)
             } else {
@@ -474,19 +510,25 @@ impl ECStore {
         let participants = {
             if let Some(ref meta) = *self.rebalance_meta.read().await {
-                if meta.stopped_at.is_some() {
-                    warn!("start_rebalance: rebalance already stopped exit");
-                    return;
-                }
+                // if meta.stopped_at.is_some() {
+                //     warn!("start_rebalance: rebalance already stopped exit");
+                //     return;
+                // }
 
                 let mut participants = vec![false; meta.pool_stats.len()];
                 for (i, pool_stat) in meta.pool_stats.iter().enumerate() {
-                    if pool_stat.info.status == RebalStatus::Started {
-                        participants[i] = pool_stat.participating;
+                    warn!("start_rebalance: pool {} status: {:?}", i, pool_stat.info.status);
+                    if pool_stat.info.status != RebalStatus::Started {
+                        warn!("start_rebalance: pool {} not started, skipping", i);
+                        continue;
                     }
+
+                    warn!("start_rebalance: pool {} participating: {:?}", i, pool_stat.participating);
+                    participants[i] = pool_stat.participating;
                 }
                 participants
             } else {
+                warn!("start_rebalance:2 rebalance_meta is None exit");
                 Vec::new()
             }
         };
@@ -497,11 +539,13 @@ impl ECStore {
                 continue;
             }
 
-            if get_global_endpoints()
-                .as_ref()
-                .get(idx)
-                .is_none_or(|v| v.endpoints.as_ref().first().is_none_or(|e| e.is_local))
-            {
+            if !get_global_endpoints().as_ref().get(idx).is_some_and(|v| {
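// Note on bucket_rebalance_done above: the retain closure pushes into
// pool_stat.rebalanced_buckets while retain itself mutably borrows
// pool_stat.buckets; this compiles because Rust 2021 closures capture disjoint
// fields. If the moved-bucket bookkeeping were not needed, a shorter equivalent
// for the removal alone would be:
//
//     let before = pool_stat.buckets.len();
//     pool_stat.buckets.retain(|b| b.as_str() != bucket.as_str());
//     let found = pool_stat.buckets.len() != before;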
warn!("start_rebalance: pool {} endpoints: {:?}", idx, v.endpoints); + v.endpoints.as_ref().first().is_some_and(|e| { + warn!("start_rebalance: pool {} endpoint: {:?}, is_local: {}", idx, e, e.is_local); + e.is_local + }) + }) { warn!("start_rebalance: pool {} is not local, skipping", idx); continue; } @@ -522,13 +566,13 @@ impl ECStore { } #[tracing::instrument(skip(self, rx))] - async fn rebalance_buckets(self: &Arc, rx: B_Receiver, pool_index: usize) -> Result<()> { + async fn rebalance_buckets(self: &Arc, mut rx: B_Receiver, pool_index: usize) -> Result<()> { let (done_tx, mut done_rx) = tokio::sync::mpsc::channel::>(1); // Save rebalance metadata periodically let store = self.clone(); let save_task = tokio::spawn(async move { - let mut timer = tokio::time::interval_at(Instant::now() + Duration::from_secs(10), Duration::from_secs(10)); + let mut timer = tokio::time::interval_at(Instant::now() + Duration::from_secs(30), Duration::from_secs(10)); let mut msg: String; let mut quit = false; @@ -537,14 +581,15 @@ impl ECStore { // TODO: cancel rebalance Some(result) = done_rx.recv() => { quit = true; - let now = SystemTime::now(); - + let now = OffsetDateTime::now_utc(); let state = match result { Ok(_) => { + warn!("rebalance_buckets: completed"); msg = format!("Rebalance completed at {:?}", now); RebalStatus::Completed}, Err(err) => { + warn!("rebalance_buckets: error: {:?}", err); // TODO: check stop if err.to_string().contains("canceled") { msg = format!("Rebalance stopped at {:?}", now); @@ -557,9 +602,11 @@ impl ECStore { }; { + warn!("rebalance_buckets: save rebalance meta, pool_index: {}, state: {:?}", pool_index, state); let mut rebalance_meta = store.rebalance_meta.write().await; if let Some(rbm) = rebalance_meta.as_mut() { + warn!("rebalance_buckets: save rebalance meta2, pool_index: {}, state: {:?}", pool_index, state); rbm.pool_stats[pool_index].info.status = state; rbm.pool_stats[pool_index].info.end_time = Some(now); } @@ -568,7 +615,7 @@ impl ECStore { } _ = timer.tick() => { - let now = SystemTime::now(); + let now = OffsetDateTime::now_utc(); msg = format!("Saving rebalance metadata at {:?}", now); } } @@ -576,7 +623,7 @@ impl ECStore { if let Err(err) = store.save_rebalance_stats(pool_index, RebalSaveOpt::Stats).await { error!("{} err: {:?}", msg, err); } else { - info!(msg); + warn!(msg); } if quit { @@ -588,30 +635,41 @@ impl ECStore { } }); - warn!("Pool {} rebalancing is started", pool_index + 1); + warn!("Pool {} rebalancing is started", pool_index); - while let Some(bucket) = self.next_rebal_bucket(pool_index).await? { - warn!("Rebalancing bucket: start {}", bucket); - - if let Err(err) = self.rebalance_bucket(rx.resubscribe(), bucket.clone(), pool_index).await { - if err.to_string().contains("not initialized") { - warn!("rebalance_bucket: rebalance not initialized, continue"); - continue; - } - error!("Error rebalancing bucket {}: {:?}", bucket, err); - done_tx.send(Err(err)).await.ok(); + loop { + if let Ok(true) = rx.try_recv() { + warn!("Pool {} rebalancing is stopped", pool_index); + done_tx.send(Err(Error::other("rebalance stopped canceled"))).await.ok(); break; } - warn!("Rebalance bucket: done {} ", bucket); - self.bucket_rebalance_done(pool_index, bucket).await?; + if let Some(bucket) = self.next_rebal_bucket(pool_index).await? 
{ + warn!("Rebalancing bucket: start {}", bucket); + + if let Err(err) = self.rebalance_bucket(rx.resubscribe(), bucket.clone(), pool_index).await { + if err.to_string().contains("not initialized") { + warn!("rebalance_bucket: rebalance not initialized, continue"); + continue; + } + error!("Error rebalancing bucket {}: {:?}", bucket, err); + done_tx.send(Err(err)).await.ok(); + break; + } + + warn!("Rebalance bucket: done {} ", bucket); + self.bucket_rebalance_done(pool_index, bucket).await?; + } else { + warn!("Rebalance bucket: no bucket to rebalance"); + break; + } } - warn!("Pool {} rebalancing is done", pool_index + 1); + warn!("Pool {} rebalancing is done", pool_index); done_tx.send(Ok(())).await.ok(); save_task.await.ok(); - + warn!("Pool {} rebalancing is done2", pool_index); Ok(()) } @@ -622,6 +680,7 @@ impl ECStore { if let Some(pool_stat) = meta.pool_stats.get_mut(pool_index) { // Check if the pool's rebalance status is already completed if pool_stat.info.status == RebalStatus::Completed { + warn!("check_if_rebalance_done: pool {} is already completed", pool_index); return true; } @@ -631,7 +690,8 @@ impl ECStore { // Mark pool rebalance as done if within 5% of the PercentFreeGoal if (pfi - meta.percent_free_goal).abs() <= 0.05 { pool_stat.info.status = RebalStatus::Completed; - pool_stat.info.end_time = Some(SystemTime::now()); + pool_stat.info.end_time = Some(OffsetDateTime::now_utc()); + warn!("check_if_rebalance_done: pool {} is completed, pfi: {}", pool_index, pfi); return true; } } @@ -641,24 +701,30 @@ impl ECStore { } #[allow(unused_assignments)] - #[tracing::instrument(skip(self, wk, set))] + #[tracing::instrument(skip(self, set))] async fn rebalance_entry( &self, bucket: String, pool_index: usize, entry: MetaCacheEntry, set: Arc, - wk: Arc, + // wk: Arc, ) { - defer!(|| async { - wk.give().await; - }); + warn!("rebalance_entry: start rebalance_entry"); + + // defer!(|| async { + // warn!("rebalance_entry: defer give worker start"); + // wk.give().await; + // warn!("rebalance_entry: defer give worker done"); + // }); if entry.is_dir() { + warn!("rebalance_entry: entry is dir, skipping"); return; } if self.check_if_rebalance_done(pool_index).await { + warn!("rebalance_entry: rebalance done, skipping pool {}", pool_index); return; } @@ -666,6 +732,7 @@ impl ECStore { Ok(fivs) => fivs, Err(err) => { error!("rebalance_entry Error getting file info versions: {}", err); + warn!("rebalance_entry: Error getting file info versions, skipping"); return; } }; @@ -676,7 +743,7 @@ impl ECStore { let expired: usize = 0; for version in fivs.versions.iter() { if version.is_remote() { - info!("rebalance_entry Entry {} is remote, skipping", version.name); + warn!("rebalance_entry Entry {} is remote, skipping", version.name); continue; } // TODO: filterLifecycle @@ -684,7 +751,7 @@ impl ECStore { let remaining_versions = fivs.versions.len() - expired; if version.deleted && remaining_versions == 1 { rebalanced += 1; - info!("rebalance_entry Entry {} is deleted and last version, skipping", version.name); + warn!("rebalance_entry Entry {} is deleted and last version, skipping", version.name); continue; } let version_id = version.version_id.map(|v| v.to_string()); @@ -735,6 +802,7 @@ impl ECStore { } for _i in 0..3 { + warn!("rebalance_entry: get_object_reader, bucket: {}, version: {}", &bucket, &version.name); let rd = match set .get_object_reader( bucket.as_str(), @@ -753,6 +821,10 @@ impl ECStore { Err(err) => { if is_err_object_not_found(&err) || is_err_version_not_found(&err) { ignore = 
true; + warn!( + "rebalance_entry: get_object_reader, bucket: {}, version: {}, ignore", + &bucket, &version.name + ); break; } @@ -765,7 +837,7 @@ impl ECStore { if let Err(err) = self.rebalance_object(pool_index, bucket.clone(), rd).await { if is_err_object_not_found(&err) || is_err_version_not_found(&err) || is_err_data_movement_overwrite(&err) { ignore = true; - info!("rebalance_entry {} Entry {} is already deleted, skipping", &bucket, version.name); + warn!("rebalance_entry {} Entry {} is already deleted, skipping", &bucket, version.name); break; } @@ -780,7 +852,7 @@ impl ECStore { } if ignore { - info!("rebalance_entry {} Entry {} is already deleted, skipping", &bucket, version.name); + warn!("rebalance_entry {} Entry {} is already deleted, skipping", &bucket, version.name); continue; } @@ -812,7 +884,7 @@ impl ECStore { { error!("rebalance_entry: delete_object err {:?}", &err); } else { - info!("rebalance_entry {} Entry {} deleted successfully", &bucket, &entry.name); + warn!("rebalance_entry {} Entry {} deleted successfully", &bucket, &entry.name); } } } @@ -957,26 +1029,29 @@ impl ECStore { let pool = self.pools[pool_index].clone(); - let wk = Workers::new(pool.disk_set.len() * 2).map_err(Error::other)?; + let mut jobs = Vec::new(); + // let wk = Workers::new(pool.disk_set.len() * 2).map_err(Error::other)?; + // wk.clone().take().await; for (set_idx, set) in pool.disk_set.iter().enumerate() { - wk.clone().take().await; - let rebalance_entry: ListCallback = Arc::new({ let this = Arc::clone(self); let bucket = bucket.clone(); - let wk = wk.clone(); + // let wk = wk.clone(); let set = set.clone(); move |entry: MetaCacheEntry| { let this = this.clone(); let bucket = bucket.clone(); - let wk = wk.clone(); + // let wk = wk.clone(); let set = set.clone(); Box::pin(async move { - wk.take().await; - tokio::spawn(async move { - this.rebalance_entry(bucket, pool_index, entry, set, wk).await; - }); + warn!("rebalance_entry: rebalance_entry spawn start"); + // wk.take().await; + // tokio::spawn(async move { + warn!("rebalance_entry: rebalance_entry spawn start2"); + this.rebalance_entry(bucket, pool_index, entry, set).await; + warn!("rebalance_entry: rebalance_entry spawn done"); + // }); }) } }); @@ -984,62 +1059,68 @@ impl ECStore { let set = set.clone(); let rx = rx.resubscribe(); let bucket = bucket.clone(); - let wk = wk.clone(); - tokio::spawn(async move { + // let wk = wk.clone(); + + let job = tokio::spawn(async move { if let Err(err) = set.list_objects_to_rebalance(rx, bucket, rebalance_entry).await { error!("Rebalance worker {} error: {}", set_idx, err); } else { info!("Rebalance worker {} done", set_idx); } - wk.clone().give().await; + // wk.clone().give().await; }); + + jobs.push(job); } - wk.wait().await; + // wk.wait().await; + for job in jobs { + job.await.unwrap(); + } + warn!("rebalance_bucket: rebalance_bucket done"); Ok(()) } #[tracing::instrument(skip(self))] pub async fn save_rebalance_stats(&self, pool_idx: usize, opt: RebalSaveOpt) -> Result<()> { - // TODO: NSLOOK - + // TODO: lock let mut meta = RebalanceMeta::new(); - meta.load_with_opts( - self.pools[0].clone(), - ObjectOptions { - no_lock: true, - ..Default::default() - }, - ) - .await?; - - if opt == RebalSaveOpt::StoppedAt { - meta.stopped_at = Some(SystemTime::now()); - } - - let mut rebalance_meta = self.rebalance_meta.write().await; - - if let Some(rb) = rebalance_meta.as_mut() { - if opt == RebalSaveOpt::Stats { - meta.pool_stats[pool_idx] = rb.pool_stats[pool_idx].clone(); + if let Err(err) = 
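// Note on rebalance_bucket above: the Workers pool is commented out in favor of
// one spawned task per erasure set, joined sequentially. `job.await.unwrap()`
// panics if a listing task itself panicked; a non-panicking join (hypothetical
// alternative, same behavior on success) would be:
//
//     for job in jobs {
//         if let Err(e) = job.await {
//             error!("rebalance_bucket: join err {:?}", e);
//         }
//     }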
meta.load(self.pools[0].clone()).await { + if err != Error::ConfigNotFound { + warn!("save_rebalance_stats: load err: {:?}", err); + return Err(err); } - - *rb = meta; - } else { - *rebalance_meta = Some(meta); } - if let Some(meta) = rebalance_meta.as_mut() { - meta.save_with_opts( - self.pools[0].clone(), - ObjectOptions { - no_lock: true, - ..Default::default() - }, - ) - .await?; + match opt { + RebalSaveOpt::Stats => { + { + let mut rebalance_meta = self.rebalance_meta.write().await; + if let Some(rbm) = rebalance_meta.as_mut() { + meta.pool_stats[pool_idx] = rbm.pool_stats[pool_idx].clone(); + } + } + + if let Some(pool_stat) = meta.pool_stats.get_mut(pool_idx) { + pool_stat.info.end_time = Some(OffsetDateTime::now_utc()); + } + } + RebalSaveOpt::StoppedAt => { + meta.stopped_at = Some(OffsetDateTime::now_utc()); + } } + { + let mut rebalance_meta = self.rebalance_meta.write().await; + *rebalance_meta = Some(meta.clone()); + } + + warn!( + "save_rebalance_stats: save rebalance meta, pool_idx: {}, opt: {:?}, meta: {:?}", + pool_idx, opt, meta + ); + meta.save(self.pools[0].clone()).await?; + Ok(()) } } @@ -1052,12 +1133,15 @@ impl SetDisks { bucket: String, cb: ListCallback, ) -> Result<()> { + warn!("list_objects_to_rebalance: start list_objects_to_rebalance"); // Placeholder for actual object listing logic let (disks, _) = self.get_online_disks_with_healing(false).await; if disks.is_empty() { + warn!("list_objects_to_rebalance: no disk available"); return Err(Error::other("errNoDiskAvailable")); } + warn!("list_objects_to_rebalance: get online disks with healing"); let listing_quorum = self.set_drive_count.div_ceil(2); let resolver = MetadataResolutionParams { @@ -1075,7 +1159,10 @@ impl SetDisks { bucket: bucket.clone(), recursice: true, min_disks: listing_quorum, - agreed: Some(Box::new(move |entry: MetaCacheEntry| Box::pin(cb1(entry)))), + agreed: Some(Box::new(move |entry: MetaCacheEntry| { + warn!("list_objects_to_rebalance: agreed: {:?}", &entry.name); + Box::pin(cb1(entry)) + })), partial: Some(Box::new(move |entries: MetaCacheEntries, _: &[Option]| { // let cb = cb.clone(); let resolver = resolver.clone(); @@ -1083,11 +1170,11 @@ impl SetDisks { match entries.resolve(resolver) { Some(entry) => { - warn!("rebalance: list_objects_to_decommission get {}", &entry.name); + warn!("list_objects_to_rebalance: list_objects_to_decommission get {}", &entry.name); Box::pin(async move { cb(entry).await }) } None => { - warn!("rebalance: list_objects_to_decommission get none"); + warn!("list_objects_to_rebalance: list_objects_to_decommission get none"); Box::pin(async {}) } } @@ -1097,6 +1184,7 @@ impl SetDisks { ) .await?; + warn!("list_objects_to_rebalance: list_objects_to_rebalance done"); Ok(()) } } diff --git a/ecstore/src/set_disk.rs b/ecstore/src/set_disk.rs index 271cc4c9..607f9fd2 100644 --- a/ecstore/src/set_disk.rs +++ b/ecstore/src/set_disk.rs @@ -860,6 +860,7 @@ impl SetDisks { }; if let Some(err) = reduce_read_quorum_errs(errs, OBJECT_OP_IGNORED_ERRS, expected_rquorum) { + error!("object_quorum_from_meta: {:?}, errs={:?}", err, errs); return Err(err); } @@ -872,6 +873,7 @@ impl SetDisks { let parity_blocks = Self::common_parity(&parities, default_parity_count as i32); if parity_blocks < 0 { + error!("object_quorum_from_meta: parity_blocks < 0, errs={:?}", errs); return Err(DiskError::ErasureReadQuorum); } @@ -936,6 +938,7 @@ impl SetDisks { Self::object_quorum_from_meta(&parts_metadata, &errs, self.default_parity_count).map_err(map_err_notfound)?; if read_quorum < 0 { + 
            error!("check_upload_id_exists: read_quorum < 0, errs={:?}", errs);
             return Err(Error::ErasureReadQuorum);
         }
@@ -977,6 +980,7 @@ impl SetDisks {
         quorum: usize,
     ) -> disk::error::Result {
         if quorum < 1 {
+            error!("find_file_info_in_quorum: quorum < 1");
             return Err(DiskError::ErasureReadQuorum);
         }
 
@@ -1035,6 +1039,7 @@ impl SetDisks {
         }
 
         if max_count < quorum {
+            error!("find_file_info_in_quorum: max_count < quorum, max_val={:?}", max_val);
             return Err(DiskError::ErasureReadQuorum);
         }
 
@@ -1079,7 +1084,7 @@ impl SetDisks {
             return Ok(fi);
         }
 
-        warn!("QuorumError::Read, find_file_info_in_quorum fileinfo not found");
+        error!("find_file_info_in_quorum: fileinfo not found");
         Err(DiskError::ErasureReadQuorum)
     }
 
@@ -1763,10 +1768,18 @@ impl SetDisks {
         let _min_disks = self.set_drive_count - self.default_parity_count;
 
-        let (read_quorum, _) = Self::object_quorum_from_meta(&parts_metadata, &errs, self.default_parity_count)
-            .map_err(|err| to_object_err(err.into(), vec![bucket, object]))?;
+        let (read_quorum, _) = match Self::object_quorum_from_meta(&parts_metadata, &errs, self.default_parity_count)
+            .map_err(|err| to_object_err(err.into(), vec![bucket, object]))
+        {
+            Ok(v) => v,
+            Err(e) => {
+                error!("Self::object_quorum_from_meta: {:?}, bucket: {}, object: {}", &e, bucket, object);
+                return Err(e);
+            }
+        };
 
         if let Some(err) = reduce_read_quorum_errs(&errs, OBJECT_OP_IGNORED_ERRS, read_quorum as usize) {
+            error!("reduce_read_quorum_errs: {:?}, bucket: {}, object: {}", &err, bucket, object);
             return Err(to_object_err(err.into(), vec![bucket, object]));
         }
 
@@ -1896,6 +1909,7 @@ impl SetDisks {
         let nil_count = errors.iter().filter(|&e| e.is_none()).count();
         if nil_count < erasure.data_shards {
             if let Some(read_err) = reduce_read_quorum_errs(&errors, OBJECT_OP_IGNORED_ERRS, erasure.data_shards) {
+                error!("create_bitrot_reader reduce_read_quorum_errs {:?}", &errors);
                 return Err(to_object_err(read_err.into(), vec![bucket, object]));
             }
 
@@ -2942,6 +2956,7 @@ impl SetDisks {
             }
             Ok(m)
         } else {
+            error!("delete_if_dang_ling: is_object_dang_ling errs={:?}", errs);
             Err(DiskError::ErasureReadQuorum)
         }
     }
@@ -3004,41 +3019,56 @@ impl SetDisks {
         }
 
         let (buckets_results_tx, mut buckets_results_rx) = mpsc::channel::(disks.len());
+        // New: read the base interval (seconds) from an environment variable, default 30s
+        let set_disk_update_interval_secs = std::env::var("RUSTFS_NS_SCANNER_INTERVAL")
+            .ok()
+            .and_then(|v| v.parse::<u64>().ok())
+            .unwrap_or(30);
         let update_time = {
             let mut rng = rand::rng();
-            Duration::from_secs(30) + Duration::from_secs_f64(10.0 * rng.random_range(0.0..1.0))
+            Duration::from_secs(set_disk_update_interval_secs) + Duration::from_secs_f64(10.0 * rng.random_range(0.0..1.0))
         };
 
         let mut ticker = interval(update_time);
-        let task = tokio::spawn(async move {
-            let last_save = Some(SystemTime::now());
-            let mut need_loop = true;
-            while need_loop {
-                select! {
-                    _ = ticker.tick() => {
-                        if !cache.info.last_update.eq(&last_save) {
-                            let _ = cache.save(DATA_USAGE_CACHE_NAME).await;
-                            let _ = updates.send(cache.clone()).await;
-                        }
-                    }
-                    result = buckets_results_rx.recv() => {
-                        match result {
-                            Some(result) => {
-                                cache.replace(&result.name, &result.parent, result.entry);
-                                cache.info.last_update = Some(SystemTime::now());
-                            },
-                            None => {
-                                need_loop = false;
-                                cache.info.next_cycle = want_cycle;
-                                cache.info.last_update = Some(SystemTime::now());
+        // Check whether the background task should run
+        let skip_background_task = std::env::var("RUSTFS_SKIP_BACKGROUND_TASK")
+            .ok()
+            .and_then(|v| v.parse::<bool>().ok())
+            .unwrap_or(false);
+
+        let task = if !skip_background_task {
+            Some(tokio::spawn(async move {
+                let last_save = Some(SystemTime::now());
+                let mut need_loop = true;
+                while need_loop {
+                    select! {
+                        _ = ticker.tick() => {
+                            if !cache.info.last_update.eq(&last_save) { let _ = cache.save(DATA_USAGE_CACHE_NAME).await; let _ = updates.send(cache.clone()).await; } }
+                        result = buckets_results_rx.recv() => {
+                            match result {
+                                Some(result) => {
+                                    cache.replace(&result.name, &result.parent, result.entry);
+                                    cache.info.last_update = Some(SystemTime::now());
+                                },
+                                None => {
+                                    need_loop = false;
+                                    cache.info.next_cycle = want_cycle;
+                                    cache.info.last_update = Some(SystemTime::now());
+                                    let _ = cache.save(DATA_USAGE_CACHE_NAME).await;
+                                    let _ = updates.send(cache.clone()).await;
+                                }
+                            }
+                        }
                     }
                 }
-            }
-        });
+            }))
+        } else {
+            None
+        };
 
         // Restrict parallelism for disk usage scanner
         let max_procs = num_cpus::get();
@@ -3142,7 +3172,9 @@ impl SetDisks {
         info!("ns_scanner start");
         let _ = join_all(futures).await;
-        let _ = task.await;
+        if let Some(task) = task {
+            let _ = task.await;
+        }
         info!("ns_scanner completed");
         Ok(())
     }
@@ -3894,7 +3926,13 @@ impl ObjectIO for SetDisks {
         let stream = mem::replace(&mut data.stream, HashReader::new(Box::new(Cursor::new(Vec::new())), 0, 0, None, false)?);
 
-        let (reader, w_size) = Arc::new(erasure).encode(stream, &mut writers, write_quorum).await?; // TODO: on error, delete the temp directory
+        let (reader, w_size) = match Arc::new(erasure).encode(stream, &mut writers, write_quorum).await {
+            Ok((r, w)) => (r, w),
+            Err(e) => {
+                error!("encode err {:?}", e);
+                return Err(e.into());
+            }
+        }; // TODO: on error, delete the temp directory
 
         let _ = mem::replace(&mut data.stream, reader);
 
         // if let Err(err) = close_bitrot_writers(&mut writers).await {
diff --git a/ecstore/src/store.rs b/ecstore/src/store.rs
index c24f2224..c606a43e 100644
--- a/ecstore/src/store.rs
+++ b/ecstore/src/store.rs
@@ -847,9 +847,26 @@ impl ECStore {
         let (update_closer_tx, mut update_close_rx) = mpsc::channel(10);
         let mut ctx_clone = cancel.subscribe();
         let all_buckets_clone = all_buckets.clone();
+        // New: read the interval (seconds) from an environment variable, default 30s
+        let ns_scanner_interval_secs = std::env::var("RUSTFS_NS_SCANNER_INTERVAL")
+            .ok()
+            .and_then(|v| v.parse::<u64>().ok())
+            .unwrap_or(30);
+
+        // Check whether to skip the background task
+        let skip_background_task = std::env::var("RUSTFS_SKIP_BACKGROUND_TASK")
+            .ok()
+            .and_then(|v| v.parse::<bool>().ok())
+            .unwrap_or(false);
+
+        if skip_background_task {
+            info!("Skipping background task: RUSTFS_SKIP_BACKGROUND_TASK=true");
+            return Ok(());
+        }
+
         let task = tokio::spawn(async move {
             let mut last_update: Option = None;
-            let mut interval = interval(Duration::from_secs(30));
+            let mut interval = interval(Duration::from_secs(ns_scanner_interval_secs));
             let all_merged = Arc::new(RwLock::new(DataUsageCache::default()));
             loop {
                 select! {
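Note: the scanner/background-task knobs added in set_disk.rs and store.rs above
are plain environment variables: RUSTFS_NS_SCANNER_INTERVAL (seconds, u64,
default 30) and RUSTFS_SKIP_BACKGROUND_TASK (parsed as bool here, while the IAM
manager below only checks whether the variable is set at all). A small helper
sketching the parse-with-default pattern the patch inlines at each call site
(hypothetical name, not part of the patch):

    fn env_u64_or(key: &str, default: u64) -> u64 {
        std::env::var(key)
            .ok()
            .and_then(|v| v.parse::<u64>().ok())
            .unwrap_or(default)
    }

    // e.g. RUSTFS_NS_SCANNER_INTERVAL=300 -> 5-minute scanner interval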
diff --git a/ecstore/src/store_list_objects.rs b/ecstore/src/store_list_objects.rs
index c3bd38d9..c08bb8b2 100644
--- a/ecstore/src/store_list_objects.rs
+++ b/ecstore/src/store_list_objects.rs
@@ -364,6 +364,7 @@ impl ECStore {
         max_keys: i32,
     ) -> Result {
         if marker.is_none() && version_marker.is_some() {
+            warn!("inner_list_object_versions: marker is none and version_marker is some");
             return Err(StorageError::NotImplemented);
         }
 
diff --git a/iam/src/manager.rs b/iam/src/manager.rs
index 23c10ecd..a556aff2 100644
--- a/iam/src/manager.rs
+++ b/iam/src/manager.rs
@@ -95,38 +95,45 @@ where
         self.clone().save_iam_formatter().await?;
         self.clone().load().await?;
 
-        // Background thread starts periodic updates or receives signal updates
-        tokio::spawn({
-            let s = Arc::clone(&self);
-            async move {
-                let ticker = tokio::time::interval(Duration::from_secs(120));
-                tokio::pin!(ticker, reciver);
-                loop {
-                    select! {
-                        _ = ticker.tick() => {
-                            if let Err(err) =s.clone().load().await{
-                                error!("iam load err {:?}", err);
-                            }
-                        },
-                        i = reciver.recv() => {
-                            match i {
-                                Some(t) => {
-                                    let last = s.last_timestamp.load(Ordering::Relaxed);
-                                    if last <= t {
+        // Check whether the environment variable is set
+        let skip_background_task = std::env::var("RUSTFS_SKIP_BACKGROUND_TASK").is_ok();
 
-                                        if let Err(err) =s.clone().load().await{
-                                            error!("iam load err {:?}", err);
+        if !skip_background_task {
+            // Background thread starts periodic updates or receives signal updates
+            tokio::spawn({
+                let s = Arc::clone(&self);
+                async move {
+                    let ticker = tokio::time::interval(Duration::from_secs(120));
+                    tokio::pin!(ticker, reciver);
+                    loop {
+                        select! {
+                            _ = ticker.tick() => {
+                                warn!("iam load ticker");
+                                if let Err(err) =s.clone().load().await{
+                                    error!("iam load err {:?}", err);
+                                }
+                            },
+                            i = reciver.recv() => {
+                                warn!("iam load reciver");
+                                match i {
+                                    Some(t) => {
+                                        let last = s.last_timestamp.load(Ordering::Relaxed);
+                                        if last <= t {
+                                            warn!("iam load reciver load");
+                                            if let Err(err) =s.clone().load().await{
+                                                error!("iam load err {:?}", err);
+                                            }
+                                            ticker.reset();
                                         }
-                                    },
-                                    None => return,
+                                    },
+                                    None => return,
+                                }
                             }
                         }
                     }
                 }
-            }
-        });
+            });
+        }
 
         Ok(())
     }
diff --git a/rustfs/src/admin/handlers/rebalance.rs b/rustfs/src/admin/handlers/rebalance.rs
index f2cfb7c5..e9c3507a 100644
--- a/rustfs/src/admin/handlers/rebalance.rs
+++ b/rustfs/src/admin/handlers/rebalance.rs
@@ -10,7 +10,8 @@ use http::{HeaderMap, StatusCode};
 use matchit::Params;
 use s3s::{Body, S3Request, S3Response, S3Result, header::CONTENT_TYPE, s3_error};
 use serde::{Deserialize, Serialize};
-use std::time::{Duration, SystemTime};
+use std::time::Duration;
+use time::OffsetDateTime;
 use tracing::warn;
 
 use crate::admin::router::Operation;
@@ -56,8 +57,8 @@ pub struct RebalanceAdminStatus {
     pub id: String, // Identifies the ongoing rebalance operation by a UUID
     #[serde(rename = "pools")]
     pub pools: Vec, // Contains all pools, including inactive
-    #[serde(rename = "stoppedAt")]
-    pub stopped_at: Option<SystemTime>, // Optional timestamp when rebalance was stopped
+    #[serde(rename = "stoppedAt", with = "offsetdatetime_rfc3339")]
+    pub stopped_at: Option<OffsetDateTime>, // Optional timestamp when rebalance was stopped
 }
 
 pub struct RebalanceStart {}
@@ -101,11 +102,13 @@ impl Operation for RebalanceStart {
             }
         };
 
+        store.start_rebalance().await;
+
         warn!("Rebalance started with id: {}", id);
         if let Some(notification_sys) = get_global_notification_sys() {
-            warn!("Loading rebalance meta");
+            warn!("RebalanceStart Loading rebalance meta start");
             notification_sys.load_rebalance_meta(true).await;
-            warn!("Rebalance meta loaded");
warn!("Rebalance meta loaded"); + warn!("RebalanceStart Loading rebalance meta done"); } let resp = RebalanceResp { id }; @@ -175,15 +178,14 @@ impl Operation for RebalanceStatus { let total_bytes_to_rebal = ps.init_capacity as f64 * meta.percent_free_goal - ps.init_free_space as f64; let mut elapsed = if let Some(start_time) = ps.info.start_time { - SystemTime::now() - .duration_since(start_time) - .map_err(|e| s3_error!(InternalError, "Failed to calculate elapsed time: {}", e))? + let now = OffsetDateTime::now_utc(); + now - start_time } else { return Err(s3_error!(InternalError, "Start time is not available")); }; let mut eta = if ps.bytes > 0 { - Duration::from_secs_f64(total_bytes_to_rebal * elapsed.as_secs_f64() / ps.bytes as f64) + Duration::from_secs_f64(total_bytes_to_rebal * elapsed.as_seconds_f64() / ps.bytes as f64) } else { Duration::ZERO }; @@ -193,10 +195,8 @@ impl Operation for RebalanceStatus { } if let Some(stopped_at) = stop_time { - if let Ok(du) = stopped_at.duration_since(ps.info.start_time.unwrap_or(stopped_at)) { - elapsed = du; - } else { - return Err(s3_error!(InternalError, "Failed to calculate elapsed time")); + if let Some(start_time) = ps.info.start_time { + elapsed = stopped_at - start_time; } eta = Duration::ZERO; @@ -208,7 +208,7 @@ impl Operation for RebalanceStatus { bytes: ps.bytes, bucket: ps.bucket.clone(), object: ps.object.clone(), - elapsed: elapsed.as_secs(), + elapsed: elapsed.whole_seconds() as u64, eta: eta.as_secs(), }); } @@ -244,10 +244,45 @@ impl Operation for RebalanceStop { .await .map_err(|e| s3_error!(InternalError, "Failed to stop rebalance: {}", e))?; + warn!("handle RebalanceStop save_rebalance_stats done "); if let Some(notification_sys) = get_global_notification_sys() { - notification_sys.load_rebalance_meta(true).await; + warn!("handle RebalanceStop notification_sys load_rebalance_meta"); + notification_sys.load_rebalance_meta(false).await; + warn!("handle RebalanceStop notification_sys load_rebalance_meta done"); } - return Err(s3_error!(NotImplemented)); + Ok(S3Response::new((StatusCode::OK, Body::empty()))) + } +} + +mod offsetdatetime_rfc3339 { + use serde::{self, Deserialize, Deserializer, Serializer}; + use time::{OffsetDateTime, format_description::well_known::Rfc3339}; + + pub fn serialize(dt: &Option, serializer: S) -> Result + where + S: Serializer, + { + match dt { + Some(dt) => { + let s = dt.format(&Rfc3339).map_err(serde::ser::Error::custom)?; + serializer.serialize_some(&s) + } + None => serializer.serialize_none(), + } + } + + pub fn deserialize<'de, D>(deserializer: D) -> Result, D::Error> + where + D: Deserializer<'de>, + { + let opt = Option::::deserialize(deserializer)?; + match opt { + Some(s) => { + let dt = OffsetDateTime::parse(&s, &Rfc3339).map_err(serde::de::Error::custom)?; + Ok(Some(dt)) + } + None => Ok(None), + } } } diff --git a/rustfs/src/admin/rpc.rs b/rustfs/src/admin/rpc.rs index 19fe84d0..d650e5c5 100644 --- a/rustfs/src/admin/rpc.rs +++ b/rustfs/src/admin/rpc.rs @@ -3,6 +3,7 @@ use super::router::Operation; use super::router::S3Router; use crate::storage::ecfs::bytes_stream; use ecstore::disk::DiskAPI; +use ecstore::disk::WalkDirOptions; use ecstore::set_disk::DEFAULT_READ_BUFFER_SIZE; use ecstore::store::find_local_disk; use futures::TryStreamExt; @@ -18,6 +19,7 @@ use s3s::s3_error; use serde_urlencoded::from_bytes; use tokio_util::io::ReaderStream; use tokio_util::io::StreamReader; +use tracing::warn; pub const RPC_PREFIX: &str = "/rustfs/rpc"; @@ -28,12 +30,30 @@ pub fn regist_rpc_route(r: 
&mut S3Router) -> std::io::Result<()> AdminOperation(&ReadFile {}), )?; + r.insert( + Method::HEAD, + format!("{}{}", RPC_PREFIX, "/read_file_stream").as_str(), + AdminOperation(&ReadFile {}), + )?; + r.insert( Method::PUT, format!("{}{}", RPC_PREFIX, "/put_file_stream").as_str(), AdminOperation(&PutFile {}), )?; + r.insert( + Method::GET, + format!("{}{}", RPC_PREFIX, "/walk_dir").as_str(), + AdminOperation(&WalkDir {}), + )?; + + r.insert( + Method::HEAD, + format!("{}{}", RPC_PREFIX, "/walk_dir").as_str(), + AdminOperation(&WalkDir {}), + )?; + Ok(()) } @@ -50,6 +70,9 @@ pub struct ReadFile {} #[async_trait::async_trait] impl Operation for ReadFile { async fn call(&self, req: S3Request, _params: Params<'_, '_>) -> S3Result> { + if req.method == Method::HEAD { + return Ok(S3Response::new((StatusCode::OK, Body::empty()))); + } let query = { if let Some(query) = req.uri.query() { let input: ReadFileQuery = @@ -79,6 +102,61 @@ impl Operation for ReadFile { } } +#[derive(Debug, Default, serde::Deserialize)] +pub struct WalkDirQuery { + disk: String, +} + +pub struct WalkDir {} + +#[async_trait::async_trait] +impl Operation for WalkDir { + async fn call(&self, req: S3Request, _params: Params<'_, '_>) -> S3Result> { + if req.method == Method::HEAD { + return Ok(S3Response::new((StatusCode::OK, Body::empty()))); + } + + let query = { + if let Some(query) = req.uri.query() { + let input: WalkDirQuery = + from_bytes(query.as_bytes()).map_err(|e| s3_error!(InvalidArgument, "get query failed1 {:?}", e))?; + input + } else { + WalkDirQuery::default() + } + }; + + let mut input = req.input; + let body = match input.store_all_unlimited().await { + Ok(b) => b, + Err(e) => { + warn!("get body failed, e: {:?}", e); + return Err(s3_error!(InvalidRequest, "get body failed")); + } + }; + + // let body_bytes = decrypt_data(input_cred.secret_key.expose().as_bytes(), &body) + // .map_err(|e| S3Error::with_message(S3ErrorCode::InvalidArgument, format!("decrypt_data err {}", e)))?; + + let args: WalkDirOptions = + serde_json::from_slice(&body).map_err(|e| s3_error!(InternalError, "unmarshal body err {}", e))?; + let Some(disk) = find_local_disk(&query.disk).await else { + return Err(s3_error!(InvalidArgument, "disk not found")); + }; + + let (rd, mut wd) = tokio::io::duplex(DEFAULT_READ_BUFFER_SIZE); + + tokio::spawn(async move { + if let Err(e) = disk.walk_dir(args, &mut wd).await { + warn!("walk dir err {}", e); + } + }); + + let body = Body::from(StreamingBlob::wrap(ReaderStream::with_capacity(rd, DEFAULT_READ_BUFFER_SIZE))); + Ok(S3Response::new((StatusCode::OK, body))) + } +} + // /rustfs/rpc/read_file_stream?disk={}&volume={}&path={}&offset={}&length={}" #[derive(Debug, Default, serde::Deserialize)] pub struct PutFileQuery { diff --git a/rustfs/src/grpc.rs b/rustfs/src/grpc.rs index a9c0de4b..a4af6164 100644 --- a/rustfs/src/grpc.rs +++ b/rustfs/src/grpc.rs @@ -807,11 +807,38 @@ impl Node for NodeService { } } Err(err) => { + if err == rustfs_filemeta::Error::Unexpected { + let _ = tx + .send(Ok(WalkDirResponse { + success: false, + meta_cache_entry: "".to_string(), + error_info: Some(err.to_string()), + })) + .await; + + break; + } + if rustfs_filemeta::is_io_eof(&err) { + let _ = tx + .send(Ok(WalkDirResponse { + success: false, + meta_cache_entry: "".to_string(), + error_info: Some(err.to_string()), + })) + .await; break; } println!("get err {:?}", err); + + let _ = tx + .send(Ok(WalkDirResponse { + success: false, + meta_cache_entry: "".to_string(), + error_info: Some(err.to_string()), + })) + .await; 
diff --git a/rustfs/src/grpc.rs b/rustfs/src/grpc.rs
index a9c0de4b..a4af6164 100644
--- a/rustfs/src/grpc.rs
+++ b/rustfs/src/grpc.rs
@@ -807,11 +807,38 @@ impl Node for NodeService {
                     }
                 }
                 Err(err) => {
+                    if err == rustfs_filemeta::Error::Unexpected {
+                        let _ = tx
+                            .send(Ok(WalkDirResponse {
+                                success: false,
+                                meta_cache_entry: "".to_string(),
+                                error_info: Some(err.to_string()),
+                            }))
+                            .await;
+
+                        break;
+                    }
                     if rustfs_filemeta::is_io_eof(&err) {
+                        let _ = tx
+                            .send(Ok(WalkDirResponse {
+                                success: false,
+                                meta_cache_entry: "".to_string(),
+                                error_info: Some(err.to_string()),
+                            }))
+                            .await;
                         break;
                     }
                     println!("get err {:?}", err);
+
+                    let _ = tx
+                        .send(Ok(WalkDirResponse {
+                            success: false,
+                            meta_cache_entry: "".to_string(),
+                            error_info: Some(err.to_string()),
+                        }))
+                        .await;
+
                     break;
                 }
             }
diff --git a/scripts/dev_clear.sh b/scripts/dev_clear.sh
new file mode 100644
index 00000000..095f1d4d
--- /dev/null
+++ b/scripts/dev_clear.sh
@@ -0,0 +1,11 @@
+for i in {0..3}; do
+    DIR="/data/rustfs$i"
+    echo "Processing $DIR"
+    if [ -d "$DIR" ]; then
+        echo "Clearing $DIR"
+        sudo rm -rf "$DIR"/* "$DIR"/.[!.]* "$DIR"/..?* 2>/dev/null || true
+        echo "Cleared $DIR"
+    else
+        echo "$DIR does not exist, skipping"
+    fi
+done
\ No newline at end of file
diff --git a/scripts/dev.sh b/scripts/dev_deploy.sh
similarity index 66%
rename from scripts/dev.sh
rename to scripts/dev_deploy.sh
index eb72331c..13cb3eb2 100755
--- a/scripts/dev.sh
+++ b/scripts/dev_deploy.sh
@@ -4,18 +4,20 @@ rm ./target/x86_64-unknown-linux-musl/release/rustfs.zip

 # Compress ./target/x86_64-unknown-linux-musl/release/rustfs
-zip ./target/x86_64-unknown-linux-musl/release/rustfs.zip ./target/x86_64-unknown-linux-musl/release/rustfs
-
+zip -j ./target/x86_64-unknown-linux-musl/release/rustfs.zip ./target/x86_64-unknown-linux-musl/release/rustfs

 # Local file path
 LOCAL_FILE="./target/x86_64-unknown-linux-musl/release/rustfs.zip"
 REMOTE_PATH="~"

-# Server list array
-# Format: server IP, user, target path
-SERVER_LIST=(
-    "root@121.89.80.13"
-)
+# An IP argument is required; exit with an error otherwise
+if [ -z "$1" ]; then
+    echo "Usage: $0 <IP>"
+    echo "Please provide the target server IP address"
+    exit 1
+fi
+
+SERVER_LIST=("root@$1")

 # Iterate over the server list
 for SERVER in "${SERVER_LIST[@]}"; do
@@ -26,7 +28,4 @@ for SERVER in "${SERVER_LIST[@]}"; do
 else
     echo "Copy to $SERVER failed"
 fi
-done
-
-
-# ps -ef | grep rustfs | awk '{print $2}'| xargs kill -9
\ No newline at end of file
+done
\ No newline at end of file
diff --git a/scripts/dev_rustfs.env b/scripts/dev_rustfs.env
new file mode 100644
index 00000000..a953320a
--- /dev/null
+++ b/scripts/dev_rustfs.env
@@ -0,0 +1,11 @@
+RUSTFS_ROOT_USER=rustfsadmin
+RUSTFS_ROOT_PASSWORD=rustfsadmin
+
+RUSTFS_VOLUMES="http://node{1...4}:7000/data/rustfs{0...3} http://node{5...8}:7000/data/rustfs{0...3}"
+RUSTFS_ADDRESS=":7000"
+RUSTFS_CONSOLE_ENABLE=true
+RUSTFS_CONSOLE_ADDRESS=":7001"
+RUST_LOG=warn
+RUSTFS_OBS_LOG_DIRECTORY="/var/logs/rustfs/"
+RUSTFS_NS_SCANNER_INTERVAL=60
+RUSTFS_SKIP_BACKGROUND_TASK=true
\ No newline at end of file
diff --git a/scripts/dev_rustfs.sh b/scripts/dev_rustfs.sh
new file mode 100644
index 00000000..76f5ab61
--- /dev/null
+++ b/scripts/dev_rustfs.sh
@@ -0,0 +1,199 @@
+#!/bin/bash
+
+# ps -ef | grep rustfs | awk '{print $2}'| xargs kill -9
+
+# Local rustfs.zip path
+ZIP_FILE="./rustfs.zip"
+# Unzip target
+UNZIP_TARGET="./"
+
+
+SERVER_LIST=(
+    "root@172.23.215.2" # node1
+    "root@172.23.215.4" # node2
+    "root@172.23.215.7" # node3
+    "root@172.23.215.3" # node4
+    "root@172.23.215.8" # node5
+    "root@172.23.215.5" # node6
+    "root@172.23.215.9" # node7
+    "root@172.23.215.6" # node8
+)
+
+REMOTE_TMP="~/rustfs"
+
+# Deploy rustfs to all servers
+deploy() {
+    echo "Unzipping $ZIP_FILE ..."
+    unzip -o "$ZIP_FILE" -d "$UNZIP_TARGET"
+    if [ $? -ne 0 ]; then
+        echo "Unzip failed, exiting"
+        exit 1
+    fi
+
+    LOCAL_RUSTFS="${UNZIP_TARGET}rustfs"
+    if [ ! -f "$LOCAL_RUSTFS" ]; then
+        echo "Unzipped rustfs binary not found, exiting"
+        exit 1
+    fi
+
+    for SERVER in "${SERVER_LIST[@]}"; do
+        echo "Uploading $LOCAL_RUSTFS to $SERVER:$REMOTE_TMP"
+        scp "$LOCAL_RUSTFS" "${SERVER}:${REMOTE_TMP}"
+        if [ $? -ne 0 ]; then
+            echo "❌ Upload to $SERVER failed, skipping"
+            continue
+        fi
+
+        echo "Replacing the binary and cycling systemctl on $SERVER"
+        ssh "$SERVER" bash <<EOF
+sudo systemctl stop rustfs
+sudo mv ~/rustfs /usr/local/bin/rustfs
+sudo systemctl start rustfs
+EOF
+    done
+}
+
+# Clear the data directories on all servers
+clear_data_dirs() {
+    for SERVER in "${SERVER_LIST[@]}"; do
+        echo "Clearing data directories on $SERVER"
+        ssh "$SERVER" bash <<'EOF'
+for i in {0..3}; do
+    DIR="/data/rustfs$i"
+    if [ -d "$DIR" ]; then
+        echo "Clearing $DIR"
+        sudo rm -rf "$DIR"/* "$DIR"/.[!.]* "$DIR"/..?* 2>/dev/null || true
+        echo "Cleared $DIR"
+    else
+        echo "$DIR does not exist, skipping"
+    fi
+done
+EOF
+    done
+}
+
+# Control the rustfs service
+stop_rustfs() {
+    for SERVER in "${SERVER_LIST[@]}"; do
+        echo "Stopping rustfs on $SERVER"
+        ssh "$SERVER" "sudo systemctl stop rustfs"
+    done
+}
+
+start_rustfs() {
+    for SERVER in "${SERVER_LIST[@]}"; do
+        echo "Starting rustfs on $SERVER"
+        ssh "$SERVER" "sudo systemctl start rustfs"
+    done
+}
+
+restart_rustfs() {
+    for SERVER in "${SERVER_LIST[@]}"; do
+        echo "Restarting rustfs on $SERVER"
+        ssh "$SERVER" "sudo systemctl restart rustfs"
+    done
+}
+
+# Append a public key to ~/.ssh/authorized_keys on all servers
+add_ssh_key() {
+    if [ -z "$2" ]; then
+        echo "Usage: $0 addkey <pubkey_file>"
+        exit 1
+    fi
+    PUBKEY_FILE="$2"
+    if [ ! -f "$PUBKEY_FILE" ]; then
+        echo "Specified public key file does not exist: $PUBKEY_FILE"
+        exit 1
+    fi
+    PUBKEY_CONTENT=$(cat "$PUBKEY_FILE")
+    for SERVER in "${SERVER_LIST[@]}"; do
+        echo "Appending public key to $SERVER:~/.ssh/authorized_keys"
+        ssh "$SERVER" "mkdir -p ~/.ssh && chmod 700 ~/.ssh && echo '$PUBKEY_CONTENT' >> ~/.ssh/authorized_keys && chmod 600 ~/.ssh/authorized_keys"
+        if [ $? -eq 0 ]; then
+            echo "✅ Public key appended on $SERVER"
+        else
+            echo "❌ Failed to append public key on $SERVER"
+        fi
+    done
+}
+
+monitor_logs() {
+    for SERVER in "${SERVER_LIST[@]}"; do
+        echo "Monitoring $SERVER:/var/logs/rustfs/rustfs.log ..."
+        ssh "$SERVER" "tail -F /var/logs/rustfs/rustfs.log" |
+            sed "s/^/[$SERVER] /" &
+    done
+    wait
+}
+
+set_env_file() {
+    if [ -z "$2" ]; then
+        echo "Usage: $0 setenv <env_file>"
+        exit 1
+    fi
+    ENV_FILE="$2"
+    if [ ! -f "$ENV_FILE" ]; then
+        echo "Specified environment file does not exist: $ENV_FILE"
+        exit 1
+    fi
+    for SERVER in "${SERVER_LIST[@]}"; do
+        echo "Uploading $ENV_FILE to $SERVER:~/rustfs.env"
+        scp "$ENV_FILE" "${SERVER}:~/rustfs.env"
+        if [ $? -ne 0 ]; then
+            echo "❌ Upload to $SERVER failed, skipping"
+            continue
+        fi
+        echo "Overwriting $SERVER:/etc/default/rustfs"
+        ssh "$SERVER" "sudo mv ~/rustfs.env /etc/default/rustfs"
+        if [ $? -eq 0 ]; then
+            echo "✅ $SERVER /etc/default/rustfs overwritten"
+        else
+            echo "❌ Failed to overwrite /etc/default/rustfs on $SERVER"
+        fi
+    done
+}
+
+# Main command dispatch
+case "$1" in
+    deploy)
+        deploy
+        ;;
+    clear)
+        clear_data_dirs
+        ;;
+    stop)
+        stop_rustfs
+        ;;
+    start)
+        start_rustfs
+        ;;
+    restart)
+        restart_rustfs
+        ;;
+    addkey)
+        add_ssh_key "$@"
+        ;;
+    monitor_logs)
+        monitor_logs
+        ;;
+    setenv)
+        set_env_file "$@"
+        ;;
+    *)
+        echo "Usage: $0 {deploy|clear|stop|start|restart|addkey <pubkey_file>|monitor_logs|setenv <env_file>}"
+        ;;
+esac
\ No newline at end of file
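Taken together, a typical dev cycle with these scripts looks like the following; the subcommands are the dispatcher's own, run from a host that can reach all nodes over SSH:

    ./scripts/dev_rustfs.sh setenv scripts/dev_rustfs.env   # push /etc/default/rustfs to every node
    ./scripts/dev_rustfs.sh deploy                          # unpack rustfs.zip and install the binary
    ./scripts/dev_rustfs.sh restart                         # restart the service everywhere
    ./scripts/dev_rustfs.sh monitor_logs                    # tail rustfs.log from all nodes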