multiplex channel && rpc service add authentication

Signed-off-by: junxiang Mu <1948535941@qq.com>
This commit is contained in:
junxiang Mu
2024-09-27 16:29:40 +08:00
parent a65b074e9f
commit 04ab9d75a9
15 changed files with 1590 additions and 606 deletions

4
Cargo.lock generated
View File

@@ -368,6 +368,7 @@ version = "0.0.1"
dependencies = [
"lazy_static",
"tokio",
"tonic",
"tracing-error",
]
@@ -435,11 +436,13 @@ version = "0.0.1"
dependencies = [
"ecstore",
"flatbuffers",
"lazy_static",
"lock",
"protos",
"serde_json",
"tokio",
"tonic",
"tower",
"url",
]
@@ -1423,6 +1426,7 @@ dependencies = [
name = "protos"
version = "0.0.1"
dependencies = [
"common",
"flatbuffers",
"prost",
"prost-build",

View File

@@ -6,4 +6,5 @@ edition.workspace = true
[dependencies]
lazy_static.workspace = true
tokio.workspace = true
tonic.workspace = true
tracing-error.workspace = true

View File

@@ -1,8 +1,12 @@
use std::collections::HashMap;
use lazy_static::lazy_static;
use tokio::sync::RwLock;
use tonic::transport::Channel;
lazy_static! {
pub static ref GLOBAL_Local_Node_Name: RwLock<String> = RwLock::new("".to_string());
pub static ref GLOBAL_Rustfs_Host: RwLock<String> = RwLock::new("".to_string());
pub static ref GLOBAL_Rustfs_Port: RwLock<String> = RwLock::new("9000".to_string());
pub static ref GLOBAL_Conn_Map: RwLock<HashMap<String, Channel>> = RwLock::new(HashMap::new());
}

View File

@@ -1,6 +1,6 @@
use async_trait::async_trait;
use common::error::{Error, Result};
use protos::proto_gen::node_service::{node_service_client::NodeServiceClient, GenerallyLockRequest};
use protos::{node_service_time_out_client, proto_gen::node_service::GenerallyLockRequest};
use tonic::Request;
use tracing::info;
@@ -8,18 +8,13 @@ use crate::{lock_args::LockArgs, Locker};
#[derive(Debug, Clone)]
pub struct RemoteClient {
url: url::Url,
addr: String,
}
impl RemoteClient {
pub fn new(url: url::Url) -> Self {
Self { url }
}
async fn get_client_v2(&self) -> Result<NodeServiceClient<tonic::transport::Channel>> {
// Ok(NodeServiceClient::connect("http://220.181.1.138:9000").await?)
let addr = format!("{}://{}:{}", self.url.scheme(), self.url.host_str().unwrap(), self.url.port().unwrap());
Ok(NodeServiceClient::connect(addr).await?)
let addr = format!("{}://{}:{}", url.scheme(), url.host_str().unwrap(), url.port().unwrap());
Self { addr }
}
}
@@ -28,7 +23,9 @@ impl Locker for RemoteClient {
async fn lock(&mut self, args: &LockArgs) -> Result<bool> {
info!("remote lock");
let args = serde_json::to_string(args)?;
let mut client = self.get_client_v2().await?;
let mut client = node_service_time_out_client(&self.addr)
.await
.map_err(|err| Error::from_string(format!("can not get client, err: {}", err.to_string())))?;
let request = Request::new(GenerallyLockRequest { args });
let response = client.lock(request).await?.into_inner();
@@ -43,7 +40,9 @@ impl Locker for RemoteClient {
async fn unlock(&mut self, args: &LockArgs) -> Result<bool> {
info!("remote unlock");
let args = serde_json::to_string(args)?;
let mut client = self.get_client_v2().await?;
let mut client = node_service_time_out_client(&self.addr)
.await
.map_err(|err| Error::from_string(format!("can not get client, err: {}", err.to_string())))?;
let request = Request::new(GenerallyLockRequest { args });
let response = client.un_lock(request).await?.into_inner();
@@ -58,7 +57,9 @@ impl Locker for RemoteClient {
async fn rlock(&mut self, args: &LockArgs) -> Result<bool> {
info!("remote rlock");
let args = serde_json::to_string(args)?;
let mut client = self.get_client_v2().await?;
let mut client = node_service_time_out_client(&self.addr)
.await
.map_err(|err| Error::from_string(format!("can not get client, err: {}", err.to_string())))?;
let request = Request::new(GenerallyLockRequest { args });
let response = client.r_lock(request).await?.into_inner();
@@ -73,7 +74,9 @@ impl Locker for RemoteClient {
async fn runlock(&mut self, args: &LockArgs) -> Result<bool> {
info!("remote runlock");
let args = serde_json::to_string(args)?;
let mut client = self.get_client_v2().await?;
let mut client = node_service_time_out_client(&self.addr)
.await
.map_err(|err| Error::from_string(format!("can not get client, err: {}", err.to_string())))?;
let request = Request::new(GenerallyLockRequest { args });
let response = client.r_un_lock(request).await?.into_inner();
@@ -88,7 +91,9 @@ impl Locker for RemoteClient {
async fn force_unlock(&mut self, args: &LockArgs) -> Result<bool> {
info!("remote force_unlock");
let args = serde_json::to_string(args)?;
let mut client = self.get_client_v2().await?;
let mut client = node_service_time_out_client(&self.addr)
.await
.map_err(|err| Error::from_string(format!("can not get client, err: {}", err.to_string())))?;
let request = Request::new(GenerallyLockRequest { args });
let response = client.force_un_lock(request).await?.into_inner();
@@ -103,7 +108,9 @@ impl Locker for RemoteClient {
async fn refresh(&mut self, args: &LockArgs) -> Result<bool> {
info!("remote refresh");
let args = serde_json::to_string(args)?;
let mut client = self.get_client_v2().await?;
let mut client = node_service_time_out_client(&self.addr)
.await
.map_err(|err| Error::from_string(format!("can not get client, err: {}", err.to_string())))?;
let request = Request::new(GenerallyLockRequest { args });
let response = client.refresh(request).await?.into_inner();

View File

@@ -5,6 +5,7 @@ edition.workspace = true
[dependencies]
#async-backtrace = { workspace = true, optional = true }
common.workspace = true
flatbuffers = { workspace = true }
prost = { workspace = true }
protobuf = { workspace = true }

View File

@@ -1,9 +1,10 @@
// automatically generated by the FlatBuffers compiler, do not modify
// @generated
use core::cmp::Ordering;
use core::mem;
use core::cmp::Ordering;
extern crate flatbuffers;
use self::flatbuffers::{EndianScalar, Follow};
@@ -11,114 +12,112 @@ use self::flatbuffers::{EndianScalar, Follow};
#[allow(unused_imports, dead_code)]
pub mod models {
use core::cmp::Ordering;
use core::mem;
use core::mem;
use core::cmp::Ordering;
extern crate flatbuffers;
use self::flatbuffers::{EndianScalar, Follow};
extern crate flatbuffers;
use self::flatbuffers::{EndianScalar, Follow};
pub enum PingBodyOffset {}
#[derive(Copy, Clone, PartialEq)]
pub enum PingBodyOffset {}
#[derive(Copy, Clone, PartialEq)]
pub struct PingBody<'a> {
pub _tab: flatbuffers::Table<'a>,
pub struct PingBody<'a> {
pub _tab: flatbuffers::Table<'a>,
}
impl<'a> flatbuffers::Follow<'a> for PingBody<'a> {
type Inner = PingBody<'a>;
#[inline]
unsafe fn follow(buf: &'a [u8], loc: usize) -> Self::Inner {
Self { _tab: flatbuffers::Table::new(buf, loc) }
}
}
impl<'a> PingBody<'a> {
pub const VT_PAYLOAD: flatbuffers::VOffsetT = 4;
pub const fn get_fully_qualified_name() -> &'static str {
"models.PingBody"
}
#[inline]
pub unsafe fn init_from_table(table: flatbuffers::Table<'a>) -> Self {
PingBody { _tab: table }
}
#[allow(unused_mut)]
pub fn create<'bldr: 'args, 'args: 'mut_bldr, 'mut_bldr, A: flatbuffers::Allocator + 'bldr>(
_fbb: &'mut_bldr mut flatbuffers::FlatBufferBuilder<'bldr, A>,
args: &'args PingBodyArgs<'args>
) -> flatbuffers::WIPOffset<PingBody<'bldr>> {
let mut builder = PingBodyBuilder::new(_fbb);
if let Some(x) = args.payload { builder.add_payload(x); }
builder.finish()
}
#[inline]
pub fn payload(&self) -> Option<flatbuffers::Vector<'a, u8>> {
// Safety:
// Created from valid Table for this object
// which contains a valid value in this slot
unsafe { self._tab.get::<flatbuffers::ForwardsUOffset<flatbuffers::Vector<'a, u8>>>(PingBody::VT_PAYLOAD, None)}
}
}
impl flatbuffers::Verifiable for PingBody<'_> {
#[inline]
fn run_verifier(
v: &mut flatbuffers::Verifier, pos: usize
) -> Result<(), flatbuffers::InvalidFlatbuffer> {
use self::flatbuffers::Verifiable;
v.visit_table(pos)?
.visit_field::<flatbuffers::ForwardsUOffset<flatbuffers::Vector<'_, u8>>>("payload", Self::VT_PAYLOAD, false)?
.finish();
Ok(())
}
}
pub struct PingBodyArgs<'a> {
pub payload: Option<flatbuffers::WIPOffset<flatbuffers::Vector<'a, u8>>>,
}
impl<'a> Default for PingBodyArgs<'a> {
#[inline]
fn default() -> Self {
PingBodyArgs {
payload: None,
}
}
}
impl<'a> flatbuffers::Follow<'a> for PingBody<'a> {
type Inner = PingBody<'a>;
#[inline]
unsafe fn follow(buf: &'a [u8], loc: usize) -> Self::Inner {
Self {
_tab: flatbuffers::Table::new(buf, loc),
}
}
pub struct PingBodyBuilder<'a: 'b, 'b, A: flatbuffers::Allocator + 'a> {
fbb_: &'b mut flatbuffers::FlatBufferBuilder<'a, A>,
start_: flatbuffers::WIPOffset<flatbuffers::TableUnfinishedWIPOffset>,
}
impl<'a: 'b, 'b, A: flatbuffers::Allocator + 'a> PingBodyBuilder<'a, 'b, A> {
#[inline]
pub fn add_payload(&mut self, payload: flatbuffers::WIPOffset<flatbuffers::Vector<'b , u8>>) {
self.fbb_.push_slot_always::<flatbuffers::WIPOffset<_>>(PingBody::VT_PAYLOAD, payload);
}
#[inline]
pub fn new(_fbb: &'b mut flatbuffers::FlatBufferBuilder<'a, A>) -> PingBodyBuilder<'a, 'b, A> {
let start = _fbb.start_table();
PingBodyBuilder {
fbb_: _fbb,
start_: start,
}
}
#[inline]
pub fn finish(self) -> flatbuffers::WIPOffset<PingBody<'a>> {
let o = self.fbb_.end_table(self.start_);
flatbuffers::WIPOffset::new(o.value())
}
}
impl<'a> PingBody<'a> {
pub const VT_PAYLOAD: flatbuffers::VOffsetT = 4;
impl core::fmt::Debug for PingBody<'_> {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
let mut ds = f.debug_struct("PingBody");
ds.field("payload", &self.payload());
ds.finish()
}
}
} // pub mod models
pub const fn get_fully_qualified_name() -> &'static str {
"models.PingBody"
}
#[inline]
pub unsafe fn init_from_table(table: flatbuffers::Table<'a>) -> Self {
PingBody { _tab: table }
}
#[allow(unused_mut)]
pub fn create<'bldr: 'args, 'args: 'mut_bldr, 'mut_bldr, A: flatbuffers::Allocator + 'bldr>(
_fbb: &'mut_bldr mut flatbuffers::FlatBufferBuilder<'bldr, A>,
args: &'args PingBodyArgs<'args>,
) -> flatbuffers::WIPOffset<PingBody<'bldr>> {
let mut builder = PingBodyBuilder::new(_fbb);
if let Some(x) = args.payload {
builder.add_payload(x);
}
builder.finish()
}
#[inline]
pub fn payload(&self) -> Option<flatbuffers::Vector<'a, u8>> {
// Safety:
// Created from valid Table for this object
// which contains a valid value in this slot
unsafe {
self._tab
.get::<flatbuffers::ForwardsUOffset<flatbuffers::Vector<'a, u8>>>(PingBody::VT_PAYLOAD, None)
}
}
}
impl flatbuffers::Verifiable for PingBody<'_> {
#[inline]
fn run_verifier(v: &mut flatbuffers::Verifier, pos: usize) -> Result<(), flatbuffers::InvalidFlatbuffer> {
use self::flatbuffers::Verifiable;
v.visit_table(pos)?
.visit_field::<flatbuffers::ForwardsUOffset<flatbuffers::Vector<'_, u8>>>("payload", Self::VT_PAYLOAD, false)?
.finish();
Ok(())
}
}
pub struct PingBodyArgs<'a> {
pub payload: Option<flatbuffers::WIPOffset<flatbuffers::Vector<'a, u8>>>,
}
impl<'a> Default for PingBodyArgs<'a> {
#[inline]
fn default() -> Self {
PingBodyArgs { payload: None }
}
}
pub struct PingBodyBuilder<'a: 'b, 'b, A: flatbuffers::Allocator + 'a> {
fbb_: &'b mut flatbuffers::FlatBufferBuilder<'a, A>,
start_: flatbuffers::WIPOffset<flatbuffers::TableUnfinishedWIPOffset>,
}
impl<'a: 'b, 'b, A: flatbuffers::Allocator + 'a> PingBodyBuilder<'a, 'b, A> {
#[inline]
pub fn add_payload(&mut self, payload: flatbuffers::WIPOffset<flatbuffers::Vector<'b, u8>>) {
self.fbb_
.push_slot_always::<flatbuffers::WIPOffset<_>>(PingBody::VT_PAYLOAD, payload);
}
#[inline]
pub fn new(_fbb: &'b mut flatbuffers::FlatBufferBuilder<'a, A>) -> PingBodyBuilder<'a, 'b, A> {
let start = _fbb.start_table();
PingBodyBuilder {
fbb_: _fbb,
start_: start,
}
}
#[inline]
pub fn finish(self) -> flatbuffers::WIPOffset<PingBody<'a>> {
let o = self.fbb_.end_table(self.start_);
flatbuffers::WIPOffset::new(o.value())
}
}
impl core::fmt::Debug for PingBody<'_> {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
let mut ds = f.debug_struct("PingBody");
ds.field("payload", &self.payload());
ds.finish()
}
}
} // pub mod models

File diff suppressed because it is too large Load Diff

View File

@@ -1,28 +1,44 @@
mod generated;
use std::time::Duration;
use std::{error::Error, time::Duration};
use common::globals::GLOBAL_Conn_Map;
pub use generated::*;
use proto_gen::node_service::node_service_client::NodeServiceClient;
use tonic::{codec::CompressionEncoding, transport::Channel};
use tower::timeout::Timeout;
use tonic::{
metadata::MetadataValue,
service::interceptor::InterceptedService,
transport::{Channel, Endpoint},
Request, Status,
};
// Default 100 MB
pub const DEFAULT_GRPC_SERVER_MESSAGE_LEN: usize = 100 * 1024 * 1024;
pub fn node_service_time_out_client(
channel: Channel,
time_out: Duration,
max_message_size: usize,
grpc_enable_gzip: bool,
) -> NodeServiceClient<Timeout<Channel>> {
let timeout_channel = Timeout::new(channel, time_out);
let client = NodeServiceClient::<Timeout<Channel>>::new(timeout_channel);
let client = NodeServiceClient::max_decoding_message_size(client, max_message_size);
if grpc_enable_gzip {
NodeServiceClient::max_encoding_message_size(client, max_message_size)
.accept_compressed(CompressionEncoding::Gzip)
.send_compressed(CompressionEncoding::Gzip)
} else {
NodeServiceClient::max_encoding_message_size(client, max_message_size)
}
pub async fn node_service_time_out_client(
addr: &String,
) -> Result<
NodeServiceClient<
InterceptedService<Channel, Box<dyn Fn(Request<()>) -> Result<Request<()>, Status> + Send + Sync + 'static>>,
>,
Box<dyn Error>,
> {
let token: MetadataValue<_> = "rustfs rpc".parse()?;
let channel = match GLOBAL_Conn_Map.read().await.get(addr) {
Some(channel) => channel.clone(),
None => {
let connector = Endpoint::from_shared(addr.to_string())?.connect_timeout(Duration::from_secs(60));
let channel = connector.connect().await?;
channel
}
};
GLOBAL_Conn_Map.write().await.insert(addr.to_string(), channel.clone());
// let timeout_channel = Timeout::new(channel, Duration::from_secs(60));
Ok(NodeServiceClient::with_interceptor(
channel,
Box::new(move |mut req: Request<()>| {
req.metadata_mut().insert("authorization", token.clone());
Ok(req)
}),
))
}

View File

@@ -11,9 +11,11 @@ rust-version.workspace = true
[dependencies]
ecstore.workspace = true
flatbuffers.workspace = true
lazy_static.workspace = true
lock.workspace = true
protos.workspace = true
serde_json.workspace = true
tonic = { version = "0.12.1", features = ["gzip"] }
tokio = { workspace = true }
tower.workspace = true
url.workspace = true

View File

@@ -1,7 +1,8 @@
#![cfg(test)]
use std::{error::Error, sync::Arc, time::Duration, vec};
use std::{collections::HashMap, error::Error, sync::Arc, time::Duration, vec};
use lazy_static::lazy_static;
use lock::{
drwmutex::Options,
lock_args::LockArgs,
@@ -10,11 +11,44 @@ use lock::{
};
use protos::proto_gen::node_service::{node_service_client::NodeServiceClient, GenerallyLockRequest};
use tokio::sync::RwLock;
use tonic::Request;
use tonic::{
metadata::MetadataValue,
service::interceptor::InterceptedService,
transport::{Channel, Endpoint},
Request, Status,
};
async fn get_client() -> Result<NodeServiceClient<tonic::transport::Channel>, Box<dyn Error>> {
// Ok(NodeServiceClient::connect("http://220.181.1.138:9000").await?)
Ok(NodeServiceClient::connect("http://localhost:9000").await?)
lazy_static! {
pub static ref GLOBAL_Conn_Map: RwLock<HashMap<String, Channel>> = RwLock::new(HashMap::new());
}
async fn get_client() -> Result<
NodeServiceClient<
InterceptedService<Channel, Box<dyn Fn(Request<()>) -> Result<Request<()>, Status> + Send + Sync + 'static>>,
>,
Box<dyn Error>,
> {
let token: MetadataValue<_> = "rustfs rpc".parse()?;
let channel = match GLOBAL_Conn_Map.read().await.get("local") {
Some(channel) => channel.clone(),
None => {
println!("get channel start");
let connector = Endpoint::from_static("http://localhost:9000").connect_timeout(Duration::from_secs(60));
let channel = connector.connect().await?;
// let channel = Channel::from_static("http://localhost:9000").connect().await?;
channel
}
};
GLOBAL_Conn_Map.write().await.insert("local".to_owned(), channel.clone());
// let timeout_channel = Timeout::new(channel, Duration::from_secs(60));
Ok(NodeServiceClient::with_interceptor(
channel,
Box::new(move |mut req: Request<()>| {
req.metadata_mut().insert("authorization", token.clone());
Ok(req)
}),
))
}
#[tokio::test]
@@ -29,12 +63,22 @@ async fn test_lock_unlock_rpc() -> Result<(), Box<dyn Error>> {
let args = serde_json::to_string(&args)?;
let mut client = get_client().await?;
let request = Request::new(GenerallyLockRequest { args });
println!("got client");
let request = Request::new(GenerallyLockRequest { args: args.clone() });
println!("start request");
let response = client.lock(request).await?.into_inner();
println!("request ended");
if let Some(error_info) = response.error_info {
assert!(false, "can not get lock: {}", error_info);
}
let request = Request::new(GenerallyLockRequest { args });
let response = client.un_lock(request).await?.into_inner();
if let Some(error_info) = response.error_info {
assert!(false, "can not get un_lock: {}", error_info);
}
Ok(())
}

View File

@@ -32,7 +32,7 @@ use tokio::{
sync::mpsc::{self, Sender},
};
use tokio_stream::wrappers::ReceiverStream;
use tonic::{transport::Channel, Streaming};
use tonic::{service::interceptor::InterceptedService, transport::Channel, Request, Status, Streaming};
use tracing::info;
use uuid::Uuid;
@@ -365,7 +365,9 @@ impl RemoteFileWriter {
volume: String,
path: String,
is_append: bool,
mut client: NodeServiceClient<Channel>,
mut client: NodeServiceClient<
InterceptedService<Channel, Box<dyn Fn(Request<()>) -> Result<Request<()>, Status> + Send + Sync + 'static>>,
>,
) -> Result<Self> {
let (tx, rx) = mpsc::channel(128);
let in_stream = ReceiverStream::new(rx);
@@ -480,7 +482,14 @@ pub struct RemoteFileReader {
}
impl RemoteFileReader {
pub async fn new(root: PathBuf, volume: String, path: String, mut client: NodeServiceClient<Channel>) -> Result<Self> {
pub async fn new(
root: PathBuf,
volume: String,
path: String,
mut client: NodeServiceClient<
InterceptedService<Channel, Box<dyn Fn(Request<()>) -> Result<Request<()>, Status> + Send + Sync + 'static>>,
>,
) -> Result<Self> {
let (tx, rx) = mpsc::channel(128);
let in_stream = ReceiverStream::new(rx);

View File

@@ -1,4 +1,4 @@
use std::{path::PathBuf, sync::Arc, time::Duration};
use std::path::PathBuf;
use bytes::Bytes;
use common::error::{Error, Result};
@@ -6,19 +6,12 @@ use futures::lock::Mutex;
use protos::{
node_service_time_out_client,
proto_gen::node_service::{
node_service_client::NodeServiceClient, DeleteRequest, DeleteVersionsRequest, DeleteVolumeRequest, ListDirRequest,
ListVolumesRequest, MakeVolumeRequest, MakeVolumesRequest, ReadAllRequest, ReadMultipleRequest, ReadVersionRequest,
ReadXlRequest, RenameDataRequest, RenameFileRequst, StatVolumeRequest, WalkDirRequest, WriteAllRequest,
WriteMetadataRequest,
DeleteRequest, DeleteVersionsRequest, DeleteVolumeRequest, ListDirRequest, ListVolumesRequest, MakeVolumeRequest,
MakeVolumesRequest, ReadAllRequest, ReadMultipleRequest, ReadVersionRequest, ReadXlRequest, RenameDataRequest,
RenameFileRequst, StatVolumeRequest, WalkDirRequest, WriteAllRequest, WriteMetadataRequest,
},
DEFAULT_GRPC_SERVER_MESSAGE_LEN,
};
use tokio::sync::RwLock;
use tonic::{
transport::{Channel, Endpoint as tonic_Endpoint},
Request,
};
use tower::timeout::Timeout;
use tonic::Request;
use tracing::info;
use uuid::Uuid;
@@ -36,7 +29,7 @@ use super::{
#[derive(Debug)]
pub struct RemoteDisk {
pub id: Mutex<Option<Uuid>>,
pub channel: Arc<RwLock<Option<Channel>>>,
pub addr: String,
pub url: url::Url,
pub root: PathBuf,
}
@@ -45,52 +38,14 @@ impl RemoteDisk {
pub async fn new(ep: &Endpoint, _opt: &DiskOption) -> Result<Self> {
// let root = fs::canonicalize(ep.url.path()).await?;
let root = PathBuf::from(ep.url.path());
let addr = format!("{}://{}:{}", ep.url.scheme(), ep.url.host_str().unwrap(), ep.url.port().unwrap());
Ok(Self {
channel: Arc::new(RwLock::new(None)),
id: Mutex::new(None),
addr,
url: ep.url.clone(),
root,
id: Mutex::new(None),
})
}
#[allow(dead_code)]
async fn get_client(&self) -> Result<NodeServiceClient<Timeout<Channel>>> {
let channel_clone = self.channel.clone();
let channel = {
let read_lock = channel_clone.read().await;
if let Some(ref channel) = *read_lock {
channel.clone()
} else {
let addr = format!("{}://{}:{}", self.url.scheme(), self.url.host_str().unwrap(), self.url.port().unwrap());
info!("disk url: {}", addr);
let connector = tonic_Endpoint::from_shared(addr.clone())?;
let new_channel = connector.connect().await.map_err(|_err| DiskError::DiskNotFound)?;
info!("get channel success");
*self.channel.write().await = Some(new_channel.clone());
new_channel
}
};
Ok(node_service_time_out_client(
channel,
Duration::new(30, 0), // TODO: use config setting
DEFAULT_GRPC_SERVER_MESSAGE_LEN,
// grpc_enable_gzip,
false, // TODO: use config setting
))
}
async fn get_client_v2(&self) -> Result<NodeServiceClient<tonic::transport::Channel>> {
// Ok(NodeServiceClient::connect("http://220.181.1.138:9000").await?)
let addr = format!("{}://{}:{}", self.url.scheme(), self.url.host_str().unwrap(), self.url.port().unwrap());
Ok(NodeServiceClient::connect(addr).await?)
}
}
// TODO: all api need to handle errors
@@ -118,7 +73,9 @@ impl DiskAPI for RemoteDisk {
async fn read_all(&self, volume: &str, path: &str) -> Result<Bytes> {
info!("read_all");
let mut client = self.get_client_v2().await?;
let mut client = node_service_time_out_client(&self.addr)
.await
.map_err(|err| Error::from_string(format!("can not get client, err: {}", err.to_string())))?;
let request = Request::new(ReadAllRequest {
disk: self.root.to_string_lossy().to_string(),
volume: volume.to_string(),
@@ -138,7 +95,9 @@ impl DiskAPI for RemoteDisk {
async fn write_all(&self, volume: &str, path: &str, data: Vec<u8>) -> Result<()> {
info!("write_all");
let mut client = self.get_client_v2().await?;
let mut client = node_service_time_out_client(&self.addr)
.await
.map_err(|err| Error::from_string(format!("can not get client, err: {}", err.to_string())))?;
let request = Request::new(WriteAllRequest {
disk: self.root.to_string_lossy().to_string(),
volume: volume.to_string(),
@@ -158,7 +117,9 @@ impl DiskAPI for RemoteDisk {
async fn delete(&self, volume: &str, path: &str, opt: DeleteOptions) -> Result<()> {
info!("delete");
let options = serde_json::to_string(&opt)?;
let mut client = self.get_client_v2().await?;
let mut client = node_service_time_out_client(&self.addr)
.await
.map_err(|err| Error::from_string(format!("can not get client, err: {}", err.to_string())))?;
let request = Request::new(DeleteRequest {
disk: self.root.to_string_lossy().to_string(),
volume: volume.to_string(),
@@ -177,7 +138,9 @@ impl DiskAPI for RemoteDisk {
async fn rename_file(&self, src_volume: &str, src_path: &str, dst_volume: &str, dst_path: &str) -> Result<()> {
info!("rename_file");
let mut client = self.get_client_v2().await?;
let mut client = node_service_time_out_client(&self.addr)
.await
.map_err(|err| Error::from_string(format!("can not get client, err: {}", err.to_string())))?;
let request = Request::new(RenameFileRequst {
disk: self.root.to_string_lossy().to_string(),
src_volume: src_volume.to_string(),
@@ -203,7 +166,9 @@ impl DiskAPI for RemoteDisk {
volume.to_string(),
path.to_string(),
false,
self.get_client_v2().await?,
node_service_time_out_client(&self.addr)
.await
.map_err(|err| Error::from_string(format!("can not get client, err: {}", err.to_string())))?,
)
.await?,
))
@@ -212,21 +177,39 @@ impl DiskAPI for RemoteDisk {
async fn append_file(&self, volume: &str, path: &str) -> Result<FileWriter> {
info!("append_file");
Ok(FileWriter::Remote(
RemoteFileWriter::new(self.root.clone(), volume.to_string(), path.to_string(), true, self.get_client_v2().await?)
.await?,
RemoteFileWriter::new(
self.root.clone(),
volume.to_string(),
path.to_string(),
true,
node_service_time_out_client(&self.addr)
.await
.map_err(|err| Error::from_string(format!("can not get client, err: {}", err.to_string())))?,
)
.await?,
))
}
async fn read_file(&self, volume: &str, path: &str) -> Result<FileReader> {
info!("read_file");
Ok(FileReader::Remote(
RemoteFileReader::new(self.root.clone(), volume.to_string(), path.to_string(), self.get_client_v2().await?).await?,
RemoteFileReader::new(
self.root.clone(),
volume.to_string(),
path.to_string(),
node_service_time_out_client(&self.addr)
.await
.map_err(|err| Error::from_string(format!("can not get client, err: {}", err.to_string())))?,
)
.await?,
))
}
async fn list_dir(&self, _origvolume: &str, volume: &str, _dir_path: &str, _count: i32) -> Result<Vec<String>> {
info!("list_dir");
let mut client = self.get_client_v2().await?;
let mut client = node_service_time_out_client(&self.addr)
.await
.map_err(|err| Error::from_string(format!("can not get client, err: {}", err.to_string())))?;
let request = Request::new(ListDirRequest {
disk: self.root.to_string_lossy().to_string(),
volume: volume.to_string(),
@@ -244,7 +227,9 @@ impl DiskAPI for RemoteDisk {
async fn walk_dir(&self, opts: WalkDirOptions) -> Result<Vec<MetaCacheEntry>> {
info!("walk_dir");
let walk_dir_options = serde_json::to_string(&opts)?;
let mut client = self.get_client_v2().await?;
let mut client = node_service_time_out_client(&self.addr)
.await
.map_err(|err| Error::from_string(format!("can not get client, err: {}", err.to_string())))?;
let request = Request::new(WalkDirRequest {
disk: self.root.to_string_lossy().to_string(),
walk_dir_options,
@@ -275,7 +260,9 @@ impl DiskAPI for RemoteDisk {
) -> Result<RenameDataResp> {
info!("rename_data");
let file_info = serde_json::to_string(&fi)?;
let mut client = self.get_client_v2().await?;
let mut client = node_service_time_out_client(&self.addr)
.await
.map_err(|err| Error::from_string(format!("can not get client, err: {}", err.to_string())))?;
let request = Request::new(RenameDataRequest {
disk: self.root.to_string_lossy().to_string(),
src_volume: src_volume.to_string(),
@@ -298,7 +285,9 @@ impl DiskAPI for RemoteDisk {
async fn make_volumes(&self, volumes: Vec<&str>) -> Result<()> {
info!("make_volumes");
let mut client = self.get_client_v2().await?;
let mut client = node_service_time_out_client(&self.addr)
.await
.map_err(|err| Error::from_string(format!("can not get client, err: {}", err.to_string())))?;
let request = Request::new(MakeVolumesRequest {
disk: self.root.to_string_lossy().to_string(),
volumes: volumes.iter().map(|s| (*s).to_string()).collect(),
@@ -315,7 +304,9 @@ impl DiskAPI for RemoteDisk {
async fn make_volume(&self, volume: &str) -> Result<()> {
info!("make_volume");
let mut client = self.get_client_v2().await?;
let mut client = node_service_time_out_client(&self.addr)
.await
.map_err(|err| Error::from_string(format!("can not get client, err: {}", err.to_string())))?;
let request = Request::new(MakeVolumeRequest {
disk: self.root.to_string_lossy().to_string(),
volume: volume.to_string(),
@@ -332,7 +323,9 @@ impl DiskAPI for RemoteDisk {
async fn list_volumes(&self) -> Result<Vec<VolumeInfo>> {
info!("list_volumes");
let mut client = self.get_client_v2().await?;
let mut client = node_service_time_out_client(&self.addr)
.await
.map_err(|err| Error::from_string(format!("can not get client, err: {}", err.to_string())))?;
let request = Request::new(ListVolumesRequest {
disk: self.root.to_string_lossy().to_string(),
});
@@ -354,7 +347,9 @@ impl DiskAPI for RemoteDisk {
async fn stat_volume(&self, volume: &str) -> Result<VolumeInfo> {
info!("stat_volume");
let mut client = self.get_client_v2().await?;
let mut client = node_service_time_out_client(&self.addr)
.await
.map_err(|err| Error::from_string(format!("can not get client, err: {}", err.to_string())))?;
let request = Request::new(StatVolumeRequest {
disk: self.root.to_string_lossy().to_string(),
volume: volume.to_string(),
@@ -374,7 +369,9 @@ impl DiskAPI for RemoteDisk {
async fn write_metadata(&self, _org_volume: &str, volume: &str, path: &str, fi: FileInfo) -> Result<()> {
info!("write_metadata");
let file_info = serde_json::to_string(&fi)?;
let mut client = self.get_client_v2().await?;
let mut client = node_service_time_out_client(&self.addr)
.await
.map_err(|err| Error::from_string(format!("can not get client, err: {}", err.to_string())))?;
let request = Request::new(WriteMetadataRequest {
disk: self.root.to_string_lossy().to_string(),
volume: volume.to_string(),
@@ -401,7 +398,9 @@ impl DiskAPI for RemoteDisk {
) -> Result<FileInfo> {
info!("read_version");
let opts = serde_json::to_string(opts)?;
let mut client = self.get_client_v2().await?;
let mut client = node_service_time_out_client(&self.addr)
.await
.map_err(|err| Error::from_string(format!("can not get client, err: {}", err.to_string())))?;
let request = Request::new(ReadVersionRequest {
disk: self.root.to_string_lossy().to_string(),
volume: volume.to_string(),
@@ -423,7 +422,9 @@ impl DiskAPI for RemoteDisk {
async fn read_xl(&self, volume: &str, path: &str, read_data: bool) -> Result<RawFileInfo> {
info!("read_xl");
let mut client = self.get_client_v2().await?;
let mut client = node_service_time_out_client(&self.addr)
.await
.map_err(|err| Error::from_string(format!("can not get client, err: {}", err.to_string())))?;
let request = Request::new(ReadXlRequest {
disk: self.root.to_string_lossy().to_string(),
volume: volume.to_string(),
@@ -454,7 +455,9 @@ impl DiskAPI for RemoteDisk {
for file_info_versions in versions.iter() {
versions_str.push(serde_json::to_string(file_info_versions)?);
}
let mut client = self.get_client_v2().await?;
let mut client = node_service_time_out_client(&self.addr)
.await
.map_err(|err| Error::from_string(format!("can not get client, err: {}", err.to_string())))?;
let request = Request::new(DeleteVersionsRequest {
disk: self.root.to_string_lossy().to_string(),
volume: volume.to_string(),
@@ -487,7 +490,9 @@ impl DiskAPI for RemoteDisk {
async fn read_multiple(&self, req: ReadMultipleReq) -> Result<Vec<ReadMultipleResp>> {
info!("read_multiple");
let read_multiple_req = serde_json::to_string(&req)?;
let mut client = self.get_client_v2().await?;
let mut client = node_service_time_out_client(&self.addr)
.await
.map_err(|err| Error::from_string(format!("can not get client, err: {}", err.to_string())))?;
let request = Request::new(ReadMultipleRequest {
disk: self.root.to_string_lossy().to_string(),
read_multiple_req,
@@ -510,7 +515,9 @@ impl DiskAPI for RemoteDisk {
async fn delete_volume(&self, volume: &str) -> Result<()> {
info!("delete_volume");
let mut client = self.get_client_v2().await?;
let mut client = node_service_time_out_client(&self.addr)
.await
.map_err(|err| Error::from_string(format!("can not get client, err: {}", err.to_string())))?;
let request = Request::new(DeleteVolumeRequest {
disk: self.root.to_string_lossy().to_string(),
volume: volume.to_string(),

View File

@@ -1,16 +1,12 @@
use async_trait::async_trait;
use common::error::{Error, Result};
use futures::future::join_all;
use protos::proto_gen::node_service::node_service_client::NodeServiceClient;
use protos::node_service_time_out_client;
use protos::proto_gen::node_service::{DeleteBucketRequest, GetBucketInfoRequest, ListBucketRequest, MakeBucketRequest};
use protos::{node_service_time_out_client, DEFAULT_GRPC_SERVER_MESSAGE_LEN};
use regex::Regex;
use std::{collections::HashMap, fmt::Debug, sync::Arc, time::Duration};
use tokio::sync::RwLock;
use tonic::transport::{Channel, Endpoint};
use std::{collections::HashMap, fmt::Debug, sync::Arc};
use tonic::Request;
use tower::timeout::Timeout;
use tracing::{info, warn};
use tracing::warn;
use crate::store::all_local_disk;
use crate::{
@@ -391,55 +387,13 @@ impl PeerS3Client for LocalPeerS3Client {
pub struct RemotePeerS3Client {
pub node: Option<Node>,
pub pools: Option<Vec<usize>>,
connector: Endpoint,
channel: Arc<RwLock<Option<Channel>>>,
addr: String,
}
impl RemotePeerS3Client {
fn new(node: Option<Node>, pools: Option<Vec<usize>>) -> Self {
let connector =
Endpoint::from_shared(format!("{}", node.as_ref().map(|v| { v.url.to_string() }).unwrap_or_default())).unwrap();
Self {
node,
pools,
connector,
channel: Arc::new(RwLock::new(None)),
}
}
#[allow(dead_code)]
async fn get_client(&self) -> Result<NodeServiceClient<Timeout<Channel>>> {
let channel_clone = self.channel.clone();
let channel = {
let read_lock = channel_clone.read().await;
if let Some(ref channel) = *read_lock {
channel.clone()
} else {
let new_channel = self.connector.connect().await?;
info!("get channel success");
*self.channel.write().await = Some(new_channel.clone());
new_channel
}
};
Ok(node_service_time_out_client(
channel,
Duration::new(30, 0), // TODO: use config setting
DEFAULT_GRPC_SERVER_MESSAGE_LEN,
// grpc_enable_gzip,
false, // TODO: use config setting
))
}
async fn get_client_v2(&self) -> Result<NodeServiceClient<tonic::transport::Channel>> {
// Ok(NodeServiceClient::connect("http://220.181.1.138:9000").await?)
// let addr = format!("{}://{}:{}", self.url.scheme(), self.url.host_str().unwrap(), self.url.port().unwrap());
let addr = format!("{}", self.node.as_ref().map(|v| { v.url.to_string() }).unwrap_or_default());
Ok(NodeServiceClient::connect(addr).await?)
let addr = format!("{}", node.as_ref().map(|v| { v.url.to_string() }).unwrap_or_default());
Self { node, pools, addr }
}
}
@@ -450,7 +404,9 @@ impl PeerS3Client for RemotePeerS3Client {
}
async fn list_bucket(&self, opts: &BucketOptions) -> Result<Vec<BucketInfo>> {
let options = serde_json::to_string(opts)?;
let mut client = self.get_client_v2().await?;
let mut client = node_service_time_out_client(&self.addr)
.await
.map_err(|err| Error::from_string(format!("can not get client, err: {}", err.to_string())))?;
let request = Request::new(ListBucketRequest { options });
let response = client.list_bucket(request).await?.into_inner();
let bucket_infos = response
@@ -463,7 +419,9 @@ impl PeerS3Client for RemotePeerS3Client {
}
async fn make_bucket(&self, bucket: &str, opts: &MakeBucketOptions) -> Result<()> {
let options = serde_json::to_string(opts)?;
let mut client = self.get_client_v2().await?;
let mut client = node_service_time_out_client(&self.addr)
.await
.map_err(|err| Error::from_string(format!("can not get client, err: {}", err.to_string())))?;
let request = Request::new(MakeBucketRequest {
name: bucket.to_string(),
options,
@@ -479,7 +437,9 @@ impl PeerS3Client for RemotePeerS3Client {
}
async fn get_bucket_info(&self, bucket: &str, opts: &BucketOptions) -> Result<BucketInfo> {
let options = serde_json::to_string(opts)?;
let mut client = self.get_client_v2().await?;
let mut client = node_service_time_out_client(&self.addr)
.await
.map_err(|err| Error::from_string(format!("can not get client, err: {}", err.to_string())))?;
let request = Request::new(GetBucketInfoRequest {
bucket: bucket.to_string(),
options,
@@ -491,7 +451,9 @@ impl PeerS3Client for RemotePeerS3Client {
}
async fn delete_bucket(&self, bucket: &str) -> Result<()> {
let mut client = self.get_client_v2().await?;
let mut client = node_service_time_out_client(&self.addr)
.await
.map_err(|err| Error::from_string(format!("can not get client, err: {}", err.to_string())))?;
let request = Request::new(DeleteBucketRequest {
bucket: bucket.to_string(),
});

View File

@@ -17,16 +17,15 @@ use tracing::{debug, error, info};
use protos::{
models::{PingBody, PingBodyBuilder},
proto_gen::node_service::{
node_service_server::{NodeService as Node, NodeServiceServer as NodeServer},
DeleteBucketRequest, DeleteBucketResponse, DeleteRequest, DeleteResponse, DeleteVersionsRequest, DeleteVersionsResponse,
DeleteVolumeRequest, DeleteVolumeResponse, GenerallyLockRequest, GenerallyLockResponse, GetBucketInfoRequest,
GetBucketInfoResponse, ListBucketRequest, ListBucketResponse, ListDirRequest, ListDirResponse, ListVolumesRequest,
ListVolumesResponse, MakeBucketRequest, MakeBucketResponse, MakeVolumeRequest, MakeVolumeResponse, MakeVolumesRequest,
MakeVolumesResponse, PingRequest, PingResponse, ReadAllRequest, ReadAllResponse, ReadAtRequest, ReadAtResponse,
ReadMultipleRequest, ReadMultipleResponse, ReadVersionRequest, ReadVersionResponse, ReadXlRequest, ReadXlResponse,
RenameDataRequest, RenameDataResponse, RenameFileRequst, RenameFileResponse, StatVolumeRequest, StatVolumeResponse,
WalkDirRequest, WalkDirResponse, WriteAllRequest, WriteAllResponse, WriteMetadataRequest, WriteMetadataResponse,
WriteRequest, WriteResponse,
node_service_server::NodeService as Node, DeleteBucketRequest, DeleteBucketResponse, DeleteRequest, DeleteResponse,
DeleteVersionsRequest, DeleteVersionsResponse, DeleteVolumeRequest, DeleteVolumeResponse, GenerallyLockRequest,
GenerallyLockResponse, GetBucketInfoRequest, GetBucketInfoResponse, ListBucketRequest, ListBucketResponse,
ListDirRequest, ListDirResponse, ListVolumesRequest, ListVolumesResponse, MakeBucketRequest, MakeBucketResponse,
MakeVolumeRequest, MakeVolumeResponse, MakeVolumesRequest, MakeVolumesResponse, PingRequest, PingResponse,
ReadAllRequest, ReadAllResponse, ReadAtRequest, ReadAtResponse, ReadMultipleRequest, ReadMultipleResponse,
ReadVersionRequest, ReadVersionResponse, ReadXlRequest, ReadXlResponse, RenameDataRequest, RenameDataResponse,
RenameFileRequst, RenameFileResponse, StatVolumeRequest, StatVolumeResponse, WalkDirRequest, WalkDirResponse,
WriteAllRequest, WriteAllResponse, WriteMetadataRequest, WriteMetadataResponse, WriteRequest, WriteResponse,
},
};
@@ -56,13 +55,13 @@ fn match_for_io_error(err_status: &Status) -> Option<&std::io::Error> {
}
#[derive(Debug)]
struct NodeService {
pub struct NodeService {
local_peer: LocalPeerS3Client,
}
pub fn make_server() -> NodeServer<impl Node> {
pub fn make_server() -> NodeService {
let local_peer = LocalPeerS3Client::new(None, None);
NodeServer::new(NodeService { local_peer })
NodeService { local_peer }
}
impl NodeService {

View File

@@ -15,10 +15,12 @@ use hyper_util::{
server::conn::auto::Builder as ConnBuilder,
service::TowerToHyperService,
};
use protos::proto_gen::node_service::node_service_server::NodeServiceServer;
use s3s::{auth::SimpleAuth, service::S3ServiceBuilder};
use service::hybrid;
use std::{io::IsTerminal, net::SocketAddr, str::FromStr};
use tokio::net::TcpListener;
use tonic::{metadata::MetadataValue, Request, Status};
use tracing::{debug, info, warn};
use tracing_error::ErrorLayer;
use tracing_subscriber::{fmt, layer::SubscriberExt, util::SubscriberInitExt};
@@ -39,6 +41,15 @@ fn setup_tracing() {
subscriber.try_init().expect("failed to set global default subscriber");
}
fn check_auth(req: Request<()>) -> Result<Request<()>, Status> {
let token: MetadataValue<_> = "rustfs rpc".parse().unwrap();
match req.metadata().get("authorization") {
Some(t) if token == t => Ok(req),
_ => Err(Status::unauthenticated("No valid auth token")),
}
}
fn main() -> Result<()> {
//解析获得到的参数
let opt = config::Opt::parse();
@@ -119,7 +130,7 @@ async fn run(opt: config::Opt) -> Result<()> {
b.build()
};
let rpc_service = make_server();
let rpc_service = NodeServiceServer::with_interceptor(make_server(), check_auth);
tokio::spawn(async move {
let hyper_service = service.into_shared();