use HTTP for remote read/write

weisd
2025-03-11 09:12:20 +08:00
parent 7a7aee2049
commit 70031effa7
28 changed files with 4104 additions and 1792 deletions
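This change moves remote disk I/O off the gRPC write_stream path and onto plain HTTP streaming. The standalone api/admin crate is dropped from the workspace (its handlers now live inside the rustfs binary, as the rustfs/src/admin/rpc.rs log lines in TODO.md show), ecstore gains a urlencoding dependency, and a new ecstore/src/disk/io.rs provides HttpFileReader and HttpFileWriter over two endpoints:

GET /rustfs/rpc/read_file_stream?disk=…&volume=…&path=…&offset=…&length=…
PUT /rustfs/rpc/put_file_stream?disk=…&volume=…&path=…&append=…&size=…

As a minimal client-side sketch of the read path (not part of the commit; it assumes reqwest with the stream feature and tokio-util's io feature, mirroring what io.rs itself does):

use futures::TryStreamExt;
use tokio_util::io::StreamReader;

// Fetch a byte range of a remote file and expose it as an AsyncRead --
// the same adaptation HttpFileReader performs internally.
async fn read_remote(
    url: &str, disk: &str, volume: &str, path: &str,
    offset: usize, length: usize,
) -> std::io::Result<impl tokio::io::AsyncRead> {
    let resp = reqwest::Client::new()
        .get(format!(
            "{}/rustfs/rpc/read_file_stream?disk={}&volume={}&path={}&offset={}&length={}",
            url,
            urlencoding::encode(disk),
            urlencoding::encode(volume),
            urlencoding::encode(path),
            offset,
            length
        ))
        .send()
        .await
        .map_err(std::io::Error::other)?;
    // Convert the body stream's reqwest::Error into io::Error so
    // StreamReader can wrap it as an AsyncRead.
    Ok(StreamReader::new(resp.bytes_stream().map_err(std::io::Error::other)))
}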


@@ -4,7 +4,7 @@ ENV LANG C.UTF-8
RUN sed -i s@http://.*archive.ubuntu.com@http://repo.huaweicloud.com@g /etc/apt/sources.list
RUN apt-get clean && apt-get update && apt-get install wget git curl unzip gcc pkg-config libssl-dev -y
RUN apt-get clean && apt-get update && apt-get install wget git curl unzip gcc pkg-config libssl-dev lld libdbus-1-dev libwayland-dev libwebkit2gtk-4.1-dev libxdo-dev -y
# install protoc
RUN wget https://github.com/protocolbuffers/protobuf/releases/download/v27.0/protoc-27.0-linux-x86_64.zip \


@@ -4,7 +4,7 @@ ENV LANG C.UTF-8
RUN sed -i s@http://.*archive.ubuntu.com@http://repo.huaweicloud.com@g /etc/apt/sources.list
RUN apt-get clean && apt-get update && apt-get install wget git curl unzip gcc pkg-config libssl-dev -y
RUN apt-get clean && apt-get update && apt-get install wget git curl unzip gcc pkg-config libssl-dev lld libdbus-1-dev libwayland-dev libwebkit2gtk-4.1-dev libxdo-dev -y
# install protoc
RUN wget https://github.com/protocolbuffers/protobuf/releases/download/v27.0/protoc-27.0-linux-x86_64.zip \

Cargo.lock (generated)

@@ -17,21 +17,6 @@ version = "2.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "512761e0bb2578dd7380c6baaa0f4ce03e84f95e960231d1dec8bf4d7d6e2627"
[[package]]
name = "admin"
version = "0.0.1"
dependencies = [
"axum",
"ecstore",
"futures-util",
"hyper",
"mime",
"serde",
"serde_json",
"time",
"tower 0.5.2",
]
[[package]]
name = "aead"
version = "0.5.2"
@@ -1960,6 +1945,7 @@ dependencies = [
"tracing-error",
"transform-stream",
"url",
"urlencoding",
"uuid",
"winapi",
"workers",
@@ -5309,7 +5295,6 @@ dependencies = [
name = "rustfs"
version = "0.1.0"
dependencies = [
"admin",
"async-trait",
"atoi",
"axum",


@@ -7,7 +7,6 @@ members = [
"common/common", # Shared utilities and data structures
"common/lock", # Distributed locking implementation
"common/protos", # Protocol buffer definitions
"api/admin", # Admin HTTP API endpoints
"reader", # Object reading service
"common/workers", # Worker thread pools and task scheduling
"iam", # Identity and Access Management


@@ -37,9 +37,9 @@ probe-e2e:
# in target/rockylinux9.3/release/rustfs
BUILD_OS ?= rockylinux9.3
.PHONY: build
build: ROCKYLINUX_BUILD_IMAGE_NAME = $(BUILD_OS):v1
build: ROCKYLINUX_BUILD_IMAGE_NAME = rustfs-$(BUILD_OS):v1
build: ROCKYLINUX_BUILD_CONTAINER_NAME = rustfs-$(BUILD_OS)-build
build: BUILD_CMD = /root/.cargo/bin/cargo build --release --target-dir /root/s3-rustfs/target/$(BUILD_OS)
build: BUILD_CMD = /root/.cargo/bin/cargo build --release --bin rustfs --target-dir /root/s3-rustfs/target/$(BUILD_OS)
build:
$(DOCKER_CLI) build -t $(ROCKYLINUX_BUILD_IMAGE_NAME) -f $(DOCKERFILE_PATH)/Dockerfile.$(BUILD_OS) .
$(DOCKER_CLI) run --rm --name $(ROCKYLINUX_BUILD_CONTAINER_NAME) -v $(shell pwd):/root/s3-rustfs -it $(ROCKYLINUX_BUILD_IMAGE_NAME) $(BUILD_CMD)

TODO.md

@@ -57,3 +57,41 @@
- [ ] Object compression
- [ ] STS
- [ ] Tiered remote integration with Alibaba Cloud, Tencent Cloud, and S3
scp ./target/ubuntu22.04/release/rustfs.zip root@8.130.183.154:~/
scp ./target/ubuntu22.04/release/rustfs.zip root@8.130.177.182:~/
scp ./target/ubuntu22.04/release/rustfs.zip root@8.130.91.189:~/
scp ./target/ubuntu22.04/release/rustfs.zip root@8.130.182.114:~/
scp ./target/x86_64-unknown-linux-musl/release/rustfs root@8.130.183.154:~/
scp ./target/x86_64-unknown-linux-musl/release/rustfs root@8.130.177.182:~/
scp ./target/x86_64-unknown-linux-musl/release/rustfs root@8.130.91.189:~/
scp ./target/x86_64-unknown-linux-musl/release/rustfs root@8.130.182.114:~/
2025-03-11T06:18:50.011565Z DEBUG s3s::service: req: Request { method: PUT, uri: /rustfs/rpc/put_file_stream?disk=http://node2:9000/data/rustfs2&volume=.rustfs.sys/tmp&path=a45ade1a-e09b-4eb4-bac1-8b5f55f7d438/235da61f-a705-4f9a-aa21-7801d2eaf61d/part.1&append=false, version: HTTP/1.1, headers: {"accept": "*/*", "host": "node2:9000", "transfer-encoding": "chunked"}, body: Body { hyper: Body(Streaming) } }
at /Users/weisd/.cargo/git/checkouts/s3s-58426f2d17c34859/ab139f7/crates/s3s/src/service.rs:81
in s3s::service::call with start_time: 2025-03-11 6:18:50.011550933 +00:00:00
2025-03-11T06:18:50.011603Z DEBUG s3s::ops: parsing path-style request, decoded_uri_path: "/rustfs/rpc/put_file_stream"
at /Users/weisd/.cargo/git/checkouts/s3s-58426f2d17c34859/ab139f7/crates/s3s/src/ops/mod.rs:266
in s3s::service::call with start_time: 2025-03-11 6:18:50.011550933 +00:00:00
2025-03-11T06:18:50.011651Z DEBUG s3s::ops: body_changed: false, decoded_content_length: None, has_multipart: false
at /Users/weisd/.cargo/git/checkouts/s3s-58426f2d17c34859/ab139f7/crates/s3s/src/ops/mod.rs:342
in s3s::service::call with start_time: 2025-03-11 6:18:50.011550933 +00:00:00
2025-03-11T06:18:50.011687Z WARN rustfs::admin::rpc: handle PutFile
at rustfs/src/admin/rpc.rs:120
in s3s::service::call with start_time: 2025-03-11 6:18:50.011550933 +00:00:00
2025-03-11T06:18:50.011716Z DEBUG s3s::ops: custom route returns error, err: S3Error(Inner { code: InvalidArgument, message: Some("get query failed1 Error(\"missing field `size`\")"), request_id: None, status_code: None, source: None, headers: None })
at /Users/weisd/.cargo/git/checkouts/s3s-58426f2d17c34859/ab139f7/crates/s3s/src/ops/mod.rs:227
in s3s::service::call with start_time: 2025-03-11 6:18:50.011550933 +00:00:00
2025-03-11T06:18:50.011751Z DEBUG s3s::service: res: Response { status: 400, version: HTTP/1.1, headers: {"content-type": "application/xml"}, body: Body { once: b"<?xml version=\"1.0\" encoding=\"UTF-8\"?><Error><Code>InvalidArgument</Code><Message>get query failed1 Error(&quot;missing field `size`&quot;)</Message></Error>" } }
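(The 400 above appears to come from a run before the writer sent a size parameter: the put_file_stream handler's query deserializer requires size, which is why HttpFileWriter::new in ecstore/src/disk/io.rs now threads size through the query string.)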


@@ -1,21 +0,0 @@
[package]
name = "admin"
edition.workspace = true
license.workspace = true
repository.workspace = true
rust-version.workspace = true
version.workspace = true
[lints]
workspace = true
[dependencies]
axum.workspace = true
mime.workspace = true
serde.workspace = true
serde_json.workspace = true
ecstore = { path = "../../ecstore" }
time = { workspace = true, features = ["serde"] }
tower.workspace = true
futures-util = "0.3.31"
hyper.workspace = true


@@ -1,98 +0,0 @@
use axum::{
body::Body,
http::{header::CONTENT_TYPE, HeaderValue, StatusCode},
response::{IntoResponse, Response},
};
use mime::APPLICATION_JSON;
use serde::Serialize;
#[derive(Serialize, Default)]
#[serde(rename_all = "PascalCase")]
pub struct ErrorResponse {
pub code: String,
pub message: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub key: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub bucket_name: Option<String>,
pub resource: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub region: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub request_id: Option<String>,
pub host_id: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub actual_object_size: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub range_requested: Option<String>,
}
impl IntoResponse for APIError {
fn into_response(self) -> Response {
let code = self.http_status_code;
let err_response = ErrorResponse::from(self);
let json_res = match serde_json::to_vec(&err_response) {
Ok(r) => r,
Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, format!("{e}")).into_response(),
};
Response::builder()
.status(code)
.header(CONTENT_TYPE, HeaderValue::from_static(APPLICATION_JSON.as_ref()))
.body(Body::from(json_res))
.unwrap_or_else(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("{e}")).into_response())
}
}
#[derive(Default)]
pub struct APIError {
code: String,
description: String,
http_status_code: StatusCode,
object_size: Option<String>,
range_requested: Option<String>,
}
pub enum ErrorCode {
ErrNotImplemented,
ErrServerNotInitialized,
}
impl IntoResponse for ErrorCode {
fn into_response(self) -> Response {
APIError::from(self).into_response()
}
}
impl From<ErrorCode> for APIError {
fn from(value: ErrorCode) -> Self {
use ErrorCode::*;
match value {
ErrNotImplemented => APIError {
code: "NotImplemented".into(),
description: "A header you provided implies functionality that is not implemented.".into(),
http_status_code: StatusCode::NOT_IMPLEMENTED,
..Default::default()
},
ErrServerNotInitialized => APIError {
code: "ServerNotInitialized".into(),
description: "Server not initialized yet, please try again.".into(),
http_status_code: StatusCode::SERVICE_UNAVAILABLE,
..Default::default()
},
}
}
}
impl From<APIError> for ErrorResponse {
fn from(value: APIError) -> Self {
Self {
code: value.code,
message: value.description,
actual_object_size: value.object_size,
range_requested: value.range_requested,
..Default::default()
}
}
}


@@ -1 +0,0 @@
pub mod list_pools;


@@ -1,83 +0,0 @@
use crate::error::ErrorCode;
use crate::Result as LocalResult;
use axum::Json;
use ecstore::new_object_layer_fn;
use serde::Serialize;
use time::OffsetDateTime;
#[derive(Serialize)]
pub struct PoolStatus {
id: i64,
cmdline: String,
#[serde(rename = "lastUpdate")]
#[serde(serialize_with = "time::serde::rfc3339::serialize")]
last_updat: OffsetDateTime,
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(rename = "decommissionInfo")]
decommission_info: Option<PoolDecommissionInfo>,
}
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
struct PoolDecommissionInfo {
#[serde(serialize_with = "time::serde::rfc3339::serialize")]
start_time: OffsetDateTime,
start_size: i64,
total_size: i64,
current_size: i64,
complete: bool,
failed: bool,
canceled: bool,
#[serde(rename = "objectsDecommissioned")]
items_decommissioned: i64,
#[serde(rename = "objectsDecommissionedFailed")]
items_decommission_failed: i64,
#[serde(rename = "bytesDecommissioned")]
bytes_done: i64,
#[serde(rename = "bytesDecommissionedFailed")]
bytes_failed: i64,
}
pub async fn handler() -> LocalResult<Json<Vec<PoolStatus>>> {
// if ecstore::is_legacy().await {
// return Err(ErrorCode::ErrNotImplemented);
// }
//
//
// TODO: use OnceLock as a global variable
let Some(store) = new_object_layer_fn() else { return Err(ErrorCode::ErrNotImplemented) };
// TODO: call pool.status() to fetch each pool's data
//
let mut result = Vec::new();
for (idx, _pool) in store.pools.iter().enumerate() {
// mock the data here for now
result.push(PoolStatus {
id: idx as _,
cmdline: "cmdline".into(),
last_updat: OffsetDateTime::now_utc(),
decommission_info: if idx % 2 == 0 {
Some(PoolDecommissionInfo {
start_time: OffsetDateTime::now_utc(),
start_size: 1,
total_size: 2,
current_size: 2,
complete: true,
failed: true,
canceled: true,
items_decommissioned: 1,
items_decommission_failed: 1,
bytes_done: 1,
bytes_failed: 1,
})
} else {
None
},
})
}
Ok(Json(result))
}


@@ -1,20 +0,0 @@
pub mod error;
pub mod handlers;
use axum::{extract::Request, response::Response, routing::get, BoxError, Router};
use error::ErrorCode;
use handlers::list_pools;
use tower::Service;
pub type Result<T> = std::result::Result<T, ErrorCode>;
const API_VERSION: &str = "/v3";
pub fn register_admin_router() -> impl Service<Request, Response = Response, Error: Into<BoxError>, Future: Send> + Clone {
Router::new()
.nest(
"/rustfs/admin",
Router::new().nest(API_VERSION, Router::new().route("/pools/list", get(list_pools::handler))),
)
.into_service()
}


@@ -1,9 +1,10 @@
// automatically generated by the FlatBuffers compiler, do not modify
// @generated
use core::cmp::Ordering;
use core::mem;
use core::cmp::Ordering;
extern crate flatbuffers;
use self::flatbuffers::{EndianScalar, Follow};
@@ -11,114 +12,112 @@ use self::flatbuffers::{EndianScalar, Follow};
#[allow(unused_imports, dead_code)]
pub mod models {
use core::cmp::Ordering;
use core::mem;
use core::mem;
use core::cmp::Ordering;
extern crate flatbuffers;
use self::flatbuffers::{EndianScalar, Follow};
extern crate flatbuffers;
use self::flatbuffers::{EndianScalar, Follow};
pub enum PingBodyOffset {}
#[derive(Copy, Clone, PartialEq)]
pub enum PingBodyOffset {}
#[derive(Copy, Clone, PartialEq)]
pub struct PingBody<'a> {
pub _tab: flatbuffers::Table<'a>,
pub struct PingBody<'a> {
pub _tab: flatbuffers::Table<'a>,
}
impl<'a> flatbuffers::Follow<'a> for PingBody<'a> {
type Inner = PingBody<'a>;
#[inline]
unsafe fn follow(buf: &'a [u8], loc: usize) -> Self::Inner {
Self { _tab: flatbuffers::Table::new(buf, loc) }
}
}
impl<'a> PingBody<'a> {
pub const VT_PAYLOAD: flatbuffers::VOffsetT = 4;
pub const fn get_fully_qualified_name() -> &'static str {
"models.PingBody"
}
#[inline]
pub unsafe fn init_from_table(table: flatbuffers::Table<'a>) -> Self {
PingBody { _tab: table }
}
#[allow(unused_mut)]
pub fn create<'bldr: 'args, 'args: 'mut_bldr, 'mut_bldr, A: flatbuffers::Allocator + 'bldr>(
_fbb: &'mut_bldr mut flatbuffers::FlatBufferBuilder<'bldr, A>,
args: &'args PingBodyArgs<'args>
) -> flatbuffers::WIPOffset<PingBody<'bldr>> {
let mut builder = PingBodyBuilder::new(_fbb);
if let Some(x) = args.payload { builder.add_payload(x); }
builder.finish()
}
#[inline]
pub fn payload(&self) -> Option<flatbuffers::Vector<'a, u8>> {
// Safety:
// Created from valid Table for this object
// which contains a valid value in this slot
unsafe { self._tab.get::<flatbuffers::ForwardsUOffset<flatbuffers::Vector<'a, u8>>>(PingBody::VT_PAYLOAD, None)}
}
}
impl flatbuffers::Verifiable for PingBody<'_> {
#[inline]
fn run_verifier(
v: &mut flatbuffers::Verifier, pos: usize
) -> Result<(), flatbuffers::InvalidFlatbuffer> {
use self::flatbuffers::Verifiable;
v.visit_table(pos)?
.visit_field::<flatbuffers::ForwardsUOffset<flatbuffers::Vector<'_, u8>>>("payload", Self::VT_PAYLOAD, false)?
.finish();
Ok(())
}
}
pub struct PingBodyArgs<'a> {
pub payload: Option<flatbuffers::WIPOffset<flatbuffers::Vector<'a, u8>>>,
}
impl<'a> Default for PingBodyArgs<'a> {
#[inline]
fn default() -> Self {
PingBodyArgs {
payload: None,
}
}
}
impl<'a> flatbuffers::Follow<'a> for PingBody<'a> {
type Inner = PingBody<'a>;
#[inline]
unsafe fn follow(buf: &'a [u8], loc: usize) -> Self::Inner {
Self {
_tab: flatbuffers::Table::new(buf, loc),
}
}
pub struct PingBodyBuilder<'a: 'b, 'b, A: flatbuffers::Allocator + 'a> {
fbb_: &'b mut flatbuffers::FlatBufferBuilder<'a, A>,
start_: flatbuffers::WIPOffset<flatbuffers::TableUnfinishedWIPOffset>,
}
impl<'a: 'b, 'b, A: flatbuffers::Allocator + 'a> PingBodyBuilder<'a, 'b, A> {
#[inline]
pub fn add_payload(&mut self, payload: flatbuffers::WIPOffset<flatbuffers::Vector<'b , u8>>) {
self.fbb_.push_slot_always::<flatbuffers::WIPOffset<_>>(PingBody::VT_PAYLOAD, payload);
}
#[inline]
pub fn new(_fbb: &'b mut flatbuffers::FlatBufferBuilder<'a, A>) -> PingBodyBuilder<'a, 'b, A> {
let start = _fbb.start_table();
PingBodyBuilder {
fbb_: _fbb,
start_: start,
}
}
#[inline]
pub fn finish(self) -> flatbuffers::WIPOffset<PingBody<'a>> {
let o = self.fbb_.end_table(self.start_);
flatbuffers::WIPOffset::new(o.value())
}
}
impl<'a> PingBody<'a> {
pub const VT_PAYLOAD: flatbuffers::VOffsetT = 4;
impl core::fmt::Debug for PingBody<'_> {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
let mut ds = f.debug_struct("PingBody");
ds.field("payload", &self.payload());
ds.finish()
}
}
} // pub mod models
pub const fn get_fully_qualified_name() -> &'static str {
"models.PingBody"
}
#[inline]
pub unsafe fn init_from_table(table: flatbuffers::Table<'a>) -> Self {
PingBody { _tab: table }
}
#[allow(unused_mut)]
pub fn create<'bldr: 'args, 'args: 'mut_bldr, 'mut_bldr, A: flatbuffers::Allocator + 'bldr>(
_fbb: &'mut_bldr mut flatbuffers::FlatBufferBuilder<'bldr, A>,
args: &'args PingBodyArgs<'args>,
) -> flatbuffers::WIPOffset<PingBody<'bldr>> {
let mut builder = PingBodyBuilder::new(_fbb);
if let Some(x) = args.payload {
builder.add_payload(x);
}
builder.finish()
}
#[inline]
pub fn payload(&self) -> Option<flatbuffers::Vector<'a, u8>> {
// Safety:
// Created from valid Table for this object
// which contains a valid value in this slot
unsafe {
self._tab
.get::<flatbuffers::ForwardsUOffset<flatbuffers::Vector<'a, u8>>>(PingBody::VT_PAYLOAD, None)
}
}
}
impl flatbuffers::Verifiable for PingBody<'_> {
#[inline]
fn run_verifier(v: &mut flatbuffers::Verifier, pos: usize) -> Result<(), flatbuffers::InvalidFlatbuffer> {
use self::flatbuffers::Verifiable;
v.visit_table(pos)?
.visit_field::<flatbuffers::ForwardsUOffset<flatbuffers::Vector<'_, u8>>>("payload", Self::VT_PAYLOAD, false)?
.finish();
Ok(())
}
}
pub struct PingBodyArgs<'a> {
pub payload: Option<flatbuffers::WIPOffset<flatbuffers::Vector<'a, u8>>>,
}
impl<'a> Default for PingBodyArgs<'a> {
#[inline]
fn default() -> Self {
PingBodyArgs { payload: None }
}
}
pub struct PingBodyBuilder<'a: 'b, 'b, A: flatbuffers::Allocator + 'a> {
fbb_: &'b mut flatbuffers::FlatBufferBuilder<'a, A>,
start_: flatbuffers::WIPOffset<flatbuffers::TableUnfinishedWIPOffset>,
}
impl<'a: 'b, 'b, A: flatbuffers::Allocator + 'a> PingBodyBuilder<'a, 'b, A> {
#[inline]
pub fn add_payload(&mut self, payload: flatbuffers::WIPOffset<flatbuffers::Vector<'b, u8>>) {
self.fbb_
.push_slot_always::<flatbuffers::WIPOffset<_>>(PingBody::VT_PAYLOAD, payload);
}
#[inline]
pub fn new(_fbb: &'b mut flatbuffers::FlatBufferBuilder<'a, A>) -> PingBodyBuilder<'a, 'b, A> {
let start = _fbb.start_table();
PingBodyBuilder {
fbb_: _fbb,
start_: start,
}
}
#[inline]
pub fn finish(self) -> flatbuffers::WIPOffset<PingBody<'a>> {
let o = self.fbb_.end_table(self.start_);
flatbuffers::WIPOffset::new(o.value())
}
}
impl core::fmt::Debug for PingBody<'_> {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
let mut ds = f.debug_struct("PingBody");
ds.field("payload", &self.payload());
ds.finish()
}
}
} // pub mod models

File diff suppressed because it is too large.

docker-compose.yaml (new file)

@@ -0,0 +1,74 @@
version: '3.8'
services:
node1:
image: rustfs:v1 # replace with your image name and tag
container_name: node1
environment:
- RUSTFS_VOLUMES=http://node{1...4}:9000/root/data/target/volume/test{1...4}
- RUSTFS_ADDRESS=0.0.0.0:9000
- RUSTFS_CONSOLE_ENABLE=true
- RUSTFS_CONSOLE_ADDRESS=0.0.0.0:9002
platform: linux/amd64
ports:
- "9001:9000" # 映射宿主机的 9001 端口到容器的 9000 端口
volumes:
- ..:/root/data # mount the current directory into the container at /root/data
command: "/root/rustfs"
networks:
- my_network
node2:
image: rustfs:v1
container_name: node2
environment:
- RUSTFS_VOLUMES=http://node{1...4}:9000/root/data/target/volume/test{1...4}
- RUSTFS_ADDRESS=0.0.0.0:9000
- RUSTFS_CONSOLE_ENABLE=true
- RUSTFS_CONSOLE_ADDRESS=0.0.0.0:9002
platform: linux/amd64
ports:
- "9002:9000" # 映射宿主机的 9002 端口到容器的 9000 端口
volumes:
- ..:/root/data
command: "mkdir -p /root/data/target/volume/test{1..4} && /root/data/target/ubuntu22.04/release/rustfs"
networks:
- my_network
node3:
image: rustfs:v1
container_name: node3
environment:
- RUSTFS_VOLUMES=http://node{1...4}:9000/root/data/target/volume/test{1...4}
- RUSTFS_ADDRESS=0.0.0.0:9000
- RUSTFS_CONSOLE_ENABLE=true
- RUSTFS_CONSOLE_ADDRESS=0.0.0.0:9002
platform: linux/amd64
ports:
- "9003:9000" # 映射宿主机的 9003 端口到容器的 9000 端口
volumes:
- ..:/root/data
command: "mkdir -p /root/data/target/volume/test{1..4} && /root/data/target/ubuntu22.04/release/rustfs"
networks:
- my_network
node4:
image: rustfs:v1
container_name: node4
environment:
- RUSTFS_VOLUMES=http://node{1...4}:9000/root/data/target/volume/test{1...4}
- RUSTFS_ADDRESS=0.0.0.0:9000
- RUSTFS_CONSOLE_ENABLE=true
- RUSTFS_CONSOLE_ADDRESS=0.0.0.0:9002
platform: linux/amd64
ports:
- "9004:9000" # 映射宿主机的 9004 端口到容器的 9000 端口
volumes:
- ..:/root/data
command: "mkdir -p /root/data/target/volume/test{1..4} && /root/data/target/ubuntu22.04/release/rustfs"
networks:
- my_network
networks:
my_network:
driver: bridge
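Note: the {1...4} patterns in RUSTFS_VOLUMES presumably expand MinIO-style, so every node resolves the same sixteen volume URLs (http://node1:9000/root/data/target/volume/test1 through http://node4:9000/.../test4). Also worth noting: Compose does not run a string command through a shell, so the "mkdir -p ... && ..." chains on node2-node4 only work if the image's entrypoint is a shell.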


@@ -67,6 +67,7 @@ md-5.workspace = true
madmin.workspace = true
workers.workspace = true
reqwest = { workspace = true }
urlencoding = "2.1.3"
[target.'cfg(not(windows))'.dependencies]


@@ -1,5 +1,9 @@
use crate::{
disk::{error::DiskError, BufferReader, Disk, DiskAPI, DiskStore, FileReader, FileWriter},
disk::{
error::DiskError,
io::{FileReader, FileWriter},
Disk, DiskAPI,
},
erasure::{ReadAt, Writer},
error::{Error, Result},
store_api::BitrotAlgorithm,
@@ -9,13 +13,8 @@ use blake2::Digest as _;
use highway::{HighwayHash, HighwayHasher, Key};
use lazy_static::lazy_static;
use sha2::{digest::core_api::BlockSizeUser, Digest, Sha256};
use std::{any::Any, collections::HashMap, sync::Arc};
use tokio::{
io::AsyncReadExt as _,
spawn,
sync::mpsc::{self, Sender},
task::JoinHandle,
};
use std::{any::Any, collections::HashMap, io::Cursor, sync::Arc};
use tokio::io::{AsyncReadExt as _, AsyncWriteExt};
use tracing::{error, info};
lazy_static! {
@@ -145,22 +144,22 @@ pub fn bitrot_algorithm_from_string(s: &str) -> BitrotAlgorithm {
pub type BitrotWriter = Box<dyn Writer + Send + 'static>;
pub async fn new_bitrot_writer(
disk: DiskStore,
orig_volume: &str,
volume: &str,
file_path: &str,
length: usize,
algo: BitrotAlgorithm,
shard_size: usize,
) -> Result<BitrotWriter> {
if algo == BitrotAlgorithm::HighwayHash256S {
return Ok(Box::new(
StreamingBitrotWriter::new(disk, orig_volume, volume, file_path, length, algo, shard_size).await?,
));
}
Ok(Box::new(WholeBitrotWriter::new(disk, volume, file_path, algo, shard_size)))
}
// pub async fn new_bitrot_writer(
// disk: DiskStore,
// orig_volume: &str,
// volume: &str,
// file_path: &str,
// length: usize,
// algo: BitrotAlgorithm,
// shard_size: usize,
// ) -> Result<BitrotWriter> {
// if algo == BitrotAlgorithm::HighwayHash256S {
// return Ok(Box::new(
// StreamingBitrotWriter::new(disk, orig_volume, volume, file_path, length, algo, shard_size).await?,
// ));
// }
// Ok(Box::new(WholeBitrotWriter::new(disk, volume, file_path, algo, shard_size)))
// }
pub type BitrotReader = Box<dyn ReadAt + Send>;
@@ -189,13 +188,13 @@ pub async fn close_bitrot_writers(writers: &mut [Option<BitrotWriter>]) -> Resul
Ok(())
}
pub fn bitrot_writer_sum(w: &BitrotWriter) -> Vec<u8> {
if let Some(w) = w.as_any().downcast_ref::<WholeBitrotWriter>() {
return w.hash.clone().finalize();
}
// pub fn bitrot_writer_sum(w: &BitrotWriter) -> Vec<u8> {
// if let Some(w) = w.as_any().downcast_ref::<WholeBitrotWriter>() {
// return w.hash.clone().finalize();
// }
Vec::new()
}
// Vec::new()
// }
pub fn bitrot_shard_file_size(size: usize, shard_size: usize, algo: BitrotAlgorithm) -> usize {
if algo != BitrotAlgorithm::HighwayHash256S {
@@ -260,40 +259,40 @@ pub async fn bitrot_verify(
Ok(())
}
pub struct WholeBitrotWriter {
disk: DiskStore,
volume: String,
file_path: String,
_shard_size: usize,
pub hash: Hasher,
}
// pub struct WholeBitrotWriter {
// disk: DiskStore,
// volume: String,
// file_path: String,
// _shard_size: usize,
// pub hash: Hasher,
// }
impl WholeBitrotWriter {
pub fn new(disk: DiskStore, volume: &str, file_path: &str, algo: BitrotAlgorithm, shard_size: usize) -> Self {
WholeBitrotWriter {
disk,
volume: volume.to_string(),
file_path: file_path.to_string(),
_shard_size: shard_size,
hash: algo.new_hasher(),
}
}
}
// impl WholeBitrotWriter {
// pub fn new(disk: DiskStore, volume: &str, file_path: &str, algo: BitrotAlgorithm, shard_size: usize) -> Self {
// WholeBitrotWriter {
// disk,
// volume: volume.to_string(),
// file_path: file_path.to_string(),
// _shard_size: shard_size,
// hash: algo.new_hasher(),
// }
// }
// }
#[async_trait::async_trait]
impl Writer for WholeBitrotWriter {
fn as_any(&self) -> &dyn Any {
self
}
// #[async_trait::async_trait]
// impl Writer for WholeBitrotWriter {
// fn as_any(&self) -> &dyn Any {
// self
// }
async fn write(&mut self, buf: &[u8]) -> Result<()> {
let mut file = self.disk.append_file(&self.volume, &self.file_path).await?;
let _ = file.write(buf).await?;
self.hash.update(buf);
// async fn write(&mut self, buf: &[u8]) -> Result<()> {
// let mut file = self.disk.append_file(&self.volume, &self.file_path).await?;
// let _ = file.write(buf).await?;
// self.hash.update(buf);
Ok(())
}
}
// Ok(())
// }
// }
// #[derive(Debug)]
// pub struct WholeBitrotReader {
@@ -344,74 +343,74 @@ impl Writer for WholeBitrotWriter {
// }
// }
struct StreamingBitrotWriter {
hasher: Hasher,
tx: Sender<Option<Vec<u8>>>,
task: Option<JoinHandle<()>>,
}
// struct StreamingBitrotWriter {
// hasher: Hasher,
// tx: Sender<Option<Vec<u8>>>,
// task: Option<JoinHandle<()>>,
// }
impl StreamingBitrotWriter {
pub async fn new(
disk: DiskStore,
orig_volume: &str,
volume: &str,
file_path: &str,
length: usize,
algo: BitrotAlgorithm,
shard_size: usize,
) -> Result<Self> {
let hasher = algo.new_hasher();
let (tx, mut rx) = mpsc::channel::<Option<Vec<u8>>>(10);
// impl StreamingBitrotWriter {
// pub async fn new(
// disk: DiskStore,
// orig_volume: &str,
// volume: &str,
// file_path: &str,
// length: usize,
// algo: BitrotAlgorithm,
// shard_size: usize,
// ) -> Result<Self> {
// let hasher = algo.new_hasher();
// let (tx, mut rx) = mpsc::channel::<Option<Vec<u8>>>(10);
let total_file_size = length.div_ceil(shard_size) * hasher.size() + length;
let mut writer = disk.create_file(orig_volume, volume, file_path, total_file_size).await?;
// let total_file_size = length.div_ceil(shard_size) * hasher.size() + length;
// let mut writer = disk.create_file(orig_volume, volume, file_path, total_file_size).await?;
let task = spawn(async move {
loop {
if let Some(Some(buf)) = rx.recv().await {
writer.write(&buf).await.unwrap();
continue;
}
// let task = spawn(async move {
// loop {
// if let Some(Some(buf)) = rx.recv().await {
// writer.write(&buf).await.unwrap();
// continue;
// }
break;
}
});
// break;
// }
// });
Ok(StreamingBitrotWriter {
hasher,
tx,
task: Some(task),
})
}
}
// Ok(StreamingBitrotWriter {
// hasher,
// tx,
// task: Some(task),
// })
// }
// }
#[async_trait::async_trait]
impl Writer for StreamingBitrotWriter {
fn as_any(&self) -> &dyn Any {
self
}
// #[async_trait::async_trait]
// impl Writer for StreamingBitrotWriter {
// fn as_any(&self) -> &dyn Any {
// self
// }
async fn write(&mut self, buf: &[u8]) -> Result<()> {
if buf.is_empty() {
return Ok(());
}
self.hasher.reset();
self.hasher.update(buf);
let hash_bytes = self.hasher.clone().finalize();
let _ = self.tx.send(Some(hash_bytes)).await?;
let _ = self.tx.send(Some(buf.to_vec())).await?;
// async fn write(&mut self, buf: &[u8]) -> Result<()> {
// if buf.is_empty() {
// return Ok(());
// }
// self.hasher.reset();
// self.hasher.update(buf);
// let hash_bytes = self.hasher.clone().finalize();
// let _ = self.tx.send(Some(hash_bytes)).await?;
// let _ = self.tx.send(Some(buf.to_vec())).await?;
Ok(())
}
// Ok(())
// }
async fn close(&mut self) -> Result<()> {
let _ = self.tx.send(None).await?;
if let Some(task) = self.task.take() {
let _ = task.await; // wait for the task to finish
}
Ok(())
}
}
// async fn close(&mut self) -> Result<()> {
// let _ = self.tx.send(None).await?;
// if let Some(task) = self.task.take() {
// let _ = task.await; // wait for the task to finish
// }
// Ok(())
// }
// }
// #[derive(Debug)]
// struct StreamingBitrotReader {
@@ -522,8 +521,8 @@ impl Writer for BitrotFileWriter {
self.hasher.reset();
self.hasher.update(buf);
let hash_bytes = self.hasher.clone().finalize();
let _ = self.inner.write(&hash_bytes).await?;
let _ = self.inner.write(buf).await?;
let _ = self.inner.write_all(&hash_bytes).await?;
let _ = self.inner.write_all(buf).await?;
Ok(())
}
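The write → write_all switch above closes a short-write hole: AsyncWriteExt::write may accept only part of the buffer, while write_all loops until the whole hash block and shard payload reach the inner writer.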
@@ -600,11 +599,7 @@ impl ReadAt for BitrotFileReader {
let stream_offset = (offset / self.shard_size) * self.hasher.size() + offset;
if let Some(data) = self.data.clone() {
self.reader = Some(FileReader::Buffer(BufferReader::new(
data,
stream_offset,
self.till_offset - stream_offset,
)));
self.reader = Some(FileReader::Buffer(Cursor::new(data)));
} else {
self.reader = Some(
self.disk

ecstore/src/disk/io.rs (new file)

@@ -0,0 +1,229 @@
use crate::error::Result;
use futures::TryStreamExt;
use std::io::Cursor;
use std::pin::Pin;
use std::task::Poll;
use tokio::fs::File;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio_util::io::ReaderStream;
use tokio_util::io::StreamReader;
use tracing::error;
use tracing::warn;
#[derive(Debug)]
pub enum FileReader {
Local(File),
// Remote(RemoteFileReader),
Buffer(Cursor<Vec<u8>>),
Http(HttpFileReader),
}
impl AsyncRead for FileReader {
#[tracing::instrument(level = "debug", skip(self, buf))]
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &mut tokio::io::ReadBuf<'_>,
) -> std::task::Poll<std::result::Result<(), std::io::Error>> {
match &mut *self {
Self::Local(reader) => Pin::new(reader).poll_read(cx, buf),
Self::Buffer(reader) => Pin::new(reader).poll_read(cx, buf),
Self::Http(reader) => Pin::new(reader).poll_read(cx, buf),
}
}
}
#[derive(Debug)]
pub struct HttpFileReader {
// client: reqwest::Client,
// url: String,
// disk: String,
// volume: String,
// path: String,
// offset: usize,
// length: usize,
inner: tokio::io::DuplexStream,
// buf: Vec<u8>,
// pos: usize,
}
impl HttpFileReader {
pub fn new(url: &str, disk: &str, volume: &str, path: &str, offset: usize, length: usize) -> Result<Self> {
warn!("http read start {}", path);
let url = url.to_owned();
let disk = disk.to_owned();
let volume = volume.to_owned();
let path = path.to_owned();
// let (reader, mut writer) = tokio::io::simplex(1024);
let (reader, mut writer) = tokio::io::duplex(1024 * 1024 * 10);
tokio::spawn(async move {
let client = reqwest::Client::new();
let resp = match client
.get(format!(
"{}/rustfs/rpc/read_file_stream?disk={}&volume={}&path={}&offset={}&length={}",
url,
urlencoding::encode(&disk),
urlencoding::encode(&volume),
urlencoding::encode(&path),
offset,
length
))
.send()
.await
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))
{
Ok(resp) => resp,
Err(err) => {
warn!("http file reader error: {}", err);
return;
}
};
let mut rd = StreamReader::new(
resp.bytes_stream()
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e)),
);
if let Err(err) = tokio::io::copy(&mut rd, &mut writer).await {
error!("http file reader copy error: {}", err);
};
});
Ok(Self {
// client: reqwest::Client::new(),
// url: url.to_string(),
// disk: disk.to_string(),
// volume: volume.to_string(),
// path: path.to_string(),
// offset,
// length,
inner: reader,
// buf: Vec::new(),
// pos: 0,
})
}
}
impl AsyncRead for HttpFileReader {
#[tracing::instrument(level = "debug", skip(self, buf))]
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &mut tokio::io::ReadBuf<'_>,
) -> std::task::Poll<std::result::Result<(), std::io::Error>> {
Pin::new(&mut self.inner).poll_read(cx, buf)
}
}
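// Design note: the spawned task above copies the HTTP response body into the
// write half of a 10 MiB duplex pipe, and poll_read simply drains the read
// half. A failed request only logs a warning and drops the write half, so the
// consumer observes EOF rather than an I/O error.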
#[derive(Debug)]
pub enum FileWriter {
Local(File),
Http(HttpFileWriter),
Buffer(Cursor<Vec<u8>>),
}
impl AsyncWrite for FileWriter {
#[tracing::instrument(level = "debug", skip(self, buf))]
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &[u8],
) -> Poll<std::result::Result<usize, std::io::Error>> {
match &mut *self {
Self::Local(writer) => Pin::new(writer).poll_write(cx, buf),
Self::Buffer(writer) => Pin::new(writer).poll_write(cx, buf),
Self::Http(writer) => Pin::new(writer).poll_write(cx, buf),
}
}
#[tracing::instrument(level = "debug", skip(self))]
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<std::result::Result<(), std::io::Error>> {
match &mut *self {
Self::Local(writer) => Pin::new(writer).poll_flush(cx),
Self::Buffer(writer) => Pin::new(writer).poll_flush(cx),
Self::Http(writer) => Pin::new(writer).poll_flush(cx),
}
}
#[tracing::instrument(level = "debug", skip(self))]
fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<std::result::Result<(), std::io::Error>> {
match &mut *self {
Self::Local(writer) => Pin::new(writer).poll_shutdown(cx),
Self::Buffer(writer) => Pin::new(writer).poll_shutdown(cx),
Self::Http(writer) => Pin::new(writer).poll_shutdown(cx),
}
}
}
#[derive(Debug)]
pub struct HttpFileWriter {
wd: tokio::io::WriteHalf<tokio::io::SimplexStream>,
}
impl HttpFileWriter {
pub fn new(url: &str, disk: &str, volume: &str, path: &str, size: usize, append: bool) -> Result<Self> {
let (rd, wd) = tokio::io::simplex(1024 * 1024 * 10);
let body = reqwest::Body::wrap_stream(ReaderStream::new(rd));
let url = url.to_owned();
let disk = disk.to_owned();
let volume = volume.to_owned();
let path = path.to_owned();
tokio::spawn(async move {
let client = reqwest::Client::new();
if let Err(err) = client
.put(format!(
"{}/rustfs/rpc/put_file_stream?disk={}&volume={}&path={}&append={}&size={}",
url,
urlencoding::encode(&disk),
urlencoding::encode(&volume),
urlencoding::encode(&path),
append,
size
))
.body(body)
.send()
.await
{
error!("HttpFileWriter put file err: {:?}", err);
// return;
}
// TODO: handle response
// debug!("http write done {}", path);
});
Ok(Self {
wd,
// client: reqwest::Client::new(),
// url: url.to_string(),
// disk: disk.to_string(),
// volume: volume.to_string(),
})
}
}
impl AsyncWrite for HttpFileWriter {
#[tracing::instrument(level = "debug", skip(self, buf))]
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &[u8],
) -> Poll<std::result::Result<usize, std::io::Error>> {
Pin::new(&mut self.wd).poll_write(cx, buf)
}
#[tracing::instrument(level = "debug", skip(self))]
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<std::result::Result<(), std::io::Error>> {
Pin::new(&mut self.wd).poll_flush(cx)
}
#[tracing::instrument(level = "debug", skip(self))]
fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<std::result::Result<(), std::io::Error>> {
Pin::new(&mut self.wd).poll_shutdown(cx)
}
}
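// Design note: writes land in a simplex pipe whose read half becomes the PUT
// body via reqwest::Body::wrap_stream, so the upload streams while the caller
// writes; shutting the writer down closes the pipe, ends the body stream, and
// lets the in-flight request complete. Usage sketch (names mirror the API
// above; not part of this commit):
//
//     use tokio::io::AsyncWriteExt;
//     let mut w = HttpFileWriter::new(url, disk, volume, path, data.len(), false)?;
//     w.write_all(&data).await?;
//     w.shutdown().await?; // closes the pipe; the spawned PUT finishes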


@@ -17,7 +17,7 @@ use crate::disk::error::{
is_sys_err_not_dir, map_err_not_exists, os_err_to_file_err,
};
use crate::disk::os::{check_path_length, is_empty_dir};
use crate::disk::{LocalFileReader, LocalFileWriter, STORAGE_FORMAT_FILE};
use crate::disk::STORAGE_FORMAT_FILE;
use crate::error::{Error, Result};
use crate::file_meta::{get_file_info, read_xl_meta_no_data, FileInfoOpts};
use crate::global::{GLOBAL_IsErasureSD, GLOBAL_RootDiskThreshold};
@@ -745,15 +745,7 @@ impl LocalDisk {
let meta = file.metadata().await?;
bitrot_verify(
FileReader::Local(LocalFileReader::new(file)),
meta.size() as usize,
part_size,
algo,
sum.to_vec(),
shard_size,
)
.await
bitrot_verify(FileReader::Local(file), meta.size() as usize, part_size, algo, sum.to_vec(), shard_size).await
}
async fn scan_dir<W: AsyncWrite + Unpin>(
@@ -1297,6 +1289,7 @@ impl DiskAPI for LocalDisk {
Ok(resp)
}
#[tracing::instrument(level = "debug", skip(self))]
async fn rename_part(&self, src_volume: &str, src_path: &str, dst_volume: &str, dst_path: &str, meta: Vec<u8>) -> Result<()> {
let src_volume_dir = self.get_bucket_path(src_volume)?;
let dst_volume_dir = self.get_bucket_path(dst_volume)?;
@@ -1311,12 +1304,18 @@ impl DiskAPI for LocalDisk {
let dst_is_dir = has_suffix(dst_path, SLASH_SEPARATOR);
if !src_is_dir && dst_is_dir || src_is_dir && !dst_is_dir {
warn!(
"rename_part src and dst must be both dir or file src_is_dir:{}, dst_is_dir:{}",
src_is_dir, dst_is_dir
);
return Err(Error::from(DiskError::FileAccessDenied));
}
let src_file_path = src_volume_dir.join(Path::new(src_path));
let dst_file_path = dst_volume_dir.join(Path::new(dst_path));
warn!("rename_part src_file_path:{:?}, dst_file_path:{:?}", &src_file_path, &dst_file_path);
check_path_length(src_file_path.to_string_lossy().as_ref())?;
check_path_length(dst_file_path.to_string_lossy().as_ref())?;
@@ -1337,12 +1336,14 @@ impl DiskAPI for LocalDisk {
if let Some(meta) = meta_op {
if !meta.is_dir() {
warn!("rename_part src is not dir {:?}", &src_file_path);
return Err(Error::new(DiskError::FileAccessDenied));
}
}
if let Err(e) = utils::fs::remove(&dst_file_path).await {
if is_sys_err_not_empty(&e) || is_sys_err_not_dir(&e) {
warn!("rename_part remove dst failed {:?} err {:?}", &dst_file_path, e);
return Err(Error::new(DiskError::FileAccessDenied));
} else if is_sys_err_io(&e) {
return Err(Error::new(DiskError::FaultyDisk));
@@ -1355,6 +1356,7 @@ impl DiskAPI for LocalDisk {
if let Err(err) = os::rename_all(&src_file_path, &dst_file_path, &dst_volume_dir).await {
if let Some(e) = err.to_io_err() {
if is_sys_err_not_empty(&e) || is_sys_err_not_dir(&e) {
warn!("rename_part rename all failed {:?} err {:?}", &dst_file_path, e);
return Err(Error::new(DiskError::FileAccessDenied));
}
@@ -1467,8 +1469,10 @@ impl DiskAPI for LocalDisk {
Ok(())
}
// TODO: use io.reader
#[tracing::instrument(level = "debug", skip(self))]
async fn create_file(&self, origvolume: &str, volume: &str, path: &str, _file_size: usize) -> Result<FileWriter> {
warn!("disk create_file: origvolume: {}, volume: {}, path: {}", origvolume, volume, path);
if !origvolume.is_empty() {
let origvolume_dir = self.get_bucket_path(origvolume)?;
if !skip_access_checks(origvolume) {
@@ -1491,12 +1495,16 @@ impl DiskAPI for LocalDisk {
.await
.map_err(os_err_to_file_err)?;
Ok(FileWriter::Local(LocalFileWriter::new(f)))
Ok(FileWriter::Local(f))
// Ok(())
}
#[tracing::instrument(level = "debug", skip(self))]
// async fn append_file(&self, volume: &str, path: &str, mut r: DuplexStream) -> Result<File> {
async fn append_file(&self, volume: &str, path: &str) -> Result<FileWriter> {
warn!("disk append_file: volume: {}, path: {}", volume, path);
let volume_dir = self.get_bucket_path(volume)?;
if !skip_access_checks(volume) {
if let Err(e) = utils::fs::access(&volume_dir).await {
@@ -1509,11 +1517,13 @@ impl DiskAPI for LocalDisk {
let f = self.open_file(file_path, O_CREATE | O_APPEND | O_WRONLY, volume_dir).await?;
Ok(FileWriter::Local(LocalFileWriter::new(f)))
Ok(FileWriter::Local(f))
}
// TODO: io verifier
#[tracing::instrument(level = "debug", skip(self))]
async fn read_file(&self, volume: &str, path: &str) -> Result<FileReader> {
warn!("disk read_file: volume: {}, path: {}", volume, path);
let volume_dir = self.get_bucket_path(volume)?;
if !skip_access_checks(volume) {
if let Err(e) = utils::fs::access(&volume_dir).await {
@@ -1542,10 +1552,16 @@ impl DiskAPI for LocalDisk {
}
})?;
Ok(FileReader::Local(LocalFileReader::new(f)))
Ok(FileReader::Local(f))
}
#[tracing::instrument(level = "debug", skip(self))]
async fn read_file_stream(&self, volume: &str, path: &str, offset: usize, length: usize) -> Result<FileReader> {
warn!(
"disk read_file_stream: volume: {}, path: {}, offset: {}, length: {}",
volume, path, offset, length
);
let volume_dir = self.get_bucket_path(volume)?;
if !skip_access_checks(volume) {
if let Err(e) = utils::fs::access(&volume_dir).await {
@@ -1587,7 +1603,7 @@ impl DiskAPI for LocalDisk {
f.seek(SeekFrom::Start(offset as u64)).await?;
Ok(FileReader::Local(LocalFileReader::new(f)))
Ok(FileReader::Local(f))
}
#[tracing::instrument(level = "debug", skip(self))]
async fn list_dir(&self, origvolume: &str, volume: &str, dir_path: &str, count: i32) -> Result<Vec<String>> {


@@ -1,6 +1,7 @@
pub mod endpoint;
pub mod error;
pub mod format;
pub mod io;
pub mod local;
pub mod os;
pub mod remote;
@@ -14,10 +15,8 @@ pub const FORMAT_CONFIG_FILE: &str = "format.json";
pub const STORAGE_FORMAT_FILE: &str = "xl.meta";
pub const STORAGE_FORMAT_FILE_BACKUP: &str = "xl.meta.bkp";
use crate::utils::proto_err_to_err;
use crate::{
bucket::{metadata_sys::get_versioning_config, versioning::VersioningApi},
erasure::Writer,
error::{Error, Result},
file_meta::{merge_file_meta_versions, FileMeta, FileMetaShallowVersion, VersionType},
heal::{
@@ -28,28 +27,16 @@ use crate::{
store_api::{FileInfo, ObjectInfo, RawFileInfo},
utils::path::SLASH_SEPARATOR,
};
use endpoint::Endpoint;
use error::DiskError;
use futures::StreamExt;
use io::{FileReader, FileWriter};
use local::LocalDisk;
use madmin::info_commands::DiskMetrics;
use protos::proto_gen::node_service::{node_service_client::NodeServiceClient, WriteRequest, WriteResponse};
use remote::RemoteDisk;
use serde::{Deserialize, Serialize};
use std::io::Read as _;
use std::pin::Pin;
use std::task::Poll;
use std::{any::Any, cmp::Ordering, fmt::Debug, io::Cursor, path::PathBuf, sync::Arc};
use std::{cmp::Ordering, fmt::Debug, path::PathBuf, sync::Arc};
use time::OffsetDateTime;
use tokio::io::AsyncRead;
use tokio::{
fs::File,
io::{AsyncWrite, AsyncWriteExt},
sync::mpsc::{self, Sender},
};
use tokio_stream::wrappers::ReceiverStream;
use tonic::{service::interceptor::InterceptedService, transport::Channel, Request, Status, Streaming};
use tokio::{io::AsyncWrite, sync::mpsc::Sender};
use tracing::info;
use tracing::warn;
use uuid::Uuid;
@@ -1256,164 +1243,142 @@ pub struct ReadOptions {
// }
// }
#[derive(Debug)]
pub enum FileWriter {
Local(LocalFileWriter),
Remote(RemoteFileWriter),
Buffer(BufferWriter),
}
// #[derive(Debug)]
// pub struct BufferWriter {
// pub inner: Vec<u8>,
// }
#[async_trait::async_trait]
impl Writer for FileWriter {
fn as_any(&self) -> &dyn Any {
self
}
// impl BufferWriter {
// pub fn new(inner: Vec<u8>) -> Self {
// Self { inner }
// }
// #[allow(clippy::should_implement_trait)]
// pub fn as_ref(&self) -> &[u8] {
// self.inner.as_ref()
// }
// }
async fn write(&mut self, buf: &[u8]) -> Result<()> {
match self {
Self::Local(writer) => writer.write(buf).await,
Self::Remote(writter) => writter.write(buf).await,
Self::Buffer(writer) => writer.write(buf).await,
}
}
}
// #[async_trait::async_trait]
// impl Writer for BufferWriter {
// fn as_any(&self) -> &dyn Any {
// self
// }
#[derive(Debug)]
pub struct BufferWriter {
pub inner: Vec<u8>,
}
// async fn write(&mut self, buf: &[u8]) -> Result<()> {
// let _ = self.inner.write(buf).await?;
// self.inner.flush().await?;
impl BufferWriter {
pub fn new(inner: Vec<u8>) -> Self {
Self { inner }
}
#[allow(clippy::should_implement_trait)]
pub fn as_ref(&self) -> &[u8] {
self.inner.as_ref()
}
}
// Ok(())
// }
// }
#[async_trait::async_trait]
impl Writer for BufferWriter {
fn as_any(&self) -> &dyn Any {
self
}
// #[derive(Debug)]
// pub struct LocalFileWriter {
// pub inner: File,
// }
async fn write(&mut self, buf: &[u8]) -> Result<()> {
let _ = self.inner.write(buf).await?;
self.inner.flush().await?;
// impl LocalFileWriter {
// pub fn new(inner: File) -> Self {
// Self { inner }
// }
// }
Ok(())
}
}
// #[async_trait::async_trait]
// impl Writer for LocalFileWriter {
// fn as_any(&self) -> &dyn Any {
// self
// }
#[derive(Debug)]
pub struct LocalFileWriter {
pub inner: File,
}
// async fn write(&mut self, buf: &[u8]) -> Result<()> {
// let _ = self.inner.write(buf).await?;
// self.inner.flush().await?;
impl LocalFileWriter {
pub fn new(inner: File) -> Self {
Self { inner }
}
}
// Ok(())
// }
// }
#[async_trait::async_trait]
impl Writer for LocalFileWriter {
fn as_any(&self) -> &dyn Any {
self
}
// type NodeClient = NodeServiceClient<
// InterceptedService<Channel, Box<dyn Fn(Request<()>) -> Result<Request<()>, Status> + Send + Sync + 'static>>,
// >;
async fn write(&mut self, buf: &[u8]) -> Result<()> {
let _ = self.inner.write(buf).await?;
self.inner.flush().await?;
// #[derive(Debug)]
// pub struct RemoteFileWriter {
// pub endpoint: Endpoint,
// pub volume: String,
// pub path: String,
// pub is_append: bool,
// tx: Sender<WriteRequest>,
// resp_stream: Streaming<WriteResponse>,
// }
Ok(())
}
}
// impl RemoteFileWriter {
// pub async fn new(endpoint: Endpoint, volume: String, path: String, is_append: bool, mut client: NodeClient) -> Result<Self> {
// let (tx, rx) = mpsc::channel(128);
// let in_stream = ReceiverStream::new(rx);
type NodeClient = NodeServiceClient<
InterceptedService<Channel, Box<dyn Fn(Request<()>) -> Result<Request<()>, Status> + Send + Sync + 'static>>,
>;
// let response = client.write_stream(in_stream).await.unwrap();
#[derive(Debug)]
pub struct RemoteFileWriter {
pub endpoint: Endpoint,
pub volume: String,
pub path: String,
pub is_append: bool,
tx: Sender<WriteRequest>,
resp_stream: Streaming<WriteResponse>,
}
// let resp_stream = response.into_inner();
impl RemoteFileWriter {
pub async fn new(endpoint: Endpoint, volume: String, path: String, is_append: bool, mut client: NodeClient) -> Result<Self> {
let (tx, rx) = mpsc::channel(128);
let in_stream = ReceiverStream::new(rx);
// Ok(Self {
// endpoint,
// volume,
// path,
// is_append,
// tx,
// resp_stream,
// })
// }
// }
let response = client.write_stream(in_stream).await.unwrap();
// #[async_trait::async_trait]
// impl Writer for RemoteFileWriter {
// fn as_any(&self) -> &dyn Any {
// self
// }
let resp_stream = response.into_inner();
// async fn write(&mut self, buf: &[u8]) -> Result<()> {
// let request = WriteRequest {
// disk: self.endpoint.to_string(),
// volume: self.volume.to_string(),
// path: self.path.to_string(),
// is_append: self.is_append,
// data: buf.to_vec(),
// };
// self.tx.send(request).await?;
Ok(Self {
endpoint,
volume,
path,
is_append,
tx,
resp_stream,
})
}
}
// if let Some(resp) = self.resp_stream.next().await {
// // match resp {
// // Ok(resp) => {
// // if resp.success {
// // info!("write stream success");
// // } else {
// // info!("write stream failed: {}", resp.error_info.unwrap_or("".to_string()));
// // }
// // }
// // Err(_err) => {
#[async_trait::async_trait]
impl Writer for RemoteFileWriter {
fn as_any(&self) -> &dyn Any {
self
}
// // }
// // }
// let resp = resp?;
// if resp.success {
// info!("write stream success");
// } else {
// return if let Some(err) = &resp.error {
// Err(proto_err_to_err(err))
// } else {
// Err(Error::from_string(""))
// };
// }
// } else {
// let error_info = "can not get response";
// info!("write stream failed: {}", error_info);
// return Err(Error::from_string(error_info));
// }
async fn write(&mut self, buf: &[u8]) -> Result<()> {
let request = WriteRequest {
disk: self.endpoint.to_string(),
volume: self.volume.to_string(),
path: self.path.to_string(),
is_append: self.is_append,
data: buf.to_vec(),
};
self.tx.send(request).await?;
if let Some(resp) = self.resp_stream.next().await {
// match resp {
// Ok(resp) => {
// if resp.success {
// info!("write stream success");
// } else {
// info!("write stream failed: {}", resp.error_info.unwrap_or("".to_string()));
// }
// }
// Err(_err) => {
// }
// }
let resp = resp?;
if resp.success {
info!("write stream success");
} else {
return if let Some(err) = &resp.error {
Err(proto_err_to_err(err))
} else {
Err(Error::from_string(""))
};
}
} else {
let error_info = "can not get response";
info!("write stream failed: {}", error_info);
return Err(Error::from_string(error_info));
}
Ok(())
}
}
// Ok(())
// }
// }
// #[async_trait::async_trait]
// pub trait Reader {
@@ -1422,29 +1387,6 @@ impl Writer for RemoteFileWriter {
// // async fn read_exact(&mut self, buf: &mut [u8]) -> Result<usize>;
// }
#[derive(Debug)]
pub enum FileReader {
Local(LocalFileReader),
// Remote(RemoteFileReader),
Buffer(BufferReader),
Http(HttpFileReader),
}
impl AsyncRead for FileReader {
#[tracing::instrument(level = "debug", skip(self, buf))]
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &mut tokio::io::ReadBuf<'_>,
) -> std::task::Poll<std::result::Result<(), std::io::Error>> {
match &mut *self {
Self::Local(reader) => Pin::new(&mut reader.inner).poll_read(cx, buf),
Self::Buffer(reader) => Pin::new(&mut reader.inner).poll_read(cx, buf),
Self::Http(reader) => Pin::new(reader).poll_read(cx, buf),
}
}
}
// #[async_trait::async_trait]
// impl Reader for FileReader {
// async fn read_at(&mut self, offset: usize, buf: &mut [u8]) -> Result<usize> {
@@ -1471,44 +1413,44 @@ impl AsyncRead for FileReader {
// // }
// }
#[derive(Debug)]
pub struct BufferReader {
pub inner: Cursor<Vec<u8>>,
remaining: usize,
}
// #[derive(Debug)]
// pub struct BufferReader {
// pub inner: Cursor<Vec<u8>>,
// remaining: usize,
// }
impl BufferReader {
pub fn new(inner: Vec<u8>, offset: usize, read_length: usize) -> Self {
let mut cur = Cursor::new(inner);
cur.set_position(offset as u64);
Self {
inner: cur,
remaining: offset + read_length,
}
}
}
// impl BufferReader {
// pub fn new(inner: Vec<u8>, offset: usize, read_length: usize) -> Self {
// let mut cur = Cursor::new(inner);
// cur.set_position(offset as u64);
// Self {
// inner: cur,
// remaining: offset + read_length,
// }
// }
// }
impl AsyncRead for BufferReader {
#[tracing::instrument(level = "debug", skip(self, buf))]
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &mut tokio::io::ReadBuf<'_>,
) -> std::task::Poll<std::result::Result<(), std::io::Error>> {
match Pin::new(&mut self.inner).poll_read(cx, buf) {
Poll::Ready(Ok(_)) => {
if self.inner.position() as usize >= self.remaining {
self.remaining -= buf.filled().len();
Poll::Ready(Ok(()))
} else {
Poll::Pending
}
}
Poll::Ready(Err(err)) => Poll::Ready(Err(err)),
Poll::Pending => Poll::Pending,
}
}
}
// impl AsyncRead for BufferReader {
// #[tracing::instrument(level = "debug", skip(self, buf))]
// fn poll_read(
// mut self: Pin<&mut Self>,
// cx: &mut std::task::Context<'_>,
// buf: &mut tokio::io::ReadBuf<'_>,
// ) -> std::task::Poll<std::result::Result<(), std::io::Error>> {
// match Pin::new(&mut self.inner).poll_read(cx, buf) {
// Poll::Ready(Ok(_)) => {
// if self.inner.position() as usize >= self.remaining {
// self.remaining -= buf.filled().len();
// Poll::Ready(Ok(()))
// } else {
// Poll::Pending
// }
// }
// Poll::Ready(Err(err)) => Poll::Ready(Err(err)),
// Poll::Pending => Poll::Pending,
// }
// }
// }
// #[async_trait::async_trait]
// impl Reader for BufferReader {
@@ -1537,17 +1479,17 @@ impl AsyncRead for BufferReader {
// // }
// }
#[derive(Debug)]
pub struct LocalFileReader {
pub inner: File,
// pos: usize,
}
// #[derive(Debug)]
// pub struct LocalFileReader {
// pub inner: File,
// // pos: usize,
// }
impl LocalFileReader {
pub fn new(inner: File) -> Self {
Self { inner }
}
}
// impl LocalFileReader {
// pub fn new(inner: File) -> Self {
// Self { inner }
// }
// }
// #[async_trait::async_trait]
// impl Reader for LocalFileReader {
@@ -1579,16 +1521,16 @@ impl LocalFileReader {
// // }
// }
impl AsyncRead for LocalFileReader {
#[tracing::instrument(level = "debug", skip(self, buf))]
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &mut tokio::io::ReadBuf<'_>,
) -> std::task::Poll<std::result::Result<(), std::io::Error>> {
Pin::new(&mut self.inner).poll_read(cx, buf)
}
}
// impl AsyncRead for LocalFileReader {
// #[tracing::instrument(level = "debug", skip(self, buf))]
// fn poll_read(
// mut self: Pin<&mut Self>,
// cx: &mut std::task::Context<'_>,
// buf: &mut tokio::io::ReadBuf<'_>,
// ) -> std::task::Poll<std::result::Result<(), std::io::Error>> {
// Pin::new(&mut self.inner).poll_read(cx, buf)
// }
// }
// #[derive(Debug)]
// pub struct RemoteFileReader {
@@ -1670,85 +1612,3 @@ impl AsyncRead for LocalFileReader {
// unimplemented!("poll_read")
// }
// }
#[derive(Debug)]
pub struct HttpFileReader {
// client: reqwest::Client,
// url: String,
// disk: String,
// volume: String,
// path: String,
// offset: usize,
// length: usize,
inner: reqwest::blocking::Response,
// buf: Vec<u8>,
pos: usize,
}
impl HttpFileReader {
pub async fn new(url: &str, disk: &str, volume: &str, path: &str, offset: usize, length: usize) -> Result<Self> {
let client = reqwest::blocking::Client::new();
let resp = client
.get(format!(
"{}/rustfs/rpc/read_file_stream?disk={}&volume={}&path={}&offset={}&length={}",
url, disk, volume, path, offset, length
))
.send()?;
Ok(Self {
// client: reqwest::Client::new(),
// url: url.to_string(),
// disk: disk.to_string(),
// volume: volume.to_string(),
// path: path.to_string(),
// offset,
// length,
inner: resp,
// buf: Vec::new(),
pos: 0,
})
}
// pub async fn get_response(&self) -> Result<&Response, std::io::Error> {
// if let Some(resp) = self.inner.get() {
// return Ok(resp);
// } else {
// let client = reqwest::Client::new();
// let resp = client
// .get(&format!(
// "{}/read_file_stream?disk={}&volume={}&path={}&offset={}&length={}",
// self.url, self.disk, self.volume, self.path, self.offset, self.length
// ))
// .send()
// .await
// .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
// self.inner.set(resp);
// Ok(self.inner.get().unwrap())
// }
// }
}
impl AsyncRead for HttpFileReader {
#[tracing::instrument(level = "debug", skip(self, buf))]
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &mut tokio::io::ReadBuf<'_>,
) -> std::task::Poll<std::result::Result<(), std::io::Error>> {
let buf = buf.initialize_unfilled();
self.inner.read_exact(buf)?;
self.pos += buf.len();
Poll::Ready(Ok(()))
}
}
// impl Reader for HttpFileReader {
// async fn read_at(&mut self, offset: usize, buf: &mut [u8]) -> Result<usize> {
// if self.pos != offset {
// self.inner.seek(SeekFrom::Start(offset as u64))?;
// self.pos = offset;
// }
// let bytes_read = self.inner.read(buf)?;
// self.pos += bytes_read;
// Ok(bytes_read)
// }
// }
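The deleted HttpFileReader above is the motivation for the rewrite in ecstore/src/disk/io.rs: it used reqwest::blocking and called read_exact inside poll_read, which blocks the async executor thread, errors on a short final chunk, and never advances the ReadBuf's filled cursor. The replacement spawns a task and adapts the response stream with tokio_util's StreamReader instead.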


@@ -22,8 +22,8 @@ use tracing::info;
use uuid::Uuid;
use super::{
endpoint::Endpoint, CheckPartsResp, DeleteOptions, DiskAPI, DiskInfo, DiskInfoOptions, DiskLocation, DiskOption,
FileInfoVersions, FileReader, FileWriter, ReadMultipleReq, ReadMultipleResp, ReadOptions, RemoteFileWriter, RenameDataResp,
endpoint::Endpoint, io::HttpFileReader, CheckPartsResp, DeleteOptions, DiskAPI, DiskInfo, DiskInfoOptions, DiskLocation,
DiskOption, FileInfoVersions, FileReader, FileWriter, ReadMultipleReq, ReadMultipleResp, ReadOptions, RenameDataResp,
UpdateMetadataOpts, VolumeInfo, WalkDirOptions,
};
use crate::{
@@ -36,7 +36,7 @@ use crate::{
},
store_api::{FileInfo, RawFileInfo},
};
use crate::{disk::HttpFileReader, utils::proto_err_to_err};
use crate::{disk::io::HttpFileWriter, utils::proto_err_to_err};
use crate::{disk::MetaCacheEntry, metacache::writer::MetacacheWriter};
use protos::proto_gen::node_service::RenamePartRequst;
@@ -286,6 +286,7 @@ impl DiskAPI for RemoteDisk {
Ok(())
}
#[tracing::instrument(level = "debug", skip(self))]
async fn rename_file(&self, src_volume: &str, src_path: &str, dst_volume: &str, dst_path: &str) -> Result<()> {
info!("rename_file");
let mut client = node_service_time_out_client(&self.addr)
@@ -312,58 +313,55 @@ impl DiskAPI for RemoteDisk {
Ok(())
}
async fn create_file(&self, _origvolume: &str, volume: &str, path: &str, _file_size: usize) -> Result<FileWriter> {
#[tracing::instrument(level = "debug", skip(self))]
async fn create_file(&self, _origvolume: &str, volume: &str, path: &str, file_size: usize) -> Result<FileWriter> {
info!("create_file");
Ok(FileWriter::Remote(
RemoteFileWriter::new(
self.endpoint.clone(),
volume.to_string(),
path.to_string(),
false,
node_service_time_out_client(&self.addr)
.await
.map_err(|err| Error::from_string(format!("can not get client, err: {}", err)))?,
)
.await?,
))
Ok(FileWriter::Http(HttpFileWriter::new(
self.endpoint.grid_host().as_str(),
self.endpoint.to_string().as_str(),
volume,
path,
file_size,
false,
)?))
}
#[tracing::instrument(level = "debug", skip(self))]
async fn append_file(&self, volume: &str, path: &str) -> Result<FileWriter> {
info!("append_file");
Ok(FileWriter::Remote(
RemoteFileWriter::new(
self.endpoint.clone(),
volume.to_string(),
path.to_string(),
true,
node_service_time_out_client(&self.addr)
.await
.map_err(|err| Error::from_string(format!("can not get client, err: {}", err)))?,
)
.await?,
))
Ok(FileWriter::Http(HttpFileWriter::new(
self.endpoint.grid_host().as_str(),
self.endpoint.to_string().as_str(),
volume,
path,
0,
true,
)?))
}
#[tracing::instrument(level = "debug", skip(self))]
async fn read_file(&self, volume: &str, path: &str) -> Result<FileReader> {
info!("read_file");
Ok(FileReader::Http(
HttpFileReader::new(self.endpoint.grid_host().as_str(), self.endpoint.to_string().as_str(), volume, path, 0, 0)
.await?,
))
Ok(FileReader::Http(HttpFileReader::new(
self.endpoint.grid_host().as_str(),
self.endpoint.to_string().as_str(),
volume,
path,
0,
0,
)?))
}
#[tracing::instrument(level = "debug", skip(self))]
async fn read_file_stream(&self, volume: &str, path: &str, offset: usize, length: usize) -> Result<FileReader> {
Ok(FileReader::Http(
HttpFileReader::new(
self.endpoint.grid_host().as_str(),
self.endpoint.to_string().as_str(),
volume,
path,
offset,
length,
)
.await?,
))
Ok(FileReader::Http(HttpFileReader::new(
self.endpoint.grid_host().as_str(),
self.endpoint.to_string().as_str(),
volume,
path,
offset,
length,
)?))
}
async fn list_dir(&self, _origvolume: &str, volume: &str, _dir_path: &str, _count: i32) -> Result<Vec<String>> {
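With create_file, append_file, read_file and read_file_stream switched to HttpFileWriter/HttpFileReader, RemoteDisk no longer opens a gRPC write_stream per file: append_file passes size 0 with append=true, and reads stream directly from the read_file_stream endpoint.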


@@ -1,6 +1,6 @@
use std::{
collections::{HashMap, HashSet},
io::Write,
io::{Cursor, Write},
path::Path,
sync::Arc,
time::Duration,
@@ -14,10 +14,10 @@ use crate::{
endpoint::Endpoint,
error::{is_all_not_found, DiskError},
format::FormatV3,
new_disk, BufferReader, BufferWriter, CheckPartsResp, DeleteOptions, DiskAPI, DiskInfo, DiskInfoOptions, DiskOption,
DiskStore, FileInfoVersions, FileReader, FileWriter, MetaCacheEntries, MetaCacheEntry, MetadataResolutionParams,
ReadMultipleReq, ReadMultipleResp, ReadOptions, UpdateMetadataOpts, RUSTFS_META_BUCKET, RUSTFS_META_MULTIPART_BUCKET,
RUSTFS_META_TMP_BUCKET,
io::{FileReader, FileWriter},
new_disk, CheckPartsResp, DeleteOptions, DiskAPI, DiskInfo, DiskInfoOptions, DiskOption, DiskStore, FileInfoVersions,
MetaCacheEntries, MetaCacheEntry, MetadataResolutionParams, ReadMultipleReq, ReadMultipleResp, ReadOptions,
UpdateMetadataOpts, RUSTFS_META_BUCKET, RUSTFS_META_MULTIPART_BUCKET, RUSTFS_META_TMP_BUCKET,
},
erasure::Erasure,
error::{Error, Result},
@@ -1361,7 +1361,7 @@ impl SetDisks {
for (i, opdisk) in disks.iter().enumerate() {
if let Some(disk) = opdisk {
if disk.is_online().await && disk.get_disk_location().set_idx.is_some() {
info!("Disk {:?} is online", disk);
info!("Disk {:?} is online", disk.to_string());
continue;
}
@@ -2452,7 +2452,7 @@ impl SetDisks {
if let Some(disk) = disk {
let filewriter = {
if is_inline_buffer {
FileWriter::Buffer(BufferWriter::new(Vec::new()))
FileWriter::Buffer(Cursor::new(Vec::new()))
} else {
let disk = disk.clone();
let part_path = format!("{}/{}/part.{}", tmp_id, dst_data_dir, part.number);
@@ -2501,7 +2501,7 @@ impl SetDisks {
if let Some(ref writer) = writers[index] {
if let Some(w) = writer.as_any().downcast_ref::<BitrotFileWriter>() {
if let FileWriter::Buffer(buffer_writer) = w.writer() {
parts_metadata[index].data = Some(buffer_writer.as_ref().to_vec());
parts_metadata[index].data = Some(buffer_writer.clone().into_inner());
}
}
}
@@ -3744,7 +3744,7 @@ impl ObjectIO for SetDisks {
if let Some(disk) = disk_op {
let filewriter = {
if is_inline_buffer {
FileWriter::Buffer(BufferWriter::new(Vec::new()))
FileWriter::Buffer(Cursor::new(Vec::new()))
} else {
let disk = disk.clone();
@@ -3760,6 +3760,8 @@ impl ObjectIO for SetDisks {
}
}
warn!("put_object data.content_length {}", data.content_length);
// TODO: etag from header
let mut etag_stream = EtagReader::new(&mut data.stream, None, None);
@@ -3789,7 +3791,7 @@ impl ObjectIO for SetDisks {
if let Some(ref writer) = writers[i] {
if let Some(w) = writer.as_any().downcast_ref::<BitrotFileWriter>() {
if let FileWriter::Buffer(buffer_writer) = w.writer() {
fi.data = Some(buffer_writer.as_ref().to_vec());
fi.data = Some(buffer_writer.clone().into_inner());
}
}
}
@@ -4294,6 +4296,7 @@ impl StorageAPI for SetDisks {
unimplemented!()
}
#[tracing::instrument(level = "debug", skip(self, data, opts))]
async fn put_object_part(
&self,
bucket: &str,
@@ -5245,7 +5248,7 @@ async fn disks_with_all_parts(
let checksum_info = meta.erasure.get_checksum_info(meta.parts[0].number);
let data_len = data.len();
let verify_err = match bitrot_verify(
FileReader::Buffer(BufferReader::new(data.clone(), 0, data_len)),
FileReader::Buffer(Cursor::new(data.clone())),
data_len,
meta.erasure.shard_file_size(meta.size),
checksum_info.algorithm,
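
The inline-data path in this file replaces the removed BufferWriter/BufferReader with std::io::Cursor<Vec<u8>>, for which tokio also ships AsyncRead/AsyncWrite impls, so the same enum variant serves both sync and async call sites. A self-contained sketch of the round trip, std only:

use std::io::{Cursor, Write};

let mut buf = Cursor::new(Vec::new());
buf.write_all(b"inline part data").unwrap(); // Write appends at the cursor position
let bytes: Vec<u8> = buf.into_inner();       // hands the Vec back without copying
assert_eq!(bytes, b"inline part data");

Note that the call sites above use buffer_writer.clone().into_inner() because the writer is only borrowed there, which still costs one copy, the same price as the old as_ref().to_vec().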

View File

@@ -53,6 +53,7 @@ pub async fn connect_load_init_formats(
set_drive_count: usize,
deployment_id: Option<Uuid>,
) -> Result<FormatV3, Error> {
warn!("connect_load_init_formats first_disk: {}", first_disk);
let (formats, errs) = load_format_erasure_all(disks, false).await;
debug!("load_format_erasure_all errs {:?}", &errs);
@@ -63,12 +64,13 @@ pub async fn connect_load_init_formats(
if first_disk && DiskError::should_init_erasure_disks(&errs) {
// UnformattedDisk, not format file create
warn!("first_disk && should_init_erasure_disks");
// new format and save
let fms = init_format_erasure(disks, set_count, set_drive_count, deployment_id);
let errs = save_format_file_all(disks, &fms).await;
debug!("save_format_file_all errs {:?}", &errs);
warn!("save_format_file_all errs {:?}", &errs);
// TODO: check quorum
// reduceWriteQuorumErrs(&errs)?;
@@ -77,6 +79,12 @@ pub async fn connect_load_init_formats(
return Ok(fm);
}
warn!(
"first_disk: {}, should_init_erasure_disks: {}",
first_disk,
DiskError::should_init_erasure_disks(&errs)
);
let unformatted = DiskError::quorum_unformatted_disks(&errs);
if unformatted && !first_disk {
return Err(Error::new(ErasureError::NotFirstDisk));
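
The quorum check after save_format_file_all remains a TODO; the commented reduceWriteQuorumErrs hints at the intended shape. A hypothetical sketch of such a reducer (name and signature are assumptions, not from the patch): surface the most frequent error only if it occurred often enough to break write quorum:

// Hypothetical sketch only; not part of this commit.
fn reduce_write_quorum_errs<E: PartialEq + Clone>(errs: &[Option<E>], write_quorum: usize) -> Option<E> {
    let mut top: Option<(E, usize)> = None;
    for err in errs.iter().flatten() {
        let count = errs.iter().flatten().filter(|e| *e == err).count();
        if top.as_ref().map_or(true, |(_, c)| count > *c) {
            top = Some((err.clone(), count));
        }
    }
    top.and_then(|(err, count)| (count >= write_quorum).then_some(err))
}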

View File

@@ -99,7 +99,7 @@ fn get_fs_type(fs_type: FsType) -> &'static str {
match fs_type {
statfs::TMPFS_MAGIC => "TMPFS",
statfs::MSDOS_SUPER_MAGIC => "MSDOS",
statfs::XFS_SUPER_MAGIC => "XFS",
// statfs::XFS_SUPER_MAGIC => "XFS",
statfs::NFS_SUPER_MAGIC => "NFS",
statfs::EXT4_SUPER_MAGIC => "EXT4",
statfs::ECRYPTFS_SUPER_MAGIC => "ecryptfs",
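
The XFS arm is commented out rather than removed, presumably because statfs does not expose XFS_SUPER_MAGIC on every build target. If that is the cause, gating the arm keeps detection where the constant exists; a sketch under that assumption (the cfg condition is a guess):

// Sketch: compile the arm only where the constant is available. Not part of the patch.
#[cfg(all(target_os = "linux", target_env = "gnu"))]
statfs::XFS_SUPER_MAGIC => "XFS",

Match arms accept cfg attributes, so the arm compiles away cleanly on targets that lack the constant.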

View File

@@ -61,7 +61,6 @@ tracing-subscriber.workspace = true
transform-stream.workspace = true
uuid = "1.15.1"
url.workspace = true
admin = { path = "../api/admin" }
axum.workspace = true
matchit = "0.8.6"
shadow-rs.workspace = true

View File

@@ -3,9 +3,10 @@ use super::router::Operation;
use super::router::S3Router;
use crate::storage::ecfs::bytes_stream;
use common::error::Result;
use ecstore::disk::io::FileReader;
use ecstore::disk::DiskAPI;
use ecstore::disk::FileReader;
use ecstore::store::find_local_disk;
use futures::TryStreamExt;
use http::StatusCode;
use hyper::Method;
use matchit::Params;
@@ -17,6 +18,7 @@ use s3s::S3Response;
use s3s::S3Result;
use serde_urlencoded::from_bytes;
use tokio_util::io::ReaderStream;
use tokio_util::io::StreamReader;
use tracing::warn;
pub const RPC_PREFIX: &str = "/rustfs/rpc";
@@ -28,6 +30,12 @@ pub fn regist_rpc_route(r: &mut S3Router<AdminOperation>) -> Result<()> {
AdminOperation(&ReadFile {}),
)?;
r.insert(
Method::PUT,
format!("{}{}", RPC_PREFIX, "/put_file_stream").as_str(),
AdminOperation(&PutFile {}),
)?;
Ok(())
}
@@ -49,7 +57,7 @@ impl Operation for ReadFile {
let query = {
if let Some(query) = req.uri.query() {
let input: ReadFileQuery =
from_bytes(query.as_bytes()).map_err(|_e| s3_error!(InvalidArgument, "get query failed1"))?;
from_bytes(query.as_bytes()).map_err(|e| s3_error!(InvalidArgument, "get query failed: {:?}", e))?;
input
} else {
ReadFileQuery::default()
@@ -95,3 +103,56 @@ impl Operation for ReadFile {
// Ok(S3Response::new((StatusCode::BAD_REQUEST, Body::empty())))
}
}
// /rustfs/rpc/put_file_stream?disk={}&volume={}&path={}&append={}&size={}
#[derive(Debug, Default, serde::Deserialize)]
pub struct PutFileQuery {
disk: String,
volume: String,
path: String,
append: bool,
size: usize,
}
pub struct PutFile {}
#[async_trait::async_trait]
impl Operation for PutFile {
async fn call(&self, req: S3Request<Body>, _params: Params<'_, '_>) -> S3Result<S3Response<(StatusCode, Body)>> {
warn!("handle PutFile");
let query = {
if let Some(query) = req.uri.query() {
let input: PutFileQuery =
from_bytes(query.as_bytes()).map_err(|e| s3_error!(InvalidArgument, "get query failed: {:?}", e))?;
input
} else {
PutFileQuery::default()
}
};
let Some(disk) = find_local_disk(&query.disk).await else {
return Err(s3_error!(InvalidArgument, "disk not found"));
};
let mut file = if query.append {
disk.append_file(&query.volume, &query.path)
.await
.map_err(|e| s3_error!(InternalError, "append file err {}", e))?
} else {
disk.create_file("", &query.volume, &query.path, query.size)
.await
.map_err(|e| s3_error!(InternalError, "read file err {}", e))?
};
let mut body = StreamReader::new(
req.input
.into_stream()
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e)),
);
tokio::io::copy(&mut body, &mut file)
.await
.map_err(|e| s3_error!(InternalError, "copy err {}", e))?;
Ok(S3Response::new((StatusCode::OK, Body::empty())))
}
}
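
The handler above streams the request body straight into the disk writer via StreamReader and tokio::io::copy, so a caller just PUTs raw bytes with the placement encoded in the query string. A sketch of a client call (reqwest is used here purely for brevity and is an assumption; the in-tree caller is HttpFileWriter):

// Illustrative client; remember to percent-encode disk/volume/path values.
async fn put_file(host: &str, disk: &str, volume: &str, path: &str, data: Vec<u8>) -> Result<(), reqwest::Error> {
    let url = format!(
        "{host}/rustfs/rpc/put_file_stream?disk={disk}&volume={volume}&path={path}&append=false&size={}",
        data.len()
    );
    reqwest::Client::new().put(url).body(data).send().await?.error_for_status()?;
    Ok(())
}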

View File

@@ -1,9 +1,4 @@
use std::{
collections::HashMap,
error::Error,
io::{Cursor, ErrorKind},
pin::Pin,
};
use std::{collections::HashMap, io::Cursor, pin::Pin};
use ecstore::{
admin_server_info::get_local_server_property,
@@ -11,7 +6,6 @@ use ecstore::{
disk::{
DeleteOptions, DiskAPI, DiskInfoOptions, DiskStore, FileInfoVersions, ReadMultipleReq, ReadOptions, UpdateMetadataOpts,
},
erasure::Writer,
error::Error as EcsError,
heal::{
data_usage_cache::DataUsageCache,
@@ -50,25 +44,25 @@ use tracing::{debug, error, info};
type ResponseStream<T> = Pin<Box<dyn Stream<Item = Result<T, tonic::Status>> + Send>>;
fn match_for_io_error(err_status: &Status) -> Option<&std::io::Error> {
let mut err: &(dyn Error + 'static) = err_status;
// fn match_for_io_error(err_status: &Status) -> Option<&std::io::Error> {
// let mut err: &(dyn Error + 'static) = err_status;
loop {
if let Some(io_err) = err.downcast_ref::<std::io::Error>() {
return Some(io_err);
}
// loop {
// if let Some(io_err) = err.downcast_ref::<std::io::Error>() {
// return Some(io_err);
// }
// h2::Error do not expose std::io::Error with `source()`
// https://github.com/hyperium/h2/pull/462
if let Some(h2_err) = err.downcast_ref::<h2::Error>() {
if let Some(io_err) = h2_err.get_io() {
return Some(io_err);
}
}
// // h2::Error do not expose std::io::Error with `source()`
// // https://github.com/hyperium/h2/pull/462
// if let Some(h2_err) = err.downcast_ref::<h2::Error>() {
// if let Some(io_err) = h2_err.get_io() {
// return Some(io_err);
// }
// }
err = err.source()?;
}
}
// err = err.source()?;
// }
// }
#[derive(Debug)]
pub struct NodeService {
@@ -558,138 +552,144 @@ impl Node for NodeService {
}
}
async fn write(&self, request: Request<WriteRequest>) -> Result<Response<WriteResponse>, Status> {
let request = request.into_inner();
if let Some(disk) = self.find_disk(&request.disk).await {
let file_writer = if request.is_append {
disk.append_file(&request.volume, &request.path).await
} else {
disk.create_file("", &request.volume, &request.path, 0).await
};
async fn write(&self, _request: Request<WriteRequest>) -> Result<Response<WriteResponse>, Status> {
unimplemented!("write");
// let request = request.into_inner();
// if let Some(disk) = self.find_disk(&request.disk).await {
// let file_writer = if request.is_append {
// disk.append_file(&request.volume, &request.path).await
// } else {
// disk.create_file("", &request.volume, &request.path, 0).await
// };
match file_writer {
Ok(mut file_writer) => match file_writer.write(&request.data).await {
Ok(_) => Ok(tonic::Response::new(WriteResponse {
success: true,
error: None,
})),
Err(err) => Ok(tonic::Response::new(WriteResponse {
success: false,
error: Some(err_to_proto_err(&err, &format!("write failed: {}", err))),
})),
},
Err(err) => Ok(tonic::Response::new(WriteResponse {
success: false,
error: Some(err_to_proto_err(&err, &format!("get writer failed: {}", err))),
})),
}
} else {
Ok(tonic::Response::new(WriteResponse {
success: false,
error: Some(err_to_proto_err(
&EcsError::new(StorageError::InvalidArgument(Default::default(), Default::default(), Default::default())),
"can not find disk",
)),
}))
}
// match file_writer {
// Ok(mut file_writer) => match file_writer.write(&request.data).await {
// Ok(_) => Ok(tonic::Response::new(WriteResponse {
// success: true,
// error: None,
// })),
// Err(err) => Ok(tonic::Response::new(WriteResponse {
// success: false,
// error: Some(err_to_proto_err(&err, &format!("write failed: {}", err))),
// })),
// },
// Err(err) => Ok(tonic::Response::new(WriteResponse {
// success: false,
// error: Some(err_to_proto_err(&err, &format!("get writer failed: {}", err))),
// })),
// }
// } else {
// Ok(tonic::Response::new(WriteResponse {
// success: false,
// error: Some(err_to_proto_err(
// &EcsError::new(StorageError::InvalidArgument(Default::default(), Default::default(), Default::default())),
// "can not find disk",
// )),
// }))
// }
}
type WriteStreamStream = ResponseStream<WriteResponse>;
async fn write_stream(&self, request: Request<Streaming<WriteRequest>>) -> Result<Response<Self::WriteStreamStream>, Status> {
async fn write_stream(
&self,
_request: Request<Streaming<WriteRequest>>,
) -> Result<Response<Self::WriteStreamStream>, Status> {
info!("write_stream");
let mut in_stream = request.into_inner();
let (tx, rx) = mpsc::channel(128);
unimplemented!("write_stream");
tokio::spawn(async move {
let mut file_ref = None;
while let Some(result) = in_stream.next().await {
match result {
// Ok(v) => tx
// .send(Ok(EchoResponse { message: v.message }))
// .await
// .expect("working rx"),
Ok(v) => {
match file_ref.as_ref() {
Some(_) => (),
None => {
if let Some(disk) = find_local_disk(&v.disk).await {
let file_writer = if v.is_append {
disk.append_file(&v.volume, &v.path).await
} else {
disk.create_file("", &v.volume, &v.path, 0).await
};
// let mut in_stream = request.into_inner();
// let (tx, rx) = mpsc::channel(128);
match file_writer {
Ok(file_writer) => file_ref = Some(file_writer),
Err(err) => {
tx.send(Ok(WriteResponse {
success: false,
error: Some(err_to_proto_err(
&err,
&format!("get get file writer failed: {}", err),
)),
}))
.await
.expect("working rx");
break;
}
}
} else {
tx.send(Ok(WriteResponse {
success: false,
error: Some(err_to_proto_err(
&EcsError::new(StorageError::InvalidArgument(
Default::default(),
Default::default(),
Default::default(),
)),
"can not find disk",
)),
}))
.await
.expect("working rx");
break;
}
}
};
// tokio::spawn(async move {
// let mut file_ref = None;
// while let Some(result) = in_stream.next().await {
// match result {
// // Ok(v) => tx
// // .send(Ok(EchoResponse { message: v.message }))
// // .await
// // .expect("working rx"),
// Ok(v) => {
// match file_ref.as_ref() {
// Some(_) => (),
// None => {
// if let Some(disk) = find_local_disk(&v.disk).await {
// let file_writer = if v.is_append {
// disk.append_file(&v.volume, &v.path).await
// } else {
// disk.create_file("", &v.volume, &v.path, 0).await
// };
match file_ref.as_mut().unwrap().write(&v.data).await {
Ok(_) => tx.send(Ok(WriteResponse {
success: true,
error: None,
})),
Err(err) => tx.send(Ok(WriteResponse {
success: false,
error: Some(err_to_proto_err(&err, &format!("write failed: {}", err))),
})),
}
.await
.unwrap();
}
Err(err) => {
if let Some(io_err) = match_for_io_error(&err) {
if io_err.kind() == ErrorKind::BrokenPipe {
// here you can handle special case when client
// disconnected in unexpected way
eprintln!("\tclient disconnected: broken pipe");
break;
}
}
// match file_writer {
// Ok(file_writer) => file_ref = Some(file_writer),
// Err(err) => {
// tx.send(Ok(WriteResponse {
// success: false,
// error: Some(err_to_proto_err(
// &err,
// &format!("get get file writer failed: {}", err),
// )),
// }))
// .await
// .expect("working rx");
// break;
// }
// }
// } else {
// tx.send(Ok(WriteResponse {
// success: false,
// error: Some(err_to_proto_err(
// &EcsError::new(StorageError::InvalidArgument(
// Default::default(),
// Default::default(),
// Default::default(),
// )),
// "can not find disk",
// )),
// }))
// .await
// .expect("working rx");
// break;
// }
// }
// };
match tx.send(Err(err)).await {
Ok(_) => (),
Err(_err) => break, // response was dropped
}
}
}
}
println!("\tstream ended");
});
// match file_ref.as_mut().unwrap().write(&v.data).await {
// Ok(_) => tx.send(Ok(WriteResponse {
// success: true,
// error: None,
// })),
// Err(err) => tx.send(Ok(WriteResponse {
// success: false,
// error: Some(err_to_proto_err(&err, &format!("write failed: {}", err))),
// })),
// }
// .await
// .unwrap();
// }
// Err(err) => {
// if let Some(io_err) = match_for_io_error(&err) {
// if io_err.kind() == ErrorKind::BrokenPipe {
// // here you can handle special case when client
// // disconnected in unexpected way
// eprintln!("\tclient disconnected: broken pipe");
// break;
// }
// }
let out_stream = ReceiverStream::new(rx);
// match tx.send(Err(err)).await {
// Ok(_) => (),
// Err(_err) => break, // response was dropped
// }
// }
// }
// }
// println!("\tstream ended");
// });
Ok(tonic::Response::new(Box::pin(out_stream)))
// let out_stream = ReceiverStream::new(rx);
// Ok(tonic::Response::new(Box::pin(out_stream)))
}
type ReadAtStream = ResponseStream<ReadAtResponse>;
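
With the HTTP path in place, the gRPC write and write_stream bodies are kept only as comments and the stubs now panic via unimplemented!. If older peers can still dial these RPCs during a rolling upgrade, failing the call instead of panicking the server may be safer; a sketch of that alternative (not what the patch does):

async fn write(&self, _request: Request<WriteRequest>) -> Result<Response<WriteResponse>, Status> {
    // Reject without panicking; callers should switch to /rustfs/rpc/put_file_stream.
    Err(Status::unimplemented("write has moved to the HTTP put_file_stream route"))
}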

View File

@@ -152,8 +152,8 @@ async fn run(opt: config::Opt) -> Result<()> {
for (i, eps) in endpoint_pools.as_ref().iter().enumerate() {
info!(
"created endpoints {}, set_count:{}, drives_per_set: {}, cmd: {:?}",
i, eps.set_count, eps.drives_per_set, eps.cmd_line
"created endpoints {}, set_count:{}, drives_per_set: {}, cmd: {:?}, \n{:?}",
i, eps.set_count, eps.drives_per_set, eps.cmd_line, eps
);
}