diff --git a/crates/rio/src/http_reader.rs b/crates/rio/src/http_reader.rs index 80801d05..62c39c1c 100644 --- a/crates/rio/src/http_reader.rs +++ b/crates/rio/src/http_reader.rs @@ -46,7 +46,7 @@ pin_project! { impl HttpReader { pub async fn new(url: String, method: Method, headers: HeaderMap, body: Option>) -> io::Result { - http_log!("[HttpReader::new] url: {url}, method: {method:?}, headers: {headers:?}"); + // http_log!("[HttpReader::new] url: {url}, method: {method:?}, headers: {headers:?}"); Self::with_capacity(url, method, headers, body, 0).await } /// Create a new HttpReader from a URL. The request is performed immediately. @@ -57,10 +57,10 @@ impl HttpReader { body: Option>, _read_buf_size: usize, ) -> io::Result { - http_log!( - "[HttpReader::with_capacity] url: {url}, method: {method:?}, headers: {headers:?}, buf_size: {}", - _read_buf_size - ); + // http_log!( + // "[HttpReader::with_capacity] url: {url}, method: {method:?}, headers: {headers:?}, buf_size: {}", + // _read_buf_size + // ); // First, check if the connection is available (HEAD) let client = get_http_client(); let head_resp = client.head(&url).headers(headers.clone()).send().await; @@ -119,12 +119,12 @@ impl HttpReader { impl AsyncRead for HttpReader { fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>) -> Poll> { - http_log!( - "[HttpReader::poll_read] url: {}, method: {:?}, buf.remaining: {}", - self.url, - self.method, - buf.remaining() - ); + // http_log!( + // "[HttpReader::poll_read] url: {}, method: {:?}, buf.remaining: {}", + // self.url, + // self.method, + // buf.remaining() + // ); // Read from the inner stream Pin::new(&mut self.inner).poll_read(cx, buf) } @@ -157,20 +157,20 @@ impl Stream for ReceiverStream { type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let poll = Pin::new(&mut self.receiver).poll_recv(cx); - match &poll { - Poll::Ready(Some(Some(bytes))) => { - http_log!("[ReceiverStream] poll_next: got {} bytes", bytes.len()); - } - Poll::Ready(Some(None)) => { - http_log!("[ReceiverStream] poll_next: sender shutdown"); - } - Poll::Ready(None) => { - http_log!("[ReceiverStream] poll_next: channel closed"); - } - Poll::Pending => { - // http_log!("[ReceiverStream] poll_next: pending"); - } - } + // match &poll { + // Poll::Ready(Some(Some(bytes))) => { + // // http_log!("[ReceiverStream] poll_next: got {} bytes", bytes.len()); + // } + // Poll::Ready(Some(None)) => { + // // http_log!("[ReceiverStream] poll_next: sender shutdown"); + // } + // Poll::Ready(None) => { + // // http_log!("[ReceiverStream] poll_next: channel closed"); + // } + // Poll::Pending => { + // // http_log!("[ReceiverStream] poll_next: pending"); + // } + // } match poll { Poll::Ready(Some(Some(bytes))) => Poll::Ready(Some(Ok(bytes))), Poll::Ready(Some(None)) => Poll::Ready(None), // Sender shutdown @@ -196,7 +196,7 @@ pin_project! { impl HttpWriter { /// Create a new HttpWriter for the given URL. The HTTP request is performed in the background. pub async fn new(url: String, method: Method, headers: HeaderMap) -> io::Result { - http_log!("[HttpWriter::new] url: {url}, method: {method:?}, headers: {headers:?}"); + // http_log!("[HttpWriter::new] url: {url}, method: {method:?}, headers: {headers:?}"); let url_clone = url.clone(); let method_clone = method.clone(); let headers_clone = headers.clone(); @@ -206,13 +206,13 @@ impl HttpWriter { let resp = client.put(&url).headers(headers.clone()).body(Vec::new()).send().await; match resp { Ok(resp) => { - http_log!("[HttpWriter::new] empty PUT status: {}", resp.status()); + // http_log!("[HttpWriter::new] empty PUT status: {}", resp.status()); if !resp.status().is_success() { return Err(Error::other(format!("Empty PUT failed: status {}", resp.status()))); } } Err(e) => { - http_log!("[HttpWriter::new] empty PUT error: {e}"); + // http_log!("[HttpWriter::new] empty PUT error: {e}"); return Err(Error::other(format!("Empty PUT failed: {e}"))); } } @@ -223,9 +223,9 @@ impl HttpWriter { let handle = tokio::spawn(async move { let stream = ReceiverStream { receiver }; let body = reqwest::Body::wrap_stream(stream); - http_log!( - "[HttpWriter::spawn] sending HTTP request: url={url_clone}, method={method_clone:?}, headers={headers_clone:?}" - ); + // http_log!( + // "[HttpWriter::spawn] sending HTTP request: url={url_clone}, method={method_clone:?}, headers={headers_clone:?}" + // ); let client = get_http_client(); let request = client @@ -238,7 +238,7 @@ impl HttpWriter { match response { Ok(resp) => { - http_log!("[HttpWriter::spawn] got response: status={}", resp.status()); + // http_log!("[HttpWriter::spawn] got response: status={}", resp.status()); if !resp.status().is_success() { let _ = err_tx.send(Error::other(format!( "HttpWriter HTTP request failed with non-200 status {}", @@ -248,17 +248,17 @@ impl HttpWriter { } } Err(e) => { - http_log!("[HttpWriter::spawn] HTTP request error: {e}"); + // http_log!("[HttpWriter::spawn] HTTP request error: {e}"); let _ = err_tx.send(Error::other(format!("HTTP request failed: {}", e))); return Err(Error::other(format!("HTTP request failed: {}", e))); } } - http_log!("[HttpWriter::spawn] HTTP request completed, exiting"); + // http_log!("[HttpWriter::spawn] HTTP request completed, exiting"); Ok(()) }); - http_log!("[HttpWriter::new] connection established successfully"); + // http_log!("[HttpWriter::new] connection established successfully"); Ok(Self { url, method, @@ -285,12 +285,12 @@ impl HttpWriter { impl AsyncWrite for HttpWriter { fn poll_write(mut self: Pin<&mut Self>, _cx: &mut Context<'_>, buf: &[u8]) -> Poll> { - http_log!( - "[HttpWriter::poll_write] url: {}, method: {:?}, buf.len: {}", - self.url, - self.method, - buf.len() - ); + // http_log!( + // "[HttpWriter::poll_write] url: {}, method: {:?}, buf.len: {}", + // self.url, + // self.method, + // buf.len() + // ); if let Ok(e) = Pin::new(&mut self.err_rx).try_recv() { return Poll::Ready(Err(e)); } @@ -307,12 +307,19 @@ impl AsyncWrite for HttpWriter { } fn poll_shutdown(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + // let url = self.url.clone(); + // let method = self.method.clone(); + if !self.finish { - http_log!("[HttpWriter::poll_shutdown] url: {}, method: {:?}", self.url, self.method); + // http_log!("[HttpWriter::poll_shutdown] url: {}, method: {:?}", url, method); self.sender .try_send(None) .map_err(|e| Error::other(format!("HttpWriter shutdown error: {}", e)))?; - http_log!("[HttpWriter::poll_shutdown] sent shutdown signal to HTTP request"); + // http_log!( + // "[HttpWriter::poll_shutdown] sent shutdown signal to HTTP request, url: {}, method: {:?}", + // url, + // method + // ); self.finish = true; } @@ -320,13 +327,18 @@ impl AsyncWrite for HttpWriter { use futures::FutureExt; match Pin::new(&mut self.get_mut().handle).poll_unpin(_cx) { Poll::Ready(Ok(_)) => { - http_log!("[HttpWriter::poll_shutdown] HTTP request finished successfully"); + // http_log!( + // "[HttpWriter::poll_shutdown] HTTP request finished successfully, url: {}, method: {:?}", + // url, + // method + // ); } Poll::Ready(Err(e)) => { - http_log!("[HttpWriter::poll_shutdown] HTTP request failed: {e}"); + // http_log!("[HttpWriter::poll_shutdown] HTTP request failed: {e}, url: {}, method: {:?}", url, method); return Poll::Ready(Err(Error::other(format!("HTTP request failed: {}", e)))); } Poll::Pending => { + // http_log!("[HttpWriter::poll_shutdown] HTTP request pending, url: {}, method: {:?}", url, method); return Poll::Pending; } } diff --git a/ecstore/src/bitrot.rs b/ecstore/src/bitrot.rs index 181a401f..7fefab48 100644 --- a/ecstore/src/bitrot.rs +++ b/ecstore/src/bitrot.rs @@ -28,7 +28,7 @@ pub async fn create_bitrot_reader( checksum_algo: HashAlgorithm, ) -> disk::error::Result>>> { // Calculate the total length to read, including the checksum overhead - let length = offset.div_ceil(shard_size) * checksum_algo.size() + length; + let length = length.div_ceil(shard_size) * checksum_algo.size() + length; if let Some(data) = inline_data { // Use inline data diff --git a/ecstore/src/disk/local.rs b/ecstore/src/disk/local.rs index 19a6f4c5..3ba992a1 100644 --- a/ecstore/src/disk/local.rs +++ b/ecstore/src/disk/local.rs @@ -1399,8 +1399,6 @@ impl DiskAPI for LocalDisk { #[tracing::instrument(level = "debug", skip(self))] async fn create_file(&self, origvolume: &str, volume: &str, path: &str, _file_size: i64) -> Result { - // warn!("disk create_file: origvolume: {}, volume: {}, path: {}", origvolume, volume, path); - if !origvolume.is_empty() { let origvolume_dir = self.get_bucket_path(origvolume)?; if !skip_access_checks(origvolume) { @@ -1431,8 +1429,6 @@ impl DiskAPI for LocalDisk { #[tracing::instrument(level = "debug", skip(self))] // async fn append_file(&self, volume: &str, path: &str, mut r: DuplexStream) -> Result { async fn append_file(&self, volume: &str, path: &str) -> Result { - warn!("disk append_file: volume: {}, path: {}", volume, path); - let volume_dir = self.get_bucket_path(volume)?; if !skip_access_checks(volume) { access(&volume_dir) @@ -1497,7 +1493,9 @@ impl DiskAPI for LocalDisk { return Err(DiskError::FileCorrupt); } - f.seek(SeekFrom::Start(offset as u64)).await?; + if offset > 0 { + f.seek(SeekFrom::Start(offset as u64)).await?; + } Ok(Box::new(f)) } diff --git a/ecstore/src/disk/remote.rs b/ecstore/src/disk/remote.rs index 6783a573..e3a04195 100644 --- a/ecstore/src/disk/remote.rs +++ b/ecstore/src/disk/remote.rs @@ -608,7 +608,14 @@ impl DiskAPI for RemoteDisk { #[tracing::instrument(level = "debug", skip(self))] async fn read_file_stream(&self, volume: &str, path: &str, offset: usize, length: usize) -> Result { - info!("read_file_stream {}/{}/{}", self.endpoint.to_string(), volume, path); + // warn!( + // "disk remote read_file_stream {}/{}/{} offset={} length={}", + // self.endpoint.to_string(), + // volume, + // path, + // offset, + // length + // ); let url = format!( "{}/rustfs/rpc/read_file_stream?disk={}&volume={}&path={}&offset={}&length={}", self.endpoint.grid_host(), @@ -641,7 +648,13 @@ impl DiskAPI for RemoteDisk { #[tracing::instrument(level = "debug", skip(self))] async fn create_file(&self, _origvolume: &str, volume: &str, path: &str, file_size: i64) -> Result { - info!("create_file {}/{}/{}", self.endpoint.to_string(), volume, path); + // warn!( + // "disk remote create_file {}/{}/{} file_size={}", + // self.endpoint.to_string(), + // volume, + // path, + // file_size + // ); let url = format!( "{}/rustfs/rpc/put_file_stream?disk={}&volume={}&path={}&append={}&size={}", diff --git a/ecstore/src/erasure_coding/bitrot.rs b/ecstore/src/erasure_coding/bitrot.rs index 63421d7b..68df1b13 100644 --- a/ecstore/src/erasure_coding/bitrot.rs +++ b/ecstore/src/erasure_coding/bitrot.rs @@ -1,7 +1,9 @@ use bytes::Bytes; use pin_project_lite::pin_project; -use rustfs_utils::{HashAlgorithm, read_full, write_all}; -use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite}; +use rustfs_utils::HashAlgorithm; +use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; +use tracing::error; +use uuid::Uuid; pin_project! { /// BitrotReader reads (hash+data) blocks from an async reader and verifies hash integrity. @@ -12,10 +14,11 @@ pin_project! { shard_size: usize, buf: Vec, hash_buf: Vec, - hash_read: usize, - data_buf: Vec, - data_read: usize, - hash_checked: bool, + // hash_read: usize, + // data_buf: Vec, + // data_read: usize, + // hash_checked: bool, + id: Uuid, } } @@ -32,10 +35,11 @@ where shard_size, buf: Vec::new(), hash_buf: vec![0u8; hash_size], - hash_read: 0, - data_buf: Vec::new(), - data_read: 0, - hash_checked: false, + // hash_read: 0, + // data_buf: Vec::new(), + // data_read: 0, + // hash_checked: false, + id: Uuid::new_v4(), } } @@ -51,30 +55,31 @@ where let hash_size = self.hash_algo.size(); // Read hash - let mut hash_buf = vec![0u8; hash_size]; + if hash_size > 0 { - self.inner.read_exact(&mut hash_buf).await?; + self.inner.read_exact(&mut self.hash_buf).await.map_err(|e| { + error!("bitrot reader read hash error: {}", e); + e + })?; } - let data_len = read_full(&mut self.inner, out).await?; - - // // Read data - // let mut data_len = 0; - // while data_len < out.len() { - // let n = self.inner.read(&mut out[data_len..]).await?; - // if n == 0 { - // break; - // } - // data_len += n; - // // Only read up to one shard_size block - // if data_len >= self.shard_size { - // break; - // } - // } + // Read data + let mut data_len = 0; + while data_len < out.len() { + let n = self.inner.read(&mut out[data_len..]).await.map_err(|e| { + error!("bitrot reader read data error: {}", e); + e + })?; + if n == 0 { + break; + } + data_len += n; + } if hash_size > 0 { let actual_hash = self.hash_algo.hash_encode(&out[..data_len]); - if actual_hash.as_ref() != hash_buf.as_slice() { + if actual_hash.as_ref() != self.hash_buf.as_slice() { + error!("bitrot reader hash mismatch, id={} data_len={}, out_len={}", self.id, data_len, out.len()); return Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "bitrot hash mismatch")); } } @@ -145,22 +150,20 @@ where self.buf.extend_from_slice(buf); - // Write hash+data in one call - let mut n = write_all(&mut self.inner, &self.buf).await?; + self.inner.write_all(&self.buf).await?; - if n < hash_algo.size() { - return Err(std::io::Error::new( - std::io::ErrorKind::WriteZero, - "short write: not enough bytes written", - )); - } + self.inner.flush().await?; - n -= hash_algo.size(); + let n = self.buf.len(); self.buf.clear(); Ok(n) } + + pub async fn shutdown(&mut self) -> std::io::Result<()> { + self.inner.shutdown().await + } } pub fn bitrot_shard_file_size(size: usize, shard_size: usize, algo: HashAlgorithm) -> usize { @@ -330,6 +333,10 @@ impl BitrotWriterWrapper { self.bitrot_writer.write(buf).await } + pub async fn shutdown(&mut self) -> std::io::Result<()> { + self.bitrot_writer.shutdown().await + } + /// Extract the inline buffer data, consuming the wrapper pub fn into_inline_data(self) -> Option> { match self.writer_type { diff --git a/ecstore/src/erasure_coding/decode.rs b/ecstore/src/erasure_coding/decode.rs index f6e18b19..5c2d6e23 100644 --- a/ecstore/src/erasure_coding/decode.rs +++ b/ecstore/src/erasure_coding/decode.rs @@ -67,36 +67,34 @@ where } // 使用并发读取所有分片 + let mut read_futs = Vec::with_capacity(self.readers.len()); - let read_futs: Vec<_> = self - .readers - .iter_mut() - .enumerate() - .map(|(i, opt_reader)| { - if let Some(reader) = opt_reader.as_mut() { + for (i, opt_reader) in self.readers.iter_mut().enumerate() { + let future = if let Some(reader) = opt_reader.as_mut() { + Box::pin(async move { let mut buf = vec![0u8; shard_size]; - // 需要move i, buf - Some(async move { - match reader.read(&mut buf).await { - Ok(n) => { - buf.truncate(n); - (i, Ok(buf)) - } - Err(e) => (i, Err(Error::from(e))), + match reader.read(&mut buf).await { + Ok(n) => { + buf.truncate(n); + (i, Ok(buf)) } - }) - } else { - None - } - }) - .collect(); + Err(e) => (i, Err(Error::from(e))), + } + }) as std::pin::Pin, Error>)> + Send>> + } else { + // reader是None时返回FileNotFound错误 + Box::pin(async move { (i, Err(Error::FileNotFound)) }) + as std::pin::Pin, Error>)> + Send>> + }; + read_futs.push(future); + } - // 过滤掉None,join_all - let mut results = join_all(read_futs.into_iter().flatten()).await; + let results = join_all(read_futs).await; let mut shards: Vec>> = vec![None; self.readers.len()]; let mut errs = vec![None; self.readers.len()]; - for (i, shard) in results.drain(..) { + + for (i, shard) in results.into_iter() { match shard { Ok(data) => { if !data.is_empty() { diff --git a/ecstore/src/erasure_coding/encode.rs b/ecstore/src/erasure_coding/encode.rs index 899b8f57..dda42075 100644 --- a/ecstore/src/erasure_coding/encode.rs +++ b/ecstore/src/erasure_coding/encode.rs @@ -97,6 +97,13 @@ impl<'a> MultiWriter<'a> { .join(", ") ))) } + + pub async fn _shutdown(&mut self) -> std::io::Result<()> { + for writer in self.writers.iter_mut().flatten() { + writer.shutdown().await?; + } + Ok(()) + } } impl Erasure { @@ -147,7 +154,7 @@ impl Erasure { } let (reader, total) = task.await??; - + // writers.shutdown().await?; Ok((reader, total)) } } diff --git a/ecstore/src/erasure_coding/erasure.rs b/ecstore/src/erasure_coding/erasure.rs index c8045e99..2f673d68 100644 --- a/ecstore/src/erasure_coding/erasure.rs +++ b/ecstore/src/erasure_coding/erasure.rs @@ -555,6 +555,13 @@ mod tests { use super::*; + #[test] + fn test_shard_file_size_cases2() { + let erasure = Erasure::new(12, 4, 1024 * 1024); + + assert_eq!(erasure.shard_file_size(1572864), 131074); + } + #[test] fn test_shard_file_size_cases() { let erasure = Erasure::new(4, 2, 8); @@ -577,6 +584,8 @@ mod tests { assert_eq!(erasure.shard_file_size(1248739), 312186); // 1248739/8=156092, last=3, 3 div_ceil 4=1, 156092*2+1=312185 assert_eq!(erasure.shard_file_size(43), 12); // 43/8=5, last=3, 3 div_ceil 4=1, 5*2+1=11 + + assert_eq!(erasure.shard_file_size(1572864), 393216); // 43/8=5, last=3, 3 div_ceil 4=1, 5*2+1=11 } #[test] @@ -677,9 +686,14 @@ mod tests { #[test] fn test_shard_file_offset() { - let erasure = Erasure::new(4, 2, 8); - let offset = erasure.shard_file_offset(0, 16, 32); + let erasure = Erasure::new(8, 8, 1024 * 1024); + let offset = erasure.shard_file_offset(0, 86, 86); + println!("offset={}", offset); assert!(offset > 0); + + let total_length = erasure.shard_file_size(86); + println!("total_length={}", total_length); + assert!(total_length > 0); } #[tokio::test] diff --git a/ecstore/src/rebalance.rs b/ecstore/src/rebalance.rs index cc6a6ca9..e68f448f 100644 --- a/ecstore/src/rebalance.rs +++ b/ecstore/src/rebalance.rs @@ -243,7 +243,7 @@ impl ECStore { return Err(err); } - error!("rebalanceMeta: not found, rebalance not started"); + warn!("rebalanceMeta: not found, rebalance not started"); } } @@ -501,7 +501,7 @@ impl ECStore { if let Some(meta) = rebalance_meta.as_mut() { meta.cancel = Some(tx) } else { - error!("start_rebalance: rebalance_meta is None exit"); + warn!("start_rebalance: rebalance_meta is None exit"); return; } diff --git a/ecstore/src/set_disk.rs b/ecstore/src/set_disk.rs index 94277052..b35711ab 100644 --- a/ecstore/src/set_disk.rs +++ b/ecstore/src/set_disk.rs @@ -1859,8 +1859,6 @@ impl SetDisks { let (last_part_index, _) = fi.to_part_offset(end_offset)?; - // let erasure = Erasure::new(fi.erasure.data_blocks, fi.erasure.parity_blocks, fi.erasure.block_size); - let erasure = erasure_coding::Erasure::new(fi.erasure.data_blocks, fi.erasure.parity_blocks, fi.erasure.block_size); let mut total_readed = 0; diff --git a/rustfs/src/storage/ecfs.rs b/rustfs/src/storage/ecfs.rs index 44f0f47b..458cb597 100644 --- a/rustfs/src/storage/ecfs.rs +++ b/rustfs/src/storage/ecfs.rs @@ -91,6 +91,7 @@ use tokio_stream::wrappers::ReceiverStream; use tokio_tar::Archive; use tokio_util::io::ReaderStream; use tokio_util::io::StreamReader; +use tracing::debug; use tracing::error; use tracing::info; use tracing::warn; @@ -1787,7 +1788,7 @@ impl S3 for FS { let object_lock_configuration = match metadata_sys::get_object_lock_config(&bucket).await { Ok((cfg, _created)) => Some(cfg), Err(err) => { - warn!("get_object_lock_config err {:?}", err); + debug!("get_object_lock_config err {:?}", err); None } }; diff --git a/scripts/dev_rustfs.sh b/scripts/dev_rustfs.sh index 76f5ab61..fcf2fb6d 100644 --- a/scripts/dev_rustfs.sh +++ b/scripts/dev_rustfs.sh @@ -9,14 +9,14 @@ UNZIP_TARGET="./" SERVER_LIST=( - "root@172.23.215.2" # node1 - "root@172.23.215.4" # node2 - "root@172.23.215.7" # node3 - "root@172.23.215.3" # node4 - "root@172.23.215.8" # node5 - "root@172.23.215.5" # node6 - "root@172.23.215.9" # node7 - "root@172.23.215.6" # node8 + "root@node1" # node1 + "root@node2" # node2 + "root@node3" # node3 + "root@node4" # node4 + # "root@node5" # node5 + # "root@node6" # node6 + # "root@node7" # node7 + # "root@node8" # node8 ) REMOTE_TMP="~/rustfs"