Mirror of https://github.com/rustfs/rustfs.git, synced 2026-01-17 01:30:33 +00:00
Makefile
@@ -79,3 +79,13 @@ build: BUILD_CMD = /root/.cargo/bin/cargo build --release --bin rustfs --target-
build:
	$(DOCKER_CLI) build -t $(ROCKYLINUX_BUILD_IMAGE_NAME) -f $(DOCKERFILE_PATH)/Dockerfile.$(BUILD_OS) .
	$(DOCKER_CLI) run --rm --name $(ROCKYLINUX_BUILD_CONTAINER_NAME) -v $(shell pwd):/root/s3-rustfs -it $(ROCKYLINUX_BUILD_IMAGE_NAME) $(BUILD_CMD)

.PHONY: build-musl
build-musl:
	@echo "🔨 Building rustfs for x86_64-unknown-linux-musl..."
	cargo build --target x86_64-unknown-linux-musl --bin rustfs -r

.PHONY: deploy-dev
deploy-dev: build-musl
	@echo "🚀 Deploying to dev server: $${IP}"
	./scripts/dev_deploy.sh $${IP}
@@ -111,7 +111,20 @@ impl Clone for Error {

impl From<std::io::Error> for Error {
    fn from(e: std::io::Error) -> Self {
        Error::Io(e)
        match e.kind() {
            std::io::ErrorKind::UnexpectedEof => Error::Unexpected,
            _ => Error::Io(e),
        }
    }
}

impl From<Error> for std::io::Error {
    fn from(e: Error) -> Self {
        match e {
            Error::Unexpected => std::io::Error::new(std::io::ErrorKind::UnexpectedEof, "Unexpected EOF"),
            Error::Io(e) => e,
            _ => std::io::Error::other(e.to_string()),
        }
    }
}

@@ -414,6 +427,9 @@ mod tests {
        let filemeta_error: Error = io_error.into();

        match filemeta_error {
            Error::Unexpected => {
                assert_eq!(kind, ErrorKind::UnexpectedEof);
            }
            Error::Io(extracted_io_error) => {
                assert_eq!(extracted_io_error.kind(), kind);
                assert!(extracted_io_error.to_string().contains("test error"));
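The conversion above is now asymmetric on purpose: an io::Error of kind UnexpectedEof is normalized into the dedicated Error::Unexpected variant, and the reverse conversion reconstructs an UnexpectedEof io::Error. A minimal self-contained sketch of the round-trip, assuming a simplified Error enum with just these two variants (not the full rustfs_filemeta type):

use std::io;

#[derive(Debug)]
enum Error {
    Unexpected,
    Io(io::Error),
}

impl From<io::Error> for Error {
    fn from(e: io::Error) -> Self {
        match e.kind() {
            io::ErrorKind::UnexpectedEof => Error::Unexpected,
            _ => Error::Io(e),
        }
    }
}

fn main() {
    // UnexpectedEof is normalized to the dedicated variant...
    let eof = io::Error::new(io::ErrorKind::UnexpectedEof, "test error");
    assert!(matches!(Error::from(eof), Error::Unexpected));
    // ...while any other kind keeps the original error payload.
    let nf = io::Error::new(io::ErrorKind::NotFound, "test error");
    assert!(matches!(Error::from(nf), Error::Io(_)));
}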
@@ -3,6 +3,7 @@ use futures::{Stream, StreamExt};
use http::HeaderMap;
use pin_project_lite::pin_project;
use reqwest::{Client, Method, RequestBuilder};
use std::error::Error as _;
use std::io::{self, Error};
use std::pin::Pin;
use std::sync::LazyLock;
@@ -43,12 +44,18 @@ pin_project! {
}

impl HttpReader {
    pub async fn new(url: String, method: Method, headers: HeaderMap) -> io::Result<Self> {
    pub async fn new(url: String, method: Method, headers: HeaderMap, body: Option<Vec<u8>>) -> io::Result<Self> {
        http_log!("[HttpReader::new] url: {url}, method: {method:?}, headers: {headers:?}");
        Self::with_capacity(url, method, headers, 0).await
        Self::with_capacity(url, method, headers, body, 0).await
    }
    /// Create a new HttpReader from a URL. The request is performed immediately.
    pub async fn with_capacity(url: String, method: Method, headers: HeaderMap, mut read_buf_size: usize) -> io::Result<Self> {
    pub async fn with_capacity(
        url: String,
        method: Method,
        headers: HeaderMap,
        body: Option<Vec<u8>>,
        mut read_buf_size: usize,
    ) -> io::Result<Self> {
        http_log!(
            "[HttpReader::with_capacity] url: {url}, method: {method:?}, headers: {headers:?}, buf_size: {}",
            read_buf_size
@@ -60,12 +67,12 @@ impl HttpReader {
            Ok(resp) => {
                http_log!("[HttpReader::new] HEAD status: {}", resp.status());
                if !resp.status().is_success() {
                    return Err(Error::other(format!("HEAD failed: status {}", resp.status())));
                    return Err(Error::other(format!("HEAD failed: url: {}, status {}", url, resp.status())));
                }
            }
            Err(e) => {
                http_log!("[HttpReader::new] HEAD error: {e}");
                return Err(Error::other(format!("HEAD request failed: {e}")));
                return Err(Error::other(e.source().map(|s| s.to_string()).unwrap_or_else(|| e.to_string())));
            }
        }

@@ -80,7 +87,10 @@ impl HttpReader {
        let (err_tx, err_rx) = oneshot::channel::<io::Error>();
        tokio::spawn(async move {
            let client = get_http_client();
            let request: RequestBuilder = client.request(method_clone, url_clone).headers(headers_clone);
            let mut request: RequestBuilder = client.request(method_clone, url_clone).headers(headers_clone);
            if let Some(body) = body {
                request = request.body(body);
            }

            let response = request.send().await;
            match response {
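Every HttpReader constructor now threads an optional request body through to the underlying reqwest call, so existing callers simply pass None. A hedged usage sketch; HttpReader and its new signature come from the diff above, while the URL and the surrounding function are placeholders:

use http::{HeaderMap, Method};

// Hypothetical call site: None preserves the old bodyless behavior,
// Some(bytes) attaches a payload to the outgoing request.
async fn demo() -> std::io::Result<()> {
    let reader = rustfs_rio::HttpReader::new(
        "http://node1:9000/rustfs/rpc/read_file".to_string(), // placeholder URL
        Method::GET,
        HeaderMap::new(),
        None, // callers that previously had no body pass None
    )
    .await?;
    let _ = reader;
    Ok(())
}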
@@ -1,7 +1,7 @@
use crate::disk::error::DiskError;
use crate::disk::{self, DiskAPI, DiskStore, WalkDirOptions};
use futures::future::join_all;
use rustfs_filemeta::{MetaCacheEntries, MetaCacheEntry, MetacacheReader};
use rustfs_filemeta::{MetaCacheEntries, MetaCacheEntry, MetacacheReader, is_io_eof};
use std::{future::Future, pin::Pin, sync::Arc};
use tokio::{spawn, sync::broadcast::Receiver as B_Receiver};
use tracing::error;
@@ -50,7 +50,6 @@ impl Clone for ListPathRawOptions {
}

pub async fn list_path_raw(mut rx: B_Receiver<bool>, opts: ListPathRawOptions) -> disk::error::Result<()> {
    // println!("list_path_raw {},{}", &opts.bucket, &opts.path);
    if opts.disks.is_empty() {
        return Err(DiskError::other("list_path_raw: 0 drives provided"));
    }
@@ -59,12 +58,13 @@ pub async fn list_path_raw(mut rx: B_Receiver<bool>, opts: ListPathRawOptions) -
    let mut readers = Vec::with_capacity(opts.disks.len());
    let fds = Arc::new(opts.fallback_disks.clone());

    let (cancel_tx, cancel_rx) = tokio::sync::broadcast::channel::<bool>(1);

    for disk in opts.disks.iter() {
        let opdisk = disk.clone();
        let opts_clone = opts.clone();
        let fds_clone = fds.clone();
        // let (m_tx, m_rx) = mpsc::channel::<MetaCacheEntry>(100);
        // readers.push(m_rx);
        let mut cancel_rx_clone = cancel_rx.resubscribe();
        let (rd, mut wr) = tokio::io::duplex(64);
        readers.push(MetacacheReader::new(rd));
        jobs.push(spawn(async move {
@@ -92,7 +92,13 @@ pub async fn list_path_raw(mut rx: B_Receiver<bool>, opts: ListPathRawOptions) -
                need_fallback = true;
            }

            if cancel_rx_clone.try_recv().is_ok() {
                // warn!("list_path_raw: cancel_rx_clone.try_recv().await.is_ok()");
                return Ok(());
            }

            while need_fallback {
                // warn!("list_path_raw: while need_fallback start");
                let disk = match fds_clone.iter().find(|d| d.is_some()) {
                    Some(d) => {
                        if let Some(disk) = d.clone() {
@@ -130,6 +136,7 @@ pub async fn list_path_raw(mut rx: B_Receiver<bool>, opts: ListPathRawOptions) -
                }
            }

            // warn!("list_path_raw: while need_fallback done");
            Ok(())
        }));
    }
@@ -143,9 +150,15 @@ pub async fn list_path_raw(mut rx: B_Receiver<bool>, opts: ListPathRawOptions) -
    loop {
        let mut current = MetaCacheEntry::default();

        // warn!(
        //     "list_path_raw: loop start, bucket: {}, path: {}, current: {:?}",
        //     opts.bucket, opts.path, &current.name
        // );

        if rx.try_recv().is_ok() {
            return Err(DiskError::other("canceled"));
        }

        let mut top_entries: Vec<Option<MetaCacheEntry>> = vec![None; readers.len()];

        let mut at_eof = 0;
@@ -168,31 +181,47 @@ pub async fn list_path_raw(mut rx: B_Receiver<bool>, opts: ListPathRawOptions) -
                } else {
                    // eof
                    at_eof += 1;

                    // warn!("list_path_raw: peek eof, disk: {}", i);
                    continue;
                }
            }
            Err(err) => {
                if err == rustfs_filemeta::Error::Unexpected {
                    at_eof += 1;
                    // warn!("list_path_raw: peek err eof, disk: {}", i);
                    continue;
                } else if err == rustfs_filemeta::Error::FileNotFound {
                }

                // warn!("list_path_raw: peek err00, err: {:?}", err);

                if is_io_eof(&err) {
                    at_eof += 1;
                    // warn!("list_path_raw: peek eof, disk: {}", i);
                    continue;
                }

                if err == rustfs_filemeta::Error::FileNotFound {
                    at_eof += 1;
                    fnf += 1;
                    // warn!("list_path_raw: peek fnf, disk: {}", i);
                    continue;
                } else if err == rustfs_filemeta::Error::VolumeNotFound {
                    at_eof += 1;
                    fnf += 1;
                    vnf += 1;
                    // warn!("list_path_raw: peek vnf, disk: {}", i);
                    continue;
                } else {
                    has_err += 1;
                    errs[i] = Some(err.into());
                    // warn!("list_path_raw: peek err, disk: {}", i);
                    continue;
                }
            }
        };

        // warn!("list_path_raw: loop entry: {:?}, disk: {}", &entry.name, i);

        // If no current, add it.
        if current.name.is_empty() {
            top_entries[i] = Some(entry.clone());
@@ -228,10 +257,12 @@ pub async fn list_path_raw(mut rx: B_Receiver<bool>, opts: ListPathRawOptions) -
        }

        if vnf > 0 && vnf >= (readers.len() - opts.min_disks) {
            // warn!("list_path_raw: vnf > 0 && vnf >= (readers.len() - opts.min_disks) break");
            return Err(DiskError::VolumeNotFound);
        }

        if fnf > 0 && fnf >= (readers.len() - opts.min_disks) {
            // warn!("list_path_raw: fnf > 0 && fnf >= (readers.len() - opts.min_disks) break");
            return Err(DiskError::FileNotFound);
        }

@@ -250,6 +281,10 @@ pub async fn list_path_raw(mut rx: B_Receiver<bool>, opts: ListPathRawOptions) -
            _ => {}
        });

        error!(
            "list_path_raw: has_err > 0 && has_err > opts.disks.len() - opts.min_disks break, err: {:?}",
            &combined_err.join(", ")
        );
        return Err(DiskError::other(combined_err.join(", ")));
    }

@@ -263,6 +298,7 @@ pub async fn list_path_raw(mut rx: B_Receiver<bool>, opts: ListPathRawOptions) -
            }
        }

        // error!("list_path_raw: at_eof + has_err == readers.len() break {:?}", &errs);
        break;
    }

@@ -272,12 +308,16 @@ pub async fn list_path_raw(mut rx: B_Receiver<bool>, opts: ListPathRawOptions) -
        }

        if let Some(agreed_fn) = opts.agreed.as_ref() {
            // warn!("list_path_raw: agreed_fn start, current: {:?}", &current.name);
            agreed_fn(current).await;
            // warn!("list_path_raw: agreed_fn done");
        }

        continue;
    }

    // warn!("list_path_raw: skip start, current: {:?}", &current.name);

    for (i, r) in readers.iter_mut().enumerate() {
        if top_entries[i].is_some() {
            let _ = r.skip(1).await;
@@ -291,7 +331,12 @@ pub async fn list_path_raw(mut rx: B_Receiver<bool>, opts: ListPathRawOptions) -
        Ok(())
    });

    jobs.push(revjob);
    if let Err(err) = revjob.await.map_err(std::io::Error::other)? {
        error!("list_path_raw: revjob err {:?}", err);
        let _ = cancel_tx.send(true);

        return Err(err);
    }

    let results = join_all(jobs).await;
    for result in results {
@@ -300,5 +345,6 @@ pub async fn list_path_raw(mut rx: B_Receiver<bool>, opts: ListPathRawOptions) -
        }
    }

    // warn!("list_path_raw: done");
    Ok(())
}
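The peek loop above tallies each reader's outcome into at_eof / fnf / vnf / has_err counters and then compares them against readers.len() - min_disks: a not-found verdict only wins once so many readers agree that fewer than min_disks could still disagree. A minimal standalone sketch of that threshold rule; the counter names mirror the diff, and the function is purely illustrative:

// Hypothetical illustration of the quorum thresholds used in list_path_raw.
fn classify(total_readers: usize, min_disks: usize, fnf: usize, vnf: usize) -> &'static str {
    // VolumeNotFound is checked first because it implies FileNotFound as well.
    if vnf > 0 && vnf >= total_readers - min_disks {
        "volume not found"
    } else if fnf > 0 && fnf >= total_readers - min_disks {
        "file not found"
    } else {
        "keep merging entries"
    }
}

fn main() {
    // 4 readers, min_disks = 2: two VolumeNotFound answers are decisive.
    assert_eq!(classify(4, 2, 2, 2), "volume not found");
    // A single FileNotFound is not enough to conclude anything.
    assert_eq!(classify(4, 2, 1, 0), "keep merging entries");
}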
@@ -41,6 +41,7 @@ pub async fn read_config_with_metadata<S: StorageAPI>(
            if err == Error::FileNotFound || matches!(err, Error::ObjectNotFound(_, _)) {
                Error::ConfigNotFound
            } else {
                warn!("read_config_with_metadata: err: {:?}, file: {}", err, file);
                err
            }
        })?;
@@ -92,9 +93,19 @@ pub async fn delete_config<S: StorageAPI>(api: Arc<S>, file: &str) -> Result<()>
}

pub async fn save_config_with_opts<S: StorageAPI>(api: Arc<S>, file: &str, data: Vec<u8>, opts: &ObjectOptions) -> Result<()> {
    let _ = api
    warn!(
        "save_config_with_opts, bucket: {}, file: {}, data len: {}",
        RUSTFS_META_BUCKET,
        file,
        data.len()
    );
    if let Err(err) = api
        .put_object(RUSTFS_META_BUCKET, file, &mut PutObjReader::from_vec(data), opts)
        .await?;
        .await
    {
        warn!("save_config_with_opts: err: {:?}, file: {}", err, file);
        return Err(err);
    }
    Ok(())
}
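read_config_with_metadata collapses both FileNotFound and ObjectNotFound into a single ConfigNotFound, so callers can probe for optional config files with one match arm. A self-contained sketch of the caller-side pattern; the Error enum here is a stand-in for the real crate type, and load_optional is a hypothetical helper:

#[derive(Debug)]
enum Error {
    ConfigNotFound,
    Other(String),
}

// Hypothetical caller-side helper: ConfigNotFound means "no config yet",
// anything else is a real failure to propagate.
fn load_optional(bytes: Result<Vec<u8>, Error>) -> Result<Option<Vec<u8>>, Error> {
    match bytes {
        Ok(data) => Ok(Some(data)),
        Err(Error::ConfigNotFound) => Ok(None),
        Err(err) => Err(err),
    }
}

fn main() {
    assert!(matches!(load_optional(Err(Error::ConfigNotFound)), Ok(None)));
    assert!(load_optional(Err(Error::Other("io".into()))).is_err());
}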
@@ -124,7 +124,7 @@ pub enum DiskError {
    #[error("erasure read quorum")]
    ErasureReadQuorum,

    #[error("io error")]
    #[error("io error {0}")]
    Io(io::Error),
}
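Adding {0} to the #[error(...)] attribute makes the wrapped io::Error's own message part of the variant's Display output instead of the fixed string "io error". A minimal sketch with thiserror, matching the derive style in the diff:

use thiserror::Error;

#[derive(Debug, Error)]
enum DiskError {
    // `{0}` interpolates the Display of the wrapped error into the message.
    #[error("io error {0}")]
    Io(std::io::Error),
}

fn main() {
    let e = DiskError::Io(std::io::Error::new(std::io::ErrorKind::NotFound, "no such file"));
    // Prints "io error no such file" rather than the bare "io error".
    println!("{e}");
}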
@@ -773,7 +773,7 @@ impl LocalDisk {
            Ok(res) => res,
            Err(e) => {
                if e != DiskError::VolumeNotFound && e != Error::FileNotFound {
                    info!("scan list_dir {}, err {:?}", &current, &e);
                    warn!("scan list_dir {}, err {:?}", &current, &e);
                }

                if opts.report_notfound && e == Error::FileNotFound && current == &opts.base_dir {
@@ -785,6 +785,7 @@
        };

        if entries.is_empty() {
            warn!("scan list_dir {}, entries is empty", &current);
            return Ok(());
        }

@@ -800,6 +801,7 @@
            let entry = item.clone();
            // check limit
            if opts.limit > 0 && *objs_returned >= opts.limit {
                warn!("scan list_dir {}, limit reached", &current);
                return Ok(());
            }
            // check prefix
@@ -843,13 +845,14 @@
                let name = decode_dir_object(format!("{}/{}", &current, &name).as_str());

                out.write_obj(&MetaCacheEntry {
                    name,
                    name: name.clone(),
                    metadata,
                    ..Default::default()
                })
                .await?;
                *objs_returned += 1;

                // warn!("scan list_dir {}, write_obj done, name: {:?}", &current, &name);
                return Ok(());
            }
        }
@@ -870,6 +873,7 @@

        for entry in entries.iter() {
            if opts.limit > 0 && *objs_returned >= opts.limit {
                // warn!("scan list_dir {}, limit reached 2", &current);
                return Ok(());
            }

@@ -945,6 +949,7 @@

        while let Some(dir) = dir_stack.pop() {
            if opts.limit > 0 && *objs_returned >= opts.limit {
                // warn!("scan list_dir {}, limit reached 3", &current);
                return Ok(());
            }

@@ -965,6 +970,7 @@
            }
        }

        // warn!("scan list_dir {}, done", &current);
        Ok(())
    }
}
@@ -1568,6 +1574,11 @@ impl DiskAPI for LocalDisk {
        let mut current = opts.base_dir.clone();
        self.scan_dir(&mut current, &opts, &mut out, &mut objs_returned).await?;

        warn!(
            "walk_dir: done, volume_dir: {:?}, base_dir: {}",
            volume_dir.to_string_lossy(),
            opts.base_dir
        );
        Ok(())
    }
@@ -2,20 +2,20 @@ use std::path::PathBuf;

use bytes::Bytes;
use futures::lock::Mutex;
use http::{HeaderMap, Method};
use http::{HeaderMap, HeaderValue, Method, header::CONTENT_TYPE};
use protos::{
    node_service_time_out_client,
    proto_gen::node_service::{
        CheckPartsRequest, DeletePathsRequest, DeleteRequest, DeleteVersionRequest, DeleteVersionsRequest, DeleteVolumeRequest,
        DiskInfoRequest, ListDirRequest, ListVolumesRequest, MakeVolumeRequest, MakeVolumesRequest, NsScannerRequest,
        ReadAllRequest, ReadMultipleRequest, ReadVersionRequest, ReadXlRequest, RenameDataRequest, RenameFileRequest,
        StatVolumeRequest, UpdateMetadataRequest, VerifyFileRequest, WalkDirRequest, WriteAllRequest, WriteMetadataRequest,
        StatVolumeRequest, UpdateMetadataRequest, VerifyFileRequest, WriteAllRequest, WriteMetadataRequest,
    },
};
use rmp_serde::Serializer;
use rustfs_filemeta::{FileInfo, MetaCacheEntry, MetacacheWriter, RawFileInfo};

use rustfs_filemeta::{FileInfo, RawFileInfo};
use rustfs_rio::{HttpReader, HttpWriter};
use serde::Serialize;

use tokio::{
    io::AsyncWrite,
    sync::mpsc::{self, Sender},
@@ -256,47 +256,55 @@ impl DiskAPI for RemoteDisk {
        Ok(())
    }

    // FIXME: TODO: use writer
    #[tracing::instrument(skip(self, wr))]
    async fn walk_dir<W: AsyncWrite + Unpin + Send>(&self, opts: WalkDirOptions, wr: &mut W) -> Result<()> {
        let now = std::time::SystemTime::now();
        info!("walk_dir {}/{}/{:?}", self.endpoint.to_string(), opts.bucket, opts.filter_prefix);
        let mut wr = wr;
        let mut out = MetacacheWriter::new(&mut wr);
        let mut buf = Vec::new();
        opts.serialize(&mut Serializer::new(&mut buf))?;
        let mut client = node_service_time_out_client(&self.addr)
            .await
            .map_err(|err| Error::other(format!("can not get client, err: {}", err)))?;
        let request = Request::new(WalkDirRequest {
            disk: self.endpoint.to_string(),
            walk_dir_options: buf.into(),
        });
        let mut response = client.walk_dir(request).await?.into_inner();
    // // FIXME: TODO: use writer
    // #[tracing::instrument(skip(self, wr))]
    // async fn walk_dir<W: AsyncWrite + Unpin + Send>(&self, opts: WalkDirOptions, wr: &mut W) -> Result<()> {
    //     let now = std::time::SystemTime::now();
    //     info!("walk_dir {}/{}/{:?}", self.endpoint.to_string(), opts.bucket, opts.filter_prefix);
    //     let mut wr = wr;
    //     let mut out = MetacacheWriter::new(&mut wr);
    //     let mut buf = Vec::new();
    //     opts.serialize(&mut Serializer::new(&mut buf))?;
    //     let mut client = node_service_time_out_client(&self.addr)
    //         .await
    //         .map_err(|err| Error::other(format!("can not get client, err: {}", err)))?;
    //     let request = Request::new(WalkDirRequest {
    //         disk: self.endpoint.to_string(),
    //         walk_dir_options: buf.into(),
    //     });
    //     let mut response = client.walk_dir(request).await?.into_inner();

    loop {
        match response.next().await {
            Some(Ok(resp)) => {
                if !resp.success {
                    return Err(Error::other(resp.error_info.unwrap_or_default()));
                }
                let entry = serde_json::from_str::<MetaCacheEntry>(&resp.meta_cache_entry)
                    .map_err(|_| Error::other(format!("Unexpected response: {:?}", response)))?;
                out.write_obj(&entry).await?;
            }
            None => break,
            _ => return Err(Error::other(format!("Unexpected response: {:?}", response))),
        }
    }
    // loop {
    //     match response.next().await {
    //         Some(Ok(resp)) => {
    //             if !resp.success {
    //                 if let Some(err) = resp.error_info {
    //                     if err == "Unexpected EOF" {
    //                         return Err(Error::Io(std::io::Error::new(std::io::ErrorKind::UnexpectedEof, err)));
    //                     } else {
    //                         return Err(Error::other(err));
    //                     }
    //                 }

    info!(
        "walk_dir {}/{:?} done {:?}",
        opts.bucket,
        opts.filter_prefix,
        now.elapsed().unwrap_or_default()
    );
    Ok(())
}
    //                 return Err(Error::other("unknown error"));
    //             }
    //             let entry = serde_json::from_str::<MetaCacheEntry>(&resp.meta_cache_entry)
    //                 .map_err(|_| Error::other(format!("Unexpected response: {:?}", response)))?;
    //             out.write_obj(&entry).await?;
    //         }
    //         None => break,
    //         _ => return Err(Error::other(format!("Unexpected response: {:?}", response))),
    //     }
    // }

    // info!(
    //     "walk_dir {}/{:?} done {:?}",
    //     opts.bucket,
    //     opts.filter_prefix,
    //     now.elapsed().unwrap_or_default()
    // );
    // Ok(())
    // }

    #[tracing::instrument(skip(self))]
    async fn delete_version(
@@ -559,6 +567,28 @@
        Ok(response.volumes)
    }

    #[tracing::instrument(skip(self, wr))]
    async fn walk_dir<W: AsyncWrite + Unpin + Send>(&self, opts: WalkDirOptions, wr: &mut W) -> Result<()> {
        info!("walk_dir {}", self.endpoint.to_string());

        let url = format!(
            "{}/rustfs/rpc/walk_dir?disk={}",
            self.endpoint.grid_host(),
            urlencoding::encode(self.endpoint.to_string().as_str()),
        );

        let opts = serde_json::to_vec(&opts)?;

        let mut headers = HeaderMap::new();
        headers.insert(CONTENT_TYPE, HeaderValue::from_static("application/json"));

        let mut reader = HttpReader::new(url, Method::GET, headers, Some(opts)).await?;

        tokio::io::copy(&mut reader, wr).await?;

        Ok(())
    }

    #[tracing::instrument(level = "debug", skip(self))]
    async fn read_file(&self, volume: &str, path: &str) -> Result<FileReader> {
        info!("read_file {}/{}", volume, path);
@@ -573,7 +603,7 @@
            0
        );

        Ok(Box::new(HttpReader::new(url, Method::GET, HeaderMap::new()).await?))
        Ok(Box::new(HttpReader::new(url, Method::GET, HeaderMap::new(), None).await?))
    }

    #[tracing::instrument(level = "debug", skip(self))]
@@ -589,7 +619,7 @@
            length
        );

        Ok(Box::new(HttpReader::new(url, Method::GET, HeaderMap::new()).await?))
        Ok(Box::new(HttpReader::new(url, Method::GET, HeaderMap::new(), None).await?))
    }

    #[tracing::instrument(level = "debug", skip(self))]
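The gRPC-streaming walk_dir is retired in favor of an HTTP endpoint: the new implementation ships WalkDirOptions as a JSON body on a GET request and copies the streamed response straight into the caller's writer. A GET with a body is legal in reqwest but unusual over HTTP, so intermediaries may drop it; a hedged, standalone sketch of the request shape (URL and handler are placeholders, not the project's confirmed routes):

use reqwest::{Client, Method};

// Hypothetical standalone version of the request the new walk_dir issues:
// a GET whose body carries the serialized listing options.
async fn walk_dir_request(opts_json: Vec<u8>) -> reqwest::Result<reqwest::Response> {
    Client::new()
        .request(Method::GET, "http://node1:9000/rustfs/rpc/walk_dir?disk=d1") // placeholder URL
        .header("content-type", "application/json")
        .body(opts_json)
        .send()
        .await
}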
@@ -680,7 +680,7 @@ mod test {
        ),
        (
            vec!["ftp://server/d1", "http://server/d2", "http://server/d3", "http://server/d4"],
            Some(Error::other("'ftp://server/d1': io error")),
            Some(Error::other("'ftp://server/d1': io error invalid URL endpoint format")),
            10,
        ),
        (
@@ -719,7 +719,13 @@
            (None, Ok(_)) => {}
            (Some(e), Ok(_)) => panic!("{}: error: expected = {}, got = <nil>", test_case.2, e),
            (Some(e), Err(e2)) => {
                assert_eq!(e.to_string(), e2.to_string(), "{}: error: expected = {}, got = {}", test_case.2, e, e2)
                assert!(
                    e2.to_string().starts_with(&e.to_string()),
                    "{}: error: expected = {}, got = {}",
                    test_case.2,
                    e,
                    e2
                )
            }
        }
    }
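This test relaxation follows directly from the DiskError Display change: once the Io variant appends its source's message, exact string equality on wrapped errors becomes brittle, so the test accepts any actual message that begins with the expected prefix. A minimal illustration:

// Why the test moved from assert_eq! to a prefix check: a wrapper that now
// appends its source's message breaks full-string equality, but the stable
// prefix still identifies the error.
fn main() {
    let expected = "'ftp://server/d1': io error";
    let actual = "'ftp://server/d1': io error invalid URL endpoint format";
    assert!(actual.starts_with(expected));
}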
@@ -242,12 +242,14 @@ impl Erasure {
            }

            if !reader.can_decode(&shards) {
                error!("erasure decode can_decode errs: {:?}", &errs);
                ret_err = Some(Error::ErasureReadQuorum.into());
                break;
            }

            // Decode the shards
            if let Err(e) = self.decode_data(&mut shards) {
                error!("erasure decode decode_data err: {:?}", e);
                ret_err = Some(e);
                break;
            }
@@ -255,6 +257,7 @@
            let n = match write_data_blocks(writer, &shards, self.data_shards, block_offset, block_length).await {
                Ok(n) => n,
                Err(e) => {
                    error!("erasure decode write_data_blocks err: {:?}", e);
                    ret_err = Some(e);
                    break;
                }
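The decode path bails out with ErasureReadQuorum as soon as fewer shards survive than can_decode requires. For Reed-Solomon style codes the underlying rule is simple: any data_shards of the data_shards + parity_shards pieces suffice to reconstruct. A hedged sketch of that gate; the shard layout and helper are assumptions for illustration, not the crate's actual API:

// Hypothetical helper mirroring the can_decode gate: reconstruction is possible
// exactly when at least `data_shards` of the shards are still present.
fn can_decode(shards: &[Option<Vec<u8>>], data_shards: usize) -> bool {
    shards.iter().filter(|s| s.is_some()).count() >= data_shards
}

fn main() {
    // 4 data + 2 parity shards: losing any 2 is survivable, losing 3 is not.
    let mut shards: Vec<Option<Vec<u8>>> = vec![Some(vec![0u8; 8]); 6];
    shards[0] = None;
    shards[3] = None;
    assert!(can_decode(&shards, 4));
    shards[5] = None;
    assert!(!can_decode(&shards, 4));
}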
@@ -8,6 +8,7 @@ use std::sync::Arc;
use std::vec;
use tokio::io::AsyncRead;
use tokio::sync::mpsc;
use tracing::error;

pub(crate) struct MultiWriter<'a> {
    writers: &'a mut [Option<BitrotWriterWrapper>],
@@ -60,6 +61,13 @@ impl<'a> MultiWriter<'a> {
        }

        if let Some(write_err) = reduce_write_quorum_errs(&self.errs, OBJECT_OP_IGNORED_ERRS, self.write_quorum) {
            error!(
                "reduce_write_quorum_errs: {:?}, offline-disks={}/{}, errs={:?}",
                write_err,
                count_errs(&self.errs, &Error::DiskNotFound),
                self.writers.len(),
                self.errs
            );
            return Err(std::io::Error::other(format!(
                "Failed to write data: {} (offline-disks={}/{})",
                write_err,
@@ -143,7 +143,11 @@ impl NotificationSys {
    #[tracing::instrument(skip(self))]
    pub async fn load_rebalance_meta(&self, start: bool) {
        let mut futures = Vec::with_capacity(self.peer_clients.len());
        for client in self.peer_clients.iter().flatten() {
        for (i, client) in self.peer_clients.iter().flatten().enumerate() {
            warn!(
                "notification load_rebalance_meta start: {}, index: {}, client: {:?}",
                start, i, client.host
            );
            futures.push(client.load_rebalance_meta(start));
        }

@@ -158,11 +162,16 @@
    }

    pub async fn stop_rebalance(&self) {
        warn!("notification stop_rebalance start");
        let Some(store) = new_object_layer_fn() else {
            error!("stop_rebalance: not init");
            return;
        };

        // warn!("notification stop_rebalance load_rebalance_meta");
        // self.load_rebalance_meta(false).await;
        // warn!("notification stop_rebalance load_rebalance_meta done");

        let mut futures = Vec::with_capacity(self.peer_clients.len());
        for client in self.peer_clients.iter().flatten() {
            futures.push(client.stop_rebalance());
@@ -175,7 +184,9 @@
            }
        }

        warn!("notification stop_rebalance stop_rebalance start");
        let _ = store.stop_rebalance().await;
        warn!("notification stop_rebalance stop_rebalance done");
    }
}

@@ -664,7 +664,7 @@ impl PeerRestClient {

        let response = client.load_rebalance_meta(request).await?.into_inner();

        warn!("load_rebalance_meta response {:?}", response);
        warn!("load_rebalance_meta response {:?}, grid_host: {:?}", response, &self.grid_host);
        if !response.success {
            if let Some(msg) = response.error_info {
                return Err(Error::other(msg));
@@ -1,7 +1,3 @@
use std::io::Cursor;
use std::sync::Arc;
use std::time::SystemTime;

use crate::StorageAPI;
use crate::cache_value::metacache_set::{ListPathRawOptions, list_path_raw};
use crate::config::com::{read_config_with_metadata, save_config_with_opts};
@@ -19,16 +15,18 @@ use rustfs_filemeta::{FileInfo, MetaCacheEntries, MetaCacheEntry, MetadataResolu
use rustfs_rio::HashReader;
use rustfs_utils::path::encode_dir_object;
use serde::{Deserialize, Serialize};
use std::io::Cursor;
use std::sync::Arc;
use time::OffsetDateTime;
use tokio::io::AsyncReadExt;
use tokio::sync::broadcast::{self, Receiver as B_Receiver};
use tokio::time::{Duration, Instant};
use tracing::{error, info, warn};
use uuid::Uuid;
use workers::workers::Workers;

const REBAL_META_FMT: u16 = 1; // Replace with actual format value
const REBAL_META_VER: u16 = 1; // Replace with actual version value
const REBAL_META_NAME: &str = "rebalance_meta";
const REBAL_META_NAME: &str = "rebalance.bin";

#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct RebalanceStats {
@@ -123,9 +121,9 @@ pub enum RebalSaveOpt {
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct RebalanceInfo {
    #[serde(rename = "startTs")]
    pub start_time: Option<SystemTime>, // Time at which rebalance-start was issued
    pub start_time: Option<OffsetDateTime>, // Time at which rebalance-start was issued
    #[serde(rename = "stopTs")]
    pub end_time: Option<SystemTime>, // Time at which rebalance operation completed or rebalance-stop was called
    pub end_time: Option<OffsetDateTime>, // Time at which rebalance operation completed or rebalance-stop was called
    #[serde(rename = "status")]
    pub status: RebalStatus, // Current state of rebalance operation
}
@@ -137,14 +135,14 @@ pub struct DiskStat {
    pub available_space: u64,
}

#[derive(Debug, Default, Serialize, Deserialize)]
#[derive(Debug, Default, Serialize, Deserialize, Clone)]
pub struct RebalanceMeta {
    #[serde(skip)]
    pub cancel: Option<broadcast::Sender<bool>>, // To be invoked on rebalance-stop
    #[serde(skip)]
    pub last_refreshed_at: Option<SystemTime>,
    pub last_refreshed_at: Option<OffsetDateTime>,
    #[serde(rename = "stopTs")]
    pub stopped_at: Option<SystemTime>, // Time when rebalance-stop was issued
    pub stopped_at: Option<OffsetDateTime>, // Time when rebalance-stop was issued
    #[serde(rename = "id")]
    pub id: String, // ID of the ongoing rebalance operation
    #[serde(rename = "pf")]
|
||||
pub async fn load_with_opts<S: StorageAPI>(&mut self, store: Arc<S>, opts: ObjectOptions) -> Result<()> {
|
||||
let (data, _) = read_config_with_metadata(store, REBAL_META_NAME, &opts).await?;
|
||||
if data.is_empty() {
|
||||
warn!("rebalanceMeta: no data");
|
||||
warn!("rebalanceMeta load_with_opts: no data");
|
||||
return Ok(());
|
||||
}
|
||||
if data.len() <= 4 {
|
||||
return Err(Error::other("rebalanceMeta: no data"));
|
||||
return Err(Error::other("rebalanceMeta load_with_opts: no data"));
|
||||
}
|
||||
|
||||
// Read header
|
||||
match u16::from_le_bytes([data[0], data[1]]) {
|
||||
REBAL_META_FMT => {}
|
||||
fmt => return Err(Error::other(format!("rebalanceMeta: unknown format: {}", fmt))),
|
||||
fmt => return Err(Error::other(format!("rebalanceMeta load_with_opts: unknown format: {}", fmt))),
|
||||
}
|
||||
match u16::from_le_bytes([data[2], data[3]]) {
|
||||
REBAL_META_VER => {}
|
||||
ver => return Err(Error::other(format!("rebalanceMeta: unknown version: {}", ver))),
|
||||
ver => return Err(Error::other(format!("rebalanceMeta load_with_opts: unknown version: {}", ver))),
|
||||
}
|
||||
|
||||
let meta: Self = rmp_serde::from_read(Cursor::new(&data[4..]))?;
|
||||
*self = meta;
|
||||
|
||||
self.last_refreshed_at = Some(SystemTime::now());
|
||||
self.last_refreshed_at = Some(OffsetDateTime::now_utc());
|
||||
|
||||
warn!("rebalanceMeta: loaded meta done");
|
||||
warn!("rebalanceMeta load_with_opts: loaded meta done");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
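The on-disk layout of rebalance.bin is a four-byte header, two little-endian u16s for format and version, followed by a MessagePack payload. A self-contained sketch of the framing; the constants mirror the diff, but the payload here is arbitrary bytes rather than the real serialized struct:

const REBAL_META_FMT: u16 = 1;
const REBAL_META_VER: u16 = 1;

// Frame: [fmt u16 LE][ver u16 LE][payload...]
fn encode(payload: &[u8]) -> Vec<u8> {
    let mut out = Vec::with_capacity(4 + payload.len());
    out.extend_from_slice(&REBAL_META_FMT.to_le_bytes());
    out.extend_from_slice(&REBAL_META_VER.to_le_bytes());
    out.extend_from_slice(payload);
    out
}

fn decode(data: &[u8]) -> Result<&[u8], String> {
    if data.len() <= 4 {
        return Err("rebalanceMeta: no data".into());
    }
    match u16::from_le_bytes([data[0], data[1]]) {
        REBAL_META_FMT => {}
        fmt => return Err(format!("unknown format: {fmt}")),
    }
    match u16::from_le_bytes([data[2], data[3]]) {
        REBAL_META_VER => {}
        ver => return Err(format!("unknown version: {ver}")),
    }
    Ok(&data[4..])
}

fn main() {
    let framed = encode(b"msgpack-bytes");
    assert_eq!(decode(&framed).unwrap(), b"msgpack-bytes");
}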
@@ -196,6 +194,7 @@ impl RebalanceMeta {

    pub async fn save_with_opts<S: StorageAPI>(&self, store: Arc<S>, opts: ObjectOptions) -> Result<()> {
        if self.pool_stats.is_empty() {
            warn!("rebalanceMeta save_with_opts: no pool stats");
            return Ok(());
        }

@@ -218,7 +217,7 @@ impl ECStore {
    #[tracing::instrument(skip_all)]
    pub async fn load_rebalance_meta(&self) -> Result<()> {
        let mut meta = RebalanceMeta::new();
        warn!("rebalanceMeta: load rebalance meta");
        warn!("rebalanceMeta: store load rebalance meta");
        match meta.load(self.pools[0].clone()).await {
            Ok(_) => {
                warn!("rebalanceMeta: rebalance meta loaded0");
@@ -255,9 +254,18 @@
    pub async fn update_rebalance_stats(&self) -> Result<()> {
        let mut ok = false;

        let pool_stats = {
            let rebalance_meta = self.rebalance_meta.read().await;
            rebalance_meta.as_ref().map(|v| v.pool_stats.clone()).unwrap_or_default()
        };

        warn!("update_rebalance_stats: pool_stats: {:?}", &pool_stats);

        for i in 0..self.pools.len() {
            if self.find_index(i).await.is_none() {
            if pool_stats.get(i).is_none() {
                warn!("update_rebalance_stats: pool {} not found", i);
                let mut rebalance_meta = self.rebalance_meta.write().await;
                warn!("update_rebalance_stats: pool {} not found, add", i);
                if let Some(meta) = rebalance_meta.as_mut() {
                    meta.pool_stats.push(RebalanceStats::default());
                }
@@ -267,23 +275,24 @@
        }

        if ok {
            let mut rebalance_meta = self.rebalance_meta.write().await;
            if let Some(meta) = rebalance_meta.as_mut() {
            warn!("update_rebalance_stats: save rebalance meta");

            let rebalance_meta = self.rebalance_meta.read().await;
            if let Some(meta) = rebalance_meta.as_ref() {
                meta.save(self.pools[0].clone()).await?;
            }
            drop(rebalance_meta);
        }

        Ok(())
    }

    async fn find_index(&self, index: usize) -> Option<usize> {
        if let Some(meta) = self.rebalance_meta.read().await.as_ref() {
            return meta.pool_stats.get(index).map(|_v| index);
        }
    // async fn find_index(&self, index: usize) -> Option<usize> {
    //     if let Some(meta) = self.rebalance_meta.read().await.as_ref() {
    //         return meta.pool_stats.get(index).map(|_v| index);
    //     }

        None
    }
    //     None
    // }

    #[tracing::instrument(skip(self))]
    pub async fn init_rebalance_meta(&self, bucktes: Vec<String>) -> Result<String> {
@@ -310,7 +319,7 @@

        let mut pool_stats = Vec::with_capacity(self.pools.len());

        let now = SystemTime::now();
        let now = OffsetDateTime::now_utc();

        for disk_stat in disk_stats.iter() {
            let mut pool_stat = RebalanceStats {
@@ -369,20 +378,26 @@

    #[tracing::instrument(skip(self))]
    pub async fn next_rebal_bucket(&self, pool_index: usize) -> Result<Option<String>> {
        warn!("next_rebal_bucket: pool_index: {}", pool_index);
        let rebalance_meta = self.rebalance_meta.read().await;
        warn!("next_rebal_bucket: rebalance_meta: {:?}", rebalance_meta);
        if let Some(meta) = rebalance_meta.as_ref() {
            if let Some(pool_stat) = meta.pool_stats.get(pool_index) {
                if pool_stat.info.status == RebalStatus::Completed || !pool_stat.participating {
                    warn!("next_rebal_bucket: pool_index: {} completed or not participating", pool_index);
                    return Ok(None);
                }

                if pool_stat.buckets.is_empty() {
                    warn!("next_rebal_bucket: pool_index: {} buckets is empty", pool_index);
                    return Ok(None);
                }
                warn!("next_rebal_bucket: pool_index: {} bucket: {}", pool_index, pool_stat.buckets[0]);
                return Ok(Some(pool_stat.buckets[0].clone()));
            }
        }

        warn!("next_rebal_bucket: pool_index: {} None", pool_index);
        Ok(None)
    }
@@ -392,18 +407,28 @@
        if let Some(meta) = rebalance_meta.as_mut() {
            if let Some(pool_stat) = meta.pool_stats.get_mut(pool_index) {
                warn!("bucket_rebalance_done: buckets {:?}", &pool_stat.buckets);
                if let Some(idx) = pool_stat.buckets.iter().position(|b| b.as_str() == bucket.as_str()) {
                    warn!("bucket_rebalance_done: bucket {} rebalanced", &bucket);
                    pool_stat.buckets.remove(idx);
                    pool_stat.rebalanced_buckets.push(bucket);

                // Use retain to filter out the bucket being removed
                let mut found = false;
                pool_stat.buckets.retain(|b| {
                    if b.as_str() == bucket.as_str() {
                        found = true;
                        pool_stat.rebalanced_buckets.push(b.clone());
                        false // drop this element
                    } else {
                        true // keep this element
                    }
                });

                if found {
                    warn!("bucket_rebalance_done: bucket {} rebalanced", &bucket);
                    return Ok(());
                } else {
                    warn!("bucket_rebalance_done: bucket {} not found", bucket);
                }
            }
        }

        warn!("bucket_rebalance_done: bucket {} not found", bucket);
        Ok(())
    }
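Replacing position + remove with retain folds the lookup and the removal into a single pass and lets the same closure archive the matched element. A generic, self-contained illustration of the pattern:

fn main() {
    let mut pending = vec!["a".to_string(), "b".to_string(), "c".to_string()];
    let mut done: Vec<String> = Vec::new();
    let target = "b";

    let mut found = false;
    // One pass: move the matching element to `done`, keep everything else.
    pending.retain(|b| {
        if b == target {
            found = true;
            done.push(b.clone());
            false // drop from `pending`
        } else {
            true // keep
        }
    });

    assert!(found);
    assert_eq!(pending, vec!["a", "c"]);
    assert_eq!(done, vec!["b"]);
}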
@@ -411,18 +436,28 @@
        let rebalance_meta = self.rebalance_meta.read().await;
        if let Some(ref meta) = *rebalance_meta {
            if meta.stopped_at.is_some() {
                warn!("is_rebalance_started: rebalance stopped");
                return false;
            }

            meta.pool_stats.iter().enumerate().for_each(|(i, v)| {
                warn!(
                    "is_rebalance_started: pool_index: {}, participating: {:?}, status: {:?}",
                    i, v.participating, v.info.status
                );
            });

            if meta
                .pool_stats
                .iter()
                .any(|v| v.participating && v.info.status != RebalStatus::Completed)
            {
                warn!("is_rebalance_started: rebalance started");
                return true;
            }
        }

        warn!("is_rebalance_started: rebalance not started");
        false
    }

@@ -462,6 +497,7 @@

        {
            let mut rebalance_meta = self.rebalance_meta.write().await;

            if let Some(meta) = rebalance_meta.as_mut() {
                meta.cancel = Some(tx)
            } else {
@@ -474,19 +510,25 @@

        let participants = {
            if let Some(ref meta) = *self.rebalance_meta.read().await {
                if meta.stopped_at.is_some() {
                    warn!("start_rebalance: rebalance already stopped exit");
                    return;
                }
                // if meta.stopped_at.is_some() {
                //     warn!("start_rebalance: rebalance already stopped exit");
                //     return;
                // }

                let mut participants = vec![false; meta.pool_stats.len()];
                for (i, pool_stat) in meta.pool_stats.iter().enumerate() {
                    if pool_stat.info.status == RebalStatus::Started {
                        participants[i] = pool_stat.participating;
                    warn!("start_rebalance: pool {} status: {:?}", i, pool_stat.info.status);
                    if pool_stat.info.status != RebalStatus::Started {
                        warn!("start_rebalance: pool {} not started, skipping", i);
                        continue;
                    }

                    warn!("start_rebalance: pool {} participating: {:?}", i, pool_stat.participating);
                    participants[i] = pool_stat.participating;
                }
                participants
            } else {
                warn!("start_rebalance:2 rebalance_meta is None exit");
                Vec::new()
            }
        };
@@ -497,11 +539,13 @@
            continue;
        }

        if get_global_endpoints()
            .as_ref()
            .get(idx)
            .is_none_or(|v| v.endpoints.as_ref().first().is_none_or(|e| e.is_local))
        {
        if !get_global_endpoints().as_ref().get(idx).is_some_and(|v| {
            warn!("start_rebalance: pool {} endpoints: {:?}", idx, v.endpoints);
            v.endpoints.as_ref().first().is_some_and(|e| {
                warn!("start_rebalance: pool {} endpoint: {:?}, is_local: {}", idx, e, e.is_local);
                e.is_local
            })
        }) {
            warn!("start_rebalance: pool {} is not local, skipping", idx);
            continue;
        }
@@ -522,13 +566,13 @@
    }

    #[tracing::instrument(skip(self, rx))]
    async fn rebalance_buckets(self: &Arc<Self>, rx: B_Receiver<bool>, pool_index: usize) -> Result<()> {
    async fn rebalance_buckets(self: &Arc<Self>, mut rx: B_Receiver<bool>, pool_index: usize) -> Result<()> {
        let (done_tx, mut done_rx) = tokio::sync::mpsc::channel::<Result<()>>(1);

        // Save rebalance metadata periodically
        let store = self.clone();
        let save_task = tokio::spawn(async move {
            let mut timer = tokio::time::interval_at(Instant::now() + Duration::from_secs(10), Duration::from_secs(10));
            let mut timer = tokio::time::interval_at(Instant::now() + Duration::from_secs(30), Duration::from_secs(10));
            let mut msg: String;
            let mut quit = false;

@@ -537,14 +581,15 @@
                    // TODO: cancel rebalance
                    Some(result) = done_rx.recv() => {
                        quit = true;
                        let now = SystemTime::now();

                        let now = OffsetDateTime::now_utc();

                        let state = match result {
                            Ok(_) => {
                                warn!("rebalance_buckets: completed");
                                msg = format!("Rebalance completed at {:?}", now);
                                RebalStatus::Completed},
                            Err(err) => {
                                warn!("rebalance_buckets: error: {:?}", err);
                                // TODO: check stop
                                if err.to_string().contains("canceled") {
                                    msg = format!("Rebalance stopped at {:?}", now);
@@ -557,9 +602,11 @@
                        };

                        {
                            warn!("rebalance_buckets: save rebalance meta, pool_index: {}, state: {:?}", pool_index, state);
                            let mut rebalance_meta = store.rebalance_meta.write().await;

                            if let Some(rbm) = rebalance_meta.as_mut() {
                                warn!("rebalance_buckets: save rebalance meta2, pool_index: {}, state: {:?}", pool_index, state);
                                rbm.pool_stats[pool_index].info.status = state;
                                rbm.pool_stats[pool_index].info.end_time = Some(now);
                            }
@@ -568,7 +615,7 @@

                    }
                    _ = timer.tick() => {
                        let now = SystemTime::now();
                        let now = OffsetDateTime::now_utc();
                        msg = format!("Saving rebalance metadata at {:?}", now);
                    }
                }
@@ -576,7 +623,7 @@
                if let Err(err) = store.save_rebalance_stats(pool_index, RebalSaveOpt::Stats).await {
                    error!("{} err: {:?}", msg, err);
                } else {
                    info!(msg);
                    warn!(msg);
                }

                if quit {
@@ -588,30 +635,41 @@
            }
        });

        warn!("Pool {} rebalancing is started", pool_index + 1);
        warn!("Pool {} rebalancing is started", pool_index);

        while let Some(bucket) = self.next_rebal_bucket(pool_index).await? {
            warn!("Rebalancing bucket: start {}", bucket);

            if let Err(err) = self.rebalance_bucket(rx.resubscribe(), bucket.clone(), pool_index).await {
                if err.to_string().contains("not initialized") {
                    warn!("rebalance_bucket: rebalance not initialized, continue");
                    continue;
                }
                error!("Error rebalancing bucket {}: {:?}", bucket, err);
                done_tx.send(Err(err)).await.ok();
        loop {
            if let Ok(true) = rx.try_recv() {
                warn!("Pool {} rebalancing is stopped", pool_index);
                done_tx.send(Err(Error::other("rebalance stopped canceled"))).await.ok();
                break;
            }

            warn!("Rebalance bucket: done {} ", bucket);
            self.bucket_rebalance_done(pool_index, bucket).await?;
            if let Some(bucket) = self.next_rebal_bucket(pool_index).await? {
                warn!("Rebalancing bucket: start {}", bucket);

                if let Err(err) = self.rebalance_bucket(rx.resubscribe(), bucket.clone(), pool_index).await {
                    if err.to_string().contains("not initialized") {
                        warn!("rebalance_bucket: rebalance not initialized, continue");
                        continue;
                    }
                    error!("Error rebalancing bucket {}: {:?}", bucket, err);
                    done_tx.send(Err(err)).await.ok();
                    break;
                }

                warn!("Rebalance bucket: done {} ", bucket);
                self.bucket_rebalance_done(pool_index, bucket).await?;
            } else {
                warn!("Rebalance bucket: no bucket to rebalance");
                break;
            }
        }

        warn!("Pool {} rebalancing is done", pool_index + 1);
        warn!("Pool {} rebalancing is done", pool_index);

        done_tx.send(Ok(())).await.ok();
        save_task.await.ok();

        warn!("Pool {} rebalancing is done2", pool_index);
        Ok(())
    }
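The driver was restructured from a bare while let into a loop that first polls the broadcast receiver with try_recv, so a stop signal is observed between buckets instead of only when the bucket list runs dry. A reduced, runnable sketch of that control flow, with plain strings standing in for buckets:

use tokio::sync::broadcast;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = broadcast::channel::<bool>(1);
    let mut work = vec!["bucket-a", "bucket-b", "bucket-c"];

    tx.send(true).ok(); // simulate a stop request arriving up front

    loop {
        // Non-blocking check: a pending `true` cancels before the next item starts.
        if let Ok(true) = rx.try_recv() {
            println!("stopped before processing remaining {} item(s)", work.len());
            break;
        }
        match work.pop() {
            Some(bucket) => println!("rebalancing {bucket}"),
            None => break, // nothing left: normal completion
        }
    }
}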
@@ -622,6 +680,7 @@
            if let Some(pool_stat) = meta.pool_stats.get_mut(pool_index) {
                // Check if the pool's rebalance status is already completed
                if pool_stat.info.status == RebalStatus::Completed {
                    warn!("check_if_rebalance_done: pool {} is already completed", pool_index);
                    return true;
                }

@@ -631,7 +690,8 @@
                // Mark pool rebalance as done if within 5% of the PercentFreeGoal
                if (pfi - meta.percent_free_goal).abs() <= 0.05 {
                    pool_stat.info.status = RebalStatus::Completed;
                    pool_stat.info.end_time = Some(SystemTime::now());
                    pool_stat.info.end_time = Some(OffsetDateTime::now_utc());
                    warn!("check_if_rebalance_done: pool {} is completed, pfi: {}", pool_index, pfi);
                    return true;
                }
            }
@@ -641,24 +701,30 @@
    }

    #[allow(unused_assignments)]
    #[tracing::instrument(skip(self, wk, set))]
    #[tracing::instrument(skip(self, set))]
    async fn rebalance_entry(
        &self,
        bucket: String,
        pool_index: usize,
        entry: MetaCacheEntry,
        set: Arc<SetDisks>,
        wk: Arc<Workers>,
        // wk: Arc<Workers>,
    ) {
        defer!(|| async {
            wk.give().await;
        });
        warn!("rebalance_entry: start rebalance_entry");

        // defer!(|| async {
        //     warn!("rebalance_entry: defer give worker start");
        //     wk.give().await;
        //     warn!("rebalance_entry: defer give worker done");
        // });

        if entry.is_dir() {
            warn!("rebalance_entry: entry is dir, skipping");
            return;
        }

        if self.check_if_rebalance_done(pool_index).await {
            warn!("rebalance_entry: rebalance done, skipping pool {}", pool_index);
            return;
        }

@@ -666,6 +732,7 @@
            Ok(fivs) => fivs,
            Err(err) => {
                error!("rebalance_entry Error getting file info versions: {}", err);
                warn!("rebalance_entry: Error getting file info versions, skipping");
                return;
            }
        };
@@ -676,7 +743,7 @@
        let expired: usize = 0;
        for version in fivs.versions.iter() {
            if version.is_remote() {
                info!("rebalance_entry Entry {} is remote, skipping", version.name);
                warn!("rebalance_entry Entry {} is remote, skipping", version.name);
                continue;
            }
            // TODO: filterLifecycle
@@ -684,7 +751,7 @@
            let remaining_versions = fivs.versions.len() - expired;
            if version.deleted && remaining_versions == 1 {
                rebalanced += 1;
                info!("rebalance_entry Entry {} is deleted and last version, skipping", version.name);
                warn!("rebalance_entry Entry {} is deleted and last version, skipping", version.name);
                continue;
            }
            let version_id = version.version_id.map(|v| v.to_string());
@@ -735,6 +802,7 @@
            }

            for _i in 0..3 {
                warn!("rebalance_entry: get_object_reader, bucket: {}, version: {}", &bucket, &version.name);
                let rd = match set
                    .get_object_reader(
                        bucket.as_str(),
@@ -753,6 +821,10 @@
                    Err(err) => {
                        if is_err_object_not_found(&err) || is_err_version_not_found(&err) {
                            ignore = true;
                            warn!(
                                "rebalance_entry: get_object_reader, bucket: {}, version: {}, ignore",
                                &bucket, &version.name
                            );
                            break;
                        }

@@ -765,7 +837,7 @@
                if let Err(err) = self.rebalance_object(pool_index, bucket.clone(), rd).await {
                    if is_err_object_not_found(&err) || is_err_version_not_found(&err) || is_err_data_movement_overwrite(&err) {
                        ignore = true;
                        info!("rebalance_entry {} Entry {} is already deleted, skipping", &bucket, version.name);
                        warn!("rebalance_entry {} Entry {} is already deleted, skipping", &bucket, version.name);
                        break;
                    }

@@ -780,7 +852,7 @@
            }

            if ignore {
                info!("rebalance_entry {} Entry {} is already deleted, skipping", &bucket, version.name);
                warn!("rebalance_entry {} Entry {} is already deleted, skipping", &bucket, version.name);
                continue;
            }

@@ -812,7 +884,7 @@
            {
                error!("rebalance_entry: delete_object err {:?}", &err);
            } else {
                info!("rebalance_entry {} Entry {} deleted successfully", &bucket, &entry.name);
                warn!("rebalance_entry {} Entry {} deleted successfully", &bucket, &entry.name);
            }
        }
    }
@@ -957,26 +1029,29 @@

        let pool = self.pools[pool_index].clone();

        let wk = Workers::new(pool.disk_set.len() * 2).map_err(Error::other)?;
        let mut jobs = Vec::new();

        // let wk = Workers::new(pool.disk_set.len() * 2).map_err(Error::other)?;
        // wk.clone().take().await;
        for (set_idx, set) in pool.disk_set.iter().enumerate() {
            wk.clone().take().await;

            let rebalance_entry: ListCallback = Arc::new({
                let this = Arc::clone(self);
                let bucket = bucket.clone();
                let wk = wk.clone();
                // let wk = wk.clone();
                let set = set.clone();
                move |entry: MetaCacheEntry| {
                    let this = this.clone();
                    let bucket = bucket.clone();
                    let wk = wk.clone();
                    // let wk = wk.clone();
                    let set = set.clone();
                    Box::pin(async move {
                        wk.take().await;
                        tokio::spawn(async move {
                            this.rebalance_entry(bucket, pool_index, entry, set, wk).await;
                        });
                        warn!("rebalance_entry: rebalance_entry spawn start");
                        // wk.take().await;
                        // tokio::spawn(async move {
                        warn!("rebalance_entry: rebalance_entry spawn start2");
                        this.rebalance_entry(bucket, pool_index, entry, set).await;
                        warn!("rebalance_entry: rebalance_entry spawn done");
                        // });
                    })
                }
            });
@@ -984,62 +1059,68 @@
            let set = set.clone();
            let rx = rx.resubscribe();
            let bucket = bucket.clone();
            let wk = wk.clone();
            tokio::spawn(async move {
            // let wk = wk.clone();

            let job = tokio::spawn(async move {
                if let Err(err) = set.list_objects_to_rebalance(rx, bucket, rebalance_entry).await {
                    error!("Rebalance worker {} error: {}", set_idx, err);
                } else {
                    info!("Rebalance worker {} done", set_idx);
                }
                wk.clone().give().await;
                // wk.clone().give().await;
            });

            jobs.push(job);
        }

        wk.wait().await;
        // wk.wait().await;
        for job in jobs {
            job.await.unwrap();
        }
        warn!("rebalance_bucket: rebalance_bucket done");
        Ok(())
    }

    #[tracing::instrument(skip(self))]
    pub async fn save_rebalance_stats(&self, pool_idx: usize, opt: RebalSaveOpt) -> Result<()> {
        // TODO: NSLOOK

        // TODO: lock
        let mut meta = RebalanceMeta::new();
        meta.load_with_opts(
            self.pools[0].clone(),
            ObjectOptions {
                no_lock: true,
                ..Default::default()
            },
        )
        .await?;

        if opt == RebalSaveOpt::StoppedAt {
            meta.stopped_at = Some(SystemTime::now());
        }

        let mut rebalance_meta = self.rebalance_meta.write().await;

        if let Some(rb) = rebalance_meta.as_mut() {
            if opt == RebalSaveOpt::Stats {
                meta.pool_stats[pool_idx] = rb.pool_stats[pool_idx].clone();
        if let Err(err) = meta.load(self.pools[0].clone()).await {
            if err != Error::ConfigNotFound {
                warn!("save_rebalance_stats: load err: {:?}", err);
                return Err(err);
            }

            *rb = meta;
        } else {
            *rebalance_meta = Some(meta);
        }

        if let Some(meta) = rebalance_meta.as_mut() {
            meta.save_with_opts(
                self.pools[0].clone(),
                ObjectOptions {
                    no_lock: true,
                    ..Default::default()
                },
            )
            .await?;
        match opt {
            RebalSaveOpt::Stats => {
                {
                    let mut rebalance_meta = self.rebalance_meta.write().await;
                    if let Some(rbm) = rebalance_meta.as_mut() {
                        meta.pool_stats[pool_idx] = rbm.pool_stats[pool_idx].clone();
                    }
                }

                if let Some(pool_stat) = meta.pool_stats.get_mut(pool_idx) {
                    pool_stat.info.end_time = Some(OffsetDateTime::now_utc());
                }
            }
            RebalSaveOpt::StoppedAt => {
                meta.stopped_at = Some(OffsetDateTime::now_utc());
            }
        }

        {
            let mut rebalance_meta = self.rebalance_meta.write().await;
            *rebalance_meta = Some(meta.clone());
        }

        warn!(
            "save_rebalance_stats: save rebalance meta, pool_idx: {}, opt: {:?}, meta: {:?}",
            pool_idx, opt, meta
        );
        meta.save(self.pools[0].clone()).await?;

        Ok(())
    }
}
@@ -1052,12 +1133,15 @@ impl SetDisks {
        bucket: String,
        cb: ListCallback,
    ) -> Result<()> {
        warn!("list_objects_to_rebalance: start list_objects_to_rebalance");
        // Placeholder for actual object listing logic
        let (disks, _) = self.get_online_disks_with_healing(false).await;
        if disks.is_empty() {
            warn!("list_objects_to_rebalance: no disk available");
            return Err(Error::other("errNoDiskAvailable"));
        }

        warn!("list_objects_to_rebalance: get online disks with healing");
        let listing_quorum = self.set_drive_count.div_ceil(2);

        let resolver = MetadataResolutionParams {
@@ -1075,7 +1159,10 @@
            bucket: bucket.clone(),
            recursice: true,
            min_disks: listing_quorum,
            agreed: Some(Box::new(move |entry: MetaCacheEntry| Box::pin(cb1(entry)))),
            agreed: Some(Box::new(move |entry: MetaCacheEntry| {
                warn!("list_objects_to_rebalance: agreed: {:?}", &entry.name);
                Box::pin(cb1(entry))
            })),
            partial: Some(Box::new(move |entries: MetaCacheEntries, _: &[Option<DiskError>]| {
                // let cb = cb.clone();
                let resolver = resolver.clone();
@@ -1083,11 +1170,11 @@

                match entries.resolve(resolver) {
                    Some(entry) => {
                        warn!("rebalance: list_objects_to_decommission get {}", &entry.name);
                        warn!("list_objects_to_rebalance: list_objects_to_decommission get {}", &entry.name);
                        Box::pin(async move { cb(entry).await })
                    }
                    None => {
                        warn!("rebalance: list_objects_to_decommission get none");
                        warn!("list_objects_to_rebalance: list_objects_to_decommission get none");
                        Box::pin(async {})
                    }
                }
@@ -1097,6 +1184,7 @@
        )
        .await?;

        warn!("list_objects_to_rebalance: list_objects_to_rebalance done");
        Ok(())
    }
}
@@ -860,6 +860,7 @@ impl SetDisks {
        };

        if let Some(err) = reduce_read_quorum_errs(errs, OBJECT_OP_IGNORED_ERRS, expected_rquorum) {
            error!("object_quorum_from_meta: {:?}, errs={:?}", err, errs);
            return Err(err);
        }

@@ -872,6 +873,7 @@
        let parity_blocks = Self::common_parity(&parities, default_parity_count as i32);

        if parity_blocks < 0 {
            error!("object_quorum_from_meta: parity_blocks < 0, errs={:?}", errs);
            return Err(DiskError::ErasureReadQuorum);
        }

@@ -936,6 +938,7 @@
            Self::object_quorum_from_meta(&parts_metadata, &errs, self.default_parity_count).map_err(map_err_notfound)?;

        if read_quorum < 0 {
            error!("check_upload_id_exists: read_quorum < 0, errs={:?}", errs);
            return Err(Error::ErasureReadQuorum);
        }

@@ -977,6 +980,7 @@
        quorum: usize,
    ) -> disk::error::Result<FileInfo> {
        if quorum < 1 {
            error!("find_file_info_in_quorum: quorum < 1");
            return Err(DiskError::ErasureReadQuorum);
        }

@@ -1035,6 +1039,7 @@
        }

        if max_count < quorum {
            error!("find_file_info_in_quorum: max_count < quorum, max_val={:?}", max_val);
            return Err(DiskError::ErasureReadQuorum);
        }

@@ -1079,7 +1084,7 @@
            return Ok(fi);
        }

        warn!("QuorumError::Read, find_file_info_in_quorum fileinfo not found");
        error!("find_file_info_in_quorum: fileinfo not found");

        Err(DiskError::ErasureReadQuorum)
    }
@@ -1763,10 +1768,18 @@

        let _min_disks = self.set_drive_count - self.default_parity_count;

        let (read_quorum, _) = Self::object_quorum_from_meta(&parts_metadata, &errs, self.default_parity_count)
            .map_err(|err| to_object_err(err.into(), vec![bucket, object]))?;
        let (read_quorum, _) = match Self::object_quorum_from_meta(&parts_metadata, &errs, self.default_parity_count)
            .map_err(|err| to_object_err(err.into(), vec![bucket, object]))
        {
            Ok(v) => v,
            Err(e) => {
                error!("Self::object_quorum_from_meta: {:?}, bucket: {}, object: {}", &e, bucket, object);
                return Err(e);
            }
        };

        if let Some(err) = reduce_read_quorum_errs(&errs, OBJECT_OP_IGNORED_ERRS, read_quorum as usize) {
            error!("reduce_read_quorum_errs: {:?}, bucket: {}, object: {}", &err, bucket, object);
            return Err(to_object_err(err.into(), vec![bucket, object]));
        }

@@ -1896,6 +1909,7 @@
        let nil_count = errors.iter().filter(|&e| e.is_none()).count();
        if nil_count < erasure.data_shards {
            if let Some(read_err) = reduce_read_quorum_errs(&errors, OBJECT_OP_IGNORED_ERRS, erasure.data_shards) {
                error!("create_bitrot_reader reduce_read_quorum_errs {:?}", &errors);
                return Err(to_object_err(read_err.into(), vec![bucket, object]));
            }

@@ -2942,6 +2956,7 @@ impl SetDisks {
|
||||
}
|
||||
Ok(m)
|
||||
} else {
|
||||
error!("delete_if_dang_ling: is_object_dang_ling errs={:?}", errs);
|
||||
Err(DiskError::ErasureReadQuorum)
|
||||
}
|
||||
}
|
||||
@@ -3004,41 +3019,56 @@ impl SetDisks {
}

let (buckets_results_tx, mut buckets_results_rx) = mpsc::channel::<DataUsageEntryInfo>(disks.len());
// New: read the base interval from an environment variable, defaulting to 30 seconds
let set_disk_update_interval_secs = std::env::var("RUSTFS_NS_SCANNER_INTERVAL")
.ok()
.and_then(|v| v.parse::<u64>().ok())
.unwrap_or(30);
let update_time = {
let mut rng = rand::rng();
Duration::from_secs(30) + Duration::from_secs_f64(10.0 * rng.random_range(0.0..1.0))
Duration::from_secs(set_disk_update_interval_secs) + Duration::from_secs_f64(10.0 * rng.random_range(0.0..1.0))
};
let mut ticker = interval(update_time);

let task = tokio::spawn(async move {
let last_save = Some(SystemTime::now());
let mut need_loop = true;
while need_loop {
select! {
_ = ticker.tick() => {
if !cache.info.last_update.eq(&last_save) {
let _ = cache.save(DATA_USAGE_CACHE_NAME).await;
let _ = updates.send(cache.clone()).await;
}
}
result = buckets_results_rx.recv() => {
match result {
Some(result) => {
cache.replace(&result.name, &result.parent, result.entry);
cache.info.last_update = Some(SystemTime::now());
},
None => {
need_loop = false;
cache.info.next_cycle = want_cycle;
cache.info.last_update = Some(SystemTime::now());
// Check whether the background task should run
let skip_background_task = std::env::var("RUSTFS_SKIP_BACKGROUND_TASK")
.ok()
.and_then(|v| v.parse::<bool>().ok())
.unwrap_or(false);

let task = if !skip_background_task {
Some(tokio::spawn(async move {
let last_save = Some(SystemTime::now());
let mut need_loop = true;
while need_loop {
select! {
_ = ticker.tick() => {
if !cache.info.last_update.eq(&last_save) {
let _ = cache.save(DATA_USAGE_CACHE_NAME).await;
let _ = updates.send(cache.clone()).await;
}
}
result = buckets_results_rx.recv() => {
match result {
Some(result) => {
cache.replace(&result.name, &result.parent, result.entry);
cache.info.last_update = Some(SystemTime::now());
},
None => {
need_loop = false;
cache.info.next_cycle = want_cycle;
cache.info.last_update = Some(SystemTime::now());
let _ = cache.save(DATA_USAGE_CACHE_NAME).await;
let _ = updates.send(cache.clone()).await;
}
}
}
}
}
});
}))
} else {
None
};

// Restrict parallelism for disk usage scanner
let max_procs = num_cpus::get();
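The scanner tick is now RUSTFS_NS_SCANNER_INTERVAL seconds plus up to 10 s of random jitter, so peers do not save and publish their caches in lockstep. A standalone sketch of the same parse-with-default-plus-jitter step (a minimal sketch, assuming the rand 0.9 API used above, i.e. rand::rng() and random_range):

use rand::Rng;
use std::time::Duration;

// Parse the interval from the environment, falling back to 30 seconds,
// then add up to 10 seconds of random jitter.
fn scanner_interval() -> Duration {
    let base_secs = std::env::var("RUSTFS_NS_SCANNER_INTERVAL")
        .ok()
        .and_then(|v| v.parse::<u64>().ok())
        .unwrap_or(30);
    let jitter_secs = 10.0 * rand::rng().random_range(0.0..1.0);
    Duration::from_secs(base_secs) + Duration::from_secs_f64(jitter_secs)
}

fn main() {
    // With RUSTFS_NS_SCANNER_INTERVAL=60 this prints a value in [60s, 70s).
    println!("scanner ticks every {:?}", scanner_interval());
}

The companion RUSTFS_SKIP_BACKGROUND_TASK flag, parsed the same way, short-circuits the whole background task; scripts/dev_rustfs.env below sets both for dev clusters.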
@@ -3142,7 +3172,9 @@ impl SetDisks {

info!("ns_scanner start");
let _ = join_all(futures).await;
let _ = task.await;
if let Some(task) = task {
let _ = task.await;
}
info!("ns_scanner completed");
Ok(())
}

@@ -3894,7 +3926,13 @@ impl ObjectIO for SetDisks {

let stream = mem::replace(&mut data.stream, HashReader::new(Box::new(Cursor::new(Vec::new())), 0, 0, None, false)?);

let (reader, w_size) = Arc::new(erasure).encode(stream, &mut writers, write_quorum).await?; // TODO: on error, remove the temporary directory
let (reader, w_size) = match Arc::new(erasure).encode(stream, &mut writers, write_quorum).await {
Ok((r, w)) => (r, w),
Err(e) => {
error!("encode err {:?}", e);
return Err(e.into());
}
}; // TODO: on error, remove the temporary directory

let _ = mem::replace(&mut data.stream, reader);
// if let Err(err) = close_bitrot_writers(&mut writers).await {
@@ -847,9 +847,26 @@ impl ECStore {
let (update_closer_tx, mut update_close_rx) = mpsc::channel(10);
let mut ctx_clone = cancel.subscribe();
let all_buckets_clone = all_buckets.clone();
// New: read the interval from an environment variable, defaulting to 30 seconds
let ns_scanner_interval_secs = std::env::var("RUSTFS_NS_SCANNER_INTERVAL")
.ok()
.and_then(|v| v.parse::<u64>().ok())
.unwrap_or(30);

// Check whether background tasks should be skipped
let skip_background_task = std::env::var("RUSTFS_SKIP_BACKGROUND_TASK")
.ok()
.and_then(|v| v.parse::<bool>().ok())
.unwrap_or(false);

if skip_background_task {
info!("Skipping background task execution: RUSTFS_SKIP_BACKGROUND_TASK=true");
return Ok(());
}

let task = tokio::spawn(async move {
let mut last_update: Option<SystemTime> = None;
let mut interval = interval(Duration::from_secs(30));
let mut interval = interval(Duration::from_secs(ns_scanner_interval_secs));
let all_merged = Arc::new(RwLock::new(DataUsageCache::default()));
loop {
select! {
@@ -364,6 +364,7 @@ impl ECStore {
max_keys: i32,
) -> Result<ListObjectVersionsInfo> {
if marker.is_none() && version_marker.is_some() {
warn!("inner_list_object_versions: marker is none and version_marker is some");
return Err(StorageError::NotImplemented);
}

@@ -95,38 +95,45 @@ where
self.clone().save_iam_formatter().await?;
self.clone().load().await?;

// Background thread starts periodic updates or receives signal updates
tokio::spawn({
let s = Arc::clone(&self);
async move {
let ticker = tokio::time::interval(Duration::from_secs(120));
tokio::pin!(ticker, reciver);
loop {
select! {
_ = ticker.tick() => {
if let Err(err) = s.clone().load().await {
error!("iam load err {:?}", err);
}
},
i = reciver.recv() => {
match i {
Some(t) => {
let last = s.last_timestamp.load(Ordering::Relaxed);
if last <= t {
// Check whether the environment variable is set
let skip_background_task = std::env::var("RUSTFS_SKIP_BACKGROUND_TASK").is_ok();

if let Err(err) = s.clone().load().await {
error!("iam load err {:?}", err);
if !skip_background_task {
// Background thread starts periodic updates or receives signal updates
tokio::spawn({
let s = Arc::clone(&self);
async move {
let ticker = tokio::time::interval(Duration::from_secs(120));
tokio::pin!(ticker, reciver);
loop {
select! {
_ = ticker.tick() => {
warn!("iam load ticker");
if let Err(err) = s.clone().load().await {
error!("iam load err {:?}", err);
}
},
i = reciver.recv() => {
warn!("iam load reciver");
match i {
Some(t) => {
let last = s.last_timestamp.load(Ordering::Relaxed);
if last <= t {
warn!("iam load reciver load");
if let Err(err) = s.clone().load().await {
error!("iam load err {:?}", err);
}
ticker.reset();
}
ticker.reset();
}
},
None => return,
},
None => return,
}
}
}
}
}
}
}
});
});
}

Ok(())
}

@@ -10,7 +10,8 @@ use http::{HeaderMap, StatusCode};
use matchit::Params;
use s3s::{Body, S3Request, S3Response, S3Result, header::CONTENT_TYPE, s3_error};
use serde::{Deserialize, Serialize};
use std::time::{Duration, SystemTime};
use std::time::Duration;
use time::OffsetDateTime;
use tracing::warn;

use crate::admin::router::Operation;
@@ -56,8 +57,8 @@ pub struct RebalanceAdminStatus {
pub id: String, // Identifies the ongoing rebalance operation by a UUID
#[serde(rename = "pools")]
pub pools: Vec<RebalancePoolStatus>, // Contains all pools, including inactive
#[serde(rename = "stoppedAt")]
pub stopped_at: Option<SystemTime>, // Optional timestamp when rebalance was stopped
#[serde(rename = "stoppedAt", with = "offsetdatetime_rfc3339")]
pub stopped_at: Option<OffsetDateTime>, // Optional timestamp when rebalance was stopped
}

pub struct RebalanceStart {}
@@ -101,11 +102,13 @@ impl Operation for RebalanceStart {
}
};

store.start_rebalance().await;

warn!("Rebalance started with id: {}", id);
if let Some(notification_sys) = get_global_notification_sys() {
warn!("Loading rebalance meta");
warn!("RebalanceStart Loading rebalance meta start");
notification_sys.load_rebalance_meta(true).await;
warn!("Rebalance meta loaded");
warn!("RebalanceStart Loading rebalance meta done");
}

let resp = RebalanceResp { id };
@@ -175,15 +178,14 @@ impl Operation for RebalanceStatus {
let total_bytes_to_rebal = ps.init_capacity as f64 * meta.percent_free_goal - ps.init_free_space as f64;

let mut elapsed = if let Some(start_time) = ps.info.start_time {
SystemTime::now()
.duration_since(start_time)
.map_err(|e| s3_error!(InternalError, "Failed to calculate elapsed time: {}", e))?
let now = OffsetDateTime::now_utc();
now - start_time
} else {
return Err(s3_error!(InternalError, "Start time is not available"));
};

let mut eta = if ps.bytes > 0 {
Duration::from_secs_f64(total_bytes_to_rebal * elapsed.as_secs_f64() / ps.bytes as f64)
Duration::from_secs_f64(total_bytes_to_rebal * elapsed.as_seconds_f64() / ps.bytes as f64)
} else {
Duration::ZERO
};
@@ -193,10 +195,8 @@ impl Operation for RebalanceStatus {
}

if let Some(stopped_at) = stop_time {
if let Ok(du) = stopped_at.duration_since(ps.info.start_time.unwrap_or(stopped_at)) {
elapsed = du;
} else {
return Err(s3_error!(InternalError, "Failed to calculate elapsed time"));
if let Some(start_time) = ps.info.start_time {
elapsed = stopped_at - start_time;
}

eta = Duration::ZERO;
@@ -208,7 +208,7 @@ impl Operation for RebalanceStatus {
bytes: ps.bytes,
bucket: ps.bucket.clone(),
object: ps.object.clone(),
elapsed: elapsed.as_secs(),
elapsed: elapsed.whole_seconds() as u64,
eta: eta.as_secs(),
});
}
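The ETA above is a straight linear extrapolation: if moving ps.bytes took elapsed, then moving total_bytes_to_rebal at the same observed rate takes total_bytes_to_rebal * elapsed / ps.bytes. A small worked sketch of that arithmetic (hypothetical helper mirroring the handler's field names, not the handler's code):

use std::time::Duration;

// Linear ETA: scale the observed elapsed-per-byte rate up to the total.
fn rebalance_eta(total_bytes_to_rebal: f64, elapsed: Duration, bytes_done: u64) -> Duration {
    if bytes_done == 0 {
        return Duration::ZERO;
    }
    Duration::from_secs_f64(total_bytes_to_rebal * elapsed.as_secs_f64() / bytes_done as f64)
}

fn main() {
    // 1 GiB moved in 60 s with 9 GiB to move in total => 540 s at this rate.
    let eta = rebalance_eta(
        9.0 * 1024.0 * 1024.0 * 1024.0,
        Duration::from_secs(60),
        1024 * 1024 * 1024,
    );
    assert_eq!(eta.as_secs(), 540);
}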
@@ -244,10 +244,45 @@ impl Operation for RebalanceStop {
.await
.map_err(|e| s3_error!(InternalError, "Failed to stop rebalance: {}", e))?;

warn!("handle RebalanceStop save_rebalance_stats done");
if let Some(notification_sys) = get_global_notification_sys() {
notification_sys.load_rebalance_meta(true).await;
warn!("handle RebalanceStop notification_sys load_rebalance_meta");
notification_sys.load_rebalance_meta(false).await;
warn!("handle RebalanceStop notification_sys load_rebalance_meta done");
}

return Err(s3_error!(NotImplemented));
Ok(S3Response::new((StatusCode::OK, Body::empty())))
}
}

mod offsetdatetime_rfc3339 {
use serde::{self, Deserialize, Deserializer, Serializer};
use time::{OffsetDateTime, format_description::well_known::Rfc3339};

pub fn serialize<S>(dt: &Option<OffsetDateTime>, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
match dt {
Some(dt) => {
let s = dt.format(&Rfc3339).map_err(serde::ser::Error::custom)?;
serializer.serialize_some(&s)
}
None => serializer.serialize_none(),
}
}

pub fn deserialize<'de, D>(deserializer: D) -> Result<Option<OffsetDateTime>, D::Error>
where
D: Deserializer<'de>,
{
let opt = Option::<String>::deserialize(deserializer)?;
match opt {
Some(s) => {
let dt = OffsetDateTime::parse(&s, &Rfc3339).map_err(serde::de::Error::custom)?;
Ok(Some(dt))
}
None => Ok(None),
}
}
}

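With this module in scope, the with = "offsetdatetime_rfc3339" attribute on stopped_at routes the field through these two functions, so the JSON carries an RFC 3339 string rather than SystemTime's structural encoding. A minimal round-trip sketch (hypothetical struct; assumes serde_json and the module above living in the same file):

use serde::{Deserialize, Serialize};
use time::OffsetDateTime;

#[derive(Serialize, Deserialize)]
struct StatusProbe {
    // Delegates this one field to offsetdatetime_rfc3339::{serialize, deserialize}.
    #[serde(rename = "stoppedAt", with = "offsetdatetime_rfc3339")]
    stopped_at: Option<OffsetDateTime>,
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let json = r#"{"stoppedAt":"2024-01-01T00:00:00Z"}"#;
    let probe: StatusProbe = serde_json::from_str(json)?;
    assert!(probe.stopped_at.is_some());
    // Serializing emits the same RFC 3339 string form.
    println!("{}", serde_json::to_string(&probe)?);
    Ok(())
}

Note that a JSON null maps to None via the module, but an entirely absent stoppedAt key would still need #[serde(default)] on the field to fall back to None.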
@@ -3,6 +3,7 @@ use super::router::Operation;
use super::router::S3Router;
use crate::storage::ecfs::bytes_stream;
use ecstore::disk::DiskAPI;
use ecstore::disk::WalkDirOptions;
use ecstore::set_disk::DEFAULT_READ_BUFFER_SIZE;
use ecstore::store::find_local_disk;
use futures::TryStreamExt;
@@ -18,6 +19,7 @@ use s3s::s3_error;
use serde_urlencoded::from_bytes;
use tokio_util::io::ReaderStream;
use tokio_util::io::StreamReader;
use tracing::warn;

pub const RPC_PREFIX: &str = "/rustfs/rpc";

@@ -28,12 +30,30 @@ pub fn regist_rpc_route(r: &mut S3Router<AdminOperation>) -> std::io::Result<()>
AdminOperation(&ReadFile {}),
)?;

r.insert(
Method::HEAD,
format!("{}{}", RPC_PREFIX, "/read_file_stream").as_str(),
AdminOperation(&ReadFile {}),
)?;

r.insert(
Method::PUT,
format!("{}{}", RPC_PREFIX, "/put_file_stream").as_str(),
AdminOperation(&PutFile {}),
)?;

r.insert(
Method::GET,
format!("{}{}", RPC_PREFIX, "/walk_dir").as_str(),
AdminOperation(&WalkDir {}),
)?;

r.insert(
Method::HEAD,
format!("{}{}", RPC_PREFIX, "/walk_dir").as_str(),
AdminOperation(&WalkDir {}),
)?;

Ok(())
}

@@ -50,6 +70,9 @@ pub struct ReadFile {}
#[async_trait::async_trait]
impl Operation for ReadFile {
async fn call(&self, req: S3Request<Body>, _params: Params<'_, '_>) -> S3Result<S3Response<(StatusCode, Body)>> {
if req.method == Method::HEAD {
return Ok(S3Response::new((StatusCode::OK, Body::empty())));
}
let query = {
if let Some(query) = req.uri.query() {
let input: ReadFileQuery =
@@ -79,6 +102,61 @@ impl Operation for ReadFile {
}
}

#[derive(Debug, Default, serde::Deserialize)]
pub struct WalkDirQuery {
disk: String,
}

pub struct WalkDir {}

#[async_trait::async_trait]
impl Operation for WalkDir {
async fn call(&self, req: S3Request<Body>, _params: Params<'_, '_>) -> S3Result<S3Response<(StatusCode, Body)>> {
if req.method == Method::HEAD {
return Ok(S3Response::new((StatusCode::OK, Body::empty())));
}

let query = {
if let Some(query) = req.uri.query() {
let input: WalkDirQuery =
from_bytes(query.as_bytes()).map_err(|e| s3_error!(InvalidArgument, "get query failed {:?}", e))?;
input
} else {
WalkDirQuery::default()
}
};

let mut input = req.input;
let body = match input.store_all_unlimited().await {
Ok(b) => b,
Err(e) => {
warn!("get body failed, e: {:?}", e);
return Err(s3_error!(InvalidRequest, "get body failed"));
}
};

// let body_bytes = decrypt_data(input_cred.secret_key.expose().as_bytes(), &body)
//     .map_err(|e| S3Error::with_message(S3ErrorCode::InvalidArgument, format!("decrypt_data err {}", e)))?;

let args: WalkDirOptions =
serde_json::from_slice(&body).map_err(|e| s3_error!(InternalError, "unmarshal body err {}", e))?;
let Some(disk) = find_local_disk(&query.disk).await else {
return Err(s3_error!(InvalidArgument, "disk not found"));
};

let (rd, mut wd) = tokio::io::duplex(DEFAULT_READ_BUFFER_SIZE);

tokio::spawn(async move {
if let Err(e) = disk.walk_dir(args, &mut wd).await {
warn!("walk dir err {}", e);
}
});

let body = Body::from(StreamingBlob::wrap(ReaderStream::with_capacity(rd, DEFAULT_READ_BUFFER_SIZE)));
Ok(S3Response::new((StatusCode::OK, body)))
}
}

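The WalkDir handler bridges a push-style writer to a streaming HTTP body: tokio::io::duplex gives a paired reader/writer, the walk runs on its own task writing into one end, and ReaderStream turns the other end into body chunks. A stripped-down sketch of that pattern (no S3 types; a minimal sketch assuming tokio, tokio-util, and futures):

use futures::StreamExt;
use tokio::io::AsyncWriteExt;
use tokio_util::io::ReaderStream;

#[tokio::main]
async fn main() {
    // Paired in-memory pipe: whatever the producer writes to `wd`
    // becomes readable from `rd`.
    let (rd, mut wd) = tokio::io::duplex(8 * 1024);

    // Producer task: stands in for disk.walk_dir(args, &mut wd).
    tokio::spawn(async move {
        for entry in ["a.txt", "b/", "c.txt"] {
            let _ = wd.write_all(entry.as_bytes()).await;
            let _ = wd.write_all(b"\n").await;
        }
        // Dropping `wd` closes the pipe and ends the stream below.
    });

    // Consumer side: the same adapter the handler wraps into Body.
    let mut stream = ReaderStream::with_capacity(rd, 8 * 1024);
    while let Some(chunk) = stream.next().await {
        print!("{}", String::from_utf8_lossy(&chunk.unwrap()));
    }
}

Because the walk writes concurrently with the response being read, the handler never buffers the full directory listing in memory.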
// /rustfs/rpc/read_file_stream?disk={}&volume={}&path={}&offset={}&length={}"
#[derive(Debug, Default, serde::Deserialize)]
pub struct PutFileQuery {

@@ -807,11 +807,38 @@ impl Node for NodeService {
}
}
Err(err) => {
if err == rustfs_filemeta::Error::Unexpected {
let _ = tx
.send(Ok(WalkDirResponse {
success: false,
meta_cache_entry: "".to_string(),
error_info: Some(err.to_string()),
}))
.await;

break;
}

if rustfs_filemeta::is_io_eof(&err) {
let _ = tx
.send(Ok(WalkDirResponse {
success: false,
meta_cache_entry: "".to_string(),
error_info: Some(err.to_string()),
}))
.await;
break;
}

println!("get err {:?}", err);

let _ = tx
.send(Ok(WalkDirResponse {
success: false,
meta_cache_entry: "".to_string(),
error_info: Some(err.to_string()),
}))
.await;
break;
}
}
||||
11
scripts/dev_clear.sh
Normal file
@@ -0,0 +1,11 @@
for i in {0..3}; do
DIR="/data/rustfs$i"
echo "Processing $DIR"
if [ -d "$DIR" ]; then
echo "Clearing $DIR"
sudo rm -rf "$DIR"/* "$DIR"/.[!.]* "$DIR"/..?* 2>/dev/null || true
echo "Cleared $DIR"
else
echo "$DIR does not exist, skipping"
fi
done
@@ -4,18 +4,20 @@

rm ./target/x86_64-unknown-linux-musl/release/rustfs.zip
# Zip ./target/x86_64-unknown-linux-musl/release/rustfs
zip ./target/x86_64-unknown-linux-musl/release/rustfs.zip ./target/x86_64-unknown-linux-musl/release/rustfs

zip -j ./target/x86_64-unknown-linux-musl/release/rustfs.zip ./target/x86_64-unknown-linux-musl/release/rustfs

# Local file path
LOCAL_FILE="./target/x86_64-unknown-linux-musl/release/rustfs.zip"
REMOTE_PATH="~"

# Server list array
# Format: server IP, user name, target path
SERVER_LIST=(
"root@121.89.80.13"
)
# A target IP must be passed in; exit with an error otherwise
if [ -z "$1" ]; then
echo "Usage: $0 <server_ip>"
echo "Please provide the target server IP address"
exit 1
fi

SERVER_LIST=("root@$1")

# Iterate over the server list
for SERVER in "${SERVER_LIST[@]}"; do
@@ -26,7 +28,4 @@ for SERVER in "${SERVER_LIST[@]}"; do
else
echo "Copy to $SERVER failed"
fi
done


# ps -ef | grep rustfs | awk '{print $2}'| xargs kill -9
done
11
scripts/dev_rustfs.env
Normal file
@@ -0,0 +1,11 @@
RUSTFS_ROOT_USER=rustfsadmin
RUSTFS_ROOT_PASSWORD=rustfsadmin

RUSTFS_VOLUMES="http://node{1...4}:7000/data/rustfs{0...3} http://node{5...8}:7000/data/rustfs{0...3}"
RUSTFS_ADDRESS=":7000"
RUSTFS_CONSOLE_ENABLE=true
RUSTFS_CONSOLE_ADDRESS=":7001"
RUST_LOG=warn
RUSTFS_OBS_LOG_DIRECTORY="/var/logs/rustfs/"
RUSTFS_NS_SCANNER_INTERVAL=60
RUSTFS_SKIP_BACKGROUND_TASK=true
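This is the environment file that the `setenv` helper in scripts/dev_rustfs.sh below installs as /etc/default/rustfs on every node; its last two lines exercise the RUSTFS_NS_SCANNER_INTERVAL and RUSTFS_SKIP_BACKGROUND_TASK switches introduced in the code changes above.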
199
scripts/dev_rustfs.sh
Normal file
@@ -0,0 +1,199 @@
#!/bin/bash

# ps -ef | grep rustfs | awk '{print $2}'| xargs kill -9

# Local rustfs.zip path
ZIP_FILE="./rustfs.zip"
# Unzip destination
UNZIP_TARGET="./"


SERVER_LIST=(
"root@172.23.215.2" # node1
"root@172.23.215.4" # node2
"root@172.23.215.7" # node3
"root@172.23.215.3" # node4
"root@172.23.215.8" # node5
"root@172.23.215.5" # node6
"root@172.23.215.9" # node7
"root@172.23.215.6" # node8
)

REMOTE_TMP="~/rustfs"

# Deploy rustfs to all servers
deploy() {
echo "Unzipping $ZIP_FILE ..."
unzip -o "$ZIP_FILE" -d "$UNZIP_TARGET"
if [ $? -ne 0 ]; then
echo "Unzip failed, exiting"
exit 1
fi

LOCAL_RUSTFS="${UNZIP_TARGET}rustfs"
if [ ! -f "$LOCAL_RUSTFS" ]; then
echo "Unzipped rustfs binary not found, exiting"
exit 1
fi

for SERVER in "${SERVER_LIST[@]}"; do
echo "Uploading $LOCAL_RUSTFS to $SERVER:$REMOTE_TMP"
scp "$LOCAL_RUSTFS" "${SERVER}:${REMOTE_TMP}"
if [ $? -ne 0 ]; then
echo "❌ Upload to $SERVER failed, skipping"
continue
fi

echo "Running systemctl operations and binary replacement on $SERVER"
ssh "$SERVER" bash <<EOF
set -e
echo "Stopping the rustfs service"
sudo systemctl stop rustfs || true
echo "Overwriting /usr/local/bin/rustfs"
sudo cp ~/rustfs /usr/local/bin/rustfs
sudo chmod +x /usr/local/bin/rustfs
echo "Starting the rustfs service"
sudo systemctl start rustfs
echo "Checking rustfs service status"
sudo systemctl status rustfs --no-pager --lines=10
EOF

if [ $? -eq 0 ]; then
echo "✅ $SERVER deployed and restarted rustfs successfully"
else
echo "❌ $SERVER failed to deploy or restart rustfs"
fi
done
}

# Clear all files (including hidden files) under /data/rustfs0~3
clear_data_dirs() {
for SERVER in "${SERVER_LIST[@]}"; do
echo "Clearing all files under $SERVER:/data/rustfs0~3"
ssh "$SERVER" bash <<EOF
for i in {0..3}; do
DIR="/data/rustfs$i"
echo "Processing $DIR"
if [ -d "$DIR" ]; then
echo "Clearing $DIR"
sudo rm -rf "$DIR"/* "$DIR"/.[!.]* "$DIR"/..?* 2>/dev/null || true
echo "Cleared $DIR"
else
echo "$DIR does not exist, skipping"
fi
done
EOF
done
}

# Control the rustfs service
stop_rustfs() {
for SERVER in "${SERVER_LIST[@]}"; do
echo "Stopping the rustfs service on $SERVER"
ssh "$SERVER" "sudo systemctl stop rustfs"
done
}

start_rustfs() {
for SERVER in "${SERVER_LIST[@]}"; do
echo "Starting the rustfs service on $SERVER"
ssh "$SERVER" "sudo systemctl start rustfs"
done
}

restart_rustfs() {
for SERVER in "${SERVER_LIST[@]}"; do
echo "Restarting the rustfs service on $SERVER"
ssh "$SERVER" "sudo systemctl restart rustfs"
done
}

# Append a public key to ~/.ssh/authorized_keys on all servers
add_ssh_key() {
if [ -z "$2" ]; then
echo "Usage: $0 addkey <pubkey_file>"
exit 1
fi
PUBKEY_FILE="$2"
if [ ! -f "$PUBKEY_FILE" ]; then
echo "The specified public key file does not exist: $PUBKEY_FILE"
exit 1
fi
PUBKEY_CONTENT=$(cat "$PUBKEY_FILE")
for SERVER in "${SERVER_LIST[@]}"; do
echo "Appending public key to $SERVER:~/.ssh/authorized_keys"
ssh "$SERVER" "mkdir -p ~/.ssh && chmod 700 ~/.ssh && echo '$PUBKEY_CONTENT' >> ~/.ssh/authorized_keys && chmod 600 ~/.ssh/authorized_keys"
if [ $? -eq 0 ]; then
echo "✅ Public key appended on $SERVER"
else
echo "❌ Failed to append public key on $SERVER"
fi
done
}

monitor_logs() {
for SERVER in "${SERVER_LIST[@]}"; do
echo "Monitoring $SERVER:/var/logs/rustfs/rustfs.log ..."
ssh "$SERVER" "tail -F /var/logs/rustfs/rustfs.log" |
sed "s/^/[$SERVER] /" &
done
wait
}

set_env_file() {
if [ -z "$2" ]; then
echo "Usage: $0 setenv <env_file>"
exit 1
fi
ENV_FILE="$2"
if [ ! -f "$ENV_FILE" ]; then
echo "The specified env file does not exist: $ENV_FILE"
exit 1
fi
for SERVER in "${SERVER_LIST[@]}"; do
echo "Uploading $ENV_FILE to $SERVER:~/rustfs.env"
scp "$ENV_FILE" "${SERVER}:~/rustfs.env"
if [ $? -ne 0 ]; then
echo "❌ Upload to $SERVER failed, skipping"
continue
fi
echo "Overwriting $SERVER:/etc/default/rustfs"
ssh "$SERVER" "sudo mv ~/rustfs.env /etc/default/rustfs"
if [ $? -eq 0 ]; then
echo "✅ $SERVER /etc/default/rustfs overwritten successfully"
else
echo "❌ Failed to overwrite /etc/default/rustfs on $SERVER"
fi
done
}

# Main command dispatch
case "$1" in
deploy)
deploy
;;
clear)
clear_data_dirs
;;
stop)
stop_rustfs
;;
start)
start_rustfs
;;
restart)
restart_rustfs
;;
addkey)
add_ssh_key "$@"
;;
monitor_logs)
monitor_logs
;;
setenv)
set_env_file "$@"
;;
*)
echo "Usage: $0 {deploy|clear|stop|start|restart|addkey <pubkey_file>|monitor_logs|setenv <env_file>}"
;;
esac