diff --git a/Cargo.lock b/Cargo.lock index ce909a40..f283234a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -430,6 +430,7 @@ dependencies = [ "openssl", "path-absolutize", "path-clean", + "protos", "reed-solomon-erasure", "regex", "rmp", @@ -444,6 +445,8 @@ dependencies = [ "tokio", "tokio-stream", "tokio-util", + "tonic", + "tower", "tracing", "tracing-error", "transform-stream", diff --git a/Cargo.toml b/Cargo.toml index 0ec2e78d..ca8430b0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -49,7 +49,7 @@ tokio = { version = "1.38.0", features = ["fs", "rt-multi-thread"] } tonic = { version = "0.12.1", features = ["gzip"] } tonic-build = "0.12.1" tonic-reflection = "0.12" -tower = "0.4.13" +tower = { version = "0.4.13", features = ["timeout"] } tracing = "0.1.40" tracing-error = "0.2.0" tracing-subscriber = { version = "0.3.18", features = ["env-filter", "time"] } diff --git a/common/protos/build.rs b/common/protos/build.rs index 35efacb7..5ef32dde 100644 --- a/common/protos/build.rs +++ b/common/protos/build.rs @@ -40,8 +40,8 @@ fn main() -> Result<(), AnyError> { let project_root_dir = env::current_dir()?; let proto_dir = project_root_dir.join("src"); let proto_files = &["node.proto"]; - let proto_out_dir = project_root_dir.join("src").join("proto_gen"); - let flatbuffer_out_dir = project_root_dir.join("src").join("flatbuffers_generated"); + let proto_out_dir = project_root_dir.join("src").join("generated").join("proto_gen"); + let flatbuffer_out_dir = project_root_dir.join("src").join("generated").join("flatbuffers_generated"); let descriptor_set_path = PathBuf::from(env::var(ENV_OUT_DIR).unwrap()).join("proto-descriptor.bin"); tonic_build::configure() @@ -54,13 +54,17 @@ fn main() -> Result<(), AnyError> { .map_err(|e| format!("Failed to generate protobuf file: {e}."))?; // protos/gen/mod.rs - let generated_mod_rs_path = project_root_dir.join("src").join("proto_gen").join("mod.rs"); + let generated_mod_rs_path = project_root_dir + .join("src") + .join("generated") + .join("proto_gen") + .join("mod.rs"); let mut generated_mod_rs = fs::File::create(generated_mod_rs_path)?; writeln!(&mut generated_mod_rs, "pub mod node_service;")?; generated_mod_rs.flush()?; - let generated_mod_rs_path = project_root_dir.join("src").join("lib.rs"); + let generated_mod_rs_path = project_root_dir.join("src").join("generated").join("mod.rs"); let mut generated_mod_rs = fs::File::create(generated_mod_rs_path)?; writeln!(&mut generated_mod_rs, "#![allow(unused_imports)]")?; diff --git a/common/protos/src/flatbuffers_generated/mod.rs b/common/protos/src/generated/flatbuffers_generated/mod.rs similarity index 100% rename from common/protos/src/flatbuffers_generated/mod.rs rename to common/protos/src/generated/flatbuffers_generated/mod.rs diff --git a/common/protos/src/flatbuffers_generated/models.rs b/common/protos/src/generated/flatbuffers_generated/models.rs similarity index 100% rename from common/protos/src/flatbuffers_generated/models.rs rename to common/protos/src/generated/flatbuffers_generated/models.rs diff --git a/common/protos/src/generated/mod.rs b/common/protos/src/generated/mod.rs new file mode 100644 index 00000000..4ab5a438 --- /dev/null +++ b/common/protos/src/generated/mod.rs @@ -0,0 +1,6 @@ +#![allow(unused_imports)] +#![allow(clippy::all)] +pub mod proto_gen; + +mod flatbuffers_generated; +pub use flatbuffers_generated::models::*; diff --git a/common/protos/src/proto_gen/mod.rs b/common/protos/src/generated/proto_gen/mod.rs similarity index 100% rename from common/protos/src/proto_gen/mod.rs rename to common/protos/src/generated/proto_gen/mod.rs diff --git a/common/protos/src/proto_gen/node_service.rs b/common/protos/src/generated/proto_gen/node_service.rs similarity index 74% rename from common/protos/src/proto_gen/node_service.rs rename to common/protos/src/generated/proto_gen/node_service.rs index 463aac82..55b5b01a 100644 --- a/common/protos/src/proto_gen/node_service.rs +++ b/common/protos/src/generated/proto_gen/node_service.rs @@ -16,6 +16,28 @@ pub struct PingResponse { #[prost(bytes = "vec", tag = "2")] pub body: ::prost::alloc::vec::Vec, } +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, Copy, PartialEq, ::prost::Message)] +pub struct MakeBucketOptions { + #[prost(bool, tag = "1")] + pub force_create: bool, +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct MakeBucketRequest { + #[prost(string, tag = "1")] + pub name: ::prost::alloc::string::String, + #[prost(message, optional, tag = "2")] + pub options: ::core::option::Option, +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct MakeBucketResponse { + #[prost(bool, tag = "1")] + pub success: bool, + #[prost(string, optional, tag = "2")] + pub error_info: ::core::option::Option<::prost::alloc::string::String>, +} /// Generated client implementations. pub mod node_service_client { #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] @@ -109,6 +131,21 @@ pub mod node_service_client { .insert(GrpcMethod::new("node_service.NodeService", "Ping")); self.inner.unary(req, path, codec).await } + pub async fn make_bucket( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result, tonic::Status> { + self.inner + .ready() + .await + .map_err(|e| tonic::Status::new(tonic::Code::Unknown, format!("Service was not ready: {}", e.into())))?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static("/node_service.NodeService/MakeBucket"); + let mut req = request.into_request(); + req.extensions_mut() + .insert(GrpcMethod::new("node_service.NodeService", "MakeBucket")); + self.inner.unary(req, path, codec).await + } } } /// Generated server implementations. @@ -122,6 +159,10 @@ pub mod node_service_server { &self, request: tonic::Request, ) -> std::result::Result, tonic::Status>; + async fn make_bucket( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status>; } #[derive(Debug)] pub struct NodeServiceServer { @@ -221,6 +262,34 @@ pub mod node_service_server { }; Box::pin(fut) } + "/node_service.NodeService/MakeBucket" => { + #[allow(non_camel_case_types)] + struct MakeBucketSvc(pub Arc); + impl tonic::server::UnaryService for MakeBucketSvc { + type Response = super::MakeBucketResponse; + type Future = BoxFuture, tonic::Status>; + fn call(&mut self, request: tonic::Request) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { ::make_bucket(&inner, request).await }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let method = MakeBucketSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config(accept_compression_encodings, send_compression_encodings) + .apply_max_message_size_config(max_decoding_message_size, max_encoding_message_size); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } _ => Box::pin(async move { Ok(http::Response::builder() .status(200) diff --git a/common/protos/src/lib.rs b/common/protos/src/lib.rs index 4ab5a438..639f9b06 100644 --- a/common/protos/src/lib.rs +++ b/common/protos/src/lib.rs @@ -1,6 +1,28 @@ -#![allow(unused_imports)] -#![allow(clippy::all)] -pub mod proto_gen; +mod generated; +use std::time::Duration; -mod flatbuffers_generated; -pub use flatbuffers_generated::models::*; +pub use generated::*; +use proto_gen::node_service::node_service_client::NodeServiceClient; +use tonic::{codec::CompressionEncoding, transport::Channel}; +use tower::timeout::Timeout; + +// Default 100 MB +pub const DEFAULT_GRPC_SERVER_MESSAGE_LEN: usize = 100 * 1024 * 1024; + +pub fn node_service_time_out_client( + channel: Channel, + time_out: Duration, + max_message_size: usize, + grpc_enable_gzip: bool, +) -> NodeServiceClient> { + let timeout_channel = Timeout::new(channel, time_out); + let client = NodeServiceClient::>::new(timeout_channel); + let client = NodeServiceClient::max_decoding_message_size(client, max_message_size); + if grpc_enable_gzip { + NodeServiceClient::max_encoding_message_size(client, max_message_size) + .accept_compressed(CompressionEncoding::Gzip) + .send_compressed(CompressionEncoding::Gzip) + } else { + NodeServiceClient::max_encoding_message_size(client, max_message_size) + } +} diff --git a/common/protos/src/node.proto b/common/protos/src/node.proto index 879eb97f..c3d6cd29 100644 --- a/common/protos/src/node.proto +++ b/common/protos/src/node.proto @@ -12,8 +12,23 @@ message PingResponse { bytes body = 2; } +message MakeBucketOptions { + bool force_create = 1; +} + +message MakeBucketRequest { + string name = 1; + MakeBucketOptions options = 2; +} + +message MakeBucketResponse { + bool success = 1; + optional string error_info = 2; +} + /* -------------------------------------------------------------------- */ service NodeService { rpc Ping(PingRequest) returns (PingResponse) {}; + rpc MakeBucket(MakeBucketRequest) returns (MakeBucketResponse) {}; } \ No newline at end of file diff --git a/coordinator/Cargo.toml b/coordinator/Cargo.toml new file mode 100644 index 00000000..a1e30a55 --- /dev/null +++ b/coordinator/Cargo.toml @@ -0,0 +1,15 @@ +[package] +name = "e2e_test" +version.workspace = true +edition.workspace = true +license.workspace = true +repository.workspace = true +rust-version.workspace = true + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +flatbuffers.workspace = true +protos.workspace = true +tonic = { version = "0.12.1", features = ["gzip"] } +tokio = { workspace = true } \ No newline at end of file diff --git a/coordinator/src/lib.rs b/coordinator/src/lib.rs new file mode 100644 index 00000000..10439ca7 --- /dev/null +++ b/coordinator/src/lib.rs @@ -0,0 +1,5 @@ +#[async_trait::async_trait] +pub trait Coordinator: Send + Sync { + async fn ping(); + async fn create_bucket() -> Result<> +} \ No newline at end of file diff --git a/ecstore/Cargo.toml b/ecstore/Cargo.toml index ac8e1948..de373425 100644 --- a/ecstore/Cargo.toml +++ b/ecstore/Cargo.toml @@ -28,6 +28,7 @@ lazy_static = "1.5.0" regex = "1.10.5" netif = "0.1.6" path-absolutize = "3.1.1" +protos.workspace = true rmp-serde = "1.3.0" tokio-util = { version = "0.7.11", features = ["io"] } s3s = "0.10.0" @@ -38,6 +39,8 @@ sha2 = "0.10.8" hex-simd = "0.8.0" path-clean = "1.0.1" tokio-stream = "0.1.15" +tonic.workspace = true +tower.workspace = true rmp = "0.8.14" byteorder = "1.5.0" xxhash-rust = { version = "0.8.12", features = ["xxh64"] } diff --git a/ecstore/src/peer.rs b/ecstore/src/peer.rs index 7329923c..0c9dd58e 100644 --- a/ecstore/src/peer.rs +++ b/ecstore/src/peer.rs @@ -1,7 +1,14 @@ use async_trait::async_trait; use futures::future::join_all; +use protos::proto_gen::node_service::MakeBucketRequest; +use protos::{ + node_service_time_out_client, proto_gen::node_service::MakeBucketOptions as proto_MakeBucketOptions, + DEFAULT_GRPC_SERVER_MESSAGE_LEN, +}; use regex::Regex; -use std::{collections::HashMap, fmt::Debug, sync::Arc}; +use std::{collections::HashMap, fmt::Debug, sync::Arc, time::Duration}; +use tonic::transport::{Channel, Endpoint}; +use tonic::Request; use tracing::warn; use crate::{ @@ -56,9 +63,8 @@ impl S3PeerSys { } } -#[async_trait] -impl PeerS3Client for S3PeerSys { - async fn make_bucket(&self, bucket: &str, opts: &MakeBucketOptions) -> Result<()> { +impl S3PeerSys { + pub async fn make_bucket(&self, bucket: &str, opts: &MakeBucketOptions) -> Result<()> { let mut futures = Vec::with_capacity(self.clients.len()); for cli in self.clients.iter() { futures.push(cli.make_bucket(bucket, opts)); @@ -95,7 +101,7 @@ impl PeerS3Client for S3PeerSys { Ok(()) } - async fn list_bucket(&self, opts: &BucketOptions) -> Result> { + pub async fn list_bucket(&self, opts: &BucketOptions) -> Result> { let mut futures = Vec::with_capacity(self.clients.len()); for cli in self.clients.iter() { futures.push(cli.list_bucket(opts)); @@ -140,7 +146,7 @@ impl PeerS3Client for S3PeerSys { Ok(buckets) } - async fn delete_bucket(&self, bucket: &str) -> Result<()> { + pub async fn delete_bucket(&self, bucket: &str) -> Result<()> { let mut futures = Vec::with_capacity(self.clients.len()); for cli in self.clients.iter() { futures.push(cli.delete_bucket(bucket)); @@ -165,7 +171,7 @@ impl PeerS3Client for S3PeerSys { Ok(()) } - async fn get_bucket_info(&self, bucket: &str, opts: &BucketOptions) -> Result { + pub async fn get_bucket_info(&self, bucket: &str, opts: &BucketOptions) -> Result { let mut futures = Vec::with_capacity(self.clients.len()); for cli in self.clients.iter() { futures.push(cli.get_bucket_info(bucket, opts)); @@ -206,7 +212,7 @@ impl PeerS3Client for S3PeerSys { .ok_or(Error::new(DiskError::VolumeNotFound)) } - fn get_pools(&self) -> Vec { + pub fn get_pools(&self) -> Vec { unimplemented!() } } @@ -378,27 +384,50 @@ impl PeerS3Client for LocalPeerS3Client { #[derive(Debug)] pub struct RemotePeerS3Client { - // pub node: Node, - // pub pools: Vec, + pub _node: Node, + pub pools: Vec, + pub channel: Channel, } impl RemotePeerS3Client { - fn new(_node: Node, _pools: Vec) -> Self { - // Self { node, pools } - Self {} + fn new(node: Node, pools: Vec) -> Self { + let connector = Endpoint::from_shared(format!("{}", node.url.clone().as_str())).unwrap(); + let channel = tokio::runtime::Runtime::new().unwrap().block_on(connector.connect()).unwrap(); + Self { + _node: node, + pools, + channel, + } } } #[async_trait] impl PeerS3Client for RemotePeerS3Client { fn get_pools(&self) -> Vec { - unimplemented!() + self.pools.clone() } async fn list_bucket(&self, _opts: &BucketOptions) -> Result> { unimplemented!() } - async fn make_bucket(&self, _bucket: &str, _opts: &MakeBucketOptions) -> Result<()> { - unimplemented!() + async fn make_bucket(&self, bucket: &str, opts: &MakeBucketOptions) -> Result<()> { + let mut client = node_service_time_out_client( + self.channel.clone(), + Duration::new(30, 0), // TODO: use config setting + DEFAULT_GRPC_SERVER_MESSAGE_LEN, + // grpc_enable_gzip, + false, // TODO: use config setting + ); + let request = Request::new(MakeBucketRequest { + name: bucket.to_string(), + options: Some(proto_MakeBucketOptions { + force_create: opts.force_create, + }), + }); + let _response = client.make_bucket(request).await?.into_inner(); + + // TODO: deal with error + + Ok(()) } async fn get_bucket_info(&self, _bucket: &str, _opts: &BucketOptions) -> Result { unimplemented!() diff --git a/rustfs/src/grpc.rs b/rustfs/src/grpc.rs index 6f9b2ae7..63403b7e 100644 --- a/rustfs/src/grpc.rs +++ b/rustfs/src/grpc.rs @@ -5,7 +5,7 @@ use protos::{ models::{PingBody, PingBodyBuilder}, proto_gen::node_service::{ node_service_server::{NodeService as Node, NodeServiceServer as NodeServer}, - PingRequest, PingResponse, + MakeBucketRequest, MakeBucketResponse, PingRequest, PingResponse, }, }; @@ -44,4 +44,12 @@ impl Node for NodeService { body: finished_data.to_vec(), })) } + + async fn make_bucket(&self, request: Request) -> Result, Status> { + debug!("make bucket"); + + let req = request.into_inner(); + + unimplemented!() + } }