remote make bucket

Signed-off-by: junxiang Mu <1948535941@qq.com>
junxiang Mu
2024-08-27 16:43:12 +08:00
parent 0a373bc260
commit 78f3944b5d
15 changed files with 206 additions and 27 deletions

Cargo.lock

@@ -430,6 +430,7 @@ dependencies = [
"openssl",
"path-absolutize",
"path-clean",
"protos",
"reed-solomon-erasure",
"regex",
"rmp",
@@ -444,6 +445,8 @@ dependencies = [
"tokio",
"tokio-stream",
"tokio-util",
"tonic",
"tower",
"tracing",
"tracing-error",
"transform-stream",


@@ -49,7 +49,7 @@ tokio = { version = "1.38.0", features = ["fs", "rt-multi-thread"] }
tonic = { version = "0.12.1", features = ["gzip"] }
tonic-build = "0.12.1"
tonic-reflection = "0.12"
tower = "0.4.13"
tower = { version = "0.4.13", features = ["timeout"] }
tracing = "0.1.40"
tracing-error = "0.2.0"
tracing-subscriber = { version = "0.3.18", features = ["env-filter", "time"] }
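Note: the added "timeout" feature is load-bearing here, since tower 0.4 gates tower::timeout::Timeout behind it, and the node_service_time_out_client helper added below wraps the gRPC channel in exactly that type.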


@@ -40,8 +40,8 @@ fn main() -> Result<(), AnyError> {
let project_root_dir = env::current_dir()?;
let proto_dir = project_root_dir.join("src");
let proto_files = &["node.proto"];
-let proto_out_dir = project_root_dir.join("src").join("proto_gen");
-let flatbuffer_out_dir = project_root_dir.join("src").join("flatbuffers_generated");
+let proto_out_dir = project_root_dir.join("src").join("generated").join("proto_gen");
+let flatbuffer_out_dir = project_root_dir.join("src").join("generated").join("flatbuffers_generated");
let descriptor_set_path = PathBuf::from(env::var(ENV_OUT_DIR).unwrap()).join("proto-descriptor.bin");
tonic_build::configure()
@@ -54,13 +54,17 @@ fn main() -> Result<(), AnyError> {
.map_err(|e| format!("Failed to generate protobuf file: {e}."))?;
// protos/gen/mod.rs
-let generated_mod_rs_path = project_root_dir.join("src").join("proto_gen").join("mod.rs");
+let generated_mod_rs_path = project_root_dir
+    .join("src")
+    .join("generated")
+    .join("proto_gen")
+    .join("mod.rs");
let mut generated_mod_rs = fs::File::create(generated_mod_rs_path)?;
writeln!(&mut generated_mod_rs, "pub mod node_service;")?;
generated_mod_rs.flush()?;
-let generated_mod_rs_path = project_root_dir.join("src").join("lib.rs");
+let generated_mod_rs_path = project_root_dir.join("src").join("generated").join("mod.rs");
let mut generated_mod_rs = fs::File::create(generated_mod_rs_path)?;
writeln!(&mut generated_mod_rs, "#![allow(unused_imports)]")?;


@@ -0,0 +1,6 @@
#![allow(unused_imports)]
#![allow(clippy::all)]
pub mod proto_gen;
mod flatbuffers_generated;
pub use flatbuffers_generated::models::*;
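
Note: this new module root matches what build.rs now emits under src/generated/: the lint allows, the proto_gen module, and the flatbuffers re-export all move here from the old generated lib.rs.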


@@ -16,6 +16,28 @@ pub struct PingResponse {
#[prost(bytes = "vec", tag = "2")]
pub body: ::prost::alloc::vec::Vec<u8>,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, Copy, PartialEq, ::prost::Message)]
pub struct MakeBucketOptions {
#[prost(bool, tag = "1")]
pub force_create: bool,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct MakeBucketRequest {
#[prost(string, tag = "1")]
pub name: ::prost::alloc::string::String,
#[prost(message, optional, tag = "2")]
pub options: ::core::option::Option<MakeBucketOptions>,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct MakeBucketResponse {
#[prost(bool, tag = "1")]
pub success: bool,
#[prost(string, optional, tag = "2")]
pub error_info: ::core::option::Option<::prost::alloc::string::String>,
}
/// Generated client implementations.
pub mod node_service_client {
#![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)]
@@ -109,6 +131,21 @@ pub mod node_service_client {
.insert(GrpcMethod::new("node_service.NodeService", "Ping"));
self.inner.unary(req, path, codec).await
}
pub async fn make_bucket(
&mut self,
request: impl tonic::IntoRequest<super::MakeBucketRequest>,
) -> std::result::Result<tonic::Response<super::MakeBucketResponse>, tonic::Status> {
self.inner
.ready()
.await
.map_err(|e| tonic::Status::new(tonic::Code::Unknown, format!("Service was not ready: {}", e.into())))?;
let codec = tonic::codec::ProstCodec::default();
let path = http::uri::PathAndQuery::from_static("/node_service.NodeService/MakeBucket");
let mut req = request.into_request();
req.extensions_mut()
.insert(GrpcMethod::new("node_service.NodeService", "MakeBucket"));
self.inner.unary(req, path, codec).await
}
}
}
/// Generated server implementations.
@@ -122,6 +159,10 @@ pub mod node_service_server {
&self,
request: tonic::Request<super::PingRequest>,
) -> std::result::Result<tonic::Response<super::PingResponse>, tonic::Status>;
async fn make_bucket(
&self,
request: tonic::Request<super::MakeBucketRequest>,
) -> std::result::Result<tonic::Response<super::MakeBucketResponse>, tonic::Status>;
}
#[derive(Debug)]
pub struct NodeServiceServer<T: NodeService> {
@@ -221,6 +262,34 @@ pub mod node_service_server {
};
Box::pin(fut)
}
"/node_service.NodeService/MakeBucket" => {
#[allow(non_camel_case_types)]
struct MakeBucketSvc<T: NodeService>(pub Arc<T>);
impl<T: NodeService> tonic::server::UnaryService<super::MakeBucketRequest> for MakeBucketSvc<T> {
type Response = super::MakeBucketResponse;
type Future = BoxFuture<tonic::Response<Self::Response>, tonic::Status>;
fn call(&mut self, request: tonic::Request<super::MakeBucketRequest>) -> Self::Future {
let inner = Arc::clone(&self.0);
let fut = async move { <T as NodeService>::make_bucket(&inner, request).await };
Box::pin(fut)
}
}
let accept_compression_encodings = self.accept_compression_encodings;
let send_compression_encodings = self.send_compression_encodings;
let max_decoding_message_size = self.max_decoding_message_size;
let max_encoding_message_size = self.max_encoding_message_size;
let inner = self.inner.clone();
let fut = async move {
let method = MakeBucketSvc(inner);
let codec = tonic::codec::ProstCodec::default();
let mut grpc = tonic::server::Grpc::new(codec)
.apply_compression_config(accept_compression_encodings, send_compression_encodings)
.apply_max_message_size_config(max_decoding_message_size, max_encoding_message_size);
let res = grpc.unary(method, req).await;
Ok(res)
};
Box::pin(fut)
}
_ => Box::pin(async move {
Ok(http::Response::builder()
.status(200)


@@ -1,6 +1,28 @@
#![allow(unused_imports)]
#![allow(clippy::all)]
-pub mod proto_gen;
+mod generated;
+use std::time::Duration;
-mod flatbuffers_generated;
-pub use flatbuffers_generated::models::*;
+pub use generated::*;
+use proto_gen::node_service::node_service_client::NodeServiceClient;
+use tonic::{codec::CompressionEncoding, transport::Channel};
+use tower::timeout::Timeout;
+// Default 100 MB
+pub const DEFAULT_GRPC_SERVER_MESSAGE_LEN: usize = 100 * 1024 * 1024;
+pub fn node_service_time_out_client(
+    channel: Channel,
+    time_out: Duration,
+    max_message_size: usize,
+    grpc_enable_gzip: bool,
+) -> NodeServiceClient<Timeout<Channel>> {
+    let timeout_channel = Timeout::new(channel, time_out);
+    let client = NodeServiceClient::<Timeout<Channel>>::new(timeout_channel);
+    let client = NodeServiceClient::max_decoding_message_size(client, max_message_size);
+    if grpc_enable_gzip {
+        NodeServiceClient::max_encoding_message_size(client, max_message_size)
+            .accept_compressed(CompressionEncoding::Gzip)
+            .send_compressed(CompressionEncoding::Gzip)
+    } else {
+        NodeServiceClient::max_encoding_message_size(client, max_message_size)
+    }
+}
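
For orientation, a minimal usage sketch of the helper above, assuming it is called from async code where the helper and constant are in scope; the endpoint address and the wrapper function are illustrative, not part of this commit:

use std::time::Duration;
use tonic::transport::Endpoint;

// Hypothetical caller: connect a plain tonic channel, then wrap it with the
// 30 s timeout, the 100 MB decode/encode cap, and gzip disabled.
async fn connect_example() -> Result<(), Box<dyn std::error::Error>> {
    let channel = Endpoint::from_static("http://127.0.0.1:50051").connect().await?;
    let mut _client = node_service_time_out_client(
        channel,
        Duration::from_secs(30),
        DEFAULT_GRPC_SERVER_MESSAGE_LEN,
        false,
    );
    Ok(())
}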


@@ -12,8 +12,23 @@ message PingResponse {
bytes body = 2;
}
message MakeBucketOptions {
bool force_create = 1;
}
message MakeBucketRequest {
string name = 1;
MakeBucketOptions options = 2;
}
message MakeBucketResponse {
bool success = 1;
optional string error_info = 2;
}
/* -------------------------------------------------------------------- */
service NodeService {
rpc Ping(PingRequest) returns (PingResponse) {};
rpc MakeBucket(MakeBucketRequest) returns (MakeBucketResponse) {};
}

coordinator/Cargo.toml

@@ -0,0 +1,15 @@
[package]
name = "e2e_test"
version.workspace = true
edition.workspace = true
license.workspace = true
repository.workspace = true
rust-version.workspace = true
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
flatbuffers.workspace = true
protos.workspace = true
tonic = { version = "0.12.1", features = ["gzip"] }
tokio = { workspace = true }

coordinator/src/lib.rs

@@ -0,0 +1,5 @@
#[async_trait::async_trait]
pub trait Coordinator: Send + Sync {
async fn ping();
    // NOTE: placeholder error type; the commit leaves this signature incomplete.
    async fn create_bucket() -> Result<(), Box<dyn std::error::Error + Send + Sync>>;
}
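
A minimal no-op implementor, only to show the intended shape of the trait; LocalCoordinator and the placeholder error type are illustrative assumptions, not part of this commit:

struct LocalCoordinator;

#[async_trait::async_trait]
impl Coordinator for LocalCoordinator {
    async fn ping() {}
    async fn create_bucket() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        Ok(())
    }
}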


@@ -28,6 +28,7 @@ lazy_static = "1.5.0"
regex = "1.10.5"
netif = "0.1.6"
path-absolutize = "3.1.1"
protos.workspace = true
rmp-serde = "1.3.0"
tokio-util = { version = "0.7.11", features = ["io"] }
s3s = "0.10.0"
@@ -38,6 +39,8 @@ sha2 = "0.10.8"
hex-simd = "0.8.0"
path-clean = "1.0.1"
tokio-stream = "0.1.15"
tonic.workspace = true
tower.workspace = true
rmp = "0.8.14"
byteorder = "1.5.0"
xxhash-rust = { version = "0.8.12", features = ["xxh64"] }


@@ -1,7 +1,14 @@
use async_trait::async_trait;
use futures::future::join_all;
+use protos::proto_gen::node_service::MakeBucketRequest;
+use protos::{
+    node_service_time_out_client, proto_gen::node_service::MakeBucketOptions as proto_MakeBucketOptions,
+    DEFAULT_GRPC_SERVER_MESSAGE_LEN,
+};
use regex::Regex;
-use std::{collections::HashMap, fmt::Debug, sync::Arc};
+use std::{collections::HashMap, fmt::Debug, sync::Arc, time::Duration};
+use tonic::transport::{Channel, Endpoint};
+use tonic::Request;
use tracing::warn;
use crate::{
@@ -56,9 +63,8 @@ impl S3PeerSys {
}
}
-#[async_trait]
-impl PeerS3Client for S3PeerSys {
-    async fn make_bucket(&self, bucket: &str, opts: &MakeBucketOptions) -> Result<()> {
+impl S3PeerSys {
+    pub async fn make_bucket(&self, bucket: &str, opts: &MakeBucketOptions) -> Result<()> {
let mut futures = Vec::with_capacity(self.clients.len());
for cli in self.clients.iter() {
futures.push(cli.make_bucket(bucket, opts));
@@ -95,7 +101,7 @@ impl PeerS3Client for S3PeerSys {
Ok(())
}
-    async fn list_bucket(&self, opts: &BucketOptions) -> Result<Vec<BucketInfo>> {
+    pub async fn list_bucket(&self, opts: &BucketOptions) -> Result<Vec<BucketInfo>> {
let mut futures = Vec::with_capacity(self.clients.len());
for cli in self.clients.iter() {
futures.push(cli.list_bucket(opts));
@@ -140,7 +146,7 @@ impl PeerS3Client for S3PeerSys {
Ok(buckets)
}
-    async fn delete_bucket(&self, bucket: &str) -> Result<()> {
+    pub async fn delete_bucket(&self, bucket: &str) -> Result<()> {
let mut futures = Vec::with_capacity(self.clients.len());
for cli in self.clients.iter() {
futures.push(cli.delete_bucket(bucket));
@@ -165,7 +171,7 @@ impl PeerS3Client for S3PeerSys {
Ok(())
}
-    async fn get_bucket_info(&self, bucket: &str, opts: &BucketOptions) -> Result<BucketInfo> {
+    pub async fn get_bucket_info(&self, bucket: &str, opts: &BucketOptions) -> Result<BucketInfo> {
let mut futures = Vec::with_capacity(self.clients.len());
for cli in self.clients.iter() {
futures.push(cli.get_bucket_info(bucket, opts));
@@ -206,7 +212,7 @@ impl PeerS3Client for S3PeerSys {
.ok_or(Error::new(DiskError::VolumeNotFound))
}
-    fn get_pools(&self) -> Vec<usize> {
+    pub fn get_pools(&self) -> Vec<usize> {
unimplemented!()
}
}
@@ -378,27 +384,50 @@ impl PeerS3Client for LocalPeerS3Client {
#[derive(Debug)]
pub struct RemotePeerS3Client {
-    // pub node: Node,
-    // pub pools: Vec<usize>,
+    pub _node: Node,
+    pub pools: Vec<usize>,
+    pub channel: Channel,
}
impl RemotePeerS3Client {
-    fn new(_node: Node, _pools: Vec<usize>) -> Self {
-        // Self { node, pools }
-        Self {}
+    fn new(node: Node, pools: Vec<usize>) -> Self {
+        let connector = Endpoint::from_shared(node.url.to_string()).unwrap();
+        // NOTE: blocking connect in a sync constructor; this builds a throwaway
+        // runtime and will panic if called from inside an async context.
+        let channel = tokio::runtime::Runtime::new().unwrap().block_on(connector.connect()).unwrap();
+        Self {
+            _node: node,
+            pools,
+            channel,
+        }
}
}
#[async_trait]
impl PeerS3Client for RemotePeerS3Client {
fn get_pools(&self) -> Vec<usize> {
-        unimplemented!()
+        self.pools.clone()
}
async fn list_bucket(&self, _opts: &BucketOptions) -> Result<Vec<BucketInfo>> {
unimplemented!()
}
-    async fn make_bucket(&self, _bucket: &str, _opts: &MakeBucketOptions) -> Result<()> {
-        unimplemented!()
+    async fn make_bucket(&self, bucket: &str, opts: &MakeBucketOptions) -> Result<()> {
+        let mut client = node_service_time_out_client(
+            self.channel.clone(),
+            Duration::new(30, 0), // TODO: use config setting
+            DEFAULT_GRPC_SERVER_MESSAGE_LEN,
+            // grpc_enable_gzip,
+            false, // TODO: use config setting
+        );
+        let request = Request::new(MakeBucketRequest {
+            name: bucket.to_string(),
+            options: Some(proto_MakeBucketOptions {
+                force_create: opts.force_create,
+            }),
+        });
+        let _response = client.make_bucket(request).await?.into_inner();
+        // TODO: deal with error
+        Ok(())
}
async fn get_bucket_info(&self, _bucket: &str, _opts: &BucketOptions) -> Result<BucketInfo> {
unimplemented!()

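One possible shape for the "TODO: deal with error" left above, assuming the crate's Error type offers some string-based constructor; Error::from_string here is a placeholder, not a real API:

let response = client.make_bucket(request).await?.into_inner();
if !response.success {
    // Map the RPC-level failure flag and message into the crate's Result.
    // Error::from_string stands in for whatever constructor actually exists.
    return Err(Error::from_string(response.error_info.unwrap_or_default()));
}
Ok(())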

@@ -5,7 +5,7 @@ use protos::{
models::{PingBody, PingBodyBuilder},
proto_gen::node_service::{
node_service_server::{NodeService as Node, NodeServiceServer as NodeServer},
-        PingRequest, PingResponse,
+        MakeBucketRequest, MakeBucketResponse, PingRequest, PingResponse,
},
};
@@ -44,4 +44,12 @@ impl Node for NodeService {
body: finished_data.to_vec(),
}))
}
async fn make_bucket(&self, request: Request<MakeBucketRequest>) -> Result<Response<MakeBucketResponse>, Status> {
debug!("make bucket");
let req = request.into_inner();
unimplemented!()
}
}
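
For reference, one possible completion of the stub above, assuming the service eventually delegates to some local bucket store; self.store and its make_bucket method are assumptions, not part of this commit:

async fn make_bucket(&self, request: Request<MakeBucketRequest>) -> Result<Response<MakeBucketResponse>, Status> {
    let req = request.into_inner();
    debug!("make bucket: {}", req.name);
    let force = req.options.unwrap_or_default().force_create;
    // Fold storage-layer failures into the success/error_info pair rather
    // than a gRPC Status, matching the MakeBucketResponse shape.
    match self.store.make_bucket(&req.name, force).await {
        Ok(()) => Ok(Response::new(MakeBucketResponse { success: true, error_info: None })),
        Err(e) => Ok(Response::new(MakeBucketResponse { success: false, error_info: Some(e.to_string()) })),
    }
}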