feat(grpc): walk_dir http

fix(ecstore): rebalance loop
weisd
2025-06-13 18:07:40 +08:00
parent ac4f1400fc
commit 52342f2f8e
24 changed files with 940 additions and 276 deletions

View File

@@ -79,3 +79,13 @@ build: BUILD_CMD = /root/.cargo/bin/cargo build --release --bin rustfs --target-
build:
$(DOCKER_CLI) build -t $(ROCKYLINUX_BUILD_IMAGE_NAME) -f $(DOCKERFILE_PATH)/Dockerfile.$(BUILD_OS) .
$(DOCKER_CLI) run --rm --name $(ROCKYLINUX_BUILD_CONTAINER_NAME) -v $(shell pwd):/root/s3-rustfs -it $(ROCKYLINUX_BUILD_IMAGE_NAME) $(BUILD_CMD)
.PHONY: build-musl
build-musl:
@echo "🔨 Building rustfs for x86_64-unknown-linux-musl..."
cargo build --target x86_64-unknown-linux-musl --bin rustfs -r
.PHONY: deploy-dev
deploy-dev: build-musl
@echo "🚀 Deploying to dev server: $${IP}"
./scripts/dev_deploy.sh $${IP}
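
Usage note (assuming GNU make, which exports command-line variables to recipe environments): the deploy-dev target reads the target host from the IP variable, so a typical invocation is IP=192.168.1.10 make deploy-dev, which builds the musl binary and then runs scripts/dev_deploy.sh against that host.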

View File

@@ -111,7 +111,20 @@ impl Clone for Error {
impl From<std::io::Error> for Error {
fn from(e: std::io::Error) -> Self {
Error::Io(e)
match e.kind() {
std::io::ErrorKind::UnexpectedEof => Error::Unexpected,
_ => Error::Io(e),
}
}
}
impl From<Error> for std::io::Error {
fn from(e: Error) -> Self {
match e {
Error::Unexpected => std::io::Error::new(std::io::ErrorKind::UnexpectedEof, "Unexpected EOF"),
Error::Io(e) => e,
_ => std::io::Error::other(e.to_string()),
}
}
}
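
A minimal round-trip sketch of the two conversions above (Error is this crate's enum with the Unexpected and Io variants; the helper is illustrative, not part of the commit):

fn eof_roundtrip(eof: std::io::Error) {
    assert_eq!(eof.kind(), std::io::ErrorKind::UnexpectedEof);
    let err: Error = eof.into(); // UnexpectedEof is folded into Error::Unexpected
    assert!(matches!(err, Error::Unexpected));
    let back: std::io::Error = err.into(); // and restored as an UnexpectedEof io::Error
    assert_eq!(back.kind(), std::io::ErrorKind::UnexpectedEof);
}

Keeping EOF distinguishable in both directions is what lets the walk_dir stream handling further down treat Error::Unexpected as a normal end-of-stream rather than a failure.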

View File

@@ -3,6 +3,7 @@ use futures::{Stream, StreamExt};
use http::HeaderMap;
use pin_project_lite::pin_project;
use reqwest::{Client, Method, RequestBuilder};
use std::error::Error as _;
use std::io::{self, Error};
use std::pin::Pin;
use std::sync::LazyLock;
@@ -43,12 +44,18 @@ pin_project! {
}
impl HttpReader {
pub async fn new(url: String, method: Method, headers: HeaderMap) -> io::Result<Self> {
pub async fn new(url: String, method: Method, headers: HeaderMap, body: Option<Vec<u8>>) -> io::Result<Self> {
http_log!("[HttpReader::new] url: {url}, method: {method:?}, headers: {headers:?}");
Self::with_capacity(url, method, headers, 0).await
Self::with_capacity(url, method, headers, body, 0).await
}
/// Create a new HttpReader from a URL. The request is performed immediately.
pub async fn with_capacity(url: String, method: Method, headers: HeaderMap, mut read_buf_size: usize) -> io::Result<Self> {
pub async fn with_capacity(
url: String,
method: Method,
headers: HeaderMap,
body: Option<Vec<u8>>,
mut read_buf_size: usize,
) -> io::Result<Self> {
http_log!(
"[HttpReader::with_capacity] url: {url}, method: {method:?}, headers: {headers:?}, buf_size: {}",
read_buf_size
@@ -60,12 +67,12 @@ impl HttpReader {
Ok(resp) => {
http_log!("[HttpReader::new] HEAD status: {}", resp.status());
if !resp.status().is_success() {
return Err(Error::other(format!("HEAD failed: status {}", resp.status())));
return Err(Error::other(format!("HEAD failed: url: {}, status {}", url, resp.status())));
}
}
Err(e) => {
http_log!("[HttpReader::new] HEAD error: {e}");
return Err(Error::other(format!("HEAD request failed: {e}")));
return Err(Error::other(e.source().map(|s| s.to_string()).unwrap_or_else(|| e.to_string())));
}
}
@@ -80,7 +87,10 @@ impl HttpReader {
let (err_tx, err_rx) = oneshot::channel::<io::Error>();
tokio::spawn(async move {
let client = get_http_client();
let request: RequestBuilder = client.request(method_clone, url_clone).headers(headers_clone);
let mut request: RequestBuilder = client.request(method_clone, url_clone).headers(headers_clone);
if let Some(body) = body {
request = request.body(body);
}
let response = request.send().await;
match response {
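
A hedged usage sketch of the new optional body parameter (it mirrors RemoteDisk::walk_dir further down in this commit; the URL here is hypothetical):

use http::{HeaderMap, HeaderValue, Method, header::CONTENT_TYPE};
use rustfs_rio::HttpReader;

async fn fetch_walk_dir(opts_json: Vec<u8>) -> std::io::Result<Vec<u8>> {
    let mut headers = HeaderMap::new();
    headers.insert(CONTENT_TYPE, HeaderValue::from_static("application/json"));
    // The body is attached to the request that HttpReader issues immediately.
    let mut reader = HttpReader::new(
        "http://node:9000/rustfs/rpc/walk_dir?disk=disk0".to_string(),
        Method::GET,
        headers,
        Some(opts_json),
    )
    .await?;
    let mut buf = Vec::new();
    tokio::io::copy(&mut reader, &mut buf).await?;
    Ok(buf)
}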

View File

@@ -1,7 +1,7 @@
use crate::disk::error::DiskError;
use crate::disk::{self, DiskAPI, DiskStore, WalkDirOptions};
use futures::future::join_all;
use rustfs_filemeta::{MetaCacheEntries, MetaCacheEntry, MetacacheReader};
use rustfs_filemeta::{MetaCacheEntries, MetaCacheEntry, MetacacheReader, is_io_eof};
use std::{future::Future, pin::Pin, sync::Arc};
use tokio::{spawn, sync::broadcast::Receiver as B_Receiver};
use tracing::error;
@@ -50,7 +50,6 @@ impl Clone for ListPathRawOptions {
}
pub async fn list_path_raw(mut rx: B_Receiver<bool>, opts: ListPathRawOptions) -> disk::error::Result<()> {
// println!("list_path_raw {},{}", &opts.bucket, &opts.path);
if opts.disks.is_empty() {
return Err(DiskError::other("list_path_raw: 0 drives provided"));
}
@@ -59,12 +58,13 @@ pub async fn list_path_raw(mut rx: B_Receiver<bool>, opts: ListPathRawOptions) -
let mut readers = Vec::with_capacity(opts.disks.len());
let fds = Arc::new(opts.fallback_disks.clone());
let (cancel_tx, cancel_rx) = tokio::sync::broadcast::channel::<bool>(1);
for disk in opts.disks.iter() {
let opdisk = disk.clone();
let opts_clone = opts.clone();
let fds_clone = fds.clone();
// let (m_tx, m_rx) = mpsc::channel::<MetaCacheEntry>(100);
// readers.push(m_rx);
let mut cancel_rx_clone = cancel_rx.resubscribe();
let (rd, mut wr) = tokio::io::duplex(64);
readers.push(MetacacheReader::new(rd));
jobs.push(spawn(async move {
@@ -92,7 +92,13 @@ pub async fn list_path_raw(mut rx: B_Receiver<bool>, opts: ListPathRawOptions) -
need_fallback = true;
}
if cancel_rx_clone.try_recv().is_ok() {
// warn!("list_path_raw: cancel_rx_clone.try_recv().await.is_ok()");
return Ok(());
}
while need_fallback {
// warn!("list_path_raw: while need_fallback start");
let disk = match fds_clone.iter().find(|d| d.is_some()) {
Some(d) => {
if let Some(disk) = d.clone() {
@@ -130,6 +136,7 @@ pub async fn list_path_raw(mut rx: B_Receiver<bool>, opts: ListPathRawOptions) -
}
}
// warn!("list_path_raw: while need_fallback done");
Ok(())
}));
}
@@ -143,9 +150,15 @@ pub async fn list_path_raw(mut rx: B_Receiver<bool>, opts: ListPathRawOptions) -
loop {
let mut current = MetaCacheEntry::default();
// warn!(
// "list_path_raw: loop start, bucket: {}, path: {}, current: {:?}",
// opts.bucket, opts.path, &current.name
// );
if rx.try_recv().is_ok() {
return Err(DiskError::other("canceled"));
}
let mut top_entries: Vec<Option<MetaCacheEntry>> = vec![None; readers.len()];
let mut at_eof = 0;
@@ -168,31 +181,47 @@ pub async fn list_path_raw(mut rx: B_Receiver<bool>, opts: ListPathRawOptions) -
} else {
// eof
at_eof += 1;
// warn!("list_path_raw: peek eof, disk: {}", i);
continue;
}
}
Err(err) => {
if err == rustfs_filemeta::Error::Unexpected {
at_eof += 1;
// warn!("list_path_raw: peek err eof, disk: {}", i);
continue;
} else if err == rustfs_filemeta::Error::FileNotFound {
}
// warn!("list_path_raw: peek err00, err: {:?}", err);
if is_io_eof(&err) {
at_eof += 1;
// warn!("list_path_raw: peek eof, disk: {}", i);
continue;
}
if err == rustfs_filemeta::Error::FileNotFound {
at_eof += 1;
fnf += 1;
// warn!("list_path_raw: peek fnf, disk: {}", i);
continue;
} else if err == rustfs_filemeta::Error::VolumeNotFound {
at_eof += 1;
fnf += 1;
vnf += 1;
// warn!("list_path_raw: peek vnf, disk: {}", i);
continue;
} else {
has_err += 1;
errs[i] = Some(err.into());
// warn!("list_path_raw: peek err, disk: {}", i);
continue;
}
}
};
// warn!("list_path_raw: loop entry: {:?}, disk: {}", &entry.name, i);
// If no current, add it.
if current.name.is_empty() {
top_entries[i] = Some(entry.clone());
@@ -228,10 +257,12 @@ pub async fn list_path_raw(mut rx: B_Receiver<bool>, opts: ListPathRawOptions) -
}
if vnf > 0 && vnf >= (readers.len() - opts.min_disks) {
// warn!("list_path_raw: vnf > 0 && vnf >= (readers.len() - opts.min_disks) break");
return Err(DiskError::VolumeNotFound);
}
if fnf > 0 && fnf >= (readers.len() - opts.min_disks) {
// warn!("list_path_raw: fnf > 0 && fnf >= (readers.len() - opts.min_disks) break");
return Err(DiskError::FileNotFound);
}
@@ -250,6 +281,10 @@ pub async fn list_path_raw(mut rx: B_Receiver<bool>, opts: ListPathRawOptions) -
_ => {}
});
error!(
"list_path_raw: has_err > 0 && has_err > opts.disks.len() - opts.min_disks break, err: {:?}",
&combined_err.join(", ")
);
return Err(DiskError::other(combined_err.join(", ")));
}
@@ -263,6 +298,7 @@ pub async fn list_path_raw(mut rx: B_Receiver<bool>, opts: ListPathRawOptions) -
}
}
// error!("list_path_raw: at_eof + has_err == readers.len() break {:?}", &errs);
break;
}
@@ -272,12 +308,16 @@ pub async fn list_path_raw(mut rx: B_Receiver<bool>, opts: ListPathRawOptions) -
}
if let Some(agreed_fn) = opts.agreed.as_ref() {
// warn!("list_path_raw: agreed_fn start, current: {:?}", &current.name);
agreed_fn(current).await;
// warn!("list_path_raw: agreed_fn done");
}
continue;
}
// warn!("list_path_raw: skip start, current: {:?}", &current.name);
for (i, r) in readers.iter_mut().enumerate() {
if top_entries[i].is_some() {
let _ = r.skip(1).await;
@@ -291,7 +331,12 @@ pub async fn list_path_raw(mut rx: B_Receiver<bool>, opts: ListPathRawOptions) -
Ok(())
});
jobs.push(revjob);
if let Err(err) = revjob.await.map_err(std::io::Error::other)? {
error!("list_path_raw: revjob err {:?}", err);
let _ = cancel_tx.send(true);
return Err(err);
}
let results = join_all(jobs).await;
for result in results {
@@ -300,5 +345,6 @@ pub async fn list_path_raw(mut rx: B_Receiver<bool>, opts: ListPathRawOptions) -
}
}
// warn!("list_path_raw: done");
Ok(())
}
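
The cancellation added in this hunk is a plain tokio broadcast pattern: the resolver's failure sends on cancel_tx, and each per-disk task polls its resubscribed receiver with try_recv(). A standalone sketch of just that pattern (assumes only tokio):

use tokio::sync::broadcast;

#[tokio::main]
async fn main() {
    let (cancel_tx, cancel_rx) = broadcast::channel::<bool>(1);
    let mut handles = Vec::new();
    for i in 0..4 {
        let mut rx = cancel_rx.resubscribe(); // one receiver per worker, as in list_path_raw
        handles.push(tokio::spawn(async move {
            loop {
                // Non-blocking check, like cancel_rx_clone.try_recv() above.
                if rx.try_recv().is_ok() {
                    println!("worker {i} cancelled");
                    return;
                }
                tokio::task::yield_now().await; // stand-in for reading one metacache entry
            }
        }));
    }
    let _ = cancel_tx.send(true); // what the revjob error path does
    for h in handles {
        let _ = h.await;
    }
}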

View File

@@ -41,6 +41,7 @@ pub async fn read_config_with_metadata<S: StorageAPI>(
if err == Error::FileNotFound || matches!(err, Error::ObjectNotFound(_, _)) {
Error::ConfigNotFound
} else {
warn!("read_config_with_metadata: err: {:?}, file: {}", err, file);
err
}
})?;
@@ -92,9 +93,19 @@ pub async fn delete_config<S: StorageAPI>(api: Arc<S>, file: &str) -> Result<()>
}
pub async fn save_config_with_opts<S: StorageAPI>(api: Arc<S>, file: &str, data: Vec<u8>, opts: &ObjectOptions) -> Result<()> {
let _ = api
warn!(
"save_config_with_opts, bucket: {}, file: {}, data len: {}",
RUSTFS_META_BUCKET,
file,
data.len()
);
if let Err(err) = api
.put_object(RUSTFS_META_BUCKET, file, &mut PutObjReader::from_vec(data), opts)
.await?;
.await
{
warn!("save_config_with_opts: err: {:?}, file: {}", err, file);
return Err(err);
}
Ok(())
}

View File

@@ -124,7 +124,7 @@ pub enum DiskError {
#[error("erasure read quorum")]
ErasureReadQuorum,
#[error("io error")]
#[error("io error {0}")]
Io(io::Error),
}

View File

@@ -773,7 +773,7 @@ impl LocalDisk {
Ok(res) => res,
Err(e) => {
if e != DiskError::VolumeNotFound && e != Error::FileNotFound {
info!("scan list_dir {}, err {:?}", &current, &e);
warn!("scan list_dir {}, err {:?}", &current, &e);
}
if opts.report_notfound && e == Error::FileNotFound && current == &opts.base_dir {
@@ -785,6 +785,7 @@ impl LocalDisk {
};
if entries.is_empty() {
warn!("scan list_dir {}, entries is empty", &current);
return Ok(());
}
@@ -800,6 +801,7 @@ impl LocalDisk {
let entry = item.clone();
// check limit
if opts.limit > 0 && *objs_returned >= opts.limit {
warn!("scan list_dir {}, limit reached", &current);
return Ok(());
}
// check prefix
@@ -843,13 +845,14 @@ impl LocalDisk {
let name = decode_dir_object(format!("{}/{}", &current, &name).as_str());
out.write_obj(&MetaCacheEntry {
name,
name: name.clone(),
metadata,
..Default::default()
})
.await?;
*objs_returned += 1;
// warn!("scan list_dir {}, write_obj done, name: {:?}", &current, &name);
return Ok(());
}
}
@@ -870,6 +873,7 @@ impl LocalDisk {
for entry in entries.iter() {
if opts.limit > 0 && *objs_returned >= opts.limit {
// warn!("scan list_dir {}, limit reached 2", &current);
return Ok(());
}
@@ -945,6 +949,7 @@ impl LocalDisk {
while let Some(dir) = dir_stack.pop() {
if opts.limit > 0 && *objs_returned >= opts.limit {
// warn!("scan list_dir {}, limit reached 3", &current);
return Ok(());
}
@@ -965,6 +970,7 @@ impl LocalDisk {
}
}
// warn!("scan list_dir {}, done", &current);
Ok(())
}
}
@@ -1568,6 +1574,11 @@ impl DiskAPI for LocalDisk {
let mut current = opts.base_dir.clone();
self.scan_dir(&mut current, &opts, &mut out, &mut objs_returned).await?;
warn!(
"walk_dir: done, volume_dir: {:?}, base_dir: {}",
volume_dir.to_string_lossy(),
opts.base_dir
);
Ok(())
}

View File

@@ -2,20 +2,20 @@ use std::path::PathBuf;
use bytes::Bytes;
use futures::lock::Mutex;
use http::{HeaderMap, Method};
use http::{HeaderMap, HeaderValue, Method, header::CONTENT_TYPE};
use protos::{
node_service_time_out_client,
proto_gen::node_service::{
CheckPartsRequest, DeletePathsRequest, DeleteRequest, DeleteVersionRequest, DeleteVersionsRequest, DeleteVolumeRequest,
DiskInfoRequest, ListDirRequest, ListVolumesRequest, MakeVolumeRequest, MakeVolumesRequest, NsScannerRequest,
ReadAllRequest, ReadMultipleRequest, ReadVersionRequest, ReadXlRequest, RenameDataRequest, RenameFileRequest,
StatVolumeRequest, UpdateMetadataRequest, VerifyFileRequest, WalkDirRequest, WriteAllRequest, WriteMetadataRequest,
StatVolumeRequest, UpdateMetadataRequest, VerifyFileRequest, WriteAllRequest, WriteMetadataRequest,
},
};
use rmp_serde::Serializer;
use rustfs_filemeta::{FileInfo, MetaCacheEntry, MetacacheWriter, RawFileInfo};
use rustfs_filemeta::{FileInfo, RawFileInfo};
use rustfs_rio::{HttpReader, HttpWriter};
use serde::Serialize;
use tokio::{
io::AsyncWrite,
sync::mpsc::{self, Sender},
@@ -256,47 +256,55 @@ impl DiskAPI for RemoteDisk {
Ok(())
}
// FIXME: TODO: use writer
#[tracing::instrument(skip(self, wr))]
async fn walk_dir<W: AsyncWrite + Unpin + Send>(&self, opts: WalkDirOptions, wr: &mut W) -> Result<()> {
let now = std::time::SystemTime::now();
info!("walk_dir {}/{}/{:?}", self.endpoint.to_string(), opts.bucket, opts.filter_prefix);
let mut wr = wr;
let mut out = MetacacheWriter::new(&mut wr);
let mut buf = Vec::new();
opts.serialize(&mut Serializer::new(&mut buf))?;
let mut client = node_service_time_out_client(&self.addr)
.await
.map_err(|err| Error::other(format!("can not get client, err: {}", err)))?;
let request = Request::new(WalkDirRequest {
disk: self.endpoint.to_string(),
walk_dir_options: buf.into(),
});
let mut response = client.walk_dir(request).await?.into_inner();
// // FIXME: TODO: use writer
// #[tracing::instrument(skip(self, wr))]
// async fn walk_dir<W: AsyncWrite + Unpin + Send>(&self, opts: WalkDirOptions, wr: &mut W) -> Result<()> {
// let now = std::time::SystemTime::now();
// info!("walk_dir {}/{}/{:?}", self.endpoint.to_string(), opts.bucket, opts.filter_prefix);
// let mut wr = wr;
// let mut out = MetacacheWriter::new(&mut wr);
// let mut buf = Vec::new();
// opts.serialize(&mut Serializer::new(&mut buf))?;
// let mut client = node_service_time_out_client(&self.addr)
// .await
// .map_err(|err| Error::other(format!("can not get client, err: {}", err)))?;
// let request = Request::new(WalkDirRequest {
// disk: self.endpoint.to_string(),
// walk_dir_options: buf.into(),
// });
// let mut response = client.walk_dir(request).await?.into_inner();
loop {
match response.next().await {
Some(Ok(resp)) => {
if !resp.success {
return Err(Error::other(resp.error_info.unwrap_or_default()));
}
let entry = serde_json::from_str::<MetaCacheEntry>(&resp.meta_cache_entry)
.map_err(|_| Error::other(format!("Unexpected response: {:?}", response)))?;
out.write_obj(&entry).await?;
}
None => break,
_ => return Err(Error::other(format!("Unexpected response: {:?}", response))),
}
}
// loop {
// match response.next().await {
// Some(Ok(resp)) => {
// if !resp.success {
// if let Some(err) = resp.error_info {
// if err == "Unexpected EOF" {
// return Err(Error::Io(std::io::Error::new(std::io::ErrorKind::UnexpectedEof, err)));
// } else {
// return Err(Error::other(err));
// }
// }
info!(
"walk_dir {}/{:?} done {:?}",
opts.bucket,
opts.filter_prefix,
now.elapsed().unwrap_or_default()
);
Ok(())
}
// return Err(Error::other("unknown error"));
// }
// let entry = serde_json::from_str::<MetaCacheEntry>(&resp.meta_cache_entry)
// .map_err(|_| Error::other(format!("Unexpected response: {:?}", response)))?;
// out.write_obj(&entry).await?;
// }
// None => break,
// _ => return Err(Error::other(format!("Unexpected response: {:?}", response))),
// }
// }
// info!(
// "walk_dir {}/{:?} done {:?}",
// opts.bucket,
// opts.filter_prefix,
// now.elapsed().unwrap_or_default()
// );
// Ok(())
// }
#[tracing::instrument(skip(self))]
async fn delete_version(
@@ -559,6 +567,28 @@ impl DiskAPI for RemoteDisk {
Ok(response.volumes)
}
#[tracing::instrument(skip(self, wr))]
async fn walk_dir<W: AsyncWrite + Unpin + Send>(&self, opts: WalkDirOptions, wr: &mut W) -> Result<()> {
info!("walk_dir {}", self.endpoint.to_string());
let url = format!(
"{}/rustfs/rpc/walk_dir?disk={}",
self.endpoint.grid_host(),
urlencoding::encode(self.endpoint.to_string().as_str()),
);
let opts = serde_json::to_vec(&opts)?;
let mut headers = HeaderMap::new();
headers.insert(CONTENT_TYPE, HeaderValue::from_static("application/json"));
let mut reader = HttpReader::new(url, Method::GET, headers, Some(opts)).await?;
tokio::io::copy(&mut reader, wr).await?;
Ok(())
}
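
A hedged consumer sketch: the bytes this walk_dir streams into wr form a metacache stream, which callers decode with MetacacheReader over a duplex pipe exactly as list_path_raw does (disk and opts assumed in scope):

let (rd, mut wr) = tokio::io::duplex(64);
let mut entries = MetacacheReader::new(rd); // decodes the streamed MetaCacheEntry records
tokio::spawn(async move {
    if let Err(e) = disk.walk_dir(opts, &mut wr).await {
        tracing::error!("walk_dir failed: {e:?}");
    }
});
// drain `entries` with the same peek/skip loop list_path_raw uses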
#[tracing::instrument(level = "debug", skip(self))]
async fn read_file(&self, volume: &str, path: &str) -> Result<FileReader> {
info!("read_file {}/{}", volume, path);
@@ -573,7 +603,7 @@ impl DiskAPI for RemoteDisk {
0
);
Ok(Box::new(HttpReader::new(url, Method::GET, HeaderMap::new()).await?))
Ok(Box::new(HttpReader::new(url, Method::GET, HeaderMap::new(), None).await?))
}
#[tracing::instrument(level = "debug", skip(self))]
@@ -589,7 +619,7 @@ impl DiskAPI for RemoteDisk {
length
);
Ok(Box::new(HttpReader::new(url, Method::GET, HeaderMap::new()).await?))
Ok(Box::new(HttpReader::new(url, Method::GET, HeaderMap::new(), None).await?))
}
#[tracing::instrument(level = "debug", skip(self))]

View File

@@ -242,12 +242,14 @@ impl Erasure {
}
if !reader.can_decode(&shards) {
error!("erasure decode can_decode errs: {:?}", &errs);
ret_err = Some(Error::ErasureReadQuorum.into());
break;
}
// Decode the shards
if let Err(e) = self.decode_data(&mut shards) {
error!("erasure decode decode_data err: {:?}", e);
ret_err = Some(e);
break;
}
@@ -255,6 +257,7 @@ impl Erasure {
let n = match write_data_blocks(writer, &shards, self.data_shards, block_offset, block_length).await {
Ok(n) => n,
Err(e) => {
error!("erasure decode write_data_blocks err: {:?}", e);
ret_err = Some(e);
break;
}

View File

@@ -8,6 +8,7 @@ use std::sync::Arc;
use std::vec;
use tokio::io::AsyncRead;
use tokio::sync::mpsc;
use tracing::error;
pub(crate) struct MultiWriter<'a> {
writers: &'a mut [Option<BitrotWriterWrapper>],
@@ -60,6 +61,13 @@ impl<'a> MultiWriter<'a> {
}
if let Some(write_err) = reduce_write_quorum_errs(&self.errs, OBJECT_OP_IGNORED_ERRS, self.write_quorum) {
error!(
"reduce_write_quorum_errs: {:?}, offline-disks={}/{}, errs={:?}",
write_err,
count_errs(&self.errs, &Error::DiskNotFound),
self.writers.len(),
self.errs
);
return Err(std::io::Error::other(format!(
"Failed to write data: {} (offline-disks={}/{})",
write_err,

View File

@@ -143,7 +143,11 @@ impl NotificationSys {
#[tracing::instrument(skip(self))]
pub async fn load_rebalance_meta(&self, start: bool) {
let mut futures = Vec::with_capacity(self.peer_clients.len());
for client in self.peer_clients.iter().flatten() {
for (i, client) in self.peer_clients.iter().flatten().enumerate() {
warn!(
"notification load_rebalance_meta start: {}, index: {}, client: {:?}",
start, i, client.host
);
futures.push(client.load_rebalance_meta(start));
}
@@ -158,11 +162,16 @@ impl NotificationSys {
}
pub async fn stop_rebalance(&self) {
warn!("notification stop_rebalance start");
let Some(store) = new_object_layer_fn() else {
error!("stop_rebalance: not init");
return;
};
// warn!("notification stop_rebalance load_rebalance_meta");
// self.load_rebalance_meta(false).await;
// warn!("notification stop_rebalance load_rebalance_meta done");
let mut futures = Vec::with_capacity(self.peer_clients.len());
for client in self.peer_clients.iter().flatten() {
futures.push(client.stop_rebalance());
@@ -175,7 +184,9 @@ impl NotificationSys {
}
}
warn!("notification stop_rebalance stop_rebalance start");
let _ = store.stop_rebalance().await;
warn!("notification stop_rebalance stop_rebalance done");
}
}

View File

@@ -664,7 +664,7 @@ impl PeerRestClient {
let response = client.load_rebalance_meta(request).await?.into_inner();
warn!("load_rebalance_meta response {:?}", response);
warn!("load_rebalance_meta response {:?}, grid_host: {:?}", response, &self.grid_host);
if !response.success {
if let Some(msg) = response.error_info {
return Err(Error::other(msg));

View File

@@ -1,7 +1,3 @@
use std::io::Cursor;
use std::sync::Arc;
use std::time::SystemTime;
use crate::StorageAPI;
use crate::cache_value::metacache_set::{ListPathRawOptions, list_path_raw};
use crate::config::com::{read_config_with_metadata, save_config_with_opts};
@@ -19,16 +15,18 @@ use rustfs_filemeta::{FileInfo, MetaCacheEntries, MetaCacheEntry, MetadataResolu
use rustfs_rio::HashReader;
use rustfs_utils::path::encode_dir_object;
use serde::{Deserialize, Serialize};
use std::io::Cursor;
use std::sync::Arc;
use time::OffsetDateTime;
use tokio::io::AsyncReadExt;
use tokio::sync::broadcast::{self, Receiver as B_Receiver};
use tokio::time::{Duration, Instant};
use tracing::{error, info, warn};
use uuid::Uuid;
use workers::workers::Workers;
const REBAL_META_FMT: u16 = 1; // Replace with actual format value
const REBAL_META_VER: u16 = 1; // Replace with actual version value
const REBAL_META_NAME: &str = "rebalance_meta";
const REBAL_META_NAME: &str = "rebalance.bin";
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct RebalanceStats {
@@ -123,9 +121,9 @@ pub enum RebalSaveOpt {
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct RebalanceInfo {
#[serde(rename = "startTs")]
pub start_time: Option<SystemTime>, // Time at which rebalance-start was issued
pub start_time: Option<OffsetDateTime>, // Time at which rebalance-start was issued
#[serde(rename = "stopTs")]
pub end_time: Option<SystemTime>, // Time at which rebalance operation completed or rebalance-stop was called
pub end_time: Option<OffsetDateTime>, // Time at which rebalance operation completed or rebalance-stop was called
#[serde(rename = "status")]
pub status: RebalStatus, // Current state of rebalance operation
}
@@ -137,14 +135,14 @@ pub struct DiskStat {
pub available_space: u64,
}
#[derive(Debug, Default, Serialize, Deserialize)]
#[derive(Debug, Default, Serialize, Deserialize, Clone)]
pub struct RebalanceMeta {
#[serde(skip)]
pub cancel: Option<broadcast::Sender<bool>>, // To be invoked on rebalance-stop
#[serde(skip)]
pub last_refreshed_at: Option<SystemTime>,
pub last_refreshed_at: Option<OffsetDateTime>,
#[serde(rename = "stopTs")]
pub stopped_at: Option<SystemTime>, // Time when rebalance-stop was issued
pub stopped_at: Option<OffsetDateTime>, // Time when rebalance-stop was issued
#[serde(rename = "id")]
pub id: String, // ID of the ongoing rebalance operation
#[serde(rename = "pf")]
@@ -164,29 +162,29 @@ impl RebalanceMeta {
pub async fn load_with_opts<S: StorageAPI>(&mut self, store: Arc<S>, opts: ObjectOptions) -> Result<()> {
let (data, _) = read_config_with_metadata(store, REBAL_META_NAME, &opts).await?;
if data.is_empty() {
warn!("rebalanceMeta: no data");
warn!("rebalanceMeta load_with_opts: no data");
return Ok(());
}
if data.len() <= 4 {
return Err(Error::other("rebalanceMeta: no data"));
return Err(Error::other("rebalanceMeta load_with_opts: no data"));
}
// Read header
match u16::from_le_bytes([data[0], data[1]]) {
REBAL_META_FMT => {}
fmt => return Err(Error::other(format!("rebalanceMeta: unknown format: {}", fmt))),
fmt => return Err(Error::other(format!("rebalanceMeta load_with_opts: unknown format: {}", fmt))),
}
match u16::from_le_bytes([data[2], data[3]]) {
REBAL_META_VER => {}
ver => return Err(Error::other(format!("rebalanceMeta: unknown version: {}", ver))),
ver => return Err(Error::other(format!("rebalanceMeta load_with_opts: unknown version: {}", ver))),
}
let meta: Self = rmp_serde::from_read(Cursor::new(&data[4..]))?;
*self = meta;
self.last_refreshed_at = Some(SystemTime::now());
self.last_refreshed_at = Some(OffsetDateTime::now_utc());
warn!("rebalanceMeta: loaded meta done");
warn!("rebalanceMeta load_with_opts: loaded meta done");
Ok(())
}
@@ -196,6 +194,7 @@ impl RebalanceMeta {
pub async fn save_with_opts<S: StorageAPI>(&self, store: Arc<S>, opts: ObjectOptions) -> Result<()> {
if self.pool_stats.is_empty() {
warn!("rebalanceMeta save_with_opts: no pool stats");
return Ok(());
}
@@ -218,7 +217,7 @@ impl ECStore {
#[tracing::instrument(skip_all)]
pub async fn load_rebalance_meta(&self) -> Result<()> {
let mut meta = RebalanceMeta::new();
warn!("rebalanceMeta: load rebalance meta");
warn!("rebalanceMeta: store load rebalance meta");
match meta.load(self.pools[0].clone()).await {
Ok(_) => {
warn!("rebalanceMeta: rebalance meta loaded0");
@@ -255,9 +254,18 @@ impl ECStore {
pub async fn update_rebalance_stats(&self) -> Result<()> {
let mut ok = false;
let pool_stats = {
let rebalance_meta = self.rebalance_meta.read().await;
rebalance_meta.as_ref().map(|v| v.pool_stats.clone()).unwrap_or_default()
};
warn!("update_rebalance_stats: pool_stats: {:?}", &pool_stats);
for i in 0..self.pools.len() {
if self.find_index(i).await.is_none() {
if pool_stats.get(i).is_none() {
warn!("update_rebalance_stats: pool {} not found", i);
let mut rebalance_meta = self.rebalance_meta.write().await;
warn!("update_rebalance_stats: pool {} not found, add", i);
if let Some(meta) = rebalance_meta.as_mut() {
meta.pool_stats.push(RebalanceStats::default());
}
@@ -267,23 +275,24 @@ impl ECStore {
}
if ok {
let mut rebalance_meta = self.rebalance_meta.write().await;
if let Some(meta) = rebalance_meta.as_mut() {
warn!("update_rebalance_stats: save rebalance meta");
let rebalance_meta = self.rebalance_meta.read().await;
if let Some(meta) = rebalance_meta.as_ref() {
meta.save(self.pools[0].clone()).await?;
}
drop(rebalance_meta);
}
Ok(())
}
async fn find_index(&self, index: usize) -> Option<usize> {
if let Some(meta) = self.rebalance_meta.read().await.as_ref() {
return meta.pool_stats.get(index).map(|_v| index);
}
// async fn find_index(&self, index: usize) -> Option<usize> {
// if let Some(meta) = self.rebalance_meta.read().await.as_ref() {
// return meta.pool_stats.get(index).map(|_v| index);
// }
None
}
// None
// }
#[tracing::instrument(skip(self))]
pub async fn init_rebalance_meta(&self, bucktes: Vec<String>) -> Result<String> {
@@ -310,7 +319,7 @@ impl ECStore {
let mut pool_stats = Vec::with_capacity(self.pools.len());
let now = SystemTime::now();
let now = OffsetDateTime::now_utc();
for disk_stat in disk_stats.iter() {
let mut pool_stat = RebalanceStats {
@@ -369,20 +378,26 @@ impl ECStore {
#[tracing::instrument(skip(self))]
pub async fn next_rebal_bucket(&self, pool_index: usize) -> Result<Option<String>> {
warn!("next_rebal_bucket: pool_index: {}", pool_index);
let rebalance_meta = self.rebalance_meta.read().await;
warn!("next_rebal_bucket: rebalance_meta: {:?}", rebalance_meta);
if let Some(meta) = rebalance_meta.as_ref() {
if let Some(pool_stat) = meta.pool_stats.get(pool_index) {
if pool_stat.info.status == RebalStatus::Completed || !pool_stat.participating {
warn!("next_rebal_bucket: pool_index: {} completed or not participating", pool_index);
return Ok(None);
}
if pool_stat.buckets.is_empty() {
warn!("next_rebal_bucket: pool_index: {} buckets is empty", pool_index);
return Ok(None);
}
warn!("next_rebal_bucket: pool_index: {} bucket: {}", pool_index, pool_stat.buckets[0]);
return Ok(Some(pool_stat.buckets[0].clone()));
}
}
warn!("next_rebal_bucket: pool_index: {} None", pool_index);
Ok(None)
}
@@ -392,18 +407,28 @@ impl ECStore {
if let Some(meta) = rebalance_meta.as_mut() {
if let Some(pool_stat) = meta.pool_stats.get_mut(pool_index) {
warn!("bucket_rebalance_done: buckets {:?}", &pool_stat.buckets);
if let Some(idx) = pool_stat.buckets.iter().position(|b| b.as_str() == bucket.as_str()) {
warn!("bucket_rebalance_done: bucket {} rebalanced", &bucket);
pool_stat.buckets.remove(idx);
pool_stat.rebalanced_buckets.push(bucket);
// 使用 retain 来过滤掉要删除的 bucket
let mut found = false;
pool_stat.buckets.retain(|b| {
if b.as_str() == bucket.as_str() {
found = true;
pool_stat.rebalanced_buckets.push(b.clone());
false // 删除这个元素
} else {
true // 保留这个元素
}
});
if found {
warn!("bucket_rebalance_done: bucket {} rebalanced", &bucket);
return Ok(());
} else {
warn!("bucket_rebalance_done: bucket {} not found", bucket);
}
}
}
warn!("bucket_rebalance_done: bucket {} not found", bucket);
Ok(())
}
@@ -411,18 +436,28 @@ impl ECStore {
let rebalance_meta = self.rebalance_meta.read().await;
if let Some(ref meta) = *rebalance_meta {
if meta.stopped_at.is_some() {
warn!("is_rebalance_started: rebalance stopped");
return false;
}
meta.pool_stats.iter().enumerate().for_each(|(i, v)| {
warn!(
"is_rebalance_started: pool_index: {}, participating: {:?}, status: {:?}",
i, v.participating, v.info.status
);
});
if meta
.pool_stats
.iter()
.any(|v| v.participating && v.info.status != RebalStatus::Completed)
{
warn!("is_rebalance_started: rebalance started");
return true;
}
}
warn!("is_rebalance_started: rebalance not started");
false
}
@@ -462,6 +497,7 @@ impl ECStore {
{
let mut rebalance_meta = self.rebalance_meta.write().await;
if let Some(meta) = rebalance_meta.as_mut() {
meta.cancel = Some(tx)
} else {
@@ -474,19 +510,25 @@ impl ECStore {
let participants = {
if let Some(ref meta) = *self.rebalance_meta.read().await {
if meta.stopped_at.is_some() {
warn!("start_rebalance: rebalance already stopped exit");
return;
}
// if meta.stopped_at.is_some() {
// warn!("start_rebalance: rebalance already stopped exit");
// return;
// }
let mut participants = vec![false; meta.pool_stats.len()];
for (i, pool_stat) in meta.pool_stats.iter().enumerate() {
if pool_stat.info.status == RebalStatus::Started {
participants[i] = pool_stat.participating;
warn!("start_rebalance: pool {} status: {:?}", i, pool_stat.info.status);
if pool_stat.info.status != RebalStatus::Started {
warn!("start_rebalance: pool {} not started, skipping", i);
continue;
}
warn!("start_rebalance: pool {} participating: {:?}", i, pool_stat.participating);
participants[i] = pool_stat.participating;
}
participants
} else {
warn!("start_rebalance:2 rebalance_meta is None exit");
Vec::new()
}
};
@@ -497,11 +539,13 @@ impl ECStore {
continue;
}
if get_global_endpoints()
.as_ref()
.get(idx)
.is_none_or(|v| v.endpoints.as_ref().first().is_none_or(|e| e.is_local))
{
if !get_global_endpoints().as_ref().get(idx).is_some_and(|v| {
warn!("start_rebalance: pool {} endpoints: {:?}", idx, v.endpoints);
v.endpoints.as_ref().first().is_some_and(|e| {
warn!("start_rebalance: pool {} endpoint: {:?}, is_local: {}", idx, e, e.is_local);
e.is_local
})
}) {
warn!("start_rebalance: pool {} is not local, skipping", idx);
continue;
}
@@ -522,13 +566,13 @@ impl ECStore {
}
#[tracing::instrument(skip(self, rx))]
async fn rebalance_buckets(self: &Arc<Self>, rx: B_Receiver<bool>, pool_index: usize) -> Result<()> {
async fn rebalance_buckets(self: &Arc<Self>, mut rx: B_Receiver<bool>, pool_index: usize) -> Result<()> {
let (done_tx, mut done_rx) = tokio::sync::mpsc::channel::<Result<()>>(1);
// Save rebalance metadata periodically
let store = self.clone();
let save_task = tokio::spawn(async move {
let mut timer = tokio::time::interval_at(Instant::now() + Duration::from_secs(10), Duration::from_secs(10));
let mut timer = tokio::time::interval_at(Instant::now() + Duration::from_secs(30), Duration::from_secs(10));
let mut msg: String;
let mut quit = false;
@@ -537,14 +581,15 @@ impl ECStore {
// TODO: cancel rebalance
Some(result) = done_rx.recv() => {
quit = true;
let now = SystemTime::now();
let now = OffsetDateTime::now_utc();
let state = match result {
Ok(_) => {
warn!("rebalance_buckets: completed");
msg = format!("Rebalance completed at {:?}", now);
RebalStatus::Completed},
Err(err) => {
warn!("rebalance_buckets: error: {:?}", err);
// TODO: check stop
if err.to_string().contains("canceled") {
msg = format!("Rebalance stopped at {:?}", now);
@@ -557,9 +602,11 @@ impl ECStore {
};
{
warn!("rebalance_buckets: save rebalance meta, pool_index: {}, state: {:?}", pool_index, state);
let mut rebalance_meta = store.rebalance_meta.write().await;
if let Some(rbm) = rebalance_meta.as_mut() {
warn!("rebalance_buckets: save rebalance meta2, pool_index: {}, state: {:?}", pool_index, state);
rbm.pool_stats[pool_index].info.status = state;
rbm.pool_stats[pool_index].info.end_time = Some(now);
}
@@ -568,7 +615,7 @@ impl ECStore {
}
_ = timer.tick() => {
let now = SystemTime::now();
let now = OffsetDateTime::now_utc();
msg = format!("Saving rebalance metadata at {:?}", now);
}
}
@@ -576,7 +623,7 @@ impl ECStore {
if let Err(err) = store.save_rebalance_stats(pool_index, RebalSaveOpt::Stats).await {
error!("{} err: {:?}", msg, err);
} else {
info!(msg);
warn!(msg);
}
if quit {
@@ -588,30 +635,41 @@ impl ECStore {
}
});
warn!("Pool {} rebalancing is started", pool_index + 1);
warn!("Pool {} rebalancing is started", pool_index);
while let Some(bucket) = self.next_rebal_bucket(pool_index).await? {
warn!("Rebalancing bucket: start {}", bucket);
if let Err(err) = self.rebalance_bucket(rx.resubscribe(), bucket.clone(), pool_index).await {
if err.to_string().contains("not initialized") {
warn!("rebalance_bucket: rebalance not initialized, continue");
continue;
}
error!("Error rebalancing bucket {}: {:?}", bucket, err);
done_tx.send(Err(err)).await.ok();
loop {
if let Ok(true) = rx.try_recv() {
warn!("Pool {} rebalancing is stopped", pool_index);
done_tx.send(Err(Error::other("rebalance stopped canceled"))).await.ok();
break;
}
warn!("Rebalance bucket: done {} ", bucket);
self.bucket_rebalance_done(pool_index, bucket).await?;
if let Some(bucket) = self.next_rebal_bucket(pool_index).await? {
warn!("Rebalancing bucket: start {}", bucket);
if let Err(err) = self.rebalance_bucket(rx.resubscribe(), bucket.clone(), pool_index).await {
if err.to_string().contains("not initialized") {
warn!("rebalance_bucket: rebalance not initialized, continue");
continue;
}
error!("Error rebalancing bucket {}: {:?}", bucket, err);
done_tx.send(Err(err)).await.ok();
break;
}
warn!("Rebalance bucket: done {} ", bucket);
self.bucket_rebalance_done(pool_index, bucket).await?;
} else {
warn!("Rebalance bucket: no bucket to rebalance");
break;
}
}
warn!("Pool {} rebalancing is done", pool_index + 1);
warn!("Pool {} rebalancing is done", pool_index);
done_tx.send(Ok(())).await.ok();
save_task.await.ok();
warn!("Pool {} rebalancing is done2", pool_index);
Ok(())
}
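
Stripped of logging, the restructured loop above reduces to the following paraphrase (not compilable on its own; self, rx, and done_tx are the surrounding function's state):

loop {
    if let Ok(true) = rx.try_recv() {
        done_tx.send(Err(Error::other("rebalance stopped canceled"))).await.ok();
        break; // stop was requested
    }
    match self.next_rebal_bucket(pool_index).await? {
        Some(bucket) => {
            if let Err(err) = self.rebalance_bucket(rx.resubscribe(), bucket.clone(), pool_index).await {
                if err.to_string().contains("not initialized") {
                    continue; // rebalance meta not ready yet; retry
                }
                done_tx.send(Err(err)).await.ok();
                break; // hard error ends this pool's rebalance
            }
            self.bucket_rebalance_done(pool_index, bucket).await?;
        }
        None => break, // nothing left to rebalance
    }
}

The key fix over the old while-let version: each iteration now checks the stop channel first, so a rebalance-stop terminates the loop promptly instead of only between buckets.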
@@ -622,6 +680,7 @@ impl ECStore {
if let Some(pool_stat) = meta.pool_stats.get_mut(pool_index) {
// Check if the pool's rebalance status is already completed
if pool_stat.info.status == RebalStatus::Completed {
warn!("check_if_rebalance_done: pool {} is already completed", pool_index);
return true;
}
@@ -631,7 +690,8 @@ impl ECStore {
// Mark pool rebalance as done if within 5% of the PercentFreeGoal
if (pfi - meta.percent_free_goal).abs() <= 0.05 {
pool_stat.info.status = RebalStatus::Completed;
pool_stat.info.end_time = Some(SystemTime::now());
pool_stat.info.end_time = Some(OffsetDateTime::now_utc());
warn!("check_if_rebalance_done: pool {} is completed, pfi: {}", pool_index, pfi);
return true;
}
}
@@ -641,24 +701,30 @@ impl ECStore {
}
#[allow(unused_assignments)]
#[tracing::instrument(skip(self, wk, set))]
#[tracing::instrument(skip(self, set))]
async fn rebalance_entry(
&self,
bucket: String,
pool_index: usize,
entry: MetaCacheEntry,
set: Arc<SetDisks>,
wk: Arc<Workers>,
// wk: Arc<Workers>,
) {
defer!(|| async {
wk.give().await;
});
warn!("rebalance_entry: start rebalance_entry");
// defer!(|| async {
// warn!("rebalance_entry: defer give worker start");
// wk.give().await;
// warn!("rebalance_entry: defer give worker done");
// });
if entry.is_dir() {
warn!("rebalance_entry: entry is dir, skipping");
return;
}
if self.check_if_rebalance_done(pool_index).await {
warn!("rebalance_entry: rebalance done, skipping pool {}", pool_index);
return;
}
@@ -666,6 +732,7 @@ impl ECStore {
Ok(fivs) => fivs,
Err(err) => {
error!("rebalance_entry Error getting file info versions: {}", err);
warn!("rebalance_entry: Error getting file info versions, skipping");
return;
}
};
@@ -676,7 +743,7 @@ impl ECStore {
let expired: usize = 0;
for version in fivs.versions.iter() {
if version.is_remote() {
info!("rebalance_entry Entry {} is remote, skipping", version.name);
warn!("rebalance_entry Entry {} is remote, skipping", version.name);
continue;
}
// TODO: filterLifecycle
@@ -684,7 +751,7 @@ impl ECStore {
let remaining_versions = fivs.versions.len() - expired;
if version.deleted && remaining_versions == 1 {
rebalanced += 1;
info!("rebalance_entry Entry {} is deleted and last version, skipping", version.name);
warn!("rebalance_entry Entry {} is deleted and last version, skipping", version.name);
continue;
}
let version_id = version.version_id.map(|v| v.to_string());
@@ -735,6 +802,7 @@ impl ECStore {
}
for _i in 0..3 {
warn!("rebalance_entry: get_object_reader, bucket: {}, version: {}", &bucket, &version.name);
let rd = match set
.get_object_reader(
bucket.as_str(),
@@ -753,6 +821,10 @@ impl ECStore {
Err(err) => {
if is_err_object_not_found(&err) || is_err_version_not_found(&err) {
ignore = true;
warn!(
"rebalance_entry: get_object_reader, bucket: {}, version: {}, ignore",
&bucket, &version.name
);
break;
}
@@ -765,7 +837,7 @@ impl ECStore {
if let Err(err) = self.rebalance_object(pool_index, bucket.clone(), rd).await {
if is_err_object_not_found(&err) || is_err_version_not_found(&err) || is_err_data_movement_overwrite(&err) {
ignore = true;
info!("rebalance_entry {} Entry {} is already deleted, skipping", &bucket, version.name);
warn!("rebalance_entry {} Entry {} is already deleted, skipping", &bucket, version.name);
break;
}
@@ -780,7 +852,7 @@ impl ECStore {
}
if ignore {
info!("rebalance_entry {} Entry {} is already deleted, skipping", &bucket, version.name);
warn!("rebalance_entry {} Entry {} is already deleted, skipping", &bucket, version.name);
continue;
}
@@ -812,7 +884,7 @@ impl ECStore {
{
error!("rebalance_entry: delete_object err {:?}", &err);
} else {
info!("rebalance_entry {} Entry {} deleted successfully", &bucket, &entry.name);
warn!("rebalance_entry {} Entry {} deleted successfully", &bucket, &entry.name);
}
}
}
@@ -957,26 +1029,29 @@ impl ECStore {
let pool = self.pools[pool_index].clone();
let wk = Workers::new(pool.disk_set.len() * 2).map_err(Error::other)?;
let mut jobs = Vec::new();
// let wk = Workers::new(pool.disk_set.len() * 2).map_err(Error::other)?;
// wk.clone().take().await;
for (set_idx, set) in pool.disk_set.iter().enumerate() {
wk.clone().take().await;
let rebalance_entry: ListCallback = Arc::new({
let this = Arc::clone(self);
let bucket = bucket.clone();
let wk = wk.clone();
// let wk = wk.clone();
let set = set.clone();
move |entry: MetaCacheEntry| {
let this = this.clone();
let bucket = bucket.clone();
let wk = wk.clone();
// let wk = wk.clone();
let set = set.clone();
Box::pin(async move {
wk.take().await;
tokio::spawn(async move {
this.rebalance_entry(bucket, pool_index, entry, set, wk).await;
});
warn!("rebalance_entry: rebalance_entry spawn start");
// wk.take().await;
// tokio::spawn(async move {
warn!("rebalance_entry: rebalance_entry spawn start2");
this.rebalance_entry(bucket, pool_index, entry, set).await;
warn!("rebalance_entry: rebalance_entry spawn done");
// });
})
}
});
@@ -984,62 +1059,68 @@ impl ECStore {
let set = set.clone();
let rx = rx.resubscribe();
let bucket = bucket.clone();
let wk = wk.clone();
tokio::spawn(async move {
// let wk = wk.clone();
let job = tokio::spawn(async move {
if let Err(err) = set.list_objects_to_rebalance(rx, bucket, rebalance_entry).await {
error!("Rebalance worker {} error: {}", set_idx, err);
} else {
info!("Rebalance worker {} done", set_idx);
}
wk.clone().give().await;
// wk.clone().give().await;
});
jobs.push(job);
}
wk.wait().await;
// wk.wait().await;
for job in jobs {
job.await.unwrap();
}
warn!("rebalance_bucket: rebalance_bucket done");
Ok(())
}
#[tracing::instrument(skip(self))]
pub async fn save_rebalance_stats(&self, pool_idx: usize, opt: RebalSaveOpt) -> Result<()> {
// TODO: NSLOOK
// TODO: lock
let mut meta = RebalanceMeta::new();
meta.load_with_opts(
self.pools[0].clone(),
ObjectOptions {
no_lock: true,
..Default::default()
},
)
.await?;
if opt == RebalSaveOpt::StoppedAt {
meta.stopped_at = Some(SystemTime::now());
}
let mut rebalance_meta = self.rebalance_meta.write().await;
if let Some(rb) = rebalance_meta.as_mut() {
if opt == RebalSaveOpt::Stats {
meta.pool_stats[pool_idx] = rb.pool_stats[pool_idx].clone();
if let Err(err) = meta.load(self.pools[0].clone()).await {
if err != Error::ConfigNotFound {
warn!("save_rebalance_stats: load err: {:?}", err);
return Err(err);
}
*rb = meta;
} else {
*rebalance_meta = Some(meta);
}
if let Some(meta) = rebalance_meta.as_mut() {
meta.save_with_opts(
self.pools[0].clone(),
ObjectOptions {
no_lock: true,
..Default::default()
},
)
.await?;
match opt {
RebalSaveOpt::Stats => {
{
let mut rebalance_meta = self.rebalance_meta.write().await;
if let Some(rbm) = rebalance_meta.as_mut() {
meta.pool_stats[pool_idx] = rbm.pool_stats[pool_idx].clone();
}
}
if let Some(pool_stat) = meta.pool_stats.get_mut(pool_idx) {
pool_stat.info.end_time = Some(OffsetDateTime::now_utc());
}
}
RebalSaveOpt::StoppedAt => {
meta.stopped_at = Some(OffsetDateTime::now_utc());
}
}
{
let mut rebalance_meta = self.rebalance_meta.write().await;
*rebalance_meta = Some(meta.clone());
}
warn!(
"save_rebalance_stats: save rebalance meta, pool_idx: {}, opt: {:?}, meta: {:?}",
pool_idx, opt, meta
);
meta.save(self.pools[0].clone()).await?;
Ok(())
}
}
@@ -1052,12 +1133,15 @@ impl SetDisks {
bucket: String,
cb: ListCallback,
) -> Result<()> {
warn!("list_objects_to_rebalance: start list_objects_to_rebalance");
// Placeholder for actual object listing logic
let (disks, _) = self.get_online_disks_with_healing(false).await;
if disks.is_empty() {
warn!("list_objects_to_rebalance: no disk available");
return Err(Error::other("errNoDiskAvailable"));
}
warn!("list_objects_to_rebalance: get online disks with healing");
let listing_quorum = self.set_drive_count.div_ceil(2);
let resolver = MetadataResolutionParams {
@@ -1075,7 +1159,10 @@ impl SetDisks {
bucket: bucket.clone(),
recursice: true,
min_disks: listing_quorum,
agreed: Some(Box::new(move |entry: MetaCacheEntry| Box::pin(cb1(entry)))),
agreed: Some(Box::new(move |entry: MetaCacheEntry| {
warn!("list_objects_to_rebalance: agreed: {:?}", &entry.name);
Box::pin(cb1(entry))
})),
partial: Some(Box::new(move |entries: MetaCacheEntries, _: &[Option<DiskError>]| {
// let cb = cb.clone();
let resolver = resolver.clone();
@@ -1083,11 +1170,11 @@ impl SetDisks {
match entries.resolve(resolver) {
Some(entry) => {
warn!("rebalance: list_objects_to_decommission get {}", &entry.name);
warn!("list_objects_to_rebalance: list_objects_to_decommission get {}", &entry.name);
Box::pin(async move { cb(entry).await })
}
None => {
warn!("rebalance: list_objects_to_decommission get none");
warn!("list_objects_to_rebalance: list_objects_to_decommission get none");
Box::pin(async {})
}
}
@@ -1097,6 +1184,7 @@ impl SetDisks {
)
.await?;
warn!("list_objects_to_rebalance: list_objects_to_rebalance done");
Ok(())
}
}

View File

@@ -860,6 +860,7 @@ impl SetDisks {
};
if let Some(err) = reduce_read_quorum_errs(errs, OBJECT_OP_IGNORED_ERRS, expected_rquorum) {
error!("object_quorum_from_meta: {:?}, errs={:?}", err, errs);
return Err(err);
}
@@ -872,6 +873,7 @@ impl SetDisks {
let parity_blocks = Self::common_parity(&parities, default_parity_count as i32);
if parity_blocks < 0 {
error!("object_quorum_from_meta: parity_blocks < 0, errs={:?}", errs);
return Err(DiskError::ErasureReadQuorum);
}
@@ -936,6 +938,7 @@ impl SetDisks {
Self::object_quorum_from_meta(&parts_metadata, &errs, self.default_parity_count).map_err(map_err_notfound)?;
if read_quorum < 0 {
error!("check_upload_id_exists: read_quorum < 0, errs={:?}", errs);
return Err(Error::ErasureReadQuorum);
}
@@ -977,6 +980,7 @@ impl SetDisks {
quorum: usize,
) -> disk::error::Result<FileInfo> {
if quorum < 1 {
error!("find_file_info_in_quorum: quorum < 1");
return Err(DiskError::ErasureReadQuorum);
}
@@ -1035,6 +1039,7 @@ impl SetDisks {
}
if max_count < quorum {
error!("find_file_info_in_quorum: max_count < quorum, max_val={:?}", max_val);
return Err(DiskError::ErasureReadQuorum);
}
@@ -1079,7 +1084,7 @@ impl SetDisks {
return Ok(fi);
}
warn!("QuorumError::Read, find_file_info_in_quorum fileinfo not found");
error!("find_file_info_in_quorum: fileinfo not found");
Err(DiskError::ErasureReadQuorum)
}
@@ -1763,10 +1768,18 @@ impl SetDisks {
let _min_disks = self.set_drive_count - self.default_parity_count;
let (read_quorum, _) = Self::object_quorum_from_meta(&parts_metadata, &errs, self.default_parity_count)
.map_err(|err| to_object_err(err.into(), vec![bucket, object]))?;
let (read_quorum, _) = match Self::object_quorum_from_meta(&parts_metadata, &errs, self.default_parity_count)
.map_err(|err| to_object_err(err.into(), vec![bucket, object]))
{
Ok(v) => v,
Err(e) => {
error!("Self::object_quorum_from_meta: {:?}, bucket: {}, object: {}", &e, bucket, object);
return Err(e);
}
};
if let Some(err) = reduce_read_quorum_errs(&errs, OBJECT_OP_IGNORED_ERRS, read_quorum as usize) {
error!("reduce_read_quorum_errs: {:?}, bucket: {}, object: {}", &err, bucket, object);
return Err(to_object_err(err.into(), vec![bucket, object]));
}
@@ -1896,6 +1909,7 @@ impl SetDisks {
let nil_count = errors.iter().filter(|&e| e.is_none()).count();
if nil_count < erasure.data_shards {
if let Some(read_err) = reduce_read_quorum_errs(&errors, OBJECT_OP_IGNORED_ERRS, erasure.data_shards) {
error!("create_bitrot_reader reduce_read_quorum_errs {:?}", &errors);
return Err(to_object_err(read_err.into(), vec![bucket, object]));
}
@@ -2942,6 +2956,7 @@ impl SetDisks {
}
Ok(m)
} else {
error!("delete_if_dang_ling: is_object_dang_ling errs={:?}", errs);
Err(DiskError::ErasureReadQuorum)
}
}
@@ -3004,41 +3019,56 @@ impl SetDisks {
}
let (buckets_results_tx, mut buckets_results_rx) = mpsc::channel::<DataUsageEntryInfo>(disks.len());
// New: read the base interval from an env var, default 30 seconds
let set_disk_update_interval_secs = std::env::var("RUSTFS_NS_SCANNER_INTERVAL")
.ok()
.and_then(|v| v.parse::<u64>().ok())
.unwrap_or(30);
let update_time = {
let mut rng = rand::rng();
Duration::from_secs(30) + Duration::from_secs_f64(10.0 * rng.random_range(0.0..1.0))
Duration::from_secs(set_disk_update_interval_secs) + Duration::from_secs_f64(10.0 * rng.random_range(0.0..1.0))
};
let mut ticker = interval(update_time);
let task = tokio::spawn(async move {
let last_save = Some(SystemTime::now());
let mut need_loop = true;
while need_loop {
select! {
_ = ticker.tick() => {
if !cache.info.last_update.eq(&last_save) {
let _ = cache.save(DATA_USAGE_CACHE_NAME).await;
let _ = updates.send(cache.clone()).await;
}
}
result = buckets_results_rx.recv() => {
match result {
Some(result) => {
cache.replace(&result.name, &result.parent, result.entry);
cache.info.last_update = Some(SystemTime::now());
},
None => {
need_loop = false;
cache.info.next_cycle = want_cycle;
cache.info.last_update = Some(SystemTime::now());
// Check whether the background task should run
let skip_background_task = std::env::var("RUSTFS_SKIP_BACKGROUND_TASK")
.ok()
.and_then(|v| v.parse::<bool>().ok())
.unwrap_or(false);
let task = if !skip_background_task {
Some(tokio::spawn(async move {
let last_save = Some(SystemTime::now());
let mut need_loop = true;
while need_loop {
select! {
_ = ticker.tick() => {
if !cache.info.last_update.eq(&last_save) {
let _ = cache.save(DATA_USAGE_CACHE_NAME).await;
let _ = updates.send(cache.clone()).await;
}
}
result = buckets_results_rx.recv() => {
match result {
Some(result) => {
cache.replace(&result.name, &result.parent, result.entry);
cache.info.last_update = Some(SystemTime::now());
},
None => {
need_loop = false;
cache.info.next_cycle = want_cycle;
cache.info.last_update = Some(SystemTime::now());
let _ = cache.save(DATA_USAGE_CACHE_NAME).await;
let _ = updates.send(cache.clone()).await;
}
}
}
}
}
}
});
}))
} else {
None
};
// Restrict parallelism for disk usage scanner
let max_procs = num_cpus::get();
@@ -3142,7 +3172,9 @@ impl SetDisks {
info!("ns_scanner start");
let _ = join_all(futures).await;
let _ = task.await;
if let Some(task) = task {
let _ = task.await;
}
info!("ns_scanner completed");
Ok(())
}
@@ -3894,7 +3926,13 @@ impl ObjectIO for SetDisks {
let stream = mem::replace(&mut data.stream, HashReader::new(Box::new(Cursor::new(Vec::new())), 0, 0, None, false)?);
let (reader, w_size) = Arc::new(erasure).encode(stream, &mut writers, write_quorum).await?; // TODO: on error, remove the temp directory
let (reader, w_size) = match Arc::new(erasure).encode(stream, &mut writers, write_quorum).await {
Ok((r, w)) => (r, w),
Err(e) => {
error!("encode err {:?}", e);
return Err(e.into());
}
}; // TODO: on error, remove the temp directory
let _ = mem::replace(&mut data.stream, reader);
// if let Err(err) = close_bitrot_writers(&mut writers).await {

View File

@@ -847,9 +847,26 @@ impl ECStore {
let (update_closer_tx, mut update_close_rx) = mpsc::channel(10);
let mut ctx_clone = cancel.subscribe();
let all_buckets_clone = all_buckets.clone();
// New: read the interval from an env var, default 30 seconds
let ns_scanner_interval_secs = std::env::var("RUSTFS_NS_SCANNER_INTERVAL")
.ok()
.and_then(|v| v.parse::<u64>().ok())
.unwrap_or(30);
// Check whether background tasks should be skipped
let skip_background_task = std::env::var("RUSTFS_SKIP_BACKGROUND_TASK")
.ok()
.and_then(|v| v.parse::<bool>().ok())
.unwrap_or(false);
if skip_background_task {
info!("Skipping background tasks: RUSTFS_SKIP_BACKGROUND_TASK=true");
return Ok(());
}
let task = tokio::spawn(async move {
let mut last_update: Option<SystemTime> = None;
let mut interval = interval(Duration::from_secs(30));
let mut interval = interval(Duration::from_secs(ns_scanner_interval_secs));
let all_merged = Arc::new(RwLock::new(DataUsageCache::default()));
loop {
select! {
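
Both files use the same read-env-with-default idiom; a hypothetical helper (not in the commit) makes the pattern explicit:

fn env_parse<T: std::str::FromStr>(name: &str, default: T) -> T {
    std::env::var(name)
        .ok()
        .and_then(|v| v.parse::<T>().ok())
        .unwrap_or(default)
}

// e.g. env_parse("RUSTFS_NS_SCANNER_INTERVAL", 30u64)
// e.g. env_parse("RUSTFS_SKIP_BACKGROUND_TASK", false)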

View File

@@ -364,6 +364,7 @@ impl ECStore {
max_keys: i32,
) -> Result<ListObjectVersionsInfo> {
if marker.is_none() && version_marker.is_some() {
warn!("inner_list_object_versions: marker is none and version_marker is some");
return Err(StorageError::NotImplemented);
}

View File

@@ -95,38 +95,45 @@ where
self.clone().save_iam_formatter().await?;
self.clone().load().await?;
// Background thread starts periodic updates or receives signal updates
tokio::spawn({
let s = Arc::clone(&self);
async move {
let ticker = tokio::time::interval(Duration::from_secs(120));
tokio::pin!(ticker, reciver);
loop {
select! {
_ = ticker.tick() => {
if let Err(err) =s.clone().load().await{
error!("iam load err {:?}", err);
}
},
i = reciver.recv() => {
match i {
Some(t) => {
let last = s.last_timestamp.load(Ordering::Relaxed);
if last <= t {
// Check whether the env var is set
let skip_background_task = std::env::var("RUSTFS_SKIP_BACKGROUND_TASK").is_ok();
if let Err(err) =s.clone().load().await{
error!("iam load err {:?}", err);
if !skip_background_task {
// Background thread starts periodic updates or receives signal updates
tokio::spawn({
let s = Arc::clone(&self);
async move {
let ticker = tokio::time::interval(Duration::from_secs(120));
tokio::pin!(ticker, reciver);
loop {
select! {
_ = ticker.tick() => {
warn!("iam load ticker");
if let Err(err) =s.clone().load().await{
error!("iam load err {:?}", err);
}
},
i = reciver.recv() => {
warn!("iam load reciver");
match i {
Some(t) => {
let last = s.last_timestamp.load(Ordering::Relaxed);
if last <= t {
warn!("iam load reciver load");
if let Err(err) =s.clone().load().await{
error!("iam load err {:?}", err);
}
ticker.reset();
}
ticker.reset();
}
},
None => return,
},
None => return,
}
}
}
}
}
}
});
});
}
Ok(())
}

View File

@@ -10,7 +10,8 @@ use http::{HeaderMap, StatusCode};
use matchit::Params;
use s3s::{Body, S3Request, S3Response, S3Result, header::CONTENT_TYPE, s3_error};
use serde::{Deserialize, Serialize};
use std::time::{Duration, SystemTime};
use std::time::Duration;
use time::OffsetDateTime;
use tracing::warn;
use crate::admin::router::Operation;
@@ -56,8 +57,8 @@ pub struct RebalanceAdminStatus {
pub id: String, // Identifies the ongoing rebalance operation by a UUID
#[serde(rename = "pools")]
pub pools: Vec<RebalancePoolStatus>, // Contains all pools, including inactive
#[serde(rename = "stoppedAt")]
pub stopped_at: Option<SystemTime>, // Optional timestamp when rebalance was stopped
#[serde(rename = "stoppedAt", with = "offsetdatetime_rfc3339")]
pub stopped_at: Option<OffsetDateTime>, // Optional timestamp when rebalance was stopped
}
pub struct RebalanceStart {}
@@ -101,11 +102,13 @@ impl Operation for RebalanceStart {
}
};
store.start_rebalance().await;
warn!("Rebalance started with id: {}", id);
if let Some(notification_sys) = get_global_notification_sys() {
warn!("Loading rebalance meta");
warn!("RebalanceStart Loading rebalance meta start");
notification_sys.load_rebalance_meta(true).await;
warn!("Rebalance meta loaded");
warn!("RebalanceStart Loading rebalance meta done");
}
let resp = RebalanceResp { id };
@@ -175,15 +178,14 @@ impl Operation for RebalanceStatus {
let total_bytes_to_rebal = ps.init_capacity as f64 * meta.percent_free_goal - ps.init_free_space as f64;
let mut elapsed = if let Some(start_time) = ps.info.start_time {
SystemTime::now()
.duration_since(start_time)
.map_err(|e| s3_error!(InternalError, "Failed to calculate elapsed time: {}", e))?
let now = OffsetDateTime::now_utc();
now - start_time
} else {
return Err(s3_error!(InternalError, "Start time is not available"));
};
let mut eta = if ps.bytes > 0 {
Duration::from_secs_f64(total_bytes_to_rebal * elapsed.as_secs_f64() / ps.bytes as f64)
Duration::from_secs_f64(total_bytes_to_rebal * elapsed.as_seconds_f64() / ps.bytes as f64)
} else {
Duration::ZERO
};
@@ -193,10 +195,8 @@ impl Operation for RebalanceStatus {
}
if let Some(stopped_at) = stop_time {
if let Ok(du) = stopped_at.duration_since(ps.info.start_time.unwrap_or(stopped_at)) {
elapsed = du;
} else {
return Err(s3_error!(InternalError, "Failed to calculate elapsed time"));
if let Some(start_time) = ps.info.start_time {
elapsed = stopped_at - start_time;
}
eta = Duration::ZERO;
@@ -208,7 +208,7 @@ impl Operation for RebalanceStatus {
bytes: ps.bytes,
bucket: ps.bucket.clone(),
object: ps.object.clone(),
elapsed: elapsed.as_secs(),
elapsed: elapsed.whole_seconds() as u64,
eta: eta.as_secs(),
});
}
@@ -244,10 +244,45 @@ impl Operation for RebalanceStop {
.await
.map_err(|e| s3_error!(InternalError, "Failed to stop rebalance: {}", e))?;
warn!("handle RebalanceStop save_rebalance_stats done ");
if let Some(notification_sys) = get_global_notification_sys() {
notification_sys.load_rebalance_meta(true).await;
warn!("handle RebalanceStop notification_sys load_rebalance_meta");
notification_sys.load_rebalance_meta(false).await;
warn!("handle RebalanceStop notification_sys load_rebalance_meta done");
}
return Err(s3_error!(NotImplemented));
Ok(S3Response::new((StatusCode::OK, Body::empty())))
}
}
mod offsetdatetime_rfc3339 {
use serde::{self, Deserialize, Deserializer, Serializer};
use time::{OffsetDateTime, format_description::well_known::Rfc3339};
pub fn serialize<S>(dt: &Option<OffsetDateTime>, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
match dt {
Some(dt) => {
let s = dt.format(&Rfc3339).map_err(serde::ser::Error::custom)?;
serializer.serialize_some(&s)
}
None => serializer.serialize_none(),
}
}
pub fn deserialize<'de, D>(deserializer: D) -> Result<Option<OffsetDateTime>, D::Error>
where
D: Deserializer<'de>,
{
let opt = Option::<String>::deserialize(deserializer)?;
match opt {
Some(s) => {
let dt = OffsetDateTime::parse(&s, &Rfc3339).map_err(serde::de::Error::custom)?;
Ok(Some(dt))
}
None => Ok(None),
}
}
}
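
Hedged usage sketch for the adapter above, mirroring the stopped_at field of RebalanceAdminStatus:

#[derive(serde::Serialize, serde::Deserialize)]
struct Example {
    #[serde(rename = "stoppedAt", with = "offsetdatetime_rfc3339")]
    stopped_at: Option<time::OffsetDateTime>,
}

// Some(t) serializes as an RFC 3339 string (e.g. "2025-06-13T10:07:40Z"), None as null.
// Note: with serde's `with`, the field must still be present when deserializing;
// add #[serde(default)] if it may be missing from the JSON entirely.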

View File

@@ -3,6 +3,7 @@ use super::router::Operation;
use super::router::S3Router;
use crate::storage::ecfs::bytes_stream;
use ecstore::disk::DiskAPI;
use ecstore::disk::WalkDirOptions;
use ecstore::set_disk::DEFAULT_READ_BUFFER_SIZE;
use ecstore::store::find_local_disk;
use futures::TryStreamExt;
@@ -18,6 +19,7 @@ use s3s::s3_error;
use serde_urlencoded::from_bytes;
use tokio_util::io::ReaderStream;
use tokio_util::io::StreamReader;
use tracing::warn;
pub const RPC_PREFIX: &str = "/rustfs/rpc";
@@ -28,12 +30,30 @@ pub fn regist_rpc_route(r: &mut S3Router<AdminOperation>) -> std::io::Result<()>
AdminOperation(&ReadFile {}),
)?;
r.insert(
Method::HEAD,
format!("{}{}", RPC_PREFIX, "/read_file_stream").as_str(),
AdminOperation(&ReadFile {}),
)?;
r.insert(
Method::PUT,
format!("{}{}", RPC_PREFIX, "/put_file_stream").as_str(),
AdminOperation(&PutFile {}),
)?;
r.insert(
Method::GET,
format!("{}{}", RPC_PREFIX, "/walk_dir").as_str(),
AdminOperation(&WalkDir {}),
)?;
r.insert(
Method::HEAD,
format!("{}{}", RPC_PREFIX, "/walk_dir").as_str(),
AdminOperation(&WalkDir {}),
)?;
Ok(())
}
@@ -50,6 +70,9 @@ pub struct ReadFile {}
#[async_trait::async_trait]
impl Operation for ReadFile {
async fn call(&self, req: S3Request<Body>, _params: Params<'_, '_>) -> S3Result<S3Response<(StatusCode, Body)>> {
if req.method == Method::HEAD {
return Ok(S3Response::new((StatusCode::OK, Body::empty())));
}
let query = {
if let Some(query) = req.uri.query() {
let input: ReadFileQuery =
@@ -79,6 +102,61 @@ impl Operation for ReadFile {
}
}
#[derive(Debug, Default, serde::Deserialize)]
pub struct WalkDirQuery {
disk: String,
}
pub struct WalkDir {}
#[async_trait::async_trait]
impl Operation for WalkDir {
async fn call(&self, req: S3Request<Body>, _params: Params<'_, '_>) -> S3Result<S3Response<(StatusCode, Body)>> {
if req.method == Method::HEAD {
return Ok(S3Response::new((StatusCode::OK, Body::empty())));
}
let query = {
if let Some(query) = req.uri.query() {
let input: WalkDirQuery =
from_bytes(query.as_bytes()).map_err(|e| s3_error!(InvalidArgument, "get query failed {:?}", e))?;
input
} else {
WalkDirQuery::default()
}
};
let mut input = req.input;
let body = match input.store_all_unlimited().await {
Ok(b) => b,
Err(e) => {
warn!("get body failed, e: {:?}", e);
return Err(s3_error!(InvalidRequest, "get body failed"));
}
};
// let body_bytes = decrypt_data(input_cred.secret_key.expose().as_bytes(), &body)
// .map_err(|e| S3Error::with_message(S3ErrorCode::InvalidArgument, format!("decrypt_data err {}", e)))?;
let args: WalkDirOptions =
serde_json::from_slice(&body).map_err(|e| s3_error!(InternalError, "unmarshal body err {}", e))?;
let Some(disk) = find_local_disk(&query.disk).await else {
return Err(s3_error!(InvalidArgument, "disk not found"));
};
let (rd, mut wd) = tokio::io::duplex(DEFAULT_READ_BUFFER_SIZE);
tokio::spawn(async move {
if let Err(e) = disk.walk_dir(args, &mut wd).await {
warn!("walk dir err {}", e);
}
});
let body = Body::from(StreamingBlob::wrap(ReaderStream::with_capacity(rd, DEFAULT_READ_BUFFER_SIZE)));
Ok(S3Response::new((StatusCode::OK, body)))
}
}
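The handler above streams results without buffering the whole listing: `tokio::io::duplex` yields a connected writer/reader pair, the walk runs in a spawned task feeding one end, and the response body wraps the other, so entries flow to the client as they are produced and dropping the writer ends the stream. A standalone sketch of the same pattern (buffer size and payload hypothetical):

use tokio::io::AsyncWriteExt;
use tokio_util::io::ReaderStream;

// Sketch only: produce a chunked byte stream from a background task.
async fn streamed_entries() -> ReaderStream<tokio::io::DuplexStream> {
    let (rd, mut wd) = tokio::io::duplex(64 * 1024);
    tokio::spawn(async move {
        for i in 0..3 {
            // A write error just ends the stream early, as in the handler above.
            if wd.write_all(format!("entry-{i}\n").as_bytes()).await.is_err() {
                break;
            }
        }
        // `wd` is dropped here: the reader sees EOF and the stream terminates.
    });
    ReaderStream::with_capacity(rd, 64 * 1024)
}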
// /rustfs/rpc/read_file_stream?disk={}&volume={}&path={}&offset={}&length={}"
#[derive(Debug, Default, serde::Deserialize)]
pub struct PutFileQuery {

View File

@@ -807,11 +807,38 @@ impl Node for NodeService {
}
}
Err(err) => {
if err == rustfs_filemeta::Error::Unexpected {
let _ = tx
.send(Ok(WalkDirResponse {
success: false,
meta_cache_entry: "".to_string(),
error_info: Some(err.to_string()),
}))
.await;
break;
}
if rustfs_filemeta::is_io_eof(&err) {
let _ = tx
.send(Ok(WalkDirResponse {
success: false,
meta_cache_entry: "".to_string(),
error_info: Some(err.to_string()),
}))
.await;
break;
}
println!("get err {:?}", err);
let _ = tx
.send(Ok(WalkDirResponse {
success: false,
meta_cache_entry: "".to_string(),
error_info: Some(err.to_string()),
}))
.await;
break;
}
}
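All three error arms above send the same failure `WalkDirResponse` and break; the error kind only decides whether anything is logged. A hedged consolidation sketch (same behavior in one arm, with the stray `println!` routed through `tracing::warn`, assuming it is in scope):

Err(err) => {
    // EOF-style errors end the walk quietly; everything else gets logged first.
    if err != rustfs_filemeta::Error::Unexpected && !rustfs_filemeta::is_io_eof(&err) {
        warn!("walk_dir stream error: {err:?}");
    }
    let _ = tx
        .send(Ok(WalkDirResponse {
            success: false,
            meta_cache_entry: "".to_string(),
            error_info: Some(err.to_string()),
        }))
        .await;
    break;
}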

scripts/dev_clear.sh Normal file
View File

@@ -0,0 +1,11 @@
for i in {0..3}; do
    DIR="/data/rustfs$i"
    echo "Processing $DIR"
    if [ -d "$DIR" ]; then
        echo "Clearing $DIR"
        sudo rm -rf "$DIR"/* "$DIR"/.[!.]* "$DIR"/..?* 2>/dev/null || true
        echo "Cleared $DIR"
    else
        echo "$DIR does not exist, skipping"
    fi
done

View File

@@ -4,18 +4,20 @@
rm ./target/x86_64-unknown-linux-musl/release/rustfs.zip
# Zip ./target/x86_64-unknown-linux-musl/release/rustfs
zip ./target/x86_64-unknown-linux-musl/release/rustfs.zip ./target/x86_64-unknown-linux-musl/release/rustfs
zip -j ./target/x86_64-unknown-linux-musl/release/rustfs.zip ./target/x86_64-unknown-linux-musl/release/rustfs
# Local file path
LOCAL_FILE="./target/x86_64-unknown-linux-musl/release/rustfs.zip"
REMOTE_PATH="~"
# Define the server list array
# Format: server IP, username, target path
SERVER_LIST=(
"root@121.89.80.13"
)
# An IP argument is required; exit with an error if it is missing
if [ -z "$1" ]; then
echo "Usage: $0 <server_ip>"
echo "Please provide the target server IP address"
exit 1
fi
SERVER_LIST=("root@$1")
# Iterate over the server list
for SERVER in "${SERVER_LIST[@]}"; do
@@ -26,7 +28,4 @@ for SERVER in "${SERVER_LIST[@]}"; do
else
echo "复制到 $SERVER 失败"
fi
done
# ps -ef | grep rustfs | awk '{print $2}'| xargs kill -9
done

scripts/dev_rustfs.env Normal file
View File

@@ -0,0 +1,11 @@
RUSTFS_ROOT_USER=rustfsadmin
RUSTFS_ROOT_PASSWORD=rustfsadmin
RUSTFS_VOLUMES="http://node{1...4}:7000/data/rustfs{0...3} http://node{5...8}:7000/data/rustfs{0...3}"
RUSTFS_ADDRESS=":7000"
RUSTFS_CONSOLE_ENABLE=true
RUSTFS_CONSOLE_ADDRESS=":7001"
RUST_LOG=warn
RUSTFS_OBS_LOG_DIRECTORY="/var/logs/rustfs/"
RUSTFS_NS_SCANNER_INTERVAL=60
RUSTFS_SKIP_BACKGROUND_TASK=true
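These are plain environment variables; a hedged sketch of how a consumer might read the scanner interval and the background-task switch (parsing and defaults assumed, not the server's actual config code):

use std::time::Duration;

// Sketch only: fall back to 60s / false when unset or malformed.
fn scanner_interval() -> Duration {
    let secs = std::env::var("RUSTFS_NS_SCANNER_INTERVAL")
        .ok()
        .and_then(|v| v.parse::<u64>().ok())
        .unwrap_or(60);
    Duration::from_secs(secs)
}

fn skip_background_tasks() -> bool {
    std::env::var("RUSTFS_SKIP_BACKGROUND_TASK")
        .map(|v| v.eq_ignore_ascii_case("true") || v == "1")
        .unwrap_or(false)
}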

scripts/dev_rustfs.sh Normal file
View File

@@ -0,0 +1,199 @@
#!/bin/bash
# ps -ef | grep rustfs | awk '{print $2}'| xargs kill -9
# Local rustfs.zip path
ZIP_FILE="./rustfs.zip"
# Unzip destination
UNZIP_TARGET="./"
SERVER_LIST=(
"root@172.23.215.2" # node1
"root@172.23.215.4" # node2
"root@172.23.215.7" # node3
"root@172.23.215.3" # node4
"root@172.23.215.8" # node5
"root@172.23.215.5" # node6
"root@172.23.215.9" # node7
"root@172.23.215.6" # node8
)
REMOTE_TMP="~/rustfs"
# Deploy rustfs to all servers
deploy() {
    echo "Unzipping $ZIP_FILE ..."
    unzip -o "$ZIP_FILE" -d "$UNZIP_TARGET"
    if [ $? -ne 0 ]; then
        echo "Unzip failed, exiting"
        exit 1
    fi
    LOCAL_RUSTFS="${UNZIP_TARGET}rustfs"
    if [ ! -f "$LOCAL_RUSTFS" ]; then
        echo "Unzipped rustfs binary not found, exiting"
        exit 1
    fi
    for SERVER in "${SERVER_LIST[@]}"; do
        echo "Uploading $LOCAL_RUSTFS to $SERVER:$REMOTE_TMP"
        scp "$LOCAL_RUSTFS" "${SERVER}:${REMOTE_TMP}"
        if [ $? -ne 0 ]; then
            echo "❌ Upload to $SERVER failed, skipping"
            continue
        fi
        echo "Replacing the binary and restarting the service on $SERVER"
        ssh "$SERVER" bash <<EOF
set -e
echo "Stopping the rustfs service"
sudo systemctl stop rustfs || true
echo "Overwriting /usr/local/bin/rustfs"
sudo cp ~/rustfs /usr/local/bin/rustfs
sudo chmod +x /usr/local/bin/rustfs
echo "Starting the rustfs service"
sudo systemctl start rustfs
echo "Checking the rustfs service status"
sudo systemctl status rustfs --no-pager --lines=10
EOF
        if [ $? -eq 0 ]; then
            echo "✅ Deployed and restarted rustfs on $SERVER"
        else
            echo "❌ Deploy or restart of rustfs failed on $SERVER"
        fi
    done
}
# Clear all files (including hidden ones) under /data/rustfs0~3
clear_data_dirs() {
    for SERVER in "${SERVER_LIST[@]}"; do
        echo "Clearing everything under $SERVER:/data/rustfs0~3"
        # Quote the heredoc delimiter so $i and $DIR expand on the remote
        # host instead of being interpolated locally before ssh runs.
        ssh "$SERVER" bash <<'EOF'
for i in {0..3}; do
    DIR="/data/rustfs$i"
    echo "Processing $DIR"
    if [ -d "$DIR" ]; then
        echo "Clearing $DIR"
        sudo rm -rf "$DIR"/* "$DIR"/.[!.]* "$DIR"/..?* 2>/dev/null || true
        echo "Cleared $DIR"
    else
        echo "$DIR does not exist, skipping"
    fi
done
EOF
    done
}
# Control the rustfs service
stop_rustfs() {
    for SERVER in "${SERVER_LIST[@]}"; do
        echo "Stopping rustfs on $SERVER"
        ssh "$SERVER" "sudo systemctl stop rustfs"
    done
}
start_rustfs() {
    for SERVER in "${SERVER_LIST[@]}"; do
        echo "Starting rustfs on $SERVER"
        ssh "$SERVER" "sudo systemctl start rustfs"
    done
}
restart_rustfs() {
    for SERVER in "${SERVER_LIST[@]}"; do
        echo "Restarting rustfs on $SERVER"
        ssh "$SERVER" "sudo systemctl restart rustfs"
    done
}
# Append a public key to ~/.ssh/authorized_keys on every server
add_ssh_key() {
    if [ -z "$2" ]; then
        echo "Usage: $0 addkey <pubkey_file>"
        exit 1
    fi
    PUBKEY_FILE="$2"
    if [ ! -f "$PUBKEY_FILE" ]; then
        echo "The specified public key file does not exist: $PUBKEY_FILE"
        exit 1
    fi
    PUBKEY_CONTENT=$(cat "$PUBKEY_FILE")
    for SERVER in "${SERVER_LIST[@]}"; do
        echo "Appending the public key to $SERVER:~/.ssh/authorized_keys"
        ssh "$SERVER" "mkdir -p ~/.ssh && chmod 700 ~/.ssh && echo '$PUBKEY_CONTENT' >> ~/.ssh/authorized_keys && chmod 600 ~/.ssh/authorized_keys"
        if [ $? -eq 0 ]; then
            echo "$SERVER: public key appended"
        else
            echo "$SERVER: failed to append the public key"
        fi
    done
}
monitor_logs() {
for SERVER in "${SERVER_LIST[@]}"; do
echo "监控 $SERVER:/var/logs/rustfs/rustfs.log ..."
ssh "$SERVER" "tail -F /var/logs/rustfs/rustfs.log" |
sed "s/^/[$SERVER] /" &
done
wait
}
set_env_file() {
    if [ -z "$2" ]; then
        echo "Usage: $0 setenv <env_file>"
        exit 1
    fi
    ENV_FILE="$2"
    if [ ! -f "$ENV_FILE" ]; then
        echo "The specified env file does not exist: $ENV_FILE"
        exit 1
    fi
    for SERVER in "${SERVER_LIST[@]}"; do
        echo "Uploading $ENV_FILE to $SERVER:~/rustfs.env"
        scp "$ENV_FILE" "${SERVER}:~/rustfs.env"
        if [ $? -ne 0 ]; then
            echo "❌ Upload to $SERVER failed, skipping"
            continue
        fi
        echo "Overwriting $SERVER:/etc/default/rustfs"
        ssh "$SERVER" "sudo mv ~/rustfs.env /etc/default/rustfs"
        if [ $? -eq 0 ]; then
            echo "$SERVER: /etc/default/rustfs updated"
        else
            echo "$SERVER: failed to update /etc/default/rustfs"
        fi
    done
}
# Main command dispatch
case "$1" in
    deploy)
        deploy
        ;;
    clear)
        clear_data_dirs
        ;;
    stop)
        stop_rustfs
        ;;
    start)
        start_rustfs
        ;;
    restart)
        restart_rustfs
        ;;
    addkey)
        add_ssh_key "$@"
        ;;
    monitor_logs)
        monitor_logs
        ;;
    setenv)
        set_env_file "$@"
        ;;
    *)
        echo "Usage: $0 {deploy|clear|stop|start|restart|addkey <pubkey_file>|monitor_logs|setenv <env_file>}"
        ;;
esac