merge main

This commit is contained in:
weisd
2024-09-13 13:33:23 +08:00
38 changed files with 6523 additions and 270 deletions

1
.gitignore vendored
View File

@@ -1,3 +1,4 @@
/target
.DS_Store
.idea
.vscode

733
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -1,21 +1,43 @@
[workspace]
resolver = "2"
members = ["rustfs", "ecstore"]
members = ["rustfs", "ecstore", "e2e_test", "common/protos"]
[workspace.package]
edition = "2021"
license = "Apache-2.0"
repository = "https://github.com/rustfs/rustfs"
rust-version = "1.75"
version = "0.0.1"
[workspace.dependencies]
async-trait = "0.1.80"
backon = "1.2.0"
bytes = "1.6.0"
clap = { version = "4.5.7", features = ["derive"] }
ecstore = { path = "./ecstore" }
flatbuffers = "24.3.25"
futures = "0.3.30"
futures-util = "0.3.30"
hyper = "1.3.1"
hyper-util = { version = "0.1.5", features = [
"tokio",
"server-auto",
"server-graceful",
] }
http = "1.1.0"
http-body = "1.0.0"
mime = "0.3.17"
netif = "0.1.6"
pin-project-lite = "0.2"
# pin-utils = "0.1.0"
prost = "0.13.1"
prost-build = "0.13.1"
prost-types = "0.13.1"
protobuf = "3.2"
protos = { path = "./common/protos" }
s3s = { version = "0.10.1", default-features = true, features = ["tower"] }
serde = { version = "1.0.203", features = ["derive"] }
serde_json = "1.0.117"
tracing = "0.1.40"
tracing-error = "0.2.0"
futures = "0.3.30"
bytes = "1.6.0"
http = "1.1.0"
thiserror = "1.0.61"
time = { version = "0.3.36", features = [
"std",
@@ -24,6 +46,12 @@ time = { version = "0.3.36", features = [
"macros",
"serde",
] }
async-trait = "0.1.80"
tokio = { version = "1.38.0", features = ["fs"] }
futures-util = "0.3.30"
tokio = { version = "1.38.0", features = ["fs", "rt-multi-thread"] }
tonic = { version = "0.12.1", features = ["gzip"] }
tonic-build = "0.12.1"
tonic-reflection = "0.12"
tower = { version = "0.4.13", features = ["timeout"] }
tracing = "0.1.40"
tracing-error = "0.2.0"
tracing-subscriber = { version = "0.3.18", features = ["env-filter", "time"] }
transform-stream = "0.3.0"

21
TODO.md
View File

@@ -34,9 +34,22 @@
- [ ] 对象锁
- [ ] 复制 CopyObject
- [ ] 详情 HeadObject
- [ ] 对象预先签名get、put、head、post
## 扩展功能
- [ ] 版本控制
- [ ] 对象锁
- [ ] 修复
- [ ] 用户管理
- [ ] Policy管理
- [ ] AK/SK分配管理
- [ ] data scanner统计和对象修复
- [ ] 桶配额
- [ ] 桶只读
- [ ] 桶复制
- [ ] 桶事件通知
- [ ] 桶公开、桶私有
- [ ] 对象生命周期管理
- [ ] prometheus对接
- [ ] 日志收集和日志外发
- [ ] 对象压缩
- [ ] STS
- [ ] 分层阿里云、腾讯云、S3远程对接

17
common/protos/Cargo.toml Normal file
View File

@@ -0,0 +1,17 @@
[package]
name = "protos"
version.workspace = true
edition.workspace = true
# Inherit the remaining package metadata from the workspace, matching the
# other member crates (e.g. e2e_test/Cargo.toml).
license.workspace = true
repository.workspace = true
rust-version.workspace = true
[dependencies]
#async-backtrace = { workspace = true, optional = true }
flatbuffers = { workspace = true }
prost = { workspace = true }
protobuf = { workspace = true }
tokio = { workspace = true }
tonic = { workspace = true, features = ["transport", "tls"] }
tower = { workspace = true }
[build-dependencies]
prost-build = { workspace = true }
tonic-build = { workspace = true }

262
common/protos/build.rs Normal file
View File

@@ -0,0 +1,262 @@
use std::{
cmp, env, fs,
io::Write,
path::{Path, PathBuf},
process::Command,
};
/// Convenience alias for the boxed dynamic error type returned by this build script.
type AnyError = Box<dyn std::error::Error>;
/// Cargo-provided output directory for build artifacts.
const ENV_OUT_DIR: &str = "OUT_DIR";
/// Pinned codegen tool versions; only the major component is compared
/// (see `Version::compare_major_version`).
const VERSION_PROTOBUF: Version = Version(27, 0, 0); // 27.0
const VERSION_FLATBUFFERS: Version = Version(24, 3, 25); // 24.3.25
/// When set to a non-empty value other than "0", a `flatc`/`protoc`
/// major-version mismatch aborts the build instead of merely skipping
/// code generation (see `Version::compare_ext`).
const ENV_BUILD_PROTOS: &str = "BUILD_PROTOS";
/// Path of `flatc` binary.
const ENV_FLATC_PATH: &str = "FLATC_PATH";
/// Build-script entry point: regenerates the tonic/prost gRPC stubs from
/// `src/node.proto` and the flatbuffers code from `src/models.fbs`, rewriting
/// the `src/generated/` module tree.
///
/// Generation is skipped (with a cargo warning) when the installed `protoc`
/// major version differs from `VERSION_PROTOBUF`; with `BUILD_PROTOS` set,
/// the mismatch becomes a hard error inside `compare_ext`.
fn main() -> Result<(), AnyError> {
    let version = protobuf_compiler_version()?;
    let need_compile = match version.compare_ext(&VERSION_PROTOBUF) {
        Ok(cmp::Ordering::Equal) => true,
        Ok(_) => {
            // Major version differs but BUILD_PROTOS is not forcing a failure:
            // warn and keep the previously generated sources.
            let version_err = Version::build_error_message(&version, &VERSION_PROTOBUF).unwrap();
            println!("cargo:warning=Tool `protoc` {version_err}, skip compiling.");
            false
        }
        Err(version_err) => {
            // return Err(format!("Tool `protoc` {version_err}, please update it.").into());
            println!("cargo:warning=Tool `protoc` {version_err}, please update it.");
            false
        }
    };
    if !need_compile {
        return Ok(());
    }
    // path of proto file
    let project_root_dir = env::current_dir()?;
    let proto_dir = project_root_dir.join("src");
    let proto_files = &["node.proto"];
    let proto_out_dir = project_root_dir.join("src").join("generated").join("proto_gen");
    let flatbuffer_out_dir = project_root_dir.join("src").join("generated").join("flatbuffers_generated");
    // OUT_DIR is always set by cargo for build scripts, hence the unwrap.
    let descriptor_set_path = PathBuf::from(env::var(ENV_OUT_DIR).unwrap()).join("proto-descriptor.bin");
    tonic_build::configure()
        .out_dir(proto_out_dir)
        .file_descriptor_set_path(descriptor_set_path)
        .protoc_arg("--experimental_allow_proto3_optional")
        .compile_well_known_types(true)
        .emit_rerun_if_changed(false)
        .compile(proto_files, &[proto_dir.clone()])
        .map_err(|e| format!("Failed to generate protobuf file: {e}."))?;
    // protos/gen/mod.rs
    // Rewrite src/generated/proto_gen/mod.rs to re-export the generated service module.
    let generated_mod_rs_path = project_root_dir
        .join("src")
        .join("generated")
        .join("proto_gen")
        .join("mod.rs");
    let mut generated_mod_rs = fs::File::create(generated_mod_rs_path)?;
    writeln!(&mut generated_mod_rs, "pub mod node_service;")?;
    generated_mod_rs.flush()?;
    // Rewrite src/generated/mod.rs; compile_flatbuffers_models appends the
    // flatbuffers module declarations to this same handle below.
    let generated_mod_rs_path = project_root_dir.join("src").join("generated").join("mod.rs");
    let mut generated_mod_rs = fs::File::create(generated_mod_rs_path)?;
    writeln!(&mut generated_mod_rs, "#![allow(unused_imports)]")?;
    writeln!(&mut generated_mod_rs, "#![allow(clippy::all)]")?;
    writeln!(&mut generated_mod_rs, "pub mod proto_gen;")?;
    generated_mod_rs.flush()?;
    let flatc_path = match env::var(ENV_FLATC_PATH) {
        Ok(path) => {
            println!("cargo:warning=Specified flatc path by environment {ENV_FLATC_PATH}={path}");
            path
        }
        Err(_) => "flatc".to_string(),
    };
    // build src/protos/*.fbs files to src/protos/gen/
    compile_flatbuffers_models(
        &mut generated_mod_rs,
        &flatc_path,
        proto_dir.clone(),
        flatbuffer_out_dir.clone(),
        vec!["models"],
    )?;
    Ok(())
}
/// Compile proto/**.fbs files.
///
/// Appends the `flatbuffers_generated` module declarations to
/// `generated_mod_rs`, writes `<out_rust_dir>/mod.rs`, and — when the
/// installed `flatc` major version matches `VERSION_FLATBUFFERS` — invokes
/// `flatc` to regenerate one Rust file per entry of `mod_names` from
/// `<in_fbs_dir>/<mod_name>.fbs`.
fn compile_flatbuffers_models<P: AsRef<Path>, S: AsRef<str>>(
    generated_mod_rs: &mut fs::File,
    flatc_path: &str,
    in_fbs_dir: P,
    out_rust_dir: P,
    mod_names: Vec<S>,
) -> Result<(), AnyError> {
    let version = flatbuffers_compiler_version(flatc_path)?;
    let need_compile = match version.compare_ext(&VERSION_FLATBUFFERS) {
        Ok(cmp::Ordering::Equal) => true,
        Ok(_) => {
            let version_err = Version::build_error_message(&version, &VERSION_FLATBUFFERS).unwrap();
            println!("cargo:warning=Tool `{flatc_path}` {version_err}, skip compiling.");
            false
        }
        Err(version_err) => {
            return Err(format!("Tool `{flatc_path}` {version_err}, please update it.").into());
        }
    };
    let fbs_dir = in_fbs_dir.as_ref();
    let rust_dir = out_rust_dir.as_ref();
    fs::create_dir_all(rust_dir)?;
    // $rust_dir/mod.rs
    let mut sub_mod_rs = fs::File::create(rust_dir.join("mod.rs"))?;
    writeln!(generated_mod_rs)?;
    writeln!(generated_mod_rs, "mod flatbuffers_generated;")?;
    for mod_name in mod_names.iter() {
        let mod_name = mod_name.as_ref();
        // Module declarations are written even when compilation is skipped,
        // so previously generated (checked-in) sources keep resolving.
        writeln!(generated_mod_rs, "pub use flatbuffers_generated::{mod_name}::*;")?;
        writeln!(&mut sub_mod_rs, "pub mod {mod_name};")?;
        if need_compile {
            let fbs_file_path = fbs_dir.join(format!("{mod_name}.fbs"));
            // Empty --filename-suffix keeps the output named `<mod_name>.rs`.
            let output = Command::new(flatc_path)
                .arg("-o")
                .arg(rust_dir)
                .arg("--rust")
                .arg("--gen-mutable")
                .arg("--gen-onefile")
                .arg("--gen-name-strings")
                .arg("--filename-suffix")
                .arg("")
                .arg(&fbs_file_path)
                .output()
                .map_err(|e| format!("Failed to execute process of flatc: {e}"))?;
            if !output.status.success() {
                return Err(format!(
                    "Failed to generate file '{}' by flatc(path: '{flatc_path}'): {}.",
                    fbs_file_path.display(),
                    String::from_utf8_lossy(&output.stderr),
                )
                .into());
            }
        }
    }
    generated_mod_rs.flush()?;
    sub_mod_rs.flush()?;
    Ok(())
}
/// Determine the installed `flatc` version by running `flatc --version`.
///
/// ```ignore
/// $ flatc --version
/// flatc version 24.3.25
/// ```
fn flatbuffers_compiler_version(flatc_path: impl AsRef<Path>) -> Result<Version, String> {
    let exe = flatc_path.as_ref().display().to_string();
    Version::try_get(exe, |raw| {
        // Expected stdout shape: "flatc version <x.y.z>".
        let output = raw.trim();
        match output.strip_prefix("flatc version ") {
            Some(version) => Ok(version.to_string()),
            None => Err(format!("Failed to get flatc version: {output}")),
        }
    })
}
/// A three-component tool version: (major, minor, patch).
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
struct Version(u32, u32, u32);
impl Version {
    /// Runs `<exe> --version` and parses the tool's version from its stdout.
    ///
    /// `output_to_version_string` extracts the bare version string
    /// (e.g. "24.3.25") from the tool-specific output line.
    fn try_get<F: FnOnce(&str) -> Result<String, String>>(exe: String, output_to_version_string: F) -> Result<Self, String> {
        let cmd = format!("{exe} --version");
        let output = std::process::Command::new(exe)
            .arg("--version")
            .output()
            .map_err(|e| format!("Failed to execute `{cmd}`: {e}",))?;
        let output_utf8 = String::from_utf8(output.stdout).map_err(|e| {
            let output_lossy = String::from_utf8_lossy(e.as_bytes());
            format!("Command `{cmd}` returned invalid UTF-8('{output_lossy}'): {e}")
        })?;
        if output.status.success() {
            let version_string = output_to_version_string(&output_utf8)?;
            Ok(version_string.parse::<Self>()?)
        } else {
            Err(format!("Failed to get version by command `{cmd}`: {output_utf8}"))
        }
    }
    /// Human-readable major-version mismatch message, or `None` when the
    /// major versions are equal.
    fn build_error_message(version: &Self, expected: &Self) -> Option<String> {
        match version.compare_major_version(expected) {
            cmp::Ordering::Equal => None,
            cmp::Ordering::Greater => Some(format!("version({version}) is greater than version({expected})")),
            cmp::Ordering::Less => Some(format!("version({version}) is lesser than version({expected})")),
        }
    }
    /// Compares major versions, honoring the `BUILD_PROTOS` env var: when it
    /// is set to a non-empty value other than "0", a major-version mismatch
    /// becomes a hard error instead of an ordering result.
    fn compare_ext(&self, expected_version: &Self) -> Result<cmp::Ordering, String> {
        match env::var(ENV_BUILD_PROTOS) {
            Ok(build_protos) => {
                if build_protos.is_empty() || build_protos == "0" {
                    Ok(self.compare_major_version(expected_version))
                } else {
                    match self.compare_major_version(expected_version) {
                        cmp::Ordering::Equal => Ok(cmp::Ordering::Equal),
                        // Mismatch: build_error_message is Some by construction here.
                        _ => Err(Self::build_error_message(self, expected_version).unwrap()),
                    }
                }
            }
            Err(_) => Ok(self.compare_major_version(expected_version)),
        }
    }
    /// Only the major component participates in compatibility checks.
    fn compare_major_version(&self, other: &Self) -> cmp::Ordering {
        self.0.cmp(&other.0)
    }
}
impl std::str::FromStr for Version {
    type Err = String;
    /// Parses up to three dot-separated numeric components; components
    /// missing from the input (e.g. "27.0") default to 0.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        let mut parts = [0_u32; 3];
        for (slot, component) in parts.iter_mut().zip(s.split('.')) {
            *slot = component
                .parse()
                .map_err(|e| format!("Failed to parse version string '{s}': {e}"))?;
        }
        Ok(Version(parts[0], parts[1], parts[2]))
    }
}
impl std::fmt::Display for Version {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}.{}.{}", self.0, self.1, self.2)
}
}
/// Run command `protoc --version` to get the version of protoc.
///
/// ```ignore
/// $ protoc --version
/// libprotoc 27.0
/// ```
fn protobuf_compiler_version() -> Result<Version, String> {
    Version::try_get("protoc".to_string(), |output| {
        const PREFIX_OF_VERSION: &str = "libprotoc ";
        let output = output.trim();
        if let Some(version) = output.strip_prefix(PREFIX_OF_VERSION) {
            Ok(version.to_string())
        } else {
            Err(format!("Failed to get protoc version: {output}"))
        }
    })
}

View File

@@ -0,0 +1 @@
// Written by common/protos/build.rs (flatc sub-module list); do not edit by hand.
pub mod models;

View File

@@ -0,0 +1,123 @@
// automatically generated by the FlatBuffers compiler, do not modify
// @generated
// NOTE(review): regenerated from models.fbs by `flatc` via common/protos/build.rs;
// change the schema and rebuild instead of editing this file.
use core::mem;
use core::cmp::Ordering;
extern crate flatbuffers;
use self::flatbuffers::{EndianScalar, Follow};
#[allow(unused_imports, dead_code)]
pub mod models {
use core::mem;
use core::cmp::Ordering;
extern crate flatbuffers;
use self::flatbuffers::{EndianScalar, Follow};
pub enum PingBodyOffset {}
#[derive(Copy, Clone, PartialEq)]
pub struct PingBody<'a> {
pub _tab: flatbuffers::Table<'a>,
}
impl<'a> flatbuffers::Follow<'a> for PingBody<'a> {
type Inner = PingBody<'a>;
#[inline]
unsafe fn follow(buf: &'a [u8], loc: usize) -> Self::Inner {
Self { _tab: flatbuffers::Table::new(buf, loc) }
}
}
impl<'a> PingBody<'a> {
pub const VT_PAYLOAD: flatbuffers::VOffsetT = 4;
pub const fn get_fully_qualified_name() -> &'static str {
"models.PingBody"
}
#[inline]
pub unsafe fn init_from_table(table: flatbuffers::Table<'a>) -> Self {
PingBody { _tab: table }
}
#[allow(unused_mut)]
pub fn create<'bldr: 'args, 'args: 'mut_bldr, 'mut_bldr, A: flatbuffers::Allocator + 'bldr>(
_fbb: &'mut_bldr mut flatbuffers::FlatBufferBuilder<'bldr, A>,
args: &'args PingBodyArgs<'args>
) -> flatbuffers::WIPOffset<PingBody<'bldr>> {
let mut builder = PingBodyBuilder::new(_fbb);
if let Some(x) = args.payload { builder.add_payload(x); }
builder.finish()
}
#[inline]
pub fn payload(&self) -> Option<flatbuffers::Vector<'a, u8>> {
// Safety:
// Created from valid Table for this object
// which contains a valid value in this slot
unsafe { self._tab.get::<flatbuffers::ForwardsUOffset<flatbuffers::Vector<'a, u8>>>(PingBody::VT_PAYLOAD, None)}
}
}
impl flatbuffers::Verifiable for PingBody<'_> {
#[inline]
fn run_verifier(
v: &mut flatbuffers::Verifier, pos: usize
) -> Result<(), flatbuffers::InvalidFlatbuffer> {
use self::flatbuffers::Verifiable;
v.visit_table(pos)?
.visit_field::<flatbuffers::ForwardsUOffset<flatbuffers::Vector<'_, u8>>>("payload", Self::VT_PAYLOAD, false)?
.finish();
Ok(())
}
}
pub struct PingBodyArgs<'a> {
pub payload: Option<flatbuffers::WIPOffset<flatbuffers::Vector<'a, u8>>>,
}
impl<'a> Default for PingBodyArgs<'a> {
#[inline]
fn default() -> Self {
PingBodyArgs {
payload: None,
}
}
}
pub struct PingBodyBuilder<'a: 'b, 'b, A: flatbuffers::Allocator + 'a> {
fbb_: &'b mut flatbuffers::FlatBufferBuilder<'a, A>,
start_: flatbuffers::WIPOffset<flatbuffers::TableUnfinishedWIPOffset>,
}
impl<'a: 'b, 'b, A: flatbuffers::Allocator + 'a> PingBodyBuilder<'a, 'b, A> {
#[inline]
pub fn add_payload(&mut self, payload: flatbuffers::WIPOffset<flatbuffers::Vector<'b , u8>>) {
self.fbb_.push_slot_always::<flatbuffers::WIPOffset<_>>(PingBody::VT_PAYLOAD, payload);
}
#[inline]
pub fn new(_fbb: &'b mut flatbuffers::FlatBufferBuilder<'a, A>) -> PingBodyBuilder<'a, 'b, A> {
let start = _fbb.start_table();
PingBodyBuilder {
fbb_: _fbb,
start_: start,
}
}
#[inline]
pub fn finish(self) -> flatbuffers::WIPOffset<PingBody<'a>> {
let o = self.fbb_.end_table(self.start_);
flatbuffers::WIPOffset::new(o.value())
}
}
impl core::fmt::Debug for PingBody<'_> {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
let mut ds = f.debug_struct("PingBody");
ds.field("payload", &self.payload());
ds.finish()
}
}
} // pub mod models

View File

@@ -0,0 +1,6 @@
// Written by common/protos/build.rs on every codegen run; do not edit by hand.
#![allow(unused_imports)]
#![allow(clippy::all)]
pub mod proto_gen;
mod flatbuffers_generated;
pub use flatbuffers_generated::models::*;

View File

@@ -0,0 +1 @@
// Written by common/protos/build.rs; re-exports the tonic-generated service module.
pub mod node_service;

File diff suppressed because it is too large Load Diff

28
common/protos/src/lib.rs Normal file
View File

@@ -0,0 +1,28 @@
mod generated;
use std::time::Duration;
pub use generated::*;
use proto_gen::node_service::node_service_client::NodeServiceClient;
use tonic::{codec::CompressionEncoding, transport::Channel};
use tower::timeout::Timeout;
/// Default cap for a single gRPC message, in bytes (100 MB).
pub const DEFAULT_GRPC_SERVER_MESSAGE_LEN: usize = 100 * 1024 * 1024;
/// Builds a `NodeServiceClient` over `channel` with a per-request `time_out`.
///
/// Both the encoding and decoding message-size limits are raised to
/// `max_message_size`; when `grpc_enable_gzip` is set, gzip compression is
/// sent on requests and accepted on responses.
pub fn node_service_time_out_client(
    channel: Channel,
    time_out: Duration,
    max_message_size: usize,
    grpc_enable_gzip: bool,
) -> NodeServiceClient<Timeout<Channel>> {
    let client = NodeServiceClient::new(Timeout::new(channel, time_out))
        .max_decoding_message_size(max_message_size)
        .max_encoding_message_size(max_message_size);
    if grpc_enable_gzip {
        client
            .accept_compressed(CompressionEncoding::Gzip)
            .send_compressed(CompressionEncoding::Gzip)
    } else {
        client
    }
}

View File

@@ -0,0 +1,5 @@
// FlatBuffers schema compiled by common/protos/build.rs into the `models` Rust module.
namespace models;
// PingBody: an opaque byte payload; carried in the `body` field of the
// Ping RPC request/response (see the e2e `ping` test).
table PingBody {
payload: [ubyte];
}

View File

@@ -0,0 +1,327 @@
syntax = "proto3";
package node_service;
// Internal node-to-node RPC surface: bucket metadata operations plus
// remote-disk file/volume operations (consumed by ecstore's remote disk).
// NOTE(review): many `string` fields (options, file_info, volume_infos, ...)
// appear to carry JSON-serialized structs — confirmed for `volume_infos`
// by the e2e list_volumes test; verify the others against the server impl.
/* -------------------------------------------------------------------- */
message PingRequest {
uint64 version = 1;
bytes body = 2;
}
message PingResponse {
uint64 version = 1;
bytes body = 2;
}
message ListBucketRequest {
string options = 1;
}
message ListBucketResponse {
bool success = 1;
repeated string bucket_infos = 2;
optional string error_info = 3;
}
message MakeBucketRequest {
string name = 1;
string options = 2;
}
message MakeBucketResponse {
bool success = 1;
optional string error_info = 2;
}
message GetBucketInfoRequest {
string bucket = 1;
string options = 2;
}
message GetBucketInfoResponse {
bool success = 1;
string bucket_info = 2;
optional string error_info = 3;
}
message DeleteBucketRequest {
string bucket = 1;
}
message DeleteBucketResponse {
bool success = 1;
optional string error_info = 2;
}
message ReadAllRequest {
string disk = 1; // indicate which one in the disks
string volume = 2;
string path = 3;
}
message ReadAllResponse {
bool success = 1;
bytes data = 2;
optional string error_info = 3;
}
message WriteAllRequest {
string disk = 1; // indicate which one in the disks
string volume = 2;
string path = 3;
bytes data = 4;
}
message WriteAllResponse {
bool success = 1;
optional string error_info = 2;
}
message DeleteRequest {
string disk = 1; // indicate which one in the disks
string volume = 2;
string path = 3;
string options = 4;
}
message DeleteResponse {
bool success = 1;
optional string error_info = 2;
}
// NOTE(review): "Requst" is a typo for "Request"; kept as-is because renaming
// the message would ripple through all generated client/server code.
message RenameFileRequst {
string disk = 1;
string src_volume = 2;
string src_path = 3;
string dst_volume = 4;
string dst_path = 5;
}
message RenameFileResponse {
bool success = 1;
optional string error_info = 2;
}
message WriteRequest {
string disk = 1; // indicate which one in the disks
string volume = 2;
string path = 3;
bool is_append = 4;
bytes data = 5;
}
message WriteResponse {
bool success = 1;
optional string error_info = 2;
}
// message AppendRequest {
// string disk = 1; // indicate which one in the disks
// string volume = 2;
// string path = 3;
// bytes data = 4;
// }
//
// message AppendResponse {
// bool success = 1;
// optional string error_info = 2;
// }
message ReadAtRequest {
string disk = 1; // indicate which one in the disks
string volume = 2;
string path = 3;
int64 offset = 4;
int64 length = 5;
}
message ReadAtResponse {
bool success = 1;
bytes data = 2;
int64 read_size = 3;
optional string error_info = 4;
}
message ListDirRequest {
string disk = 1; // indicate which one in the disks
string volume = 2;
}
message ListDirResponse {
bool success = 1;
repeated string volumes = 2;
optional string error_info = 3;
}
message WalkDirRequest {
string disk = 1; // indicate which one in the disks
string walk_dir_options = 2;
}
message WalkDirResponse {
bool success = 1;
repeated string meta_cache_entry = 2;
optional string error_info = 3;
}
message RenameDataRequest {
string disk = 1; // indicate which one in the disks
string src_volume = 2;
string src_path = 3;
string file_info = 4;
string dst_volume = 5;
string dst_path = 6;
}
message RenameDataResponse {
bool success = 1;
string rename_data_resp = 2;
optional string error_info = 3;
}
message MakeVolumesRequest {
string disk = 1; // indicate which one in the disks
repeated string volumes = 2;
}
message MakeVolumesResponse {
bool success = 1;
optional string error_info = 2;
}
message MakeVolumeRequest {
string disk = 1; // indicate which one in the disks
string volume = 2;
}
message MakeVolumeResponse {
bool success = 1;
optional string error_info = 2;
}
message ListVolumesRequest {
string disk = 1; // indicate which one in the disks
}
message ListVolumesResponse {
bool success = 1;
// Each entry is a JSON-serialized ecstore VolumeInfo (see e2e list_volumes test).
repeated string volume_infos = 2;
optional string error_info = 3;
}
message StatVolumeRequest {
string disk = 1; // indicate which one in the disks
string volume = 2;
}
message StatVolumeResponse {
bool success = 1;
string volume_info = 2;
optional string error_info = 3;
}
message WriteMetadataRequest {
string disk = 1; // indicate which one in the disks
string volume = 2;
string path = 3;
string file_info = 4;
}
message WriteMetadataResponse {
bool success = 1;
optional string error_info = 2;
}
message ReadVersionRequest {
string disk = 1;
string volume = 2;
string path = 3;
string version_id = 4;
string opts = 5;
}
message ReadVersionResponse {
bool success = 1;
string file_info = 2;
optional string error_info = 3;
}
message ReadXLRequest {
string disk = 1;
string volume = 2;
string path = 3;
bool read_data = 4;
}
message ReadXLResponse {
bool success = 1;
string raw_file_info = 2;
optional string error_info = 3;
}
message DeleteVersionsRequest {
string disk = 1;
string volume = 2;
repeated string versions = 3;
string opts = 4;
}
message DeleteVersionsResponse {
bool success = 1;
repeated string errors = 2;
optional string error_info = 3;
}
message ReadMultipleRequest {
string disk = 1;
string read_multiple_req = 2;
}
message ReadMultipleResponse {
bool success = 1;
repeated string read_multiple_resps = 2;
optional string error_info = 3;
}
message DeleteVolumeRequest {
string disk = 1;
string volume = 2;
}
message DeleteVolumeResponse {
bool success = 1;
optional string error_info = 2;
}
/* -------------------------------------------------------------------- */
service NodeService {
/* -------------------------------meta service-------------------------- */
rpc Ping(PingRequest) returns (PingResponse) {};
rpc ListBucket(ListBucketRequest) returns (ListBucketResponse) {};
rpc MakeBucket(MakeBucketRequest) returns (MakeBucketResponse) {};
rpc GetBucketInfo(GetBucketInfoRequest) returns (GetBucketInfoResponse) {};
rpc DeleteBucket(DeleteBucketRequest) returns (DeleteBucketResponse) {};
/* -------------------------------disk service-------------------------- */
rpc ReadAll(ReadAllRequest) returns (ReadAllResponse) {};
rpc WriteAll(WriteAllRequest) returns (WriteAllResponse) {};
rpc Delete(DeleteRequest) returns (DeleteResponse) {};
rpc RenameFile(RenameFileRequst) returns (RenameFileResponse) {};
rpc Write(WriteRequest) returns (WriteResponse) {};
// rpc Append(AppendRequest) returns (AppendResponse) {};
rpc ReadAt(ReadAtRequest) returns (ReadAtResponse) {};
rpc ListDir(ListDirRequest) returns (ListDirResponse) {};
rpc WalkDir(WalkDirRequest) returns (WalkDirResponse) {};
rpc RenameData(RenameDataRequest) returns (RenameDataResponse) {};
rpc MakeVolumes(MakeVolumesRequest) returns (MakeVolumesResponse) {};
rpc MakeVolume(MakeVolumeRequest) returns (MakeVolumeResponse) {};
rpc ListVolumes(ListVolumesRequest) returns (ListVolumesResponse) {};
rpc StatVolume(StatVolumeRequest) returns (StatVolumeResponse) {};
rpc WriteMetadata(WriteMetadataRequest) returns (WriteMetadataResponse) {};
rpc ReadVersion(ReadVersionRequest) returns (ReadVersionResponse) {};
rpc ReadXL(ReadXLRequest) returns (ReadXLResponse) {};
rpc DeleteVersions(DeleteVersionsRequest) returns (DeleteVersionsResponse) {};
rpc ReadMultiple(ReadMultipleRequest) returns (ReadMultipleResponse) {};
rpc DeleteVolume(DeleteVolumeRequest) returns (DeleteVolumeResponse) {};
}

17
e2e_test/Cargo.toml Normal file
View File

@@ -0,0 +1,17 @@
[package]
name = "e2e_test"
version.workspace = true
edition.workspace = true
license.workspace = true
repository.workspace = true
rust-version.workspace = true
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
ecstore.workspace = true
flatbuffers.workspace = true
protos.workspace = true
serde_json.workspace = true
# Inherit tonic from the workspace, which already pins 0.12.1 with the
# "gzip" feature — avoids a second, divergent version declaration.
tonic.workspace = true
tokio = { workspace = true }

0
e2e_test/README.md Normal file
View File

1
e2e_test/src/lib.rs Normal file
View File

@@ -0,0 +1 @@
// Test modules that require a running cluster (see reliant/README.md).
mod reliant;

View File

@@ -0,0 +1 @@
The test cases in this directory require a running cluster.

View File

@@ -0,0 +1 @@
// gRPC round-trip tests against a live node; requires a running cluster.
mod node_interact_test;

View File

@@ -0,0 +1,106 @@
#![cfg(test)]
use ecstore::disk::VolumeInfo;
use protos::{
models::{PingBody, PingBodyBuilder},
proto_gen::node_service::{
node_service_client::NodeServiceClient, ListVolumesRequest, MakeVolumeRequest, PingRequest, PingResponse, ReadAllRequest,
},
};
use std::error::Error;
use tonic::Request;
/// Connects to the node's gRPC endpoint.
/// NOTE(review): assumes a cluster member is listening on localhost:9000 — see reliant/README.
async fn get_client() -> Result<NodeServiceClient<tonic::transport::Channel>, Box<dyn Error>> {
    // Ok(NodeServiceClient::connect("http://220.181.1.138:9000").await?)
    Ok(NodeServiceClient::connect("http://localhost:9000").await?)
}
/// Round-trips a flatbuffers-encoded `PingBody` through the node's `Ping` RPC.
#[tokio::test]
async fn ping() -> Result<(), Box<dyn Error>> {
    // Encode a PingBody with a small test payload.
    let mut fbb = flatbuffers::FlatBufferBuilder::new();
    let payload = fbb.create_vector(b"hello world");
    let mut builder = PingBodyBuilder::new(&mut fbb);
    builder.add_payload(payload);
    let root = builder.finish();
    fbb.finish(root, None);
    let finished_data = fbb.finished_data();
    // Sanity-check: the buffer we just built must decode back into a PingBody.
    let decoded_payload = flatbuffers::root::<PingBody>(finished_data);
    assert!(decoded_payload.is_ok());
    // Create the client
    let mut client = get_client().await?;
    // Build the PingRequest
    let request = Request::new(PingRequest {
        version: 1,
        body: finished_data.to_vec(),
    });
    // Send the request and take the response
    let response: PingResponse = client.ping(request).await?.into_inner();
    // Print the response
    let ping_response_body = flatbuffers::root::<PingBody>(&response.body);
    if let Err(e) = ping_response_body {
        eprintln!("{}", e);
    } else {
        println!("ping_resp:body(flatbuffer): {:?}", ping_response_body);
    }
    Ok(())
}
/// Creates volume "dandan" on disk "data" of the running node and prints the outcome.
#[tokio::test]
async fn make_volume() -> Result<(), Box<dyn Error>> {
    let mut client = get_client().await?;
    let request = Request::new(MakeVolumeRequest {
        disk: "data".to_string(),
        volume: "dandan".to_string(),
    });
    let response = client.make_volume(request).await?.into_inner();
    if response.success {
        println!("success");
    } else {
        println!("failed: {:?}", response.error_info);
    }
    Ok(())
}
/// Lists volumes on disk "data"; each returned string is expected to be a
/// JSON-serialized `VolumeInfo` (entries that fail to parse are silently dropped).
#[tokio::test]
async fn list_volumes() -> Result<(), Box<dyn Error>> {
    let mut client = get_client().await?;
    let request = Request::new(ListVolumesRequest {
        disk: "data".to_string(),
    });
    let response = client.list_volumes(request).await?.into_inner();
    let volume_infos: Vec<VolumeInfo> = response
        .volume_infos
        .into_iter()
        .filter_map(|json_str| serde_json::from_str::<VolumeInfo>(&json_str).ok())
        .collect();
    println!("{:?}", volume_infos);
    Ok(())
}
/// Reads `format.json` from volume "ff" on disk "data" and prints the raw bytes.
#[tokio::test]
async fn read_all() -> Result<(), Box<dyn Error>> {
    let mut client = get_client().await?;
    let request = Request::new(ReadAllRequest {
        disk: "data".to_string(),
        volume: "ff".to_string(),
        path: "format.json".to_string(),
    });
    let response = client.read_all(request).await?.into_inner();
    let volume_infos = response.data;
    println!("{}", response.success);
    println!("{:?}", volume_infos);
    Ok(())
}

View File

@@ -9,7 +9,7 @@ rust-version.workspace = true
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
tokio = { workspace = true, features = ["io-util"] }
backon.workspace = true
bytes.workspace = true
thiserror.workspace = true
futures.workspace = true
@@ -28,6 +28,7 @@ lazy_static = "1.5.0"
regex = "1.10.5"
netif = "0.1.6"
path-absolutize = "3.1.1"
protos.workspace = true
rmp-serde = "1.3.0"
tokio-util = { version = "0.7.11", features = ["io"] }
s3s = "0.10.0"
@@ -37,7 +38,10 @@ base64-simd = "0.8.0"
sha2 = "0.10.8"
hex-simd = "0.8.0"
path-clean = "1.0.1"
tokio = { workspace = true, features = ["io-util"] }
tokio-stream = "0.1.15"
tonic.workspace = true
tower.workspace = true
rmp = "0.8.14"
byteorder = "1.5.0"
xxhash-rust = { version = "0.8.12", features = ["xxh64"] }

View File

@@ -3,7 +3,7 @@ use super::{
DeleteOptions, DiskAPI, FileInfoVersions, FileReader, FileWriter, MetaCacheEntry, ReadMultipleReq, ReadMultipleResp,
ReadOptions, RenameDataResp, VolumeInfo, WalkDirOptions,
};
use crate::disk::STORAGE_FORMAT_FILE;
use crate::disk::{LocalFileReader, LocalFileWriter, STORAGE_FORMAT_FILE};
use crate::{
error::{Error, Result},
file_meta::FileMeta,
@@ -19,18 +19,29 @@ use std::{
use time::OffsetDateTime;
use tokio::fs::{self, File};
use tokio::io::ErrorKind;
use tracing::{debug, error, warn};
use tokio::sync::Mutex;
use tracing::{debug, warn};
use uuid::Uuid;
#[derive(Debug)]
pub struct FormatInfo {
pub id: Option<Uuid>,
pub _data: Vec<u8>,
pub _file_info: Option<Metadata>,
pub _last_check: Option<OffsetDateTime>,
}
impl FormatInfo {}
#[derive(Debug)]
pub struct LocalDisk {
pub root: PathBuf,
pub id: Uuid,
pub _format_data: Vec<u8>,
pub _format_meta: Option<Metadata>,
pub _format_path: PathBuf,
// pub format_legacy: bool, // drop
pub _format_last_check: Option<OffsetDateTime>,
pub format_info: Mutex<FormatInfo>,
// pub id: Mutex<Option<Uuid>>,
// pub format_data: Mutex<Vec<u8>>,
// pub format_file_info: Mutex<Option<Metadata>>,
// pub format_last_check: Mutex<Option<OffsetDateTime>>,
}
impl LocalDisk {
@@ -48,7 +59,7 @@ impl LocalDisk {
let (format_data, format_meta) = read_file_exists(&format_path).await?;
let mut id = Uuid::nil();
let mut id = None;
// let mut format_legacy = false;
let mut format_last_check = None;
@@ -61,19 +72,26 @@ impl LocalDisk {
return Err(Error::from(DiskError::InconsistentDisk));
}
id = fm.erasure.this;
id = Some(fm.erasure.this);
// format_legacy = fm.erasure.distribution_algo == DistributionAlgoVersion::V1;
format_last_check = Some(OffsetDateTime::now_utc());
}
let format_info = FormatInfo {
id,
_data: format_data,
_file_info: format_meta,
_last_check: format_last_check,
};
let disk = Self {
root,
id,
_format_meta: format_meta,
_format_data: format_data,
_format_path: format_path,
// format_legacy,
_format_last_check: format_last_check,
format_info: Mutex::new(format_info),
// // format_legacy,
// format_file_info: Mutex::new(format_meta),
// format_data: Mutex::new(format_data),
// format_last_check: Mutex::new(format_last_check),
};
disk.make_meta_volumes().await?;
@@ -204,7 +222,7 @@ impl LocalDisk {
} else {
if delete_path.is_dir() {
if let Err(err) = fs::remove_dir(&delete_path).await {
error!("remove_dir err {:?} when {:?}", &err, &delete_path);
debug!("remove_dir err {:?} when {:?}", &err, &delete_path);
match err.kind() {
ErrorKind::NotFound => (),
// ErrorKind::DirectoryNotEmpty => (),
@@ -218,7 +236,7 @@ impl LocalDisk {
}
} else {
if let Err(err) = fs::remove_file(&delete_path).await {
error!("remove_file err {:?} when {:?}", &err, &delete_path);
debug!("remove_file err {:?} when {:?}", &err, &delete_path);
match err.kind() {
ErrorKind::NotFound => (),
_ => {
@@ -413,9 +431,24 @@ impl DiskAPI for LocalDisk {
fn is_local(&self) -> bool {
true
}
async fn close(&self) -> Result<()> {
Ok(())
}
fn path(&self) -> PathBuf {
self.root.clone()
}
fn id(&self) -> Uuid {
self.id
async fn get_disk_id(&self) -> Option<Uuid> {
// TODO: check format file
let format_info = self.format_info.lock().await;
format_info.id.clone()
// TODO: 判断源文件id,是否有效
}
async fn set_disk_id(&self, _id: Option<Uuid>) -> Result<()> {
// 本地不需要设置
Ok(())
}
#[must_use]
@@ -517,7 +550,8 @@ impl DiskAPI for LocalDisk {
let file = File::create(&fpath).await?;
Ok(FileWriter::new(file))
Ok(FileWriter::Local(LocalFileWriter::new(file)))
// Ok(FileWriter::new(file))
// let mut writer = BufWriter::new(file);
@@ -543,7 +577,8 @@ impl DiskAPI for LocalDisk {
.open(&p)
.await?;
Ok(FileWriter::new(file))
Ok(FileWriter::Local(LocalFileWriter::new(file)))
// Ok(FileWriter::new(file))
// let mut writer = BufWriter::new(file);
@@ -560,7 +595,7 @@ impl DiskAPI for LocalDisk {
debug!("read_file {:?}", &p);
let file = File::options().read(true).open(&p).await?;
Ok(FileReader::new(file))
Ok(FileReader::Local(LocalFileReader::new(file)))
// file.seek(SeekFrom::Start(offset as u64)).await?;
@@ -756,9 +791,7 @@ impl DiskAPI for LocalDisk {
.await?;
}
Ok(RenameDataResp {
old_data_dir: old_data_dir,
})
Ok(RenameDataResp { old_data_dir })
}
async fn make_volumes(&self, volumes: Vec<&str>) -> Result<()> {

View File

@@ -2,6 +2,7 @@ pub mod endpoint;
pub mod error;
pub mod format;
mod local;
mod remote;
pub const RUSTFS_META_BUCKET: &str = ".rustfs.sys";
pub const RUSTFS_META_MULTIPART_BUCKET: &str = ".rustfs.sys/multipart";
@@ -12,18 +13,21 @@ pub const FORMAT_CONFIG_FILE: &str = "format.json";
const STORAGE_FORMAT_FILE: &str = "xl.meta";
use crate::{
erasure::ReadAt,
erasure::{ReadAt, Write},
error::{Error, Result},
file_meta::FileMeta,
store_api::{FileInfo, RawFileInfo},
};
use bytes::Bytes;
use std::{fmt::Debug, io::SeekFrom, pin::Pin, sync::Arc};
use protos::proto_gen::node_service::{node_service_client::NodeServiceClient, ReadAtRequest, WriteRequest};
use serde::{Deserialize, Serialize};
use std::{fmt::Debug, io::SeekFrom, path::PathBuf, sync::Arc};
use time::OffsetDateTime;
use tokio::{
fs::File,
io::{AsyncReadExt, AsyncSeekExt, AsyncWrite},
io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt},
};
use tonic::{transport::Channel, Request};
use uuid::Uuid;
pub type DiskStore = Arc<Box<dyn DiskAPI>>;
@@ -33,15 +37,18 @@ pub async fn new_disk(ep: &endpoint::Endpoint, opt: &DiskOption) -> Result<DiskS
let s = local::LocalDisk::new(ep, opt.cleanup).await?;
Ok(Arc::new(Box::new(s)))
} else {
let _ = opt.health_check;
unimplemented!()
let remote_disk = remote::RemoteDisk::new(ep, opt).await?;
Ok(Arc::new(Box::new(remote_disk)))
}
}
#[async_trait::async_trait]
pub trait DiskAPI: Debug + Send + Sync + 'static {
fn is_local(&self) -> bool;
fn id(&self) -> Uuid;
fn path(&self) -> PathBuf;
async fn close(&self) -> Result<()>;
async fn get_disk_id(&self) -> Option<Uuid>;
async fn set_disk_id(&self, id: Option<Uuid>) -> Result<()>;
async fn delete(&self, volume: &str, path: &str, opt: DeleteOptions) -> Result<()>;
async fn read_all(&self, volume: &str, path: &str) -> Result<Bytes>;
@@ -88,7 +95,7 @@ pub trait DiskAPI: Debug + Send + Sync + 'static {
async fn read_multiple(&self, req: ReadMultipleReq) -> Result<Vec<ReadMultipleResp>>;
}
#[derive(Debug, Default, Clone)]
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct FileInfoVersions {
// Name of the volume.
pub volume: String,
@@ -104,7 +111,7 @@ pub struct FileInfoVersions {
pub free_versions: Vec<FileInfo>,
}
#[derive(Debug, Default, Clone)]
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct WalkDirOptions {
// Bucket to scanner
pub bucket: String,
@@ -131,7 +138,7 @@ pub struct WalkDirOptions {
pub disk_id: String,
}
#[derive(Debug, Default)]
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct MetaCacheEntry {
// name is the full name of the object including prefixes
pub name: String,
@@ -206,17 +213,18 @@ pub struct DiskOption {
pub health_check: bool,
}
#[derive(Serialize, Deserialize)]
pub struct RenameDataResp {
pub old_data_dir: Option<Uuid>,
}
#[derive(Debug, Clone, Default)]
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct DeleteOptions {
pub recursive: bool,
pub immediate: bool,
}
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReadMultipleReq {
pub bucket: String,
pub prefix: String,
@@ -227,7 +235,7 @@ pub struct ReadMultipleReq {
pub max_results: usize,
}
#[derive(Debug, Clone, Default)]
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct ReadMultipleResp {
pub bucket: String,
pub prefix: String,
@@ -252,65 +260,154 @@ pub struct ReadMultipleResp {
// }
// }
#[derive(Debug, Deserialize, Serialize)]
pub struct VolumeInfo {
pub name: String,
pub created: Option<OffsetDateTime>,
}
#[derive(Deserialize, Serialize)]
pub struct ReadOptions {
pub read_data: bool,
pub healing: bool,
}
pub struct FileWriter {
pub inner: Pin<Box<dyn AsyncWrite + Send + Sync + 'static>>,
// pub struct FileWriter {
// pub inner: Pin<Box<dyn AsyncWrite + Send + Sync + 'static>>,
// }
// impl AsyncWrite for FileWriter {
// fn poll_write(
// mut self: Pin<&mut Self>,
// cx: &mut std::task::Context<'_>,
// buf: &[u8],
// ) -> std::task::Poll<std::result::Result<usize, std::io::Error>> {
// Pin::new(&mut self.inner).poll_write(cx, buf)
// }
// fn poll_flush(
// mut self: Pin<&mut Self>,
// cx: &mut std::task::Context<'_>,
// ) -> std::task::Poll<std::result::Result<(), std::io::Error>> {
// Pin::new(&mut self.inner).poll_flush(cx)
// }
// fn poll_shutdown(
// mut self: Pin<&mut Self>,
// cx: &mut std::task::Context<'_>,
// ) -> std::task::Poll<std::result::Result<(), std::io::Error>> {
// Pin::new(&mut self.inner).poll_shutdown(cx)
// }
// }
// impl FileWriter {
// pub fn new<W>(inner: W) -> Self
// where
// W: AsyncWrite + Send + Sync + 'static,
// {
// Self { inner: Box::pin(inner) }
// }
// }
/// Writer for file data on a disk, dispatching to either a local file
/// handle or a remote gRPC-backed writer.
pub enum FileWriter {
    Local(LocalFileWriter),
    Remote(RemoteFileWriter),
}
impl AsyncWrite for FileWriter {
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &[u8],
) -> std::task::Poll<std::result::Result<usize, std::io::Error>> {
Pin::new(&mut self.inner).poll_write(cx, buf)
}
fn poll_flush(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<std::result::Result<(), std::io::Error>> {
Pin::new(&mut self.inner).poll_flush(cx)
}
fn poll_shutdown(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<std::result::Result<(), std::io::Error>> {
Pin::new(&mut self.inner).poll_shutdown(cx)
#[async_trait::async_trait]
impl Write for FileWriter {
async fn write(&mut self, buf: &[u8]) -> Result<()> {
match self {
Self::Local(local_file_writer) => local_file_writer.write(buf).await,
Self::Remote(remote_file_writer) => remote_file_writer.write(buf).await,
}
}
}
impl FileWriter {
pub fn new<W>(inner: W) -> Self
where
W: AsyncWrite + Send + Sync + 'static,
{
Self { inner: Box::pin(inner) }
}
}
#[derive(Debug)]
pub struct FileReader {
/// Local-disk implementation of [`Write`], backed by a tokio [`File`].
pub struct LocalFileWriter {
    /// Underlying async file handle, already opened by the caller.
    pub inner: File,
}
impl FileReader {
impl LocalFileWriter {
pub fn new(inner: File) -> Self {
Self { inner }
}
}
#[async_trait::async_trait]
impl Write for LocalFileWriter {
    /// Write the whole buffer to the underlying file and flush it.
    ///
    /// Uses `write_all` instead of a single `write` call: `AsyncWriteExt::write`
    /// may perform a short write and silently drop the tail of `buf`, which
    /// would corrupt erasure-coded shards.
    async fn write(&mut self, buf: &[u8]) -> Result<()> {
        self.inner.write_all(buf).await?;
        self.inner.flush().await?;
        Ok(())
    }
}
/// Remote implementation of [`Write`] that ships each buffer to another
/// node via the `NodeService` gRPC `Write` RPC.
pub struct RemoteFileWriter {
    /// Root path of the target disk on the remote node.
    pub root: PathBuf,
    /// Volume (bucket) the file belongs to.
    pub volume: String,
    /// Path of the file inside the volume.
    pub path: String,
    /// When true the remote side appends; otherwise it creates/truncates.
    pub is_append: bool,
    // Connected gRPC client used for every write call.
    client: NodeServiceClient<Channel>,
}
impl RemoteFileWriter {
pub fn new(root: PathBuf, volume: String, path: String, is_append: bool, client: NodeServiceClient<Channel>) -> Self {
Self {
root,
volume,
path,
is_append,
client,
}
}
}
#[async_trait::async_trait]
impl Write for RemoteFileWriter {
    /// Send `buf` to the remote node via the `Write` RPC.
    async fn write(&mut self, buf: &[u8]) -> Result<()> {
        let body = WriteRequest {
            disk: self.root.to_string_lossy().to_string(),
            volume: self.volume.clone(),
            path: self.path.clone(),
            is_append: self.is_append,
            data: buf.to_vec(),
        };
        // The response carries no payload we need; propagate transport errors only.
        let _ = self.client.write(Request::new(body)).await?.into_inner();
        Ok(())
    }
}
/// Reader for file data on a disk, dispatching to either a local file
/// handle or a remote gRPC-backed reader.
#[derive(Debug)]
pub enum FileReader {
    Local(LocalFileReader),
    Remote(RemoteFileReader),
}
#[async_trait::async_trait]
impl ReadAt for FileReader {
async fn read_at(&mut self, offset: usize, length: usize) -> Result<(Vec<u8>, usize)> {
match self {
Self::Local(local_file_writer) => local_file_writer.read_at(offset, length).await,
Self::Remote(remote_file_writer) => remote_file_writer.read_at(offset, length).await,
}
}
}
/// Local-disk implementation of [`ReadAt`], backed by a tokio [`File`].
#[derive(Debug)]
pub struct LocalFileReader {
    /// Underlying async file handle, already opened for reading.
    pub inner: File,
}
impl LocalFileReader {
pub fn new(inner: File) -> Self {
Self { inner }
}
}
#[async_trait::async_trait]
impl ReadAt for LocalFileReader {
async fn read_at(&mut self, offset: usize, length: usize) -> Result<(Vec<u8>, usize)> {
self.inner.seek(SeekFrom::Start(offset as u64)).await?;
@@ -323,3 +420,38 @@ impl ReadAt for FileReader {
Ok((buffer, bytes_read))
}
}
/// Remote implementation of [`ReadAt`] that fetches byte ranges from
/// another node via the `NodeService` gRPC `ReadAt` RPC.
#[derive(Debug)]
pub struct RemoteFileReader {
    /// Root path of the source disk on the remote node.
    pub root: PathBuf,
    /// Volume (bucket) the file belongs to.
    pub volume: String,
    /// Path of the file inside the volume.
    pub path: String,
    // Connected gRPC client used for every read call.
    client: NodeServiceClient<Channel>,
}
impl RemoteFileReader {
pub fn new(root: PathBuf, volume: String, path: String, client: NodeServiceClient<Channel>) -> Self {
Self {
root,
volume,
path,
client,
}
}
}
#[async_trait::async_trait]
impl ReadAt for RemoteFileReader {
async fn read_at(&mut self, offset: usize, length: usize) -> Result<(Vec<u8>, usize)> {
let request = Request::new(ReadAtRequest {
disk: self.root.to_string_lossy().to_string(),
volume: self.volume.to_string(),
path: self.path.to_string(),
offset: offset.try_into().unwrap(),
length: length.try_into().unwrap(),
});
let response = self.client.read_at(request).await?.into_inner();
Ok((response.data, response.read_size.try_into().unwrap()))
}
}

529
ecstore/src/disk/remote.rs Normal file
View File

@@ -0,0 +1,529 @@
use std::{path::PathBuf, sync::Arc, time::Duration};
use bytes::Bytes;
use futures::lock::Mutex;
use protos::{
node_service_time_out_client,
proto_gen::node_service::{
node_service_client::NodeServiceClient, DeleteRequest, DeleteVersionsRequest, DeleteVolumeRequest, ListDirRequest,
ListVolumesRequest, MakeVolumeRequest, MakeVolumesRequest, ReadAllRequest, ReadMultipleRequest, ReadVersionRequest,
ReadXlRequest, RenameDataRequest, RenameFileRequst, StatVolumeRequest, WalkDirRequest, WriteAllRequest,
WriteMetadataRequest,
},
DEFAULT_GRPC_SERVER_MESSAGE_LEN,
};
use tokio::{fs, sync::RwLock};
use tonic::{
transport::{Channel, Endpoint as tonic_Endpoint},
Request,
};
use tower::timeout::Timeout;
use tracing::info;
use uuid::Uuid;
use crate::{
disk::error::DiskError,
error::{Error, Result},
store_api::{FileInfo, RawFileInfo},
};
use super::{
endpoint::Endpoint, DeleteOptions, DiskAPI, DiskOption, FileInfoVersions, FileReader, FileWriter, MetaCacheEntry,
ReadMultipleReq, ReadMultipleResp, ReadOptions, RemoteFileReader, RemoteFileWriter, RenameDataResp, VolumeInfo,
WalkDirOptions,
};
/// Disk implementation that proxies every operation to another node over
/// gRPC (`NodeService`).
#[derive(Debug)]
pub struct RemoteDisk {
    // Disk id assigned via `set_disk_id`; `None` until it is set.
    id: Mutex<Option<Uuid>>,
    // Lazily-established channel cached by `get_client`.
    channel: Arc<RwLock<Option<Channel>>>,
    // Endpoint URL of the remote node; scheme/host/port are used to dial.
    url: url::Url,
    /// Canonicalized local mount path of the disk on the remote machine.
    pub root: PathBuf,
}
impl RemoteDisk {
pub async fn new(ep: &Endpoint, _opt: &DiskOption) -> Result<Self> {
let root = fs::canonicalize(ep.url.path()).await?;
Ok(Self {
channel: Arc::new(RwLock::new(None)),
url: ep.url.clone(),
root,
id: Mutex::new(None),
})
}
#[allow(dead_code)]
async fn get_client(&self) -> Result<NodeServiceClient<Timeout<Channel>>> {
let channel_clone = self.channel.clone();
let channel = {
let read_lock = channel_clone.read().await;
if let Some(ref channel) = *read_lock {
channel.clone()
} else {
let addr = format!("{}://{}:{}", self.url.scheme(), self.url.host_str().unwrap(), self.url.port().unwrap());
info!("disk url: {}", addr);
let connector = tonic_Endpoint::from_shared(addr.clone())?;
let new_channel = connector.connect().await.map_err(|_err| DiskError::DiskNotFound)?;
info!("get channel success");
*self.channel.write().await = Some(new_channel.clone());
new_channel
}
};
Ok(node_service_time_out_client(
channel,
Duration::new(30, 0), // TODO: use config setting
DEFAULT_GRPC_SERVER_MESSAGE_LEN,
// grpc_enable_gzip,
false, // TODO: use config setting
))
}
async fn get_client_v2(&self) -> Result<NodeServiceClient<tonic::transport::Channel>> {
// Ok(NodeServiceClient::connect("http://220.181.1.138:9000").await?)
let addr = format!("{}://{}:{}", self.url.scheme(), self.url.host_str().unwrap(), self.url.port().unwrap());
Ok(NodeServiceClient::connect(addr).await?)
}
}
// TODO: all api need to handle errors
#[async_trait::async_trait]
impl DiskAPI for RemoteDisk {
fn is_local(&self) -> bool {
false
}
async fn close(&self) -> Result<()> {
Ok(())
}
fn path(&self) -> PathBuf {
self.root.clone()
}
async fn get_disk_id(&self) -> Option<Uuid> {
self.id.lock().await.clone()
}
async fn set_disk_id(&self, id: Option<Uuid>) -> Result<()> {
let mut lock = self.id.lock().await;
*lock = id;
Ok(())
}
async fn read_all(&self, volume: &str, path: &str) -> Result<Bytes> {
info!("read_all");
let mut client = self.get_client_v2().await?;
let request = Request::new(ReadAllRequest {
disk: self.root.to_string_lossy().to_string(),
volume: volume.to_string(),
path: path.to_string(),
});
let response = client.read_all(request).await?.into_inner();
info!("read_all success");
if !response.success {
return Err(DiskError::FileNotFound.into());
}
Ok(Bytes::from(response.data))
}
async fn write_all(&self, volume: &str, path: &str, data: Vec<u8>) -> Result<()> {
info!("write_all");
let mut client = self.get_client_v2().await?;
let request = Request::new(WriteAllRequest {
disk: self.root.to_string_lossy().to_string(),
volume: volume.to_string(),
path: path.to_string(),
data,
});
let response = client.write_all(request).await?.into_inner();
if !response.success {
return Err(Error::from_string(response.error_info.unwrap_or("".to_string())));
}
Ok(())
}
async fn delete(&self, volume: &str, path: &str, opt: DeleteOptions) -> Result<()> {
info!("delete");
let options = serde_json::to_string(&opt)?;
let mut client = self.get_client_v2().await?;
let request = Request::new(DeleteRequest {
disk: self.root.to_string_lossy().to_string(),
volume: volume.to_string(),
path: path.to_string(),
options,
});
let response = client.delete(request).await?.into_inner();
if !response.success {
return Err(Error::from_string(response.error_info.unwrap_or("".to_string())));
}
Ok(())
}
async fn rename_file(&self, src_volume: &str, src_path: &str, dst_volume: &str, dst_path: &str) -> Result<()> {
info!("rename_file");
let mut client = self.get_client_v2().await?;
let request = Request::new(RenameFileRequst {
disk: self.root.to_string_lossy().to_string(),
src_volume: src_volume.to_string(),
src_path: src_path.to_string(),
dst_volume: dst_volume.to_string(),
dst_path: dst_path.to_string(),
});
let response = client.rename_file(request).await?.into_inner();
if !response.success {
return Err(Error::from_string(response.error_info.unwrap_or("".to_string())));
}
Ok(())
}
async fn create_file(&self, _origvolume: &str, volume: &str, path: &str, _file_size: usize) -> Result<FileWriter> {
info!("create_file");
Ok(FileWriter::Remote(RemoteFileWriter::new(
self.root.clone(),
volume.to_string(),
path.to_string(),
false,
self.get_client_v2().await?,
)))
}
async fn append_file(&self, volume: &str, path: &str) -> Result<FileWriter> {
info!("append_file");
Ok(FileWriter::Remote(RemoteFileWriter::new(
self.root.clone(),
volume.to_string(),
path.to_string(),
true,
self.get_client_v2().await?,
)))
}
async fn read_file(&self, volume: &str, path: &str) -> Result<FileReader> {
info!("read_file");
Ok(FileReader::Remote(RemoteFileReader::new(
self.root.clone(),
volume.to_string(),
path.to_string(),
self.get_client_v2().await?,
)))
}
async fn list_dir(&self, _origvolume: &str, volume: &str, _dir_path: &str, _count: i32) -> Result<Vec<String>> {
info!("list_dir");
let mut client = self.get_client_v2().await?;
let request = Request::new(ListDirRequest {
disk: self.root.to_string_lossy().to_string(),
volume: volume.to_string(),
});
let response = client.list_dir(request).await?.into_inner();
if !response.success {
return Err(Error::from_string(response.error_info.unwrap_or("".to_string())));
}
Ok(response.volumes)
}
async fn walk_dir(&self, opts: WalkDirOptions) -> Result<Vec<MetaCacheEntry>> {
info!("walk_dir");
let walk_dir_options = serde_json::to_string(&opts)?;
let mut client = self.get_client_v2().await?;
let request = Request::new(WalkDirRequest {
disk: self.root.to_string_lossy().to_string(),
walk_dir_options,
});
let response = client.walk_dir(request).await?.into_inner();
if !response.success {
return Err(Error::from_string(response.error_info.unwrap_or("".to_string())));
}
let entries = response
.meta_cache_entry
.into_iter()
.filter_map(|json_str| serde_json::from_str::<MetaCacheEntry>(&json_str).ok())
.collect();
Ok(entries)
}
async fn rename_data(
&self,
src_volume: &str,
src_path: &str,
fi: FileInfo,
dst_volume: &str,
dst_path: &str,
) -> Result<RenameDataResp> {
info!("rename_data");
let file_info = serde_json::to_string(&fi)?;
let mut client = self.get_client_v2().await?;
let request = Request::new(RenameDataRequest {
disk: self.root.to_string_lossy().to_string(),
src_volume: src_volume.to_string(),
src_path: src_path.to_string(),
file_info,
dst_volume: dst_volume.to_string(),
dst_path: dst_path.to_string(),
});
let response = client.rename_data(request).await?.into_inner();
if !response.success {
return Err(Error::from_string(response.error_info.unwrap_or("".to_string())));
}
let rename_data_resp = serde_json::from_str::<RenameDataResp>(&response.rename_data_resp)?;
Ok(rename_data_resp)
}
async fn make_volumes(&self, volumes: Vec<&str>) -> Result<()> {
info!("make_volumes");
let mut client = self.get_client_v2().await?;
let request = Request::new(MakeVolumesRequest {
disk: self.root.to_string_lossy().to_string(),
volumes: volumes.iter().map(|s| (*s).to_string()).collect(),
});
let response = client.make_volumes(request).await?.into_inner();
if !response.success {
return Err(Error::from_string(response.error_info.unwrap_or("".to_string())));
}
Ok(())
}
async fn make_volume(&self, volume: &str) -> Result<()> {
info!("make_volume");
let mut client = self.get_client_v2().await?;
let request = Request::new(MakeVolumeRequest {
disk: self.root.to_string_lossy().to_string(),
volume: volume.to_string(),
});
let response = client.make_volume(request).await?.into_inner();
if !response.success {
return Err(Error::from_string(response.error_info.unwrap_or("".to_string())));
}
Ok(())
}
async fn list_volumes(&self) -> Result<Vec<VolumeInfo>> {
info!("list_volumes");
let mut client = self.get_client_v2().await?;
let request = Request::new(ListVolumesRequest {
disk: self.root.to_string_lossy().to_string(),
});
let response = client.list_volumes(request).await?.into_inner();
if !response.success {
return Err(Error::from_string(response.error_info.unwrap_or("".to_string())));
}
let infos = response
.volume_infos
.into_iter()
.filter_map(|json_str| serde_json::from_str::<VolumeInfo>(&json_str).ok())
.collect();
Ok(infos)
}
async fn stat_volume(&self, volume: &str) -> Result<VolumeInfo> {
info!("stat_volume");
let mut client = self.get_client_v2().await?;
let request = Request::new(StatVolumeRequest {
disk: self.root.to_string_lossy().to_string(),
volume: volume.to_string(),
});
let response = client.stat_volume(request).await?.into_inner();
if !response.success {
return Err(Error::from_string(response.error_info.unwrap_or("".to_string())));
}
let volume_info = serde_json::from_str::<VolumeInfo>(&response.volume_info)?;
Ok(volume_info)
}
async fn write_metadata(&self, _org_volume: &str, volume: &str, path: &str, fi: FileInfo) -> Result<()> {
info!("write_metadata");
let file_info = serde_json::to_string(&fi)?;
let mut client = self.get_client_v2().await?;
let request = Request::new(WriteMetadataRequest {
disk: self.root.to_string_lossy().to_string(),
volume: volume.to_string(),
path: path.to_string(),
file_info,
});
let response = client.write_metadata(request).await?.into_inner();
if !response.success {
return Err(Error::from_string(response.error_info.unwrap_or("".to_string())));
}
Ok(())
}
async fn read_version(
&self,
_org_volume: &str,
volume: &str,
path: &str,
version_id: &str,
opts: &ReadOptions,
) -> Result<FileInfo> {
info!("read_version");
let opts = serde_json::to_string(opts)?;
let mut client = self.get_client_v2().await?;
let request = Request::new(ReadVersionRequest {
disk: self.root.to_string_lossy().to_string(),
volume: volume.to_string(),
path: path.to_string(),
version_id: version_id.to_string(),
opts,
});
let response = client.read_version(request).await?.into_inner();
if !response.success {
return Err(Error::from_string(response.error_info.unwrap_or("".to_string())));
}
let file_info = serde_json::from_str::<FileInfo>(&response.file_info)?;
Ok(file_info)
}
async fn read_xl(&self, volume: &str, path: &str, read_data: bool) -> Result<RawFileInfo> {
info!("read_xl");
let mut client = self.get_client_v2().await?;
let request = Request::new(ReadXlRequest {
disk: self.root.to_string_lossy().to_string(),
volume: volume.to_string(),
path: path.to_string(),
read_data,
});
let response = client.read_xl(request).await?.into_inner();
if !response.success {
return Err(Error::from_string(response.error_info.unwrap_or("".to_string())));
}
let raw_file_info = serde_json::from_str::<RawFileInfo>(&response.raw_file_info)?;
Ok(raw_file_info)
}
async fn delete_versions(
&self,
volume: &str,
versions: Vec<FileInfoVersions>,
opts: DeleteOptions,
) -> Result<Vec<Option<Error>>> {
info!("delete_versions");
let opts = serde_json::to_string(&opts)?;
let mut versions_str = Vec::with_capacity(versions.len());
for file_info_versions in versions.iter() {
versions_str.push(serde_json::to_string(file_info_versions)?);
}
let mut client = self.get_client_v2().await?;
let request = Request::new(DeleteVersionsRequest {
disk: self.root.to_string_lossy().to_string(),
volume: volume.to_string(),
versions: versions_str,
opts,
});
let response = client.delete_versions(request).await?.into_inner();
if !response.success {
return Err(Error::from_string(format!(
"delete versions remote err: {}",
response.error_info.unwrap_or("None".to_string())
)));
}
let errors = response
.errors
.iter()
.map(|error| {
if error.is_empty() {
None
} else {
Some(Error::from_string(error))
}
})
.collect();
Ok(errors)
}
async fn read_multiple(&self, req: ReadMultipleReq) -> Result<Vec<ReadMultipleResp>> {
info!("read_multiple");
let read_multiple_req = serde_json::to_string(&req)?;
let mut client = self.get_client_v2().await?;
let request = Request::new(ReadMultipleRequest {
disk: self.root.to_string_lossy().to_string(),
read_multiple_req,
});
let response = client.read_multiple(request).await?.into_inner();
if !response.success {
return Err(Error::from_string(response.error_info.unwrap_or("".to_string())));
}
let read_multiple_resps = response
.read_multiple_resps
.into_iter()
.filter_map(|json_str| serde_json::from_str::<ReadMultipleResp>(&json_str).ok())
.collect();
Ok(read_multiple_resps)
}
async fn delete_volume(&self, volume: &str) -> Result<()> {
info!("delete_volume");
let mut client = self.get_client_v2().await?;
let request = Request::new(DeleteVolumeRequest {
disk: self.root.to_string_lossy().to_string(),
volume: volume.to_string(),
});
let response = client.delete_volume(request).await?.into_inner();
if !response.success {
return Err(Error::from_string(response.error_info.unwrap_or("".to_string())));
}
Ok(())
}
}

View File

@@ -408,6 +408,11 @@ impl AsMut<Vec<PoolEndpoints>> for EndpointServerPools {
}
impl EndpointServerPools {
pub fn from_volumes(server_addr: &str, endpoints: Vec<String>) -> Result<(EndpointServerPools, SetupType)> {
let layouts = DisksLayout::try_from(endpoints.as_slice())?;
Self::create_server_endpoints(server_addr, &layouts)
}
/// validates and creates new endpoints from input args, supports
/// both ellipses and without ellipses transparently.
pub fn create_server_endpoints(server_addr: &str, disks_layout: &DisksLayout) -> Result<(EndpointServerPools, SetupType)> {

View File

@@ -3,7 +3,7 @@ use bytes::Bytes;
use futures::future::join_all;
use futures::{Stream, StreamExt};
use reed_solomon_erasure::galois_8::ReedSolomon;
use tokio::io::AsyncWrite;
use std::fmt::Debug;
use tokio::io::AsyncWriteExt;
use tokio::io::DuplexStream;
use tracing::debug;
@@ -13,7 +13,7 @@ use uuid::Uuid;
use crate::chunk_stream::ChunkedStream;
use crate::disk::error::DiskError;
use crate::disk::FileReader;
use crate::disk::{FileReader, FileWriter};
pub struct Erasure {
data_shards: usize,
@@ -43,17 +43,16 @@ impl Erasure {
}
}
pub async fn encode<S, W>(
pub async fn encode<S>(
&self,
body: S,
writers: &mut [W],
writers: &mut [FileWriter],
// block_size: usize,
total_size: usize,
_write_quorum: usize,
) -> Result<usize>
where
S: Stream<Item = Result<Bytes, StdError>> + Send + Sync + 'static,
W: AsyncWrite + Unpin,
{
let mut stream = ChunkedStream::new(body, total_size, self.block_size, false);
let mut total: usize = 0;
@@ -85,7 +84,7 @@ impl Erasure {
let mut errs = Vec::new();
for (i, w) in writers.iter_mut().enumerate() {
match w.write_all(blocks[i].as_ref()).await {
match w.write(blocks[i].as_ref()).await {
Ok(_) => errs.push(None),
Err(e) => errs.push(Some(e)),
}
@@ -318,6 +317,12 @@ impl Erasure {
}
}
/// Async byte sink used by the erasure encoder; implemented by both local
/// file writers and remote gRPC-backed writers.
#[async_trait::async_trait]
pub trait Write {
    /// Write the whole buffer, returning an error if the write fails.
    async fn write(&mut self, buf: &[u8]) -> Result<()>;
}
/// Positioned read used by the erasure decoder; implemented by both local
/// file readers and remote gRPC-backed readers.
#[async_trait::async_trait]
pub trait ReadAt {
    /// Read up to `length` bytes at `offset`; returns the data buffer and
    /// the number of bytes actually read.
    async fn read_at(&mut self, offset: usize, length: usize) -> Result<(Vec<u8>, usize)>;
}

View File

@@ -1,12 +1,12 @@
mod bucket_meta;
mod chunk_stream;
pub mod disk;
mod disks_layout;
mod endpoints;
mod erasure;
pub mod disks_layout;
pub mod endpoints;
pub mod erasure;
pub mod error;
mod file_meta;
mod peer;
pub mod peer;
pub mod set_disk;
mod sets;
pub mod store;

View File

@@ -1,11 +1,19 @@
use async_trait::async_trait;
use futures::future::join_all;
use protos::proto_gen::node_service::node_service_client::NodeServiceClient;
use protos::proto_gen::node_service::{DeleteBucketRequest, GetBucketInfoRequest, ListBucketRequest, MakeBucketRequest};
use protos::{node_service_time_out_client, DEFAULT_GRPC_SERVER_MESSAGE_LEN};
use regex::Regex;
use std::{collections::HashMap, fmt::Debug, sync::Arc};
use tracing::warn;
use std::{collections::HashMap, fmt::Debug, sync::Arc, time::Duration};
use tokio::sync::RwLock;
use tonic::transport::{Channel, Endpoint};
use tonic::Request;
use tower::timeout::Timeout;
use tracing::{info, warn};
use crate::store::all_local_disk;
use crate::{
disk::{self, error::DiskError, DiskStore, VolumeInfo},
disk::{self, error::DiskError, VolumeInfo},
endpoints::{EndpointServerPools, Node},
error::{Error, Result},
store_api::{BucketInfo, BucketOptions, DeleteBucketOptions, MakeBucketOptions},
@@ -19,7 +27,7 @@ pub trait PeerS3Client: Debug + Sync + Send + 'static {
async fn list_bucket(&self, opts: &BucketOptions) -> Result<Vec<BucketInfo>>;
async fn delete_bucket(&self, bucket: &str, opts: &DeleteBucketOptions) -> Result<()>;
async fn get_bucket_info(&self, bucket: &str, opts: &BucketOptions) -> Result<BucketInfo>;
fn get_pools(&self) -> Vec<usize>;
fn get_pools(&self) -> Option<Vec<usize>>;
}
#[derive(Debug)]
@@ -29,24 +37,23 @@ pub struct S3PeerSys {
}
impl S3PeerSys {
pub fn new(eps: &EndpointServerPools, local_disks: Vec<DiskStore>) -> Self {
pub fn new(eps: &EndpointServerPools) -> Self {
Self {
clients: Self::new_clients(eps, local_disks),
clients: Self::new_clients(eps),
pools_count: eps.as_ref().len(),
}
}
fn new_clients(eps: &EndpointServerPools, local_disks: Vec<DiskStore>) -> Vec<Client> {
fn new_clients(eps: &EndpointServerPools) -> Vec<Client> {
let nodes = eps.get_nodes();
let v: Vec<Client> = nodes
.iter()
.map(|e| {
if e.is_local {
let cli: Box<dyn PeerS3Client> =
Box::new(LocalPeerS3Client::new(local_disks.clone(), e.clone(), e.pools.clone()));
let cli: Box<dyn PeerS3Client> = Box::new(LocalPeerS3Client::new(Some(e.clone()), Some(e.pools.clone())));
Arc::new(cli)
} else {
let cli: Box<dyn PeerS3Client> = Box::new(RemotePeerS3Client::new(e.clone(), e.pools.clone()));
let cli: Box<dyn PeerS3Client> = Box::new(RemotePeerS3Client::new(Some(e.clone()), Some(e.pools.clone())));
Arc::new(cli)
}
})
@@ -56,9 +63,8 @@ impl S3PeerSys {
}
}
#[async_trait]
impl PeerS3Client for S3PeerSys {
async fn make_bucket(&self, bucket: &str, opts: &MakeBucketOptions) -> Result<()> {
impl S3PeerSys {
pub async fn make_bucket(&self, bucket: &str, opts: &MakeBucketOptions) -> Result<()> {
let mut futures = Vec::with_capacity(self.clients.len());
for cli in self.clients.iter() {
futures.push(cli.make_bucket(bucket, opts));
@@ -83,7 +89,7 @@ impl PeerS3Client for S3PeerSys {
for (j, cli) in self.clients.iter().enumerate() {
let pools = cli.get_pools();
let idx = i;
if pools.contains(&idx) {
if pools.unwrap_or_default().contains(&idx) {
per_pool_errs.push(errors[j].as_ref());
}
@@ -95,7 +101,7 @@ impl PeerS3Client for S3PeerSys {
Ok(())
}
async fn list_bucket(&self, opts: &BucketOptions) -> Result<Vec<BucketInfo>> {
pub async fn list_bucket(&self, opts: &BucketOptions) -> Result<Vec<BucketInfo>> {
let mut futures = Vec::with_capacity(self.clients.len());
for cli in self.clients.iter() {
futures.push(cli.list_bucket(opts));
@@ -165,7 +171,7 @@ impl PeerS3Client for S3PeerSys {
Ok(())
}
async fn get_bucket_info(&self, bucket: &str, opts: &BucketOptions) -> Result<BucketInfo> {
pub async fn get_bucket_info(&self, bucket: &str, opts: &BucketOptions) -> Result<BucketInfo> {
let mut futures = Vec::with_capacity(self.clients.len());
for cli in self.clients.iter() {
futures.push(cli.get_bucket_info(bucket, opts));
@@ -193,7 +199,7 @@ impl PeerS3Client for S3PeerSys {
for (j, cli) in self.clients.iter().enumerate() {
let pools = cli.get_pools();
let idx = i;
if pools.contains(&idx) {
if pools.unwrap_or_default().contains(&idx) {
per_pool_errs.push(errors[j].as_ref());
}
@@ -206,22 +212,22 @@ impl PeerS3Client for S3PeerSys {
.ok_or(Error::new(DiskError::VolumeNotFound))
}
fn get_pools(&self) -> Vec<usize> {
pub fn get_pools(&self) -> Option<Vec<usize>> {
unimplemented!()
}
}
#[derive(Debug)]
pub struct LocalPeerS3Client {
pub local_disks: Vec<DiskStore>,
// pub local_disks: Vec<DiskStore>,
// pub node: Node,
pub pools: Vec<usize>,
pub pools: Option<Vec<usize>>,
}
impl LocalPeerS3Client {
fn new(local_disks: Vec<DiskStore>, _node: Node, pools: Vec<usize>) -> Self {
pub fn new(_node: Option<Node>, pools: Option<Vec<usize>>) -> Self {
Self {
local_disks,
// local_disks,
// node,
pools,
}
@@ -230,12 +236,14 @@ impl LocalPeerS3Client {
#[async_trait]
impl PeerS3Client for LocalPeerS3Client {
fn get_pools(&self) -> Vec<usize> {
fn get_pools(&self) -> Option<Vec<usize>> {
self.pools.clone()
}
async fn list_bucket(&self, _opts: &BucketOptions) -> Result<Vec<BucketInfo>> {
let mut futures = Vec::with_capacity(self.local_disks.len());
for disk in self.local_disks.iter() {
let local_disks = all_local_disk().await;
let mut futures = Vec::with_capacity(local_disks.len());
for disk in local_disks.iter() {
futures.push(disk.list_volumes());
}
@@ -280,8 +288,9 @@ impl PeerS3Client for LocalPeerS3Client {
Ok(buckets)
}
async fn make_bucket(&self, bucket: &str, opts: &MakeBucketOptions) -> Result<()> {
let mut futures = Vec::with_capacity(self.local_disks.len());
for disk in self.local_disks.iter() {
let local_disks = all_local_disk().await;
let mut futures = Vec::with_capacity(local_disks.len());
for disk in local_disks.iter() {
futures.push(async move {
match disk.make_volume(bucket).await {
Ok(_) => Ok(()),
@@ -313,15 +322,16 @@ impl PeerS3Client for LocalPeerS3Client {
}
async fn get_bucket_info(&self, bucket: &str, _opts: &BucketOptions) -> Result<BucketInfo> {
let mut futures = Vec::with_capacity(self.local_disks.len());
for disk in self.local_disks.iter() {
let local_disks = all_local_disk().await;
let mut futures = Vec::with_capacity(local_disks.len());
for disk in local_disks.iter() {
futures.push(disk.stat_volume(bucket));
}
let results = join_all(futures).await;
let mut ress = Vec::with_capacity(self.local_disks.len());
let mut errs = Vec::with_capacity(self.local_disks.len());
let mut ress = Vec::with_capacity(local_disks.len());
let mut errs = Vec::with_capacity(local_disks.len());
for res in results {
match res {
@@ -351,9 +361,10 @@ impl PeerS3Client for LocalPeerS3Client {
}
async fn delete_bucket(&self, bucket: &str, opts: &DeleteBucketOptions) -> Result<()> {
let mut futures = Vec::with_capacity(self.local_disks.len());
let local_disks = all_local_disk().await;
let mut futures = Vec::with_capacity(local_disks.len());
for disk in self.local_disks.iter() {
for disk in local_disks.iter() {
futures.push(disk.delete_volume(bucket));
}
@@ -398,34 +409,115 @@ impl PeerS3Client for LocalPeerS3Client {
#[derive(Debug)]
pub struct RemotePeerS3Client {
// pub node: Node,
// pub pools: Vec<usize>,
pub node: Option<Node>,
pub pools: Option<Vec<usize>>,
connector: Endpoint,
channel: Arc<RwLock<Option<Channel>>>,
}
impl RemotePeerS3Client {
fn new(_node: Node, _pools: Vec<usize>) -> Self {
// Self { node, pools }
Self {}
fn new(node: Option<Node>, pools: Option<Vec<usize>>) -> Self {
let connector =
Endpoint::from_shared(format!("{}", node.as_ref().map(|v| { v.url.to_string() }).unwrap_or_default())).unwrap();
Self {
node,
pools,
connector,
channel: Arc::new(RwLock::new(None)),
}
}
#[allow(dead_code)]
/// Return a timeout-wrapped gRPC client, connecting and caching the channel
/// on first use.
///
/// # Errors
/// Propagates the transport error when the initial connect fails.
async fn get_client(&self) -> Result<NodeServiceClient<Timeout<Channel>>> {
    // Clone the cached channel out of the read guard and drop the guard at
    // the end of the statement. The original held the read guard across
    // `self.channel.write().await` on the same tokio RwLock, which deadlocks
    // the task on the cold path (write waits for the reader we still hold).
    let cached = self.channel.read().await.clone();
    let channel = match cached {
        Some(channel) => channel,
        None => {
            let new_channel = self.connector.connect().await?;
            info!("get channel success");
            // Benign race: two tasks may connect concurrently; last one wins.
            *self.channel.write().await = Some(new_channel.clone());
            new_channel
        }
    };
    Ok(node_service_time_out_client(
        channel,
        Duration::new(30, 0), // TODO: use config setting
        DEFAULT_GRPC_SERVER_MESSAGE_LEN,
        // grpc_enable_gzip,
        false, // TODO: use config setting
    ))
}
/// Connect a fresh (non-cached, no-timeout) gRPC client to this peer.
///
/// The target address comes from the peer's URL; when no node is set the
/// address is empty and `connect` fails with an invalid-URI error.
async fn get_client_v2(&self) -> Result<NodeServiceClient<tonic::transport::Channel>> {
    // `format!("{}", x)` was a needless wrapper around the String itself.
    let addr = self.node.as_ref().map(|v| v.url.to_string()).unwrap_or_default();
    Ok(NodeServiceClient::connect(addr).await?)
}
}
#[async_trait]
impl PeerS3Client for RemotePeerS3Client {
fn get_pools(&self) -> Vec<usize> {
unimplemented!()
fn get_pools(&self) -> Option<Vec<usize>> {
self.pools.clone()
}
async fn list_bucket(&self, _opts: &BucketOptions) -> Result<Vec<BucketInfo>> {
unimplemented!()
async fn list_bucket(&self, opts: &BucketOptions) -> Result<Vec<BucketInfo>> {
let options = serde_json::to_string(opts)?;
let mut client = self.get_client_v2().await?;
let request = Request::new(ListBucketRequest { options });
let response = client.list_bucket(request).await?.into_inner();
let bucket_infos = response
.bucket_infos
.into_iter()
.filter_map(|json_str| serde_json::from_str::<BucketInfo>(&json_str).ok())
.collect();
Ok(bucket_infos)
}
async fn make_bucket(&self, _bucket: &str, _opts: &MakeBucketOptions) -> Result<()> {
unimplemented!()
async fn make_bucket(&self, bucket: &str, opts: &MakeBucketOptions) -> Result<()> {
let options = serde_json::to_string(opts)?;
let mut client = self.get_client_v2().await?;
let request = Request::new(MakeBucketRequest {
name: bucket.to_string(),
options,
});
let response = client.make_bucket(request).await?.into_inner();
// TODO: deal with error
if !response.success {
warn!("make bucket error: {:?}", response.error_info);
}
Ok(())
}
async fn get_bucket_info(&self, _bucket: &str, _opts: &BucketOptions) -> Result<BucketInfo> {
unimplemented!()
async fn get_bucket_info(&self, bucket: &str, opts: &BucketOptions) -> Result<BucketInfo> {
let options = serde_json::to_string(opts)?;
let mut client = self.get_client_v2().await?;
let request = Request::new(GetBucketInfoRequest {
bucket: bucket.to_string(),
options,
});
let response = client.get_bucket_info(request).await?.into_inner();
let bucket_info = serde_json::from_str::<BucketInfo>(&response.bucket_info)?;
Ok(bucket_info)
}
async fn delete_bucket(&self, _bucket: &str, _opts: &DeleteBucketOptions) -> Result<()> {
unimplemented!()
async fn delete_bucket(&self, bucket: &str, _opts: &DeleteBucketOptions) -> Result<()> {
let mut client = self.get_client_v2().await?;
let request = Request::new(DeleteBucketRequest {
bucket: bucket.to_string(),
});
let _response = client.delete_bucket(request).await?.into_inner();
Ok(())
}
}

View File

@@ -12,6 +12,7 @@ use crate::{
endpoints::PoolEndpoints,
error::{Error, Result},
set_disk::SetDisks,
store::{GLOBAL_IsDistErasure, GLOBAL_LOCAL_DISK_SET_DRIVES},
store_api::{
BucketInfo, BucketOptions, CompletePart, DeleteBucketOptions, DeletedObject, GetObjectReader, HTTPRangeSpec,
ListObjectsV2Info, MakeBucketOptions, MultipartUploadResult, ObjectInfo, ObjectOptions, ObjectToDelete, PartInfo,
@@ -36,7 +37,7 @@ pub struct Sets {
}
impl Sets {
pub fn new(
pub async fn new(
disks: Vec<Option<DiskStore>>,
endpoints: &PoolEndpoints,
fm: &FormatV3,
@@ -52,11 +53,32 @@ impl Sets {
let mut set_drive = Vec::with_capacity(set_drive_count);
for j in 0..set_drive_count {
let idx = i * set_drive_count + j;
if disks[idx].is_none() {
let mut disk = disks[idx].clone();
if disk.is_none() {
set_drive.push(None);
} else {
let disk = disks[idx].clone();
continue;
}
if disk.as_ref().unwrap().is_local() && *GLOBAL_IsDistErasure.read().await {
let local_disk = {
let local_set_drives = GLOBAL_LOCAL_DISK_SET_DRIVES.read().await;
local_set_drives[pool_idx][i][j].clone()
};
if local_disk.is_none() {
set_drive.push(None);
continue;
}
let _ = disk.as_ref().unwrap().close().await;
disk = local_disk;
}
if let Some(_disk_id) = disk.as_ref().unwrap().get_disk_id().await {
set_drive.push(disk);
} else {
set_drive.push(None);
}
}

View File

@@ -1,10 +1,11 @@
use crate::{
bucket_meta::BucketMetadata,
disk::{error::DiskError, DeleteOptions, DiskOption, DiskStore, WalkDirOptions, BUCKET_META_PREFIX, RUSTFS_META_BUCKET},
disks_layout::DisksLayout,
endpoints::EndpointServerPools,
disk::{
error::DiskError, new_disk, DeleteOptions, DiskOption, DiskStore, WalkDirOptions, BUCKET_META_PREFIX, RUSTFS_META_BUCKET,
},
endpoints::{EndpointServerPools, SetupType},
error::{Error, Result},
peer::{PeerS3Client, S3PeerSys},
peer::S3PeerSys,
sets::Sets,
store_api::{
BucketInfo, BucketOptions, CompletePart, DeleteBucketOptions, DeletedObject, GetObjectReader, HTTPRangeSpec,
@@ -13,14 +14,137 @@ use crate::{
},
store_init, utils,
};
use backon::{ExponentialBuilder, Retryable};
use futures::future::join_all;
use http::HeaderMap;
use s3s::{dto::StreamingBlob, Body};
use std::collections::{HashMap, HashSet};
use std::{
collections::{HashMap, HashSet},
sync::Arc,
time::Duration,
};
use time::OffsetDateTime;
use tracing::{debug, warn};
use tokio::{fs, sync::RwLock};
use tracing::{debug, info, warn};
use uuid::Uuid;
use lazy_static::lazy_static;
lazy_static! {
pub static ref GLOBAL_IsErasure: RwLock<bool> = RwLock::new(false);
pub static ref GLOBAL_IsDistErasure: RwLock<bool> = RwLock::new(false);
pub static ref GLOBAL_IsErasureSD: RwLock<bool> = RwLock::new(false);
}
/// Record the cluster topology derived from `setup_type` in the global flags.
///
/// `DistErasure` implies `Erasure`, so both flags end up set for a
/// distributed-erasure setup.
pub async fn update_erasure_type(setup_type: SetupType) {
    let dist_erasure = setup_type == SetupType::DistErasure;
    let erasure = dist_erasure || setup_type == SetupType::Erasure;
    let erasure_sd = setup_type == SetupType::ErasureSD;

    *GLOBAL_IsErasure.write().await = erasure;
    *GLOBAL_IsDistErasure.write().await = dist_erasure;
    *GLOBAL_IsErasureSD.write().await = erasure_sd;
}
lazy_static! {
pub static ref GLOBAL_LOCAL_DISK_MAP: Arc<RwLock<HashMap<String, Option<DiskStore>>>> = Arc::new(RwLock::new(HashMap::new()));
pub static ref GLOBAL_LOCAL_DISK_SET_DRIVES: Arc<RwLock<Vec<Vec<Vec<Option<DiskStore>>>>>> =
Arc::new(RwLock::new(Vec::new()));
}
/// Look up a previously registered local disk by path.
///
/// The path is canonicalized first so it matches the normalized keys stored
/// in `GLOBAL_LOCAL_DISK_MAP`. Returns `None` when canonicalization fails or
/// no disk is registered under the path. Takes `&str` (backward compatible
/// with existing `&String` call sites via deref coercion).
pub async fn find_local_disk(disk_path: &str) -> Option<DiskStore> {
    let disk_path = fs::canonicalize(disk_path).await.ok()?;
    let path = disk_path.to_string_lossy().to_string();
    // Single `get` instead of the original `contains_key` + index double lookup.
    GLOBAL_LOCAL_DISK_MAP.read().await.get(&path).and_then(|d| d.clone())
}
/// All registered local disk paths (the canonicalized map keys).
pub async fn all_local_disk_path() -> Vec<String> {
    // `cloned()` over `map(|v| v.clone())` (clippy: map_clone).
    GLOBAL_LOCAL_DISK_MAP.read().await.keys().cloned().collect()
}
/// All registered local disks, skipping map slots that hold `None`.
pub async fn all_local_disk() -> Vec<DiskStore> {
    // `flatten()` replaces `filter(is_some)` + `unwrap` (clippy-idiomatic,
    // no panic path).
    GLOBAL_LOCAL_DISK_MAP.read().await.values().flatten().cloned().collect()
}
// init_local_disks initializes the local disks; it must succeed before the server starts.
pub async fn init_local_disks(endpoint_pools: EndpointServerPools) -> Result<()> {
let opt = &DiskOption {
cleanup: true,
health_check: true,
};
let mut global_set_drives = GLOBAL_LOCAL_DISK_SET_DRIVES.write().await;
for pool_eps in endpoint_pools.as_ref().iter() {
let mut set_count_drives = Vec::with_capacity(pool_eps.set_count);
for _ in 0..pool_eps.set_count {
set_count_drives.push(vec![None; pool_eps.drives_per_set]);
}
global_set_drives.push(set_count_drives);
}
let mut global_local_disk_map = GLOBAL_LOCAL_DISK_MAP.write().await;
for pool_eps in endpoint_pools.as_ref().iter() {
let mut set_drives = HashMap::new();
for ep in pool_eps.endpoints.as_ref().iter() {
if !ep.is_local {
continue;
}
let disk = new_disk(ep, opt).await?;
let path = disk.path().to_string_lossy().to_string();
global_local_disk_map.insert(path, Some(disk.clone()));
set_drives.insert(ep.disk_idx, Some(disk.clone()));
if ep.pool_idx.is_some() && ep.set_idx.is_some() && ep.disk_idx.is_some() {
global_set_drives[ep.pool_idx.unwrap()][ep.set_idx.unwrap()][ep.disk_idx.unwrap()] = Some(disk.clone());
}
}
}
Ok(())
}
lazy_static! {
pub static ref GLOBAL_OBJECT_API: Arc<RwLock<Option<ECStore>>> = Arc::new(RwLock::new(None));
pub static ref GLOBAL_LOCAL_DISK: Arc<RwLock<Vec<Option<DiskStore>>>> = Arc::new(RwLock::new(Vec::new()));
}
/// Hand out a shared handle to the global object layer.
pub fn new_object_layer_fn() -> Arc<RwLock<Option<ECStore>>> {
    Arc::clone(&GLOBAL_OBJECT_API)
}
/// Install `o` as the global object layer, replacing any previous instance.
async fn set_object_layer(o: ECStore) {
    *GLOBAL_OBJECT_API.write().await = Some(o);
}
#[derive(Debug)]
pub struct ECStore {
pub id: uuid::Uuid,
@@ -28,16 +152,16 @@ pub struct ECStore {
pub disk_map: HashMap<usize, Vec<Option<DiskStore>>>,
pub pools: Vec<Sets>,
pub peer_sys: S3PeerSys,
pub local_disks: Vec<DiskStore>,
// pub local_disks: Vec<DiskStore>,
}
impl ECStore {
pub async fn new(address: String, endpoints: Vec<String>) -> Result<Self> {
let layouts = DisksLayout::try_from(endpoints.as_slice())?;
pub async fn new(_address: String, endpoint_pools: EndpointServerPools) -> Result<()> {
// let layouts = DisksLayout::try_from(endpoints.as_slice())?;
let mut deployment_id = None;
let (endpoint_pools, _) = EndpointServerPools::create_server_endpoints(address.as_str(), &layouts)?;
// let (endpoint_pools, _) = EndpointServerPools::create_server_endpoints(address.as_str(), &layouts)?;
let mut pools = Vec::with_capacity(endpoint_pools.as_ref().len());
let mut disk_map = HashMap::with_capacity(endpoint_pools.as_ref().len());
@@ -46,6 +170,8 @@ impl ECStore {
let mut local_disks = Vec::new();
info!("endpoint_pools: {:?}", endpoint_pools);
for (i, pool_eps) in endpoint_pools.as_ref().iter().enumerate() {
// TODO: read from config parseStorageClass
let partiy_count = store_init::default_partiy_count(pool_eps.drives_per_set);
@@ -61,13 +187,21 @@ impl ECStore {
DiskError::check_disk_fatal_errs(&errs)?;
let fm = store_init::do_init_format_file(
first_is_local,
&disks,
pool_eps.set_count,
pool_eps.drives_per_set,
deployment_id,
)
let fm = (|| async {
store_init::connect_load_init_formats(
first_is_local,
&disks,
pool_eps.set_count,
pool_eps.drives_per_set,
deployment_id,
)
.await
})
.retry(ExponentialBuilder::default().with_max_times(usize::MAX))
.sleep(tokio::time::sleep)
.notify(|err, dur: Duration| {
info!("retrying get formats {:?} after {:?}", err, dur);
})
.await?;
if deployment_id.is_none() {
@@ -88,24 +222,42 @@ impl ECStore {
}
}
let sets = Sets::new(disks.clone(), pool_eps, &fm, i, partiy_count)?;
let sets = Sets::new(disks.clone(), pool_eps, &fm, i, partiy_count).await?;
pools.push(sets);
disk_map.insert(i, disks);
}
let peer_sys = S3PeerSys::new(&endpoint_pools, local_disks.clone());
// Register the local disks in the global local-disk map (non-distributed setups only).
if !*GLOBAL_IsDistErasure.read().await {
let mut global_local_disk_map = GLOBAL_LOCAL_DISK_MAP.write().await;
for disk in local_disks {
let path = disk.path().to_string_lossy().to_string();
global_local_disk_map.insert(path, Some(disk.clone()));
}
}
Ok(ECStore {
let peer_sys = S3PeerSys::new(&endpoint_pools);
let ec = ECStore {
id: deployment_id.unwrap(),
disk_map,
pools,
local_disks,
peer_sys,
})
};
set_object_layer(ec).await;
Ok(())
}
pub fn init_local_disks() {}
// pub fn local_disks(&self) -> Vec<DiskStore> {
// self.local_disks.clone()
// }
/// True when this store is backed by exactly one pool (no cross-pool routing).
fn single_pool(&self) -> bool {
    self.pools.len() == 1
}

View File

@@ -194,6 +194,7 @@ pub struct ObjectPartInfo {
// }
// }
#[derive(Serialize, Deserialize)]
pub struct RawFileInfo {
pub buf: Vec<u8>,
}
@@ -239,6 +240,7 @@ pub enum BitrotAlgorithm {
BLAKE2b512,
}
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct MakeBucketOptions {
pub force_create: bool,
}
@@ -380,9 +382,10 @@ pub struct ObjectOptions {
// }
// }
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct BucketOptions {}
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BucketInfo {
pub name: String,
pub created: Option<OffsetDateTime>,

View File

@@ -1,7 +1,9 @@
use crate::{
disk::error::DiskError,
disk::format::{FormatErasureVersion, FormatMetaVersion, FormatV3},
disk::{new_disk, DiskOption, DiskStore, FORMAT_CONFIG_FILE, RUSTFS_META_BUCKET},
disk::{
error::DiskError,
format::{FormatErasureVersion, FormatMetaVersion, FormatV3},
new_disk, DiskOption, DiskStore, FORMAT_CONFIG_FILE, RUSTFS_META_BUCKET,
},
endpoints::Endpoints,
error::{Error, Result},
};
@@ -10,6 +12,7 @@ use std::{
collections::{hash_map::Entry, HashMap},
fmt::Debug,
};
use tracing::warn;
use uuid::Uuid;
@@ -40,30 +43,34 @@ pub async fn init_disks(eps: &Endpoints, opt: &DiskOption) -> (Vec<Option<DiskSt
(res, errors)
}
pub async fn do_init_format_file(
pub async fn connect_load_init_formats(
first_disk: bool,
disks: &[Option<DiskStore>],
set_count: usize,
set_drive_count: usize,
deployment_id: Option<Uuid>,
) -> Result<FormatV3, Error> {
let (formats, errs) = read_format_file_all(disks, false).await;
warn!("connect_load_init_formats id: {:?}, first_disk: {}", deployment_id, first_disk);
let (formats, errs) = load_format_erasure_all(disks, false).await;
DiskError::check_disk_fatal_errs(&errs)?;
warn!("load_format_erasure_all errs {:?}", &errs);
check_format_erasure_values(&formats, set_drive_count)?;
if first_disk && DiskError::should_init_erasure_disks(&errs) {
// UnformattedDisk, not format file create
// new format and save
let fms = init_format_files(disks, set_count, set_drive_count, deployment_id);
let fms = init_format_erasure(disks, set_count, set_drive_count, deployment_id);
let _errs = save_format_file_all(disks, &fms).await;
// TODO: check quorum
// reduceWriteQuorumErrs(&errs)?;
let fm = get_format_file_in_quorum(&fms)?;
let fm = get_format_erasure_in_quorum(&fms)?;
return Ok(fm);
}
@@ -77,12 +84,12 @@ pub async fn do_init_format_file(
return Err(Error::new(ErasureError::FirstDiskWait));
}
let fm = get_format_file_in_quorum(&formats)?;
let fm = get_format_erasure_in_quorum(&formats)?;
Ok(fm)
}
fn init_format_files(
fn init_format_erasure(
disks: &[Option<DiskStore>],
set_count: usize,
set_drive_count: usize,
@@ -106,7 +113,7 @@ fn init_format_files(
fms
}
fn get_format_file_in_quorum(formats: &[Option<FormatV3>]) -> Result<FormatV3> {
fn get_format_erasure_in_quorum(formats: &[Option<FormatV3>]) -> Result<FormatV3> {
let mut countmap = HashMap::new();
for f in formats.iter() {
@@ -124,8 +131,15 @@ fn get_format_file_in_quorum(formats: &[Option<FormatV3>]) -> Result<FormatV3> {
let (max_drives, max_count) = countmap.iter().max_by_key(|&(_, c)| c).unwrap_or((&0, &0));
if *max_drives == 0 || *max_count < formats.len() / 2 {
warn!("*max_drives == 0 || *max_count < formats.len() / 2");
warn!("get_format_erasure_in_quorum fi: {:?}", &formats);
if *max_drives == 0 || *max_count <= formats.len() / 2 {
warn!(
"*max_drives == 0 || *max_count < formats.len() / 2, {} || {}<{}",
max_drives,
max_count,
formats.len() / 2
);
return Err(Error::new(ErasureError::ErasureReadQuorum));
}
@@ -184,21 +198,26 @@ pub fn default_partiy_count(drive: usize) -> usize {
_ => 4,
}
}
// read_format_file_all 读取所有foramt.json
async fn read_format_file_all(disks: &[Option<DiskStore>], heal: bool) -> (Vec<Option<FormatV3>>, Vec<Option<Error>>) {
// load_format_erasure_all reads format.json from every disk.
async fn load_format_erasure_all(disks: &[Option<DiskStore>], heal: bool) -> (Vec<Option<FormatV3>>, Vec<Option<Error>>) {
let mut futures = Vec::with_capacity(disks.len());
for ep in disks.iter() {
futures.push(read_format_file(ep, heal));
for disk in disks.iter() {
futures.push(read_format_file(disk, heal));
}
let mut datas = Vec::with_capacity(disks.len());
let mut errors = Vec::with_capacity(disks.len());
let results = join_all(futures).await;
let mut i = 0;
for result in results {
match result {
Ok(s) => {
if !heal {
let _ = disks[i].as_ref().unwrap().set_disk_id(Some(s.erasure.this.clone())).await;
}
datas.push(Some(s));
errors.push(None);
}
@@ -207,6 +226,8 @@ async fn read_format_file_all(disks: &[Option<DiskStore>], heal: bool) -> (Vec<O
errors.push(Some(e));
}
}
i += 1;
}
(datas, errors)
@@ -238,8 +259,8 @@ async fn read_format_file(disk: &Option<DiskStore>, _heal: bool) -> Result<Forma
async fn save_format_file_all(disks: &[Option<DiskStore>], formats: &[Option<FormatV3>]) -> Vec<Option<Error>> {
let mut futures = Vec::with_capacity(disks.len());
for (i, ep) in disks.iter().enumerate() {
futures.push(save_format_file(ep, &formats[i]));
for (i, disk) in disks.iter().enumerate() {
futures.push(save_format_file(disk, &formats[i]));
}
let mut errors = Vec::with_capacity(disks.len());
@@ -277,9 +298,7 @@ async fn save_format_file(disk: &Option<DiskStore>, format: &Option<FormatV3>) -
disk.rename_file(RUSTFS_META_BUCKET, tmpfile.as_str(), RUSTFS_META_BUCKET, FORMAT_CONFIG_FILE)
.await?;
// let mut disk = disk;
// disk.set_disk_id(format.erasure.this);
disk.set_disk_id(Some(format.erasure.this)).await?;
Ok(())
}

Binary file not shown.

View File

@@ -10,6 +10,25 @@ rust-version.workspace = true
[dependencies]
async-trait.workspace = true
bytes.workspace = true
clap.workspace = true
ecstore.workspace = true
flatbuffers.workspace = true
futures.workspace = true
futures-util.workspace = true
hyper.workspace = true
hyper-util.workspace = true
http.workspace = true
http-body.workspace = true
mime.workspace = true
netif.workspace = true
pin-project-lite.workspace = true
prost.workspace = true
prost-types.workspace = true
protos.workspace = true
protobuf.workspace = true
s3s.workspace = true
serde_json.workspace = true
tracing.workspace = true
time = { workspace = true, features = ["parsing", "formatting"] }
tokio = { workspace = true, features = [
@@ -18,12 +37,22 @@ tokio = { workspace = true, features = [
"net",
"signal",
] }
tonic = { version = "0.12.1", features = ["gzip"] }
tonic-reflection.workspace = true
tower.workspace = true
tracing-error.workspace = true
tracing-subscriber.workspace = true
transform-stream.workspace = true
uuid = "1.10.0"
[build-dependencies]
prost-build.workspace = true
tonic-build.workspace = true
http.workspace = true
bytes.workspace = true
futures.workspace = true
futures-util.workspace = true
uuid = { version = "1.8.0", features = ["v4", "fast-rng", "serde"] }
# uuid = { version = "1.8.0", features = ["v4", "fast-rng", "serde"] }
ecstore = { path = "../ecstore" }
s3s = "0.10.0"
clap = { version = "4.5.7", features = ["derive"] }

816
rustfs/src/grpc.rs Normal file
View File

@@ -0,0 +1,816 @@
use ecstore::{
disk::{DeleteOptions, DiskStore, FileInfoVersions, ReadMultipleReq, ReadOptions, WalkDirOptions},
erasure::{ReadAt, Write},
peer::{LocalPeerS3Client, PeerS3Client},
store::{all_local_disk_path, find_local_disk},
store_api::{BucketOptions, FileInfo, MakeBucketOptions},
};
use tonic::{Request, Response, Status};
use tracing::{debug, error, info};
use protos::{
models::{PingBody, PingBodyBuilder},
proto_gen::node_service::{
node_service_server::{NodeService as Node, NodeServiceServer as NodeServer},
DeleteBucketRequest, DeleteBucketResponse, DeleteRequest, DeleteResponse, DeleteVersionsRequest, DeleteVersionsResponse,
DeleteVolumeRequest, DeleteVolumeResponse, GetBucketInfoRequest, GetBucketInfoResponse, ListBucketRequest,
ListBucketResponse, ListDirRequest, ListDirResponse, ListVolumesRequest, ListVolumesResponse, MakeBucketRequest,
MakeBucketResponse, MakeVolumeRequest, MakeVolumeResponse, MakeVolumesRequest, MakeVolumesResponse, PingRequest,
PingResponse, ReadAllRequest, ReadAllResponse, ReadAtRequest, ReadAtResponse, ReadMultipleRequest, ReadMultipleResponse,
ReadVersionRequest, ReadVersionResponse, ReadXlRequest, ReadXlResponse, RenameDataRequest, RenameDataResponse,
RenameFileRequst, RenameFileResponse, StatVolumeRequest, StatVolumeResponse, WalkDirRequest, WalkDirResponse,
WriteAllRequest, WriteAllResponse, WriteMetadataRequest, WriteMetadataResponse, WriteRequest, WriteResponse,
},
};
/// gRPC node service: serves peer bucket and disk operations for this node.
#[derive(Debug)]
struct NodeService {
    /// Local peer implementation the RPC handlers delegate bucket calls to.
    pub local_peer: LocalPeerS3Client,
}
/// Build the tonic `NodeServer`, backed by a local peer client with no
/// node/pool binding (disks are resolved through the global registry).
pub fn make_server() -> NodeServer<impl Node> {
    // let local_disks = all_local_disk().await;
    let local_peer = LocalPeerS3Client::new(None, None);
    NodeServer::new(NodeService { local_peer })
}
impl NodeService {
    /// Resolve a disk by path via the global local-disk registry.
    async fn find_disk(&self, disk_path: &String) -> Option<DiskStore> {
        find_local_disk(disk_path).await
    }

    /// Paths of every registered local disk (used in error diagnostics).
    async fn all_disk(&self) -> Vec<String> {
        all_local_disk_path().await
    }
}
#[tonic::async_trait]
impl Node for NodeService {
/// Health-check RPC: logs the caller's flatbuffer body and replies with a
/// fixed flatbuffer payload ("hello, caller").
async fn ping(&self, request: Request<PingRequest>) -> Result<Response<PingResponse>, Status> {
    debug!("PING");
    let ping_req = request.into_inner();
    // The body is a flatbuffer-encoded PingBody; a decode failure is only
    // logged — the ping still succeeds.
    let ping_body = flatbuffers::root::<PingBody>(&ping_req.body);
    if let Err(e) = ping_body {
        error!("{}", e);
    } else {
        info!("ping_req:body(flatbuffer): {:?}", ping_body);
    }

    // Build the response payload. Flatbuffers requires creating the vector
    // before starting the table builder on the same FlatBufferBuilder.
    let mut fbb = flatbuffers::FlatBufferBuilder::new();
    let payload = fbb.create_vector(b"hello, caller");
    let mut builder = PingBodyBuilder::new(&mut fbb);
    builder.add_payload(payload);
    let root = builder.finish();
    fbb.finish(root, None);

    let finished_data = fbb.finished_data();

    Ok(tonic::Response::new(PingResponse {
        version: 1,
        body: finished_data.to_vec(),
    }))
}
/// List buckets via the local peer; options arrive JSON-encoded.
///
/// Bucket infos that fail to serialize are silently dropped from the reply
/// (mirrors the client side, which skips entries that fail to parse).
async fn list_bucket(&self, request: Request<ListBucketRequest>) -> Result<Response<ListBucketResponse>, Status> {
    debug!("list bucket");
    let request = request.into_inner();
    let options = match serde_json::from_str::<BucketOptions>(&request.options) {
        Ok(options) => options,
        Err(err) => {
            return Ok(tonic::Response::new(ListBucketResponse {
                success: false,
                bucket_infos: Vec::new(),
                // `err` implements Display; the intermediate to_string was redundant.
                error_info: Some(format!("decode BucketOptions failed: {}", err)),
            }))
        }
    };
    match self.local_peer.list_bucket(&options).await {
        Ok(bucket_infos) => {
            let bucket_infos = bucket_infos
                .into_iter()
                .filter_map(|bucket_info| serde_json::to_string(&bucket_info).ok())
                .collect();
            Ok(tonic::Response::new(ListBucketResponse {
                success: true,
                bucket_infos,
                error_info: None,
            }))
        }
        // Was "make failed": copy-paste from make_bucket.
        Err(err) => Ok(tonic::Response::new(ListBucketResponse {
            success: false,
            bucket_infos: Vec::new(),
            error_info: Some(format!("list failed: {}", err)),
        })),
    }
}
/// Create a bucket via the local peer; options arrive JSON-encoded.
/// A failed creation is reported in the reply body, not as a gRPC error.
async fn make_bucket(&self, request: Request<MakeBucketRequest>) -> Result<Response<MakeBucketResponse>, Status> {
    debug!("make bucket");
    let request = request.into_inner();
    let options = match serde_json::from_str::<MakeBucketOptions>(&request.options) {
        Ok(options) => options,
        Err(err) => {
            return Ok(tonic::Response::new(MakeBucketResponse {
                success: false,
                // `err` implements Display; the intermediate to_string was redundant.
                error_info: Some(format!("decode MakeBucketOptions failed: {}", err)),
            }))
        }
    };
    match self.local_peer.make_bucket(&request.name, &options).await {
        Ok(_) => Ok(tonic::Response::new(MakeBucketResponse {
            success: true,
            error_info: None,
        })),
        Err(err) => Ok(tonic::Response::new(MakeBucketResponse {
            success: false,
            error_info: Some(format!("make failed: {}", err)),
        })),
    }
}
/// Fetch one bucket's info via the local peer; the `BucketInfo` travels
/// JSON-encoded in the reply.
async fn get_bucket_info(&self, request: Request<GetBucketInfoRequest>) -> Result<Response<GetBucketInfoResponse>, Status> {
    debug!("get bucket info");
    let request = request.into_inner();
    let options = match serde_json::from_str::<BucketOptions>(&request.options) {
        Ok(options) => options,
        Err(err) => {
            return Ok(tonic::Response::new(GetBucketInfoResponse {
                success: false,
                bucket_info: String::new(),
                // `err` implements Display; the intermediate to_string was redundant.
                error_info: Some(format!("decode BucketOptions failed: {}", err)),
            }))
        }
    };
    match self.local_peer.get_bucket_info(&request.bucket, &options).await {
        Ok(bucket_info) => {
            let bucket_info = match serde_json::to_string(&bucket_info) {
                Ok(bucket_info) => bucket_info,
                Err(err) => {
                    return Ok(tonic::Response::new(GetBucketInfoResponse {
                        success: false,
                        bucket_info: String::new(),
                        error_info: Some(format!("encode BucketInfo failed: {}", err)),
                    }));
                }
            };
            Ok(tonic::Response::new(GetBucketInfoResponse {
                success: true,
                bucket_info,
                error_info: None,
            }))
        }
        // Was "make failed": copy-paste from make_bucket.
        Err(err) => Ok(tonic::Response::new(GetBucketInfoResponse {
            success: false,
            bucket_info: String::new(),
            error_info: Some(format!("get bucket info failed: {}", err)),
        })),
    }
}
/// Delete a bucket via the local peer.
async fn delete_bucket(&self, request: Request<DeleteBucketRequest>) -> Result<Response<DeleteBucketResponse>, Status> {
    // Was `debug!("make bucket")`: copy-paste from make_bucket.
    debug!("delete bucket");
    let request = request.into_inner();
    match self.local_peer.delete_bucket(&request.bucket).await {
        Ok(_) => Ok(tonic::Response::new(DeleteBucketResponse {
            success: true,
            error_info: None,
        })),
        // Was "make failed": copy-paste from make_bucket.
        Err(err) => Ok(tonic::Response::new(DeleteBucketResponse {
            success: false,
            error_info: Some(format!("delete failed: {}", err)),
        })),
    }
}
/// Read the full contents of a file on the addressed disk.
async fn read_all(&self, request: Request<ReadAllRequest>) -> Result<Response<ReadAllResponse>, Status> {
    debug!("read all");
    let req = request.into_inner();
    // Guard clause: unknown disk short-circuits with an error reply.
    let Some(disk) = self.find_disk(&req.disk).await else {
        return Ok(tonic::Response::new(ReadAllResponse {
            success: false,
            data: Vec::new(),
            error_info: Some("can not find disk".to_string()),
        }));
    };
    let reply = match disk.read_all(&req.volume, &req.path).await {
        Ok(data) => ReadAllResponse {
            success: true,
            data: data.to_vec(),
            error_info: None,
        },
        Err(err) => ReadAllResponse {
            success: false,
            data: Vec::new(),
            error_info: Some(err.to_string()),
        },
    };
    Ok(tonic::Response::new(reply))
}
/// Write the full payload to a file on the addressed disk.
async fn write_all(&self, request: Request<WriteAllRequest>) -> Result<Response<WriteAllResponse>, Status> {
    let req = request.into_inner();
    // Guard clause: unknown disk short-circuits with an error reply.
    let Some(disk) = self.find_disk(&req.disk).await else {
        return Ok(tonic::Response::new(WriteAllResponse {
            success: false,
            error_info: Some("can not find disk".to_string()),
        }));
    };
    let reply = match disk.write_all(&req.volume, &req.path, req.data).await {
        Ok(_) => WriteAllResponse {
            success: true,
            error_info: None,
        },
        Err(err) => WriteAllResponse {
            success: false,
            error_info: Some(err.to_string()),
        },
    };
    Ok(tonic::Response::new(reply))
}
/// Delete a path on the addressed disk; options arrive JSON-encoded.
async fn delete(&self, request: Request<DeleteRequest>) -> Result<Response<DeleteResponse>, Status> {
    let req = request.into_inner();
    // Guard clause: unknown disk short-circuits with an error reply.
    let Some(disk) = self.find_disk(&req.disk).await else {
        return Ok(tonic::Response::new(DeleteResponse {
            success: false,
            error_info: Some("can not find disk".to_string()),
        }));
    };
    // Guard clause: undecodable options short-circuit with an error reply.
    let Ok(options) = serde_json::from_str::<DeleteOptions>(&req.options) else {
        return Ok(tonic::Response::new(DeleteResponse {
            success: false,
            error_info: Some("can not decode DeleteOptions".to_string()),
        }));
    };
    let reply = match disk.delete(&req.volume, &req.path, options).await {
        Ok(_) => DeleteResponse {
            success: true,
            error_info: None,
        },
        Err(err) => DeleteResponse {
            success: false,
            error_info: Some(err.to_string()),
        },
    };
    Ok(tonic::Response::new(reply))
}
/// Rename/move a file between volumes on the addressed disk.
async fn rename_file(&self, request: Request<RenameFileRequst>) -> Result<Response<RenameFileResponse>, Status> {
    let req = request.into_inner();
    // Guard clause: unknown disk short-circuits with an error reply.
    let Some(disk) = self.find_disk(&req.disk).await else {
        return Ok(tonic::Response::new(RenameFileResponse {
            success: false,
            error_info: Some("can not find disk".to_string()),
        }));
    };
    let result = disk
        .rename_file(&req.src_volume, &req.src_path, &req.dst_volume, &req.dst_path)
        .await;
    let reply = match result {
        Ok(_) => RenameFileResponse {
            success: true,
            error_info: None,
        },
        Err(err) => RenameFileResponse {
            success: false,
            error_info: Some(err.to_string()),
        },
    };
    Ok(tonic::Response::new(reply))
}
/// Write a chunk to a file on the addressed disk, either appending to an
/// existing file or creating a fresh one, selected by `is_append`.
async fn write(&self, request: Request<WriteRequest>) -> Result<Response<WriteResponse>, Status> {
    let request = request.into_inner();
    if let Some(disk) = self.find_disk(&request.disk).await {
        // Pick the writer: append vs. create.
        // NOTE(review): create_file is called with an empty first argument and
        // size 0 — presumably "no origin volume, unknown size"; confirm
        // against the DiskStore::create_file contract.
        let file_writer = if request.is_append {
            disk.append_file(&request.volume, &request.path).await
        } else {
            disk.create_file("", &request.volume, &request.path, 0).await
        };
        match file_writer {
            // Writer obtained: push the payload; failures go into the reply body.
            Ok(mut file_writer) => match file_writer.write(&request.data).await {
                Ok(_) => Ok(tonic::Response::new(WriteResponse {
                    success: true,
                    error_info: None,
                })),
                Err(err) => Ok(tonic::Response::new(WriteResponse {
                    success: false,
                    error_info: Some(err.to_string()),
                })),
            },
            // Could not open/create the target file.
            Err(err) => Ok(tonic::Response::new(WriteResponse {
                success: false,
                error_info: Some(err.to_string()),
            })),
        }
    } else {
        Ok(tonic::Response::new(WriteResponse {
            success: false,
            error_info: Some("can not find disk".to_string()),
        }))
    }
}
/// Read `length` bytes at `offset` from a file on the addressed disk.
///
/// Replies with `read_size = -1` on any failure. Negative `offset`/`length`
/// now produce an error reply instead of panicking: the original
/// `try_into().unwrap()` aborted the handler on caller-supplied negative
/// values.
async fn read_at(&self, request: Request<ReadAtRequest>) -> Result<Response<ReadAtResponse>, Status> {
    let request = request.into_inner();
    let Some(disk) = self.find_disk(&request.disk).await else {
        return Ok(tonic::Response::new(ReadAtResponse {
            success: false,
            data: Vec::new(),
            read_size: -1,
            error_info: Some("can not find disk".to_string()),
        }));
    };
    // Validate the conversions up front; reject instead of unwrap-panicking.
    let (Ok(offset), Ok(length)) = (request.offset.try_into(), request.length.try_into()) else {
        return Ok(tonic::Response::new(ReadAtResponse {
            success: false,
            data: Vec::new(),
            read_size: -1,
            error_info: Some("invalid offset or length".to_string()),
        }));
    };
    match disk.read_file(&request.volume, &request.path).await {
        Ok(mut file_reader) => match file_reader.read_at(offset, length).await {
            Ok((data, read_size)) => Ok(tonic::Response::new(ReadAtResponse {
                success: true,
                data,
                // Kept from the original: a failure here would mean the read
                // size is not representable in the reply field.
                read_size: read_size.try_into().unwrap(),
                error_info: None,
            })),
            Err(err) => Ok(tonic::Response::new(ReadAtResponse {
                success: false,
                data: Vec::new(),
                read_size: -1,
                error_info: Some(err.to_string()),
            })),
        },
        Err(err) => Ok(tonic::Response::new(ReadAtResponse {
            success: false,
            data: Vec::new(),
            read_size: -1,
            error_info: Some(err.to_string()),
        })),
    }
}
/// List the entries of a volume on the addressed disk.
async fn list_dir(&self, request: Request<ListDirRequest>) -> Result<Response<ListDirResponse>, Status> {
    let req = request.into_inner();
    // Guard clause: unknown disk short-circuits with an error reply.
    let Some(disk) = self.find_disk(&req.disk).await else {
        return Ok(tonic::Response::new(ListDirResponse {
            success: false,
            volumes: Vec::new(),
            error_info: Some("can not find disk".to_string()),
        }));
    };
    let reply = match disk.list_dir("", &req.volume, "", 0).await {
        Ok(volumes) => ListDirResponse {
            success: true,
            volumes,
            error_info: None,
        },
        Err(err) => ListDirResponse {
            success: false,
            volumes: Vec::new(),
            error_info: Some(err.to_string()),
        },
    };
    Ok(tonic::Response::new(reply))
}
/// Walk a directory tree on the addressed disk. Options arrive JSON-encoded;
/// matched entries are returned JSON-encoded (entries that fail to serialize
/// are dropped).
async fn walk_dir(&self, request: Request<WalkDirRequest>) -> Result<Response<WalkDirResponse>, Status> {
    let request = request.into_inner();
    if let Some(disk) = self.find_disk(&request.disk).await {
        let opts = match serde_json::from_str::<WalkDirOptions>(&request.walk_dir_options) {
            Ok(options) => options,
            Err(_) => {
                return Ok(tonic::Response::new(WalkDirResponse {
                    success: false,
                    meta_cache_entry: Vec::new(),
                    // Was "can not decode DeleteOptions": copy-paste from delete().
                    error_info: Some("can not decode WalkDirOptions".to_string()),
                }));
            }
        };
        match disk.walk_dir(opts).await {
            Ok(entries) => {
                let entries = entries
                    .into_iter()
                    .filter_map(|entry| serde_json::to_string(&entry).ok())
                    .collect();
                Ok(tonic::Response::new(WalkDirResponse {
                    success: true,
                    meta_cache_entry: entries,
                    error_info: None,
                }))
            }
            Err(err) => Ok(tonic::Response::new(WalkDirResponse {
                success: false,
                meta_cache_entry: Vec::new(),
                error_info: Some(err.to_string()),
            })),
        }
    } else {
        Ok(tonic::Response::new(WalkDirResponse {
            success: false,
            meta_cache_entry: Vec::new(),
            error_info: Some("can not find disk".to_string()),
        }))
    }
}
/// Rename object data between volumes on the addressed disk. `file_info`
/// and the response payload travel JSON-encoded.
async fn rename_data(&self, request: Request<RenameDataRequest>) -> Result<Response<RenameDataResponse>, Status> {
    let request = request.into_inner();
    if let Some(disk) = self.find_disk(&request.disk).await {
        let file_info = match serde_json::from_str::<FileInfo>(&request.file_info) {
            Ok(file_info) => file_info,
            Err(_) => {
                return Ok(tonic::Response::new(RenameDataResponse {
                    success: false,
                    rename_data_resp: String::new(),
                    // Was "can not decode DeleteOptions": copy-paste from delete().
                    error_info: Some("can not decode FileInfo".to_string()),
                }));
            }
        };
        match disk
            .rename_data(&request.src_volume, &request.src_path, file_info, &request.dst_volume, &request.dst_path)
            .await
        {
            Ok(rename_data_resp) => {
                let rename_data_resp = match serde_json::to_string(&rename_data_resp) {
                    Ok(encoded) => encoded,
                    Err(_) => {
                        return Ok(tonic::Response::new(RenameDataResponse {
                            success: false,
                            rename_data_resp: String::new(),
                            error_info: Some("can not encode RenameDataResp".to_string()),
                        }));
                    }
                };
                Ok(tonic::Response::new(RenameDataResponse {
                    success: true,
                    rename_data_resp,
                    error_info: None,
                }))
            }
            Err(err) => Ok(tonic::Response::new(RenameDataResponse {
                success: false,
                rename_data_resp: String::new(),
                error_info: Some(err.to_string()),
            })),
        }
    } else {
        Ok(tonic::Response::new(RenameDataResponse {
            success: false,
            rename_data_resp: String::new(),
            error_info: Some("can not find disk".to_string()),
        }))
    }
}
/// Creates several volumes on one disk in a single call.
async fn make_volumes(&self, request: Request<MakeVolumesRequest>) -> Result<Response<MakeVolumesResponse>, Status> {
    let request = request.into_inner();
    // Guard clause: without a matching disk there is nothing to do.
    let Some(disk) = self.find_disk(&request.disk).await else {
        return Ok(tonic::Response::new(MakeVolumesResponse {
            success: false,
            error_info: Some(format!("can not find disk, all disks: {:?}", self.all_disk().await)),
        }));
    };
    let volume_names: Vec<&str> = request.volumes.iter().map(String::as_str).collect();
    let reply = match disk.make_volumes(volume_names).await {
        Ok(_) => MakeVolumesResponse {
            success: true,
            error_info: None,
        },
        Err(err) => MakeVolumesResponse {
            success: false,
            error_info: Some(err.to_string()),
        },
    };
    Ok(tonic::Response::new(reply))
}
/// Creates a single volume on the addressed disk.
async fn make_volume(&self, request: Request<MakeVolumeRequest>) -> Result<Response<MakeVolumeResponse>, Status> {
    let request = request.into_inner();
    // Guard clause: unknown disk is reported in-band, not as a Status error.
    let Some(disk) = self.find_disk(&request.disk).await else {
        return Ok(tonic::Response::new(MakeVolumeResponse {
            success: false,
            error_info: Some(format!("can not find disk, all disks: {:?}", self.all_disk().await)),
        }));
    };
    let reply = match disk.make_volume(&request.volume).await {
        Ok(_) => MakeVolumeResponse {
            success: true,
            error_info: None,
        },
        Err(err) => MakeVolumeResponse {
            success: false,
            error_info: Some(err.to_string()),
        },
    };
    Ok(tonic::Response::new(reply))
}
/// Lists all volumes on the addressed disk; each entry is JSON-encoded.
async fn list_volumes(&self, request: Request<ListVolumesRequest>) -> Result<Response<ListVolumesResponse>, Status> {
    let request = request.into_inner();
    let Some(disk) = self.find_disk(&request.disk).await else {
        return Ok(tonic::Response::new(ListVolumesResponse {
            success: false,
            volume_infos: Vec::new(),
            error_info: Some("can not find disk".to_string()),
        }));
    };
    let reply = match disk.list_volumes().await {
        Ok(infos) => {
            // Entries that fail to serialize are silently dropped, matching
            // the best-effort behavior used elsewhere in this service.
            let volume_infos: Vec<String> = infos.iter().filter_map(|info| serde_json::to_string(info).ok()).collect();
            ListVolumesResponse {
                success: true,
                volume_infos,
                error_info: None,
            }
        }
        Err(err) => ListVolumesResponse {
            success: false,
            volume_infos: Vec::new(),
            error_info: Some(err.to_string()),
        },
    };
    Ok(tonic::Response::new(reply))
}
/// Returns metadata for a single volume, JSON-encoded as `VolumeInfo`.
///
/// Failures (unknown disk, stat error, encode error) are reported in-band
/// via `success = false` + `error_info`.
async fn stat_volume(&self, request: Request<StatVolumeRequest>) -> Result<Response<StatVolumeResponse>, Status> {
    let request = request.into_inner();
    if let Some(disk) = self.find_disk(&request.disk).await {
        match disk.stat_volume(&request.volume).await {
            Ok(volume_info) => match serde_json::to_string(&volume_info) {
                Ok(volume_info) => Ok(tonic::Response::new(StatVolumeResponse {
                    success: true,
                    volume_info,
                    error_info: None,
                })),
                // `err` implements Display, so it is formatted directly; the
                // former `err.to_string()` inside `format!` was a redundant
                // allocation (clippy would flag the useless conversion).
                Err(err) => Ok(tonic::Response::new(StatVolumeResponse {
                    success: false,
                    volume_info: String::new(),
                    error_info: Some(format!("encode VolumeInfo failed, {}", err)),
                })),
            },
            Err(err) => Ok(tonic::Response::new(StatVolumeResponse {
                success: false,
                volume_info: String::new(),
                error_info: Some(err.to_string()),
            })),
        }
    } else {
        Ok(tonic::Response::new(StatVolumeResponse {
            success: false,
            volume_info: String::new(),
            error_info: Some("can not find disk".to_string()),
        }))
    }
}
/// Persists a JSON-encoded `FileInfo` as object metadata on the disk.
///
/// Decode and write failures are reported in-band via `error_info`.
async fn write_metadata(&self, request: Request<WriteMetadataRequest>) -> Result<Response<WriteMetadataResponse>, Status> {
    let request = request.into_inner();
    if let Some(disk) = self.find_disk(&request.disk).await {
        let file_info = match serde_json::from_str::<FileInfo>(&request.file_info) {
            Ok(file_info) => file_info,
            Err(err) => {
                return Ok(tonic::Response::new(WriteMetadataResponse {
                    success: false,
                    // `err` is Display; the former `err.to_string()` inside
                    // `format!` was a redundant allocation.
                    error_info: Some(format!("decode FileInfo failed, {}", err)),
                }));
            }
        };
        // NOTE(review): the first argument is passed as "" — presumably an
        // unused origin/bucket field for remote writes; confirm against the
        // disk trait's write_metadata contract.
        match disk.write_metadata("", &request.volume, &request.path, file_info).await {
            Ok(_) => Ok(tonic::Response::new(WriteMetadataResponse {
                success: true,
                error_info: None,
            })),
            Err(err) => Ok(tonic::Response::new(WriteMetadataResponse {
                success: false,
                error_info: Some(err.to_string()),
            })),
        }
    } else {
        Ok(tonic::Response::new(WriteMetadataResponse {
            success: false,
            error_info: Some("can not find disk".to_string()),
        }))
    }
}
/// Reads one object version; `opts` carries JSON-encoded `ReadOptions` and
/// the result is a JSON-encoded `FileInfo`.
async fn read_version(&self, request: Request<ReadVersionRequest>) -> Result<Response<ReadVersionResponse>, Status> {
    let request = request.into_inner();
    if let Some(disk) = self.find_disk(&request.disk).await {
        let opts = match serde_json::from_str::<ReadOptions>(&request.opts) {
            Ok(options) => options,
            Err(_) => {
                // Bug fix: the message previously said "DeleteOptions" even
                // though a ReadOptions payload is being decoded here.
                return Ok(tonic::Response::new(ReadVersionResponse {
                    success: false,
                    file_info: String::new(),
                    error_info: Some("can not decode ReadOptions".to_string()),
                }));
            }
        };
        match disk
            .read_version("", &request.volume, &request.path, &request.version_id, &opts)
            .await
        {
            Ok(file_info) => match serde_json::to_string(&file_info) {
                Ok(file_info) => Ok(tonic::Response::new(ReadVersionResponse {
                    success: true,
                    file_info,
                    error_info: None,
                })),
                // Bug fix: this branch encodes a FileInfo, not a VolumeInfo;
                // also dropped the redundant `err.to_string()` in `format!`.
                Err(err) => Ok(tonic::Response::new(ReadVersionResponse {
                    success: false,
                    file_info: String::new(),
                    error_info: Some(format!("encode FileInfo failed, {}", err)),
                })),
            },
            Err(err) => Ok(tonic::Response::new(ReadVersionResponse {
                success: false,
                file_info: String::new(),
                error_info: Some(err.to_string()),
            })),
        }
    } else {
        Ok(tonic::Response::new(ReadVersionResponse {
            success: false,
            file_info: String::new(),
            error_info: Some("can not find disk".to_string()),
        }))
    }
}
/// Reads the raw xl metadata for an object (optionally including inline
/// data, per `read_data`) and returns it JSON-encoded as `RawFileInfo`.
async fn read_xl(&self, request: Request<ReadXlRequest>) -> Result<Response<ReadXlResponse>, Status> {
    let request = request.into_inner();
    if let Some(disk) = self.find_disk(&request.disk).await {
        match disk.read_xl(&request.volume, &request.path, request.read_data).await {
            Ok(raw_file_info) => match serde_json::to_string(&raw_file_info) {
                Ok(raw_file_info) => Ok(tonic::Response::new(ReadXlResponse {
                    success: true,
                    raw_file_info,
                    error_info: None,
                })),
                // `err` is Display; the former `err.to_string()` inside
                // `format!` was a redundant allocation.
                Err(err) => Ok(tonic::Response::new(ReadXlResponse {
                    success: false,
                    raw_file_info: String::new(),
                    error_info: Some(format!("encode RawFileInfo failed, {}", err)),
                })),
            },
            Err(err) => Ok(tonic::Response::new(ReadXlResponse {
                success: false,
                raw_file_info: String::new(),
                error_info: Some(err.to_string()),
            })),
        }
    } else {
        Ok(tonic::Response::new(ReadXlResponse {
            success: false,
            raw_file_info: String::new(),
            error_info: Some("can not find disk".to_string()),
        }))
    }
}
/// Deletes multiple object versions in one call.
///
/// `versions` holds JSON-encoded `FileInfoVersions` entries and `opts` a
/// JSON-encoded `DeleteOptions`. On success `errors` has one entry per
/// requested version: an empty string marks success, anything else carries
/// the per-version error text.
async fn delete_versions(&self, request: Request<DeleteVersionsRequest>) -> Result<Response<DeleteVersionsResponse>, Status> {
    let request = request.into_inner();
    if let Some(disk) = self.find_disk(&request.disk).await {
        let mut versions = Vec::with_capacity(request.versions.len());
        for version in &request.versions {
            // `version` is already a reference; the former `&version` was a
            // needless extra borrow (clippy: needless_borrow). The stray `;`
            // after the match was also dropped.
            match serde_json::from_str::<FileInfoVersions>(version) {
                Ok(version) => versions.push(version),
                Err(_) => {
                    return Ok(tonic::Response::new(DeleteVersionsResponse {
                        success: false,
                        errors: Vec::new(),
                        error_info: Some("can not decode FileInfoVersions".to_string()),
                    }));
                }
            }
        }
        let opts = match serde_json::from_str::<DeleteOptions>(&request.opts) {
            Ok(opts) => opts,
            Err(_) => {
                return Ok(tonic::Response::new(DeleteVersionsResponse {
                    success: false,
                    errors: Vec::new(),
                    error_info: Some("can not decode DeleteOptions".to_string()),
                }));
            }
        };
        match disk.delete_versions(&request.volume, versions, opts).await {
            Ok(errors) => {
                // Map each per-version Option<error>: None (no error) becomes "".
                let errors = errors
                    .into_iter()
                    .map(|error| error.map_or_else(String::new, |e| e.to_string()))
                    .collect();
                Ok(tonic::Response::new(DeleteVersionsResponse {
                    success: true,
                    errors,
                    error_info: None,
                }))
            }
            Err(err) => Ok(tonic::Response::new(DeleteVersionsResponse {
                success: false,
                errors: Vec::new(),
                error_info: Some(err.to_string()),
            })),
        }
    } else {
        Ok(tonic::Response::new(DeleteVersionsResponse {
            success: false,
            errors: Vec::new(),
            error_info: Some("can not find disk".to_string()),
        }))
    }
}
/// Reads several files in one round trip; the request and each response are
/// JSON-encoded (`ReadMultipleReq` / `ReadMultipleResp`).
async fn read_multiple(&self, request: Request<ReadMultipleRequest>) -> Result<Response<ReadMultipleResponse>, Status> {
    let request = request.into_inner();
    let Some(disk) = self.find_disk(&request.disk).await else {
        return Ok(tonic::Response::new(ReadMultipleResponse {
            success: false,
            read_multiple_resps: Vec::new(),
            error_info: Some("can not find disk".to_string()),
        }));
    };
    let read_multiple_req = match serde_json::from_str::<ReadMultipleReq>(&request.read_multiple_req) {
        Ok(decoded) => decoded,
        Err(_) => {
            return Ok(tonic::Response::new(ReadMultipleResponse {
                success: false,
                read_multiple_resps: Vec::new(),
                error_info: Some("can not decode ReadMultipleReq".to_string()),
            }));
        }
    };
    let reply = match disk.read_multiple(read_multiple_req).await {
        Ok(resps) => {
            // Responses that fail to serialize are dropped (best effort),
            // matching the behavior of the other list-returning methods.
            let read_multiple_resps: Vec<String> = resps.iter().filter_map(|resp| serde_json::to_string(resp).ok()).collect();
            ReadMultipleResponse {
                success: true,
                read_multiple_resps,
                error_info: None,
            }
        }
        Err(err) => ReadMultipleResponse {
            success: false,
            read_multiple_resps: Vec::new(),
            error_info: Some(err.to_string()),
        },
    };
    Ok(tonic::Response::new(reply))
}
/// Removes an entire volume from the addressed disk.
async fn delete_volume(&self, request: Request<DeleteVolumeRequest>) -> Result<Response<DeleteVolumeResponse>, Status> {
    let request = request.into_inner();
    // Guard clause: unknown disk is reported in-band, not as a Status error.
    let Some(disk) = self.find_disk(&request.disk).await else {
        return Ok(tonic::Response::new(DeleteVolumeResponse {
            success: false,
            error_info: Some("can not find disk".to_string()),
        }));
    };
    let reply = match disk.delete_volume(&request.volume).await {
        Ok(_) => DeleteVolumeResponse {
            success: true,
            error_info: None,
        },
        Err(err) => DeleteVolumeResponse {
            success: false,
            error_info: Some(err.to_string()),
        },
    };
    Ok(tonic::Response::new(reply))
}
}

View File

@@ -1,16 +1,25 @@
mod config;
mod grpc;
mod service;
mod storage;
use clap::Parser;
use ecstore::error::Result;
use ecstore::{
endpoints::EndpointServerPools,
error::Result,
store::{init_local_disks, update_erasure_type, ECStore},
};
use grpc::make_server;
use hyper_util::{
rt::{TokioExecutor, TokioIo},
server::conn::auto::Builder as ConnBuilder,
service::TowerToHyperService,
};
use s3s::{auth::SimpleAuth, service::S3ServiceBuilder};
use service::hybrid;
use std::{io::IsTerminal, net::SocketAddr, str::FromStr};
use tokio::net::TcpListener;
use tracing::{debug, info};
use tracing::{debug, info, warn};
use tracing_error::ErrorLayer;
use tracing_subscriber::{fmt, layer::SubscriberExt, util::SubscriberInitExt};
@@ -66,10 +75,19 @@ async fn run(opt: config::Opt) -> Result<()> {
// })
// };
// 用于rpc
let (endpoint_pools, setup_type) = EndpointServerPools::from_volumes(opt.address.clone().as_str(), opt.volumes.clone())?;
update_erasure_type(setup_type).await;
// 初始化本地磁盘
init_local_disks(endpoint_pools.clone()).await?;
// Setup S3 service
// 本项目使用s3s库来实现s3服务
let service = {
let mut b = S3ServiceBuilder::new(storage::ecfs::FS::new(opt.address.clone(), opt.volumes.clone()).await?);
// let mut b = S3ServiceBuilder::new(storage::ecfs::FS::new(opt.address.clone(), endpoint_pools).await?);
let mut b = S3ServiceBuilder::new(storage::ecfs::FS::new());
//设置AK和SK
//其中部份内容从config配置文件中读取
let mut access_key = String::from_str(config::DEFAULT_ACCESS_KEY).unwrap();
@@ -101,44 +119,59 @@ async fn run(opt: config::Opt) -> Result<()> {
b.build()
};
let hyper_service = service.into_shared();
let rpc_service = make_server();
let http_server = ConnBuilder::new(TokioExecutor::new());
let graceful = hyper_util::server::graceful::GracefulShutdown::new();
tokio::spawn(async move {
let hyper_service = service.into_shared();
let mut ctrl_c = std::pin::pin!(tokio::signal::ctrl_c());
let hybrid_service = TowerToHyperService::new(hybrid(hyper_service, rpc_service));
info!("server is running at http://{local_addr}");
let http_server = ConnBuilder::new(TokioExecutor::new());
let mut ctrl_c = std::pin::pin!(tokio::signal::ctrl_c());
let graceful = hyper_util::server::graceful::GracefulShutdown::new();
info!("server is running at http://{local_addr}");
loop {
let (socket, _) = tokio::select! {
res = listener.accept() => {
match res {
Ok(conn) => conn,
Err(err) => {
tracing::error!("error accepting connection: {err}");
continue;
loop {
let (socket, _) = tokio::select! {
res = listener.accept() => {
match res {
Ok(conn) => conn,
Err(err) => {
tracing::error!("error accepting connection: {err}");
continue;
}
}
}
}
_ = ctrl_c.as_mut() => {
break;
}
};
_ = ctrl_c.as_mut() => {
break;
}
};
let conn = http_server.serve_connection(TokioIo::new(socket), hyper_service.clone());
let conn = graceful.watch(conn.into_owned());
tokio::spawn(async move {
let _ = conn.await;
});
}
let conn = http_server.serve_connection(TokioIo::new(socket), hybrid_service.clone());
let conn = graceful.watch(conn.into_owned());
tokio::spawn(async move {
let _ = conn.await;
});
}
tokio::select! {
() = graceful.shutdown() => {
tracing::debug!("Gracefully shutdown!");
},
() = tokio::time::sleep(std::time::Duration::from_secs(10)) => {
tracing::debug!("Waited 10 seconds for graceful shutdown, aborting...");
}
}
});
warn!(" init store");
// init store
ECStore::new(opt.address.clone(), endpoint_pools.clone()).await?;
warn!(" init store success!");
tokio::select! {
() = graceful.shutdown() => {
tracing::debug!("Gracefully shutdown!");
},
() = tokio::time::sleep(std::time::Duration::from_secs(10)) => {
tracing::debug!("Waited 10 seconds for graceful shutdown, aborting...");
_ = tokio::signal::ctrl_c() => {
}
}

150
rustfs/src/service.rs Normal file
View File

@@ -0,0 +1,150 @@
use std::pin::Pin;
use std::task::{Context, Poll};
use futures::Future;
use http_body::Frame;
use hyper::body::Incoming;
use hyper::{Request, Response};
use pin_project_lite::pin_project;
use tower::Service;
type BoxError = Box<dyn std::error::Error + Send + Sync + 'static>;
/// Build a [`HybridService`] that routes between a REST service and a gRPC
/// service (dispatch logic lives in the `Service` impl for `HybridService`).
pub(crate) fn hybrid<MakeRest, Grpc>(make_rest: MakeRest, grpc: Grpc) -> HybridService<MakeRest, Grpc> {
    let rest = make_rest;
    HybridService { rest, grpc }
}
/// The service that can serve both gRPC and REST HTTP Requests.
///
/// Dispatch happens in its `Service::call` impl: HTTP/2 requests whose
/// Content-Type starts with "application/grpc" go to `grpc`; everything
/// else goes to `rest`.
#[derive(Clone)]
pub struct HybridService<Rest, Grpc> {
    /// Inner tower service handling plain HTTP/REST traffic.
    rest: Rest,
    /// Inner tower service handling gRPC traffic.
    grpc: Grpc,
}
impl<Rest, Grpc, RestBody, GrpcBody> Service<Request<Incoming>> for HybridService<Rest, Grpc>
where
    Rest: Service<Request<Incoming>, Response = Response<RestBody>>,
    Grpc: Service<Request<Incoming>, Response = Response<GrpcBody>>,
    Rest::Error: Into<BoxError>,
    Grpc::Error: Into<BoxError>,
{
    // Both inner response bodies are unified under one enum so the hybrid
    // service has a single concrete Response/Future type.
    type Response = Response<HybridBody<RestBody, GrpcBody>>;
    type Error = BoxError;
    type Future = HybridFuture<Rest::Future, Grpc::Future>;

    /// Ready only when BOTH inner services are ready; the first error from
    /// either is propagated.
    ///
    /// NOTE(review): if `rest` reports ready but `grpc` is pending, `rest`
    /// will be polled again later — this assumes repeated `poll_ready` calls
    /// are harmless for the inner services; confirm if new service types are
    /// plugged in here.
    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        match self.rest.poll_ready(cx) {
            Poll::Ready(Ok(())) => match self.grpc.poll_ready(cx) {
                Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
                Poll::Ready(Err(e)) => Poll::Ready(Err(e.into())),
                Poll::Pending => Poll::Pending,
            },
            Poll::Ready(Err(e)) => Poll::Ready(Err(e.into())),
            Poll::Pending => Poll::Pending,
        }
    }

    /// When calling the service, gRPC is served if the HTTP request version is HTTP/2
    /// and if the Content-Type is "application/grpc"; otherwise, the request is served
    /// as a REST request
    fn call(&mut self, req: Request<Incoming>) -> Self::Future {
        // `starts_with` (rather than equality) also matches parameterized
        // types such as "application/grpc+proto".
        match (req.version(), req.headers().get(hyper::header::CONTENT_TYPE)) {
            (hyper::Version::HTTP_2, Some(hv)) if hv.as_bytes().starts_with(b"application/grpc") => HybridFuture::Grpc {
                grpc_future: self.grpc.call(req),
            },
            _ => HybridFuture::Rest {
                rest_future: self.rest.call(req),
            },
        }
    }
}
pin_project! {
    /// A hybrid HTTP body that will be used in the response type for the
    /// [`HybridFuture`], i.e., the output of the [`HybridService`].
    ///
    /// Exactly one variant is constructed per response, depending on which
    /// inner service handled the request.
    #[project = HybridBodyProj]
    pub enum HybridBody<RestBody, GrpcBody> {
        /// Body produced by the REST service.
        Rest {
            #[pin]
            rest_body: RestBody
        },
        /// Body produced by the gRPC service.
        Grpc {
            #[pin]
            grpc_body: GrpcBody
        },
    }
}
impl<RestBody, GrpcBody> http_body::Body for HybridBody<RestBody, GrpcBody>
where
    RestBody: http_body::Body + Send + Unpin,
    GrpcBody: http_body::Body<Data = RestBody::Data> + Send + Unpin,
    RestBody::Error: Into<BoxError>,
    GrpcBody::Error: Into<BoxError>,
{
    // Both inner bodies are constrained to the same Data type; their errors
    // are boxed to a single unified error type.
    type Data = RestBody::Data;
    type Error = BoxError;

    fn is_end_stream(&self) -> bool {
        match self {
            Self::Rest { rest_body } => rest_body.is_end_stream(),
            Self::Grpc { grpc_body } => grpc_body.is_end_stream(),
        }
    }

    // Delegates frame polling to whichever inner body this value wraps,
    // converting the inner error into the shared boxed error type.
    fn poll_frame(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
        match self.project() {
            HybridBodyProj::Rest { rest_body } => rest_body.poll_frame(cx).map_err(Into::into),
            HybridBodyProj::Grpc { grpc_body } => grpc_body.poll_frame(cx).map_err(Into::into),
        }
    }

    fn size_hint(&self) -> http_body::SizeHint {
        match self {
            Self::Rest { rest_body } => rest_body.size_hint(),
            Self::Grpc { grpc_body } => grpc_body.size_hint(),
        }
    }
}
pin_project! {
    /// A future that accepts an HTTP request as input and returns an HTTP
    /// response as output for the [`HybridService`].
    ///
    /// The variant is fixed at dispatch time in `HybridService::call`.
    #[project = HybridFutureProj]
    pub enum HybridFuture<RestFuture, GrpcFuture> {
        /// Response future from the REST service.
        Rest {
            #[pin]
            rest_future: RestFuture,
        },
        /// Response future from the gRPC service.
        Grpc {
            #[pin]
            grpc_future: GrpcFuture,
        }
    }
}
impl<RestFuture, GrpcFuture, RestBody, GrpcBody, RestError, GrpcError> Future for HybridFuture<RestFuture, GrpcFuture>
where
    RestFuture: Future<Output = Result<Response<RestBody>, RestError>>,
    GrpcFuture: Future<Output = Result<Response<GrpcBody>, GrpcError>>,
    RestError: Into<BoxError>,
    GrpcError: Into<BoxError>,
{
    type Output = Result<Response<HybridBody<RestBody, GrpcBody>>, BoxError>;

    // Polls the wrapped inner future; once it resolves, the response body is
    // wrapped in the matching HybridBody variant so both arms share one
    // output type, and errors are boxed into the unified BoxError.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        match self.project() {
            HybridFutureProj::Rest { rest_future } => match rest_future.poll(cx) {
                Poll::Ready(Ok(res)) => Poll::Ready(Ok(res.map(|rest_body| HybridBody::Rest { rest_body }))),
                Poll::Ready(Err(err)) => Poll::Ready(Err(err.into())),
                Poll::Pending => Poll::Pending,
            },
            HybridFutureProj::Grpc { grpc_future } => match grpc_future.poll(cx) {
                Poll::Ready(Ok(res)) => Poll::Ready(Ok(res.map(|grpc_body| HybridBody::Grpc { grpc_body }))),
                Poll::Ready(Err(err)) => Poll::Ready(Err(err.into())),
                Poll::Pending => Poll::Pending,
            },
        }
    }
}

View File

@@ -1,5 +1,6 @@
use bytes::Bytes;
use ecstore::disk::error::DiskError;
use ecstore::store::new_object_layer_fn;
use ecstore::store_api::BucketOptions;
use ecstore::store_api::CompletePart;
use ecstore::store_api::DeleteBucketOptions;
@@ -22,12 +23,10 @@ use s3s::S3;
use s3s::{S3Request, S3Response};
use std::fmt::Debug;
use std::str::FromStr;
use tracing::warn;
use transform_stream::AsyncTryStream;
use uuid::Uuid;
use ecstore::error::Result;
use ecstore::store::ECStore;
use tracing::debug;
macro_rules! try_ {
@@ -43,13 +42,13 @@ macro_rules! try_ {
#[derive(Debug)]
pub struct FS {
pub store: ECStore,
// pub store: ECStore,
}
impl FS {
pub async fn new(address: String, endpoints: Vec<String>) -> Result<Self> {
let store: ECStore = ECStore::new(address, endpoints).await?;
Ok(Self { store })
pub fn new() -> Self {
// let store: ECStore = ECStore::new(address, endpoint_pools).await?;
Self {}
}
}
#[async_trait::async_trait]
@@ -62,8 +61,15 @@ impl S3 for FS {
async fn create_bucket(&self, req: S3Request<CreateBucketInput>) -> S3Result<S3Response<CreateBucketOutput>> {
let input = req.input;
let layer = new_object_layer_fn();
let lock = layer.read().await;
let store = match lock.as_ref() {
Some(s) => s,
None => return Err(S3Error::with_message(S3ErrorCode::InternalError, format!("Not init",))),
};
try_!(
self.store
store
.make_bucket(&input.bucket, &MakeBucketOptions { force_create: true })
.await
);
@@ -88,8 +94,14 @@ impl S3 for FS {
async fn delete_bucket(&self, req: S3Request<DeleteBucketInput>) -> S3Result<S3Response<DeleteBucketOutput>> {
let input = req.input;
// TODO: DeleteBucketInput 没有force参数
let layer = new_object_layer_fn();
let lock = layer.read().await;
let store = match lock.as_ref() {
Some(s) => s,
None => return Err(S3Error::with_message(S3ErrorCode::InternalError, format!("Not init",))),
};
try_!(
self.store
store
.delete_bucket(&input.bucket, &DeleteBucketOptions { force: false })
.await
);
@@ -117,7 +129,13 @@ impl S3 for FS {
let objects: Vec<ObjectToDelete> = vec![dobj];
let (dobjs, _errs) = try_!(self.store.delete_objects(&bucket, objects, ObjectOptions::default()).await);
let layer = new_object_layer_fn();
let lock = layer.read().await;
let store = match lock.as_ref() {
Some(s) => s,
None => return Err(S3Error::with_message(S3ErrorCode::InternalError, format!("Not init",))),
};
let (dobjs, _errs) = try_!(store.delete_objects(&bucket, objects, ObjectOptions::default()).await);
// TODO: let errors;
@@ -178,8 +196,15 @@ impl S3 for FS {
})
.collect();
let (dobjs, errs) = try_!(self.store.delete_objects(&bucket, objects, ObjectOptions::default()).await);
warn!("delete_objects res {:?} {:?}", &dobjs, errs);
let layer = new_object_layer_fn();
let lock = layer.read().await;
let store = match lock.as_ref() {
Some(s) => s,
None => return Err(S3Error::with_message(S3ErrorCode::InternalError, format!("Not init",))),
};
let (dobjs, _errs) = try_!(store.delete_objects(&bucket, objects, ObjectOptions::default()).await);
// info!("delete_objects res {:?} {:?}", &dobjs, errs);
let deleted = dobjs
.iter()
@@ -212,7 +237,14 @@ impl S3 for FS {
// mc get 1
let input = req.input;
if let Err(e) = self.store.get_bucket_info(&input.bucket, &BucketOptions {}).await {
let layer = new_object_layer_fn();
let lock = layer.read().await;
let store = match lock.as_ref() {
Some(s) => s,
None => return Err(S3Error::with_message(S3ErrorCode::InternalError, format!("Not init",))),
};
if let Err(e) = store.get_bucket_info(&input.bucket, &BucketOptions {}).await {
if DiskError::VolumeNotFound.is(&e) {
return Err(s3_error!(NoSuchBucket));
} else {
@@ -248,11 +280,14 @@ impl S3 for FS {
let h = HeaderMap::new();
let opts = &ObjectOptions::default();
let reader = try_!(
self.store
.get_object_reader(bucket.as_str(), key.as_str(), range, h, opts)
.await
);
let layer = new_object_layer_fn();
let lock = layer.read().await;
let store = match lock.as_ref() {
Some(s) => s,
None => return Err(S3Error::with_message(S3ErrorCode::InternalError, format!("Not init",))),
};
let reader = try_!(store.get_object_reader(bucket.as_str(), key.as_str(), range, h, opts).await);
let info = reader.object_info;
@@ -275,7 +310,14 @@ impl S3 for FS {
async fn head_bucket(&self, req: S3Request<HeadBucketInput>) -> S3Result<S3Response<HeadBucketOutput>> {
let input = req.input;
if let Err(e) = self.store.get_bucket_info(&input.bucket, &BucketOptions {}).await {
let layer = new_object_layer_fn();
let lock = layer.read().await;
let store = match lock.as_ref() {
Some(s) => s,
None => return Err(S3Error::with_message(S3ErrorCode::InternalError, format!("Not init",))),
};
if let Err(e) = store.get_bucket_info(&input.bucket, &BucketOptions {}).await {
if DiskError::VolumeNotFound.is(&e) {
return Err(s3_error!(NoSuchBucket));
} else {
@@ -292,7 +334,14 @@ impl S3 for FS {
// mc get 2
let HeadObjectInput { bucket, key, .. } = req.input;
let info = try_!(self.store.get_object_info(&bucket, &key, &ObjectOptions::default()).await);
let layer = new_object_layer_fn();
let lock = layer.read().await;
let store = match lock.as_ref() {
Some(s) => s,
None => return Err(S3Error::with_message(S3ErrorCode::InternalError, format!("Not init",))),
};
let info = try_!(store.get_object_info(&bucket, &key, &ObjectOptions::default()).await);
debug!("info {:?}", info);
let content_type = try_!(ContentType::from_str("application/x-msdownload"));
@@ -312,7 +361,14 @@ impl S3 for FS {
async fn list_buckets(&self, _: S3Request<ListBucketsInput>) -> S3Result<S3Response<ListBucketsOutput>> {
// mc ls
let bucket_infos = try_!(self.store.list_bucket(&BucketOptions {}).await);
let layer = new_object_layer_fn();
let lock = layer.read().await;
let store = match lock.as_ref() {
Some(s) => s,
None => return Err(S3Error::with_message(S3ErrorCode::InternalError, format!("Not init",))),
};
let bucket_infos = try_!(store.list_bucket(&BucketOptions {}).await);
let buckets: Vec<Bucket> = bucket_infos
.iter()
@@ -360,8 +416,15 @@ impl S3 for FS {
let prefix = prefix.unwrap_or_default();
let delimiter = delimiter.unwrap_or_default();
let layer = new_object_layer_fn();
let lock = layer.read().await;
let store = match lock.as_ref() {
Some(s) => s,
None => return Err(S3Error::with_message(S3ErrorCode::InternalError, format!("Not init",))),
};
let object_infos = try_!(
self.store
store
.list_objects_v2(
&bucket,
&prefix,
@@ -440,9 +503,16 @@ impl S3 for FS {
let reader = PutObjReader::new(body, content_length as usize);
try_!(self.store.put_object(&bucket, &key, reader, &ObjectOptions::default()).await);
let layer = new_object_layer_fn();
let lock = layer.read().await;
let store = match lock.as_ref() {
Some(s) => s,
None => return Err(S3Error::with_message(S3ErrorCode::InternalError, format!("Not init",))),
};
// self.store.put_object(bucket, object, data, opts);
try_!(store.put_object(&bucket, &key, reader, &ObjectOptions::default()).await);
// store.put_object(bucket, object, data, opts);
let output = PutObjectOutput { ..Default::default() };
Ok(S3Response::new(output))
@@ -461,11 +531,15 @@ impl S3 for FS {
debug!("create_multipart_upload meta {:?}", &metadata);
let MultipartUploadResult { upload_id, .. } = try_!(
self.store
.new_multipart_upload(&bucket, &key, &ObjectOptions::default())
.await
);
let layer = new_object_layer_fn();
let lock = layer.read().await;
let store = match lock.as_ref() {
Some(s) => s,
None => return Err(S3Error::with_message(S3ErrorCode::InternalError, format!("Not init",))),
};
let MultipartUploadResult { upload_id, .. } =
try_!(store.new_multipart_upload(&bucket, &key, &ObjectOptions::default()).await);
let output = CreateMultipartUploadOutput {
bucket: Some(bucket),
@@ -500,11 +574,14 @@ impl S3 for FS {
let data = PutObjReader::new(body, content_length as usize);
let opts = ObjectOptions::default();
try_!(
self.store
.put_object_part(&bucket, &key, &upload_id, part_id, data, &opts)
.await
);
let layer = new_object_layer_fn();
let lock = layer.read().await;
let store = match lock.as_ref() {
Some(s) => s,
None => return Err(S3Error::with_message(S3ErrorCode::InternalError, format!("Not init",))),
};
try_!(store.put_object_part(&bucket, &key, &upload_id, part_id, data, &opts).await);
let output = UploadPartOutput { ..Default::default() };
Ok(S3Response::new(output))
@@ -559,8 +636,15 @@ impl S3 for FS {
uploaded_parts.push(CompletePart::from(part));
}
let layer = new_object_layer_fn();
let lock = layer.read().await;
let store = match lock.as_ref() {
Some(s) => s,
None => return Err(S3Error::with_message(S3ErrorCode::InternalError, format!("Not init",))),
};
try_!(
self.store
store
.complete_multipart_upload(&bucket, &key, &upload_id, uploaded_parts, opts)
.await
);
@@ -582,9 +666,16 @@ impl S3 for FS {
bucket, key, upload_id, ..
} = req.input;
let layer = new_object_layer_fn();
let lock = layer.read().await;
let store = match lock.as_ref() {
Some(s) => s,
None => return Err(S3Error::with_message(S3ErrorCode::InternalError, format!("Not init",))),
};
let opts = &ObjectOptions::default();
try_!(
self.store
store
.abort_multipart_upload(bucket.as_str(), key.as_str(), upload_id.as_str(), opts)
.await
);