From 7a7aee20491d8375ea9bbc930392a7f8092aaf37 Mon Sep 17 00:00:00 2001 From: weisd Date: Wed, 19 Feb 2025 17:41:18 +0800 Subject: [PATCH] use filereader as asyncread --- Cargo.lock | 167 +++++++++++- Cargo.toml | 2 +- ecstore/Cargo.toml | 1 + ecstore/src/bitrot.rs | 502 +++++++++++++++++++++---------------- ecstore/src/disk/local.rs | 68 ++++- ecstore/src/disk/mod.rs | 442 +++++++++++++++++++++----------- ecstore/src/disk/remote.rs | 29 ++- ecstore/src/erasure.rs | 2 + ecstore/src/set_disk.rs | 56 +++-- rustfs/src/admin/mod.rs | 3 + rustfs/src/admin/router.rs | 7 +- rustfs/src/admin/rpc.rs | 97 +++++++ rustfs/src/grpc.rs | 182 +++++++------- scripts/run.sh | 8 +- 14 files changed, 1067 insertions(+), 499 deletions(-) create mode 100644 rustfs/src/admin/rpc.rs diff --git a/Cargo.lock b/Cargo.lock index 89db6bc2..b6bba26a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1377,7 +1377,7 @@ dependencies = [ "futures-util", "generational-box", "longest-increasing-subsequence", - "rustc-hash", + "rustc-hash 1.1.0", "rustversion", "serde", "slab", @@ -1443,7 +1443,7 @@ dependencies = [ "objc_id", "once_cell", "rfd 0.14.1", - "rustc-hash", + "rustc-hash 1.1.0", "serde", "serde_json", "signal-hook", @@ -1604,7 +1604,7 @@ dependencies = [ "dioxus-html", "js-sys", "lazy-js-bundle", - "rustc-hash", + "rustc-hash 1.1.0", "serde", "sledgehammer_bindgen", "sledgehammer_utils", @@ -1710,7 +1710,7 @@ dependencies = [ "generational-box", "once_cell", "parking_lot 0.12.3", - "rustc-hash", + "rustc-hash 1.1.0", "tracing", "warnings", ] @@ -1737,7 +1737,7 @@ dependencies = [ "generational-box", "js-sys", "lazy-js-bundle", - "rustc-hash", + "rustc-hash 1.1.0", "serde", "serde-wasm-bindgen", "serde_json", @@ -1939,6 +1939,7 @@ dependencies = [ "reader", "reed-solomon-erasure", "regex", + "reqwest", "rmp", "rmp-serde", "s3s", @@ -1971,6 +1972,15 @@ version = "1.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b7914353092ddf589ad78f25c5c1c21b7f80b0ff8621e7c814c3485b5306da9d" +[[package]] +name = "encoding_rs" +version = "0.8.35" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "75030f3c4f45dafd7586dd6780965a8c7e8e285a5ecb86713e63a79c5b2766f3" +dependencies = [ + "cfg-if", +] + [[package]] name = "endi" version = "1.1.0" @@ -2881,6 +2891,24 @@ dependencies = [ "want", ] +[[package]] +name = "hyper-rustls" +version = "0.27.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2d191583f3da1305256f22463b9bb0471acad48a4e534a5218b9963e9c1f59b2" +dependencies = [ + "futures-util", + "http", + "hyper", + "hyper-util", + "rustls", + "rustls-pki-types", + "tokio", + "tokio-rustls", + "tower-service", + "webpki-roots", +] + [[package]] name = "hyper-timeout" version = "0.5.2" @@ -4783,6 +4811,58 @@ dependencies = [ "serde", ] +[[package]] +name = "quinn" +version = "0.11.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "62e96808277ec6f97351a2380e6c25114bc9e67037775464979f3037c92d05ef" +dependencies = [ + "bytes", + "pin-project-lite", + "quinn-proto", + "quinn-udp", + "rustc-hash 2.1.1", + "rustls", + "socket2", + "thiserror 2.0.11", + "tokio", + "tracing", +] + +[[package]] +name = "quinn-proto" +version = "0.11.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a2fe5ef3495d7d2e377ff17b1a8ce2ee2ec2a18cde8b6ad6619d65d0701c135d" +dependencies = [ + "bytes", + "getrandom 0.2.15", + "rand 0.8.5", + "ring", + "rustc-hash 2.1.1", + "rustls", + "rustls-pki-types", + "slab", + 
"thiserror 2.0.11", + "tinyvec", + "tracing", + "web-time", +] + +[[package]] +name = "quinn-udp" +version = "0.5.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e46f3055866785f6b92bc6164b76be02ca8f2eb4b002c0354b28cf4c119e5944" +dependencies = [ + "cfg_aliases", + "libc", + "once_cell", + "socket2", + "tracing", + "windows-sys 0.52.0", +] + [[package]] name = "quote" version = "1.0.38" @@ -5038,12 +5118,16 @@ checksum = "43e734407157c3c2034e0258f5e4473ddb361b1e85f95a66690d67264d7cd1da" dependencies = [ "base64", "bytes", + "encoding_rs", + "futures-channel", "futures-core", "futures-util", + "h2", "http", "http-body", "http-body-util", "hyper", + "hyper-rustls", "hyper-util", "ipnet", "js-sys", @@ -5053,11 +5137,17 @@ dependencies = [ "once_cell", "percent-encoding", "pin-project-lite", + "quinn", + "rustls", + "rustls-pemfile", + "rustls-pki-types", "serde", "serde_json", "serde_urlencoded", "sync_wrapper", + "system-configuration", "tokio", + "tokio-rustls", "tokio-util", "tower 0.5.2", "tower-service", @@ -5066,6 +5156,7 @@ dependencies = [ "wasm-bindgen-futures", "wasm-streams", "web-sys", + "webpki-roots", "windows-registry", ] @@ -5199,6 +5290,12 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2" +[[package]] +name = "rustc-hash" +version = "2.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "357703d41365b4b27c590e3ed91eabb1b663f07c4c084095e60cbed4362dff0d" + [[package]] name = "rustc_version" version = "0.4.1" @@ -5335,6 +5432,9 @@ name = "rustls-pki-types" version = "1.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "917ce264624a4b4db1c364dcc35bfca9ded014d0a958cd47ad3e960e988ea51c" +dependencies = [ + "web-time", +] [[package]] name = "rustls-webpki" @@ -5837,7 +5937,7 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "debdd4b83524961983cea3c55383b3910fd2f24fd13a188f5b091d2d504a61ae" dependencies = [ - "rustc-hash", + "rustc-hash 1.1.0", ] [[package]] @@ -6021,6 +6121,27 @@ dependencies = [ "syn 2.0.98", ] +[[package]] +name = "system-configuration" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3c879d448e9d986b661742763247d3693ed13609438cf3d006f51f5368a5ba6b" +dependencies = [ + "bitflags 2.9.0", + "core-foundation 0.9.4", + "system-configuration-sys", +] + +[[package]] +name = "system-configuration-sys" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e1d1b10ced5ca923a1fcb8d03e96b8d3268065d724548c0211415ff6ac6bac4" +dependencies = [ + "core-foundation-sys", + "libc", +] + [[package]] name = "system-deps" version = "6.2.2" @@ -6248,6 +6369,21 @@ dependencies = [ "zerovec", ] +[[package]] +name = "tinyvec" +version = "1.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09b3661f17e86524eccd4371ab0429194e0d7c008abb45f7a7495b1719463c71" +dependencies = [ + "tinyvec_macros", +] + +[[package]] +name = "tinyvec_macros" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" + [[package]] name = "tokio" version = "1.43.0" @@ -7015,6 +7151,16 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "web-time" +version = "1.1.0" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a6580f308b1fad9207618087a65c04e7a10bc77e02c8e84e9b00dd4b12fa0bb" +dependencies = [ + "js-sys", + "wasm-bindgen", +] + [[package]] name = "webbrowser" version = "0.8.15" @@ -7076,6 +7222,15 @@ dependencies = [ "system-deps", ] +[[package]] +name = "webpki-roots" +version = "0.26.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2210b291f7ea53617fbafcc4939f10914214ec15aace5ba62293a668f322c5c9" +dependencies = [ + "rustls-pki-types", +] + [[package]] name = "webview2-com" version = "0.33.0" diff --git a/Cargo.toml b/Cargo.toml index c034c32d..01e4a072 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -69,7 +69,7 @@ prost-types = "0.13.4" protobuf = "3.7" protos = { path = "./common/protos" } rand = "0.8.5" -reqwest = { version = "0.12.12", default-features = false, features = ["rustls-tls", "charset", "http2", "macos-system-configuration", "stream"] } +reqwest = { version = "0.12.12", default-features = false, features = ["rustls-tls", "charset", "http2", "macos-system-configuration", "stream","blocking"] } rfd = { version = "0.15.2", default-features = false, features = ["xdg-portal", "tokio"] } rmp = "0.8.14" rmp-serde = "1.3.0" diff --git a/ecstore/Cargo.toml b/ecstore/Cargo.toml index cacafa49..7ddc751d 100644 --- a/ecstore/Cargo.toml +++ b/ecstore/Cargo.toml @@ -66,6 +66,7 @@ pin-project-lite.workspace = true md-5.workspace = true madmin.workspace = true workers.workspace = true +reqwest = { workspace = true } [target.'cfg(not(windows))'.dependencies] diff --git a/ecstore/src/bitrot.rs b/ecstore/src/bitrot.rs index e26d039b..86a92ada 100644 --- a/ecstore/src/bitrot.rs +++ b/ecstore/src/bitrot.rs @@ -1,27 +1,22 @@ use crate::{ - disk::{error::DiskError, DiskAPI, DiskStore, FileReader, FileWriter, Reader}, + disk::{error::DiskError, BufferReader, Disk, DiskAPI, DiskStore, FileReader, FileWriter}, erasure::{ReadAt, Writer}, error::{Error, Result}, store_api::BitrotAlgorithm, }; - use blake2::Blake2b512; use blake2::Digest as _; use highway::{HighwayHash, HighwayHasher, Key}; use lazy_static::lazy_static; use sha2::{digest::core_api::BlockSizeUser, Digest, Sha256}; -use std::{ - any::Any, - collections::HashMap, - io::{Cursor, Read}, -}; -use tracing::{error, info}; - +use std::{any::Any, collections::HashMap, sync::Arc}; use tokio::{ + io::AsyncReadExt as _, spawn, sync::mpsc::{self, Sender}, task::JoinHandle, }; +use tracing::{error, info}; lazy_static! 
{ static ref BITROT_ALGORITHMS: HashMap = { @@ -169,22 +164,22 @@ pub async fn new_bitrot_writer( pub type BitrotReader = Box; -#[allow(clippy::too_many_arguments)] -pub fn new_bitrot_reader( - disk: DiskStore, - data: &[u8], - bucket: &str, - file_path: &str, - till_offset: usize, - algo: BitrotAlgorithm, - sum: &[u8], - shard_size: usize, -) -> BitrotReader { - if algo == BitrotAlgorithm::HighwayHash256S { - return Box::new(StreamingBitrotReader::new(disk, data, bucket, file_path, algo, till_offset, shard_size)); - } - Box::new(WholeBitrotReader::new(disk, bucket, file_path, algo, till_offset, sum)) -} +// #[allow(clippy::too_many_arguments)] +// pub fn new_bitrot_reader( +// disk: DiskStore, +// data: &[u8], +// bucket: &str, +// file_path: &str, +// till_offset: usize, +// algo: BitrotAlgorithm, +// sum: &[u8], +// shard_size: usize, +// ) -> BitrotReader { +// if algo == BitrotAlgorithm::HighwayHash256S { +// return Box::new(StreamingBitrotReader::new(disk, data, bucket, file_path, algo, till_offset, shard_size)); +// } +// Box::new(WholeBitrotReader::new(disk, bucket, file_path, algo, till_offset, sum)) +// } pub async fn close_bitrot_writers(writers: &mut [Option]) -> Result<()> { for w in writers.iter_mut().flatten() { @@ -209,25 +204,25 @@ pub fn bitrot_shard_file_size(size: usize, shard_size: usize, algo: BitrotAlgori size.div_ceil(shard_size) * algo.new_hasher().size() + size } -pub fn bitrot_verify( - r: &mut Cursor>, +pub async fn bitrot_verify( + r: FileReader, want_size: usize, part_size: usize, algo: BitrotAlgorithm, - want: Vec, + _want: Vec, mut shard_size: usize, ) -> Result<()> { - if algo != BitrotAlgorithm::HighwayHash256S { - let mut h = algo.new_hasher(); - h.update(r.get_ref()); - let hash = h.finalize(); - if hash != want { - info!("bitrot_verify except: {:?}, got: {:?}", want, hash); - return Err(Error::new(DiskError::FileCorrupt)); - } + // if algo != BitrotAlgorithm::HighwayHash256S { + // let mut h = algo.new_hasher(); + // h.update(r.get_ref()); + // let hash = h.finalize(); + // if hash != want { + // info!("bitrot_verify except: {:?}, got: {:?}", want, hash); + // return Err(Error::new(DiskError::FileCorrupt)); + // } - return Ok(()); - } + // return Ok(()); + // } let mut h = algo.new_hasher(); let mut hash_buf = vec![0; h.size()]; let mut left = want_size; @@ -240,9 +235,11 @@ pub fn bitrot_verify( return Err(Error::new(DiskError::FileCorrupt)); } + let mut r = r; + while left > 0 { h.reset(); - let n = r.read(&mut hash_buf)?; + let n = r.read_exact(&mut hash_buf).await?; left -= n; if left < shard_size { @@ -250,7 +247,7 @@ pub fn bitrot_verify( } let mut buf = vec![0; shard_size]; - let read = r.read(&mut buf)?; + let read = r.read_exact(&mut buf).await?; h.update(buf); left -= read; let hash = h.clone().finalize(); @@ -298,51 +295,54 @@ impl Writer for WholeBitrotWriter { } } -#[derive(Debug)] -pub struct WholeBitrotReader { - disk: DiskStore, - volume: String, - file_path: String, - _verifier: BitrotVerifier, - till_offset: usize, - buf: Option>, -} +// #[derive(Debug)] +// pub struct WholeBitrotReader { +// disk: DiskStore, +// volume: String, +// file_path: String, +// _verifier: BitrotVerifier, +// till_offset: usize, +// buf: Option>, +// } -impl WholeBitrotReader { - pub fn new(disk: DiskStore, volume: &str, file_path: &str, algo: BitrotAlgorithm, till_offset: usize, sum: &[u8]) -> Self { - Self { - disk, - volume: volume.to_string(), - file_path: file_path.to_string(), - _verifier: BitrotVerifier::new(algo, sum), - till_offset, - buf: None, - } 
- } -} +// impl WholeBitrotReader { +// pub fn new(disk: DiskStore, volume: &str, file_path: &str, algo: BitrotAlgorithm, till_offset: usize, sum: &[u8]) -> Self { +// Self { +// disk, +// volume: volume.to_string(), +// file_path: file_path.to_string(), +// _verifier: BitrotVerifier::new(algo, sum), +// till_offset, +// buf: None, +// } +// } +// } -#[async_trait::async_trait] -impl ReadAt for WholeBitrotReader { - async fn read_at(&mut self, offset: usize, length: usize) -> Result<(Vec, usize)> { - if self.buf.is_none() { - let buf_len = self.till_offset - offset; - let mut file = self.disk.read_file(&self.volume, &self.file_path).await?; - let mut buf = vec![0u8; buf_len]; - file.read_at(offset, &mut buf).await?; - self.buf = Some(buf); - } +// #[async_trait::async_trait] +// impl ReadAt for WholeBitrotReader { +// async fn read_at(&mut self, offset: usize, length: usize) -> Result<(Vec, usize)> { +// if self.buf.is_none() { +// let buf_len = self.till_offset - offset; +// let mut file = self +// .disk +// .read_file_stream(&self.volume, &self.file_path, offset, length) +// .await?; +// let mut buf = vec![0u8; buf_len]; +// file.read_at(offset, &mut buf).await?; +// self.buf = Some(buf); +// } - if let Some(buf) = &mut self.buf { - if buf.len() < length { - return Err(Error::new(DiskError::LessData)); - } +// if let Some(buf) = &mut self.buf { +// if buf.len() < length { +// return Err(Error::new(DiskError::LessData)); +// } - return Ok((buf.drain(0..length).collect::>(), length)); - } +// return Ok((buf.drain(0..length).collect::>(), length)); +// } - Err(Error::new(DiskError::LessData)) - } -} +// Err(Error::new(DiskError::LessData)) +// } +// } struct StreamingBitrotWriter { hasher: Hasher, @@ -413,80 +413,80 @@ impl Writer for StreamingBitrotWriter { } } -#[derive(Debug)] -struct StreamingBitrotReader { - disk: DiskStore, - _data: Vec, - volume: String, - file_path: String, - till_offset: usize, - curr_offset: usize, - hasher: Hasher, - shard_size: usize, - buf: Vec, - hash_bytes: Vec, -} +// #[derive(Debug)] +// struct StreamingBitrotReader { +// disk: DiskStore, +// _data: Vec, +// volume: String, +// file_path: String, +// till_offset: usize, +// curr_offset: usize, +// hasher: Hasher, +// shard_size: usize, +// buf: Vec, +// hash_bytes: Vec, +// } -impl StreamingBitrotReader { - pub fn new( - disk: DiskStore, - data: &[u8], - volume: &str, - file_path: &str, - algo: BitrotAlgorithm, - till_offset: usize, - shard_size: usize, - ) -> Self { - let hasher = algo.new_hasher(); - Self { - disk, - _data: data.to_vec(), - volume: volume.to_string(), - file_path: file_path.to_string(), - till_offset: till_offset.div_ceil(shard_size) * hasher.size() + till_offset, - curr_offset: 0, - hash_bytes: Vec::with_capacity(hasher.size()), - hasher, - shard_size, - buf: Vec::new(), - } - } -} +// impl StreamingBitrotReader { +// pub fn new( +// disk: DiskStore, +// data: &[u8], +// volume: &str, +// file_path: &str, +// algo: BitrotAlgorithm, +// till_offset: usize, +// shard_size: usize, +// ) -> Self { +// let hasher = algo.new_hasher(); +// Self { +// disk, +// _data: data.to_vec(), +// volume: volume.to_string(), +// file_path: file_path.to_string(), +// till_offset: till_offset.div_ceil(shard_size) * hasher.size() + till_offset, +// curr_offset: 0, +// hash_bytes: Vec::with_capacity(hasher.size()), +// hasher, +// shard_size, +// buf: Vec::new(), +// } +// } +// } -#[async_trait::async_trait] -impl ReadAt for StreamingBitrotReader { - async fn read_at(&mut self, offset: usize, length: usize) 
-> Result<(Vec, usize)> { - if offset % self.shard_size != 0 { - return Err(Error::new(DiskError::Unexpected)); - } - if self.buf.is_empty() { - self.curr_offset = offset; - let stream_offset = (offset / self.shard_size) * self.hasher.size() + offset; - let buf_len = self.till_offset - stream_offset; - let mut file = self.disk.read_file(&self.volume, &self.file_path).await?; - let mut buf = vec![0u8; buf_len]; - file.read_at(stream_offset, &mut buf).await?; - self.buf = buf; - } - if offset != self.curr_offset { - return Err(Error::new(DiskError::Unexpected)); - } +// #[async_trait::async_trait] +// impl ReadAt for StreamingBitrotReader { +// async fn read_at(&mut self, offset: usize, length: usize) -> Result<(Vec, usize)> { +// if offset % self.shard_size != 0 { +// return Err(Error::new(DiskError::Unexpected)); +// } +// if self.buf.is_empty() { +// self.curr_offset = offset; +// let stream_offset = (offset / self.shard_size) * self.hasher.size() + offset; +// let buf_len = self.till_offset - stream_offset; +// let mut file = self.disk.read_file(&self.volume, &self.file_path).await?; +// let mut buf = vec![0u8; buf_len]; +// file.read_at(stream_offset, &mut buf).await?; +// self.buf = buf; +// } +// if offset != self.curr_offset { +// return Err(Error::new(DiskError::Unexpected)); +// } - self.hash_bytes = self.buf.drain(0..self.hash_bytes.capacity()).collect(); - let buf = self.buf.drain(0..length).collect::>(); - self.hasher.reset(); - self.hasher.update(&buf); - let actual = self.hasher.clone().finalize(); - if actual != self.hash_bytes { - return Err(Error::new(DiskError::FileCorrupt)); - } +// self.hash_bytes = self.buf.drain(0..self.hash_bytes.capacity()).collect(); +// let buf = self.buf.drain(0..length).collect::>(); +// self.hasher.reset(); +// self.hasher.update(&buf); +// let actual = self.hasher.clone().finalize(); +// if actual != self.hash_bytes { +// return Err(Error::new(DiskError::FileCorrupt)); +// } - let readed_len = buf.len(); - self.curr_offset += readed_len; +// let readed_len = buf.len(); +// self.curr_offset += readed_len; - Ok((buf, readed_len)) - } -} +// Ok((buf, readed_len)) +// } +// } pub struct BitrotFileWriter { pub inner: FileWriter, @@ -535,8 +535,12 @@ pub fn new_bitrot_filewriter(inner: FileWriter, algo: BitrotAlgorithm, shard_siz #[derive(Debug)] struct BitrotFileReader { - pub inner: FileReader, - // till_offset: usize, + disk: Arc, + data: Option>, + volume: String, + file_path: String, + reader: Option, + till_offset: usize, curr_offset: usize, hasher: Hasher, shard_size: usize, @@ -545,28 +549,41 @@ struct BitrotFileReader { read_buf: Vec, } -// fn ceil(a: usize, b: usize) -> usize { -// (a + b - 1) / b -// } +fn ceil(a: usize, b: usize) -> usize { + a.div_ceil(b) +} impl BitrotFileReader { - pub fn new(inner: FileReader, algo: BitrotAlgorithm, _till_offset: usize, shard_size: usize) -> Self { + pub fn new( + disk: Arc, + data: Option>, + volume: String, + file_path: String, + algo: BitrotAlgorithm, + till_offset: usize, + shard_size: usize, + ) -> Self { let hasher = algo.new_hasher(); Self { - inner, - // till_offset: ceil(till_offset, shard_size) * hasher.size() + till_offset, + disk, + data, + volume, + file_path, + till_offset: ceil(till_offset, shard_size) * hasher.size() + till_offset, curr_offset: 0, hash_bytes: vec![0u8; hasher.size()], hasher, shard_size, // buf: Vec::new(), read_buf: Vec::new(), + reader: None, } } } #[async_trait::async_trait] impl ReadAt for BitrotFileReader { + // 读取数据 async fn read_at(&mut self, offset: usize, 
length: usize) -> Result<(Vec, usize)> { if offset % self.shard_size != 0 { error!( @@ -578,53 +595,112 @@ impl ReadAt for BitrotFileReader { return Err(Error::new(DiskError::Unexpected)); } - let stream_offset = (offset / self.shard_size) * self.hasher.size() + offset; - let buf_len = self.hasher.size() + length; + if self.reader.is_none() { + self.curr_offset = offset; + let stream_offset = (offset / self.shard_size) * self.hasher.size() + offset; + + if let Some(data) = self.data.clone() { + self.reader = Some(FileReader::Buffer(BufferReader::new( + data, + stream_offset, + self.till_offset - stream_offset, + ))); + } else { + self.reader = Some( + self.disk + .read_file_stream(&self.volume, &self.file_path, stream_offset, self.till_offset - stream_offset) + .await?, + ); + } + } + + if offset != self.curr_offset { + error!("BitrotFileReader read_at offset != self.curr_offset, {} != {}", offset, self.curr_offset); + return Err(Error::new(DiskError::Unexpected)); + } + + let reader = self.reader.as_mut().unwrap(); + // let mut hash_buf = self.hash_bytes; + + self.hash_bytes.clear(); + self.hash_bytes.resize(self.hasher.size(), 0u8); + + reader.read_exact(&mut self.hash_bytes).await?; self.read_buf.clear(); - self.read_buf.resize(buf_len, 0u8); + self.read_buf.resize(length, 0u8); - self.inner.read_at(stream_offset, &mut self.read_buf).await?; - - let hash_bytes = &self.read_buf.as_slice()[0..self.hash_bytes.capacity()]; - - self.hash_bytes.clone_from_slice(hash_bytes); - let buf = self.read_buf.as_slice()[self.hash_bytes.capacity()..self.hash_bytes.capacity() + length].to_vec(); + reader.read_exact(&mut self.read_buf).await?; self.hasher.reset(); - self.hasher.update(&buf); + self.hasher.update(&self.read_buf); let actual = self.hasher.clone().finalize(); - if actual != self.hash_bytes { + error!( + "BitrotFileReader read_at actual != self.hash_bytes, {:?} != {:?}", + actual, self.hash_bytes + ); return Err(Error::new(DiskError::FileCorrupt)); } - let readed_len = buf.len(); + let readed_len = self.read_buf.len(); self.curr_offset += readed_len; - Ok((buf, readed_len)) + Ok((self.read_buf.clone(), readed_len)) + + // let stream_offset = (offset / self.shard_size) * self.hasher.size() + offset; + // let buf_len = self.hasher.size() + length; + + // self.read_buf.clear(); + // self.read_buf.resize(buf_len, 0u8); + + // self.inner.read_at(stream_offset, &mut self.read_buf).await?; + + // let hash_bytes = &self.read_buf.as_slice()[0..self.hash_bytes.capacity()]; + + // self.hash_bytes.clone_from_slice(hash_bytes); + // let buf = self.read_buf.as_slice()[self.hash_bytes.capacity()..self.hash_bytes.capacity() + length].to_vec(); + + // self.hasher.reset(); + // self.hasher.update(&buf); + // let actual = self.hasher.clone().finalize(); + + // if actual != self.hash_bytes { + // return Err(Error::new(DiskError::FileCorrupt)); + // } + + // let readed_len = buf.len(); + // self.curr_offset += readed_len; + + // Ok((buf, readed_len)) } } -pub fn new_bitrot_filereader(inner: FileReader, till_offset: usize, algo: BitrotAlgorithm, shard_size: usize) -> BitrotReader { - Box::new(BitrotFileReader::new(inner, algo, till_offset, shard_size)) +pub fn new_bitrot_filereader( + disk: Arc, + data: Option>, + volume: String, + file_path: String, + till_offset: usize, + algo: BitrotAlgorithm, + shard_size: usize, +) -> BitrotReader { + Box::new(BitrotFileReader::new(disk, data, volume, file_path, algo, till_offset, shard_size)) } #[cfg(test)] mod test { - use std::{collections::HashMap, fs}; + use 
std::collections::HashMap; use hex_simd::decode_to_vec; - use tempfile::TempDir; use crate::{ - bitrot::{new_bitrot_writer, BITROT_ALGORITHMS}, - disk::{endpoint::Endpoint, error::DiskError, new_disk, DiskAPI, DiskOption}, + disk::error::DiskError, error::{Error, Result}, store_api::BitrotAlgorithm, }; - use super::{bitrot_writer_sum, new_bitrot_reader}; + // use super::{bitrot_writer_sum, new_bitrot_reader}; #[test] fn bitrot_self_test() -> Result<()> { @@ -674,47 +750,47 @@ mod test { Ok(()) } - #[tokio::test] - async fn test_all_bitrot_algorithms() -> Result<()> { - for algo in BITROT_ALGORITHMS.keys() { - test_bitrot_reader_writer_algo(algo.clone()).await?; - } + // #[tokio::test] + // async fn test_all_bitrot_algorithms() -> Result<()> { + // for algo in BITROT_ALGORITHMS.keys() { + // test_bitrot_reader_writer_algo(algo.clone()).await?; + // } - Ok(()) - } + // Ok(()) + // } - async fn test_bitrot_reader_writer_algo(algo: BitrotAlgorithm) -> Result<()> { - let temp_dir = TempDir::new().unwrap().path().to_string_lossy().to_string(); - fs::create_dir_all(&temp_dir)?; - let volume = "testvol"; - let file_path = "testfile"; + // async fn test_bitrot_reader_writer_algo(algo: BitrotAlgorithm) -> Result<()> { + // let temp_dir = TempDir::new().unwrap().path().to_string_lossy().to_string(); + // fs::create_dir_all(&temp_dir)?; + // let volume = "testvol"; + // let file_path = "testfile"; - let ep = Endpoint::try_from(temp_dir.as_str())?; - let opt = DiskOption::default(); - let disk = new_disk(&ep, &opt).await?; - disk.make_volume(volume).await?; - let mut writer = new_bitrot_writer(disk.clone(), "", volume, file_path, 35, algo.clone(), 10).await?; + // let ep = Endpoint::try_from(temp_dir.as_str())?; + // let opt = DiskOption::default(); + // let disk = new_disk(&ep, &opt).await?; + // disk.make_volume(volume).await?; + // let mut writer = new_bitrot_writer(disk.clone(), "", volume, file_path, 35, algo.clone(), 10).await?; - writer.write(b"aaaaaaaaaa").await?; - writer.write(b"aaaaaaaaaa").await?; - writer.write(b"aaaaaaaaaa").await?; - writer.write(b"aaaaa").await?; + // writer.write(b"aaaaaaaaaa").await?; + // writer.write(b"aaaaaaaaaa").await?; + // writer.write(b"aaaaaaaaaa").await?; + // writer.write(b"aaaaa").await?; - let sum = bitrot_writer_sum(&writer); - writer.close().await?; + // let sum = bitrot_writer_sum(&writer); + // writer.close().await?; - let mut reader = new_bitrot_reader(disk, b"", volume, file_path, 35, algo, &sum, 10); - let read_len = 10; - let mut result: Vec; - (result, _) = reader.read_at(0, read_len).await?; - assert_eq!(result, b"aaaaaaaaaa"); - (result, _) = reader.read_at(10, read_len).await?; - assert_eq!(result, b"aaaaaaaaaa"); - (result, _) = reader.read_at(20, read_len).await?; - assert_eq!(result, b"aaaaaaaaaa"); - (result, _) = reader.read_at(30, read_len / 2).await?; - assert_eq!(result, b"aaaaa"); + // let mut reader = new_bitrot_reader(disk, b"", volume, file_path, 35, algo, &sum, 10); + // let read_len = 10; + // let mut result: Vec; + // (result, _) = reader.read_at(0, read_len).await?; + // assert_eq!(result, b"aaaaaaaaaa"); + // (result, _) = reader.read_at(10, read_len).await?; + // assert_eq!(result, b"aaaaaaaaaa"); + // (result, _) = reader.read_at(20, read_len).await?; + // assert_eq!(result, b"aaaaaaaaaa"); + // (result, _) = reader.read_at(30, read_len / 2).await?; + // assert_eq!(result, b"aaaaa"); - Ok(()) - } + // Ok(()) + // } } diff --git a/ecstore/src/disk/local.rs b/ecstore/src/disk/local.rs index 7f8bd1cc..a1f1e0c7 100644 --- 
a/ecstore/src/disk/local.rs
+++ b/ecstore/src/disk/local.rs
@@ -49,7 +49,7 @@ use common::defer;
 use path_absolutize::Absolutize;
 use std::collections::{HashMap, HashSet};
 use std::fmt::Debug;
-use std::io::Cursor;
+use std::io::SeekFrom;
 use std::sync::atomic::{AtomicU32, Ordering};
 use std::sync::Arc;
 use std::time::{Duration, SystemTime};
@@ -59,7 +59,7 @@
 };
 use time::OffsetDateTime;
 use tokio::fs::{self, File};
-use tokio::io::{AsyncReadExt, AsyncWrite, AsyncWriteExt, ErrorKind};
+use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWrite, AsyncWriteExt, ErrorKind};
 use tokio::sync::mpsc::Sender;
 use tokio::sync::RwLock;
 use tracing::{error, info, warn};
@@ -735,13 +735,22 @@ impl LocalDisk {
         sum: &[u8],
         shard_size: usize,
     ) -> Result<()> {
-        let mut file = utils::fs::open_file(part_path, O_CREATE | O_WRONLY)
+        let file = utils::fs::open_file(part_path, O_CREATE | O_WRONLY)
             .await
             .map_err(os_err_to_file_err)?;

-        let mut data = Vec::new();
-        let n = file.read_to_end(&mut data).await?;

-        bitrot_verify(&mut Cursor::new(data), n, part_size, algo, sum.to_vec(), shard_size)
+        // The part is no longer buffered whole in memory: hand the open file
+        // to bitrot_verify as a streaming reader and take the size from its
+        // metadata. Metadata::len() is portable, unlike the unix-only size().
+        let meta = file.metadata().await?;
+
+        bitrot_verify(
+            FileReader::Local(LocalFileReader::new(file)),
+            meta.len() as usize,
+            part_size,
+            algo,
+            sum.to_vec(),
+            shard_size,
+        )
+        .await
     }
 
     async fn scan_dir(
@@ -1533,6 +1542,50 @@ impl DiskAPI for LocalDisk {
         Ok(FileReader::Local(LocalFileReader::new(f)))
     }
 
+    async fn read_file_stream(&self, volume: &str, path: &str, offset: usize, length: usize) -> Result<FileReader> {
+        let volume_dir = self.get_bucket_path(volume)?;
+        if !skip_access_checks(volume) {
+            if let Err(e) = utils::fs::access(&volume_dir).await {
+                return Err(convert_access_error(e, DiskError::VolumeAccessDenied));
+            }
+        }
+
+        let file_path = volume_dir.join(Path::new(&path));
+        check_path_length(file_path.to_string_lossy().to_string().as_str())?;
+
+        let mut f = self.open_file(file_path, O_RDONLY, volume_dir).await.map_err(|err| {
+            if let Some(e) = err.to_io_err() {
+                if os_is_not_exist(&e) {
+                    Error::new(DiskError::FileNotFound)
+                } else if os_is_permission(&e) || is_sys_err_not_dir(&e) {
+                    Error::new(DiskError::FileAccessDenied)
+                } else if is_sys_err_io(&e) {
+                    Error::new(DiskError::FaultyDisk)
+                } else if is_sys_err_too_many_files(&e) {
+                    Error::new(DiskError::TooManyOpenFiles)
+                } else {
+                    Error::new(e)
+                }
+            } else {
+                err
+            }
+        })?;
+
+        let meta = f.metadata().await?;
+        if meta.len() < (offset + length) as u64 {
+            error!(
+                "read_file_stream: file size {} is less than offset + length ({} + {})",
+                meta.len(),
+                offset,
+                length
+            );
+            return Err(Error::new(DiskError::FileCorrupt));
+        }
+
+        f.seek(SeekFrom::Start(offset as u64)).await?;
+
+        Ok(FileReader::Local(LocalFileReader::new(f)))
+    }
+
     #[tracing::instrument(level = "debug", skip(self))]
     async fn list_dir(&self, origvolume: &str, volume: &str, dir_path: &str, count: i32) -> Result<Vec<String>> {
         if !origvolume.is_empty() {
diff --git a/ecstore/src/disk/mod.rs b/ecstore/src/disk/mod.rs
index 0b7aba4c..b1d341d7 100644
--- a/ecstore/src/disk/mod.rs
+++ b/ecstore/src/disk/mod.rs
@@ -28,28 +28,24 @@ use crate::{
     store_api::{FileInfo, ObjectInfo, RawFileInfo},
     utils::path::SLASH_SEPARATOR,
 };
+
 use endpoint::Endpoint;
 use error::DiskError;
 use futures::StreamExt;
 use local::LocalDisk;
 use madmin::info_commands::DiskMetrics;
-use protos::proto_gen::node_service::{
-    node_service_client::NodeServiceClient, ReadAtRequest, ReadAtResponse, WriteRequest, WriteResponse,
-};
+use protos::proto_gen::node_service::{node_service_client::NodeServiceClient, WriteRequest, WriteResponse};
 use remote::RemoteDisk;
 use serde::{Deserialize, Serialize};
-use std::{
-    any::Any,
-    cmp::Ordering,
-    fmt::Debug,
-    io::{Cursor, SeekFrom},
-    path::PathBuf,
-    sync::Arc,
-};
+use std::io::Read as _;
+use std::pin::Pin;
+use std::task::Poll;
+use std::{any::Any, cmp::Ordering, fmt::Debug, io::Cursor, path::PathBuf, sync::Arc};
 use time::OffsetDateTime;
+use tokio::io::AsyncRead;
 use tokio::{
     fs::File,
-    io::{AsyncReadExt, AsyncSeekExt, AsyncWrite, AsyncWriteExt},
+    io::{AsyncWrite, AsyncWriteExt},
     sync::mpsc::{self, Sender},
 };
 use tokio_stream::wrappers::ReceiverStream;
@@ -206,6 +202,13 @@ impl DiskAPI for Disk {
         }
     }
 
+    async fn read_file_stream(&self, volume: &str, path: &str, offset: usize, length: usize) -> Result<FileReader> {
+        match self {
+            Disk::Local(local_disk) => local_disk.read_file_stream(volume, path, offset, length).await,
+            Disk::Remote(remote_disk) => remote_disk.read_file_stream(volume, path, offset, length).await,
+        }
+    }
+
     async fn list_dir(&self, _origvolume: &str, volume: &str, _dir_path: &str, _count: i32) -> Result<Vec<String>> {
         match self {
             Disk::Local(local_disk) => local_disk.list_dir(_origvolume, volume, _dir_path, _count).await,
@@ -451,6 +454,7 @@ pub trait DiskAPI: Debug + Send + Sync + 'static {
     // list all files and directories under a directory
     async fn list_dir(&self, origvolume: &str, volume: &str, dir_path: &str, count: i32) -> Result<Vec<String>>;
     async fn read_file(&self, volume: &str, path: &str) -> Result<FileReader>;
+    async fn read_file_stream(&self, volume: &str, path: &str, offset: usize, length: usize) -> Result<FileReader>;
     async fn append_file(&self, volume: &str, path: &str) -> Result<FileWriter>;
     async fn create_file(&self, origvolume: &str, volume: &str, path: &str, file_size: usize) -> Result<FileWriter>;
     // ReadFileStream
@@ -1411,186 +1415,340 @@ impl Writer for RemoteFileWriter {
     }
 }
 
-#[async_trait::async_trait]
-pub trait Reader {
-    async fn read_at(&mut self, offset: usize, buf: &mut [u8]) -> Result<usize>;
-    async fn seek(&mut self, offset: usize) -> Result<()>;
-    async fn read_exact(&mut self, buf: &mut [u8]) -> Result<usize>;
-}
+// #[async_trait::async_trait]
+// pub trait Reader {
+//     async fn read_at(&mut self, offset: usize, buf: &mut [u8]) -> Result<usize>;
+//     // async fn seek(&mut self, offset: usize) -> Result<()>;
+//     // async fn read_exact(&mut self, buf: &mut [u8]) -> Result<usize>;
+// }
 
 #[derive(Debug)]
 pub enum FileReader {
     Local(LocalFileReader),
-    Remote(RemoteFileReader),
+    // Remote(RemoteFileReader),
     Buffer(BufferReader),
+    Http(HttpFileReader),
 }
 
-#[async_trait::async_trait]
-impl Reader for FileReader {
-    async fn read_at(&mut self, offset: usize, buf: &mut [u8]) -> Result<usize> {
-        match self {
-            Self::Local(reader) => reader.read_at(offset, buf).await,
-            Self::Remote(reader) => reader.read_at(offset, buf).await,
-            Self::Buffer(reader) => reader.read_at(offset, buf).await,
-        }
-    }
-    async fn seek(&mut self, offset: usize) -> Result<()> {
-        match self {
-            Self::Local(reader) => reader.seek(offset).await,
-            Self::Remote(reader) => reader.seek(offset).await,
-            Self::Buffer(reader) => reader.seek(offset).await,
-        }
-    }
-    async fn read_exact(&mut self, buf: &mut [u8]) -> Result<usize> {
-        match self {
-            Self::Local(reader) => reader.read_exact(buf).await,
-            Self::Remote(reader) => reader.read_exact(buf).await,
-            Self::Buffer(reader) => reader.read_exact(buf).await,
-        }
-    }
-}
+impl AsyncRead for FileReader {
+    #[tracing::instrument(level = "debug", skip(self, buf))]
+    fn poll_read(
+        mut self: Pin<&mut Self>,
+        cx: &mut std::task::Context<'_>,
+        buf: &mut tokio::io::ReadBuf<'_>,
+    ) -> std::task::Poll<std::io::Result<()>> {
+        match &mut *self {
+            Self::Local(reader) => Pin::new(&mut reader.inner).poll_read(cx, buf),
+            // go through BufferReader::poll_read so its window limit is enforced
+            Self::Buffer(reader) => Pin::new(reader).poll_read(cx, buf),
+            Self::Http(reader) => Pin::new(reader).poll_read(cx, buf),
+        }
+    }
+}
 
+// #[async_trait::async_trait]
+// impl Reader for FileReader {
+//     async fn read_at(&mut self, offset: usize, buf: &mut [u8]) -> Result<usize> {
+//         match self {
+//             Self::Local(reader) => reader.read_at(offset, buf).await,
+//             Self::Remote(reader) => reader.read_at(offset, buf).await,
+//             Self::Buffer(reader) => reader.read_at(offset, buf).await,
+//             Self::Http(reader) => reader.read_at(offset, buf).await,
+//         }
+//     }
+//     // async fn seek(&mut self, offset: usize) -> Result<()> {
+//     //     match self {
+//     //         Self::Local(reader) => reader.seek(offset).await,
+//     //         Self::Remote(reader) => reader.seek(offset).await,
+//     //         Self::Buffer(reader) => reader.seek(offset).await,
+//     //     }
+//     // }
+//     // async fn read_exact(&mut self, buf: &mut [u8]) -> Result<usize> {
+//     //     match self {
+//     //         Self::Local(reader) => reader.read_exact(buf).await,
+//     //         Self::Remote(reader) => reader.read_exact(buf).await,
+//     //         Self::Buffer(reader) => reader.read_exact(buf).await,
+//     //     }
+//     // }
+// }
 
 #[derive(Debug)]
 pub struct BufferReader {
     pub inner: Cursor<Vec<u8>>,
-    pos: usize,
+    // absolute end of the readable window: offset + read_length
+    end: usize,
 }
 
 impl BufferReader {
-    pub fn new(inner: Vec<u8>) -> Self {
+    pub fn new(inner: Vec<u8>, offset: usize, read_length: usize) -> Self {
+        let mut cur = Cursor::new(inner);
+        cur.set_position(offset as u64);
         Self {
-            inner: Cursor::new(inner),
-            pos: 0,
+            inner: cur,
+            end: offset + read_length,
         }
     }
 }
 
-#[async_trait::async_trait]
-impl Reader for BufferReader {
+impl AsyncRead for BufferReader {
     #[tracing::instrument(level = "debug", skip(self, buf))]
-    async fn read_at(&mut self, offset: usize, buf: &mut [u8]) -> Result<usize> {
-        self.seek(offset).await?;
-        self.read_exact(buf).await
-    }
-    #[tracing::instrument(level = "debug", skip(self))]
-    async fn seek(&mut self, offset: usize) -> Result<()> {
-        if self.pos != offset {
-            self.inner.set_position(offset as u64);
+    fn poll_read(
+        mut self: Pin<&mut Self>,
+        cx: &mut std::task::Context<'_>,
+        buf: &mut tokio::io::ReadBuf<'_>,
+    ) -> std::task::Poll<std::io::Result<()>> {
+        // Never hand out bytes past the end of the window; at the boundary,
+        // report EOF by returning Ready without filling anything.
+        let pos = self.inner.position() as usize;
+        let allowed = self.end.saturating_sub(pos);
+        if allowed == 0 {
+            return Poll::Ready(Ok(()));
+        }
+        let n = allowed.min(buf.remaining());
+        let filled = {
+            let mut limited = tokio::io::ReadBuf::new(buf.initialize_unfilled_to(n));
+            match Pin::new(&mut self.inner).poll_read(cx, &mut limited) {
+                Poll::Ready(Ok(())) => limited.filled().len(),
+                Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
+                Poll::Pending => return Poll::Pending,
+            }
+        };
+        buf.advance(filled);
+        Poll::Ready(Ok(()))
+    }
+}
 
-        }
-
-        Ok(())
-    }
-    #[tracing::instrument(level = "debug", skip(self))]
-    async fn read_exact(&mut self, buf: &mut [u8]) -> Result<usize> {
-        let bytes_read = self.inner.read_exact(buf).await?;
-        self.pos += buf.len();
-        Ok(bytes_read)
-    }
-}
-
+// #[async_trait::async_trait]
+// impl Reader for BufferReader {
+//     #[tracing::instrument(level = "debug", skip(self, buf))]
+//     async fn read_at(&mut self, offset: usize, buf: &mut [u8]) -> Result<usize> {
+//         if self.pos != offset {
+//             self.inner.set_position(offset as u64);
+//         }
+//         self.inner.read_exact(buf).await?;
+//         self.pos += buf.len();
+//         Ok(buf.len())
+//     }
+//     // #[tracing::instrument(level = "debug", skip(self))]
+//     // async fn seek(&mut self, offset: usize) -> Result<()> {
+//     //     if self.pos != offset {
+//     //         self.inner.set_position(offset as u64);
+//     //     }

+//     //     Ok(())
+//     // }
+//     // #[tracing::instrument(level = "debug", skip(self))]
+//     // async fn 
read_exact(&mut self, buf: &mut [u8]) -> Result { +// // let bytes_read = self.inner.read_exact(buf).await?; +// // self.pos += buf.len(); +// // Ok(bytes_read) +// // } +// } + #[derive(Debug)] pub struct LocalFileReader { pub inner: File, - pos: usize, + // pos: usize, } impl LocalFileReader { pub fn new(inner: File) -> Self { - Self { inner, pos: 0 } + Self { inner } } } -#[async_trait::async_trait] -impl Reader for LocalFileReader { - #[tracing::instrument(level = "debug", skip(self, buf))] - async fn read_at(&mut self, offset: usize, buf: &mut [u8]) -> Result { - self.seek(offset).await?; - self.read_exact(buf).await - } +// #[async_trait::async_trait] +// impl Reader for LocalFileReader { +// #[tracing::instrument(level = "debug", skip(self, buf))] +// async fn read_at(&mut self, offset: usize, buf: &mut [u8]) -> Result { +// if self.pos != offset { +// self.inner.seek(SeekFrom::Start(offset as u64)).await?; +// self.pos = offset; +// } +// self.inner.read_exact(buf).await?; +// self.pos += buf.len(); +// Ok(buf.len()) +// } - #[tracing::instrument(level = "debug", skip(self))] - async fn seek(&mut self, offset: usize) -> Result<()> { - if self.pos != offset { - self.inner.seek(SeekFrom::Start(offset as u64)).await?; - self.pos = offset; - } +// // #[tracing::instrument(level = "debug", skip(self))] +// // async fn seek(&mut self, offset: usize) -> Result<()> { +// // if self.pos != offset { +// // self.inner.seek(SeekFrom::Start(offset as u64)).await?; +// // self.pos = offset; +// // } - Ok(()) - } +// // Ok(()) +// // } +// // #[tracing::instrument(level = "debug", skip(self, buf))] +// // async fn read_exact(&mut self, buf: &mut [u8]) -> Result { +// // let bytes_read = self.inner.read_exact(buf).await?; +// // self.pos += buf.len(); +// // Ok(bytes_read) +// // } +// } + +impl AsyncRead for LocalFileReader { #[tracing::instrument(level = "debug", skip(self, buf))] - async fn read_exact(&mut self, buf: &mut [u8]) -> Result { - let bytes_read = self.inner.read_exact(buf).await?; - self.pos += buf.len(); - Ok(bytes_read) + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &mut tokio::io::ReadBuf<'_>, + ) -> std::task::Poll> { + Pin::new(&mut self.inner).poll_read(cx, buf) } } +// #[derive(Debug)] +// pub struct RemoteFileReader { +// pub endpoint: Endpoint, +// pub volume: String, +// pub path: String, +// tx: Sender, +// resp_stream: Streaming, +// } + +// impl RemoteFileReader { +// pub async fn new(endpoint: Endpoint, volume: String, path: String, mut client: NodeClient) -> Result { +// let (tx, rx) = mpsc::channel(128); +// let in_stream = ReceiverStream::new(rx); + +// let response = client.read_at(in_stream).await.unwrap(); + +// let resp_stream = response.into_inner(); + +// Ok(Self { +// endpoint, +// volume, +// path, +// tx, +// resp_stream, +// }) +// } +// } + +// #[async_trait::async_trait] +// impl Reader for RemoteFileReader { +// async fn read_at(&mut self, offset: usize, buf: &mut [u8]) -> Result { +// let request = ReadAtRequest { +// disk: self.endpoint.to_string(), +// volume: self.volume.to_string(), +// path: self.path.to_string(), +// offset: offset.try_into().unwrap(), +// // length: length.try_into().unwrap(), +// length: buf.len().try_into().unwrap(), +// }; +// self.tx.send(request).await?; + +// if let Some(resp) = self.resp_stream.next().await { +// let resp = resp?; +// if resp.success { +// info!("read at stream success"); + +// buf.copy_from_slice(&resp.data); + +// Ok(resp.read_size.try_into().unwrap()) +// } else { 
+// return if let Some(err) = &resp.error { +// Err(proto_err_to_err(err)) +// } else { +// Err(Error::from_string("")) +// }; +// } +// } else { +// let error_info = "can not get response"; +// info!("read at stream failed: {}", error_info); +// Err(Error::from_string(error_info)) +// } +// } +// // async fn seek(&mut self, _offset: usize) -> Result<()> { +// // unimplemented!() +// // } +// // async fn read_exact(&mut self, _buf: &mut [u8]) -> Result { +// // unimplemented!() +// // } +// } + +// impl AsyncRead for RemoteFileReader { +// #[tracing::instrument(level = "debug", skip(self, buf))] +// fn poll_read( +// mut self: Pin<&mut Self>, +// cx: &mut std::task::Context<'_>, +// buf: &mut tokio::io::ReadBuf<'_>, +// ) -> std::task::Poll> { +// unimplemented!("poll_read") +// } +// } + #[derive(Debug)] -pub struct RemoteFileReader { - pub endpoint: Endpoint, - pub volume: String, - pub path: String, - tx: Sender, - resp_stream: Streaming, +pub struct HttpFileReader { + // client: reqwest::Client, + // url: String, + // disk: String, + // volume: String, + // path: String, + // offset: usize, + // length: usize, + inner: reqwest::blocking::Response, + // buf: Vec, + pos: usize, } -impl RemoteFileReader { - pub async fn new(endpoint: Endpoint, volume: String, path: String, mut client: NodeClient) -> Result { - let (tx, rx) = mpsc::channel(128); - let in_stream = ReceiverStream::new(rx); - - let response = client.read_at(in_stream).await.unwrap(); - - let resp_stream = response.into_inner(); - +impl HttpFileReader { + pub async fn new(url: &str, disk: &str, volume: &str, path: &str, offset: usize, length: usize) -> Result { + let client = reqwest::blocking::Client::new(); + let resp = client + .get(format!( + "{}/rustfs/rpc/read_file_stream?disk={}&volume={}&path={}&offset={}&length={}", + url, disk, volume, path, offset, length + )) + .send()?; Ok(Self { - endpoint, - volume, - path, - tx, - resp_stream, + // client: reqwest::Client::new(), + // url: url.to_string(), + // disk: disk.to_string(), + // volume: volume.to_string(), + // path: path.to_string(), + // offset, + // length, + inner: resp, + // buf: Vec::new(), + pos: 0, }) } + + // pub async fn get_response(&self) -> Result<&Response, std::io::Error> { + // if let Some(resp) = self.inner.get() { + // return Ok(resp); + // } else { + // let client = reqwest::Client::new(); + // let resp = client + // .get(&format!( + // "{}/read_file_stream?disk={}&volume={}&path={}&offset={}&length={}", + // self.url, self.disk, self.volume, self.path, self.offset, self.length + // )) + // .send() + // .await + // .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?; + // self.inner.set(resp); + // Ok(self.inner.get().unwrap()) + // } + // } } -#[async_trait::async_trait] -impl Reader for RemoteFileReader { - async fn read_at(&mut self, offset: usize, buf: &mut [u8]) -> Result { - let request = ReadAtRequest { - disk: self.endpoint.to_string(), - volume: self.volume.to_string(), - path: self.path.to_string(), - offset: offset.try_into().unwrap(), - // length: length.try_into().unwrap(), - length: buf.len().try_into().unwrap(), - }; - self.tx.send(request).await?; - - if let Some(resp) = self.resp_stream.next().await { - let resp = resp?; - if resp.success { - info!("read at stream success"); - - buf.copy_from_slice(&resp.data); - - Ok(resp.read_size.try_into().unwrap()) - } else { - return if let Some(err) = &resp.error { - Err(proto_err_to_err(err)) - } else { - Err(Error::from_string("")) - }; - } - } else { - let error_info = "can 
not get response"; - info!("read at stream failed: {}", error_info); - Err(Error::from_string(error_info)) - } - } - async fn seek(&mut self, _offset: usize) -> Result<()> { - unimplemented!() - } - async fn read_exact(&mut self, _buf: &mut [u8]) -> Result { - unimplemented!() +impl AsyncRead for HttpFileReader { + #[tracing::instrument(level = "debug", skip(self, buf))] + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &mut tokio::io::ReadBuf<'_>, + ) -> std::task::Poll> { + let buf = buf.initialize_unfilled(); + self.inner.read_exact(buf)?; + self.pos += buf.len(); + Poll::Ready(Ok(())) } } + +// impl Reader for HttpFileReader { +// async fn read_at(&mut self, offset: usize, buf: &mut [u8]) -> Result { +// if self.pos != offset { +// self.inner.seek(SeekFrom::Start(offset as u64))?; +// self.pos = offset; +// } +// let bytes_read = self.inner.read(buf)?; +// self.pos += bytes_read; +// Ok(bytes_read) +// } +// } diff --git a/ecstore/src/disk/remote.rs b/ecstore/src/disk/remote.rs index 68a5ab31..2fd2ca3f 100644 --- a/ecstore/src/disk/remote.rs +++ b/ecstore/src/disk/remote.rs @@ -23,10 +23,9 @@ use uuid::Uuid; use super::{ endpoint::Endpoint, CheckPartsResp, DeleteOptions, DiskAPI, DiskInfo, DiskInfoOptions, DiskLocation, DiskOption, - FileInfoVersions, FileReader, FileWriter, ReadMultipleReq, ReadMultipleResp, ReadOptions, RemoteFileReader, RemoteFileWriter, - RenameDataResp, UpdateMetadataOpts, VolumeInfo, WalkDirOptions, + FileInfoVersions, FileReader, FileWriter, ReadMultipleReq, ReadMultipleResp, ReadOptions, RemoteFileWriter, RenameDataResp, + UpdateMetadataOpts, VolumeInfo, WalkDirOptions, }; -use crate::utils::proto_err_to_err; use crate::{ disk::error::DiskError, error::{Error, Result}, @@ -37,6 +36,7 @@ use crate::{ }, store_api::{FileInfo, RawFileInfo}, }; +use crate::{disk::HttpFileReader, utils::proto_err_to_err}; use crate::{disk::MetaCacheEntry, metacache::writer::MetacacheWriter}; use protos::proto_gen::node_service::RenamePartRequst; @@ -346,14 +346,21 @@ impl DiskAPI for RemoteDisk { async fn read_file(&self, volume: &str, path: &str) -> Result { info!("read_file"); - Ok(FileReader::Remote( - RemoteFileReader::new( - self.endpoint.clone(), - volume.to_string(), - path.to_string(), - node_service_time_out_client(&self.addr) - .await - .map_err(|err| Error::from_string(format!("can not get client, err: {}", err)))?, + Ok(FileReader::Http( + HttpFileReader::new(self.endpoint.grid_host().as_str(), self.endpoint.to_string().as_str(), volume, path, 0, 0) + .await?, + )) + } + + async fn read_file_stream(&self, volume: &str, path: &str, offset: usize, length: usize) -> Result { + Ok(FileReader::Http( + HttpFileReader::new( + self.endpoint.grid_host().as_str(), + self.endpoint.to_string().as_str(), + volume, + path, + offset, + length, ) .await?, )) diff --git a/ecstore/src/erasure.rs b/ecstore/src/erasure.rs index 4121ddf6..f068b109 100644 --- a/ecstore/src/erasure.rs +++ b/ecstore/src/erasure.rs @@ -419,6 +419,7 @@ impl Erasure { // num_shards * self.shard_size(self.block_size) } + // where erasure reading begins. 
pub fn shard_file_offset(&self, start_offset: usize, length: usize, total_length: usize) -> usize {
         let shard_size = self.shard_size(self.block_size);
         let shard_file_size = self.shard_file_size(total_length);
@@ -528,6 +529,7 @@ impl ShardReader {
     pub async fn read(&mut self) -> Result<Vec<Option<Vec<u8>>>> {
         // let mut disks = self.readers;
         let reader_length = self.readers.len();
+        // length of the block to read for this shard
         let mut read_length = self.shard_size;
         if self.offset + read_length > self.shard_file_size {
             read_length = self.shard_file_size - self.offset
diff --git a/ecstore/src/set_disk.rs b/ecstore/src/set_disk.rs
index c6db0f8e..0855fa94 100644
--- a/ecstore/src/set_disk.rs
+++ b/ecstore/src/set_disk.rs
@@ -1,6 +1,6 @@
 use std::{
     collections::{HashMap, HashSet},
-    io::{Cursor, Write},
+    io::Write,
     path::Path,
     sync::Arc,
     time::Duration,
@@ -1855,20 +1855,23 @@ impl SetDisks {
             // debug!("read part_path {}", &part_path);
 
             if let Some(disk) = disk_op {
-                let filereader = {
-                    if let Some(ref data) = files[idx].data {
-                        FileReader::Buffer(BufferReader::new(data.clone()))
-                    } else {
-                        let disk = disk.clone();
-                        let part_path =
-                            format!("{}/{}/part.{}", object, files[idx].data_dir.unwrap_or(Uuid::nil()), part_number);
-
-                        disk.read_file(bucket, &part_path).await?
-                    }
-                };
+                // let filereader = {
+                //     if let Some(ref data) = files[idx].data {
+                //         FileReader::Buffer(BufferReader::new(data.clone()))
+                //     } else {
+                //         let disk = disk.clone();
+                //         let part_path =
+                //             format!("{}/{}/part.{}", object, files[idx].data_dir.unwrap_or(Uuid::nil()), part_number);
+
+                //         disk.read_file(bucket, &part_path).await?
+                //     }
+                // };
                 let checksum_info = files[idx].erasure.get_checksum_info(part_number);
                 let reader = new_bitrot_filereader(
-                    filereader,
+                    disk.clone(),
+                    files[idx].data.clone(),
+                    bucket.to_owned(),
+                    format!("{}/{}/part.{}", object, files[idx].data_dir.unwrap_or(Uuid::nil()), part_number),
                     till_offset,
                     checksum_info.algorithm,
                     erasure.shard_size(erasure.block_size),
@@ -2411,18 +2414,21 @@ impl SetDisks {
         let mut prefer = vec![false; latest_disks.len()];
         for (index, disk) in latest_disks.iter().enumerate() {
             if let (Some(disk), Some(metadata)) = (disk, &copy_parts_metadata[index]) {
-                let filereader = {
-                    if let Some(ref data) = metadata.data {
-                        FileReader::Buffer(BufferReader::new(data.clone()))
-                    } else {
-                        let disk = disk.clone();
-                        let part_path = format!("{}/{}/part.{}", object, src_data_dir, part.number);
-
-                        disk.read_file(bucket, &part_path).await?
-                    }
-                };
+                // let filereader = {
+                //     if let Some(ref data) = metadata.data {
+                //         FileReader::Buffer(BufferReader::new(data.clone()))
+                //     } else {
+                //         let disk = disk.clone();
+                //         let part_path = format!("{}/{}/part.{}", object, src_data_dir, part.number);
+
+                //         disk.read_file(bucket, &part_path).await?
+ // } + // }; let reader = new_bitrot_filereader( - filereader, + disk.clone(), + metadata.data.clone(), + bucket.to_owned(), + format!("{}/{}/part.{}", object, src_data_dir, part.number), till_offset, checksum_algo.clone(), erasure.shard_size(erasure.block_size), @@ -5239,13 +5245,15 @@ async fn disks_with_all_parts( let checksum_info = meta.erasure.get_checksum_info(meta.parts[0].number); let data_len = data.len(); let verify_err = match bitrot_verify( - &mut Cursor::new(data.to_vec()), + FileReader::Buffer(BufferReader::new(data.clone(), 0, data_len)), data_len, meta.erasure.shard_file_size(meta.size), checksum_info.algorithm, checksum_info.hash, meta.erasure.shard_size(meta.erasure.block_size), - ) { + ) + .await + { Ok(_) => None, Err(err) => Some(err), }; diff --git a/rustfs/src/admin/mod.rs b/rustfs/src/admin/mod.rs index 9c8f2403..4b398132 100644 --- a/rustfs/src/admin/mod.rs +++ b/rustfs/src/admin/mod.rs @@ -1,5 +1,6 @@ pub mod handlers; pub mod router; +mod rpc; pub mod utils; use common::error::Result; @@ -11,6 +12,7 @@ use handlers::{ }; use hyper::Method; use router::{AdminOperation, S3Router}; +use rpc::regist_rpc_route; use s3s::route::S3Route; const ADMIN_PREFIX: &str = "/rustfs/admin"; @@ -21,6 +23,7 @@ pub fn make_admin_route() -> Result { // 1 r.insert(Method::POST, "/", AdminOperation(&sts::AssumeRoleHandle {}))?; + regist_rpc_route(&mut r)?; regist_user_route(&mut r)?; r.insert( diff --git a/rustfs/src/admin/router.rs b/rustfs/src/admin/router.rs index 46a6bb9a..4fb605d6 100644 --- a/rustfs/src/admin/router.rs +++ b/rustfs/src/admin/router.rs @@ -14,6 +14,7 @@ use s3s::S3Request; use s3s::S3Response; use s3s::S3Result; +use super::rpc::RPC_PREFIX; use super::ADMIN_PREFIX; pub struct S3Router { @@ -63,7 +64,7 @@ where } } - uri.path().starts_with(ADMIN_PREFIX) + uri.path().starts_with(ADMIN_PREFIX) || uri.path().starts_with(RPC_PREFIX) } async fn call(&self, req: S3Request) -> S3Result> { @@ -81,6 +82,10 @@ where // check_access before call async fn check_access(&self, req: &mut S3Request) -> S3Result<()> { + // TODO: check access by req.credentials + if req.uri.path().starts_with(RPC_PREFIX) { + return Ok(()); + } match req.credentials { Some(_) => Ok(()), None => Err(s3_error!(AccessDenied, "Signature is required")), diff --git a/rustfs/src/admin/rpc.rs b/rustfs/src/admin/rpc.rs new file mode 100644 index 00000000..fc7b5652 --- /dev/null +++ b/rustfs/src/admin/rpc.rs @@ -0,0 +1,97 @@ +use super::router::AdminOperation; +use super::router::Operation; +use super::router::S3Router; +use crate::storage::ecfs::bytes_stream; +use common::error::Result; +use ecstore::disk::DiskAPI; +use ecstore::disk::FileReader; +use ecstore::store::find_local_disk; +use http::StatusCode; +use hyper::Method; +use matchit::Params; +use s3s::dto::StreamingBlob; +use s3s::s3_error; +use s3s::Body; +use s3s::S3Request; +use s3s::S3Response; +use s3s::S3Result; +use serde_urlencoded::from_bytes; +use tokio_util::io::ReaderStream; +use tracing::warn; + +pub const RPC_PREFIX: &str = "/rustfs/rpc"; + +pub fn regist_rpc_route(r: &mut S3Router) -> Result<()> { + r.insert( + Method::GET, + format!("{}{}", RPC_PREFIX, "/read_file_stream").as_str(), + AdminOperation(&ReadFile {}), + )?; + + Ok(()) +} + +// /rustfs/rpc/read_file_stream?disk={}&volume={}&path={}&offset={}&length={}" +#[derive(Debug, Default, serde::Deserialize)] +pub struct ReadFileQuery { + disk: String, + volume: String, + path: String, + offset: usize, + length: usize, +} +pub struct ReadFile {} +#[async_trait::async_trait] +impl 
Operation for ReadFile { + async fn call(&self, req: S3Request, _params: Params<'_, '_>) -> S3Result> { + warn!("handle ReadFile"); + + let query = { + if let Some(query) = req.uri.query() { + let input: ReadFileQuery = + from_bytes(query.as_bytes()).map_err(|_e| s3_error!(InvalidArgument, "get query failed1"))?; + input + } else { + ReadFileQuery::default() + } + }; + + let Some(disk) = find_local_disk(&query.disk).await else { + return Err(s3_error!(InvalidArgument, "disk not found")); + }; + + let file: FileReader = disk + .read_file_stream(&query.volume, &query.path, query.offset, query.length) + .await + .map_err(|e| s3_error!(InternalError, "read file err {}", e))?; + + let s = bytes_stream(ReaderStream::new(file), query.length); + + Ok(S3Response::new((StatusCode::OK, Body::from(StreamingBlob::wrap(s))))) + + // let querys = req.uri.query().map(|q| { + // let mut querys = HashMap::new(); + // for (k, v) in url::form_urlencoded::parse(q.as_bytes()) { + // println!("{}={}", k, v); + // querys.insert(k.to_string(), v.to_string()); + // } + // querys + // }); + + // // TODO: file_path from root + + // if let Some(file_path) = querys.and_then(|q| q.get("file_path").cloned()) { + // let file = fs::OpenOptions::new() + // .read(true) + // .open(file_path) + // .await + // .map_err(|e| S3Error::with_message(S3ErrorCode::InternalError, format!("open file err {}", e)))?; + + // let s = bytes_stream(ReaderStream::new(file), 0); + + // return Ok(S3Response::new((StatusCode::OK, Body::from(StreamingBlob::wrap(s))))); + // } + + // Ok(S3Response::new((StatusCode::BAD_REQUEST, Body::empty()))) + } +} diff --git a/rustfs/src/grpc.rs b/rustfs/src/grpc.rs index 0411d095..c25c0519 100644 --- a/rustfs/src/grpc.rs +++ b/rustfs/src/grpc.rs @@ -9,8 +9,7 @@ use ecstore::{ admin_server_info::get_local_server_property, bucket::{metadata::load_bucket_metadata, metadata_sys}, disk::{ - DeleteOptions, DiskAPI, DiskInfoOptions, DiskStore, FileInfoVersions, ReadMultipleReq, ReadOptions, Reader, - UpdateMetadataOpts, + DeleteOptions, DiskAPI, DiskInfoOptions, DiskStore, FileInfoVersions, ReadMultipleReq, ReadOptions, UpdateMetadataOpts, }, erasure::Writer, error::Error as EcsError, @@ -694,103 +693,104 @@ impl Node for NodeService { } type ReadAtStream = ResponseStream; - async fn read_at(&self, request: Request>) -> Result, Status> { + async fn read_at(&self, _request: Request>) -> Result, Status> { info!("read_at"); + unimplemented!("read_at"); - let mut in_stream = request.into_inner(); - let (tx, rx) = mpsc::channel(128); + // let mut in_stream = request.into_inner(); + // let (tx, rx) = mpsc::channel(128); - tokio::spawn(async move { - let mut file_ref = None; - while let Some(result) = in_stream.next().await { - match result { - Ok(v) => { - match file_ref.as_ref() { - Some(_) => (), - None => { - if let Some(disk) = find_local_disk(&v.disk).await { - match disk.read_file(&v.volume, &v.path).await { - Ok(file_reader) => file_ref = Some(file_reader), - Err(err) => { - tx.send(Ok(ReadAtResponse { - success: false, - data: Vec::new(), - error: Some(err_to_proto_err(&err, &format!("read file failed: {}", err))), - read_size: -1, - })) - .await - .expect("working rx"); - break; - } - } - } else { - tx.send(Ok(ReadAtResponse { - success: false, - data: Vec::new(), - error: Some(err_to_proto_err( - &EcsError::new(StorageError::InvalidArgument( - Default::default(), - Default::default(), - Default::default(), - )), - "can not find disk", - )), - read_size: -1, - })) - .await - .expect("working rx"); - break; - } - } 
- }; + // tokio::spawn(async move { + // let mut file_ref = None; + // while let Some(result) = in_stream.next().await { + // match result { + // Ok(v) => { + // match file_ref.as_ref() { + // Some(_) => (), + // None => { + // if let Some(disk) = find_local_disk(&v.disk).await { + // match disk.read_file(&v.volume, &v.path).await { + // Ok(file_reader) => file_ref = Some(file_reader), + // Err(err) => { + // tx.send(Ok(ReadAtResponse { + // success: false, + // data: Vec::new(), + // error: Some(err_to_proto_err(&err, &format!("read file failed: {}", err))), + // read_size: -1, + // })) + // .await + // .expect("working rx"); + // break; + // } + // } + // } else { + // tx.send(Ok(ReadAtResponse { + // success: false, + // data: Vec::new(), + // error: Some(err_to_proto_err( + // &EcsError::new(StorageError::InvalidArgument( + // Default::default(), + // Default::default(), + // Default::default(), + // )), + // "can not find disk", + // )), + // read_size: -1, + // })) + // .await + // .expect("working rx"); + // break; + // } + // } + // }; - let mut data = vec![0u8; v.length.try_into().unwrap()]; + // let mut data = vec![0u8; v.length.try_into().unwrap()]; - match file_ref - .as_mut() - .unwrap() - .read_at(v.offset.try_into().unwrap(), &mut data) - .await - { - Ok(read_size) => tx.send(Ok(ReadAtResponse { - success: true, - data, - read_size: read_size.try_into().unwrap(), - error: None, - })), - Err(err) => tx.send(Ok(ReadAtResponse { - success: false, - data: Vec::new(), - error: Some(err_to_proto_err(&err, &format!("read at failed: {}", err))), - read_size: -1, - })), - } - .await - .unwrap(); - } - Err(err) => { - if let Some(io_err) = match_for_io_error(&err) { - if io_err.kind() == ErrorKind::BrokenPipe { - // here you can handle special case when client - // disconnected in unexpected way - eprintln!("\tclient disconnected: broken pipe"); - break; - } - } + // match file_ref + // .as_mut() + // .unwrap() + // .read_at(v.offset.try_into().unwrap(), &mut data) + // .await + // { + // Ok(read_size) => tx.send(Ok(ReadAtResponse { + // success: true, + // data, + // read_size: read_size.try_into().unwrap(), + // error: None, + // })), + // Err(err) => tx.send(Ok(ReadAtResponse { + // success: false, + // data: Vec::new(), + // error: Some(err_to_proto_err(&err, &format!("read at failed: {}", err))), + // read_size: -1, + // })), + // } + // .await + // .unwrap(); + // } + // Err(err) => { + // if let Some(io_err) = match_for_io_error(&err) { + // if io_err.kind() == ErrorKind::BrokenPipe { + // // here you can handle special case when client + // // disconnected in unexpected way + // eprintln!("\tclient disconnected: broken pipe"); + // break; + // } + // } - match tx.send(Err(err)).await { - Ok(_) => (), - Err(_err) => break, // response was dropped - } - } - } - } - println!("\tstream ended"); - }); + // match tx.send(Err(err)).await { + // Ok(_) => (), + // Err(_err) => break, // response was dropped + // } + // } + // } + // } + // println!("\tstream ended"); + // }); - let out_stream = ReceiverStream::new(rx); + // let out_stream = ReceiverStream::new(rx); - Ok(tonic::Response::new(Box::pin(out_stream))) + // Ok(tonic::Response::new(Box::pin(out_stream))) } async fn list_dir(&self, request: Request) -> Result, Status> { diff --git a/scripts/run.sh b/scripts/run.sh index bd114184..2b00ade0 100755 --- a/scripts/run.sh +++ b/scripts/run.sh @@ -6,8 +6,8 @@ fi current_dir=$(pwd) -mkdir -p ./target/volume/test -# mkdir -p ./target/volume/test{0..4} +# mkdir -p ./target/volume/test 
+mkdir -p ./target/volume/test{0..4} if [ -z "$RUST_LOG" ]; then @@ -19,8 +19,8 @@ fi # export RUSTFS_STORAGE_CLASS_INLINE_BLOCK="512 KB" -# RUSTFS_VOLUMES="./target/volume/test{0...4}" -export RUSTFS_VOLUMES="./target/volume/test" +export RUSTFS_VOLUMES="./target/volume/test{0...4}" +# export RUSTFS_VOLUMES="./target/volume/test" export RUSTFS_ADDRESS="0.0.0.0:9000" export RUSTFS_CONSOLE_ENABLE=true export RUSTFS_CONSOLE_ADDRESS="0.0.0.0:9002"
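
Note on the remote read path: HttpFileReader holds a reqwest::blocking::Response and calls the std Read::read_exact from inside poll_read. The blocking client is documented to panic when driven from within an async runtime, it stalls the tokio worker thread for the whole transfer, and read_exact into the entire unfilled buffer errors on a short final chunk instead of returning a partial read. Below is a minimal non-blocking sketch of the same reader, assuming the async reqwest client (the `stream` feature is already enabled in the workspace Cargo.toml) plus tokio_util::io::StreamReader; it is a possible follow-up shape, not what this patch ships:

    use std::io;
    use std::pin::Pin;
    use std::task::{Context, Poll};

    use futures::TryStreamExt;
    use tokio::io::{AsyncRead, ReadBuf};
    use tokio_util::io::StreamReader;

    // Wraps the body of /rustfs/rpc/read_file_stream in a type that
    // implements tokio's AsyncRead without ever blocking the executor.
    pub struct HttpFileReader {
        inner: Pin<Box<dyn AsyncRead + Send>>,
    }

    impl HttpFileReader {
        pub async fn new(
            url: &str, disk: &str, volume: &str, path: &str, offset: usize, length: usize,
        ) -> io::Result<Self> {
            // NB: parameters are sent unencoded, matching the patch's RPC route;
            // real code would percent-encode them.
            let resp = reqwest::Client::new()
                .get(format!(
                    "{url}/rustfs/rpc/read_file_stream?disk={disk}&volume={volume}&path={path}&offset={offset}&length={length}"
                ))
                .send()
                .await
                .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
            // bytes_stream() yields the body chunk by chunk; StreamReader
            // adapts that stream to AsyncRead, so poll_read just forwards.
            let stream = resp
                .bytes_stream()
                .map_err(|e| io::Error::new(io::ErrorKind::Other, e));
            Ok(Self {
                inner: Box::pin(StreamReader::new(stream)),
            })
        }
    }

    impl AsyncRead for HttpFileReader {
        fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>) -> Poll<io::Result<()>> {
            self.inner.as_mut().poll_read(cx, buf)
        }
    }

With that shape the extra `blocking` feature added to reqwest in Cargo.toml is no longer needed; the boxed reader would want a hand-written Debug impl to keep #[derive(Debug)] on FileReader.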
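For review convenience, the offset arithmetic in BitrotFileReader follows from the streaming bitrot layout: the part file is a sequence of hash || shard records, so a data offset is shifted by one hash per preceding shard (the stream_offset computed in read_at), and the total stream length adds ceil(till / shard) hashes (the till_offset computed in new). A self-contained check of both formulas; the 32-byte hash and 1 MiB shard sizes are illustrative, not taken from the patch:

    /// Stream position of a data offset: every full shard before it
    /// contributes one leading hash of `hash_size` bytes.
    fn stream_offset(offset: usize, shard_size: usize, hash_size: usize) -> usize {
        (offset / shard_size) * hash_size + offset
    }

    /// Stream length holding `till` data bytes: one hash per (possibly
    /// partial) shard, plus the data itself.
    fn stream_len(till: usize, shard_size: usize, hash_size: usize) -> usize {
        till.div_ceil(shard_size) * hash_size + till
    }

    fn main() {
        let (shard, hash) = (1usize << 20, 32); // 1 MiB shards, 32-byte hashes
        // Reading from the start of the third shard skips two hash/shard pairs.
        assert_eq!(stream_offset(2 << 20, shard, hash), 2 * hash + (2 << 20));
        // A part of 2.5 shards carries three hashes.
        let till = (2 << 20) + (1 << 19);
        assert_eq!(stream_len(till, shard, hash), 3 * hash + till);
        println!("offsets check out");
    }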
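The contract read_file_stream relies on throughout is that a reader yields exactly the [offset, offset + length) window and then behaves like EOF: bitrot_verify and BitrotFileReader::read_at both read_exact hash and shard buffers back to back and would otherwise run past the shard boundary. A sketch of a test pinning that contract for the in-memory BufferReader, assuming the windowed poll_read behavior above (module and test names are hypothetical):

    #[cfg(test)]
    mod buffer_reader_window {
        use ecstore::disk::{BufferReader, FileReader};
        use tokio::io::AsyncReadExt;

        #[tokio::test]
        async fn reads_only_the_requested_window() {
            let data: Vec<u8> = (0u8..100).collect();
            // 10-byte window starting at offset 40.
            let mut r = FileReader::Buffer(BufferReader::new(data, 40, 10));

            let mut shard = vec![0u8; 10];
            r.read_exact(&mut shard).await.expect("window must be readable");
            assert_eq!(shard, (40u8..50).collect::<Vec<u8>>());

            // Past the window the reader must report EOF rather than hand
            // out the rest of the inline buffer.
            let mut extra = Vec::new();
            let n = r.read_to_end(&mut extra).await.expect("clean EOF");
            assert_eq!(n, 0);
        }
    }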