From ae06d87f1019d972970aa68a870babc2a3b4009f Mon Sep 17 00:00:00 2001 From: weisd Date: Fri, 14 Mar 2025 00:35:27 +0800 Subject: [PATCH] opt network io --- .cargo/config.toml | 4 +- .gitignore | 1 + Cargo.lock | 17 - Cargo.toml | 2 - Dockerfile | 17 + docker-compose.yaml | 90 +++-- ecstore/Cargo.toml | 1 - ecstore/src/bitrot.rs | 3 +- ecstore/src/bucket/metadata.rs | 6 +- ecstore/src/cache_value/metacache_set.rs | 2 +- ecstore/src/config/common.rs | 22 +- ecstore/src/disk/io.rs | 121 ------ ecstore/src/disk/local.rs | 22 +- ecstore/src/disk/mod.rs | 434 +------------------- ecstore/src/disk/os.rs | 16 +- ecstore/src/disk/remote.rs | 70 ++-- ecstore/src/erasure.rs | 39 +- ecstore/src/heal/data_scanner.rs | 4 +- ecstore/src/heal/data_usage.rs | 5 +- ecstore/src/heal/data_usage_cache.rs | 4 +- ecstore/src/io.rs | 325 ++++++--------- ecstore/src/metacache/writer.rs | 15 +- ecstore/src/peer_rest_client.rs | 2 +- ecstore/src/pools.rs | 2 +- ecstore/src/set_disk.rs | 99 ++--- ecstore/src/sets.rs | 4 +- ecstore/src/store_api.rs | 40 +- iam/src/store/object.rs | 2 +- reader/Cargo.toml | 24 -- reader/src/error.rs | 12 - reader/src/hasher.rs | 170 -------- reader/src/lib.rs | 7 - reader/src/reader.rs | 493 ----------------------- reader/src/readme.md | 5 - rustfs/src/admin/rpc.rs | 6 +- rustfs/src/main.rs | 8 +- rustfs/src/storage/ecfs.rs | 18 +- 37 files changed, 420 insertions(+), 1692 deletions(-) create mode 100644 Dockerfile delete mode 100644 ecstore/src/disk/io.rs delete mode 100644 reader/Cargo.toml delete mode 100644 reader/src/error.rs delete mode 100644 reader/src/hasher.rs delete mode 100644 reader/src/lib.rs delete mode 100644 reader/src/reader.rs delete mode 100644 reader/src/readme.md diff --git a/.cargo/config.toml b/.cargo/config.toml index 624d2724..52d967b7 100644 --- a/.cargo/config.toml +++ b/.cargo/config.toml @@ -1,2 +1,4 @@ [target.x86_64-unknown-linux-gnu] -rustflags = ["-Clink-arg=-fuse-ld=lld"] +rustflags = [ + "-C", "link-arg=-fuse-ld=bfd" +] diff --git a/.gitignore b/.gitignore index 45147d58..83b9ef43 100644 --- a/.gitignore +++ b/.gitignore @@ -4,6 +4,7 @@ .vscode /test /logs +/data .devcontainer rustfs/static/* vendor diff --git a/Cargo.lock b/Cargo.lock index b5722b5c..a7d6a71a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1921,7 +1921,6 @@ dependencies = [ "pin-project-lite", "protos", "rand 0.8.5", - "reader", "reed-solomon-erasure", "regex", "reqwest", @@ -4981,22 +4980,6 @@ version = "0.6.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "20675572f6f24e9e76ef639bc5552774ed45f1c30e2951e1e99c59888861c539" -[[package]] -name = "reader" -version = "0.0.1" -dependencies = [ - "bytes", - "futures", - "hex-simd", - "md-5", - "pin-project-lite", - "s3s", - "sha2 0.11.0-pre.4", - "thiserror 2.0.11", - "tokio", - "tracing", -] - [[package]] name = "redox_syscall" version = "0.2.16" diff --git a/Cargo.toml b/Cargo.toml index a78ec0d7..72c912bd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,7 +7,6 @@ members = [ "common/common", # Shared utilities and data structures "common/lock", # Distributed locking implementation "common/protos", # Protocol buffer definitions - "reader", # Object reading service "common/workers", # Worker thread pools and task scheduling "iam", # Identity and Access Management "crypto", # Cryptography and security features @@ -43,7 +42,6 @@ flatbuffers = "24.12.23" futures = "0.3.31" futures-util = "0.3.31" common = { path = "./common/common" } -reader = { path = "./reader" } hex = "0.4.3" hyper = "1.6.0" hyper-util = { 
version = "0.1.10", features = [ diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 00000000..035a2c08 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,17 @@ +FROM alpine:latest + +# RUN apk add --no-cache + +WORKDIR /app + +RUN mkdir -p /data/rustfs0 /data/rustfs1 /data/rustfs2 /data/rustfs3 + +COPY ./target/x86_64-unknown-linux-musl/release/rustfs /app/rustfs + +RUN chmod +x /app/rustfs + +EXPOSE 9000 +EXPOSE 9001 + + +CMD ["/app/rustfs"] \ No newline at end of file diff --git a/docker-compose.yaml b/docker-compose.yaml index 5c68534b..bee2f4a0 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -1,74 +1,82 @@ -version: '3.8' - services: - node1: + node0: image: rustfs:v1 # 替换为你的镜像名称和标签 - container_name: node1 + container_name: node0 + hostname: node0 environment: - - RUSTFS_VOLUMES=http://node{1...4}:9000/root/data/target/volume/test{1...4} + - RUSTFS_VOLUMES=http://node{0...3}:9000/data/rustfs{0...3} - RUSTFS_ADDRESS=0.0.0.0:9000 - RUSTFS_CONSOLE_ENABLE=true - RUSTFS_CONSOLE_ADDRESS=0.0.0.0:9002 platform: linux/amd64 ports: - - "9001:9000" # 映射宿主机的 9001 端口到容器的 9000 端口 + - "9000:9000" # 映射宿主机的 9001 端口到容器的 9000 端口 + - "8000:9001" # 映射宿主机的 9001 端口到容器的 9000 端口 volumes: - - ..:/root/data # 将当前路径挂载到容器内的 /root/data - command: "/root/rustfs" - networks: - - my_network + - ./target/x86_64-unknown-linux-musl/release/rustfs:/app/rustfs + # - ./data/node0:/data # 将当前路径挂载到容器内的 /root/data + command: "/app/rustfs" + + node1: + image: rustfs:v1 + container_name: node1 + hostname: node1 + environment: + - RUSTFS_VOLUMES=http://node{0...3}:9000/data/rustfs{0...3} + - RUSTFS_ADDRESS=0.0.0.0:9000 + - RUSTFS_CONSOLE_ENABLE=true + - RUSTFS_CONSOLE_ADDRESS=0.0.0.0:9002 + platform: linux/amd64 + ports: + - "9001:9000" # 映射宿主机的 9002 端口到容器的 9000 端口 + volumes: + - ./target/x86_64-unknown-linux-musl/release/rustfs:/app/rustfs + # - ./data/node1:/data + command: "/app/rustfs" node2: image: rustfs:v1 container_name: node2 + hostname: node2 environment: - - RUSTFS_VOLUMES=http://node{1...4}:9000/root/data/target/volume/test{1...4} + - RUSTFS_VOLUMES=http://node{0...3}:9000/data/rustfs{0...3} - RUSTFS_ADDRESS=0.0.0.0:9000 - RUSTFS_CONSOLE_ENABLE=true - RUSTFS_CONSOLE_ADDRESS=0.0.0.0:9002 platform: linux/amd64 ports: - - "9002:9000" # 映射宿主机的 9002 端口到容器的 9000 端口 + - "9002:9000" # 映射宿主机的 9003 端口到容器的 9000 端口 volumes: - - ..:/root/data - command: "mkdir -p /root/data/target/volume/test{1..4} && /root/data/target/ubuntu22.04/release/rustfs" - networks: - - my_network + - ./target/x86_64-unknown-linux-musl/release/rustfs:/app/rustfs + # - ./data/node2:/data + command: "/app/rustfs" node3: image: rustfs:v1 container_name: node3 + hostname: node3 environment: - - RUSTFS_VOLUMES=http://node{1...4}:9000/root/data/target/volume/test{1...4} + - RUSTFS_VOLUMES=http://node{0...3}:9000/data/rustfs{0...3} - RUSTFS_ADDRESS=0.0.0.0:9000 - RUSTFS_CONSOLE_ENABLE=true - RUSTFS_CONSOLE_ADDRESS=0.0.0.0:9002 platform: linux/amd64 ports: - - "9003:9000" # 映射宿主机的 9003 端口到容器的 9000 端口 + - "9003:9000" # 映射宿主机的 9004 端口到容器的 9000 端口 volumes: - - ..:/root/data - command: "mkdir -p /root/data/target/volume/test{1..4} && /root/data/target/ubuntu22.04/release/rustfs" - networks: - - my_network + - ./target/x86_64-unknown-linux-musl/release/rustfs:/app/rustfs + # - ./data/node3:/data + command: "/app/rustfs" - node4: - image: rustfs:v1 - container_name: node4 - environment: - - RUSTFS_VOLUMES=http://node{1...4}:9000/root/data/target/volume/test{1...4} - - RUSTFS_ADDRESS=0.0.0.0:9000 - - RUSTFS_CONSOLE_ENABLE=true - - 
- node4: - image: rustfs:v1 - container_name: node4 - environment: - - RUSTFS_VOLUMES=http://node{1...4}:9000/root/data/target/volume/test{1...4} - - RUSTFS_ADDRESS=0.0.0.0:9000 - - RUSTFS_CONSOLE_ENABLE=true - - RUSTFS_CONSOLE_ADDRESS=0.0.0.0:9002 - platform: linux/amd64 - ports: - - "9004:9000" # map host port 9004 to container port 9000 - volumes: - - ..:/root/data - command: "mkdir -p /root/data/target/volume/test{1..4} && /root/data/target/ubuntu22.04/release/rustfs" - networks: - - my_network -networks: - my_network: - driver: bridge \ No newline at end of file diff --git a/ecstore/Cargo.toml b/ecstore/Cargo.toml index ab3bd5aa..f7357e91 100644 --- a/ecstore/Cargo.toml +++ b/ecstore/Cargo.toml @@ -17,7 +17,6 @@ blake2 = "0.10.6" bytes.workspace = true common.workspace = true chrono.workspace = true -reader.workspace = true glob = "0.3.2" thiserror.workspace = true flatbuffers.workspace = true diff --git a/ecstore/src/bitrot.rs b/ecstore/src/bitrot.rs index b69ef859..849e54b9 100644 --- a/ecstore/src/bitrot.rs +++ b/ecstore/src/bitrot.rs @@ -1,7 +1,8 @@ use crate::{ - disk::{error::DiskError, Disk, DiskAPI, FileReader, FileWriter}, + disk::{error::DiskError, Disk, DiskAPI}, erasure::{ReadAt, Writer}, error::{Error, Result}, + io::{FileReader, FileWriter}, store_api::BitrotAlgorithm, }; use blake2::Blake2b512; diff --git a/ecstore/src/bucket/metadata.rs b/ecstore/src/bucket/metadata.rs index 02d25e8f..fc97224e 100644 --- a/ecstore/src/bucket/metadata.rs +++ b/ecstore/src/bucket/metadata.rs @@ -14,7 +14,7 @@ use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::sync::Arc; use time::OffsetDateTime; -use tracing::{error, info}; +use tracing::error; use crate::config::common::{read_config, save_config}; use crate::error::{Error, Result}; @@ -311,7 +311,7 @@ impl BucketMetadata { buf.extend_from_slice(&data); - save_config(store, self.save_file_path().as_str(), &buf).await?; + save_config(store, self.save_file_path().as_str(), buf).await?; Ok(()) } @@ -367,7 +367,7 @@ pub async fn
load_bucket_metadata_parse(api: Arc, bucket: &str, parse: return Err(err); } - info!("bucketmeta {} not found with err {:?}, start to init ", bucket, &err); + // info!("bucketmeta {} not found with err {:?}, start to init ", bucket, &err); BucketMetadata::new(bucket) } diff --git a/ecstore/src/cache_value/metacache_set.rs b/ecstore/src/cache_value/metacache_set.rs index 263b7fa0..401561b6 100644 --- a/ecstore/src/cache_value/metacache_set.rs +++ b/ecstore/src/cache_value/metacache_set.rs @@ -164,7 +164,7 @@ pub async fn list_path_raw(mut rx: B_Receiver, opts: ListPathRawOptions) - let entry = match r.peek().await { Ok(res) => { if let Some(entry) = res { - info!("read entry disk: {}, name: {}", i, entry.name); + // info!("read entry disk: {}, name: {}", i, entry.name); entry } else { // eof diff --git a/ecstore/src/config/common.rs b/ecstore/src/config/common.rs index 14c386cd..837f577a 100644 --- a/ecstore/src/config/common.rs +++ b/ecstore/src/config/common.rs @@ -1,6 +1,3 @@ -use std::collections::HashSet; -use std::sync::Arc; - use super::error::{is_err_config_not_found, ConfigError}; use super::{storageclass, Config, GLOBAL_StorageClass, KVS}; use crate::disk::RUSTFS_META_BUCKET; @@ -10,8 +7,9 @@ use crate::store_err::is_err_object_not_found; use crate::utils::path::SLASH_SEPARATOR; use http::HeaderMap; use lazy_static::lazy_static; -use s3s::dto::StreamingBlob; -use s3s::Body; +use std::collections::HashSet; +use std::io::Cursor; +use std::sync::Arc; use tracing::{error, warn}; pub const CONFIG_PREFIX: &str = "config"; @@ -59,7 +57,7 @@ pub async fn read_config_with_metadata( Ok((data, rd.object_info)) } -pub async fn save_config(api: Arc, file: &str, data: &[u8]) -> Result<()> { +pub async fn save_config(api: Arc, file: &str, data: Vec) -> Result<()> { save_config_with_opts( api, file, @@ -96,14 +94,10 @@ pub async fn delete_config(api: Arc, file: &str) -> Result<()> } } -async fn save_config_with_opts(api: Arc, file: &str, data: &[u8], opts: &ObjectOptions) -> Result<()> { +async fn save_config_with_opts(api: Arc, file: &str, data: Vec, opts: &ObjectOptions) -> Result<()> { + let size = data.len(); let _ = api - .put_object( - RUSTFS_META_BUCKET, - file, - &mut PutObjReader::new(StreamingBlob::from(Body::from(data.to_vec())), data.len()), - opts, - ) + .put_object(RUSTFS_META_BUCKET, file, &mut PutObjReader::new(Box::new(Cursor::new(data)), size), opts) .await?; Ok(()) } @@ -174,7 +168,7 @@ async fn save_server_config(api: Arc, cfg: &Config) -> Result< let config_file = format!("{}{}{}", CONFIG_PREFIX, SLASH_SEPARATOR, CONFIG_FILE); - save_config(api, &config_file, data.as_slice()).await + save_config(api, &config_file, data).await } pub async fn lookup_configs(cfg: &mut Config, api: Arc) { diff --git a/ecstore/src/disk/io.rs b/ecstore/src/disk/io.rs deleted file mode 100644 index c0360050..00000000 --- a/ecstore/src/disk/io.rs +++ /dev/null @@ -1,121 +0,0 @@ -use crate::error::Result; -use futures::TryStreamExt; -use std::pin::Pin; -use std::task::Poll; -use tokio::io::AsyncWrite; -use tokio::sync::oneshot; -use tokio_util::io::ReaderStream; -use tokio_util::io::StreamReader; -use tracing::error; -use tracing::warn; - -use super::FileReader; - -#[derive(Debug)] -pub struct HttpFileWriter { - wd: tokio::io::WriteHalf, - err_rx: oneshot::Receiver, -} - -impl HttpFileWriter { - pub fn new(url: &str, disk: &str, volume: &str, path: &str, size: usize, append: bool) -> Result { - let (rd, wd) = tokio::io::simplex(4096); - - let (err_tx, err_rx) = oneshot::channel::(); - - let body = 
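save_config now takes the payload by value, so the buffer itself can become the object stream instead of being copied through data.to_vec() as the old &[u8] signature forced. A minimal sketch of the idea, with a stand-in PutObjReader (the real type lives in store_api and its generic bounds are not visible in this copy of the patch):

use std::io::Cursor;
use tokio::io::AsyncRead;

// Stand-in for the crate's PutObjReader: a boxed async stream plus its length.
struct PutObjReader {
    stream: Box<dyn AsyncRead + Send + Unpin>,
    content_length: usize,
}

impl PutObjReader {
    fn new(stream: Box<dyn AsyncRead + Send + Unpin>, size: usize) -> Self {
        Self { stream, content_length: size }
    }
}

fn reader_from_owned_bytes(data: Vec<u8>) -> PutObjReader {
    let size = data.len();
    // Cursor<Vec<u8>> implements tokio's AsyncRead, so the owned buffer is
    // the stream directly: no copy, no StreamingBlob/Body wrapping.
    PutObjReader::new(Box::new(Cursor::new(data)), size)
}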
reqwest::Body::wrap_stream(ReaderStream::new(rd)); - - let url = url.to_owned(); - let disk = disk.to_owned(); - let volume = volume.to_owned(); - let path = path.to_owned(); - - tokio::spawn(async move { - let client = reqwest::Client::new(); - if let Err(err) = client - .put(format!( - "{}/rustfs/rpc/put_file_stream?disk={}&volume={}&path={}&append={}&size={}", - url, - urlencoding::encode(&disk), - urlencoding::encode(&volume), - urlencoding::encode(&path), - append, - size - )) - .body(body) - .send() - .await - .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e)) - { - error!("HttpFileWriter put file err: {:?}", err); - - if let Err(er) = err_tx.send(err) { - error!("HttpFileWriter tx.send err: {:?}", er); - } - // return; - } - - // error!("http write done {}", path); - }); - - Ok(Self { - wd, - err_rx, - // client: reqwest::Client::new(), - // url: url.to_string(), - // disk: disk.to_string(), - // volume: volume.to_string(), - }) - } -} - -impl AsyncWrite for HttpFileWriter { - #[tracing::instrument(level = "debug", skip(self, buf))] - fn poll_write( - mut self: Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - buf: &[u8], - ) -> Poll> { - if let Ok(err) = self.as_mut().err_rx.try_recv() { - return Poll::Ready(Err(err)); - } - - Pin::new(&mut self.wd).poll_write(cx, buf) - } - - #[tracing::instrument(level = "debug", skip(self))] - fn poll_flush(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll> { - Pin::new(&mut self.wd).poll_flush(cx) - } - - #[tracing::instrument(level = "debug", skip(self))] - fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll> { - Pin::new(&mut self.wd).poll_shutdown(cx) - } -} - -pub async fn new_http_reader( - url: &str, - disk: &str, - volume: &str, - path: &str, - offset: usize, - length: usize, -) -> Result { - let resp = reqwest::Client::new() - .get(format!( - "{}/rustfs/rpc/read_file_stream?disk={}&volume={}&path={}&offset={}&length={}", - url, - urlencoding::encode(disk), - urlencoding::encode(volume), - urlencoding::encode(path), - offset, - length - )) - .send() - .await?; - - let inner = StreamReader::new(resp.bytes_stream().map_err(std::io::Error::other)); - - Ok(Box::new(inner)) -} diff --git a/ecstore/src/disk/local.rs b/ecstore/src/disk/local.rs index 4690450c..69b1717c 100644 --- a/ecstore/src/disk/local.rs +++ b/ecstore/src/disk/local.rs @@ -5,9 +5,9 @@ use super::error::{ use super::os::{is_root_disk, rename_all}; use super::{endpoint::Endpoint, error::DiskError, format::FormatV3}; use super::{ - os, CheckPartsResp, DeleteOptions, DiskAPI, DiskInfo, DiskInfoOptions, DiskLocation, DiskMetrics, FileInfoVersions, - FileReader, FileWriter, Info, MetaCacheEntry, ReadMultipleReq, ReadMultipleResp, ReadOptions, RenameDataResp, - UpdateMetadataOpts, VolumeInfo, WalkDirOptions, BUCKET_META_PREFIX, RUSTFS_META_BUCKET, STORAGE_FORMAT_FILE_BACKUP, + os, CheckPartsResp, DeleteOptions, DiskAPI, DiskInfo, DiskInfoOptions, DiskLocation, DiskMetrics, FileInfoVersions, Info, + MetaCacheEntry, ReadMultipleReq, ReadMultipleResp, ReadOptions, RenameDataResp, UpdateMetadataOpts, VolumeInfo, + WalkDirOptions, BUCKET_META_PREFIX, RUSTFS_META_BUCKET, STORAGE_FORMAT_FILE_BACKUP, }; use crate::bitrot::bitrot_verify; use crate::bucket::metadata_sys::{self}; @@ -27,6 +27,7 @@ use crate::heal::data_usage_cache::{DataUsageCache, DataUsageEntry}; use crate::heal::error::{ERR_IGNORE_FILE_CONTRIB, ERR_SKIP_FILE}; use crate::heal::heal_commands::{HealScanMode, HealingTracker}; use 
crate::heal::heal_ops::HEALING_TRACKER_FILENAME; +use crate::io::{FileReader, FileWriter}; use crate::metacache::writer::MetacacheWriter; use crate::new_object_layer_fn; use crate::set_disk::{ @@ -326,7 +327,7 @@ impl LocalDisk { } } - // FIXME: 先清空回收站吧,有时间再添加判断逻辑 + // TODO: 优化 FIXME: 先清空回收站吧,有时间再添加判断逻辑 if let Err(err) = { if trash_path.is_dir() { @@ -1523,7 +1524,7 @@ impl DiskAPI for LocalDisk { // TODO: io verifier #[tracing::instrument(level = "debug", skip(self))] async fn read_file(&self, volume: &str, path: &str) -> Result { - warn!("disk read_file: volume: {}, path: {}", volume, path); + // warn!("disk read_file: volume: {}, path: {}", volume, path); let volume_dir = self.get_bucket_path(volume)?; if !skip_access_checks(volume) { if let Err(e) = utils::fs::access(&volume_dir).await { @@ -1557,10 +1558,10 @@ impl DiskAPI for LocalDisk { #[tracing::instrument(level = "debug", skip(self))] async fn read_file_stream(&self, volume: &str, path: &str, offset: usize, length: usize) -> Result { - warn!( - "disk read_file_stream: volume: {}, path: {}, offset: {}, length: {}", - volume, path, offset, length - ); + // warn!( + // "disk read_file_stream: volume: {}, path: {}, offset: {}, length: {}", + // volume, path, offset, length + // ); let volume_dir = self.get_bucket_path(volume)?; if !skip_access_checks(volume) { @@ -1748,7 +1749,7 @@ impl DiskAPI for LocalDisk { return Err(os_err_to_file_err(e)); } - info!("read xl.meta failed, dst_file_path: {:?}, err: {:?}", dst_file_path, e); + // info!("read xl.meta failed, dst_file_path: {:?}, err: {:?}", dst_file_path, e); None } }; @@ -2247,7 +2248,6 @@ impl DiskAPI for LocalDisk { } async fn delete_volume(&self, volume: &str) -> Result<()> { - info!("delete_volume, volume: {}", volume); let p = self.get_bucket_path(volume)?; // TODO: 不能用递归删除,如果目录下面有文件,返回errVolumeNotEmpty diff --git a/ecstore/src/disk/mod.rs b/ecstore/src/disk/mod.rs index a708977e..8d737e92 100644 --- a/ecstore/src/disk/mod.rs +++ b/ecstore/src/disk/mod.rs @@ -1,7 +1,6 @@ pub mod endpoint; pub mod error; pub mod format; -pub mod io; pub mod local; pub mod os; pub mod remote; @@ -24,6 +23,7 @@ use crate::{ data_usage_cache::{DataUsageCache, DataUsageEntry}, heal_commands::{HealScanMode, HealingTracker}, }, + io::{FileReader, FileWriter}, store_api::{FileInfo, ObjectInfo, RawFileInfo}, utils::path::SLASH_SEPARATOR, }; @@ -35,11 +35,7 @@ use remote::RemoteDisk; use serde::{Deserialize, Serialize}; use std::{cmp::Ordering, fmt::Debug, path::PathBuf, sync::Arc}; use time::OffsetDateTime; -use tokio::{ - io::{AsyncRead, AsyncWrite}, - sync::mpsc::Sender, -}; -use tracing::info; +use tokio::{io::AsyncWrite, sync::mpsc::Sender}; use tracing::warn; use uuid::Uuid; @@ -328,7 +324,6 @@ impl DiskAPI for Disk { } async fn delete_volume(&self, volume: &str) -> Result<()> { - info!("delete_volume, volume: {}", volume); match self { Disk::Local(local_disk) => local_disk.delete_volume(volume).await, Disk::Remote(remote_disk) => remote_disk.delete_volume(volume).await, @@ -349,7 +344,6 @@ impl DiskAPI for Disk { scan_mode: HealScanMode, we_sleep: ShouldSleepFn, ) -> Result { - info!("ns_scanner"); match self { Disk::Local(local_disk) => local_disk.ns_scanner(cache, updates, scan_mode, we_sleep).await, Disk::Remote(remote_disk) => remote_disk.ns_scanner(cache, updates, scan_mode, we_sleep).await, @@ -374,9 +368,6 @@ pub async fn new_disk(ep: &endpoint::Endpoint, opt: &DiskOption) -> Result; -pub type FileWriter = Box; - #[async_trait::async_trait] pub trait DiskAPI: Debug + Send + Sync + 'static 
{ fn to_string(&self) -> String; @@ -1184,20 +1175,6 @@ pub struct ReadMultipleResp { pub mod_time: Option, } -// impl Default for ReadMultipleResp { -// fn default() -> Self { -// Self { -// bucket: String::new(), -// prefix: String::new(), -// file: String::new(), -// exists: false, -// error: String::new(), -// data: Vec::new(), -// mod_time: OffsetDateTime::UNIX_EPOCH, -// } -// } -// } - #[derive(Debug, Deserialize, Serialize)] pub struct VolumeInfo { pub name: String, @@ -1210,410 +1187,3 @@ pub struct ReadOptions { pub read_data: bool, pub healing: bool, } - -// pub struct FileWriter { -// pub inner: Pin>, -// } - -// impl AsyncWrite for FileWriter { -// fn poll_write( -// mut self: Pin<&mut Self>, -// cx: &mut std::task::Context<'_>, -// buf: &[u8], -// ) -> std::task::Poll> { -// Pin::new(&mut self.inner).poll_write(cx, buf) -// } - -// fn poll_flush( -// mut self: Pin<&mut Self>, -// cx: &mut std::task::Context<'_>, -// ) -> std::task::Poll> { -// Pin::new(&mut self.inner).poll_flush(cx) -// } - -// fn poll_shutdown( -// mut self: Pin<&mut Self>, -// cx: &mut std::task::Context<'_>, -// ) -> std::task::Poll> { -// Pin::new(&mut self.inner).poll_shutdown(cx) -// } -// } - -// impl FileWriter { -// pub fn new(inner: W) -> Self -// where -// W: AsyncWrite + Send + Sync + 'static, -// { -// Self { inner: Box::pin(inner) } -// } -// } - -// #[derive(Debug)] -// pub struct BufferWriter { -// pub inner: Vec, -// } - -// impl BufferWriter { -// pub fn new(inner: Vec) -> Self { -// Self { inner } -// } -// #[allow(clippy::should_implement_trait)] -// pub fn as_ref(&self) -> &[u8] { -// self.inner.as_ref() -// } -// } - -// #[async_trait::async_trait] -// impl Writer for BufferWriter { -// fn as_any(&self) -> &dyn Any { -// self -// } - -// async fn write(&mut self, buf: &[u8]) -> Result<()> { -// let _ = self.inner.write(buf).await?; -// self.inner.flush().await?; - -// Ok(()) -// } -// } - -// #[derive(Debug)] -// pub struct LocalFileWriter { -// pub inner: File, -// } - -// impl LocalFileWriter { -// pub fn new(inner: File) -> Self { -// Self { inner } -// } -// } - -// #[async_trait::async_trait] -// impl Writer for LocalFileWriter { -// fn as_any(&self) -> &dyn Any { -// self -// } - -// async fn write(&mut self, buf: &[u8]) -> Result<()> { -// let _ = self.inner.write(buf).await?; -// self.inner.flush().await?; - -// Ok(()) -// } -// } - -// type NodeClient = NodeServiceClient< -// InterceptedService) -> Result, Status> + Send + Sync + 'static>>, -// >; - -// #[derive(Debug)] -// pub struct RemoteFileWriter { -// pub endpoint: Endpoint, -// pub volume: String, -// pub path: String, -// pub is_append: bool, -// tx: Sender, -// resp_stream: Streaming, -// } - -// impl RemoteFileWriter { -// pub async fn new(endpoint: Endpoint, volume: String, path: String, is_append: bool, mut client: NodeClient) -> Result { -// let (tx, rx) = mpsc::channel(128); -// let in_stream = ReceiverStream::new(rx); - -// let response = client.write_stream(in_stream).await.unwrap(); - -// let resp_stream = response.into_inner(); - -// Ok(Self { -// endpoint, -// volume, -// path, -// is_append, -// tx, -// resp_stream, -// }) -// } -// } - -// #[async_trait::async_trait] -// impl Writer for RemoteFileWriter { -// fn as_any(&self) -> &dyn Any { -// self -// } - -// async fn write(&mut self, buf: &[u8]) -> Result<()> { -// let request = WriteRequest { -// disk: self.endpoint.to_string(), -// volume: self.volume.to_string(), -// path: self.path.to_string(), -// is_append: self.is_append, -// data: buf.to_vec(), 
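The FileReader/FileWriter aliases move out of disk/mod.rs into ecstore/src/io.rs so local and remote disks share one boxed-stream type. The angle-bracketed parameters were lost in this copy of the patch; the sketch below assumes the usual tokio bounds, inferred from how set_disk and the bitrot readers consume these types:

use tokio::io::{AsyncRead, AsyncWrite};

// Assumed definitions; the exact bounds in the patch are garbled here.
pub type FileReader = Box<dyn AsyncRead + Send + Sync + Unpin>;
pub type FileWriter = Box<dyn AsyncWrite + Send + Sync + Unpin>;

// Any concrete stream can then be handed out behind the alias:
fn boxed_reader<R>(r: R) -> FileReader
where
    R: AsyncRead + Send + Sync + Unpin + 'static,
{
    Box::new(r)
}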
-// }; -// self.tx.send(request).await?; - -// if let Some(resp) = self.resp_stream.next().await { -// // match resp { -// // Ok(resp) => { -// // if resp.success { -// // info!("write stream success"); -// // } else { -// // info!("write stream failed: {}", resp.error_info.unwrap_or("".to_string())); -// // } -// // } -// // Err(_err) => { - -// // } -// // } -// let resp = resp?; -// if resp.success { -// info!("write stream success"); -// } else { -// return if let Some(err) = &resp.error { -// Err(proto_err_to_err(err)) -// } else { -// Err(Error::from_string("")) -// }; -// } -// } else { -// let error_info = "can not get response"; -// info!("write stream failed: {}", error_info); -// return Err(Error::from_string(error_info)); -// } - -// Ok(()) -// } -// } - -// #[async_trait::async_trait] -// pub trait Reader { -// async fn read_at(&mut self, offset: usize, buf: &mut [u8]) -> Result; -// // async fn seek(&mut self, offset: usize) -> Result<()>; -// // async fn read_exact(&mut self, buf: &mut [u8]) -> Result; -// } - -// #[async_trait::async_trait] -// impl Reader for FileReader { -// async fn read_at(&mut self, offset: usize, buf: &mut [u8]) -> Result { -// match self { -// Self::Local(reader) => reader.read_at(offset, buf).await, -// Self::Remote(reader) => reader.read_at(offset, buf).await, -// Self::Buffer(reader) => reader.read_at(offset, buf).await, -// Self::Http(reader) => reader.read_at(offset, buf).await, -// } -// } -// // async fn seek(&mut self, offset: usize) -> Result<()> { -// // match self { -// // Self::Local(reader) => reader.seek(offset).await, -// // Self::Remote(reader) => reader.seek(offset).await, -// // Self::Buffer(reader) => reader.seek(offset).await, -// // } -// // } -// // async fn read_exact(&mut self, buf: &mut [u8]) -> Result { -// // match self { -// // Self::Local(reader) => reader.read_exact(buf).await, -// // Self::Remote(reader) => reader.read_exact(buf).await, -// // Self::Buffer(reader) => reader.read_exact(buf).await, -// // } -// // } -// } - -// #[derive(Debug)] -// pub struct BufferReader { -// pub inner: Cursor>, -// remaining: usize, -// } - -// impl BufferReader { -// pub fn new(inner: Vec, offset: usize, read_length: usize) -> Self { -// let mut cur = Cursor::new(inner); -// cur.set_position(offset as u64); -// Self { -// inner: cur, -// remaining: offset + read_length, -// } -// } -// } - -// impl AsyncRead for BufferReader { -// #[tracing::instrument(level = "debug", skip(self, buf))] -// fn poll_read( -// mut self: Pin<&mut Self>, -// cx: &mut std::task::Context<'_>, -// buf: &mut tokio::io::ReadBuf<'_>, -// ) -> std::task::Poll> { -// match Pin::new(&mut self.inner).poll_read(cx, buf) { -// Poll::Ready(Ok(_)) => { -// if self.inner.position() as usize >= self.remaining { -// self.remaining -= buf.filled().len(); -// Poll::Ready(Ok(())) -// } else { -// Poll::Pending -// } -// } -// Poll::Ready(Err(err)) => Poll::Ready(Err(err)), -// Poll::Pending => Poll::Pending, -// } -// } -// } - -// #[async_trait::async_trait] -// impl Reader for BufferReader { -// #[tracing::instrument(level = "debug", skip(self, buf))] -// async fn read_at(&mut self, offset: usize, buf: &mut [u8]) -> Result { -// if self.pos != offset { -// self.inner.set_position(offset as u64); -// } -// self.inner.read_exact(buf).await?; -// self.pos += buf.len(); -// Ok(buf.len()) -// } -// // #[tracing::instrument(level = "debug", skip(self))] -// // async fn seek(&mut self, offset: usize) -> Result<()> { -// // if self.pos != offset { -// // 
self.inner.set_position(offset as u64); -// // } - -// // Ok(()) -// // } -// // #[tracing::instrument(level = "debug", skip(self))] -// // async fn read_exact(&mut self, buf: &mut [u8]) -> Result { -// // let bytes_read = self.inner.read_exact(buf).await?; -// // self.pos += buf.len(); -// // Ok(bytes_read) -// // } -// } - -// #[derive(Debug)] -// pub struct LocalFileReader { -// pub inner: File, -// // pos: usize, -// } - -// impl LocalFileReader { -// pub fn new(inner: File) -> Self { -// Self { inner } -// } -// } - -// #[async_trait::async_trait] -// impl Reader for LocalFileReader { -// #[tracing::instrument(level = "debug", skip(self, buf))] -// async fn read_at(&mut self, offset: usize, buf: &mut [u8]) -> Result { -// if self.pos != offset { -// self.inner.seek(SeekFrom::Start(offset as u64)).await?; -// self.pos = offset; -// } -// self.inner.read_exact(buf).await?; -// self.pos += buf.len(); -// Ok(buf.len()) -// } - -// // #[tracing::instrument(level = "debug", skip(self))] -// // async fn seek(&mut self, offset: usize) -> Result<()> { -// // if self.pos != offset { -// // self.inner.seek(SeekFrom::Start(offset as u64)).await?; -// // self.pos = offset; -// // } - -// // Ok(()) -// // } -// // #[tracing::instrument(level = "debug", skip(self, buf))] -// // async fn read_exact(&mut self, buf: &mut [u8]) -> Result { -// // let bytes_read = self.inner.read_exact(buf).await?; -// // self.pos += buf.len(); -// // Ok(bytes_read) -// // } -// } - -// impl AsyncRead for LocalFileReader { -// #[tracing::instrument(level = "debug", skip(self, buf))] -// fn poll_read( -// mut self: Pin<&mut Self>, -// cx: &mut std::task::Context<'_>, -// buf: &mut tokio::io::ReadBuf<'_>, -// ) -> std::task::Poll> { -// Pin::new(&mut self.inner).poll_read(cx, buf) -// } -// } - -// #[derive(Debug)] -// pub struct RemoteFileReader { -// pub endpoint: Endpoint, -// pub volume: String, -// pub path: String, -// tx: Sender, -// resp_stream: Streaming, -// } - -// impl RemoteFileReader { -// pub async fn new(endpoint: Endpoint, volume: String, path: String, mut client: NodeClient) -> Result { -// let (tx, rx) = mpsc::channel(128); -// let in_stream = ReceiverStream::new(rx); - -// let response = client.read_at(in_stream).await.unwrap(); - -// let resp_stream = response.into_inner(); - -// Ok(Self { -// endpoint, -// volume, -// path, -// tx, -// resp_stream, -// }) -// } -// } - -// #[async_trait::async_trait] -// impl Reader for RemoteFileReader { -// async fn read_at(&mut self, offset: usize, buf: &mut [u8]) -> Result { -// let request = ReadAtRequest { -// disk: self.endpoint.to_string(), -// volume: self.volume.to_string(), -// path: self.path.to_string(), -// offset: offset.try_into().unwrap(), -// // length: length.try_into().unwrap(), -// length: buf.len().try_into().unwrap(), -// }; -// self.tx.send(request).await?; - -// if let Some(resp) = self.resp_stream.next().await { -// let resp = resp?; -// if resp.success { -// info!("read at stream success"); - -// buf.copy_from_slice(&resp.data); - -// Ok(resp.read_size.try_into().unwrap()) -// } else { -// return if let Some(err) = &resp.error { -// Err(proto_err_to_err(err)) -// } else { -// Err(Error::from_string("")) -// }; -// } -// } else { -// let error_info = "can not get response"; -// info!("read at stream failed: {}", error_info); -// Err(Error::from_string(error_info)) -// } -// } -// // async fn seek(&mut self, _offset: usize) -> Result<()> { -// // unimplemented!() -// // } -// // async fn read_exact(&mut self, _buf: &mut [u8]) -> Result { -// 
// unimplemented!() -// // } -// } - -// impl AsyncRead for RemoteFileReader { -// #[tracing::instrument(level = "debug", skip(self, buf))] -// fn poll_read( -// mut self: Pin<&mut Self>, -// cx: &mut std::task::Context<'_>, -// buf: &mut tokio::io::ReadBuf<'_>, -// ) -> std::task::Poll> { -// unimplemented!("poll_read") -// } -// } diff --git a/ecstore/src/disk/os.rs b/ecstore/src/disk/os.rs index fd5e043d..175052cb 100644 --- a/ecstore/src/disk/os.rs +++ b/ecstore/src/disk/os.rs @@ -141,13 +141,15 @@ pub async fn reliable_rename( } // need remove dst path if let Err(err) = utils::fs::remove_all(dst_file_path.as_ref()).await { - info!( - "reliable_rename rm dst failed. src_file_path: {:?}, dst_file_path: {:?}, base_dir: {:?}, err: {:?}", - src_file_path.as_ref(), - dst_file_path.as_ref(), - base_dir.as_ref(), - err - ); + if err.kind() != io::ErrorKind::NotFound { + info!( + "reliable_rename rm dst failed. src_file_path: {:?}, dst_file_path: {:?}, base_dir: {:?}, err: {:?}", + src_file_path.as_ref(), + dst_file_path.as_ref(), + base_dir.as_ref(), + err + ); + } } let mut i = 0; loop { diff --git a/ecstore/src/disk/remote.rs b/ecstore/src/disk/remote.rs index 17c4c992..43f02832 100644 --- a/ecstore/src/disk/remote.rs +++ b/ecstore/src/disk/remote.rs @@ -23,8 +23,8 @@ use uuid::Uuid; use super::{ endpoint::Endpoint, CheckPartsResp, DeleteOptions, DiskAPI, DiskInfo, DiskInfoOptions, DiskLocation, DiskOption, - FileInfoVersions, FileReader, FileWriter, ReadMultipleReq, ReadMultipleResp, ReadOptions, RenameDataResp, UpdateMetadataOpts, - VolumeInfo, WalkDirOptions, + FileInfoVersions, ReadMultipleReq, ReadMultipleResp, ReadOptions, RenameDataResp, UpdateMetadataOpts, VolumeInfo, + WalkDirOptions, }; use crate::{ disk::error::DiskError, @@ -36,11 +36,11 @@ use crate::{ }, store_api::{FileInfo, RawFileInfo}, }; +use crate::{disk::MetaCacheEntry, metacache::writer::MetacacheWriter}; use crate::{ - disk::io::{new_http_reader, HttpFileWriter}, + io::{FileReader, FileWriter, HttpFileReader, HttpFileWriter}, utils::proto_err_to_err, }; -use crate::{disk::MetaCacheEntry, metacache::writer::MetacacheWriter}; use protos::proto_gen::node_service::RenamePartRequst; #[derive(Debug)] @@ -135,7 +135,7 @@ impl DiskAPI for RemoteDisk { } async fn read_all(&self, volume: &str, path: &str) -> Result> { - info!("read_all"); + info!("read_all {}/{}", volume, path); let mut client = node_service_time_out_client(&self.addr) .await .map_err(|err| Error::from_string(format!("can not get client, err: {}", err)))?; @@ -147,8 +147,6 @@ impl DiskAPI for RemoteDisk { let response = client.read_all(request).await?.into_inner(); - info!("read_all success"); - if !response.success { return Err(Error::new(DiskError::FileNotFound)); } @@ -182,7 +180,7 @@ impl DiskAPI for RemoteDisk { } async fn delete(&self, volume: &str, path: &str, opt: DeleteOptions) -> Result<()> { - info!("delete"); + info!("delete {}/{}/{}", self.endpoint.to_string(), volume, path); let options = serde_json::to_string(&opt)?; let mut client = node_service_time_out_client(&self.addr) .await @@ -264,7 +262,7 @@ impl DiskAPI for RemoteDisk { } async fn rename_part(&self, src_volume: &str, src_path: &str, dst_volume: &str, dst_path: &str, meta: Vec) -> Result<()> { - info!("rename_part"); + info!("rename_part {}/{}", src_volume, src_path); let mut client = node_service_time_out_client(&self.addr) .await .map_err(|err| Error::from_string(format!("can not get client, err: {}", err)))?; @@ -318,7 +316,7 @@ impl DiskAPI for RemoteDisk { 
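The os.rs change stops logging the pre-rename cleanup when the destination simply does not exist, which is exactly what produced the "reliable_rename rm dst failed" noise during multipart uploads. The pattern in isolation, with tokio::fs::remove_dir_all standing in for the crate's utils::fs::remove_all helper:

use std::io;
use std::path::Path;

// Removing a path that is already gone is success for this caller; only
// real failures (permissions, I/O errors) deserve a log line.
async fn remove_ignore_not_found(path: &Path) -> io::Result<()> {
    match tokio::fs::remove_dir_all(path).await {
        Err(e) if e.kind() != io::ErrorKind::NotFound => Err(e),
        _ => Ok(()),
    }
}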
#[tracing::instrument(level = "debug", skip(self))] async fn create_file(&self, _origvolume: &str, volume: &str, path: &str, file_size: usize) -> Result { - info!("create_file"); + info!("create_file {}/{}/{}", self.endpoint.to_string(), volume, path); Ok(Box::new(HttpFileWriter::new( self.endpoint.grid_host().as_str(), self.endpoint.to_string().as_str(), @@ -331,7 +329,7 @@ impl DiskAPI for RemoteDisk { #[tracing::instrument(level = "debug", skip(self))] async fn append_file(&self, volume: &str, path: &str) -> Result { - info!("append_file"); + info!("append_file {}/{}", volume, path); Ok(Box::new(HttpFileWriter::new( self.endpoint.grid_host().as_str(), self.endpoint.to_string().as_str(), @@ -344,25 +342,31 @@ impl DiskAPI for RemoteDisk { #[tracing::instrument(level = "debug", skip(self))] async fn read_file(&self, volume: &str, path: &str) -> Result { - info!("read_file"); - Ok(new_http_reader(self.endpoint.grid_host().as_str(), self.endpoint.to_string().as_str(), volume, path, 0, 0).await?) + info!("read_file {}/{}", volume, path); + Ok(Box::new( + HttpFileReader::new(self.endpoint.grid_host().as_str(), self.endpoint.to_string().as_str(), volume, path, 0, 0) + .await?, + )) } #[tracing::instrument(level = "debug", skip(self))] async fn read_file_stream(&self, volume: &str, path: &str, offset: usize, length: usize) -> Result { - Ok(new_http_reader( - self.endpoint.grid_host().as_str(), - self.endpoint.to_string().as_str(), - volume, - path, - offset, - length, - ) - .await?) + info!("read_file_stream {}/{}/{}", self.endpoint.to_string(), volume, path); + Ok(Box::new( + HttpFileReader::new( + self.endpoint.grid_host().as_str(), + self.endpoint.to_string().as_str(), + volume, + path, + offset, + length, + ) + .await?, + )) } async fn list_dir(&self, _origvolume: &str, volume: &str, _dir_path: &str, _count: i32) -> Result> { - info!("list_dir"); + info!("list_dir {}/{}", volume, _dir_path); let mut client = node_service_time_out_client(&self.addr) .await .map_err(|err| Error::from_string(format!("can not get client, err: {}", err)))?; @@ -386,7 +390,8 @@ impl DiskAPI for RemoteDisk { // FIXME: TODO: use writer async fn walk_dir(&self, opts: WalkDirOptions, wr: &mut W) -> Result<()> { - info!("walk_dir"); + let now = std::time::SystemTime::now(); + info!("walk_dir {}/{}/{:?}", self.endpoint.to_string(), opts.bucket, opts.filter_prefix); let mut wr = wr; let mut out = MetacacheWriter::new(&mut wr); let mut buf = Vec::new(); @@ -415,6 +420,12 @@ impl DiskAPI for RemoteDisk { } } + info!( + "walk_dir {}/{:?} done {:?}", + opts.bucket, + opts.filter_prefix, + now.elapsed().unwrap_or_default() + ); Ok(()) } @@ -426,7 +437,7 @@ impl DiskAPI for RemoteDisk { dst_volume: &str, dst_path: &str, ) -> Result { - info!("rename_data"); + info!("rename_data {}/{}/{}/{}", self.addr, self.endpoint.to_string(), dst_volume, dst_path); let file_info = serde_json::to_string(&fi)?; let mut client = node_service_time_out_client(&self.addr) .await @@ -608,7 +619,7 @@ impl DiskAPI for RemoteDisk { } async fn write_metadata(&self, _org_volume: &str, volume: &str, path: &str, fi: FileInfo) -> Result<()> { - info!("write_metadata"); + info!("write_metadata {}/{}", volume, path); let file_info = serde_json::to_string(&fi)?; let mut client = node_service_time_out_client(&self.addr) .await @@ -670,7 +681,7 @@ impl DiskAPI for RemoteDisk { } async fn read_xl(&self, volume: &str, path: &str, read_data: bool) -> Result { - info!("read_xl"); + info!("read_xl {}/{}/{}", self.endpoint.to_string(), volume, path); let mut client 
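walk_dir now measures its own wall time with SystemTime::now() and elapsed(). SystemTime::elapsed returns a Result because the system clock can step backwards, hence the unwrap_or_default(). A monotonic Instant avoids that failure mode entirely; a small helper sketch (timed is hypothetical, not part of the patch):

use std::future::Future;
use std::time::Instant;
use tracing::info;

// Instant is monotonic, so elapsed() cannot fail on clock adjustments.
async fn timed<F, T>(label: &str, fut: F) -> T
where
    F: Future<Output = T>,
{
    let start = Instant::now();
    let out = fut.await;
    info!("{} done in {:?}", label, start.elapsed());
    out
}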
= node_service_time_out_client(&self.addr) .await .map_err(|err| Error::from_string(format!("can not get client, err: {}", err)))?; @@ -779,7 +790,7 @@ impl DiskAPI for RemoteDisk { } async fn read_multiple(&self, req: ReadMultipleReq) -> Result> { - info!("read_multiple"); + info!("read_multiple {}/{}/{}", self.endpoint.to_string(), req.bucket, req.prefix); let read_multiple_req = serde_json::to_string(&req)?; let mut client = node_service_time_out_client(&self.addr) .await @@ -809,7 +820,7 @@ impl DiskAPI for RemoteDisk { } async fn delete_volume(&self, volume: &str) -> Result<()> { - info!("delete_volume"); + info!("delete_volume {}/{}", self.endpoint.to_string(), volume); let mut client = node_service_time_out_client(&self.addr) .await .map_err(|err| Error::from_string(format!("can not get client, err: {}", err)))?; @@ -832,7 +843,6 @@ impl DiskAPI for RemoteDisk { } async fn disk_info(&self, opts: &DiskInfoOptions) -> Result { - info!("delete_volume"); let opts = serde_json::to_string(&opts)?; let mut client = node_service_time_out_client(&self.addr) .await diff --git a/ecstore/src/erasure.rs b/ecstore/src/erasure.rs index 0056d4bf..83d1d9ae 100644 --- a/ecstore/src/erasure.rs +++ b/ecstore/src/erasure.rs @@ -1,13 +1,11 @@ use crate::bitrot::{BitrotReader, BitrotWriter}; -use crate::error::{Error, Result, StdError}; +use crate::error::{Error, Result}; use crate::quorum::{object_op_ignored_errs, reduce_write_quorum_errs}; -use bytes::Bytes; use futures::future::join_all; -use futures::{pin_mut, Stream, StreamExt}; use reed_solomon_erasure::galois_8::ReedSolomon; use std::any::Any; use std::io::ErrorKind; -use tokio::io::DuplexStream; +use tokio::io::{AsyncRead, AsyncWrite}; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tracing::warn; use tracing::{error, info}; @@ -49,22 +47,22 @@ impl Erasure { } } - #[tracing::instrument(level = "debug", skip(self, body, writers))] + #[tracing::instrument(level = "debug", skip(self, reader, writers))] pub async fn encode( &mut self, - body: S, + reader: &mut S, writers: &mut [Option], // block_size: usize, total_size: usize, write_quorum: usize, ) -> Result where - S: Stream> + Send + Sync, + S: AsyncRead + Unpin + Send + 'static, { - pin_mut!(body); - let mut reader = tokio_util::io::StreamReader::new( - body.map(|f| f.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e.to_string()))), - ); + // pin_mut!(body); + // let mut reader = tokio_util::io::StreamReader::new( + // body.map(|f| f.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e.to_string()))), + // ); let mut total: usize = 0; @@ -101,6 +99,7 @@ impl Erasure { let blocks = self.encode_data(&self.buf)?; let mut errs = Vec::new(); + // TODO: 并发写入 for (i, w_op) in writers.iter_mut().enumerate() { if let Some(w) = w_op { match w.write(blocks[i].as_ref()).await { @@ -204,14 +203,17 @@ impl Erasure { // Ok(total) } - pub async fn decode( + pub async fn decode( &self, - writer: &mut DuplexStream, + writer: &mut W, readers: Vec>, offset: usize, length: usize, total_length: usize, - ) -> (usize, Option) { + ) -> (usize, Option) + where + W: AsyncWriteExt + Send + Unpin + 'static, + { if length == 0 { return (0, None); } @@ -281,14 +283,17 @@ impl Erasure { (bytes_writed, None) } - async fn write_data_blocks( + async fn write_data_blocks( &self, - writer: &mut DuplexStream, + writer: &mut W, bufs: Vec>>, data_blocks: usize, offset: usize, length: usize, - ) -> Result { + ) -> Result + where + W: AsyncWrite + Send + Unpin + 'static, + { if bufs.len() < data_blocks { return 
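encode() now reads directly from any AsyncRead instead of first adapting a Stream of Bytes through StreamReader, so the same code path accepts the EtagReader, a duplex half, or a plain file. The read loop this implies, stripped of the shard and bitrot plumbing (read_blocks is illustrative; block_size mirrors fi.erasure.block_size):

use tokio::io::{AsyncRead, AsyncReadExt};

// Pull fixed-size erasure blocks straight off the source stream.
async fn read_blocks<S>(reader: &mut S, block_size: usize, total_size: usize) -> std::io::Result<usize>
where
    S: AsyncRead + Unpin + Send,
{
    let mut buf = vec![0u8; block_size];
    let mut total = 0usize;
    while total < total_size {
        let want = block_size.min(total_size - total);
        reader.read_exact(&mut buf[..want]).await?;
        // ...encode_data(&buf[..want]) and fan the shards out to writers...
        total += want;
    }
    Ok(total)
}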
Err(Error::msg("read bufs not match data_blocks")); } diff --git a/ecstore/src/heal/data_scanner.rs b/ecstore/src/heal/data_scanner.rs index 05b883af..66d796ce 100644 --- a/ecstore/src/heal/data_scanner.rs +++ b/ecstore/src/heal/data_scanner.rs @@ -217,7 +217,7 @@ async fn run_data_scanner() { globalScannerMetrics.write().await.set_cycle(Some(cycle_info.clone())).await; let mut wr = Vec::new(); cycle_info.serialize(&mut Serializer::new(&mut wr)).unwrap(); - let _ = save_config(store.clone(), &DATA_USAGE_BLOOM_NAME_PATH, &wr).await; + let _ = save_config(store.clone(), &DATA_USAGE_BLOOM_NAME_PATH, wr).await; } Err(err) => { info!("ns_scanner failed: {:?}", err); @@ -268,7 +268,7 @@ async fn save_background_heal_info(store: Arc, info: &BackgroundHealInf Ok(info) => info, Err(_) => return, }; - let _ = save_config(store, &BACKGROUND_HEAL_INFO_PATH, &b).await; + let _ = save_config(store, &BACKGROUND_HEAL_INFO_PATH, b).await; } async fn get_cycle_scan_mode(current_cycle: u64, bitrot_start_cycle: u64, bitrot_start_time: SystemTime) -> HealScanMode { diff --git a/ecstore/src/heal/data_usage.rs b/ecstore/src/heal/data_usage.rs index 5460de3c..ef569d6a 100644 --- a/ecstore/src/heal/data_usage.rs +++ b/ecstore/src/heal/data_usage.rs @@ -124,10 +124,11 @@ pub async fn store_data_usage_in_backend(mut rx: Receiver) { Some(data_usage_info) => { if let Ok(data) = serde_json::to_vec(&data_usage_info) { if attempts > 10 { - let _ = save_config(store.clone(), &format!("{}{}", *DATA_USAGE_OBJ_NAME_PATH, ".bkp"), &data).await; + let _ = + save_config(store.clone(), &format!("{}{}", *DATA_USAGE_OBJ_NAME_PATH, ".bkp"), data.clone()).await; attempts += 1; } - let _ = save_config(store.clone(), &DATA_USAGE_OBJ_NAME_PATH, &data).await; + let _ = save_config(store.clone(), &DATA_USAGE_OBJ_NAME_PATH, data).await; attempts += 1; } else { continue; diff --git a/ecstore/src/heal/data_usage_cache.rs b/ecstore/src/heal/data_usage_cache.rs index 2e459e2a..6b336790 100644 --- a/ecstore/src/heal/data_usage_cache.rs +++ b/ecstore/src/heal/data_usage_cache.rs @@ -458,9 +458,9 @@ impl DataUsageCache { let name_clone = name.clone(); tokio::spawn(async move { - let _ = save_config(store_clone, &format!("{}{}", &name_clone, ".bkp"), &buf_clone).await; + let _ = save_config(store_clone, &format!("{}{}", &name_clone, ".bkp"), buf_clone).await; }); - save_config(store, &name, &buf).await + save_config(store, &name, buf).await } pub fn replace(&mut self, path: &str, parent: &str, e: DataUsageEntry) { diff --git a/ecstore/src/io.rs b/ecstore/src/io.rs index 7c149345..764c8834 100644 --- a/ecstore/src/io.rs +++ b/ecstore/src/io.rs @@ -1,226 +1,153 @@ -use std::io::Read; -use std::io::Write; +use futures::TryStreamExt; +use md5::Digest; +use md5::Md5; use std::pin::Pin; -use std::task::{Context, Poll}; -use tokio::fs::File; -use tokio::io::{self, AsyncRead, AsyncWrite, ReadBuf}; +use std::task::Context; +use std::task::Poll; +use tokio::io::AsyncRead; +use tokio::io::AsyncWrite; +use tokio::io::ReadBuf; +use tokio::sync::oneshot; +use tokio_util::io::ReaderStream; +use tokio_util::io::StreamReader; +use tracing::error; +use tracing::warn; -pub enum Reader { - File(File), - Buffer(VecAsyncReader), +pub type FileReader = Box; +pub type FileWriter = Box; + +pub const READ_BUFFER_SIZE: usize = 1024 * 1024; + +#[derive(Debug)] +pub struct HttpFileWriter { + wd: tokio::io::DuplexStream, + err_rx: oneshot::Receiver, } -impl AsyncRead for Reader { - fn poll_read(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>) -> Poll> { - 
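The core of the network-io optimization shows up here: the deleted disk/io.rs paired a 4 KiB simplex pipe with a default ReaderStream, while the new io.rs uses a READ_BUFFER_SIZE (1 MiB) duplex pipe and ReaderStream::with_capacity, so each chunk handed to the HTTP body can be up to a megabyte. The wiring, reduced to its essentials:

use tokio_util::io::ReaderStream;

const READ_BUFFER_SIZE: usize = 1024 * 1024;

// Writer half goes back to the caller; reader half becomes the request body.
// With a 1 MiB pipe, uploads are no longer throttled to 4 KiB granularity.
fn pipe_body() -> (tokio::io::DuplexStream, reqwest::Body) {
    let (rd, wd) = tokio::io::duplex(READ_BUFFER_SIZE);
    let body = reqwest::Body::wrap_stream(ReaderStream::with_capacity(rd, READ_BUFFER_SIZE));
    (wd, body)
}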
match self.get_mut() { - Reader::File(file) => Pin::new(file).poll_read(cx, buf), - Reader::Buffer(buffer) => Pin::new(buffer).poll_read(cx, buf), - } +impl HttpFileWriter { + pub fn new(url: &str, disk: &str, volume: &str, path: &str, size: usize, append: bool) -> std::io::Result { + let (rd, wd) = tokio::io::duplex(READ_BUFFER_SIZE); + + let (err_tx, err_rx) = oneshot::channel::(); + + let body = reqwest::Body::wrap_stream(ReaderStream::with_capacity(rd, READ_BUFFER_SIZE)); + + let url = url.to_owned(); + let disk = disk.to_owned(); + let volume = volume.to_owned(); + let path = path.to_owned(); + + tokio::spawn(async move { + let client = reqwest::Client::new(); + if let Err(err) = client + .put(format!( + "{}/rustfs/rpc/put_file_stream?disk={}&volume={}&path={}&append={}&size={}", + url, + urlencoding::encode(&disk), + urlencoding::encode(&volume), + urlencoding::encode(&path), + append, + size + )) + .body(body) + .send() + .await + .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e)) + { + error!("HttpFileWriter put file err: {:?}", err); + + if let Err(er) = err_tx.send(err) { + error!("HttpFileWriter tx.send err: {:?}", er); + } + } + }); + + Ok(Self { wd, err_rx }) } } -#[derive(Default)] -pub enum Writer { - #[default] - NotUse, - File(File), - Buffer(VecAsyncWriter), -} - -impl AsyncWrite for Writer { - fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll> { - match self.get_mut() { - Writer::File(file) => Pin::new(file).poll_write(cx, buf), - Writer::Buffer(buff) => Pin::new(buff).poll_write(cx, buf), - Writer::NotUse => Poll::Ready(Ok(0)), +impl AsyncWrite for HttpFileWriter { + #[tracing::instrument(level = "debug", skip(self, buf))] + fn poll_write( + mut self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &[u8], + ) -> Poll> { + if let Ok(err) = self.as_mut().err_rx.try_recv() { + return Poll::Ready(Err(err)); } + + Pin::new(&mut self.wd).poll_write(cx, buf) } - fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - match self.get_mut() { - Writer::File(file) => Pin::new(file).poll_flush(cx), - Writer::Buffer(buff) => Pin::new(buff).poll_flush(cx), - Writer::NotUse => Poll::Ready(Ok(())), - } + #[tracing::instrument(level = "debug", skip(self))] + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll> { + Pin::new(&mut self.wd).poll_flush(cx) } - fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - match self.get_mut() { - Writer::File(file) => Pin::new(file).poll_shutdown(cx), - Writer::Buffer(buff) => Pin::new(buff).poll_shutdown(cx), - Writer::NotUse => Poll::Ready(Ok(())), - } + #[tracing::instrument(level = "debug", skip(self))] + fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll> { + Pin::new(&mut self.wd).poll_shutdown(cx) } } -pub struct AsyncToSync { +pub struct HttpFileReader { + inner: FileReader, +} + +impl HttpFileReader { + pub async fn new(url: &str, disk: &str, volume: &str, path: &str, offset: usize, length: usize) -> std::io::Result { + let resp = reqwest::Client::new() + .get(format!( + "{}/rustfs/rpc/read_file_stream?disk={}&volume={}&path={}&offset={}&length={}", + url, + urlencoding::encode(disk), + urlencoding::encode(volume), + urlencoding::encode(path), + offset, + length + )) + .send() + .await + .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?; + + let inner = Box::new(StreamReader::new(resp.bytes_stream().map_err(std::io::Error::other))); + + Ok(Self { inner }) + } +} + +impl AsyncRead 
for HttpFileReader { + fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>) -> Poll> { + Pin::new(&mut self.inner).poll_read(cx, buf) + } +} + +pub struct EtagReader { inner: R, + md5: Md5, } -impl AsyncToSync { - pub fn new_reader(inner: R) -> Self { - Self { inner } +impl EtagReader { + pub fn new(inner: R) -> Self { + EtagReader { inner, md5: Md5::new() } } - fn read_async(&mut self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll> { - let mut read_buf = ReadBuf::new(buf); - // Poll the underlying AsyncRead to fill the ReadBuf - match Pin::new(&mut self.inner).poll_read(cx, &mut read_buf) { - Poll::Ready(Ok(())) => Poll::Ready(Ok(read_buf.filled().len())), - Poll::Ready(Err(e)) => Poll::Ready(Err(e)), - Poll::Pending => Poll::Pending, - } + + pub fn etag(self) -> String { + hex_simd::encode_to_string(self.md5.finalize(), hex_simd::AsciiCase::Lower) } } -impl AsyncToSync { - pub fn new_writer(inner: R) -> Self { - Self { inner } - } - // This function will perform a write using AsyncWrite - fn write_async(&mut self, cx: &mut Context<'_>, buf: &[u8]) -> Poll> { - let result = Pin::new(&mut self.inner).poll_write(cx, buf); - match result { - Poll::Ready(Ok(n)) => Poll::Ready(Ok(n)), - Poll::Ready(Err(e)) => Poll::Ready(Err(e)), - Poll::Pending => Poll::Pending, - } - } +impl AsyncRead for EtagReader { + fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>) -> Poll> { + match Pin::new(&mut self.inner).poll_read(cx, buf) { + Poll::Ready(Ok(())) => { + let bytes = buf.filled(); + self.md5.update(bytes); - // This function will perform a flush using AsyncWrite - fn flush_async(&mut self, cx: &mut Context<'_>) -> Poll> { - Pin::new(&mut self.inner).poll_flush(cx) - } -} - -impl Read for AsyncToSync { - fn read(&mut self, buf: &mut [u8]) -> std::io::Result { - let mut cx = std::task::Context::from_waker(futures::task::noop_waker_ref()); - loop { - match self.read_async(&mut cx, buf) { - Poll::Ready(Ok(n)) => return Ok(n), - Poll::Ready(Err(e)) => return Err(e), - Poll::Pending => { - // If Pending, we need to wait for the readiness. - // Here, we can use an arbitrary mechanism to yield control, - // this might be blocking until some readiness occurs can be complex. - // A full blocking implementation would require an async runtime to block on. - std::thread::sleep(std::time::Duration::from_millis(1)); // Replace with proper waiting if needed - } + Poll::Ready(Ok(())) } + other => other, } } } - -impl Write for AsyncToSync { - fn write(&mut self, buf: &[u8]) -> std::io::Result { - let mut cx = std::task::Context::from_waker(futures::task::noop_waker_ref()); - loop { - match self.write_async(&mut cx, buf) { - Poll::Ready(Ok(n)) => return Ok(n), - Poll::Ready(Err(e)) => return Err(e), - Poll::Pending => { - // Here we are blocking and waiting for the async operation to complete. - std::thread::sleep(std::time::Duration::from_millis(1)); // Not efficient, see notes. - } - } - } - } - - fn flush(&mut self) -> std::io::Result<()> { - let mut cx = std::task::Context::from_waker(futures::task::noop_waker_ref()); - loop { - match self.flush_async(&mut cx) { - Poll::Ready(Ok(())) => return Ok(()), - Poll::Ready(Err(e)) => return Err(e), - Poll::Pending => { - // Again, blocking to wait for flush. - std::thread::sleep(std::time::Duration::from_millis(1)); // Not efficient, see notes. - } - } - } - } -} - -pub struct VecAsyncWriter { - buffer: Vec, -} - -impl VecAsyncWriter { - /// Create a new VecAsyncWriter with an empty Vec. 
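EtagReader folds MD5 hashing into the read path: each poll_read updates the digest with the bytes just filled, and etag() consumes the reader to emit the lowercase hex digest. Note it hashes buf.filled(), which is correct as long as callers pass a fresh ReadBuf per call, as tokio's read helpers do. A usage sketch, assuming the struct is generic over R: AsyncRead + Unpin (its parameter list is garbled in this copy of the patch):

use std::io::Cursor;
use tokio::io::AsyncReadExt;

#[tokio::main]
async fn main() -> std::io::Result<()> {
    // Wrap any AsyncRead; an in-memory buffer stands in for the body here.
    let mut reader = EtagReader::new(Cursor::new(b"hello world".to_vec()));
    let mut sink = Vec::new();
    reader.read_to_end(&mut sink).await?;
    // md5("hello world") as lowercase hex:
    assert_eq!(reader.etag(), "5eb63bbbe01eeed093cb22bb8f5acdc3");
    Ok(())
}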
- pub fn new(buffer: Vec) -> Self { - VecAsyncWriter { buffer } - } - - /// Retrieve the underlying buffer. - pub fn get_buffer(&self) -> &[u8] { - &self.buffer - } -} - -// Implementing AsyncWrite trait for VecAsyncWriter -impl AsyncWrite for VecAsyncWriter { - fn poll_write(self: Pin<&mut Self>, _cx: &mut Context<'_>, buf: &[u8]) -> Poll> { - let len = buf.len(); - - // Assume synchronous writing for simplicity - self.get_mut().buffer.extend_from_slice(buf); - - // Returning the length of written data - Poll::Ready(Ok(len)) - } - - fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { - // In this case, flushing is a no-op for a Vec - Poll::Ready(Ok(())) - } - - fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { - // Similar to flush, shutdown has no effect here - Poll::Ready(Ok(())) - } -} - -pub struct VecAsyncReader { - buffer: Vec, - position: usize, -} - -impl VecAsyncReader { - /// Create a new VecAsyncReader with the given Vec. - pub fn new(buffer: Vec) -> Self { - VecAsyncReader { buffer, position: 0 } - } - - /// Reset the reader position. - pub fn reset(&mut self) { - self.position = 0; - } -} - -// Implementing AsyncRead trait for VecAsyncReader -impl AsyncRead for VecAsyncReader { - fn poll_read(self: Pin<&mut Self>, _cx: &mut Context<'_>, buf: &mut ReadBuf) -> Poll> { - let this = self.get_mut(); - - // Check how many bytes are available to read - let len = this.buffer.len(); - let bytes_available = len - this.position; - - if bytes_available == 0 { - // If there's no more data to read, return ready with an Eof - return Poll::Ready(Ok(())); - } - - // Calculate how much we can read into the provided buffer - let to_read = std::cmp::min(bytes_available, buf.remaining()); - - // Write the data to the buf - buf.put_slice(&this.buffer[this.position..this.position + to_read]); - - // Update the position - this.position += to_read; - - // Indicate how many bytes were read - Poll::Ready(Ok(())) - } -} diff --git a/ecstore/src/metacache/writer.rs b/ecstore/src/metacache/writer.rs index c1bc1d98..bd1b576b 100644 --- a/ecstore/src/metacache/writer.rs +++ b/ecstore/src/metacache/writer.rs @@ -350,10 +350,9 @@ impl MetacacheReader { #[tokio::test] async fn test_writer() { - use crate::io::VecAsyncReader; - use crate::io::VecAsyncWriter; + use std::io::Cursor; - let mut f = VecAsyncWriter::new(Vec::new()); + let mut f = Cursor::new(Vec::new()); let mut w = MetacacheWriter::new(&mut f); @@ -373,16 +372,16 @@ async fn test_writer() { w.close().await.unwrap(); - let data = f.get_buffer().to_vec(); + let data = f.into_inner(); - let nf = VecAsyncReader::new(data); + let nf = Cursor::new(data); let mut r = MetacacheReader::new(nf); let nobjs = r.read_all().await.unwrap(); - for info in nobjs.iter() { - println!("new {:?}", &info); - } + // for info in nobjs.iter() { + // println!("new {:?}", &info); + // } assert_eq!(objs, nobjs) } diff --git a/ecstore/src/peer_rest_client.rs b/ecstore/src/peer_rest_client.rs index 8778af5c..71f2e3f2 100644 --- a/ecstore/src/peer_rest_client.rs +++ b/ecstore/src/peer_rest_client.rs @@ -51,7 +51,7 @@ impl PeerRestClient { let eps = eps.clone(); let hosts = eps.hosts_sorted(); - let mut remote = vec![None; hosts.len()]; + let mut remote = Vec::with_capacity(hosts.len()); let mut all = vec![None; hosts.len()]; for (i, hs_host) in hosts.iter().enumerate() { if let Some(host) = hs_host { diff --git a/ecstore/src/pools.rs b/ecstore/src/pools.rs index d6a43f83..466098ce 100644 --- a/ecstore/src/pools.rs +++ 
b/ecstore/src/pools.rs @@ -116,7 +116,7 @@ impl PoolMeta { data.write_all(&buf)?; for pool in pools { - save_config(pool, POOL_META_NAME, &data).await?; + save_config(pool, POOL_META_NAME, data.clone()).await?; } Ok(()) diff --git a/ecstore/src/set_disk.rs b/ecstore/src/set_disk.rs index e91907bc..157881b9 100644 --- a/ecstore/src/set_disk.rs +++ b/ecstore/src/set_disk.rs @@ -1,6 +1,7 @@ use std::{ collections::{HashMap, HashSet}, io::{Cursor, Write}, + mem::replace, path::Path, sync::Arc, time::Duration, @@ -34,6 +35,7 @@ use crate::{ }, heal_ops::BG_HEALING_UUID, }, + io::{EtagReader, READ_BUFFER_SIZE}, quorum::{object_op_ignored_errs, reduce_read_quorum_errs, reduce_write_quorum_errs, QuorumError}, store_api::{ BucketInfo, BucketOptions, CompletePart, DeleteBucketOptions, DeletedObject, FileInfo, GetObjectReader, HTTPRangeSpec, @@ -66,6 +68,7 @@ use futures::future::join_all; use glob::Pattern; use http::HeaderMap; use lock::{ + // drwmutex::Options, drwmutex::Options, namespace_lock::{new_nslock, NsLockMap}, LockApi, @@ -76,14 +79,12 @@ use rand::{ thread_rng, {seq::SliceRandom, Rng}, }; -use reader::reader::EtagReader; -use s3s::{dto::StreamingBlob, Body}; use sha2::{Digest, Sha256}; use std::hash::Hash; use std::time::SystemTime; use time::OffsetDateTime; use tokio::{ - io::DuplexStream, + io::{empty, AsyncWrite}, sync::{broadcast, RwLock}, }; use tokio::{ @@ -1785,19 +1786,22 @@ impl SetDisks { skip( writer,disks,fi,files), fields(start_time=?time::OffsetDateTime::now_utc()) )] - async fn get_object_with_fileinfo( + async fn get_object_with_fileinfo( // &self, bucket: &str, object: &str, offset: usize, length: usize, - writer: &mut DuplexStream, + writer: &mut W, fi: FileInfo, files: Vec, disks: &[Option], set_index: usize, pool_index: usize, - ) -> Result<()> { + ) -> Result<()> + where + W: AsyncWrite + Send + Sync + Unpin + 'static, + { let (disks, files) = Self::shuffle_disks_and_parts_metadata_by_index(disks, &files, &fi); let total_size = fi.size; @@ -1854,17 +1858,6 @@ impl SetDisks { // debug!("read part_path {}", &part_path); if let Some(disk) = disk_op { - // let filereader = { - // if let Some(ref data) = files[idx].data { - // FileReader::Buffer(BufferReader::new(data.clone())) - // } else { - // let disk = disk.clone(); - // let part_path = - // format!("{}/{}/part.{}", object, files[idx].data_dir.unwrap_or(Uuid::nil()), part_number); - - // disk.read_file(bucket, &part_path).await? 
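get_object now hands the caller a GetObjectReader wrapping the read half of a duplex pipe directly, with no ReaderStream/StreamingBlob detour, while a spawned task decodes the erasure-coded parts into the write half. The shape of that wiring, with the task body standing in for get_object_with_fileinfo:

use tokio::io::{duplex, AsyncReadExt, AsyncWriteExt};

const READ_BUFFER_SIZE: usize = 1024 * 1024;

#[tokio::main]
async fn main() -> std::io::Result<()> {
    let (mut rd, mut wd) = duplex(READ_BUFFER_SIZE);

    // Producer: stand-in for get_object_with_fileinfo writing decoded bytes.
    tokio::spawn(async move {
        if let Err(e) = wd.write_all(b"object bytes").await {
            eprintln!("producer failed: {e:?}");
        }
        // Dropping wd here is the EOF signal for the reader half.
    });

    // Consumer: what the HTTP layer does with the returned stream.
    let mut out = Vec::new();
    rd.read_to_end(&mut out).await?;
    assert_eq!(out, b"object bytes");
    Ok(())
}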
@@ -1854,17 +1858,6 @@ impl SetDisks {
             // debug!("read part_path {}", &part_path);
 
             if let Some(disk) = disk_op {
-                // let filereader = {
-                //     if let Some(ref data) = files[idx].data {
-                //         FileReader::Buffer(BufferReader::new(data.clone()))
-                //     } else {
-                //         let disk = disk.clone();
-                //         let part_path =
-                //             format!("{}/{}/part.{}", object, files[idx].data_dir.unwrap_or(Uuid::nil()), part_number);
-
-                //         disk.read_file(bucket, &part_path).await?
-                //     }
-                // };
                 let checksum_info = files[idx].erasure.get_checksum_info(part_number);
                 let reader = new_bitrot_filereader(
                     disk.clone(),
@@ -2223,10 +2216,10 @@ impl SetDisks {
         let mut outdate_disks = vec![None; disk_len];
         let mut disks_to_heal_count = 0;
 
-        info!(
-            "errs: {:?}, data_errs_by_disk: {:?}, lastest_meta: {:?}",
-            errs, data_errs_by_disk, lastest_meta
-        );
+        // info!(
+        //     "errs: {:?}, data_errs_by_disk: {:?}, lastest_meta: {:?}",
+        //     errs, data_errs_by_disk, lastest_meta
+        // );
         for index in 0..available_disks.len() {
             let (yes, reason) = should_heal_object_on_disk(
                 &errs[index],
@@ -2415,7 +2408,7 @@ impl SetDisks {
             if let (Some(disk), Some(metadata)) = (disk, &copy_parts_metadata[index]) {
                 // let filereader = {
                 //     if let Some(ref data) = metadata.data {
-                //         FileReader::Buffer(BufferReader::new(data.clone()))
+                //         Box::new(BufferReader::new(data.clone()))
                 //     } else {
                 //         let disk = disk.clone();
                 //         let part_path = format!("{}/{}/part.{}", object, src_data_dir, part.number);
@@ -3614,7 +3607,7 @@ impl ObjectIO for SetDisks {
             }
 
             let reader = GetObjectReader {
-                stream: StreamingBlob::from(Body::from(Vec::new())),
+                stream: Box::new(Cursor::new(Vec::new())),
                 object_info,
             };
             return Ok(reader);
@@ -3622,10 +3615,9 @@ impl ObjectIO for SetDisks {
 
         // TODO: remote
 
-        let (rd, mut wd) = tokio::io::duplex(fi.erasure.block_size);
+        let (rd, wd) = tokio::io::duplex(READ_BUFFER_SIZE);
 
-        let (reader, offset, length) =
-            GetObjectReader::new(StreamingBlob::wrap(tokio_util::io::ReaderStream::new(rd)), range, &object_info, opts, &h)?;
+        let (reader, offset, length) = GetObjectReader::new(Box::new(rd), range, &object_info, opts, &h)?;
 
         // let disks = disks.clone();
         let bucket = bucket.to_owned();
@@ -3634,12 +3626,23 @@ impl ObjectIO for SetDisks {
         let pool_index = self.pool_index;
         tokio::spawn(async move {
             if let Err(e) = Self::get_object_with_fileinfo(
-                &bucket, &object, offset, length, &mut wd, fi, files, &disks, set_index, pool_index,
+                &bucket,
+                &object,
+                offset,
+                length,
+                &mut Box::new(wd),
+                fi,
+                files,
+                &disks,
+                set_index,
+                pool_index,
             )
             .await
             {
                 error!("get_object_with_fileinfo err {:?}", e);
             };
+
+            // error!("get_object_with_fileinfo end");
         });
 
         Ok(reader)
@@ -3769,8 +3772,10 @@ impl ObjectIO for SetDisks {
             }
         }
 
+        let stream = replace(&mut data.stream, Box::new(empty()));
+        let mut etag_stream = EtagReader::new(stream);
+
         // TODO: etag from header
-        let mut etag_stream = EtagReader::new(&mut data.stream, None, None);
 
         let w_size = erasure
             .encode(&mut etag_stream, &mut writers, data.content_length, write_quorum)
@@ -4356,7 +4361,8 @@ impl StorageAPI for SetDisks {
 
         let mut erasure = Erasure::new(fi.erasure.data_blocks, fi.erasure.parity_blocks, fi.erasure.block_size);
 
-        let mut etag_stream = EtagReader::new(&mut data.stream, None, None);
+        let stream = replace(&mut data.stream, Box::new(empty()));
+        let mut etag_stream = EtagReader::new(stream);
 
         let w_size = erasure
             .encode(&mut etag_stream, &mut writers, data.content_length, write_quorum)
@@ -4841,25 +4847,28 @@ impl StorageAPI for SetDisks {
             }
         }
 
+        // TODO: optimize cleanupMultipartPath
         for p in curr_fi.parts.iter() {
-            self.remove_part_meta(
-                bucket,
-                object,
-                upload_id,
-                curr_fi.data_dir.unwrap_or(Uuid::nil()).to_string().as_str(),
-                p.number,
-            )
-            .await?;
-
-            if !fi.parts.iter().any(|v| v.number == p.number) {
-                self.remove_object_part(
+            let _ = self
+                .remove_part_meta(
                     bucket,
                     object,
                     upload_id,
                     curr_fi.data_dir.unwrap_or(Uuid::nil()).to_string().as_str(),
                     p.number,
                 )
-                .await?;
+                .await;
+
+            if !fi.parts.iter().any(|v| v.number == p.number) {
+                let _ = self
+                    .remove_object_part(
+                        bucket,
+                        object,
+                        upload_id,
+                        curr_fi.data_dir.unwrap_or(Uuid::nil()).to_string().as_str(),
+                        p.number,
+                    )
+                    .await;
             }
         }
 
@@ -5235,7 +5244,7 @@ async fn disks_with_all_parts(
             }
         }
     }
-    info!("meta_errs: {:?}, errs: {:?}", meta_errs, errs);
+    // info!("meta_errs: {:?}, errs: {:?}", meta_errs, errs);
     meta_errs.iter().enumerate().for_each(|(index, err)| {
         if err.is_some() {
             let part_err = conv_part_err_to_int(err);
@@ -5245,7 +5254,7 @@ async fn disks_with_all_parts(
         }
     });
 
-    info!("data_errs_by_part: {:?}, data_errs_by_disk: {:?}", data_errs_by_part, data_errs_by_disk);
+    // info!("data_errs_by_part: {:?}, data_errs_by_disk: {:?}", data_errs_by_part, data_errs_by_disk);
     for (index, disk) in online_disks.iter().enumerate() {
         if meta_errs[index].is_some() {
             continue;
@@ -5332,7 +5341,7 @@ async fn disks_with_all_parts(
             }
         }
     }
-    info!("data_errs_by_part: {:?}, data_errs_by_disk: {:?}", data_errs_by_part, data_errs_by_disk);
+    // info!("data_errs_by_part: {:?}, data_errs_by_disk: {:?}", data_errs_by_part, data_errs_by_disk);
     for (part, disks) in data_errs_by_part.iter() {
         for (idx, disk) in disks.iter().enumerate() {
             if let Some(vec) = data_errs_by_disk.get_mut(&idx) {
@@ -5340,7 +5349,7 @@ async fn disks_with_all_parts(
             }
         }
     }
-    info!("data_errs_by_part: {:?}, data_errs_by_disk: {:?}", data_errs_by_part, data_errs_by_disk);
+    // info!("data_errs_by_part: {:?}, data_errs_by_disk: {:?}", data_errs_by_part, data_errs_by_disk);
     for (i, disk) in online_disks.iter().enumerate() {
         if meta_errs[i].is_none() && disk.is_some() && !has_part_err(&data_errs_by_disk[&i]) {
             available_disks[i] = Some(disk.clone().unwrap());
diff --git a/ecstore/src/sets.rs b/ecstore/src/sets.rs
index e9923856..149c0e62 100644
--- a/ecstore/src/sets.rs
+++ b/ecstore/src/sets.rs
@@ -138,7 +138,7 @@ impl Sets {
                     if let Some(_disk_id) = has_disk_id {
                         set_drive.push(disk);
                     } else {
-                        warn!("sets new set_drive {}-{} get_disk_id is none", i, j);
+                        error!("sets new set_drive {}-{} get_disk_id is none", i, j);
                         set_drive.push(None);
                     }
                 }
@@ -207,7 +207,7 @@ impl Sets {
                 },
 
                 _ = cloned_token.cancelled() => {
-                    warn!("ctx cancelled");
+                    warn!("monitor_and_connect_endpoints ctx cancelled");
                     break;
                 }
             }
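The store_api.rs hunk below swaps StreamingBlob for a FileReader. The alias itself is not shown in this patch; assuming it is a boxed AsyncRead trait object along these lines, any in-memory or on-disk reader can back PutObjReader and GetObjectReader, and the manual Debug impl in the hunk becomes necessary because trait objects carry no Debug:

    // Assumed shape of ecstore::io::FileReader (not part of this patch):
    // a boxed async reader usable behind PutObjReader/GetObjectReader.
    use std::io::Cursor;
    use tokio::io::AsyncRead;

    pub type FileReader = Box<dyn AsyncRead + Send + Sync + Unpin>;

    fn main() {
        // e.g. an in-memory buffer; Cursor<Vec<u8>> satisfies all the bounds
        let _r: FileReader = Box::new(Cursor::new(b"hello".to_vec()));
    }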
diff --git a/ecstore/src/store_api.rs b/ecstore/src/store_api.rs
index 091ee1e8..69299616 100644
--- a/ecstore/src/store_api.rs
+++ b/ecstore/src/store_api.rs
@@ -1,4 +1,5 @@
 use crate::heal::heal_ops::HealSequence;
+use crate::io::FileReader;
 use crate::store_utils::clean_metadata;
 use crate::{
     disk::DiskStore,
@@ -7,15 +8,16 @@ use crate::{
     utils::path::decode_dir_object,
     xhttp,
 };
-use futures::StreamExt;
 use http::{HeaderMap, HeaderValue};
 use madmin::heal_commands::HealResultItem;
 use rmp_serde::Serializer;
-use s3s::{dto::StreamingBlob, Body};
 use serde::{Deserialize, Serialize};
 use std::collections::HashMap;
+use std::fmt::Debug;
+use std::io::Cursor;
 use std::sync::Arc;
 use time::OffsetDateTime;
+use tokio::io::AsyncReadExt;
 use uuid::Uuid;
 
 pub const ERASURE_ALGORITHM: &str = "rs-vandermonde";
@@ -416,35 +418,42 @@ pub struct DeleteBucketOptions {
     pub srdelete_op: SRBucketDeleteOp,
 }
 
-#[derive(Debug)]
 pub struct PutObjReader {
-    pub stream: StreamingBlob,
+    pub stream: FileReader,
     pub content_length: usize,
 }
 
+impl Debug for PutObjReader {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("PutObjReader")
+            .field("content_length", &self.content_length)
+            .finish()
+    }
+}
+
 impl PutObjReader {
-    pub fn new(stream: StreamingBlob, content_length: usize) -> Self {
+    pub fn new(stream: FileReader, content_length: usize) -> Self {
         PutObjReader { stream, content_length }
     }
 
     pub fn from_vec(data: Vec<u8>) -> Self {
         let content_length = data.len();
         PutObjReader {
-            stream: Body::from(data).into(),
+            stream: Box::new(Cursor::new(data)),
             content_length,
         }
     }
 }
 
 pub struct GetObjectReader {
-    pub stream: StreamingBlob,
+    pub stream: FileReader,
     pub object_info: ObjectInfo,
 }
 
 impl GetObjectReader {
     #[tracing::instrument(level = "debug", skip(reader))]
     pub fn new(
-        reader: StreamingBlob,
+        reader: FileReader,
         rs: Option<HTTPRangeSpec>,
         oi: &ObjectInfo,
         opts: &ObjectOptions,
@@ -482,14 +491,15 @@ impl GetObjectReader {
     }
     pub async fn read_all(&mut self) -> Result<Vec<u8>> {
         let mut data = Vec::new();
+        self.stream.read_to_end(&mut data).await?;
 
-        while let Some(x) = self.stream.next().await {
-            let buf = match x {
-                Ok(res) => res,
-                Err(e) => return Err(Error::msg(e.to_string())),
-            };
-            data.extend_from_slice(buf.as_ref());
-        }
+        // while let Some(x) = self.stream.next().await {
+        //     let buf = match x {
+        //         Ok(res) => res,
+        //         Err(e) => return Err(Error::msg(e.to_string())),
+        //     };
+        //     data.extend_from_slice(buf.as_ref());
+        // }
 
         Ok(data)
     }
diff --git a/iam/src/store/object.rs b/iam/src/store/object.rs
index 8b11ae15..40c9e97a 100644
--- a/iam/src/store/object.rs
+++ b/iam/src/store/object.rs
@@ -370,7 +370,7 @@ impl Store for ObjectStore {
         let mut data = serde_json::to_vec(&item)?;
         data = Self::encrypt_data(&data)?;
 
-        save_config(self.object_api.clone(), path.as_ref(), &data).await
+        save_config(self.object_api.clone(), path.as_ref(), data).await
     }
 
     async fn delete_iam_config(&self, path: impl AsRef<str> + Send) -> Result<()> {
         delete_config(self.object_api.clone(), path.as_ref()).await
diff --git a/reader/Cargo.toml b/reader/Cargo.toml
deleted file mode 100644
index 2e171e94..00000000
--- a/reader/Cargo.toml
+++ /dev/null
@@ -1,24 +0,0 @@
-[package]
-name = "reader"
-edition.workspace = true
-license.workspace = true
-repository.workspace = true
-rust-version.workspace = true
-version.workspace = true
-
-[lints]
-workspace = true
-
-[dependencies]
-tracing.workspace = true
-s3s.workspace = true
-thiserror.workspace = true
-bytes.workspace = true
-pin-project-lite.workspace = true
-hex-simd = "0.8.0"
-md-5.workspace = true
-sha2 = { version = "0.11.0-pre.4" }
-futures.workspace = true
-
-[dev-dependencies]
-tokio = { workspace = true, features = ["rt-multi-thread", "macros"] }
diff --git a/reader/src/error.rs b/reader/src/error.rs
deleted file mode 100644
index 9c5017ee..00000000
--- a/reader/src/error.rs
+++ /dev/null
@@ -1,12 +0,0 @@
-#[derive(Debug, thiserror::Error, PartialEq, Eq)]
-pub enum ReaderError {
-    #[error("stream input error {0}")]
-    StreamInput(String),
-    //
-    #[error("etag: expected ETag {0} does not match computed ETag {1}")]
-    VerifyError(String, String),
-    #[error("Bad checksum: Want {0} does not match calculated {1}")]
-    ChecksumMismatch(String, String),
-    #[error("Bad sha256: Expected {0} does not match calculated {1}")]
-    SHA256Mismatch(String, String),
-}
diff --git a/reader/src/hasher.rs b/reader/src/hasher.rs
deleted file mode 100644
index 37f28509..00000000
--- a/reader/src/hasher.rs
+++ /dev/null
@@ -1,170 +0,0 @@
-use md5::{Digest as Md5Digest, Md5};
-use sha2::{
-    digest::{Reset, Update},
-    Digest, Sha256 as sha_sha256,
-};
-pub trait Hasher {
-    fn write(&mut self, bytes: &[u8]);
-    fn reset(&mut self);
-    fn sum(&mut self) -> String;
-    fn size(&self) -> usize;
-    fn block_size(&self) -> usize;
-}
-
-#[derive(Default)]
-pub enum HashType {
-    #[default]
-    Undefined,
-    Uuid(Uuid),
-    Md5(MD5),
-    Sha256(Sha256),
-}
-
-impl Hasher for HashType {
-    fn write(&mut self, bytes: &[u8]) {
-        match self {
-            HashType::Md5(md5) => md5.write(bytes),
-            HashType::Sha256(sha256) => sha256.write(bytes),
-            HashType::Uuid(uuid) => uuid.write(bytes),
-            HashType::Undefined => (),
-        }
-    }
-
-    fn reset(&mut self) {
-        match self {
-            HashType::Md5(md5) => md5.reset(),
-            HashType::Sha256(sha256) => sha256.reset(),
-            HashType::Uuid(uuid) => uuid.reset(),
-            HashType::Undefined => (),
-        }
-    }
-
-    fn sum(&mut self) -> String {
-        match self {
-            HashType::Md5(md5) => md5.sum(),
-            HashType::Sha256(sha256) => sha256.sum(),
-            HashType::Uuid(uuid) => uuid.sum(),
-            HashType::Undefined => "".to_owned(),
-        }
-    }
-
-    fn size(&self) -> usize {
-        match self {
-            HashType::Md5(md5) => md5.size(),
-            HashType::Sha256(sha256) => sha256.size(),
-            HashType::Uuid(uuid) => uuid.size(),
-            HashType::Undefined => 0,
-        }
-    }
-
-    fn block_size(&self) -> usize {
-        match self {
-            HashType::Md5(md5) => md5.block_size(),
-            HashType::Sha256(sha256) => sha256.block_size(),
-            HashType::Uuid(uuid) => uuid.block_size(),
-            HashType::Undefined => 64,
-        }
-    }
-}
-
-pub struct Sha256 {
-    hasher: sha_sha256,
-}
-
-impl Sha256 {
-    pub fn new() -> Self {
-        Self {
-            hasher: sha_sha256::new(),
-        }
-    }
-}
-impl Default for Sha256 {
-    fn default() -> Self {
-        Self::new()
-    }
-}
-
-impl Hasher for Sha256 {
-    fn write(&mut self, bytes: &[u8]) {
-        Update::update(&mut self.hasher, bytes);
-    }
-
-    fn reset(&mut self) {
-        Reset::reset(&mut self.hasher);
-    }
-
-    fn sum(&mut self) -> String {
-        hex_simd::encode_to_string(self.hasher.clone().finalize(), hex_simd::AsciiCase::Lower)
-    }
-
-    fn size(&self) -> usize {
-        32
-    }
-
-    fn block_size(&self) -> usize {
-        64
-    }
-}
-
-pub struct MD5 {
-    hasher: Md5,
-}
-
-impl MD5 {
-    pub fn new() -> Self {
-        Self { hasher: Md5::new() }
-    }
-}
-impl Default for MD5 {
-    fn default() -> Self {
-        Self::new()
-    }
-}
-
-impl Hasher for MD5 {
-    fn write(&mut self, bytes: &[u8]) {
-        self.hasher.update(bytes);
-    }
-
-    fn reset(&mut self) {}
-
-    fn sum(&mut self) -> String {
-        hex_simd::encode_to_string(self.hasher.clone().finalize(), hex_simd::AsciiCase::Lower)
-    }
-
-    fn size(&self) -> usize {
-        32
-    }
-
-    fn block_size(&self) -> usize {
-        64
-    }
-}
-
-pub struct Uuid {
-    id: String,
-}
-
-impl Uuid {
-    pub fn new(id: String) -> Self {
-        Self { id }
-    }
-}
-
-impl Hasher for Uuid {
-    fn write(&mut self, _bytes: &[u8]) {}
-
-    fn reset(&mut self) {}
-
-    fn sum(&mut self) -> String {
-        self.id.clone()
-    }
-
-    fn size(&self) -> usize {
-        self.id.len()
-    }
-
-    fn block_size(&self) -> usize {
-        64
-    }
-}
diff --git a/reader/src/lib.rs b/reader/src/lib.rs
deleted file mode 100644
index 433caaa2..00000000
--- a/reader/src/lib.rs
+++ /dev/null
@@ -1,7 +0,0 @@
-pub mod error;
-pub mod hasher;
-pub mod reader;
-
-pub fn hex(data: impl AsRef<[u8]>) -> String {
-    hex_simd::encode_to_string(data, hex_simd::AsciiCase::Lower)
-}
diff --git a/reader/src/reader.rs b/reader/src/reader.rs
deleted file mode 100644
index 1758036c..00000000
--- a/reader/src/reader.rs
+++ /dev/null
@@ -1,493 +0,0 @@
-use bytes::Bytes;
-use s3s::StdError;
-use std::collections::VecDeque;
-
-use std::pin::Pin;
-use std::task::Poll;
-
-use crate::{
-    error::ReaderError,
-    hasher::{HashType, Uuid},
-};
-
-// use futures::stream::Stream;
-use super::hasher::{Hasher, Sha256, MD5};
-use futures::Stream;
-
-pin_project_lite::pin_project! {
-    #[derive(Default)]
-    pub struct EtagReader<S> {
-        #[pin]
-        inner: S,
-        md5: HashType,
-        checksum: Option<String>,
-        bytes_read: usize,
-    }
-}
-
-impl<S> EtagReader<S> {
-    pub fn new(inner: S, etag: Option<String>, force_md5: Option<String>) -> Self {
-        let md5 = {
-            if let Some(m) = force_md5 {
-                HashType::Uuid(Uuid::new(m))
-            } else {
-                HashType::Md5(MD5::new())
-            }
-        };
-        Self {
-            inner,
-            md5,
-            checksum: etag,
-            bytes_read: 0,
-        }
-    }
-
-    pub fn etag(&mut self) -> String {
-        self.md5.sum()
-    }
-}
-
-impl<S> Stream for EtagReader<S>
-where
-    S: Stream<Item = Result<Bytes, StdError>>,
-{
-    type Item = Result<Bytes, StdError>;
-
-    fn poll_next(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Option<Self::Item>> {
-        let this = self.project();
-        let poll = this.inner.poll_next(cx);
-
-        if let Poll::Ready(ref res) = poll {
-            match res {
-                Some(Ok(bytes)) => {
-                    *this.bytes_read += bytes.len();
-                    this.md5.write(bytes);
-                }
-                Some(Err(err)) => {
-                    return Poll::Ready(Some(Err(Box::new(ReaderError::StreamInput(err.to_string())))));
-                }
-                None => {
-                    if let Some(etag) = this.checksum {
-                        let got = this.md5.sum();
-                        if got.as_str() != etag.as_str() {
-                            return Poll::Ready(Some(Err(Box::new(ReaderError::VerifyError(etag.to_owned(), got)))));
-                        }
-                    }
-                }
-            }
-        }
-
-        poll
-    }
-}
-
-pin_project_lite::pin_project! {
-    #[derive(Default)]
-    pub struct HashReader<S> {
-        #[pin]
-        inner: S,
-        sha256: Option<Sha256>,
-        md5: Option<MD5>,
-        md5_hex: Option<String>,
-        sha256_hex: Option<String>,
-        size: usize,
-        actual_size: usize,
-        bytes_read: usize,
-    }
-}
-
-impl<S> HashReader<S> {
-    pub fn new(inner: S, size: usize, md5_hex: Option<String>, sha256_hex: Option<String>, actual_size: usize) -> Self {
-        let md5 = {
-            if md5_hex.is_some() {
-                Some(MD5::new())
-            } else {
-                None
-            }
-        };
-        let sha256 = {
-            if sha256_hex.is_some() {
-                Some(Sha256::new())
-            } else {
-                None
-            }
-        };
-        Self {
-            inner,
-            size,
-            actual_size,
-            md5_hex,
-            sha256_hex,
-            bytes_read: 0,
-            md5,
-            sha256,
-        }
-    }
-}
-
-impl<S> Stream for HashReader<S>
-where
-    S: Stream<Item = Result<Bytes, StdError>>,
-{
-    type Item = Result<Bytes, StdError>;
-
-    fn poll_next(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Option<Self::Item>> {
-        let this = self.project();
-        let poll = this.inner.poll_next(cx);
-
-        if let Poll::Ready(ref res) = poll {
-            match res {
-                Some(Ok(bytes)) => {
-                    *this.bytes_read += bytes.len();
-                    if let Some(sha) = this.sha256 {
-                        sha.write(bytes);
-                    }
-
-                    if let Some(md5) = this.md5 {
-                        md5.write(bytes);
-                    }
-                }
-                Some(Err(err)) => {
-                    return Poll::Ready(Some(Err(Box::new(ReaderError::StreamInput(err.to_string())))));
-                }
-                None => {
-                    if let Some(hash) = this.sha256 {
-                        if let Some(hex) = this.sha256_hex {
-                            let got = hash.sum();
-                            let src = hex.as_str();
-                            if src != got.as_str() {
-                                println!("sha256 err src:{},got:{}", src, got);
-                                return Poll::Ready(Some(Err(Box::new(ReaderError::SHA256Mismatch(src.to_string(), got)))));
-                            }
-                        }
-                    }
-
-                    if let Some(hash) = this.md5 {
-                        if let Some(hex) = this.md5_hex {
-                            let got = hash.sum();
-                            let src = hex.as_str();
-                            if src != got.as_str() {
-                                // TODO: ERR
-                                println!("md5 err src:{},got:{}", src, got);
-                                return Poll::Ready(Some(Err(Box::new(ReaderError::ChecksumMismatch(src.to_string(), got)))));
-                            }
-                        }
-                    }
-                }
-            }
-        }
-
-        // println!("poll {:?}", poll);
-
-        poll
-    }
-}
-
-pin_project_lite::pin_project! {
-    pub struct ChunkedStream<S> {
-        #[pin]
-        inner: S,
-        chuck_size: usize,
-        streams: VecDeque<Bytes>,
-        remaining: Vec<u8>,
-    }
-}
-
-impl<S> ChunkedStream<S> {
-    pub fn new(inner: S, chuck_size: usize) -> Self {
-        Self {
-            inner,
-            chuck_size,
-            streams: VecDeque::new(),
-            remaining: Vec::new(),
-        }
-    }
-}
-
-impl<S> Stream for ChunkedStream<S>
-where
-    S: Stream<Item = Result<Bytes, StdError>> + Send + Sync,
-    // E: std::error::Error + Send + Sync,
-{
-    type Item = Result<Bytes, StdError>;
-
-    fn poll_next(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Option<Self::Item>> {
-        let (items, op_items) = self.inner.size_hint();
-        let this = self.project();
-
-        if let Some(b) = this.streams.pop_front() {
-            return Poll::Ready(Some(Ok(b)));
-        }
-
-        let poll = this.inner.poll_next(cx);
-
-        match poll {
-            Poll::Ready(res_op) => match res_op {
-                Some(res) => match res {
-                    Ok(bytes) => {
-                        let chuck_size = *this.chuck_size;
-                        let mut bytes = bytes;
-
-                        // println!("get len {}", bytes.len());
-                        // if there is a leftover remainder from the last poll
-                        if !this.remaining.is_empty() {
-                            let need_size = chuck_size - this.remaining.len();
-                            // the incoming data is large enough to top up the remainder
-                            if bytes.len() >= need_size {
-                                let add_bytes = bytes.split_to(need_size);
-                                this.remaining.extend_from_slice(&add_bytes);
-                                this.streams.push_back(Bytes::from(this.remaining.clone()));
-                                this.remaining.clear();
-                            } else {
-                                // not enough to fill a chunk, just append it
-                                let need_size = bytes.len();
-                                let add_bytes = bytes.split_to(need_size);
-                                this.remaining.extend_from_slice(&add_bytes);
-                            }
-                        }
-
-                        loop {
-                            if bytes.len() < chuck_size {
-                                break;
-                            }
-                            let chuck = bytes.split_to(chuck_size);
-                            this.streams.push_back(chuck);
-                        }
-
-                        if !bytes.is_empty() {
-                            this.remaining.extend_from_slice(&bytes);
-                        }
-
-                        if let Some(b) = this.streams.pop_front() {
-                            return Poll::Ready(Some(Ok(b)));
-                        }
-
-                        if items > 0 || op_items.is_some() {
-                            return Poll::Pending;
-                        }
-
-                        if !this.remaining.is_empty() {
-                            let b = this.remaining.clone();
-                            this.remaining.clear();
-                            return Poll::Ready(Some(Ok(Bytes::from(b))));
-                        }
-                        Poll::Ready(None)
-                    }
-                    Err(err) => Poll::Ready(Some(Err(err))),
-                },
-                None => {
-                    // println!("get empty");
-                    if let Some(b) = this.streams.pop_front() {
-                        return Poll::Ready(Some(Ok(b)));
-                    }
-                    if !this.remaining.is_empty() {
-                        let b = this.remaining.clone();
-                        this.remaining.clear();
-                        return Poll::Ready(Some(Ok(Bytes::from(b))));
-                    }
-                    Poll::Ready(None)
-                }
-            },
-            Poll::Pending => {
-                // println!("get Pending");
-                Poll::Pending
-            }
-        }
-
-        // if let Poll::Ready(Some(res)) = poll {
-        //     warn!("poll res ...");
-        //     match res {
-        //         Ok(bytes) => {
-        //             let chuck_size = *this.chuck_size;
-        //             let mut bytes = bytes;
-        //             if this.remaining.len() > 0 {
-        //                 let need_size = chuck_size - this.remaining.len();
-        //                 let add_bytes = bytes.split_to(need_size);
-        //                 this.remaining.extend_from_slice(&add_bytes);
-        //                 warn!("poll push_back remaining ...1");
-        //                 this.streams.push_back(Bytes::from(this.remaining.clone()));
-        //                 this.remaining.clear();
-        //             }
-
-        //             loop {
-        //                 if bytes.len() < chuck_size {
-        //                     break;
-        //                 }
-        //                 let chuck = bytes.split_to(chuck_size);
-        //                 warn!("poll push_back ...1");
-        //                 this.streams.push_back(chuck);
-        //             }
-
-        //             warn!("poll remaining extend_from_slice...1");
-        //             this.remaining.extend_from_slice(&bytes);
-        //         }
-        //         Err(err) => return Poll::Ready(Some(Err(err))),
-        //     }
-        // }
-
-        // if let Some(b) = this.streams.pop_front() {
-        //     warn!("poll pop_front ...");
-        //     return Poll::Ready(Some(Ok(b)));
-        // }
-
-        // if this.remaining.len() > 0 {
-        //     let b = this.remaining.clone();
-        //     this.remaining.clear();
-
-        //     warn!("poll remaining ...1");
-        //     return Poll::Ready(Some(Ok(Bytes::from(b))));
-        // }
-        // Poll::Pending
-    }
-
-    fn size_hint(&self) -> (usize, Option<usize>) {
-        let mut items = self.streams.len();
-        if !self.remaining.is_empty() {
-            items += 1;
-        }
-        (items, Some(items))
-    }
-}
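Stripped of the Stream plumbing, the re-chunking rule that ChunkedStream implemented is: carry a remainder between incoming buffers and emit only full chunk_size blocks until the stream ends. A standalone sketch of just that rule:

    use bytes::{Bytes, BytesMut};

    // Emit full `chunk_size` blocks; the trailing partial block is flushed
    // only once the input is exhausted, mirroring ChunkedStream's remainder.
    fn rechunk(input: &[Bytes], chunk_size: usize) -> Vec<Bytes> {
        let mut out = Vec::new();
        let mut rem = BytesMut::new();
        for b in input {
            rem.extend_from_slice(b);
            while rem.len() >= chunk_size {
                out.push(rem.split_to(chunk_size).freeze()); // full block
            }
        }
        if !rem.is_empty() {
            out.push(rem.freeze()); // trailing partial block at end of stream
        }
        out
    }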
-
-#[cfg(test)]
-mod test {
-
-    use super::*;
-    use futures::StreamExt;
-
-    #[tokio::test]
-    async fn test_etag_reader() {
-        let data1 = vec![1u8; 60]; // 65536
-        let data2 = vec![0u8; 32]; // 65536
-        let chunk1 = Bytes::from(data1);
-        let chunk2 = Bytes::from(data2);
-
-        let chunk_results: Vec<Result<Bytes, StdError>> = vec![Ok(chunk1), Ok(chunk2)];
-
-        let mut stream = futures::stream::iter(chunk_results);
-
-        let mut hash_reader = EtagReader::new(&mut stream, None, None);
-
-        // let chunk_size = 8;
-
-        // let mut chunked_stream = ChunkStream::new(&mut hash_reader, chunk_size);
-
-        loop {
-            match hash_reader.next().await {
-                Some(res) => match res {
-                    Ok(bytes) => {
-                        println!("bytes: {}, {:?}", bytes.len(), bytes);
-                    }
-                    Err(err) => {
-                        println!("err:{:?}", err);
-                        break;
-                    }
-                },
-                None => {
-                    println!("next none");
-                    break;
-                }
-            }
-        }
-
-        println!("etag:{}", hash_reader.etag());
-
-        // 9a7dfa2fcd7b69c89a30cfd3a9be11ab58cb6172628bd7e967fad1e187456d45
-        // println!("md5: {:?}", hash_reader.hex());
-    }
-
-    #[tokio::test]
-    async fn test_hash_reader() {
-        let data1 = vec![1u8; 60]; // 65536
-        let data2 = vec![0u8; 32]; // 65536
-        let size = data1.len() + data2.len();
-        let chunk1 = Bytes::from(data1);
-        let chunk2 = Bytes::from(data2);
-
-        let chunk_results: Vec<Result<Bytes, StdError>> = vec![Ok(chunk1), Ok(chunk2)];
-
-        let mut stream = futures::stream::iter(chunk_results);
-
-        let mut hash_reader = HashReader::new(
-            &mut stream,
-            size,
-            Some("d94c485610a7a00a574df55e45d3cc0c".to_string()),
-            Some("9a7dfa2fcd7b69c89a30cfd3a9be11ab58cb6172628bd7e967fad1e187456d45".to_string()),
-            0,
-        );
-
-        // let chunk_size = 8;
-
-        // let mut chunked_stream = ChunkStream::new(&mut hash_reader, chunk_size);
-
-        loop {
-            match hash_reader.next().await {
-                Some(res) => match res {
-                    Ok(bytes) => {
-                        println!("bytes: {}, {:?}", bytes.len(), bytes);
-                    }
-                    Err(err) => {
-                        println!("err:{:?}", err);
-                        break;
-                    }
-                },
-                None => {
-                    println!("next none");
-                    break;
-                }
-            }
-        }
-
-        // BUG: borrow of moved value: `md5_stream`
-
-        // 9a7dfa2fcd7b69c89a30cfd3a9be11ab58cb6172628bd7e967fad1e187456d45
-        // println!("md5: {:?}", hash_reader.hex());
-    }
-
-    #[tokio::test]
-    async fn test_chunked_stream() {
-        let data1 = vec![1u8; 60]; // 65536
-        let data2 = vec![0u8; 33]; // 65536
-        let data3 = vec![4u8; 5]; // 65536
-        let chunk1 = Bytes::from(data1);
-        let chunk2 = Bytes::from(data2);
-        let chunk3 = Bytes::from(data3);
-
-        let chunk_results: Vec<Result<Bytes, StdError>> = vec![Ok(chunk1), Ok(chunk2), Ok(chunk3)];
-
-        let mut stream = futures::stream::iter(chunk_results);
-        // let mut hash_reader = HashReader::new(
-        //     &mut stream,
-        //     size,
-        //     Some("d94c485610a7a00a574df55e45d3cc0c".to_string()),
-        //     Some("9a7dfa2fcd7b69c89a30cfd3a9be11ab58cb6172628bd7e967fad1e187456d45".to_string()),
-        //     0,
-        // );
-
-        let chunk_size = 8;
-
-        let mut etag_reader = EtagReader::new(&mut stream, None, None);
-
-        let mut chunked_stream = ChunkedStream::new(&mut etag_reader, chunk_size);
-
-        loop {
-            match chunked_stream.next().await {
-                Some(res) => match res {
-                    Ok(bytes) => {
-                        println!("bytes: {}, {:?}", bytes.len(), bytes);
-                    }
-                    Err(err) => {
-                        println!("err:{:?}", err);
-                        break;
-                    }
-                },
-                None => {
-                    println!("next none");
-                    break;
-                }
-            }
-        }
-
-        println!("etag:{}", etag_reader.etag());
-    }
-}
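The EtagReader that replaces this one lives in ecstore::io and is constructed from a single reader (see the one-argument EtagReader::new(stream) calls in the set_disk.rs hunks above). Assuming it wraps an AsyncRead rather than a Stream, a minimal sketch of that shape could look as follows; the real type may differ in detail:

    use md5::{Digest, Md5};
    use std::pin::Pin;
    use std::task::{Context, Poll};
    use tokio::io::{AsyncRead, ReadBuf};

    pub struct EtagReader<R> {
        inner: R,
        md5: Md5,
    }

    impl<R: AsyncRead + Unpin> EtagReader<R> {
        pub fn new(inner: R) -> Self {
            Self { inner, md5: Md5::new() }
        }

        pub fn etag(&self) -> String {
            hex::encode(self.md5.clone().finalize())
        }
    }

    impl<R: AsyncRead + Unpin> AsyncRead for EtagReader<R> {
        fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>) -> Poll<std::io::Result<()>> {
            let before = buf.filled().len();
            let poll = Pin::new(&mut self.inner).poll_read(cx, buf);
            if let Poll::Ready(Ok(())) = &poll {
                // hash only the bytes this call appended
                self.md5.update(&buf.filled()[before..]);
            }
            poll
        }
    }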
mode 100644
index 516bf842..00000000
--- a/reader/src/readme.md
+++ /dev/null
@@ -1,5 +0,0 @@
-# Flow
-
-## Write path
-
-http::Body -> HashReader -> ...(other reader) -> ChuckedReader -> BitrotWriter -> FileWriter
diff --git a/rustfs/src/admin/rpc.rs b/rustfs/src/admin/rpc.rs
index 42bbb0ac..5fc85da8 100644
--- a/rustfs/src/admin/rpc.rs
+++ b/rustfs/src/admin/rpc.rs
@@ -4,6 +4,7 @@ use super::router::S3Router;
 use crate::storage::ecfs::bytes_stream;
 use common::error::Result;
 use ecstore::disk::DiskAPI;
+use ecstore::io::READ_BUFFER_SIZE;
 use ecstore::store::find_local_disk;
 use futures::TryStreamExt;
 use http::StatusCode;
@@ -71,7 +72,10 @@ impl Operation for ReadFile {
 
         Ok(S3Response::new((
             StatusCode::OK,
-            Body::from(StreamingBlob::wrap(bytes_stream(ReaderStream::new(file), query.length))),
+            Body::from(StreamingBlob::wrap(bytes_stream(
+                ReaderStream::with_capacity(file, READ_BUFFER_SIZE),
+                query.length,
+            ))),
         )))
     }
 }
diff --git a/rustfs/src/main.rs b/rustfs/src/main.rs
index 92dfbd2c..307793d2 100644
--- a/rustfs/src/main.rs
+++ b/rustfs/src/main.rs
@@ -152,9 +152,13 @@ async fn run(opt: config::Opt) -> Result<()> {
 
     for (i, eps) in endpoint_pools.as_ref().iter().enumerate() {
         info!(
-            "created endpoints {}, set_count:{}, drives_per_set: {}, cmd: {:?}, \n{:?}",
-            i, eps.set_count, eps.drives_per_set, eps.cmd_line, eps
+            "created endpoints {}, set_count:{}, drives_per_set: {}, cmd: {:?}",
+            i, eps.set_count, eps.drives_per_set, eps.cmd_line
         );
+
+        for ep in eps.endpoints.as_ref().iter() {
+            info!(" - {}", ep);
+        }
     }
 
     set_global_addr(&opt.address).await;
diff --git a/rustfs/src/storage/ecfs.rs b/rustfs/src/storage/ecfs.rs
index 95ef13cc..edb936dc 100644
--- a/rustfs/src/storage/ecfs.rs
+++ b/rustfs/src/storage/ecfs.rs
@@ -20,6 +20,7 @@ use ecstore::bucket::policy_sys::PolicySys;
 use ecstore::bucket::tagging::decode_tags;
 use ecstore::bucket::tagging::encode_tags;
 use ecstore::bucket::versioning_sys::BucketVersioningSys;
+use ecstore::io::READ_BUFFER_SIZE;
 use ecstore::new_object_layer_fn;
 use ecstore::store_api::BucketOptions;
 use ecstore::store_api::CompletePart;
@@ -51,6 +52,8 @@ use s3s::S3;
 use s3s::{S3Request, S3Response};
 use std::fmt::Debug;
 use std::str::FromStr;
+use tokio_util::io::ReaderStream;
+use tokio_util::io::StreamReader;
 use tracing::debug;
 use tracing::error;
 use tracing::info;
@@ -464,8 +467,13 @@ impl S3 for FS {
         };
 
         let last_modified = info.mod_time.map(Timestamp::from);
+        let body = Some(StreamingBlob::wrap(bytes_stream(
+            ReaderStream::with_capacity(reader.stream, READ_BUFFER_SIZE),
+            info.size,
+        )));
+
         let output = GetObjectOutput {
-            body: Some(reader.stream),
+            body,
             content_length: Some(info.size as i64),
             last_modified,
             content_type,
@@ -799,6 +807,10 @@ impl S3 for FS {
             }
         };
 
+        let body = Box::new(StreamReader::new(
+            body.map(|f| f.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e.to_string()))),
+        ));
+
         let mut reader = PutObjReader::new(body, content_length as usize);
 
         let Some(store) = new_object_layer_fn() else {
@@ -911,6 +923,10 @@ impl S3 for FS {
             }
         };
 
+        let body = Box::new(StreamReader::new(
+            body.map(|f| f.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e.to_string()))),
+        ));
+
         // mc cp step 4
         let mut data = PutObjReader::new(body, content_length as usize);
         let opts = ObjectOptions::default();
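Taken together, the ecfs.rs hunks bridge AsyncRead and Stream in both directions at the S3 boundary: ReaderStream turns the internal reader into a body stream for GetObject, and StreamReader turns the request body stream back into an AsyncRead for PutObject. A minimal round-trip sketch, where READ_BUF is a hypothetical stand-in for ecstore::io::READ_BUFFER_SIZE:

    use std::io::Cursor;
    use tokio::io::AsyncReadExt;
    use tokio_util::io::{ReaderStream, StreamReader};

    const READ_BUF: usize = 1024 * 1024; // stand-in for READ_BUFFER_SIZE

    #[tokio::main]
    async fn main() -> std::io::Result<()> {
        // outbound (GetObject): AsyncRead -> Stream<Item = io::Result<Bytes>>
        let reader = Cursor::new(b"payload".to_vec());
        let stream = ReaderStream::with_capacity(reader, READ_BUF);

        // inbound (PutObject): Stream -> AsyncRead. ReaderStream already yields
        // io::Error; a foreign error type needs .map_err(...) as in ecfs.rs.
        let mut back = StreamReader::new(stream);

        let mut buf = Vec::new();
        back.read_to_end(&mut buf).await?;
        assert_eq!(buf, b"payload");
        Ok(())
    }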