opt network io

This commit is contained in:
weisd
2025-03-14 00:35:27 +08:00
parent 17d7c869ac
commit 01cf4c663d
37 changed files with 420 additions and 1692 deletions

View File

@@ -1,2 +1,4 @@
[target.x86_64-unknown-linux-gnu]
rustflags = ["-Clink-arg=-fuse-ld=lld"]
rustflags = [
"-C", "link-arg=-fuse-ld=bfd"
]

1
.gitignore vendored
View File

@@ -4,6 +4,7 @@
.vscode
/test
/logs
/data
.devcontainer
rustfs/static/*
vendor

17
Cargo.lock generated
View File

@@ -1921,7 +1921,6 @@ dependencies = [
"pin-project-lite",
"protos",
"rand 0.8.5",
"reader",
"reed-solomon-erasure",
"regex",
"reqwest",
@@ -4981,22 +4980,6 @@ version = "0.6.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "20675572f6f24e9e76ef639bc5552774ed45f1c30e2951e1e99c59888861c539"
[[package]]
name = "reader"
version = "0.0.1"
dependencies = [
"bytes",
"futures",
"hex-simd",
"md-5",
"pin-project-lite",
"s3s",
"sha2 0.11.0-pre.4",
"thiserror 2.0.11",
"tokio",
"tracing",
]
[[package]]
name = "redox_syscall"
version = "0.2.16"

View File

@@ -7,7 +7,6 @@ members = [
"common/common", # Shared utilities and data structures
"common/lock", # Distributed locking implementation
"common/protos", # Protocol buffer definitions
"reader", # Object reading service
"common/workers", # Worker thread pools and task scheduling
"iam", # Identity and Access Management
"crypto", # Cryptography and security features
@@ -43,7 +42,6 @@ flatbuffers = "24.12.23"
futures = "0.3.31"
futures-util = "0.3.31"
common = { path = "./common/common" }
reader = { path = "./reader" }
hex = "0.4.3"
hyper = "1.6.0"
hyper-util = { version = "0.1.10", features = [

17
Dockerfile Normal file
View File

@@ -0,0 +1,17 @@
FROM alpine:latest
# RUN apk add --no-cache <package-name>
WORKDIR /app
RUN mkdir -p /data/rustfs0 /data/rustfs1 /data/rustfs2 /data/rustfs3
COPY ./target/x86_64-unknown-linux-musl/release/rustfs /app/rustfs
RUN chmod +x /app/rustfs
EXPOSE 9000
EXPOSE 9001
CMD ["/app/rustfs"]

View File

@@ -1,74 +1,82 @@
version: '3.8'
services:
node1:
node0:
image: rustfs:v1 # 替换为你的镜像名称和标签
container_name: node1
container_name: node0
hostname: node0
environment:
- RUSTFS_VOLUMES=http://node{1...4}:9000/root/data/target/volume/test{1...4}
- RUSTFS_VOLUMES=http://node{0...3}:9000/data/rustfs{0...3}
- RUSTFS_ADDRESS=0.0.0.0:9000
- RUSTFS_CONSOLE_ENABLE=true
- RUSTFS_CONSOLE_ADDRESS=0.0.0.0:9002
platform: linux/amd64
ports:
- "9001:9000" # 映射宿主机的 9001 端口到容器的 9000 端口
- "9000:9000" # 映射宿主机的 9000 端口到容器的 9000 端口
- "8000:9001" # 映射宿主机的 8000 端口到容器的 9001 端口
volumes:
- ..:/root/data # 将当前路径挂载到容器内的 /root/data
command: "/root/rustfs"
networks:
- my_network
- ./target/x86_64-unknown-linux-musl/release/rustfs:/app/rustfs
# - ./data/node0:/data # 将宿主机的 ./data/node0 挂载到容器内的 /data
command: "/app/rustfs"
node1:
image: rustfs:v1
container_name: node1
hostname: node1
environment:
- RUSTFS_VOLUMES=http://node{0...3}:9000/data/rustfs{0...3}
- RUSTFS_ADDRESS=0.0.0.0:9000
- RUSTFS_CONSOLE_ENABLE=true
- RUSTFS_CONSOLE_ADDRESS=0.0.0.0:9002
platform: linux/amd64
ports:
- "9001:9000" # 映射宿主机的 9001 端口到容器的 9000 端口
volumes:
- ./target/x86_64-unknown-linux-musl/release/rustfs:/app/rustfs
# - ./data/node1:/data
command: "/app/rustfs"
node2:
image: rustfs:v1
container_name: node2
hostname: node2
environment:
- RUSTFS_VOLUMES=http://node{1...4}:9000/root/data/target/volume/test{1...4}
- RUSTFS_VOLUMES=http://node{0...3}:9000/data/rustfs{0...3}
- RUSTFS_ADDRESS=0.0.0.0:9000
- RUSTFS_CONSOLE_ENABLE=true
- RUSTFS_CONSOLE_ADDRESS=0.0.0.0:9002
platform: linux/amd64
ports:
- "9002:9000" # 映射宿主机的 9002 端口到容器的 9000 端口
- "9002:9000" # 映射宿主机的 9002 端口到容器的 9000 端口
volumes:
- ..:/root/data
command: "mkdir -p /root/data/target/volume/test{1..4} && /root/data/target/ubuntu22.04/release/rustfs"
networks:
- my_network
- ./target/x86_64-unknown-linux-musl/release/rustfs:/app/rustfs
# - ./data/node2:/data
command: "/app/rustfs"
node3:
image: rustfs:v1
container_name: node3
hostname: node3
environment:
- RUSTFS_VOLUMES=http://node{1...4}:9000/root/data/target/volume/test{1...4}
- RUSTFS_VOLUMES=http://node{0...3}:9000/data/rustfs{0...3}
- RUSTFS_ADDRESS=0.0.0.0:9000
- RUSTFS_CONSOLE_ENABLE=true
- RUSTFS_CONSOLE_ADDRESS=0.0.0.0:9002
platform: linux/amd64
ports:
- "9003:9000" # 映射宿主机的 9003 端口到容器的 9000 端口
- "9003:9000" # 映射宿主机的 9003 端口到容器的 9000 端口
volumes:
- ..:/root/data
command: "mkdir -p /root/data/target/volume/test{1..4} && /root/data/target/ubuntu22.04/release/rustfs"
networks:
- my_network
- ./target/x86_64-unknown-linux-musl/release/rustfs:/app/rustfs
# - ./data/node3:/data
command: "/app/rustfs"
node4:
image: rustfs:v1
container_name: node4
environment:
- RUSTFS_VOLUMES=http://node{1...4}:9000/root/data/target/volume/test{1...4}
- RUSTFS_ADDRESS=0.0.0.0:9000
- RUSTFS_CONSOLE_ENABLE=true
- RUSTFS_CONSOLE_ADDRESS=0.0.0.0:9002
platform: linux/amd64
ports:
- "9004:9000" # 映射宿主机的 9004 端口到容器的 9000 端口
volumes:
- ..:/root/data
command: "mkdir -p /root/data/target/volume/test{1..4} && /root/data/target/ubuntu22.04/release/rustfs"
networks:
- my_network
networks:
my_network:
driver: bridge
2025-03-14T05:23:15.661154Z INFO ecstore::disk::os: reliable_rename rm dst failed. src_file_path: "/data/rustfs1/.rustfs.sys/tmp/c7fabb9c-48c8-4827-b5e2-13271c3867c3x1741929793/part.38", dst_file_path: "/data/rustfs1/.rustfs.sys/multipart/494d877741f5e87d5160dc4e1bd4fbdacda64559ea0b7d16cdbeed61f252b98f/a83dc20f-e73a-46d0-a02b-11b330ba6e7ex1741929773056730169/641d3efd-cca0-418e-983b-ca2d47652900/part.38", base_dir: "/data/rustfs1/.rustfs.sys/multipart", err: Os { code: 2, kind: NotFound, message: "No such file or directory" }
at ecstore/src/disk/os.rs:144
2025-03-14T05:23:15.953116Z INFO ecstore::disk::os: reliable_rename rm dst failed. src_file_path: "/data/rustfs3/.rustfs.sys/tmp/e712821f-bc3f-4ffe-8a0c-0daa379d00d4x1741929793/part.39", dst_file_path: "/data/rustfs3/.rustfs.sys/multipart/494d877741f5e87d5160dc4e1bd4fbdacda64559ea0b7d16cdbeed61f252b98f/a83dc20f-e73a-46d0-a02b-11b330ba6e7ex1741929773056730169/641d3efd-cca0-418e-983b-ca2d47652900/part.39", base_dir: "/data/rustfs3/.rustfs.sys/multipart", err: Os { code: 2, kind: NotFound, message: "No such file or directory" }
at ecstore/src/disk/os.rs:144
2025-03-14T05:23:15.953218Z INFO ecstore::disk::os: reliable_rename rm dst failed. src_file_path: "/data/rustfs2/.rustfs.sys/tmp/e712821f-bc3f-4ffe-8a0c-0daa379d00d4x1741929793/part.39", dst_file_path: "/data/rustfs2/.rustfs.sys/multipart/494d877741f5e87d5160dc4e1bd4fbdacda64559ea0b7d16cdbeed61f252b98f/a83dc20f-e73a-46d0-a02b-11b330ba6e7ex1741929773056730169/641d3efd-cca0-418e-983b-ca2d47652900/part.39", base_dir: "/data/rustfs2/.rustfs.sys/multipart", err: Os { code: 2, kind: NotFound, message: "No such file or directory" }
at ecstore/src/disk/os.rs:144

View File

@@ -17,7 +17,6 @@ blake2 = "0.10.6"
bytes.workspace = true
common.workspace = true
chrono.workspace = true
reader.workspace = true
glob = "0.3.2"
thiserror.workspace = true
flatbuffers.workspace = true

View File

@@ -1,7 +1,8 @@
use crate::{
disk::{error::DiskError, Disk, DiskAPI, FileReader, FileWriter},
disk::{error::DiskError, Disk, DiskAPI},
erasure::{ReadAt, Writer},
error::{Error, Result},
io::{FileReader, FileWriter},
store_api::BitrotAlgorithm,
};
use blake2::Blake2b512;

View File

@@ -14,7 +14,7 @@ use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use time::OffsetDateTime;
use tracing::{error, info};
use tracing::error;
use crate::config::common::{read_config, save_config};
use crate::error::{Error, Result};
@@ -311,7 +311,7 @@ impl BucketMetadata {
buf.extend_from_slice(&data);
save_config(store, self.save_file_path().as_str(), &buf).await?;
save_config(store, self.save_file_path().as_str(), buf).await?;
Ok(())
}
@@ -367,7 +367,7 @@ pub async fn load_bucket_metadata_parse(api: Arc<ECStore>, bucket: &str, parse:
return Err(err);
}
info!("bucketmeta {} not found with err {:?}, start to init ", bucket, &err);
// info!("bucketmeta {} not found with err {:?}, start to init ", bucket, &err);
BucketMetadata::new(bucket)
}

View File

@@ -164,7 +164,7 @@ pub async fn list_path_raw(mut rx: B_Receiver<bool>, opts: ListPathRawOptions) -
let entry = match r.peek().await {
Ok(res) => {
if let Some(entry) = res {
info!("read entry disk: {}, name: {}", i, entry.name);
// info!("read entry disk: {}, name: {}", i, entry.name);
entry
} else {
// eof

View File

@@ -1,6 +1,3 @@
use std::collections::HashSet;
use std::sync::Arc;
use super::error::{is_err_config_not_found, ConfigError};
use super::{storageclass, Config, GLOBAL_StorageClass, KVS};
use crate::disk::RUSTFS_META_BUCKET;
@@ -10,8 +7,9 @@ use crate::store_err::is_err_object_not_found;
use crate::utils::path::SLASH_SEPARATOR;
use http::HeaderMap;
use lazy_static::lazy_static;
use s3s::dto::StreamingBlob;
use s3s::Body;
use std::collections::HashSet;
use std::io::Cursor;
use std::sync::Arc;
use tracing::{error, warn};
pub const CONFIG_PREFIX: &str = "config";
@@ -59,7 +57,7 @@ pub async fn read_config_with_metadata<S: StorageAPI>(
Ok((data, rd.object_info))
}
pub async fn save_config<S: StorageAPI>(api: Arc<S>, file: &str, data: &[u8]) -> Result<()> {
pub async fn save_config<S: StorageAPI>(api: Arc<S>, file: &str, data: Vec<u8>) -> Result<()> {
save_config_with_opts(
api,
file,
@@ -96,14 +94,10 @@ pub async fn delete_config<S: StorageAPI>(api: Arc<S>, file: &str) -> Result<()>
}
}
async fn save_config_with_opts<S: StorageAPI>(api: Arc<S>, file: &str, data: &[u8], opts: &ObjectOptions) -> Result<()> {
async fn save_config_with_opts<S: StorageAPI>(api: Arc<S>, file: &str, data: Vec<u8>, opts: &ObjectOptions) -> Result<()> {
let size = data.len();
let _ = api
.put_object(
RUSTFS_META_BUCKET,
file,
&mut PutObjReader::new(StreamingBlob::from(Body::from(data.to_vec())), data.len()),
opts,
)
.put_object(RUSTFS_META_BUCKET, file, &mut PutObjReader::new(Box::new(Cursor::new(data)), size), opts)
.await?;
Ok(())
}
@@ -174,7 +168,7 @@ async fn save_server_config<S: StorageAPI>(api: Arc<S>, cfg: &Config) -> Result<
let config_file = format!("{}{}{}", CONFIG_PREFIX, SLASH_SEPARATOR, CONFIG_FILE);
save_config(api, &config_file, data.as_slice()).await
save_config(api, &config_file, data).await
}
pub async fn lookup_configs<S: StorageAPI>(cfg: &mut Config, api: Arc<S>) {

View File

@@ -1,121 +0,0 @@
use crate::error::Result;
use futures::TryStreamExt;
use std::pin::Pin;
use std::task::Poll;
use tokio::io::AsyncWrite;
use tokio::sync::oneshot;
use tokio_util::io::ReaderStream;
use tokio_util::io::StreamReader;
use tracing::error;
use tracing::warn;
use super::FileReader;
#[derive(Debug)]
pub struct HttpFileWriter {
wd: tokio::io::WriteHalf<tokio::io::SimplexStream>,
err_rx: oneshot::Receiver<std::io::Error>,
}
impl HttpFileWriter {
pub fn new(url: &str, disk: &str, volume: &str, path: &str, size: usize, append: bool) -> Result<Self> {
let (rd, wd) = tokio::io::simplex(4096);
let (err_tx, err_rx) = oneshot::channel::<std::io::Error>();
let body = reqwest::Body::wrap_stream(ReaderStream::new(rd));
let url = url.to_owned();
let disk = disk.to_owned();
let volume = volume.to_owned();
let path = path.to_owned();
tokio::spawn(async move {
let client = reqwest::Client::new();
if let Err(err) = client
.put(format!(
"{}/rustfs/rpc/put_file_stream?disk={}&volume={}&path={}&append={}&size={}",
url,
urlencoding::encode(&disk),
urlencoding::encode(&volume),
urlencoding::encode(&path),
append,
size
))
.body(body)
.send()
.await
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))
{
error!("HttpFileWriter put file err: {:?}", err);
if let Err(er) = err_tx.send(err) {
error!("HttpFileWriter tx.send err: {:?}", er);
}
// return;
}
// error!("http write done {}", path);
});
Ok(Self {
wd,
err_rx,
// client: reqwest::Client::new(),
// url: url.to_string(),
// disk: disk.to_string(),
// volume: volume.to_string(),
})
}
}
impl AsyncWrite for HttpFileWriter {
#[tracing::instrument(level = "debug", skip(self, buf))]
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &[u8],
) -> Poll<std::result::Result<usize, std::io::Error>> {
if let Ok(err) = self.as_mut().err_rx.try_recv() {
return Poll::Ready(Err(err));
}
Pin::new(&mut self.wd).poll_write(cx, buf)
}
#[tracing::instrument(level = "debug", skip(self))]
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<std::result::Result<(), std::io::Error>> {
Pin::new(&mut self.wd).poll_flush(cx)
}
#[tracing::instrument(level = "debug", skip(self))]
fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<std::result::Result<(), std::io::Error>> {
Pin::new(&mut self.wd).poll_shutdown(cx)
}
}
pub async fn new_http_reader(
url: &str,
disk: &str,
volume: &str,
path: &str,
offset: usize,
length: usize,
) -> Result<FileReader> {
let resp = reqwest::Client::new()
.get(format!(
"{}/rustfs/rpc/read_file_stream?disk={}&volume={}&path={}&offset={}&length={}",
url,
urlencoding::encode(disk),
urlencoding::encode(volume),
urlencoding::encode(path),
offset,
length
))
.send()
.await?;
let inner = StreamReader::new(resp.bytes_stream().map_err(std::io::Error::other));
Ok(Box::new(inner))
}

View File

@@ -5,9 +5,9 @@ use super::error::{
use super::os::{is_root_disk, rename_all};
use super::{endpoint::Endpoint, error::DiskError, format::FormatV3};
use super::{
os, CheckPartsResp, DeleteOptions, DiskAPI, DiskInfo, DiskInfoOptions, DiskLocation, DiskMetrics, FileInfoVersions,
FileReader, FileWriter, Info, MetaCacheEntry, ReadMultipleReq, ReadMultipleResp, ReadOptions, RenameDataResp,
UpdateMetadataOpts, VolumeInfo, WalkDirOptions, BUCKET_META_PREFIX, RUSTFS_META_BUCKET, STORAGE_FORMAT_FILE_BACKUP,
os, CheckPartsResp, DeleteOptions, DiskAPI, DiskInfo, DiskInfoOptions, DiskLocation, DiskMetrics, FileInfoVersions, Info,
MetaCacheEntry, ReadMultipleReq, ReadMultipleResp, ReadOptions, RenameDataResp, UpdateMetadataOpts, VolumeInfo,
WalkDirOptions, BUCKET_META_PREFIX, RUSTFS_META_BUCKET, STORAGE_FORMAT_FILE_BACKUP,
};
use crate::bitrot::bitrot_verify;
use crate::bucket::metadata_sys::{self};
@@ -27,6 +27,7 @@ use crate::heal::data_usage_cache::{DataUsageCache, DataUsageEntry};
use crate::heal::error::{ERR_IGNORE_FILE_CONTRIB, ERR_SKIP_FILE};
use crate::heal::heal_commands::{HealScanMode, HealingTracker};
use crate::heal::heal_ops::HEALING_TRACKER_FILENAME;
use crate::io::{FileReader, FileWriter};
use crate::metacache::writer::MetacacheWriter;
use crate::new_object_layer_fn;
use crate::set_disk::{
@@ -326,7 +327,7 @@ impl LocalDisk {
}
}
// FIXME: 先清空回收站吧,有时间再添加判断逻辑
// TODO: 优化 FIXME: 先清空回收站吧,有时间再添加判断逻辑
if let Err(err) = {
if trash_path.is_dir() {
@@ -1523,7 +1524,7 @@ impl DiskAPI for LocalDisk {
// TODO: io verifier
#[tracing::instrument(level = "debug", skip(self))]
async fn read_file(&self, volume: &str, path: &str) -> Result<FileReader> {
warn!("disk read_file: volume: {}, path: {}", volume, path);
// warn!("disk read_file: volume: {}, path: {}", volume, path);
let volume_dir = self.get_bucket_path(volume)?;
if !skip_access_checks(volume) {
if let Err(e) = utils::fs::access(&volume_dir).await {
@@ -1557,10 +1558,10 @@ impl DiskAPI for LocalDisk {
#[tracing::instrument(level = "debug", skip(self))]
async fn read_file_stream(&self, volume: &str, path: &str, offset: usize, length: usize) -> Result<FileReader> {
warn!(
"disk read_file_stream: volume: {}, path: {}, offset: {}, length: {}",
volume, path, offset, length
);
// warn!(
// "disk read_file_stream: volume: {}, path: {}, offset: {}, length: {}",
// volume, path, offset, length
// );
let volume_dir = self.get_bucket_path(volume)?;
if !skip_access_checks(volume) {
@@ -1748,7 +1749,7 @@ impl DiskAPI for LocalDisk {
return Err(os_err_to_file_err(e));
}
info!("read xl.meta failed, dst_file_path: {:?}, err: {:?}", dst_file_path, e);
// info!("read xl.meta failed, dst_file_path: {:?}, err: {:?}", dst_file_path, e);
None
}
};
@@ -2247,7 +2248,6 @@ impl DiskAPI for LocalDisk {
}
async fn delete_volume(&self, volume: &str) -> Result<()> {
info!("delete_volume, volume: {}", volume);
let p = self.get_bucket_path(volume)?;
// TODO: 不能用递归删除如果目录下面有文件返回errVolumeNotEmpty

View File

@@ -1,7 +1,6 @@
pub mod endpoint;
pub mod error;
pub mod format;
pub mod io;
pub mod local;
pub mod os;
pub mod remote;
@@ -24,6 +23,7 @@ use crate::{
data_usage_cache::{DataUsageCache, DataUsageEntry},
heal_commands::{HealScanMode, HealingTracker},
},
io::{FileReader, FileWriter},
store_api::{FileInfo, ObjectInfo, RawFileInfo},
utils::path::SLASH_SEPARATOR,
};
@@ -35,11 +35,7 @@ use remote::RemoteDisk;
use serde::{Deserialize, Serialize};
use std::{cmp::Ordering, fmt::Debug, path::PathBuf, sync::Arc};
use time::OffsetDateTime;
use tokio::{
io::{AsyncRead, AsyncWrite},
sync::mpsc::Sender,
};
use tracing::info;
use tokio::{io::AsyncWrite, sync::mpsc::Sender};
use tracing::warn;
use uuid::Uuid;
@@ -328,7 +324,6 @@ impl DiskAPI for Disk {
}
async fn delete_volume(&self, volume: &str) -> Result<()> {
info!("delete_volume, volume: {}", volume);
match self {
Disk::Local(local_disk) => local_disk.delete_volume(volume).await,
Disk::Remote(remote_disk) => remote_disk.delete_volume(volume).await,
@@ -349,7 +344,6 @@ impl DiskAPI for Disk {
scan_mode: HealScanMode,
we_sleep: ShouldSleepFn,
) -> Result<DataUsageCache> {
info!("ns_scanner");
match self {
Disk::Local(local_disk) => local_disk.ns_scanner(cache, updates, scan_mode, we_sleep).await,
Disk::Remote(remote_disk) => remote_disk.ns_scanner(cache, updates, scan_mode, we_sleep).await,
@@ -374,9 +368,6 @@ pub async fn new_disk(ep: &endpoint::Endpoint, opt: &DiskOption) -> Result<DiskS
}
}
pub type FileReader = Box<dyn AsyncRead + Send + Sync + Unpin>;
pub type FileWriter = Box<dyn AsyncWrite + Send + Sync + Unpin>;
#[async_trait::async_trait]
pub trait DiskAPI: Debug + Send + Sync + 'static {
fn to_string(&self) -> String;
@@ -1184,20 +1175,6 @@ pub struct ReadMultipleResp {
pub mod_time: Option<OffsetDateTime>,
}
// impl Default for ReadMultipleResp {
// fn default() -> Self {
// Self {
// bucket: String::new(),
// prefix: String::new(),
// file: String::new(),
// exists: false,
// error: String::new(),
// data: Vec::new(),
// mod_time: OffsetDateTime::UNIX_EPOCH,
// }
// }
// }
#[derive(Debug, Deserialize, Serialize)]
pub struct VolumeInfo {
pub name: String,
@@ -1210,410 +1187,3 @@ pub struct ReadOptions {
pub read_data: bool,
pub healing: bool,
}
// pub struct FileWriter {
// pub inner: Pin<Box<dyn AsyncWrite + Send + Sync + 'static>>,
// }
// impl AsyncWrite for FileWriter {
// fn poll_write(
// mut self: Pin<&mut Self>,
// cx: &mut std::task::Context<'_>,
// buf: &[u8],
// ) -> std::task::Poll<std::result::Result<usize, std::io::Error>> {
// Pin::new(&mut self.inner).poll_write(cx, buf)
// }
// fn poll_flush(
// mut self: Pin<&mut Self>,
// cx: &mut std::task::Context<'_>,
// ) -> std::task::Poll<std::result::Result<(), std::io::Error>> {
// Pin::new(&mut self.inner).poll_flush(cx)
// }
// fn poll_shutdown(
// mut self: Pin<&mut Self>,
// cx: &mut std::task::Context<'_>,
// ) -> std::task::Poll<std::result::Result<(), std::io::Error>> {
// Pin::new(&mut self.inner).poll_shutdown(cx)
// }
// }
// impl FileWriter {
// pub fn new<W>(inner: W) -> Self
// where
// W: AsyncWrite + Send + Sync + 'static,
// {
// Self { inner: Box::pin(inner) }
// }
// }
// #[derive(Debug)]
// pub struct BufferWriter {
// pub inner: Vec<u8>,
// }
// impl BufferWriter {
// pub fn new(inner: Vec<u8>) -> Self {
// Self { inner }
// }
// #[allow(clippy::should_implement_trait)]
// pub fn as_ref(&self) -> &[u8] {
// self.inner.as_ref()
// }
// }
// #[async_trait::async_trait]
// impl Writer for BufferWriter {
// fn as_any(&self) -> &dyn Any {
// self
// }
// async fn write(&mut self, buf: &[u8]) -> Result<()> {
// let _ = self.inner.write(buf).await?;
// self.inner.flush().await?;
// Ok(())
// }
// }
// #[derive(Debug)]
// pub struct LocalFileWriter {
// pub inner: File,
// }
// impl LocalFileWriter {
// pub fn new(inner: File) -> Self {
// Self { inner }
// }
// }
// #[async_trait::async_trait]
// impl Writer for LocalFileWriter {
// fn as_any(&self) -> &dyn Any {
// self
// }
// async fn write(&mut self, buf: &[u8]) -> Result<()> {
// let _ = self.inner.write(buf).await?;
// self.inner.flush().await?;
// Ok(())
// }
// }
// type NodeClient = NodeServiceClient<
// InterceptedService<Channel, Box<dyn Fn(Request<()>) -> Result<Request<()>, Status> + Send + Sync + 'static>>,
// >;
// #[derive(Debug)]
// pub struct RemoteFileWriter {
// pub endpoint: Endpoint,
// pub volume: String,
// pub path: String,
// pub is_append: bool,
// tx: Sender<WriteRequest>,
// resp_stream: Streaming<WriteResponse>,
// }
// impl RemoteFileWriter {
// pub async fn new(endpoint: Endpoint, volume: String, path: String, is_append: bool, mut client: NodeClient) -> Result<Self> {
// let (tx, rx) = mpsc::channel(128);
// let in_stream = ReceiverStream::new(rx);
// let response = client.write_stream(in_stream).await.unwrap();
// let resp_stream = response.into_inner();
// Ok(Self {
// endpoint,
// volume,
// path,
// is_append,
// tx,
// resp_stream,
// })
// }
// }
// #[async_trait::async_trait]
// impl Writer for RemoteFileWriter {
// fn as_any(&self) -> &dyn Any {
// self
// }
// async fn write(&mut self, buf: &[u8]) -> Result<()> {
// let request = WriteRequest {
// disk: self.endpoint.to_string(),
// volume: self.volume.to_string(),
// path: self.path.to_string(),
// is_append: self.is_append,
// data: buf.to_vec(),
// };
// self.tx.send(request).await?;
// if let Some(resp) = self.resp_stream.next().await {
// // match resp {
// // Ok(resp) => {
// // if resp.success {
// // info!("write stream success");
// // } else {
// // info!("write stream failed: {}", resp.error_info.unwrap_or("".to_string()));
// // }
// // }
// // Err(_err) => {
// // }
// // }
// let resp = resp?;
// if resp.success {
// info!("write stream success");
// } else {
// return if let Some(err) = &resp.error {
// Err(proto_err_to_err(err))
// } else {
// Err(Error::from_string(""))
// };
// }
// } else {
// let error_info = "can not get response";
// info!("write stream failed: {}", error_info);
// return Err(Error::from_string(error_info));
// }
// Ok(())
// }
// }
// #[async_trait::async_trait]
// pub trait Reader {
// async fn read_at(&mut self, offset: usize, buf: &mut [u8]) -> Result<usize>;
// // async fn seek(&mut self, offset: usize) -> Result<()>;
// // async fn read_exact(&mut self, buf: &mut [u8]) -> Result<usize>;
// }
// #[async_trait::async_trait]
// impl Reader for FileReader {
// async fn read_at(&mut self, offset: usize, buf: &mut [u8]) -> Result<usize> {
// match self {
// Self::Local(reader) => reader.read_at(offset, buf).await,
// Self::Remote(reader) => reader.read_at(offset, buf).await,
// Self::Buffer(reader) => reader.read_at(offset, buf).await,
// Self::Http(reader) => reader.read_at(offset, buf).await,
// }
// }
// // async fn seek(&mut self, offset: usize) -> Result<()> {
// // match self {
// // Self::Local(reader) => reader.seek(offset).await,
// // Self::Remote(reader) => reader.seek(offset).await,
// // Self::Buffer(reader) => reader.seek(offset).await,
// // }
// // }
// // async fn read_exact(&mut self, buf: &mut [u8]) -> Result<usize> {
// // match self {
// // Self::Local(reader) => reader.read_exact(buf).await,
// // Self::Remote(reader) => reader.read_exact(buf).await,
// // Self::Buffer(reader) => reader.read_exact(buf).await,
// // }
// // }
// }
// #[derive(Debug)]
// pub struct BufferReader {
// pub inner: Cursor<Vec<u8>>,
// remaining: usize,
// }
// impl BufferReader {
// pub fn new(inner: Vec<u8>, offset: usize, read_length: usize) -> Self {
// let mut cur = Cursor::new(inner);
// cur.set_position(offset as u64);
// Self {
// inner: cur,
// remaining: offset + read_length,
// }
// }
// }
// impl AsyncRead for BufferReader {
// #[tracing::instrument(level = "debug", skip(self, buf))]
// fn poll_read(
// mut self: Pin<&mut Self>,
// cx: &mut std::task::Context<'_>,
// buf: &mut tokio::io::ReadBuf<'_>,
// ) -> std::task::Poll<std::result::Result<(), std::io::Error>> {
// match Pin::new(&mut self.inner).poll_read(cx, buf) {
// Poll::Ready(Ok(_)) => {
// if self.inner.position() as usize >= self.remaining {
// self.remaining -= buf.filled().len();
// Poll::Ready(Ok(()))
// } else {
// Poll::Pending
// }
// }
// Poll::Ready(Err(err)) => Poll::Ready(Err(err)),
// Poll::Pending => Poll::Pending,
// }
// }
// }
// #[async_trait::async_trait]
// impl Reader for BufferReader {
// #[tracing::instrument(level = "debug", skip(self, buf))]
// async fn read_at(&mut self, offset: usize, buf: &mut [u8]) -> Result<usize> {
// if self.pos != offset {
// self.inner.set_position(offset as u64);
// }
// self.inner.read_exact(buf).await?;
// self.pos += buf.len();
// Ok(buf.len())
// }
// // #[tracing::instrument(level = "debug", skip(self))]
// // async fn seek(&mut self, offset: usize) -> Result<()> {
// // if self.pos != offset {
// // self.inner.set_position(offset as u64);
// // }
// // Ok(())
// // }
// // #[tracing::instrument(level = "debug", skip(self))]
// // async fn read_exact(&mut self, buf: &mut [u8]) -> Result<usize> {
// // let bytes_read = self.inner.read_exact(buf).await?;
// // self.pos += buf.len();
// // Ok(bytes_read)
// // }
// }
// #[derive(Debug)]
// pub struct LocalFileReader {
// pub inner: File,
// // pos: usize,
// }
// impl LocalFileReader {
// pub fn new(inner: File) -> Self {
// Self { inner }
// }
// }
// #[async_trait::async_trait]
// impl Reader for LocalFileReader {
// #[tracing::instrument(level = "debug", skip(self, buf))]
// async fn read_at(&mut self, offset: usize, buf: &mut [u8]) -> Result<usize> {
// if self.pos != offset {
// self.inner.seek(SeekFrom::Start(offset as u64)).await?;
// self.pos = offset;
// }
// self.inner.read_exact(buf).await?;
// self.pos += buf.len();
// Ok(buf.len())
// }
// // #[tracing::instrument(level = "debug", skip(self))]
// // async fn seek(&mut self, offset: usize) -> Result<()> {
// // if self.pos != offset {
// // self.inner.seek(SeekFrom::Start(offset as u64)).await?;
// // self.pos = offset;
// // }
// // Ok(())
// // }
// // #[tracing::instrument(level = "debug", skip(self, buf))]
// // async fn read_exact(&mut self, buf: &mut [u8]) -> Result<usize> {
// // let bytes_read = self.inner.read_exact(buf).await?;
// // self.pos += buf.len();
// // Ok(bytes_read)
// // }
// }
// impl AsyncRead for LocalFileReader {
// #[tracing::instrument(level = "debug", skip(self, buf))]
// fn poll_read(
// mut self: Pin<&mut Self>,
// cx: &mut std::task::Context<'_>,
// buf: &mut tokio::io::ReadBuf<'_>,
// ) -> std::task::Poll<std::result::Result<(), std::io::Error>> {
// Pin::new(&mut self.inner).poll_read(cx, buf)
// }
// }
// #[derive(Debug)]
// pub struct RemoteFileReader {
// pub endpoint: Endpoint,
// pub volume: String,
// pub path: String,
// tx: Sender<ReadAtRequest>,
// resp_stream: Streaming<ReadAtResponse>,
// }
// impl RemoteFileReader {
// pub async fn new(endpoint: Endpoint, volume: String, path: String, mut client: NodeClient) -> Result<Self> {
// let (tx, rx) = mpsc::channel(128);
// let in_stream = ReceiverStream::new(rx);
// let response = client.read_at(in_stream).await.unwrap();
// let resp_stream = response.into_inner();
// Ok(Self {
// endpoint,
// volume,
// path,
// tx,
// resp_stream,
// })
// }
// }
// #[async_trait::async_trait]
// impl Reader for RemoteFileReader {
// async fn read_at(&mut self, offset: usize, buf: &mut [u8]) -> Result<usize> {
// let request = ReadAtRequest {
// disk: self.endpoint.to_string(),
// volume: self.volume.to_string(),
// path: self.path.to_string(),
// offset: offset.try_into().unwrap(),
// // length: length.try_into().unwrap(),
// length: buf.len().try_into().unwrap(),
// };
// self.tx.send(request).await?;
// if let Some(resp) = self.resp_stream.next().await {
// let resp = resp?;
// if resp.success {
// info!("read at stream success");
// buf.copy_from_slice(&resp.data);
// Ok(resp.read_size.try_into().unwrap())
// } else {
// return if let Some(err) = &resp.error {
// Err(proto_err_to_err(err))
// } else {
// Err(Error::from_string(""))
// };
// }
// } else {
// let error_info = "can not get response";
// info!("read at stream failed: {}", error_info);
// Err(Error::from_string(error_info))
// }
// }
// // async fn seek(&mut self, _offset: usize) -> Result<()> {
// // unimplemented!()
// // }
// // async fn read_exact(&mut self, _buf: &mut [u8]) -> Result<usize> {
// // unimplemented!()
// // }
// }
// impl AsyncRead for RemoteFileReader {
// #[tracing::instrument(level = "debug", skip(self, buf))]
// fn poll_read(
// mut self: Pin<&mut Self>,
// cx: &mut std::task::Context<'_>,
// buf: &mut tokio::io::ReadBuf<'_>,
// ) -> std::task::Poll<std::result::Result<(), std::io::Error>> {
// unimplemented!("poll_read")
// }
// }

View File

@@ -141,13 +141,15 @@ pub async fn reliable_rename(
}
// need remove dst path
if let Err(err) = utils::fs::remove_all(dst_file_path.as_ref()).await {
info!(
"reliable_rename rm dst failed. src_file_path: {:?}, dst_file_path: {:?}, base_dir: {:?}, err: {:?}",
src_file_path.as_ref(),
dst_file_path.as_ref(),
base_dir.as_ref(),
err
);
if err.kind() != io::ErrorKind::NotFound {
info!(
"reliable_rename rm dst failed. src_file_path: {:?}, dst_file_path: {:?}, base_dir: {:?}, err: {:?}",
src_file_path.as_ref(),
dst_file_path.as_ref(),
base_dir.as_ref(),
err
);
}
}
let mut i = 0;
loop {

View File

@@ -23,8 +23,8 @@ use uuid::Uuid;
use super::{
endpoint::Endpoint, CheckPartsResp, DeleteOptions, DiskAPI, DiskInfo, DiskInfoOptions, DiskLocation, DiskOption,
FileInfoVersions, FileReader, FileWriter, ReadMultipleReq, ReadMultipleResp, ReadOptions, RenameDataResp, UpdateMetadataOpts,
VolumeInfo, WalkDirOptions,
FileInfoVersions, ReadMultipleReq, ReadMultipleResp, ReadOptions, RenameDataResp, UpdateMetadataOpts, VolumeInfo,
WalkDirOptions,
};
use crate::{
disk::error::DiskError,
@@ -36,11 +36,11 @@ use crate::{
},
store_api::{FileInfo, RawFileInfo},
};
use crate::{disk::MetaCacheEntry, metacache::writer::MetacacheWriter};
use crate::{
disk::io::{new_http_reader, HttpFileWriter},
io::{FileReader, FileWriter, HttpFileReader, HttpFileWriter},
utils::proto_err_to_err,
};
use crate::{disk::MetaCacheEntry, metacache::writer::MetacacheWriter};
use protos::proto_gen::node_service::RenamePartRequst;
#[derive(Debug)]
@@ -135,7 +135,7 @@ impl DiskAPI for RemoteDisk {
}
async fn read_all(&self, volume: &str, path: &str) -> Result<Vec<u8>> {
info!("read_all");
info!("read_all {}/{}", volume, path);
let mut client = node_service_time_out_client(&self.addr)
.await
.map_err(|err| Error::from_string(format!("can not get client, err: {}", err)))?;
@@ -147,8 +147,6 @@ impl DiskAPI for RemoteDisk {
let response = client.read_all(request).await?.into_inner();
info!("read_all success");
if !response.success {
return Err(Error::new(DiskError::FileNotFound));
}
@@ -182,7 +180,7 @@ impl DiskAPI for RemoteDisk {
}
async fn delete(&self, volume: &str, path: &str, opt: DeleteOptions) -> Result<()> {
info!("delete");
info!("delete {}/{}/{}", self.endpoint.to_string(), volume, path);
let options = serde_json::to_string(&opt)?;
let mut client = node_service_time_out_client(&self.addr)
.await
@@ -264,7 +262,7 @@ impl DiskAPI for RemoteDisk {
}
async fn rename_part(&self, src_volume: &str, src_path: &str, dst_volume: &str, dst_path: &str, meta: Vec<u8>) -> Result<()> {
info!("rename_part");
info!("rename_part {}/{}", src_volume, src_path);
let mut client = node_service_time_out_client(&self.addr)
.await
.map_err(|err| Error::from_string(format!("can not get client, err: {}", err)))?;
@@ -318,7 +316,7 @@ impl DiskAPI for RemoteDisk {
#[tracing::instrument(level = "debug", skip(self))]
async fn create_file(&self, _origvolume: &str, volume: &str, path: &str, file_size: usize) -> Result<FileWriter> {
info!("create_file");
info!("create_file {}/{}/{}", self.endpoint.to_string(), volume, path);
Ok(Box::new(HttpFileWriter::new(
self.endpoint.grid_host().as_str(),
self.endpoint.to_string().as_str(),
@@ -331,7 +329,7 @@ impl DiskAPI for RemoteDisk {
#[tracing::instrument(level = "debug", skip(self))]
async fn append_file(&self, volume: &str, path: &str) -> Result<FileWriter> {
info!("append_file");
info!("append_file {}/{}", volume, path);
Ok(Box::new(HttpFileWriter::new(
self.endpoint.grid_host().as_str(),
self.endpoint.to_string().as_str(),
@@ -344,25 +342,31 @@ impl DiskAPI for RemoteDisk {
#[tracing::instrument(level = "debug", skip(self))]
async fn read_file(&self, volume: &str, path: &str) -> Result<FileReader> {
info!("read_file");
Ok(new_http_reader(self.endpoint.grid_host().as_str(), self.endpoint.to_string().as_str(), volume, path, 0, 0).await?)
info!("read_file {}/{}", volume, path);
Ok(Box::new(
HttpFileReader::new(self.endpoint.grid_host().as_str(), self.endpoint.to_string().as_str(), volume, path, 0, 0)
.await?,
))
}
#[tracing::instrument(level = "debug", skip(self))]
async fn read_file_stream(&self, volume: &str, path: &str, offset: usize, length: usize) -> Result<FileReader> {
Ok(new_http_reader(
self.endpoint.grid_host().as_str(),
self.endpoint.to_string().as_str(),
volume,
path,
offset,
length,
)
.await?)
info!("read_file_stream {}/{}/{}", self.endpoint.to_string(), volume, path);
Ok(Box::new(
HttpFileReader::new(
self.endpoint.grid_host().as_str(),
self.endpoint.to_string().as_str(),
volume,
path,
offset,
length,
)
.await?,
))
}
async fn list_dir(&self, _origvolume: &str, volume: &str, _dir_path: &str, _count: i32) -> Result<Vec<String>> {
info!("list_dir");
info!("list_dir {}/{}", volume, _dir_path);
let mut client = node_service_time_out_client(&self.addr)
.await
.map_err(|err| Error::from_string(format!("can not get client, err: {}", err)))?;
@@ -386,7 +390,8 @@ impl DiskAPI for RemoteDisk {
// FIXME: TODO: use writer
async fn walk_dir<W: AsyncWrite + Unpin + Send>(&self, opts: WalkDirOptions, wr: &mut W) -> Result<()> {
info!("walk_dir");
let now = std::time::SystemTime::now();
info!("walk_dir {}/{}/{:?}", self.endpoint.to_string(), opts.bucket, opts.filter_prefix);
let mut wr = wr;
let mut out = MetacacheWriter::new(&mut wr);
let mut buf = Vec::new();
@@ -415,6 +420,12 @@ impl DiskAPI for RemoteDisk {
}
}
info!(
"walk_dir {}/{:?} done {:?}",
opts.bucket,
opts.filter_prefix,
now.elapsed().unwrap_or_default()
);
Ok(())
}
@@ -426,7 +437,7 @@ impl DiskAPI for RemoteDisk {
dst_volume: &str,
dst_path: &str,
) -> Result<RenameDataResp> {
info!("rename_data");
info!("rename_data {}/{}/{}/{}", self.addr, self.endpoint.to_string(), dst_volume, dst_path);
let file_info = serde_json::to_string(&fi)?;
let mut client = node_service_time_out_client(&self.addr)
.await
@@ -608,7 +619,7 @@ impl DiskAPI for RemoteDisk {
}
async fn write_metadata(&self, _org_volume: &str, volume: &str, path: &str, fi: FileInfo) -> Result<()> {
info!("write_metadata");
info!("write_metadata {}/{}", volume, path);
let file_info = serde_json::to_string(&fi)?;
let mut client = node_service_time_out_client(&self.addr)
.await
@@ -670,7 +681,7 @@ impl DiskAPI for RemoteDisk {
}
async fn read_xl(&self, volume: &str, path: &str, read_data: bool) -> Result<RawFileInfo> {
info!("read_xl");
info!("read_xl {}/{}/{}", self.endpoint.to_string(), volume, path);
let mut client = node_service_time_out_client(&self.addr)
.await
.map_err(|err| Error::from_string(format!("can not get client, err: {}", err)))?;
@@ -779,7 +790,7 @@ impl DiskAPI for RemoteDisk {
}
async fn read_multiple(&self, req: ReadMultipleReq) -> Result<Vec<ReadMultipleResp>> {
info!("read_multiple");
info!("read_multiple {}/{}/{}", self.endpoint.to_string(), req.bucket, req.prefix);
let read_multiple_req = serde_json::to_string(&req)?;
let mut client = node_service_time_out_client(&self.addr)
.await
@@ -809,7 +820,7 @@ impl DiskAPI for RemoteDisk {
}
async fn delete_volume(&self, volume: &str) -> Result<()> {
info!("delete_volume");
info!("delete_volume {}/{}", self.endpoint.to_string(), volume);
let mut client = node_service_time_out_client(&self.addr)
.await
.map_err(|err| Error::from_string(format!("can not get client, err: {}", err)))?;
@@ -832,7 +843,6 @@ impl DiskAPI for RemoteDisk {
}
async fn disk_info(&self, opts: &DiskInfoOptions) -> Result<DiskInfo> {
info!("delete_volume");
let opts = serde_json::to_string(&opts)?;
let mut client = node_service_time_out_client(&self.addr)
.await

View File

@@ -1,13 +1,11 @@
use crate::bitrot::{BitrotReader, BitrotWriter};
use crate::error::{Error, Result, StdError};
use crate::error::{Error, Result};
use crate::quorum::{object_op_ignored_errs, reduce_write_quorum_errs};
use bytes::Bytes;
use futures::future::join_all;
use futures::{pin_mut, Stream, StreamExt};
use reed_solomon_erasure::galois_8::ReedSolomon;
use std::any::Any;
use std::io::ErrorKind;
use tokio::io::DuplexStream;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tracing::warn;
use tracing::{error, info};
@@ -49,22 +47,22 @@ impl Erasure {
}
}
#[tracing::instrument(level = "debug", skip(self, body, writers))]
#[tracing::instrument(level = "debug", skip(self, reader, writers))]
pub async fn encode<S>(
&mut self,
body: S,
reader: &mut S,
writers: &mut [Option<BitrotWriter>],
// block_size: usize,
total_size: usize,
write_quorum: usize,
) -> Result<usize>
where
S: Stream<Item = Result<Bytes, StdError>> + Send + Sync,
S: AsyncRead + Unpin + Send + 'static,
{
pin_mut!(body);
let mut reader = tokio_util::io::StreamReader::new(
body.map(|f| f.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e.to_string()))),
);
// pin_mut!(body);
// let mut reader = tokio_util::io::StreamReader::new(
// body.map(|f| f.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e.to_string()))),
// );
let mut total: usize = 0;
@@ -101,6 +99,7 @@ impl Erasure {
let blocks = self.encode_data(&self.buf)?;
let mut errs = Vec::new();
// TODO: 并发写入
for (i, w_op) in writers.iter_mut().enumerate() {
if let Some(w) = w_op {
match w.write(blocks[i].as_ref()).await {
@@ -204,14 +203,17 @@ impl Erasure {
// Ok(total)
}
pub async fn decode(
pub async fn decode<W>(
&self,
writer: &mut DuplexStream,
writer: &mut W,
readers: Vec<Option<BitrotReader>>,
offset: usize,
length: usize,
total_length: usize,
) -> (usize, Option<Error>) {
) -> (usize, Option<Error>)
where
W: AsyncWriteExt + Send + Unpin + 'static,
{
if length == 0 {
return (0, None);
}
@@ -281,14 +283,17 @@ impl Erasure {
(bytes_writed, None)
}
async fn write_data_blocks(
async fn write_data_blocks<W>(
&self,
writer: &mut DuplexStream,
writer: &mut W,
bufs: Vec<Option<Vec<u8>>>,
data_blocks: usize,
offset: usize,
length: usize,
) -> Result<usize> {
) -> Result<usize>
where
W: AsyncWrite + Send + Unpin + 'static,
{
if bufs.len() < data_blocks {
return Err(Error::msg("read bufs not match data_blocks"));
}

View File

@@ -217,7 +217,7 @@ async fn run_data_scanner() {
globalScannerMetrics.write().await.set_cycle(Some(cycle_info.clone())).await;
let mut wr = Vec::new();
cycle_info.serialize(&mut Serializer::new(&mut wr)).unwrap();
let _ = save_config(store.clone(), &DATA_USAGE_BLOOM_NAME_PATH, &wr).await;
let _ = save_config(store.clone(), &DATA_USAGE_BLOOM_NAME_PATH, wr).await;
}
Err(err) => {
info!("ns_scanner failed: {:?}", err);
@@ -268,7 +268,7 @@ async fn save_background_heal_info(store: Arc<ECStore>, info: &BackgroundHealInf
Ok(info) => info,
Err(_) => return,
};
let _ = save_config(store, &BACKGROUND_HEAL_INFO_PATH, &b).await;
let _ = save_config(store, &BACKGROUND_HEAL_INFO_PATH, b).await;
}
async fn get_cycle_scan_mode(current_cycle: u64, bitrot_start_cycle: u64, bitrot_start_time: SystemTime) -> HealScanMode {

View File

@@ -124,10 +124,11 @@ pub async fn store_data_usage_in_backend(mut rx: Receiver<DataUsageInfo>) {
Some(data_usage_info) => {
if let Ok(data) = serde_json::to_vec(&data_usage_info) {
if attempts > 10 {
let _ = save_config(store.clone(), &format!("{}{}", *DATA_USAGE_OBJ_NAME_PATH, ".bkp"), &data).await;
let _ =
save_config(store.clone(), &format!("{}{}", *DATA_USAGE_OBJ_NAME_PATH, ".bkp"), data.clone()).await;
attempts += 1;
}
let _ = save_config(store.clone(), &DATA_USAGE_OBJ_NAME_PATH, &data).await;
let _ = save_config(store.clone(), &DATA_USAGE_OBJ_NAME_PATH, data).await;
attempts += 1;
} else {
continue;

View File

@@ -458,9 +458,9 @@ impl DataUsageCache {
let name_clone = name.clone();
tokio::spawn(async move {
let _ = save_config(store_clone, &format!("{}{}", &name_clone, ".bkp"), &buf_clone).await;
let _ = save_config(store_clone, &format!("{}{}", &name_clone, ".bkp"), buf_clone).await;
});
save_config(store, &name, &buf).await
save_config(store, &name, buf).await
}
pub fn replace(&mut self, path: &str, parent: &str, e: DataUsageEntry) {

View File

@@ -1,226 +1,153 @@
use std::io::Read;
use std::io::Write;
use futures::TryStreamExt;
use md5::Digest;
use md5::Md5;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::fs::File;
use tokio::io::{self, AsyncRead, AsyncWrite, ReadBuf};
use std::task::Context;
use std::task::Poll;
use tokio::io::AsyncRead;
use tokio::io::AsyncWrite;
use tokio::io::ReadBuf;
use tokio::sync::oneshot;
use tokio_util::io::ReaderStream;
use tokio_util::io::StreamReader;
use tracing::error;
use tracing::warn;
pub enum Reader {
File(File),
Buffer(VecAsyncReader),
pub type FileReader = Box<dyn AsyncRead + Send + Sync + Unpin>;
pub type FileWriter = Box<dyn AsyncWrite + Send + Sync + Unpin>;
pub const READ_BUFFER_SIZE: usize = 1024 * 1024;
#[derive(Debug)]
pub struct HttpFileWriter {
wd: tokio::io::DuplexStream,
err_rx: oneshot::Receiver<std::io::Error>,
}
impl AsyncRead for Reader {
fn poll_read(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>) -> Poll<io::Result<()>> {
match self.get_mut() {
Reader::File(file) => Pin::new(file).poll_read(cx, buf),
Reader::Buffer(buffer) => Pin::new(buffer).poll_read(cx, buf),
}
impl HttpFileWriter {
pub fn new(url: &str, disk: &str, volume: &str, path: &str, size: usize, append: bool) -> std::io::Result<Self> {
let (rd, wd) = tokio::io::duplex(READ_BUFFER_SIZE);
let (err_tx, err_rx) = oneshot::channel::<std::io::Error>();
let body = reqwest::Body::wrap_stream(ReaderStream::with_capacity(rd, READ_BUFFER_SIZE));
let url = url.to_owned();
let disk = disk.to_owned();
let volume = volume.to_owned();
let path = path.to_owned();
tokio::spawn(async move {
let client = reqwest::Client::new();
if let Err(err) = client
.put(format!(
"{}/rustfs/rpc/put_file_stream?disk={}&volume={}&path={}&append={}&size={}",
url,
urlencoding::encode(&disk),
urlencoding::encode(&volume),
urlencoding::encode(&path),
append,
size
))
.body(body)
.send()
.await
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))
{
error!("HttpFileWriter put file err: {:?}", err);
if let Err(er) = err_tx.send(err) {
error!("HttpFileWriter tx.send err: {:?}", er);
}
}
});
Ok(Self { wd, err_rx })
}
}
#[derive(Default)]
pub enum Writer {
#[default]
NotUse,
File(File),
Buffer(VecAsyncWriter),
}
impl AsyncWrite for Writer {
fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>> {
match self.get_mut() {
Writer::File(file) => Pin::new(file).poll_write(cx, buf),
Writer::Buffer(buff) => Pin::new(buff).poll_write(cx, buf),
Writer::NotUse => Poll::Ready(Ok(0)),
impl AsyncWrite for HttpFileWriter {
#[tracing::instrument(level = "debug", skip(self, buf))]
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &[u8],
) -> Poll<std::result::Result<usize, std::io::Error>> {
if let Ok(err) = self.as_mut().err_rx.try_recv() {
return Poll::Ready(Err(err));
}
Pin::new(&mut self.wd).poll_write(cx, buf)
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
match self.get_mut() {
Writer::File(file) => Pin::new(file).poll_flush(cx),
Writer::Buffer(buff) => Pin::new(buff).poll_flush(cx),
Writer::NotUse => Poll::Ready(Ok(())),
}
#[tracing::instrument(level = "debug", skip(self))]
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<std::result::Result<(), std::io::Error>> {
Pin::new(&mut self.wd).poll_flush(cx)
}
fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
match self.get_mut() {
Writer::File(file) => Pin::new(file).poll_shutdown(cx),
Writer::Buffer(buff) => Pin::new(buff).poll_shutdown(cx),
Writer::NotUse => Poll::Ready(Ok(())),
}
#[tracing::instrument(level = "debug", skip(self))]
fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<std::result::Result<(), std::io::Error>> {
Pin::new(&mut self.wd).poll_shutdown(cx)
}
}
pub struct AsyncToSync<R> {
pub struct HttpFileReader {
inner: FileReader,
}
impl HttpFileReader {
pub async fn new(url: &str, disk: &str, volume: &str, path: &str, offset: usize, length: usize) -> std::io::Result<Self> {
let resp = reqwest::Client::new()
.get(format!(
"{}/rustfs/rpc/read_file_stream?disk={}&volume={}&path={}&offset={}&length={}",
url,
urlencoding::encode(disk),
urlencoding::encode(volume),
urlencoding::encode(path),
offset,
length
))
.send()
.await
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
let inner = Box::new(StreamReader::new(resp.bytes_stream().map_err(std::io::Error::other)));
Ok(Self { inner })
}
}
impl AsyncRead for HttpFileReader {
fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>) -> Poll<tokio::io::Result<()>> {
Pin::new(&mut self.inner).poll_read(cx, buf)
}
}
pub struct EtagReader<R> {
inner: R,
md5: Md5,
}
impl<R: AsyncRead + Unpin> AsyncToSync<R> {
pub fn new_reader(inner: R) -> Self {
Self { inner }
impl<R> EtagReader<R> {
pub fn new(inner: R) -> Self {
EtagReader { inner, md5: Md5::new() }
}
fn read_async(&mut self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<std::io::Result<usize>> {
let mut read_buf = ReadBuf::new(buf);
// Poll the underlying AsyncRead to fill the ReadBuf
match Pin::new(&mut self.inner).poll_read(cx, &mut read_buf) {
Poll::Ready(Ok(())) => Poll::Ready(Ok(read_buf.filled().len())),
Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
Poll::Pending => Poll::Pending,
}
pub fn etag(self) -> String {
hex_simd::encode_to_string(self.md5.finalize(), hex_simd::AsciiCase::Lower)
}
}
impl<R: AsyncWrite + Unpin> AsyncToSync<R> {
pub fn new_writer(inner: R) -> Self {
Self { inner }
}
// This function will perform a write using AsyncWrite
fn write_async(&mut self, cx: &mut Context<'_>, buf: &[u8]) -> Poll<std::io::Result<usize>> {
let result = Pin::new(&mut self.inner).poll_write(cx, buf);
match result {
Poll::Ready(Ok(n)) => Poll::Ready(Ok(n)),
Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
Poll::Pending => Poll::Pending,
}
}
impl<R: AsyncRead + Unpin> AsyncRead for EtagReader<R> {
fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>) -> Poll<tokio::io::Result<()>> {
match Pin::new(&mut self.inner).poll_read(cx, buf) {
Poll::Ready(Ok(())) => {
let bytes = buf.filled();
self.md5.update(bytes);
// This function will perform a flush using AsyncWrite
fn flush_async(&mut self, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
Pin::new(&mut self.inner).poll_flush(cx)
}
}
impl<R: AsyncRead + Unpin> Read for AsyncToSync<R> {
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
let mut cx = std::task::Context::from_waker(futures::task::noop_waker_ref());
loop {
match self.read_async(&mut cx, buf) {
Poll::Ready(Ok(n)) => return Ok(n),
Poll::Ready(Err(e)) => return Err(e),
Poll::Pending => {
// If Pending, we need to wait for the readiness.
// Here, we can use an arbitrary mechanism to yield control,
// this might be blocking until some readiness occurs can be complex.
// A full blocking implementation would require an async runtime to block on.
std::thread::sleep(std::time::Duration::from_millis(1)); // Replace with proper waiting if needed
}
Poll::Ready(Ok(()))
}
other => other,
}
}
}
impl<W: AsyncWrite + Unpin> Write for AsyncToSync<W> {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
let mut cx = std::task::Context::from_waker(futures::task::noop_waker_ref());
loop {
match self.write_async(&mut cx, buf) {
Poll::Ready(Ok(n)) => return Ok(n),
Poll::Ready(Err(e)) => return Err(e),
Poll::Pending => {
// Here we are blocking and waiting for the async operation to complete.
std::thread::sleep(std::time::Duration::from_millis(1)); // Not efficient, see notes.
}
}
}
}
fn flush(&mut self) -> std::io::Result<()> {
let mut cx = std::task::Context::from_waker(futures::task::noop_waker_ref());
loop {
match self.flush_async(&mut cx) {
Poll::Ready(Ok(())) => return Ok(()),
Poll::Ready(Err(e)) => return Err(e),
Poll::Pending => {
// Again, blocking to wait for flush.
std::thread::sleep(std::time::Duration::from_millis(1)); // Not efficient, see notes.
}
}
}
}
}
pub struct VecAsyncWriter {
buffer: Vec<u8>,
}
impl VecAsyncWriter {
/// Create a new VecAsyncWriter with an empty Vec<u8>.
pub fn new(buffer: Vec<u8>) -> Self {
VecAsyncWriter { buffer }
}
/// Retrieve the underlying buffer.
pub fn get_buffer(&self) -> &[u8] {
&self.buffer
}
}
// Implementing AsyncWrite trait for VecAsyncWriter
impl AsyncWrite for VecAsyncWriter {
fn poll_write(self: Pin<&mut Self>, _cx: &mut Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>> {
let len = buf.len();
// Assume synchronous writing for simplicity
self.get_mut().buffer.extend_from_slice(buf);
// Returning the length of written data
Poll::Ready(Ok(len))
}
fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
// In this case, flushing is a no-op for a Vec<u8>
Poll::Ready(Ok(()))
}
fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
// Similar to flush, shutdown has no effect here
Poll::Ready(Ok(()))
}
}
pub struct VecAsyncReader {
buffer: Vec<u8>,
position: usize,
}
impl VecAsyncReader {
/// Create a new VecAsyncReader with the given Vec<u8>.
pub fn new(buffer: Vec<u8>) -> Self {
VecAsyncReader { buffer, position: 0 }
}
/// Reset the reader position.
pub fn reset(&mut self) {
self.position = 0;
}
}
// Implementing AsyncRead trait for VecAsyncReader
impl AsyncRead for VecAsyncReader {
fn poll_read(self: Pin<&mut Self>, _cx: &mut Context<'_>, buf: &mut ReadBuf) -> Poll<io::Result<()>> {
let this = self.get_mut();
// Check how many bytes are available to read
let len = this.buffer.len();
let bytes_available = len - this.position;
if bytes_available == 0 {
// If there's no more data to read, return ready with an Eof
return Poll::Ready(Ok(()));
}
// Calculate how much we can read into the provided buffer
let to_read = std::cmp::min(bytes_available, buf.remaining());
// Write the data to the buf
buf.put_slice(&this.buffer[this.position..this.position + to_read]);
// Update the position
this.position += to_read;
// Indicate how many bytes were read
Poll::Ready(Ok(()))
}
}

View File

@@ -350,10 +350,9 @@ impl<R: AsyncRead + Unpin> MetacacheReader<R> {
#[tokio::test]
async fn test_writer() {
use crate::io::VecAsyncReader;
use crate::io::VecAsyncWriter;
use std::io::Cursor;
let mut f = VecAsyncWriter::new(Vec::new());
let mut f = Cursor::new(Vec::new());
let mut w = MetacacheWriter::new(&mut f);
@@ -373,16 +372,16 @@ async fn test_writer() {
w.close().await.unwrap();
let data = f.get_buffer().to_vec();
let data = f.into_inner();
let nf = VecAsyncReader::new(data);
let nf = Cursor::new(data);
let mut r = MetacacheReader::new(nf);
let nobjs = r.read_all().await.unwrap();
for info in nobjs.iter() {
println!("new {:?}", &info);
}
// for info in nobjs.iter() {
// println!("new {:?}", &info);
// }
assert_eq!(objs, nobjs)
}

View File

@@ -51,7 +51,7 @@ impl PeerRestClient {
let eps = eps.clone();
let hosts = eps.hosts_sorted();
let mut remote = vec![None; hosts.len()];
let mut remote = Vec::with_capacity(hosts.len());
let mut all = vec![None; hosts.len()];
for (i, hs_host) in hosts.iter().enumerate() {
if let Some(host) = hs_host {

View File

@@ -116,7 +116,7 @@ impl PoolMeta {
data.write_all(&buf)?;
for pool in pools {
save_config(pool, POOL_META_NAME, &data).await?;
save_config(pool, POOL_META_NAME, data.clone()).await?;
}
Ok(())

View File

@@ -1,6 +1,7 @@
use std::{
collections::{HashMap, HashSet},
io::{Cursor, Write},
mem::replace,
path::Path,
sync::Arc,
time::Duration,
@@ -34,6 +35,7 @@ use crate::{
},
heal_ops::BG_HEALING_UUID,
},
io::{EtagReader, READ_BUFFER_SIZE},
quorum::{object_op_ignored_errs, reduce_read_quorum_errs, reduce_write_quorum_errs, QuorumError},
store_api::{
BucketInfo, BucketOptions, CompletePart, DeleteBucketOptions, DeletedObject, FileInfo, GetObjectReader, HTTPRangeSpec,
@@ -66,6 +68,7 @@ use futures::future::join_all;
use glob::Pattern;
use http::HeaderMap;
use lock::{
// drwmutex::Options,
drwmutex::Options,
namespace_lock::{new_nslock, NsLockMap},
LockApi,
@@ -76,14 +79,12 @@ use rand::{
thread_rng,
{seq::SliceRandom, Rng},
};
use reader::reader::EtagReader;
use s3s::{dto::StreamingBlob, Body};
use sha2::{Digest, Sha256};
use std::hash::Hash;
use std::time::SystemTime;
use time::OffsetDateTime;
use tokio::{
io::DuplexStream,
io::{empty, AsyncWrite},
sync::{broadcast, RwLock},
};
use tokio::{
@@ -1785,19 +1786,22 @@ impl SetDisks {
skip( writer,disks,fi,files),
fields(start_time=?time::OffsetDateTime::now_utc())
)]
async fn get_object_with_fileinfo(
async fn get_object_with_fileinfo<W>(
// &self,
bucket: &str,
object: &str,
offset: usize,
length: usize,
writer: &mut DuplexStream,
writer: &mut W,
fi: FileInfo,
files: Vec<FileInfo>,
disks: &[Option<DiskStore>],
set_index: usize,
pool_index: usize,
) -> Result<()> {
) -> Result<()>
where
W: AsyncWrite + Send + Sync + Unpin + 'static,
{
let (disks, files) = Self::shuffle_disks_and_parts_metadata_by_index(disks, &files, &fi);
let total_size = fi.size;
@@ -1854,17 +1858,6 @@ impl SetDisks {
// debug!("read part_path {}", &part_path);
if let Some(disk) = disk_op {
// let filereader = {
// if let Some(ref data) = files[idx].data {
// FileReader::Buffer(BufferReader::new(data.clone()))
// } else {
// let disk = disk.clone();
// let part_path =
// format!("{}/{}/part.{}", object, files[idx].data_dir.unwrap_or(Uuid::nil()), part_number);
// disk.read_file(bucket, &part_path).await?
// }
// };
let checksum_info = files[idx].erasure.get_checksum_info(part_number);
let reader = new_bitrot_filereader(
disk.clone(),
@@ -2223,10 +2216,10 @@ impl SetDisks {
let mut outdate_disks = vec![None; disk_len];
let mut disks_to_heal_count = 0;
info!(
"errs: {:?}, data_errs_by_disk: {:?}, lastest_meta: {:?}",
errs, data_errs_by_disk, lastest_meta
);
// info!(
// "errs: {:?}, data_errs_by_disk: {:?}, lastest_meta: {:?}",
// errs, data_errs_by_disk, lastest_meta
// );
for index in 0..available_disks.len() {
let (yes, reason) = should_heal_object_on_disk(
&errs[index],
@@ -2415,7 +2408,7 @@ impl SetDisks {
if let (Some(disk), Some(metadata)) = (disk, &copy_parts_metadata[index]) {
// let filereader = {
// if let Some(ref data) = metadata.data {
// FileReader::Buffer(BufferReader::new(data.clone()))
// Box::new(BufferReader::new(data.clone()))
// } else {
// let disk = disk.clone();
// let part_path = format!("{}/{}/part.{}", object, src_data_dir, part.number);
@@ -3614,7 +3607,7 @@ impl ObjectIO for SetDisks {
}
let reader = GetObjectReader {
stream: StreamingBlob::from(Body::from(Vec::new())),
stream: Box::new(Cursor::new(Vec::new())),
object_info,
};
return Ok(reader);
@@ -3622,10 +3615,9 @@ impl ObjectIO for SetDisks {
// TODO: remote
let (rd, mut wd) = tokio::io::duplex(fi.erasure.block_size);
let (rd, wd) = tokio::io::duplex(READ_BUFFER_SIZE);
let (reader, offset, length) =
GetObjectReader::new(StreamingBlob::wrap(tokio_util::io::ReaderStream::new(rd)), range, &object_info, opts, &h)?;
let (reader, offset, length) = GetObjectReader::new(Box::new(rd), range, &object_info, opts, &h)?;
// let disks = disks.clone();
let bucket = bucket.to_owned();
@@ -3634,12 +3626,23 @@ impl ObjectIO for SetDisks {
let pool_index = self.pool_index;
tokio::spawn(async move {
if let Err(e) = Self::get_object_with_fileinfo(
&bucket, &object, offset, length, &mut wd, fi, files, &disks, set_index, pool_index,
&bucket,
&object,
offset,
length,
&mut Box::new(wd),
fi,
files,
&disks,
set_index,
pool_index,
)
.await
{
error!("get_object_with_fileinfo err {:?}", e);
};
// error!("get_object_with_fileinfo end");
});
Ok(reader)
@@ -3769,8 +3772,10 @@ impl ObjectIO for SetDisks {
}
}
let stream = replace(&mut data.stream, Box::new(empty()));
let mut etag_stream = EtagReader::new(stream);
// TODO: etag from header
let mut etag_stream = EtagReader::new(&mut data.stream, None, None);
let w_size = erasure
.encode(&mut etag_stream, &mut writers, data.content_length, write_quorum)
@@ -4356,7 +4361,8 @@ impl StorageAPI for SetDisks {
let mut erasure = Erasure::new(fi.erasure.data_blocks, fi.erasure.parity_blocks, fi.erasure.block_size);
let mut etag_stream = EtagReader::new(&mut data.stream, None, None);
let stream = replace(&mut data.stream, Box::new(empty()));
let mut etag_stream = EtagReader::new(stream);
let w_size = erasure
.encode(&mut etag_stream, &mut writers, data.content_length, write_quorum)
@@ -4841,25 +4847,28 @@ impl StorageAPI for SetDisks {
}
}
// TODO: 优化 cleanupMultipartPath
for p in curr_fi.parts.iter() {
self.remove_part_meta(
bucket,
object,
upload_id,
curr_fi.data_dir.unwrap_or(Uuid::nil()).to_string().as_str(),
p.number,
)
.await?;
if !fi.parts.iter().any(|v| v.number == p.number) {
self.remove_object_part(
let _ = self
.remove_part_meta(
bucket,
object,
upload_id,
curr_fi.data_dir.unwrap_or(Uuid::nil()).to_string().as_str(),
p.number,
)
.await?;
.await;
if !fi.parts.iter().any(|v| v.number == p.number) {
let _ = self
.remove_object_part(
bucket,
object,
upload_id,
curr_fi.data_dir.unwrap_or(Uuid::nil()).to_string().as_str(),
p.number,
)
.await;
}
}
@@ -5235,7 +5244,7 @@ async fn disks_with_all_parts(
}
}
}
info!("meta_errs: {:?}, errs: {:?}", meta_errs, errs);
// info!("meta_errs: {:?}, errs: {:?}", meta_errs, errs);
meta_errs.iter().enumerate().for_each(|(index, err)| {
if err.is_some() {
let part_err = conv_part_err_to_int(err);
@@ -5245,7 +5254,7 @@ async fn disks_with_all_parts(
}
});
info!("data_errs_by_part: {:?}, data_errs_by_disk: {:?}", data_errs_by_part, data_errs_by_disk);
// info!("data_errs_by_part: {:?}, data_errs_by_disk: {:?}", data_errs_by_part, data_errs_by_disk);
for (index, disk) in online_disks.iter().enumerate() {
if meta_errs[index].is_some() {
continue;
@@ -5332,7 +5341,7 @@ async fn disks_with_all_parts(
}
}
}
info!("data_errs_by_part: {:?}, data_errs_by_disk: {:?}", data_errs_by_part, data_errs_by_disk);
// info!("data_errs_by_part: {:?}, data_errs_by_disk: {:?}", data_errs_by_part, data_errs_by_disk);
for (part, disks) in data_errs_by_part.iter() {
for (idx, disk) in disks.iter().enumerate() {
if let Some(vec) = data_errs_by_disk.get_mut(&idx) {
@@ -5340,7 +5349,7 @@ async fn disks_with_all_parts(
}
}
}
info!("data_errs_by_part: {:?}, data_errs_by_disk: {:?}", data_errs_by_part, data_errs_by_disk);
// info!("data_errs_by_part: {:?}, data_errs_by_disk: {:?}", data_errs_by_part, data_errs_by_disk);
for (i, disk) in online_disks.iter().enumerate() {
if meta_errs[i].is_none() && disk.is_some() && !has_part_err(&data_errs_by_disk[&i]) {
available_disks[i] = Some(disk.clone().unwrap());

View File

@@ -138,7 +138,7 @@ impl Sets {
if let Some(_disk_id) = has_disk_id {
set_drive.push(disk);
} else {
warn!("sets new set_drive {}-{} get_disk_id is none", i, j);
error!("sets new set_drive {}-{} get_disk_id is none", i, j);
set_drive.push(None);
}
}
@@ -207,7 +207,7 @@ impl Sets {
},
_ = cloned_token.cancelled() => {
warn!("ctx cancelled");
warn!("monitor_and_connect_endpoints ctx cancelled");
break;
}
}

View File

@@ -1,4 +1,5 @@
use crate::heal::heal_ops::HealSequence;
use crate::io::FileReader;
use crate::store_utils::clean_metadata;
use crate::{
disk::DiskStore,
@@ -7,15 +8,16 @@ use crate::{
utils::path::decode_dir_object,
xhttp,
};
use futures::StreamExt;
use http::{HeaderMap, HeaderValue};
use madmin::heal_commands::HealResultItem;
use rmp_serde::Serializer;
use s3s::{dto::StreamingBlob, Body};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::fmt::Debug;
use std::io::Cursor;
use std::sync::Arc;
use time::OffsetDateTime;
use tokio::io::AsyncReadExt;
use uuid::Uuid;
pub const ERASURE_ALGORITHM: &str = "rs-vandermonde";
@@ -416,35 +418,42 @@ pub struct DeleteBucketOptions {
pub srdelete_op: SRBucketDeleteOp,
}
#[derive(Debug)]
pub struct PutObjReader {
pub stream: StreamingBlob,
pub stream: FileReader,
pub content_length: usize,
}
impl Debug for PutObjReader {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("PutObjReader")
.field("content_length", &self.content_length)
.finish()
}
}
impl PutObjReader {
pub fn new(stream: StreamingBlob, content_length: usize) -> Self {
pub fn new(stream: FileReader, content_length: usize) -> Self {
PutObjReader { stream, content_length }
}
pub fn from_vec(data: Vec<u8>) -> Self {
let content_length = data.len();
PutObjReader {
stream: Body::from(data).into(),
stream: Box::new(Cursor::new(data)),
content_length,
}
}
}
pub struct GetObjectReader {
pub stream: StreamingBlob,
pub stream: FileReader,
pub object_info: ObjectInfo,
}
impl GetObjectReader {
#[tracing::instrument(level = "debug", skip(reader))]
pub fn new(
reader: StreamingBlob,
reader: FileReader,
rs: Option<HTTPRangeSpec>,
oi: &ObjectInfo,
opts: &ObjectOptions,
@@ -482,14 +491,15 @@ impl GetObjectReader {
}
pub async fn read_all(&mut self) -> Result<Vec<u8>> {
let mut data = Vec::new();
self.stream.read_to_end(&mut data).await?;
while let Some(x) = self.stream.next().await {
let buf = match x {
Ok(res) => res,
Err(e) => return Err(Error::msg(e.to_string())),
};
data.extend_from_slice(buf.as_ref());
}
// while let Some(x) = self.stream.next().await {
// let buf = match x {
// Ok(res) => res,
// Err(e) => return Err(Error::msg(e.to_string())),
// };
// data.extend_from_slice(buf.as_ref());
// }
Ok(data)
}

View File

@@ -370,7 +370,7 @@ impl Store for ObjectStore {
let mut data = serde_json::to_vec(&item)?;
data = Self::encrypt_data(&data)?;
save_config(self.object_api.clone(), path.as_ref(), &data).await
save_config(self.object_api.clone(), path.as_ref(), data).await
}
async fn delete_iam_config(&self, path: impl AsRef<str> + Send) -> Result<()> {
delete_config(self.object_api.clone(), path.as_ref()).await

View File

@@ -1,24 +0,0 @@
[package]
name = "reader"
edition.workspace = true
license.workspace = true
repository.workspace = true
rust-version.workspace = true
version.workspace = true
[lints]
workspace = true
[dependencies]
tracing.workspace = true
s3s.workspace = true
thiserror.workspace = true
bytes.workspace = true
pin-project-lite.workspace = true
hex-simd = "0.8.0"
md-5.workspace = true
sha2 = { version = "0.11.0-pre.4" }
futures.workspace = true
[dev-dependencies]
tokio = { workspace = true, features = ["rt-multi-thread", "macros"] }

View File

@@ -1,12 +0,0 @@
#[derive(Debug, thiserror::Error, PartialEq, Eq)]
pub enum ReaderError {
#[error("stream input error {0}")]
StreamInput(String),
//
#[error("etag: expected ETag {0} does not match computed ETag {1}")]
VerifyError(String, String),
#[error("Bad checksum: Want {0} does not match calculated {1}")]
ChecksumMismatch(String, String),
#[error("Bad sha256: Expected {0} does not match calculated {1}")]
SHA256Mismatch(String, String),
}

View File

@@ -1,170 +0,0 @@
use md5::{Digest as Md5Digest, Md5};
use sha2::{
digest::{Reset, Update},
Digest, Sha256 as sha_sha256,
};
pub trait Hasher {
fn write(&mut self, bytes: &[u8]);
fn reset(&mut self);
fn sum(&mut self) -> String;
fn size(&self) -> usize;
fn block_size(&self) -> usize;
}
#[derive(Default)]
pub enum HashType {
#[default]
Undefined,
Uuid(Uuid),
Md5(MD5),
Sha256(Sha256),
}
impl Hasher for HashType {
fn write(&mut self, bytes: &[u8]) {
match self {
HashType::Md5(md5) => md5.write(bytes),
HashType::Sha256(sha256) => sha256.write(bytes),
HashType::Uuid(uuid) => uuid.write(bytes),
HashType::Undefined => (),
}
}
fn reset(&mut self) {
match self {
HashType::Md5(md5) => md5.reset(),
HashType::Sha256(sha256) => sha256.reset(),
HashType::Uuid(uuid) => uuid.reset(),
HashType::Undefined => (),
}
}
fn sum(&mut self) -> String {
match self {
HashType::Md5(md5) => md5.sum(),
HashType::Sha256(sha256) => sha256.sum(),
HashType::Uuid(uuid) => uuid.sum(),
HashType::Undefined => "".to_owned(),
}
}
fn size(&self) -> usize {
match self {
HashType::Md5(md5) => md5.size(),
HashType::Sha256(sha256) => sha256.size(),
HashType::Uuid(uuid) => uuid.size(),
HashType::Undefined => 0,
}
}
fn block_size(&self) -> usize {
match self {
HashType::Md5(md5) => md5.block_size(),
HashType::Sha256(sha256) => sha256.block_size(),
HashType::Uuid(uuid) => uuid.block_size(),
HashType::Undefined => 64,
}
}
}
/// Streaming SHA-256 hasher producing lowercase hex digests.
pub struct Sha256 {
    hasher: sha_sha256,
}

impl Sha256 {
    /// Create a hasher in its initial (empty) state.
    pub fn new() -> Self {
        Self::default()
    }
}

impl Default for Sha256 {
    fn default() -> Self {
        Self {
            hasher: sha_sha256::new(),
        }
    }
}
impl Hasher for Sha256 {
    fn write(&mut self, bytes: &[u8]) {
        Update::update(&mut self.hasher, bytes);
    }
    fn reset(&mut self) {
        Reset::reset(&mut self.hasher);
    }
    /// Finalize on a clone so the running state stays usable after `sum`.
    fn sum(&mut self) -> String {
        hex_simd::encode_to_string(self.hasher.clone().finalize(), hex_simd::AsciiCase::Lower)
    }
    /// SHA-256 digest length in bytes.
    fn size(&self) -> usize {
        32
    }
    /// SHA-256 internal block size in bytes.
    fn block_size(&self) -> usize {
        64
    }
}
/// Streaming MD5 hasher producing lowercase hex digests.
pub struct MD5 {
    hasher: Md5,
}

impl MD5 {
    /// Create a hasher in its initial (empty) state.
    pub fn new() -> Self {
        Self::default()
    }
}

impl Default for MD5 {
    fn default() -> Self {
        Self { hasher: Md5::new() }
    }
}
impl Hasher for MD5 {
fn write(&mut self, bytes: &[u8]) {
self.hasher.update(bytes);
}
fn reset(&mut self) {}
fn sum(&mut self) -> String {
hex_simd::encode_to_string(self.hasher.clone().finalize(), hex_simd::AsciiCase::Lower)
}
fn size(&self) -> usize {
32
}
fn block_size(&self) -> usize {
64
}
}
/// Pseudo-hasher that ignores its input and "sums" to a fixed,
/// precomputed string (used when the ETag is already known).
pub struct Uuid {
    id: String,
}

impl Uuid {
    /// Wrap the precomputed digest/ETag string `id`.
    pub fn new(id: String) -> Self {
        Self { id }
    }
}
impl Hasher for Uuid {
    /// Input is ignored: the "digest" is fixed up front.
    fn write(&mut self, _bytes: &[u8]) {}
    /// Nothing to reset; the stored string never changes.
    fn reset(&mut self) {}
    fn sum(&mut self) -> String {
        self.id.clone()
    }
    /// Length of the stored string, not of any real digest.
    fn size(&self) -> usize {
        self.id.len()
    }
    /// Conventional 64-byte block size to satisfy the trait.
    fn block_size(&self) -> usize {
        64
    }
}

View File

@@ -1,7 +0,0 @@
pub mod error;
pub mod hasher;
pub mod reader;

/// Encode arbitrary bytes as a lowercase hex string.
pub fn hex(data: impl AsRef<[u8]>) -> String {
    hex_simd::encode_to_string(data, hex_simd::AsciiCase::Lower)
}

View File

@@ -1,493 +0,0 @@
use bytes::Bytes;
use s3s::StdError;
use std::collections::VecDeque;
use std::pin::Pin;
use std::task::Poll;
use crate::{
error::ReaderError,
hasher::{HashType, Uuid},
};
// use futures::stream::Stream;
use super::hasher::{Hasher, Sha256, MD5};
use futures::Stream;
pin_project_lite::pin_project! {
    /// Stream adapter that computes an MD5-based ETag over every byte
    /// that passes through, optionally verifying it against an
    /// expected value once the inner stream ends.
    #[derive(Default)]
    pub struct EtagReader<S> {
        #[pin]
        inner: S,
        // Running digest: real MD5, or a fixed string when forced.
        md5: HashType,
        // Expected ETag to verify at end-of-stream, if any.
        checksum: Option<String>,
        // Total number of payload bytes observed so far.
        bytes_read: usize,
    }
}
impl<S> EtagReader<S> {
    /// Wrap `inner`. When `etag` is given it is verified at
    /// end-of-stream; `force_md5` substitutes a fixed string for the
    /// computed digest.
    pub fn new(inner: S, etag: Option<String>, force_md5: Option<String>) -> Self {
        let md5 = match force_md5 {
            Some(forced) => HashType::Uuid(Uuid::new(forced)),
            None => HashType::Md5(MD5::new()),
        };
        Self {
            inner,
            md5,
            checksum: etag,
            bytes_read: 0,
        }
    }

    /// Finalize and return the ETag computed so far.
    pub fn etag(&mut self) -> String {
        self.md5.sum()
    }
}
impl<S> Stream for EtagReader<S>
where
    S: Stream<Item = Result<Bytes, StdError>>,
{
    type Item = Result<Bytes, StdError>;

    /// Forward the inner stream, folding every chunk into the digest.
    /// Inner errors are rewrapped as `ReaderError::StreamInput`; at
    /// end-of-stream the computed ETag is checked against the expected
    /// one (when present) and a mismatch yields
    /// `ReaderError::VerifyError`.
    fn poll_next(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.project();
        let poll = this.inner.poll_next(cx);
        if let Poll::Ready(ref res) = poll {
            match res {
                Some(Ok(bytes)) => {
                    *this.bytes_read += bytes.len();
                    this.md5.write(bytes);
                }
                Some(Err(err)) => {
                    return Poll::Ready(Some(Err(Box::new(ReaderError::StreamInput(err.to_string())))));
                }
                None => {
                    // End of stream: verify the accumulated ETag.
                    if let Some(etag) = this.checksum {
                        let got = this.md5.sum();
                        if got.as_str() != etag.as_str() {
                            return Poll::Ready(Some(Err(Box::new(ReaderError::VerifyError(etag.to_owned(), got)))));
                        }
                    }
                }
            }
        }
        poll
    }
}
pin_project_lite::pin_project! {
    /// Stream adapter that computes MD5 and/or SHA-256 digests over
    /// every byte that passes through, verifying them against expected
    /// hex strings once the inner stream ends.
    #[derive(Default)]
    pub struct HashReader<S> {
        #[pin]
        inner: S,
        // Digests are only allocated when verification was requested.
        sha256: Option<Sha256>,
        md5: Option<MD5>,
        // Expected digests as lowercase hex, if verification is wanted.
        md5_hex: Option<String>,
        sha256_hex: Option<String>,
        // Declared sizes carried along for callers; not enforced here.
        // NOTE(review): presumably `size` is the transfer length and
        // `actual_size` the decompressed/original length — confirm.
        size: usize,
        actual_size: usize,
        // Total number of payload bytes observed so far.
        bytes_read: usize,
    }
}
impl<S> HashReader<S> {
    /// Wrap `inner`, verifying the supplied hex digests when the
    /// stream ends. A digest is only computed when the corresponding
    /// expected hex string is present; `size` and `actual_size` are
    /// carried along unchanged.
    pub fn new(inner: S, size: usize, md5_hex: Option<String>, sha256_hex: Option<String>, actual_size: usize) -> Self {
        let md5 = md5_hex.as_ref().map(|_| MD5::new());
        let sha256 = sha256_hex.as_ref().map(|_| Sha256::new());
        Self {
            inner,
            size,
            actual_size,
            md5_hex,
            sha256_hex,
            bytes_read: 0,
            md5,
            sha256,
        }
    }
}
impl<S> Stream for HashReader<S>
where
    S: Stream<Item = Result<Bytes, StdError>>,
{
    type Item = Result<Bytes, StdError>;

    /// Forward the inner stream, folding every chunk into whichever
    /// digests were requested. Inner errors are rewrapped as
    /// `ReaderError::StreamInput`; at end-of-stream each computed
    /// digest is compared to its expected hex string, yielding
    /// `SHA256Mismatch` / `ChecksumMismatch` on failure.
    fn poll_next(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.project();
        let poll = this.inner.poll_next(cx);
        if let Poll::Ready(ref res) = poll {
            match res {
                Some(Ok(bytes)) => {
                    *this.bytes_read += bytes.len();
                    if let Some(sha) = this.sha256 {
                        sha.write(bytes);
                    }
                    if let Some(md5) = this.md5 {
                        md5.write(bytes);
                    }
                }
                Some(Err(err)) => {
                    return Poll::Ready(Some(Err(Box::new(ReaderError::StreamInput(err.to_string())))));
                }
                None => {
                    // End of stream: verify each requested digest.
                    if let Some(hash) = this.sha256 {
                        if let Some(hex) = this.sha256_hex {
                            let got = hash.sum();
                            let src = hex.as_str();
                            if src != got.as_str() {
                                // TODO: replace println! diagnostics with tracing.
                                println!("sha256 err src:{},got:{}", src, got);
                                return Poll::Ready(Some(Err(Box::new(ReaderError::SHA256Mismatch(src.to_string(), got)))));
                            }
                        }
                    }
                    if let Some(hash) = this.md5 {
                        if let Some(hex) = this.md5_hex {
                            let got = hash.sum();
                            let src = hex.as_str();
                            if src != got.as_str() {
                                // TODO: replace println! diagnostics with tracing.
                                println!("md5 err src:{},got:{}", src, got);
                                return Poll::Ready(Some(Err(Box::new(ReaderError::ChecksumMismatch(src.to_string(), got)))));
                            }
                        }
                    }
                }
            }
        }
        poll
    }
}
pin_project_lite::pin_project! {
    /// Stream adapter that re-slices an incoming byte stream into
    /// fixed-size chunks; only the final chunk may be shorter.
    pub struct ChunkedStream<S> {
        #[pin]
        inner: S,
        // Target chunk size in bytes. ("chuck" is a pre-existing typo,
        // kept because renaming would touch every use site.)
        chuck_size: usize,
        // Fully-formed chunks awaiting delivery.
        streams: VecDeque<Bytes>,
        // Partial tail smaller than one chunk, buffered until more
        // data arrives or the stream ends.
        remaining: Vec<u8>,
    }
}
impl<S> ChunkedStream<S> {
    /// Wrap `inner`, re-slicing its output into `chuck_size`-byte
    /// chunks (the final chunk may be shorter).
    pub fn new(inner: S, chuck_size: usize) -> Self {
        Self {
            inner,
            chuck_size,
            streams: VecDeque::new(),
            remaining: Vec::new(),
        }
    }
}
impl<S> Stream for ChunkedStream<S>
where
    S: Stream<Item = Result<Bytes, StdError>> + Send + Sync,
{
    type Item = Result<Bytes, StdError>;

    /// Drain locally queued chunks first; otherwise pull from the
    /// inner stream, splitting its bytes into `chuck_size`-sized
    /// pieces and buffering any sub-chunk tail in `remaining` until it
    /// can be completed or the stream ends.
    fn poll_next(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Option<Self::Item>> {
        // Inner stream's size hint, sampled before projecting `self`.
        let (items, op_items) = self.inner.size_hint();
        let this = self.project();
        if let Some(b) = this.streams.pop_front() {
            return Poll::Ready(Some(Ok(b)));
        }
        let poll = this.inner.poll_next(cx);
        match poll {
            Poll::Ready(res_op) => match res_op {
                Some(res) => match res {
                    Ok(bytes) => {
                        let chuck_size = *this.chuck_size;
                        let mut bytes = bytes;
                        // If a partial chunk is buffered, try to top it
                        // up to a full chunk with the new data.
                        if !this.remaining.is_empty() {
                            let need_size = chuck_size - this.remaining.len();
                            // Enough incoming data to complete the
                            // buffered chunk: take exactly what is needed.
                            if bytes.len() >= need_size {
                                let add_bytes = bytes.split_to(need_size);
                                this.remaining.extend_from_slice(&add_bytes);
                                this.streams.push_back(Bytes::from(this.remaining.clone()));
                                this.remaining.clear();
                            } else {
                                // Not enough: append everything and keep waiting.
                                let need_size = bytes.len();
                                let add_bytes = bytes.split_to(need_size);
                                this.remaining.extend_from_slice(&add_bytes);
                            }
                        }
                        // Slice off as many full chunks as possible.
                        loop {
                            if bytes.len() < chuck_size {
                                break;
                            }
                            let chuck = bytes.split_to(chuck_size);
                            this.streams.push_back(chuck);
                        }
                        // Stash the leftover tail for the next round.
                        if !bytes.is_empty() {
                            this.remaining.extend_from_slice(&bytes);
                        }
                        if let Some(b) = this.streams.pop_front() {
                            return Poll::Ready(Some(Ok(b)));
                        }
                        // NOTE(review): this Pending is based on the
                        // inner stream's pre-poll size hint, and the
                        // inner poll above returned Ready — so no waker
                        // was registered for this return path. Unless
                        // the inner stream wakes us anyway, the task may
                        // never be polled again; consider looping to
                        // poll the inner stream again instead — confirm.
                        if items > 0 || op_items.is_some() {
                            return Poll::Pending;
                        }
                        if !this.remaining.is_empty() {
                            let b = this.remaining.clone();
                            this.remaining.clear();
                            return Poll::Ready(Some(Ok(Bytes::from(b))));
                        }
                        Poll::Ready(None)
                    }
                    Err(err) => Poll::Ready(Some(Err(err))),
                },
                None => {
                    // Inner stream finished: flush queued chunks, then
                    // the partial tail, then signal end-of-stream.
                    if let Some(b) = this.streams.pop_front() {
                        return Poll::Ready(Some(Ok(b)));
                    }
                    if !this.remaining.is_empty() {
                        let b = this.remaining.clone();
                        this.remaining.clear();
                        return Poll::Ready(Some(Ok(Bytes::from(b))));
                    }
                    Poll::Ready(None)
                }
            },
            Poll::Pending => {
                Poll::Pending
            }
        }
        // (An earlier commented-out draft of this body was removed for
        // readability.)
    }

    /// Items buffered locally (queued chunks plus the partial tail);
    /// deliberately excludes anything still inside the inner stream.
    fn size_hint(&self) -> (usize, Option<usize>) {
        let mut items = self.streams.len();
        if !self.remaining.is_empty() {
            items += 1;
        }
        (items, Some(items))
    }
}
#[cfg(test)]
mod test {
    use super::*;
    use futures::StreamExt;

    /// Drives an `EtagReader` over two chunks with no expected value
    /// and prints the resulting ETag; exercises the digest path only.
    #[tokio::test]
    async fn test_etag_reader() {
        let data1 = vec![1u8; 60];
        let data2 = vec![0u8; 32];
        let chunk1 = Bytes::from(data1);
        let chunk2 = Bytes::from(data2);
        let chunk_results: Vec<Result<Bytes, StdError>> = vec![Ok(chunk1), Ok(chunk2)];
        let mut stream = futures::stream::iter(chunk_results);
        let mut hash_reader = EtagReader::new(&mut stream, None, None);
        // Drain the stream; any error ends the loop early.
        loop {
            match hash_reader.next().await {
                Some(res) => match res {
                    Ok(bytes) => {
                        println!("bytes: {}, {:?}", bytes.len(), bytes);
                    }
                    Err(err) => {
                        println!("err:{:?}", err);
                        break;
                    }
                },
                None => {
                    println!("next none");
                    break;
                }
            }
        }
        println!("etag:{}", hash_reader.etag());
        // Reference digest for this payload:
        // 9a7dfa2fcd7b69c89a30cfd3a9be11ab58cb6172628bd7e967fad1e187456d45
    }

    /// Verifies `HashReader` against precomputed MD5 and SHA-256
    /// digests for a known 92-byte payload; both must match for the
    /// stream to end without error.
    #[tokio::test]
    async fn test_hash_reader() {
        let data1 = vec![1u8; 60];
        let data2 = vec![0u8; 32];
        let size = data1.len() + data2.len();
        let chunk1 = Bytes::from(data1);
        let chunk2 = Bytes::from(data2);
        let chunk_results: Vec<Result<Bytes, StdError>> = vec![Ok(chunk1), Ok(chunk2)];
        let mut stream = futures::stream::iter(chunk_results);
        let mut hash_reader = HashReader::new(
            &mut stream,
            size,
            Some("d94c485610a7a00a574df55e45d3cc0c".to_string()),
            Some("9a7dfa2fcd7b69c89a30cfd3a9be11ab58cb6172628bd7e967fad1e187456d45".to_string()),
            0,
        );
        // Drain the stream; a digest mismatch surfaces as Err here.
        loop {
            match hash_reader.next().await {
                Some(res) => match res {
                    Ok(bytes) => {
                        println!("bytes: {}, {:?}", bytes.len(), bytes);
                    }
                    Err(err) => {
                        println!("err:{:?}", err);
                        break;
                    }
                },
                None => {
                    println!("next none");
                    break;
                }
            }
        }
    }

    /// Re-slices three uneven chunks (60+33+5 bytes) through a
    /// `ChunkedStream` of size 8 while an `EtagReader` underneath
    /// accumulates the ETag over the original byte order.
    #[tokio::test]
    async fn test_chunked_stream() {
        let data1 = vec![1u8; 60];
        let data2 = vec![0u8; 33];
        let data3 = vec![4u8; 5];
        let chunk1 = Bytes::from(data1);
        let chunk2 = Bytes::from(data2);
        let chunk3 = Bytes::from(data3);
        let chunk_results: Vec<Result<Bytes, StdError>> = vec![Ok(chunk1), Ok(chunk2), Ok(chunk3)];
        let mut stream = futures::stream::iter(chunk_results);
        let chunk_size = 8;
        let mut etag_reader = EtagReader::new(&mut stream, None, None);
        let mut chunked_stream = ChunkedStream::new(&mut etag_reader, chunk_size);
        // Drain the chunked stream; every emitted chunk except the
        // last should be exactly chunk_size bytes.
        loop {
            match chunked_stream.next().await {
                Some(res) => match res {
                    Ok(bytes) => {
                        println!("bytes: {}, {:?}", bytes.len(), bytes);
                    }
                    Err(err) => {
                        println!("err:{:?}", err);
                        break;
                    }
                },
                None => {
                    println!("next none");
                    break;
                }
            }
        }
        println!("etag:{}", etag_reader.etag());
    }
}

View File

@@ -1,5 +0,0 @@
# 流程
## 写入
http::Body -> HashReader -> ...(other reader) -> ChunkedStream -> BitrotWriter -> FileWriter

View File

@@ -4,6 +4,7 @@ use super::router::S3Router;
use crate::storage::ecfs::bytes_stream;
use common::error::Result;
use ecstore::disk::DiskAPI;
use ecstore::io::READ_BUFFER_SIZE;
use ecstore::store::find_local_disk;
use futures::TryStreamExt;
use http::StatusCode;
@@ -71,7 +72,10 @@ impl Operation for ReadFile {
Ok(S3Response::new((
StatusCode::OK,
Body::from(StreamingBlob::wrap(bytes_stream(ReaderStream::new(file), query.length))),
Body::from(StreamingBlob::wrap(bytes_stream(
ReaderStream::with_capacity(file, READ_BUFFER_SIZE),
query.length,
))),
)))
}
}

View File

@@ -152,9 +152,13 @@ async fn run(opt: config::Opt) -> Result<()> {
for (i, eps) in endpoint_pools.as_ref().iter().enumerate() {
info!(
"created endpoints {}, set_count:{}, drives_per_set: {}, cmd: {:?}, \n{:?}",
i, eps.set_count, eps.drives_per_set, eps.cmd_line, eps
"created endpoints {}, set_count:{}, drives_per_set: {}, cmd: {:?}",
i, eps.set_count, eps.drives_per_set, eps.cmd_line
);
for ep in eps.endpoints.as_ref().iter() {
info!(" - {}", ep);
}
}
set_global_addr(&opt.address).await;

View File

@@ -20,6 +20,7 @@ use ecstore::bucket::policy_sys::PolicySys;
use ecstore::bucket::tagging::decode_tags;
use ecstore::bucket::tagging::encode_tags;
use ecstore::bucket::versioning_sys::BucketVersioningSys;
use ecstore::io::READ_BUFFER_SIZE;
use ecstore::new_object_layer_fn;
use ecstore::store_api::BucketOptions;
use ecstore::store_api::CompletePart;
@@ -51,6 +52,8 @@ use s3s::S3;
use s3s::{S3Request, S3Response};
use std::fmt::Debug;
use std::str::FromStr;
use tokio_util::io::ReaderStream;
use tokio_util::io::StreamReader;
use tracing::debug;
use tracing::error;
use tracing::info;
@@ -464,8 +467,13 @@ impl S3 for FS {
};
let last_modified = info.mod_time.map(Timestamp::from);
let body = Some(StreamingBlob::wrap(bytes_stream(
ReaderStream::with_capacity(reader.stream, READ_BUFFER_SIZE),
info.size,
)));
let output = GetObjectOutput {
body: Some(reader.stream),
body,
content_length: Some(info.size as i64),
last_modified,
content_type,
@@ -799,6 +807,10 @@ impl S3 for FS {
}
};
let body = Box::new(StreamReader::new(
body.map(|f| f.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e.to_string()))),
));
let mut reader = PutObjReader::new(body, content_length as usize);
let Some(store) = new_object_layer_fn() else {
@@ -911,6 +923,10 @@ impl S3 for FS {
}
};
let body = Box::new(StreamReader::new(
body.map(|f| f.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e.to_string()))),
));
// mc cp step 4
let mut data = PutObjReader::new(body, content_length as usize);
let opts = ObjectOptions::default();