Merge pull request #66 from rustfs/wip-dandan

new future support
This commit is contained in:
loverustfs
2024-09-29 15:33:17 +08:00
committed by GitHub
33 changed files with 3149 additions and 294 deletions

Cargo.lock generated

@@ -362,6 +362,16 @@ version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0b6a852b24ab71dffc585bcb46eaf7959d175cb865a7152e35b348d1b2960422"
[[package]]
name = "common"
version = "0.0.1"
dependencies = [
"lazy_static",
"tokio",
"tonic",
"tracing-error",
]
[[package]]
name = "cpufeatures"
version = "0.2.12"
@@ -426,10 +436,14 @@ version = "0.0.1"
dependencies = [
"ecstore",
"flatbuffers",
"lazy_static",
"lock",
"protos",
"serde_json",
"tokio",
"tonic",
"tower",
"url",
]
[[package]]
@@ -441,11 +455,13 @@ dependencies = [
"base64-simd",
"byteorder",
"bytes",
"common",
"crc32fast",
"futures",
"hex-simd",
"http",
"lazy_static",
"lock",
"netif",
"num_cpus",
"openssl",
@@ -690,9 +706,9 @@ dependencies = [
[[package]]
name = "h2"
version = "0.4.5"
version = "0.4.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fa82e28a107a8cc405f0839610bdc9b15f1e25ec7d696aa5cf173edbcb1486ab"
checksum = "524e8ac6999421f49a846c2d4411f337e53497d8ec55d67753beffa43c5d9205"
dependencies = [
"atomic-waker",
"bytes",
@@ -946,6 +962,26 @@ version = "0.4.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "78b3ae25bc7c8c38cec158d1f2757ee79e9b3740fbc7ccf0e59e4b08d793fa89"
[[package]]
name = "lock"
version = "0.0.1"
dependencies = [
"async-trait",
"backon",
"common",
"lazy_static",
"protos",
"rand",
"serde",
"serde_json",
"tokio",
"tonic",
"tracing",
"tracing-error",
"url",
"uuid",
]
[[package]]
name = "lock_api"
version = "0.4.12"
@@ -1391,6 +1427,7 @@ dependencies = [
name = "protos"
version = "0.0.1"
dependencies = [
"common",
"flatbuffers",
"prost",
"prost-build",
@@ -1575,14 +1612,17 @@ dependencies = [
"async-trait",
"bytes",
"clap",
"common",
"ecstore",
"flatbuffers",
"futures",
"futures-util",
"h2",
"http",
"http-body",
"hyper",
"hyper-util",
"lock",
"log",
"mime",
"netif",
@@ -1596,6 +1636,7 @@ dependencies = [
"serde_json",
"time",
"tokio",
"tokio-stream",
"tonic",
"tonic-build",
"tonic-reflection",
@@ -2025,9 +2066,9 @@ dependencies = [
[[package]]
name = "tokio-stream"
version = "0.1.15"
version = "0.1.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "267ac89e0bec6e691e5813911606935d77c476ff49024f98abcea3e7b15e37af"
checksum = "4f4e6ce100d0eb49a2734f8c0812bcd324cf357d21810932c5df6b96ef2b86f1"
dependencies = [
"futures-core",
"pin-project-lite",
@@ -2291,6 +2332,18 @@ dependencies = [
"getrandom",
"rand",
"serde",
"uuid-macro-internal",
]
[[package]]
name = "uuid-macro-internal"
version = "1.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ee1cd046f83ea2c4e920d6ee9f7c3537ef928d75dce5d84a87c2c5d6b3999a3a"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]

Cargo.toml

@@ -1,6 +1,13 @@
[workspace]
resolver = "2"
members = ["rustfs", "ecstore", "e2e_test", "common/protos"]
members = [
"rustfs",
"ecstore",
"e2e_test",
"common/common",
"common/lock",
"common/protos",
]
[workspace.package]
edition = "2021"
@@ -18,6 +25,7 @@ ecstore = { path = "./ecstore" }
flatbuffers = "24.3.25"
futures = "0.3.30"
futures-util = "0.3.30"
common = { path = "./common/common" }
hyper = "1.3.1"
hyper-util = { version = "0.1.5", features = [
"tokio",
@@ -26,6 +34,8 @@ hyper-util = { version = "0.1.5", features = [
] }
http = "1.1.0"
http-body = "1.0.0"
lock = { path = "./common/lock" }
lazy_static = "1.5.0"
mime = "0.3.17"
netif = "0.1.6"
pin-project-lite = "0.2"
@@ -35,6 +45,7 @@ prost-build = "0.13.1"
prost-types = "0.13.1"
protobuf = "3.2"
protos = { path = "./common/protos" }
rand = "0.8.5"
s3s = { version = "0.10.1", default-features = true, features = ["tower"] }
serde = { version = "1.0.203", features = ["derive"] }
serde_json = "1.0.117"
@@ -50,9 +61,16 @@ tokio = { version = "1.38.0", features = ["fs", "rt-multi-thread"] }
tonic = { version = "0.12.1", features = ["gzip"] }
tonic-build = "0.12.1"
tonic-reflection = "0.12"
tokio-stream = "0.1.16"
tower = { version = "0.4.13", features = ["timeout"] }
tracing = "0.1.40"
tracing-error = "0.2.0"
tracing-subscriber = { version = "0.3.18", features = ["env-filter", "time"] }
transform-stream = "0.3.0"
url = "2.5.2"
uuid = { version = "1.10.0", features = [
"v4",
"fast-rng",
"macro-diagnostics",
] }
log = "0.4.22"

common/common/Cargo.toml Normal file

@@ -0,0 +1,10 @@
[package]
name = "common"
version.workspace = true
edition.workspace = true
[dependencies]
lazy_static.workspace = true
tokio.workspace = true
tonic.workspace = true
tracing-error.workspace = true

common/common/src/error.rs Normal file

@@ -0,0 +1,82 @@
use tracing_error::{SpanTrace, SpanTraceStatus};
pub type StdError = Box<dyn std::error::Error + Send + Sync + 'static>;
pub type Result<T = (), E = Error> = std::result::Result<T, E>;
#[derive(Debug)]
pub struct Error {
inner: Box<dyn std::error::Error + Send + Sync + 'static>,
span_trace: SpanTrace,
}
impl Error {
/// Create a new error from a `std::error::Error`.
#[must_use]
#[track_caller]
pub fn new<T: std::error::Error + Send + Sync + 'static>(source: T) -> Self {
Self::from_std_error(source.into())
}
/// Create a new error from a `std::error::Error`.
#[must_use]
#[track_caller]
pub fn from_std_error(inner: StdError) -> Self {
Self {
inner,
span_trace: SpanTrace::capture(),
}
}
/// Create a new error from a string.
#[must_use]
#[track_caller]
pub fn from_string(s: impl Into<String>) -> Self {
Self::msg(s)
}
/// Create a new error from a string.
#[must_use]
#[track_caller]
pub fn msg(s: impl Into<String>) -> Self {
Self::from_std_error(s.into().into())
}
/// Returns `true` if the inner type is the same as `T`.
#[inline]
pub fn is<T: std::error::Error + 'static>(&self) -> bool {
self.inner.is::<T>()
}
/// Returns some reference to the inner value if it is of type `T`, or
/// `None` if it isn't.
#[inline]
pub fn downcast_ref<T: std::error::Error + 'static>(&self) -> Option<&T> {
self.inner.downcast_ref()
}
/// Returns some mutable reference to the inner value if it is of type `T`, or
/// `None` if it isn't.
#[inline]
pub fn downcast_mut<T: std::error::Error + 'static>(&mut self) -> Option<&mut T> {
self.inner.downcast_mut()
}
}
impl<T: std::error::Error + Send + Sync + 'static> From<T> for Error {
fn from(e: T) -> Self {
Self::new(e)
}
}
impl std::fmt::Display for Error {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.inner)?;
if self.span_trace.status() != SpanTraceStatus::EMPTY {
write!(f, "\nspan_trace:\n{}", self.span_trace)?;
}
Ok(())
}
}
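For reference, a minimal usage sketch of the new common::error types; the functions below are hypothetical caller code, not part of this diff:

use common::error::{Error, Result};

fn read_config(path: &str) -> Result<String> {
    // std::io::Error converts through the blanket From impl and captures a SpanTrace.
    let s = std::fs::read_to_string(path)?;
    if s.is_empty() {
        // Ad-hoc errors can be built from plain strings.
        return Err(Error::msg("config file is empty"));
    }
    Ok(s)
}

fn is_not_found(err: &Error) -> bool {
    // downcast_ref recovers the concrete source type when needed.
    err.downcast_ref::<std::io::Error>()
        .map(|e| e.kind() == std::io::ErrorKind::NotFound)
        .unwrap_or(false)
}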

common/common/src/globals.rs Normal file

@@ -0,0 +1,12 @@
use std::collections::HashMap;
use lazy_static::lazy_static;
use tokio::sync::RwLock;
use tonic::transport::Channel;
lazy_static! {
pub static ref GLOBAL_Local_Node_Name: RwLock<String> = RwLock::new("".to_string());
pub static ref GLOBAL_Rustfs_Host: RwLock<String> = RwLock::new("".to_string());
pub static ref GLOBAL_Rustfs_Port: RwLock<String> = RwLock::new("9000".to_string());
pub static ref GLOBAL_Conn_Map: RwLock<HashMap<String, Channel>> = RwLock::new(HashMap::new());
}
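These globals are process-wide defaults guarded by tokio RwLocks; a quick sketch of how a caller might set and read them (illustrative only):

use common::globals::{GLOBAL_Rustfs_Host, GLOBAL_Rustfs_Port};

async fn set_listen_addr(host: &str, port: u16) {
    *GLOBAL_Rustfs_Host.write().await = host.to_string();
    *GLOBAL_Rustfs_Port.write().await = port.to_string();
}

async fn listen_addr() -> String {
    let host = GLOBAL_Rustfs_Host.read().await;
    let port = GLOBAL_Rustfs_Port.read().await;
    format!("{}:{}", *host, *port)
}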

common/common/src/lib.rs Normal file

@@ -0,0 +1,2 @@
pub mod error;
pub mod globals;

common/lock/Cargo.toml Normal file

@@ -0,0 +1,20 @@
[package]
name = "lock"
version.workspace = true
edition.workspace = true
[dependencies]
async-trait.workspace = true
backon.workspace = true
common.workspace = true
lazy_static.workspace = true
protos.workspace = true
rand.workspace = true
serde.workspace = true
serde_json.workspace = true
tokio.workspace = true
tonic.workspace = true
tracing.workspace = true
tracing-error.workspace = true
url.workspace = true
uuid.workspace = true

common/lock/src/drwmutex.rs Normal file

@@ -0,0 +1,347 @@
use std::time::{Duration, Instant};
use tokio::{sync::mpsc::Sender, time::sleep};
use tracing::{info, warn};
use crate::{lock_args::LockArgs, LockApi, Locker};
const DRW_MUTEX_REFRESH_INTERVAL: Duration = Duration::from_secs(10);
const LOCK_RETRY_MIN_INTERVAL: Duration = Duration::from_millis(250);
#[derive(Debug)]
pub struct DRWMutex {
owner: String,
names: Vec<String>,
write_locks: Vec<String>,
read_locks: Vec<String>,
cancel_refresh_sender: Option<Sender<bool>>,
// rng: ThreadRng,
lockers: Vec<LockApi>,
refresh_interval: Duration,
lock_retry_min_interval: Duration,
}
#[derive(Debug, Default, Clone)]
pub struct Granted {
index: usize,
lock_uid: String,
}
impl Granted {
fn is_locked(&self) -> bool {
is_locked(&self.lock_uid)
}
}
fn is_locked(uid: &str) -> bool {
!uid.is_empty()
}
#[derive(Debug, Clone)]
pub struct Options {
pub timeout: Duration,
pub retry_interval: Duration,
}
impl DRWMutex {
pub fn new(owner: String, names: Vec<String>, lockers: Vec<LockApi>) -> Self {
let mut names = names;
names.sort();
Self {
owner,
names,
write_locks: Vec::with_capacity(lockers.len()),
read_locks: Vec::with_capacity(lockers.len()),
cancel_refresh_sender: None,
// rng: rand::thread_rng(),
lockers,
refresh_interval: DRW_MUTEX_REFRESH_INTERVAL,
lock_retry_min_interval: LOCK_RETRY_MIN_INTERVAL,
}
}
}
impl DRWMutex {
pub async fn lock(&mut self, id: &String, source: &String) {
let is_read_lock = false;
let opts = Options {
timeout: Duration::from_secs(10),
retry_interval: Duration::from_millis(50),
};
self.lock_blocking(id, source, is_read_lock, &opts).await;
}
pub async fn get_lock(&mut self, id: &String, source: &String, opts: &Options) -> bool {
let is_read_lock = false;
self.lock_blocking(id, source, is_read_lock, opts).await
}
pub async fn r_lock(&mut self, id: &String, source: &String) {
let is_read_lock = true;
let opts = Options {
timeout: Duration::from_secs(10),
retry_interval: Duration::from_millis(50),
};
self.lock_blocking(id, source, is_read_lock, &opts).await;
}
pub async fn get_r_lock(&mut self, id: &String, source: &String, opts: &Options) -> bool {
let is_read_lock = true;
self.lock_blocking(id, source, is_read_lock, opts).await
}
pub async fn lock_blocking(&mut self, id: &String, source: &String, is_read_lock: bool, opts: &Options) -> bool {
let locker_len = self.lockers.len();
let mut tolerance = locker_len / 2;
let mut quorum = locker_len - tolerance;
if !is_read_lock {
// For write locks, as a special case to avoid split brain,
// require quorum + 1 when tolerance is exactly half of the
// total number of locker clients.
if quorum == tolerance {
quorum += 1;
}
}
info!("lockBlocking {}/{} for {:?}: lockType readLock({}), additional opts: {:?}, quorum: {}, tolerance: {}, lockClients: {}\n", id, source, self.names, is_read_lock, opts, quorum, tolerance, locker_len);
tolerance = locker_len - quorum;
let mut attempt = 0;
let mut locks = vec!["".to_string(); self.lockers.len()];
loop {
if self.inner_lock(&mut locks, id, source, is_read_lock, tolerance, quorum).await {
if is_read_lock {
self.read_locks = locks;
} else {
self.write_locks = locks;
}
info!("lock_blocking {}/{} for {:?}: granted", id, source, self.names);
return true;
}
attempt += 1;
if attempt >= 10 {
break;
}
sleep(opts.retry_interval).await;
}
false
}
async fn inner_lock(
&mut self,
locks: &mut Vec<String>,
id: &String,
source: &String,
is_read_lock: bool,
tolerance: usize,
quorum: usize,
) -> bool {
locks.iter_mut().for_each(|lock| *lock = "".to_string());
let mut granteds = Vec::with_capacity(self.lockers.len());
let args = LockArgs {
uid: id.to_string(),
resources: self.names.clone(),
owner: self.owner.clone(),
source: source.to_string(),
quorum,
};
for (index, locker) in self.lockers.iter_mut().enumerate() {
let mut granted = Granted {
index,
..Default::default()
};
if is_read_lock {
match locker.rlock(&args).await {
Ok(locked) => {
if locked {
granted.lock_uid = id.to_string();
}
}
Err(err) => {
warn!("Unable to call RLock failed with {} for {} at {:?}", err, args, locker);
}
}
} else {
match locker.lock(&args).await {
Ok(locked) => {
if locked {
granted.lock_uid = id.to_string();
}
}
Err(err) => {
warn!("Unable to call Lock failed with {} for {} at {:?}", err, args, locker);
}
}
}
granteds.push(granted);
}
granteds.iter().for_each(|granted| {
locks[granted.index] = granted.lock_uid.clone();
});
let quorum_locked = check_quorum_locked(locks, quorum);
if !quorum_locked {
info!("Unable to acquire lock in quorum, {}", args);
if !self.release_all(tolerance, locks, is_read_lock).await {
info!("Unable to release acquired locks, these locks will expire automatically {}", args);
}
}
quorum_locked
}
pub async fn un_lock(&mut self) {
if self.write_locks.is_empty() || !self.write_locks.iter().any(|w_lock| is_locked(w_lock)) {
panic!("Trying to un_lock() while no lock() is active, write_locks: {:?}", self.write_locks)
}
let tolerance = self.lockers.len() / 2;
let is_read_lock = false;
let mut locks = std::mem::take(&mut self.write_locks);
let start = Instant::now();
loop {
if self.release_all(tolerance, &mut locks, is_read_lock).await {
return;
}
sleep(self.lock_retry_min_interval).await;
if Instant::now().duration_since(start) > Duration::from_secs(30) {
return;
}
}
}
pub async fn un_r_lock(&mut self) {
if self.read_locks.is_empty() || !self.read_locks.iter().any(|r_lock| is_locked(r_lock)) {
panic!("Trying to un_r_lock() while no r_lock() is active, read_locks: {:?}", self.read_locks)
}
let tolerance = self.lockers.len() / 2;
let is_read_lock = true;
let mut locks = std::mem::take(&mut self.read_locks);
let start = Instant::now();
loop {
if self.release_all(tolerance, &mut locks, is_read_lock).await {
return;
}
sleep(self.lock_retry_min_interval).await;
if Instant::now().duration_since(start) > Duration::from_secs(30) {
return;
}
}
}
async fn release_all(&mut self, tolerance: usize, locks: &mut Vec<String>, is_read_lock: bool) -> bool {
for (index, locker) in self.lockers.iter_mut().enumerate() {
if send_release(locker, &locks[index], &self.owner, &self.names, is_read_lock).await {
locks[index] = "".to_string();
}
}
!check_failed_unlocks(&locks, tolerance)
}
}
// async fn start_continuous_lock_refresh(lockers: &Vec<&mut LockApi>, id: &String, source: &String, quorum: usize, refresh_interval: Duration, mut cancel_refresh_receiver: Receiver<bool>) {
// let uid = id.to_string();
// tokio::spawn(async move {
// let mut ticker = interval(refresh_interval);
// let args = LockArgs {
// uid,
// ..Default::default()
// };
// loop {
// select! {
// _ = ticker.tick() => {
// for (index, locker) in lockers.iter().enumerate() {
// }
// },
// _ = cancel_refresh_receiver.recv() => {
// return;
// }
// }
// }
// });
// }
fn check_failed_unlocks(locks: &Vec<String>, tolerance: usize) -> bool {
let mut un_locks_failed = 0;
locks.iter().for_each(|lock| {
if is_locked(lock) {
un_locks_failed += 1;
}
});
if locks.len() - tolerance == tolerance {
return un_locks_failed >= tolerance;
}
un_locks_failed > tolerance
}
async fn send_release(locker: &mut LockApi, uid: &String, owner: &String, names: &Vec<String>, is_read_lock: bool) -> bool {
if uid.is_empty() {
return false;
}
let args = LockArgs {
uid: uid.to_string(),
owner: owner.clone(),
resources: names.clone(),
..Default::default()
};
if is_read_lock {
match locker.runlock(&args).await {
Ok(locked) => {
if !locked {
warn!("Unable to release runlock, args: {}", args);
return false;
}
}
Err(err) => {
warn!("Unable to call RLock failed with {} for {} at {:?}", err, args, locker);
return false;
}
}
} else {
match locker.unlock(&args).await {
Ok(locked) => {
if !locked {
warn!("Unable to release unlock, args: {}", args);
return false;
}
}
Err(err) => {
warn!("Unable to call Lock failed with {} for {} at {:?}", err, args, locker);
return false;
}
}
}
true
}
fn check_quorum_locked(locks: &Vec<String>, quorum: usize) -> bool {
let mut count = 0;
locks.iter().for_each(|lock| {
if is_locked(lock) {
count += 1;
}
});
count >= quorum
}
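A sketch of the intended call pattern for DRWMutex with a single local locker (assumed usage, not taken from this diff); with one locker the computed quorum is 1, so the write lock is granted immediately:

use std::time::Duration;
use lock::{drwmutex::{DRWMutex, Options}, new_lock_api};

async fn with_write_lock() {
    let lockers = vec![new_lock_api(true, None)]; // one LockApi::Local
    let mut dm = DRWMutex::new("node-1".to_string(), vec!["bucket/object".to_string()], lockers);
    let opts = Options {
        timeout: Duration::from_secs(5),
        retry_interval: Duration::from_millis(250),
    };
    if dm.get_lock(&"op-uuid".to_string(), &"put_object".to_string(), &opts).await {
        // ... critical section ...
        dm.un_lock().await;
    }
}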

common/lock/src/lib.rs Normal file

@@ -0,0 +1,117 @@
#![allow(dead_code)]
use std::sync::Arc;
use async_trait::async_trait;
use common::error::Result;
use lazy_static::lazy_static;
use local_locker::LocalLocker;
use lock_args::LockArgs;
use remote_client::RemoteClient;
use tokio::sync::RwLock;
pub mod drwmutex;
pub mod local_locker;
pub mod lock_args;
pub mod lrwmutex;
pub mod namespace_lock;
pub mod remote_client;
lazy_static! {
pub static ref GLOBAL_LOCAL_SERVER: Arc<Box<RwLock<LocalLocker>>> = Arc::new(Box::new(RwLock::new(LocalLocker::new())));
}
type LockClient = dyn Locker;
#[async_trait]
pub trait Locker {
async fn lock(&mut self, args: &LockArgs) -> Result<bool>;
async fn unlock(&mut self, args: &LockArgs) -> Result<bool>;
async fn rlock(&mut self, args: &LockArgs) -> Result<bool>;
async fn runlock(&mut self, args: &LockArgs) -> Result<bool>;
async fn refresh(&mut self, args: &LockArgs) -> Result<bool>;
async fn force_unlock(&mut self, args: &LockArgs) -> Result<bool>;
async fn close(&self);
async fn is_online(&self) -> bool;
async fn is_local(&self) -> bool;
}
#[derive(Debug, Clone)]
pub enum LockApi {
Local,
Remote(RemoteClient),
}
#[async_trait]
impl Locker for LockApi {
async fn lock(&mut self, args: &LockArgs) -> Result<bool> {
match self {
LockApi::Local => GLOBAL_LOCAL_SERVER.write().await.lock(args).await,
LockApi::Remote(r) => r.lock(args).await,
}
}
async fn unlock(&mut self, args: &LockArgs) -> Result<bool> {
match self {
LockApi::Local => GLOBAL_LOCAL_SERVER.write().await.unlock(args).await,
LockApi::Remote(r) => r.unlock(args).await,
}
}
async fn rlock(&mut self, args: &LockArgs) -> Result<bool> {
match self {
LockApi::Local => GLOBAL_LOCAL_SERVER.write().await.rlock(args).await,
LockApi::Remote(r) => r.rlock(args).await,
}
}
async fn runlock(&mut self, args: &LockArgs) -> Result<bool> {
match self {
LockApi::Local => GLOBAL_LOCAL_SERVER.write().await.runlock(args).await,
LockApi::Remote(r) => r.runlock(args).await,
}
}
async fn refresh(&mut self, args: &LockArgs) -> Result<bool> {
match self {
LockApi::Local => GLOBAL_LOCAL_SERVER.write().await.refresh(args).await,
LockApi::Remote(r) => r.refresh(args).await,
}
}
async fn force_unlock(&mut self, args: &LockArgs) -> Result<bool> {
match self {
LockApi::Local => GLOBAL_LOCAL_SERVER.write().await.force_unlock(args).await,
LockApi::Remote(r) => r.force_unlock(args).await,
}
}
async fn close(&self) {
match self {
LockApi::Local => GLOBAL_LOCAL_SERVER.read().await.close().await,
LockApi::Remote(r) => r.close().await,
}
}
async fn is_online(&self) -> bool {
match self {
LockApi::Local => GLOBAL_LOCAL_SERVER.read().await.is_online().await,
LockApi::Remote(r) => r.is_online().await,
}
}
async fn is_local(&self) -> bool {
match self {
LockApi::Local => GLOBAL_LOCAL_SERVER.write().await.is_local().await,
LockApi::Remote(r) => r.is_local().await,
}
}
}
pub fn new_lock_api(is_local: bool, url: Option<url::Url>) -> LockApi {
if is_local {
return LockApi::Local;
}
LockApi::Remote(RemoteClient::new(url.unwrap()))
}
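A minimal sketch of dispatching through LockApi (uid and owner values are made up; LockApi::Local routes every call to the shared GLOBAL_LOCAL_SERVER):

use lock::{lock_args::LockArgs, new_lock_api, Locker};

async fn local_lock_demo() -> common::error::Result<()> {
    let mut api = new_lock_api(true, None); // LockApi::Local
    let args = LockArgs {
        uid: "op-1".to_string(),
        resources: vec!["bucket/object".to_string()],
        owner: "node-1".to_string(),
        source: "demo".to_string(),
        quorum: 1,
    };
    if api.lock(&args).await? {
        api.unlock(&args).await?;
    }
    Ok(())
}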

common/lock/src/local_locker.rs Normal file

@@ -0,0 +1,423 @@
use async_trait::async_trait;
use common::error::{Error, Result};
use std::{
collections::HashMap,
time::{Duration, Instant},
};
use crate::{lock_args::LockArgs, Locker};
const MAX_DELETE_LIST: usize = 1000;
#[derive(Clone, Debug)]
struct LockRequesterInfo {
name: String,
writer: bool,
uid: String,
time_stamp: Instant,
time_last_refresh: Instant,
source: String,
group: bool,
owner: String,
quorum: usize,
idx: usize,
}
impl Default for LockRequesterInfo {
fn default() -> Self {
Self {
name: Default::default(),
writer: Default::default(),
uid: Default::default(),
time_stamp: Instant::now(),
time_last_refresh: Instant::now(),
source: Default::default(),
group: Default::default(),
owner: Default::default(),
quorum: Default::default(),
idx: Default::default(),
}
}
}
fn is_write_lock(lri: &[LockRequesterInfo]) -> bool {
lri.len() == 1 && lri[0].writer
}
#[derive(Debug, Default)]
pub struct LockStats {
total: usize,
writes: usize,
reads: usize,
}
#[derive(Debug, Default)]
pub struct LocalLocker {
lock_map: HashMap<String, Vec<LockRequesterInfo>>,
lock_uid: HashMap<String, String>,
}
impl LocalLocker {
pub fn new() -> Self {
LocalLocker::default()
}
}
impl LocalLocker {
fn can_take_lock(&self, resource: &[String]) -> bool {
resource.iter().all(|x| !self.lock_map.contains_key(x))
}
pub fn stats(&self) -> LockStats {
let mut st = LockStats {
total: self.lock_map.len(),
..Default::default()
};
self.lock_map.iter().for_each(|(_, value)| {
if value.len() > 0 {
if value[0].writer {
st.writes += 1;
} else {
st.reads += 1;
}
}
});
return st;
}
fn dump_lock_map(&mut self) -> HashMap<String, Vec<LockRequesterInfo>> {
let mut lock_copy = HashMap::new();
self.lock_map.iter().for_each(|(key, value)| {
lock_copy.insert(key.to_string(), value.to_vec());
});
return lock_copy;
}
fn expire_old_locks(&mut self, interval: Duration) {
self.lock_map.iter_mut().for_each(|(_, lris)| {
lris.retain(|lri| {
if Instant::now().duration_since(lri.time_last_refresh) > interval {
let mut key = lri.uid.to_string();
format_uuid(&mut key, &lri.idx);
self.lock_uid.remove(&key);
return false;
}
true
});
});
return;
}
}
#[async_trait]
impl Locker for LocalLocker {
async fn lock(&mut self, args: &LockArgs) -> Result<bool> {
if args.resources.len() > MAX_DELETE_LIST {
return Err(Error::from_string(format!(
"internal error: LocalLocker.lock called with more than {} resources",
MAX_DELETE_LIST
)));
}
if !self.can_take_lock(&args.resources) {
return Ok(false);
}
args.resources.iter().enumerate().for_each(|(idx, resource)| {
self.lock_map.insert(
resource.to_string(),
vec![LockRequesterInfo {
name: resource.to_string(),
writer: true,
source: args.source.to_string(),
owner: args.owner.to_string(),
uid: args.uid.to_string(),
group: args.resources.len() > 1,
quorum: args.quorum,
idx,
..Default::default()
}],
);
let mut uuid = args.uid.to_string();
format_uuid(&mut uuid, &idx);
self.lock_uid.insert(uuid, resource.to_string());
});
Ok(true)
}
async fn unlock(&mut self, args: &LockArgs) -> Result<bool> {
if args.resources.len() > MAX_DELETE_LIST {
return Err(Error::from_string(format!(
"internal error: LocalLocker.unlock called with more than {} resources",
MAX_DELETE_LIST
)));
}
let mut reply = false;
let mut err_info = String::new();
for resource in args.resources.iter() {
match self.lock_map.get_mut(resource) {
Some(lris) => {
if !is_write_lock(&lris) {
if err_info.is_empty() {
err_info = format!("unlock attempted on a read locked entity: {}", resource);
} else {
err_info.push_str(&format!(", {}", resource));
}
} else {
lris.retain(|lri| {
if lri.uid == args.uid && (args.owner.is_empty() || lri.owner == args.owner) {
let mut key = args.uid.to_string();
format_uuid(&mut key, &lri.idx);
self.lock_uid.remove(&key).unwrap();
reply |= true;
return false;
}
true
});
}
if lris.len() == 0 {
self.lock_map.remove(resource);
}
}
None => {
continue;
}
};
}
Ok(reply)
}
async fn rlock(&mut self, args: &LockArgs) -> Result<bool> {
if args.resources.len() != 1 {
return Err(Error::from_string("internal error: localLocker.RLock called with more than one resource"));
}
let resource = &args.resources[0];
match self.lock_map.get_mut(resource) {
Some(lri) => {
if !is_write_lock(lri) {
lri.push(LockRequesterInfo {
name: resource.to_string(),
writer: false,
source: args.source.to_string(),
owner: args.owner.to_string(),
uid: args.uid.to_string(),
quorum: args.quorum,
..Default::default()
});
} else {
return Ok(false);
}
}
None => {
self.lock_map.insert(
resource.to_string(),
vec![LockRequesterInfo {
name: resource.to_string(),
writer: false,
source: args.source.to_string(),
owner: args.owner.to_string(),
uid: args.uid.to_string(),
quorum: args.quorum,
..Default::default()
}],
);
}
}
let mut uuid = args.uid.to_string();
format_uuid(&mut uuid, &0);
self.lock_uid.insert(uuid, resource.to_string());
Ok(true)
}
async fn runlock(&mut self, args: &LockArgs) -> Result<bool> {
if args.resources.len() != 1 {
return Err(Error::from_string("internal error: localLocker.RLock called with more than one resource"));
}
let mut reply = false;
let resource = &args.resources[0];
match self.lock_map.get_mut(resource) {
Some(lris) => {
if is_write_lock(&lris) {
return Err(Error::from_string(format!("runlock attempted on a write locked entity: {}", resource)));
} else {
lris.retain(|lri| {
if lri.uid == args.uid && (args.owner.is_empty() || lri.owner == args.owner) {
let mut key = args.uid.to_string();
format_uuid(&mut key, &lri.idx);
self.lock_uid.remove(&key).unwrap();
reply |= true;
return false;
}
true
});
}
if lris.len() == 0 {
self.lock_map.remove(resource);
}
}
None => {
return Ok(reply || true);
}
};
Ok(reply)
}
async fn close(&self) {}
async fn is_online(&self) -> bool {
true
}
async fn is_local(&self) -> bool {
true
}
// TODO: need to add a timeout mechanism
async fn force_unlock(&mut self, args: &LockArgs) -> Result<bool> {
let mut reply: bool;
if args.uid.is_empty() {
args.resources.iter().for_each(|resource| match self.lock_map.get(resource) {
Some(lris) => {
lris.iter().for_each(|lri| {
let mut key = lri.uid.to_string();
format_uuid(&mut key, &lri.idx);
self.lock_uid.remove(&key);
});
if lris.len() == 0 {
self.lock_map.remove(resource);
}
}
None => (),
});
return Ok(true);
}
let mut idx = 0;
let mut need_remove_resource = Vec::new();
let mut need_remove_map_id = Vec::new();
loop {
let mut map_id = args.uid.to_string();
format_uuid(&mut map_id, &idx);
match self.lock_uid.get(&map_id) {
Some(resource) => match self.lock_map.get_mut(resource) {
Some(lris) => {
reply = true;
{
lris.retain(|lri| {
if lri.uid == args.uid && (args.owner.is_empty() || lri.owner == args.owner) {
let mut key = args.uid.to_string();
format_uuid(&mut key, &lri.idx);
need_remove_map_id.push(key);
return false;
}
true
});
}
idx += 1;
if lris.len() == 0 {
need_remove_resource.push(resource.to_string());
}
}
None => {
need_remove_map_id.push(map_id);
idx += 1;
continue;
}
},
None => {
reply = idx > 0;
break;
}
}
}
need_remove_resource.into_iter().for_each(|resource| {
self.lock_map.remove(&resource);
});
need_remove_map_id.into_iter().for_each(|map_id| {
self.lock_uid.remove(&map_id);
});
Ok(reply)
}
async fn refresh(&mut self, args: &LockArgs) -> Result<bool> {
let mut idx = 0;
let mut key = args.uid.to_string();
format_uuid(&mut key, &idx);
match self.lock_uid.get(&key) {
Some(resource) => {
let mut resource = resource;
loop {
match self.lock_map.get_mut(resource) {
Some(_lris) => {}
None => {
let mut key = args.uid.to_string();
format_uuid(&mut key, &0);
self.lock_uid.remove(&key);
return Ok(idx > 0);
}
}
idx += 1;
let mut key = args.uid.to_string();
format_uuid(&mut key, &idx);
resource = match self.lock_uid.get(&key) {
Some(resource) => resource,
None => return Ok(true),
};
}
}
None => {
return Ok(false);
}
}
}
}
fn format_uuid(s: &mut String, idx: &usize) {
s.push_str(&idx.to_string());
}
#[cfg(test)]
mod test {
use super::LocalLocker;
use crate::{lock_args::LockArgs, Locker};
use common::error::Result;
use tokio;
#[tokio::test]
async fn test_lock_unlock() -> Result<()> {
let mut local_locker = LocalLocker::new();
let args = LockArgs {
uid: "1111".to_string(),
resources: vec!["dandan".to_string()],
owner: "dd".to_string(),
source: "".to_string(),
quorum: 3,
};
local_locker.lock(&args).await?;
println!("lock local_locker: {:?} \n", local_locker);
local_locker.unlock(&args).await?;
println!("unlock local_locker: {:?}", local_locker);
Ok(())
}
}
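The test above covers the exclusive write path; read locks on the same resource can be held concurrently as long as the uids differ, roughly as follows (sketch, same async test setup as above):

let mut local_locker = LocalLocker::new();
let a = LockArgs {
    uid: "r1".to_string(),
    resources: vec!["bucket/object".to_string()],
    owner: "node-1".to_string(),
    source: "".to_string(),
    quorum: 1,
};
let mut b = a.clone();
b.uid = "r2".to_string();
assert!(local_locker.rlock(&a).await?);
assert!(local_locker.rlock(&b).await?); // shared read locks coexist
assert!(local_locker.runlock(&a).await?);
assert!(local_locker.runlock(&b).await?);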

common/lock/src/lock_args.rs Normal file

@@ -0,0 +1,22 @@
use std::fmt::Display;
use serde::{Deserialize, Serialize};
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
pub struct LockArgs {
pub uid: String,
pub resources: Vec<String>,
pub owner: String,
pub source: String,
pub quorum: usize,
}
impl Display for LockArgs {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"LockArgs[ uid: {}, resources: {:?}, owner: {}, source:{}, quorum: {} ]",
self.uid, self.resources, self.owner, self.source, self.quorum
)
}
}
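LockArgs is the payload that travels between nodes: the remote client serializes it to JSON and puts the string into GenerallyLockRequest.args. A round-trip sketch:

use lock::lock_args::LockArgs;

let args = LockArgs {
    uid: "op-1".to_string(),
    resources: vec!["bucket/object".to_string()],
    owner: "node-1".to_string(),
    source: "put_object".to_string(),
    quorum: 2,
};
let payload = serde_json::to_string(&args)?; // becomes GenerallyLockRequest { args: payload }
let decoded: LockArgs = serde_json::from_str(&payload)?;
assert_eq!(decoded.uid, "op-1");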

common/lock/src/lrwmutex.rs Normal file

@@ -0,0 +1,187 @@
use std::time::{Duration, Instant};
use rand::Rng;
use tokio::{sync::RwLock, time::sleep};
#[derive(Debug, Default)]
pub struct LRWMutex {
id: RwLock<String>,
source: RwLock<String>,
is_write: RwLock<bool>,
reference: RwLock<usize>,
}
impl LRWMutex {
pub async fn lock(&self) -> bool {
let is_write = true;
let id = self.id.read().await.clone();
let source = self.source.read().await.clone();
let timeout = Duration::from_secs(10000);
self.lock_loop(
&id, &source, &timeout, // big enough
is_write,
)
.await
}
pub async fn get_lock(&self, id: &str, source: &str, timeout: &Duration) -> bool {
let is_write = true;
self.lock_loop(id, source, timeout, is_write).await
}
pub async fn r_lock(&self) -> bool {
let is_write: bool = false;
let id = self.id.read().await.clone();
let source = self.source.read().await.clone();
let timeout = Duration::from_secs(10000);
self.lock_loop(
&id, &source, &timeout, // big enough
is_write,
)
.await
}
pub async fn get_r_lock(&self, id: &str, source: &str, timeout: &Duration) -> bool {
let is_write = false;
self.lock_loop(id, source, timeout, is_write).await
}
async fn inner_lock(&self, id: &str, source: &str, is_write: bool) -> bool {
*self.id.write().await = id.to_string();
*self.source.write().await = source.to_string();
let mut locked = false;
if is_write {
if *self.reference.read().await == 0 && !*self.is_write.read().await {
*self.reference.write().await = 1;
*self.is_write.write().await = true;
locked = true;
}
} else {
if !*self.is_write.read().await {
*self.reference.write().await += 1;
locked = true;
}
}
locked
}
async fn lock_loop(&self, id: &str, source: &str, timeout: &Duration, is_write: bool) -> bool {
let start = Instant::now();
loop {
if self.inner_lock(id, source, is_write).await {
return true;
} else {
if Instant::now().duration_since(start) > *timeout {
return false;
}
// Scope the thread-local RNG so it is dropped before the await point (ThreadRng is not Send).
let sleep_time: u64 = {
let mut rng = rand::thread_rng();
rng.gen_range(10..=50)
};
sleep(Duration::from_millis(sleep_time)).await;
}
}
}
pub async fn un_lock(&self) {
let is_write = true;
if !self.unlock(is_write).await {
panic!("Trying to un_lock() while no Lock() is active")
}
}
pub async fn un_r_lock(&self) {
let is_write = false;
if !self.unlock(is_write).await {
panic!("Trying to un_r_lock() while no Lock() is active")
}
}
async fn unlock(&self, is_write: bool) -> bool {
let mut unlocked = false;
if is_write {
if *self.is_write.read().await && *self.reference.read().await == 1 {
*self.reference.write().await = 0;
*self.is_write.write().await = false;
unlocked = true;
}
} else {
if !*self.is_write.read().await {
if *self.reference.read().await > 0 {
*self.reference.write().await -= 1;
unlocked = true;
}
}
}
unlocked
}
pub async fn force_un_lock(&self) {
*self.reference.write().await = 0;
*self.is_write.write().await = false;
}
}
#[cfg(test)]
mod test {
use std::{sync::Arc, time::Duration};
use common::error::Result;
use tokio::time::sleep;
use crate::lrwmutex::LRWMutex;
#[tokio::test]
async fn test_lock_unlock() -> Result<()> {
let l_rw_lock = LRWMutex::default();
let id = "foo";
let source = "dandan";
let timeout = Duration::from_secs(5);
assert_eq!(true, l_rw_lock.get_lock(id, source, &timeout).await);
l_rw_lock.un_lock().await;
l_rw_lock.lock().await;
assert_eq!(false, l_rw_lock.get_r_lock(id, source, &timeout).await);
l_rw_lock.un_lock().await;
assert_eq!(true, l_rw_lock.get_r_lock(id, source, &timeout).await);
Ok(())
}
#[tokio::test]
async fn multi_thread_test() -> Result<()> {
let l_rw_lock = Arc::new(LRWMutex::default());
let id = "foo";
let source = "dandan";
let one_fn = async {
let one = Arc::clone(&l_rw_lock);
let timeout = Duration::from_secs(1);
assert_eq!(true, one.get_lock(id, source, &timeout).await);
sleep(Duration::from_secs(5)).await;
l_rw_lock.un_lock().await;
};
let two_fn = async {
let two = Arc::clone(&l_rw_lock);
let timeout = Duration::from_secs(2);
assert_eq!(false, two.get_r_lock(id, source, &timeout).await);
sleep(Duration::from_secs(5)).await;
assert_eq!(true, two.get_r_lock(id, source, &timeout).await);
two.un_r_lock().await;
};
tokio::join!(one_fn, two_fn);
Ok(())
}
}

common/lock/src/namespace_lock.rs Normal file

@@ -0,0 +1,281 @@
use std::{collections::HashMap, path::Path, sync::Arc, time::Duration};
use async_trait::async_trait;
use tokio::sync::RwLock;
use uuid::Uuid;
use crate::{
drwmutex::{DRWMutex, Options},
lrwmutex::LRWMutex,
LockApi,
};
use common::error::Result;
pub type RWLockerImpl = Box<dyn RWLocker + Send>;
#[async_trait]
pub trait RWLocker {
async fn get_lock(&mut self, opts: &Options) -> Result<bool>;
async fn un_lock(&mut self) -> Result<()>;
async fn get_u_lock(&mut self, opts: &Options) -> Result<bool>;
async fn un_r_lock(&mut self) -> Result<()>;
}
#[derive(Debug)]
struct NsLock {
reference: usize,
lock: LRWMutex,
}
#[derive(Debug, Default)]
pub struct NsLockMap {
is_dist_erasure: bool,
lock_map: RwLock<HashMap<String, NsLock>>,
}
impl NsLockMap {
pub fn new(is_dist_erasure: bool) -> Self {
Self {
is_dist_erasure,
..Default::default()
}
}
async fn lock(
&mut self,
volume: &String,
path: &String,
lock_source: &String,
ops_id: &String,
read_lock: bool,
timeout: Duration,
) -> bool {
let resource = Path::new(volume).join(path).to_str().unwrap().to_string();
let mut w_lock_map = self.lock_map.write().await;
let nslk = w_lock_map.entry(resource.clone()).or_insert(NsLock {
reference: 0,
lock: LRWMutex::default(),
});
nslk.reference += 1;
let locked: bool;
if read_lock {
locked = nslk.lock.get_r_lock(ops_id, lock_source, &timeout).await;
} else {
locked = nslk.lock.get_lock(ops_id, lock_source, &timeout).await;
}
if !locked {
nslk.reference -= 1;
if nslk.reference == 0 {
w_lock_map.remove(&resource);
}
}
return locked;
}
async fn un_lock(&mut self, volume: &String, path: &String, read_lock: bool) {
let resource = Path::new(volume).join(path).to_str().unwrap().to_string();
let mut w_lock_map = self.lock_map.write().await;
if let Some(nslk) = w_lock_map.get_mut(&resource) {
if read_lock {
nslk.lock.un_r_lock().await;
} else {
nslk.lock.un_lock().await;
}
nslk.reference -= 1;
if nslk.reference == 0 {
w_lock_map.remove(&resource);
}
} else {
return;
}
}
}
pub async fn new_nslock(
ns: Arc<RwLock<NsLockMap>>,
owner: String,
volume: String,
paths: Vec<String>,
lockers: Vec<LockApi>,
) -> RWLockerImpl {
if ns.read().await.is_dist_erasure {
let names = paths
.iter()
.map(|path| Path::new(&volume).join(path).to_str().unwrap().to_string())
.collect();
return Box::new(DistLockInstance::new(owner, names, lockers));
}
Box::new(LocalLockInstance::new(ns, volume, paths))
}
struct DistLockInstance {
lock: Box<DRWMutex>,
ops_id: String,
}
impl DistLockInstance {
fn new(owner: String, names: Vec<String>, lockers: Vec<LockApi>) -> Self {
let ops_id = Uuid::new_v4().to_string();
Self {
lock: Box::new(DRWMutex::new(owner, names, lockers)),
ops_id,
}
}
}
#[async_trait]
impl RWLocker for DistLockInstance {
async fn get_lock(&mut self, opts: &Options) -> Result<bool> {
let source = "".to_string();
Ok(self.lock.get_lock(&self.ops_id, &source, opts).await)
}
async fn un_lock(&mut self) -> Result<()> {
Ok(self.lock.un_lock().await)
}
async fn get_u_lock(&mut self, opts: &Options) -> Result<bool> {
let source = "".to_string();
Ok(self.lock.get_r_lock(&self.ops_id, &source, opts).await)
}
async fn un_r_lock(&mut self) -> Result<()> {
Ok(self.lock.un_r_lock().await)
}
}
struct LocalLockInstance {
ns: Arc<RwLock<NsLockMap>>,
volume: String,
paths: Vec<String>,
ops_id: String,
}
impl LocalLockInstance {
fn new(ns: Arc<RwLock<NsLockMap>>, volume: String, paths: Vec<String>) -> Self {
let ops_id = Uuid::new_v4().to_string();
Self {
ns,
volume,
paths,
ops_id,
}
}
}
#[async_trait]
impl RWLocker for LocalLockInstance {
async fn get_lock(&mut self, opts: &Options) -> Result<bool> {
let source = "".to_string();
let read_lock = false;
let mut success = vec![false; self.paths.len()];
for (idx, path) in self.paths.iter().enumerate() {
if !self
.ns
.write()
.await
.lock(&self.volume, path, &source, &self.ops_id, read_lock, opts.timeout)
.await
{
for (i, x) in success.iter().enumerate() {
if *x {
self.ns.write().await.un_lock(&self.volume, &self.paths[i], read_lock).await;
}
}
return Ok(false);
}
success[idx] = true;
}
Ok(true)
}
async fn un_lock(&mut self) -> Result<()> {
let read_lock = false;
for path in self.paths.iter() {
self.ns.write().await.un_lock(&self.volume, path, read_lock).await;
}
Ok(())
}
async fn get_u_lock(&mut self, opts: &Options) -> Result<bool> {
let source = "".to_string();
let read_lock = true;
let mut success = vec![false; self.paths.len()];
for (idx, path) in self.paths.iter().enumerate() {
if !self
.ns
.write()
.await
.lock(&self.volume, path, &source, &self.ops_id, read_lock, opts.timeout)
.await
{
for (i, x) in success.iter().enumerate() {
if *x {
self.ns.write().await.un_lock(&self.volume, &self.paths[i], read_lock).await;
}
}
return Ok(false);
}
success[idx] = true;
}
Ok(true)
}
async fn un_r_lock(&mut self) -> Result<()> {
let read_lock = true;
for path in self.paths.iter() {
self.ns.write().await.un_lock(&self.volume, path, read_lock).await;
}
Ok(())
}
}
#[cfg(test)]
mod test {
use std::{sync::Arc, time::Duration};
use common::error::Result;
use tokio::sync::RwLock;
use crate::{
drwmutex::Options,
namespace_lock::{new_nslock, NsLockMap},
};
#[tokio::test]
async fn test_local_instance() -> Result<()> {
let ns_lock_map = Arc::new(RwLock::new(NsLockMap::default()));
let mut ns = new_nslock(
Arc::clone(&ns_lock_map),
"local".to_string(),
"test".to_string(),
vec!["foo".to_string()],
Vec::new(),
)
.await;
let result = ns
.get_lock(&Options {
timeout: Duration::from_secs(5),
retry_interval: Duration::from_secs(1),
})
.await?;
assert_eq!(result, true);
Ok(())
}
}
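For the distributed path, NsLockMap::new(true) makes new_nslock return a DistLockInstance backed by DRWMutex; a sketch with a single local locker standing in for the remote peers (assumed usage):

use std::{sync::Arc, time::Duration};
use tokio::sync::RwLock;
use lock::{drwmutex::Options, namespace_lock::{new_nslock, NsLockMap}, new_lock_api};

async fn dist_lock_demo() -> common::error::Result<()> {
    let ns = Arc::new(RwLock::new(NsLockMap::new(true)));
    let mut guard = new_nslock(
        Arc::clone(&ns),
        "node-1".to_string(),           // owner
        "test-bucket".to_string(),      // volume
        vec!["object-1".to_string()],   // paths
        vec![new_lock_api(true, None)], // lockers
    )
    .await;
    let opts = Options {
        timeout: Duration::from_secs(5),
        retry_interval: Duration::from_millis(250),
    };
    if guard.get_lock(&opts).await? {
        guard.un_lock().await?;
    }
    Ok(())
}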

common/lock/src/remote_client.rs Normal file

@@ -0,0 +1,134 @@
use async_trait::async_trait;
use common::error::{Error, Result};
use protos::{node_service_time_out_client, proto_gen::node_service::GenerallyLockRequest};
use tonic::Request;
use tracing::info;
use crate::{lock_args::LockArgs, Locker};
#[derive(Debug, Clone)]
pub struct RemoteClient {
addr: String,
}
impl RemoteClient {
pub fn new(url: url::Url) -> Self {
let addr = format!("{}://{}:{}", url.scheme(), url.host_str().unwrap(), url.port().unwrap());
Self { addr }
}
}
#[async_trait]
impl Locker for RemoteClient {
async fn lock(&mut self, args: &LockArgs) -> Result<bool> {
info!("remote lock");
let args = serde_json::to_string(args)?;
let mut client = node_service_time_out_client(&self.addr)
.await
.map_err(|err| Error::from_string(format!("can not get client, err: {}", err.to_string())))?;
let request = Request::new(GenerallyLockRequest { args });
let response = client.lock(request).await?.into_inner();
if let Some(error_info) = response.error_info {
return Err(Error::from_string(error_info));
}
Ok(response.success)
}
async fn unlock(&mut self, args: &LockArgs) -> Result<bool> {
info!("remote unlock");
let args = serde_json::to_string(args)?;
let mut client = node_service_time_out_client(&self.addr)
.await
.map_err(|err| Error::from_string(format!("can not get client, err: {}", err.to_string())))?;
let request = Request::new(GenerallyLockRequest { args });
let response = client.un_lock(request).await?.into_inner();
if let Some(error_info) = response.error_info {
return Err(Error::from_string(error_info));
}
Ok(response.success)
}
async fn rlock(&mut self, args: &LockArgs) -> Result<bool> {
info!("remote rlock");
let args = serde_json::to_string(args)?;
let mut client = node_service_time_out_client(&self.addr)
.await
.map_err(|err| Error::from_string(format!("can not get client, err: {}", err.to_string())))?;
let request = Request::new(GenerallyLockRequest { args });
let response = client.r_lock(request).await?.into_inner();
if let Some(error_info) = response.error_info {
return Err(Error::from_string(error_info));
}
Ok(response.success)
}
async fn runlock(&mut self, args: &LockArgs) -> Result<bool> {
info!("remote runlock");
let args = serde_json::to_string(args)?;
let mut client = node_service_time_out_client(&self.addr)
.await
.map_err(|err| Error::from_string(format!("can not get client, err: {}", err.to_string())))?;
let request = Request::new(GenerallyLockRequest { args });
let response = client.r_un_lock(request).await?.into_inner();
if let Some(error_info) = response.error_info {
return Err(Error::from_string(error_info));
}
Ok(response.success)
}
async fn force_unlock(&mut self, args: &LockArgs) -> Result<bool> {
info!("remote force_unlock");
let args = serde_json::to_string(args)?;
let mut client = node_service_time_out_client(&self.addr)
.await
.map_err(|err| Error::from_string(format!("can not get client, err: {}", err.to_string())))?;
let request = Request::new(GenerallyLockRequest { args });
let response = client.force_un_lock(request).await?.into_inner();
if let Some(error_info) = response.error_info {
return Err(Error::from_string(error_info));
}
Ok(response.success)
}
async fn refresh(&mut self, args: &LockArgs) -> Result<bool> {
info!("remote refresh");
let args = serde_json::to_string(args)?;
let mut client = node_service_time_out_client(&self.addr)
.await
.map_err(|err| Error::from_string(format!("can not get client, err: {}", err.to_string())))?;
let request = Request::new(GenerallyLockRequest { args });
let response = client.refresh(request).await?.into_inner();
if let Some(error_info) = response.error_info {
return Err(Error::from_string(error_info));
}
Ok(response.success)
}
async fn is_local(&self) -> bool {
false
}
async fn close(&self) {}
async fn is_online(&self) -> bool {
true
}
}
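Remote lockers are addressed by URL; new_lock_api(false, Some(url)) wraps a RemoteClient, and every Locker call is forwarded as the matching NodeService RPC (Lock, UnLock, RLock, RUnLock, Refresh, ForceUnLock). A sketch with an illustrative endpoint:

use lock::{lock_args::LockArgs, new_lock_api, Locker};

async fn remote_lock_demo() -> common::error::Result<()> {
    let url = url::Url::parse("http://127.0.0.1:9000")?;
    let mut remote = new_lock_api(false, Some(url)); // LockApi::Remote(RemoteClient)
    let args = LockArgs {
        uid: "op-1".to_string(),
        resources: vec!["bucket/object".to_string()],
        owner: "node-2".to_string(),
        source: "".to_string(),
        quorum: 2,
    };
    if remote.lock(&args).await? {
        remote.unlock(&args).await?;
    }
    Ok(())
}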

common/protos/Cargo.toml

@@ -5,6 +5,7 @@ edition.workspace = true
[dependencies]
#async-backtrace = { workspace = true, optional = true }
common.workspace = true
flatbuffers = { workspace = true }
prost = { workspace = true }
protobuf = { workspace = true }

common/protos: generated node_service gRPC code

@@ -563,6 +563,21 @@ pub struct DeleteVolumeResponse {
#[prost(string, optional, tag = "2")]
pub error_info: ::core::option::Option<::prost::alloc::string::String>,
}
/// the lock APIs share the same argument type
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct GenerallyLockRequest {
#[prost(string, tag = "1")]
pub args: ::prost::alloc::string::String,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct GenerallyLockResponse {
#[prost(bool, tag = "1")]
pub success: bool,
#[prost(string, optional, tag = "2")]
pub error_info: ::core::option::Option<::prost::alloc::string::String>,
}
/// Generated client implementations.
pub mod node_service_client {
#![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)]
@@ -915,11 +930,39 @@ pub mod node_service_client {
.insert(GrpcMethod::new("node_service.NodeService", "Write"));
self.inner.unary(req, path, codec).await
}
pub async fn write_stream(
&mut self,
request: impl tonic::IntoStreamingRequest<Message = super::WriteRequest>,
) -> std::result::Result<
tonic::Response<tonic::codec::Streaming<super::WriteResponse>>,
tonic::Status,
> {
self.inner
.ready()
.await
.map_err(|e| {
tonic::Status::new(
tonic::Code::Unknown,
format!("Service was not ready: {}", e.into()),
)
})?;
let codec = tonic::codec::ProstCodec::default();
let path = http::uri::PathAndQuery::from_static(
"/node_service.NodeService/WriteStream",
);
let mut req = request.into_streaming_request();
req.extensions_mut()
.insert(GrpcMethod::new("node_service.NodeService", "WriteStream"));
self.inner.streaming(req, path, codec).await
}
/// rpc Append(AppendRequest) returns (AppendResponse) {};
pub async fn read_at(
&mut self,
request: impl tonic::IntoRequest<super::ReadAtRequest>,
) -> std::result::Result<tonic::Response<super::ReadAtResponse>, tonic::Status> {
request: impl tonic::IntoStreamingRequest<Message = super::ReadAtRequest>,
) -> std::result::Result<
tonic::Response<tonic::codec::Streaming<super::ReadAtResponse>>,
tonic::Status,
> {
self.inner
.ready()
.await
@@ -933,10 +976,10 @@ pub mod node_service_client {
let path = http::uri::PathAndQuery::from_static(
"/node_service.NodeService/ReadAt",
);
let mut req = request.into_request();
let mut req = request.into_streaming_request();
req.extensions_mut()
.insert(GrpcMethod::new("node_service.NodeService", "ReadAt"));
self.inner.unary(req, path, codec).await
self.inner.streaming(req, path, codec).await
}
pub async fn list_dir(
&mut self,
@@ -1335,6 +1378,156 @@ pub mod node_service_client {
.insert(GrpcMethod::new("node_service.NodeService", "DeleteVolume"));
self.inner.unary(req, path, codec).await
}
pub async fn lock(
&mut self,
request: impl tonic::IntoRequest<super::GenerallyLockRequest>,
) -> std::result::Result<
tonic::Response<super::GenerallyLockResponse>,
tonic::Status,
> {
self.inner
.ready()
.await
.map_err(|e| {
tonic::Status::new(
tonic::Code::Unknown,
format!("Service was not ready: {}", e.into()),
)
})?;
let codec = tonic::codec::ProstCodec::default();
let path = http::uri::PathAndQuery::from_static(
"/node_service.NodeService/Lock",
);
let mut req = request.into_request();
req.extensions_mut()
.insert(GrpcMethod::new("node_service.NodeService", "Lock"));
self.inner.unary(req, path, codec).await
}
pub async fn un_lock(
&mut self,
request: impl tonic::IntoRequest<super::GenerallyLockRequest>,
) -> std::result::Result<
tonic::Response<super::GenerallyLockResponse>,
tonic::Status,
> {
self.inner
.ready()
.await
.map_err(|e| {
tonic::Status::new(
tonic::Code::Unknown,
format!("Service was not ready: {}", e.into()),
)
})?;
let codec = tonic::codec::ProstCodec::default();
let path = http::uri::PathAndQuery::from_static(
"/node_service.NodeService/UnLock",
);
let mut req = request.into_request();
req.extensions_mut()
.insert(GrpcMethod::new("node_service.NodeService", "UnLock"));
self.inner.unary(req, path, codec).await
}
pub async fn r_lock(
&mut self,
request: impl tonic::IntoRequest<super::GenerallyLockRequest>,
) -> std::result::Result<
tonic::Response<super::GenerallyLockResponse>,
tonic::Status,
> {
self.inner
.ready()
.await
.map_err(|e| {
tonic::Status::new(
tonic::Code::Unknown,
format!("Service was not ready: {}", e.into()),
)
})?;
let codec = tonic::codec::ProstCodec::default();
let path = http::uri::PathAndQuery::from_static(
"/node_service.NodeService/RLock",
);
let mut req = request.into_request();
req.extensions_mut()
.insert(GrpcMethod::new("node_service.NodeService", "RLock"));
self.inner.unary(req, path, codec).await
}
pub async fn r_un_lock(
&mut self,
request: impl tonic::IntoRequest<super::GenerallyLockRequest>,
) -> std::result::Result<
tonic::Response<super::GenerallyLockResponse>,
tonic::Status,
> {
self.inner
.ready()
.await
.map_err(|e| {
tonic::Status::new(
tonic::Code::Unknown,
format!("Service was not ready: {}", e.into()),
)
})?;
let codec = tonic::codec::ProstCodec::default();
let path = http::uri::PathAndQuery::from_static(
"/node_service.NodeService/RUnLock",
);
let mut req = request.into_request();
req.extensions_mut()
.insert(GrpcMethod::new("node_service.NodeService", "RUnLock"));
self.inner.unary(req, path, codec).await
}
pub async fn force_un_lock(
&mut self,
request: impl tonic::IntoRequest<super::GenerallyLockRequest>,
) -> std::result::Result<
tonic::Response<super::GenerallyLockResponse>,
tonic::Status,
> {
self.inner
.ready()
.await
.map_err(|e| {
tonic::Status::new(
tonic::Code::Unknown,
format!("Service was not ready: {}", e.into()),
)
})?;
let codec = tonic::codec::ProstCodec::default();
let path = http::uri::PathAndQuery::from_static(
"/node_service.NodeService/ForceUnLock",
);
let mut req = request.into_request();
req.extensions_mut()
.insert(GrpcMethod::new("node_service.NodeService", "ForceUnLock"));
self.inner.unary(req, path, codec).await
}
pub async fn refresh(
&mut self,
request: impl tonic::IntoRequest<super::GenerallyLockRequest>,
) -> std::result::Result<
tonic::Response<super::GenerallyLockResponse>,
tonic::Status,
> {
self.inner
.ready()
.await
.map_err(|e| {
tonic::Status::new(
tonic::Code::Unknown,
format!("Service was not ready: {}", e.into()),
)
})?;
let codec = tonic::codec::ProstCodec::default();
let path = http::uri::PathAndQuery::from_static(
"/node_service.NodeService/Refresh",
);
let mut req = request.into_request();
req.extensions_mut()
.insert(GrpcMethod::new("node_service.NodeService", "Refresh"));
self.inner.unary(req, path, codec).await
}
}
}
/// Generated server implementations.
@@ -1410,11 +1603,30 @@ pub mod node_service_server {
&self,
request: tonic::Request<super::WriteRequest>,
) -> std::result::Result<tonic::Response<super::WriteResponse>, tonic::Status>;
/// Server streaming response type for the WriteStream method.
type WriteStreamStream: tonic::codegen::tokio_stream::Stream<
Item = std::result::Result<super::WriteResponse, tonic::Status>,
>
+ Send
+ 'static;
async fn write_stream(
&self,
request: tonic::Request<tonic::Streaming<super::WriteRequest>>,
) -> std::result::Result<
tonic::Response<Self::WriteStreamStream>,
tonic::Status,
>;
/// Server streaming response type for the ReadAt method.
type ReadAtStream: tonic::codegen::tokio_stream::Stream<
Item = std::result::Result<super::ReadAtResponse, tonic::Status>,
>
+ Send
+ 'static;
/// rpc Append(AppendRequest) returns (AppendResponse) {};
async fn read_at(
&self,
request: tonic::Request<super::ReadAtRequest>,
) -> std::result::Result<tonic::Response<super::ReadAtResponse>, tonic::Status>;
request: tonic::Request<tonic::Streaming<super::ReadAtRequest>>,
) -> std::result::Result<tonic::Response<Self::ReadAtStream>, tonic::Status>;
async fn list_dir(
&self,
request: tonic::Request<super::ListDirRequest>,
@@ -1518,6 +1730,48 @@ pub mod node_service_server {
tonic::Response<super::DeleteVolumeResponse>,
tonic::Status,
>;
async fn lock(
&self,
request: tonic::Request<super::GenerallyLockRequest>,
) -> std::result::Result<
tonic::Response<super::GenerallyLockResponse>,
tonic::Status,
>;
async fn un_lock(
&self,
request: tonic::Request<super::GenerallyLockRequest>,
) -> std::result::Result<
tonic::Response<super::GenerallyLockResponse>,
tonic::Status,
>;
async fn r_lock(
&self,
request: tonic::Request<super::GenerallyLockRequest>,
) -> std::result::Result<
tonic::Response<super::GenerallyLockResponse>,
tonic::Status,
>;
async fn r_un_lock(
&self,
request: tonic::Request<super::GenerallyLockRequest>,
) -> std::result::Result<
tonic::Response<super::GenerallyLockResponse>,
tonic::Status,
>;
async fn force_un_lock(
&self,
request: tonic::Request<super::GenerallyLockRequest>,
) -> std::result::Result<
tonic::Response<super::GenerallyLockResponse>,
tonic::Status,
>;
async fn refresh(
&self,
request: tonic::Request<super::GenerallyLockRequest>,
) -> std::result::Result<
tonic::Response<super::GenerallyLockResponse>,
tonic::Status,
>;
}
#[derive(Debug)]
pub struct NodeServiceServer<T: NodeService> {
@@ -2086,21 +2340,72 @@ pub mod node_service_server {
};
Box::pin(fut)
}
"/node_service.NodeService/WriteStream" => {
#[allow(non_camel_case_types)]
struct WriteStreamSvc<T: NodeService>(pub Arc<T>);
impl<
T: NodeService,
> tonic::server::StreamingService<super::WriteRequest>
for WriteStreamSvc<T> {
type Response = super::WriteResponse;
type ResponseStream = T::WriteStreamStream;
type Future = BoxFuture<
tonic::Response<Self::ResponseStream>,
tonic::Status,
>;
fn call(
&mut self,
request: tonic::Request<
tonic::Streaming<super::WriteRequest>,
>,
) -> Self::Future {
let inner = Arc::clone(&self.0);
let fut = async move {
<T as NodeService>::write_stream(&inner, request).await
};
Box::pin(fut)
}
}
let accept_compression_encodings = self.accept_compression_encodings;
let send_compression_encodings = self.send_compression_encodings;
let max_decoding_message_size = self.max_decoding_message_size;
let max_encoding_message_size = self.max_encoding_message_size;
let inner = self.inner.clone();
let fut = async move {
let method = WriteStreamSvc(inner);
let codec = tonic::codec::ProstCodec::default();
let mut grpc = tonic::server::Grpc::new(codec)
.apply_compression_config(
accept_compression_encodings,
send_compression_encodings,
)
.apply_max_message_size_config(
max_decoding_message_size,
max_encoding_message_size,
);
let res = grpc.streaming(method, req).await;
Ok(res)
};
Box::pin(fut)
}
"/node_service.NodeService/ReadAt" => {
#[allow(non_camel_case_types)]
struct ReadAtSvc<T: NodeService>(pub Arc<T>);
impl<
T: NodeService,
> tonic::server::UnaryService<super::ReadAtRequest>
> tonic::server::StreamingService<super::ReadAtRequest>
for ReadAtSvc<T> {
type Response = super::ReadAtResponse;
type ResponseStream = T::ReadAtStream;
type Future = BoxFuture<
tonic::Response<Self::Response>,
tonic::Response<Self::ResponseStream>,
tonic::Status,
>;
fn call(
&mut self,
request: tonic::Request<super::ReadAtRequest>,
request: tonic::Request<
tonic::Streaming<super::ReadAtRequest>,
>,
) -> Self::Future {
let inner = Arc::clone(&self.0);
let fut = async move {
@@ -2126,7 +2431,7 @@ pub mod node_service_server {
max_decoding_message_size,
max_encoding_message_size,
);
let res = grpc.unary(method, req).await;
let res = grpc.streaming(method, req).await;
Ok(res)
};
Box::pin(fut)
@@ -2851,6 +3156,276 @@ pub mod node_service_server {
};
Box::pin(fut)
}
"/node_service.NodeService/Lock" => {
#[allow(non_camel_case_types)]
struct LockSvc<T: NodeService>(pub Arc<T>);
impl<
T: NodeService,
> tonic::server::UnaryService<super::GenerallyLockRequest>
for LockSvc<T> {
type Response = super::GenerallyLockResponse;
type Future = BoxFuture<
tonic::Response<Self::Response>,
tonic::Status,
>;
fn call(
&mut self,
request: tonic::Request<super::GenerallyLockRequest>,
) -> Self::Future {
let inner = Arc::clone(&self.0);
let fut = async move {
<T as NodeService>::lock(&inner, request).await
};
Box::pin(fut)
}
}
let accept_compression_encodings = self.accept_compression_encodings;
let send_compression_encodings = self.send_compression_encodings;
let max_decoding_message_size = self.max_decoding_message_size;
let max_encoding_message_size = self.max_encoding_message_size;
let inner = self.inner.clone();
let fut = async move {
let method = LockSvc(inner);
let codec = tonic::codec::ProstCodec::default();
let mut grpc = tonic::server::Grpc::new(codec)
.apply_compression_config(
accept_compression_encodings,
send_compression_encodings,
)
.apply_max_message_size_config(
max_decoding_message_size,
max_encoding_message_size,
);
let res = grpc.unary(method, req).await;
Ok(res)
};
Box::pin(fut)
}
"/node_service.NodeService/UnLock" => {
#[allow(non_camel_case_types)]
struct UnLockSvc<T: NodeService>(pub Arc<T>);
impl<
T: NodeService,
> tonic::server::UnaryService<super::GenerallyLockRequest>
for UnLockSvc<T> {
type Response = super::GenerallyLockResponse;
type Future = BoxFuture<
tonic::Response<Self::Response>,
tonic::Status,
>;
fn call(
&mut self,
request: tonic::Request<super::GenerallyLockRequest>,
) -> Self::Future {
let inner = Arc::clone(&self.0);
let fut = async move {
<T as NodeService>::un_lock(&inner, request).await
};
Box::pin(fut)
}
}
let accept_compression_encodings = self.accept_compression_encodings;
let send_compression_encodings = self.send_compression_encodings;
let max_decoding_message_size = self.max_decoding_message_size;
let max_encoding_message_size = self.max_encoding_message_size;
let inner = self.inner.clone();
let fut = async move {
let method = UnLockSvc(inner);
let codec = tonic::codec::ProstCodec::default();
let mut grpc = tonic::server::Grpc::new(codec)
.apply_compression_config(
accept_compression_encodings,
send_compression_encodings,
)
.apply_max_message_size_config(
max_decoding_message_size,
max_encoding_message_size,
);
let res = grpc.unary(method, req).await;
Ok(res)
};
Box::pin(fut)
}
"/node_service.NodeService/RLock" => {
#[allow(non_camel_case_types)]
struct RLockSvc<T: NodeService>(pub Arc<T>);
impl<
T: NodeService,
> tonic::server::UnaryService<super::GenerallyLockRequest>
for RLockSvc<T> {
type Response = super::GenerallyLockResponse;
type Future = BoxFuture<
tonic::Response<Self::Response>,
tonic::Status,
>;
fn call(
&mut self,
request: tonic::Request<super::GenerallyLockRequest>,
) -> Self::Future {
let inner = Arc::clone(&self.0);
let fut = async move {
<T as NodeService>::r_lock(&inner, request).await
};
Box::pin(fut)
}
}
let accept_compression_encodings = self.accept_compression_encodings;
let send_compression_encodings = self.send_compression_encodings;
let max_decoding_message_size = self.max_decoding_message_size;
let max_encoding_message_size = self.max_encoding_message_size;
let inner = self.inner.clone();
let fut = async move {
let method = RLockSvc(inner);
let codec = tonic::codec::ProstCodec::default();
let mut grpc = tonic::server::Grpc::new(codec)
.apply_compression_config(
accept_compression_encodings,
send_compression_encodings,
)
.apply_max_message_size_config(
max_decoding_message_size,
max_encoding_message_size,
);
let res = grpc.unary(method, req).await;
Ok(res)
};
Box::pin(fut)
}
"/node_service.NodeService/RUnLock" => {
#[allow(non_camel_case_types)]
struct RUnLockSvc<T: NodeService>(pub Arc<T>);
impl<
T: NodeService,
> tonic::server::UnaryService<super::GenerallyLockRequest>
for RUnLockSvc<T> {
type Response = super::GenerallyLockResponse;
type Future = BoxFuture<
tonic::Response<Self::Response>,
tonic::Status,
>;
fn call(
&mut self,
request: tonic::Request<super::GenerallyLockRequest>,
) -> Self::Future {
let inner = Arc::clone(&self.0);
let fut = async move {
<T as NodeService>::r_un_lock(&inner, request).await
};
Box::pin(fut)
}
}
let accept_compression_encodings = self.accept_compression_encodings;
let send_compression_encodings = self.send_compression_encodings;
let max_decoding_message_size = self.max_decoding_message_size;
let max_encoding_message_size = self.max_encoding_message_size;
let inner = self.inner.clone();
let fut = async move {
let method = RUnLockSvc(inner);
let codec = tonic::codec::ProstCodec::default();
let mut grpc = tonic::server::Grpc::new(codec)
.apply_compression_config(
accept_compression_encodings,
send_compression_encodings,
)
.apply_max_message_size_config(
max_decoding_message_size,
max_encoding_message_size,
);
let res = grpc.unary(method, req).await;
Ok(res)
};
Box::pin(fut)
}
"/node_service.NodeService/ForceUnLock" => {
#[allow(non_camel_case_types)]
struct ForceUnLockSvc<T: NodeService>(pub Arc<T>);
impl<
T: NodeService,
> tonic::server::UnaryService<super::GenerallyLockRequest>
for ForceUnLockSvc<T> {
type Response = super::GenerallyLockResponse;
type Future = BoxFuture<
tonic::Response<Self::Response>,
tonic::Status,
>;
fn call(
&mut self,
request: tonic::Request<super::GenerallyLockRequest>,
) -> Self::Future {
let inner = Arc::clone(&self.0);
let fut = async move {
<T as NodeService>::force_un_lock(&inner, request).await
};
Box::pin(fut)
}
}
let accept_compression_encodings = self.accept_compression_encodings;
let send_compression_encodings = self.send_compression_encodings;
let max_decoding_message_size = self.max_decoding_message_size;
let max_encoding_message_size = self.max_encoding_message_size;
let inner = self.inner.clone();
let fut = async move {
let method = ForceUnLockSvc(inner);
let codec = tonic::codec::ProstCodec::default();
let mut grpc = tonic::server::Grpc::new(codec)
.apply_compression_config(
accept_compression_encodings,
send_compression_encodings,
)
.apply_max_message_size_config(
max_decoding_message_size,
max_encoding_message_size,
);
let res = grpc.unary(method, req).await;
Ok(res)
};
Box::pin(fut)
}
"/node_service.NodeService/Refresh" => {
#[allow(non_camel_case_types)]
struct RefreshSvc<T: NodeService>(pub Arc<T>);
impl<
T: NodeService,
> tonic::server::UnaryService<super::GenerallyLockRequest>
for RefreshSvc<T> {
type Response = super::GenerallyLockResponse;
type Future = BoxFuture<
tonic::Response<Self::Response>,
tonic::Status,
>;
fn call(
&mut self,
request: tonic::Request<super::GenerallyLockRequest>,
) -> Self::Future {
let inner = Arc::clone(&self.0);
let fut = async move {
<T as NodeService>::refresh(&inner, request).await
};
Box::pin(fut)
}
}
let accept_compression_encodings = self.accept_compression_encodings;
let send_compression_encodings = self.send_compression_encodings;
let max_decoding_message_size = self.max_decoding_message_size;
let max_encoding_message_size = self.max_encoding_message_size;
let inner = self.inner.clone();
let fut = async move {
let method = RefreshSvc(inner);
let codec = tonic::codec::ProstCodec::default();
let mut grpc = tonic::server::Grpc::new(codec)
.apply_compression_config(
accept_compression_encodings,
send_compression_encodings,
)
.apply_max_message_size_config(
max_decoding_message_size,
max_encoding_message_size,
);
let res = grpc.unary(method, req).await;
Ok(res)
};
Box::pin(fut)
}
_ => {
Box::pin(async move {
Ok(

View File

@@ -1,28 +1,44 @@
mod generated;
use std::time::Duration;
use std::{error::Error, time::Duration};
use common::globals::GLOBAL_Conn_Map;
pub use generated::*;
use proto_gen::node_service::node_service_client::NodeServiceClient;
use tonic::{codec::CompressionEncoding, transport::Channel};
use tower::timeout::Timeout;
use tonic::{
metadata::MetadataValue,
service::interceptor::InterceptedService,
transport::{Channel, Endpoint},
Request, Status,
};
// Default 100 MB
pub const DEFAULT_GRPC_SERVER_MESSAGE_LEN: usize = 100 * 1024 * 1024;
pub fn node_service_time_out_client(
channel: Channel,
time_out: Duration,
max_message_size: usize,
grpc_enable_gzip: bool,
) -> NodeServiceClient<Timeout<Channel>> {
let timeout_channel = Timeout::new(channel, time_out);
let client = NodeServiceClient::<Timeout<Channel>>::new(timeout_channel);
let client = NodeServiceClient::max_decoding_message_size(client, max_message_size);
if grpc_enable_gzip {
NodeServiceClient::max_encoding_message_size(client, max_message_size)
.accept_compressed(CompressionEncoding::Gzip)
.send_compressed(CompressionEncoding::Gzip)
} else {
NodeServiceClient::max_encoding_message_size(client, max_message_size)
}
pub async fn node_service_time_out_client(
addr: &String,
) -> Result<
NodeServiceClient<
InterceptedService<Channel, Box<dyn Fn(Request<()>) -> Result<Request<()>, Status> + Send + Sync + 'static>>,
>,
Box<dyn Error>,
> {
let token: MetadataValue<_> = "rustfs rpc".parse()?;
let channel = match GLOBAL_Conn_Map.read().await.get(addr) {
Some(channel) => channel.clone(),
None => {
let connector = Endpoint::from_shared(addr.to_string())?.connect_timeout(Duration::from_secs(60));
let channel = connector.connect().await?;
channel
}
};
GLOBAL_Conn_Map.write().await.insert(addr.to_string(), channel.clone());
// let timeout_channel = Timeout::new(channel, Duration::from_secs(60));
Ok(NodeServiceClient::with_interceptor(
channel,
Box::new(move |mut req: Request<()>| {
req.metadata_mut().insert("authorization", token.clone());
Ok(req)
}),
))
}
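For orientation, a minimal caller sketch for the reworked client factory above (a sketch only: the address and LockArgs values are placeholders, and it assumes an async context returning Result<(), Box<dyn std::error::Error>>):
    use lock::lock_args::LockArgs;
    use protos::{node_service_time_out_client, proto_gen::node_service::GenerallyLockRequest};

    let mut client = node_service_time_out_client(&"http://127.0.0.1:9000".to_string()).await?;
    let args = serde_json::to_string(&LockArgs {
        uid: "example-uid".to_string(),
        resources: vec!["bucket/object".to_string()],
        owner: "node-a".to_string(),
        source: "".to_string(),
        quorum: 1,
    })?;
    // The interceptor installed above attaches the "authorization: rustfs rpc" metadata.
    let resp = client.lock(tonic::Request::new(GenerallyLockRequest { args })).await?.into_inner();
    assert!(resp.success, "lock failed: {:?}", resp.error_info);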

View File

@@ -345,6 +345,16 @@ message DeleteVolumeResponse {
optional string error_info = 2;
}
// all lock RPCs share the same argument type
message GenerallyLockRequest {
string args = 1;
}
message GenerallyLockResponse {
bool success = 1;
optional string error_info = 2;
}
/* -------------------------------------------------------------------- */
service NodeService {
@@ -363,8 +373,9 @@ service NodeService {
rpc RenamePart(RenamePartRequst) returns (RenamePartResponse) {};
rpc RenameFile(RenameFileRequst) returns (RenameFileResponse) {};
rpc Write(WriteRequest) returns (WriteResponse) {};
rpc WriteStream(stream WriteRequest) returns (stream WriteResponse) {};
// rpc Append(AppendRequest) returns (AppendResponse) {};
rpc ReadAt(ReadAtRequest) returns (ReadAtResponse) {};
rpc ReadAt(stream ReadAtRequest) returns (stream ReadAtResponse) {};
rpc ListDir(ListDirRequest) returns (ListDirResponse) {};
rpc WalkDir(WalkDirRequest) returns (WalkDirResponse) {};
rpc RenameData(RenameDataRequest) returns (RenameDataResponse) {};
@@ -381,4 +392,13 @@ service NodeService {
rpc DeleteVersions(DeleteVersionsRequest) returns (DeleteVersionsResponse) {};
rpc ReadMultiple(ReadMultipleRequest) returns (ReadMultipleResponse) {};
rpc DeleteVolume(DeleteVolumeRequest) returns (DeleteVolumeResponse) {};
/* -------------------------------lock service-------------------------- */
rpc Lock(GenerallyLockRequest) returns (GenerallyLockResponse) {};
rpc UnLock(GenerallyLockRequest) returns (GenerallyLockResponse) {};
rpc RLock(GenerallyLockRequest) returns (GenerallyLockResponse) {};
rpc RUnLock(GenerallyLockRequest) returns (GenerallyLockResponse) {};
rpc ForceUnLock(GenerallyLockRequest) returns (GenerallyLockResponse) {};
rpc Refresh(GenerallyLockRequest) returns (GenerallyLockResponse) {};
}
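For illustration, `args` carries a JSON-serialized LockArgs (see the handler and test code elsewhere in this change); assuming default serde field names, a request payload looks roughly like:
    {"uid":"1111","resources":["dandan"],"owner":"dd","source":"","quorum":3}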

View File

@@ -11,7 +11,11 @@ rust-version.workspace = true
[dependencies]
ecstore.workspace = true
flatbuffers.workspace = true
lazy_static.workspace = true
lock.workspace = true
protos.workspace = true
serde_json.workspace = true
tonic = { version = "0.12.1", features = ["gzip"] }
tokio = { workspace = true }
tokio = { workspace = true }
tower.workspace = true
url.workspace = true

View File

@@ -0,0 +1,110 @@
#![cfg(test)]
use std::{collections::HashMap, error::Error, sync::Arc, time::Duration, vec};
use lazy_static::lazy_static;
use lock::{
drwmutex::Options,
lock_args::LockArgs,
namespace_lock::{new_nslock, NsLockMap},
new_lock_api,
};
use protos::proto_gen::node_service::{node_service_client::NodeServiceClient, GenerallyLockRequest};
use tokio::sync::RwLock;
use tonic::{
metadata::MetadataValue,
service::interceptor::InterceptedService,
transport::{Channel, Endpoint},
Request, Status,
};
lazy_static! {
pub static ref GLOBAL_Conn_Map: RwLock<HashMap<String, Channel>> = RwLock::new(HashMap::new());
}
async fn get_client() -> Result<
NodeServiceClient<
InterceptedService<Channel, Box<dyn Fn(Request<()>) -> Result<Request<()>, Status> + Send + Sync + 'static>>,
>,
Box<dyn Error>,
> {
let token: MetadataValue<_> = "rustfs rpc".parse()?;
let channel = match GLOBAL_Conn_Map.read().await.get("local") {
Some(channel) => channel.clone(),
None => {
println!("get channel start");
let connector = Endpoint::from_static("http://localhost:9000").connect_timeout(Duration::from_secs(60));
let channel = connector.connect().await?;
// let channel = Channel::from_static("http://localhost:9000").connect().await?;
channel
}
};
GLOBAL_Conn_Map.write().await.insert("local".to_owned(), channel.clone());
// let timeout_channel = Timeout::new(channel, Duration::from_secs(60));
Ok(NodeServiceClient::with_interceptor(
channel,
Box::new(move |mut req: Request<()>| {
req.metadata_mut().insert("authorization", token.clone());
Ok(req)
}),
))
}
#[tokio::test]
async fn test_lock_unlock_rpc() -> Result<(), Box<dyn Error>> {
let args = LockArgs {
uid: "1111".to_string(),
resources: vec!["dandan".to_string()],
owner: "dd".to_string(),
source: "".to_string(),
quorum: 3,
};
let args = serde_json::to_string(&args)?;
let mut client = get_client().await?;
println!("got client");
let request = Request::new(GenerallyLockRequest { args: args.clone() });
println!("start request");
let response = client.lock(request).await?.into_inner();
println!("request ended");
if let Some(error_info) = response.error_info {
assert!(false, "can not get lock: {}", error_info);
}
let request = Request::new(GenerallyLockRequest { args });
let response = client.un_lock(request).await?.into_inner();
if let Some(error_info) = response.error_info {
assert!(false, "can not get un_lock: {}", error_info);
}
Ok(())
}
#[tokio::test]
async fn test_lock_unlock_ns_lock() -> Result<(), Box<dyn Error>> {
let url = url::Url::parse("http://127.0.0.1:9000/data")?;
let locker = new_lock_api(false, Some(url));
let ns_mutex = Arc::new(RwLock::new(NsLockMap::new(true)));
let mut ns = new_nslock(
Arc::clone(&ns_mutex),
"local".to_string(),
"dandan".to_string(),
vec!["foo".to_string()],
vec![locker],
)
.await;
assert_eq!(
ns.get_lock(&Options {
timeout: Duration::from_secs(5),
retry_interval: Duration::from_secs(1),
})
.await
.unwrap(),
true
);
ns.un_lock().await.unwrap();
Ok(())
}

View File

@@ -1 +1,2 @@
mod lock;
mod node_interact_test;

View File

@@ -9,22 +9,24 @@ rust-version.workspace = true
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
async-trait.workspace = true
backon.workspace = true
bytes.workspace = true
thiserror.workspace = true
futures.workspace = true
async-trait.workspace = true
common.workspace = true
tracing.workspace = true
serde.workspace = true
time.workspace = true
serde_json.workspace = true
tracing-error.workspace = true
http.workspace = true
url = "2.5.2"
url.workspace = true
uuid = { version = "1.8.0", features = ["v4", "fast-rng", "serde"] }
reed-solomon-erasure = "6.0.0"
transform-stream = "0.3.0"
lazy_static = "1.5.0"
lazy_static.workspace = true
lock.workspace = true
regex = "1.10.5"
netif = "0.1.6"
path-absolutize = "3.1.1"

View File

@@ -10,10 +10,10 @@ use crate::disk::error::{
};
use crate::disk::os::check_path_length;
use crate::disk::{LocalFileReader, LocalFileWriter, STORAGE_FORMAT_FILE};
use crate::error::{Error, Result};
use crate::utils::fs::{lstat, O_APPEND, O_CREATE, O_RDONLY, O_WRONLY};
use crate::utils::path::{has_suffix, SLASH_SEPARATOR};
use crate::{
error::{Error, Result},
file_meta::FileMeta,
store_api::{FileInfo, RawFileInfo},
utils,
@@ -1322,6 +1322,7 @@ impl DiskAPI for LocalDisk {
if let Err(e) = utils::fs::access(&volume_dir).await {
if os_is_not_exist(&e) {
os::make_dir_all(&volume_dir, self.root.as_path()).await?;
return Ok(());
}
if os_is_permission(&e) {
return Err(Error::new(DiskError::DiskAccessDenied));

View File

@@ -3,7 +3,7 @@ pub mod error;
pub mod format;
mod local;
pub mod os;
mod remote;
pub mod remote;
pub const RUSTFS_META_BUCKET: &str = ".rustfs.sys";
pub const RUSTFS_META_MULTIPART_BUCKET: &str = ".rustfs.sys/multipart";
@@ -21,15 +21,21 @@ use crate::{
};
use endpoint::Endpoint;
use protos::proto_gen::node_service::{node_service_client::NodeServiceClient, ReadAtRequest, WriteRequest};
use futures::StreamExt;
use protos::proto_gen::node_service::{
node_service_client::NodeServiceClient, ReadAtRequest, ReadAtResponse, WriteRequest, WriteResponse,
};
use serde::{Deserialize, Serialize};
use std::{fmt::Debug, io::SeekFrom, path::PathBuf, sync::Arc};
use time::OffsetDateTime;
use tokio::{
fs::File,
io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt},
sync::mpsc::{self, Sender},
};
use tonic::{transport::Channel, Request};
use tokio_stream::wrappers::ReceiverStream;
use tonic::{service::interceptor::InterceptedService, transport::Channel, Request, Status, Streaming};
use tracing::info;
use uuid::Uuid;
pub type DiskStore = Arc<Box<dyn DiskAPI>>;
@@ -404,32 +410,77 @@ pub struct RemoteFileWriter {
pub volume: String,
pub path: String,
pub is_append: bool,
client: NodeServiceClient<Channel>,
tx: Sender<WriteRequest>,
resp_stream: Streaming<WriteResponse>,
}
impl RemoteFileWriter {
pub fn new(root: PathBuf, volume: String, path: String, is_append: bool, client: NodeServiceClient<Channel>) -> Self {
Self {
pub async fn new(
root: PathBuf,
volume: String,
path: String,
is_append: bool,
mut client: NodeServiceClient<
InterceptedService<Channel, Box<dyn Fn(Request<()>) -> Result<Request<()>, Status> + Send + Sync + 'static>>,
>,
) -> Result<Self> {
let (tx, rx) = mpsc::channel(128);
let in_stream = ReceiverStream::new(rx);
let response = client.write_stream(in_stream).await.unwrap();
let resp_stream = response.into_inner();
Ok(Self {
root,
volume,
path,
is_append,
client,
}
tx,
resp_stream,
})
}
}
#[async_trait::async_trait]
impl Write for RemoteFileWriter {
async fn write(&mut self, buf: &[u8]) -> Result<()> {
let request = Request::new(WriteRequest {
let request = WriteRequest {
disk: self.root.to_string_lossy().to_string(),
volume: self.volume.to_string(),
path: self.path.to_string(),
is_append: self.is_append,
data: buf.to_vec(),
});
let _response = self.client.write(request).await?.into_inner();
};
self.tx.send(request).await?;
if let Some(resp) = self.resp_stream.next().await {
// match resp {
// Ok(resp) => {
// if resp.success {
// info!("write stream success");
// } else {
// info!("write stream failed: {}", resp.error_info.unwrap_or("".to_string()));
// }
// }
// Err(_err) => {
// }
// }
let resp = resp?;
if resp.success {
info!("write stream success");
} else {
let error_info = resp.error_info.unwrap_or("".to_string());
info!("write stream failed: {}", error_info);
return Err(Error::from_string(error_info));
}
} else {
let error_info = "can not get response";
info!("write stream failed: {}", error_info);
return Err(Error::from_string(error_info));
}
Ok(())
}
}
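A usage sketch for the streaming writer (illustrative only: `addr` and the paths are placeholders, and error conversion between the two Result types is elided):
    let client = node_service_time_out_client(&addr).await.expect("connect to peer");
    let mut writer = RemoteFileWriter::new(
        PathBuf::from("/data"),          // disk root (placeholder)
        "volume".to_string(),
        "path/to/object".to_string(),
        false,                           // is_append
        client,
    )
    .await?;
    writer.write(b"chunk-1").await?;     // each write sends one WriteRequest and awaits one WriteResponse
    writer.write(b"chunk-2").await?;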
@@ -481,32 +532,62 @@ pub struct RemoteFileReader {
pub root: PathBuf,
pub volume: String,
pub path: String,
client: NodeServiceClient<Channel>,
tx: Sender<ReadAtRequest>,
resp_stream: Streaming<ReadAtResponse>,
}
impl RemoteFileReader {
pub fn new(root: PathBuf, volume: String, path: String, client: NodeServiceClient<Channel>) -> Self {
Self {
pub async fn new(
root: PathBuf,
volume: String,
path: String,
mut client: NodeServiceClient<
InterceptedService<Channel, Box<dyn Fn(Request<()>) -> Result<Request<()>, Status> + Send + Sync + 'static>>,
>,
) -> Result<Self> {
let (tx, rx) = mpsc::channel(128);
let in_stream = ReceiverStream::new(rx);
let response = client.read_at(in_stream).await.unwrap();
let resp_stream = response.into_inner();
Ok(Self {
root,
volume,
path,
client,
}
tx,
resp_stream,
})
}
}
#[async_trait::async_trait]
impl ReadAt for RemoteFileReader {
async fn read_at(&mut self, offset: usize, length: usize) -> Result<(Vec<u8>, usize)> {
let request = Request::new(ReadAtRequest {
let request = ReadAtRequest {
disk: self.root.to_string_lossy().to_string(),
volume: self.volume.to_string(),
path: self.path.to_string(),
offset: offset.try_into().unwrap(),
length: length.try_into().unwrap(),
});
let response = self.client.read_at(request).await?.into_inner();
};
self.tx.send(request).await?;
Ok((response.data, response.read_size.try_into().unwrap()))
if let Some(resp) = self.resp_stream.next().await {
let resp = resp?;
if resp.success {
info!("read at stream success");
Ok((resp.data, resp.read_size.try_into().unwrap()))
} else {
let error_info = resp.error_info.unwrap_or("".to_string());
info!("read at stream failed: {}", error_info);
Err(Error::from_string(error_info))
}
} else {
let error_info = "can not get response";
info!("read at stream failed: {}", error_info);
Err(Error::from_string(error_info))
}
}
}
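And the matching reader side (again a sketch with placeholder values):
    let client = node_service_time_out_client(&addr).await.expect("connect to peer");
    let mut reader = RemoteFileReader::new(
        PathBuf::from("/data"),
        "volume".to_string(),
        "path/to/object".to_string(),
        client,
    )
    .await?;
    let (data, n) = reader.read_at(0, 4096).await?;   // one ReadAtRequest in, one ReadAtResponse back
    info!("read {} of {} requested bytes", n, 4096);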

View File

@@ -1,96 +1,53 @@
use std::{path::PathBuf, sync::Arc, time::Duration};
use std::path::PathBuf;
use futures::lock::Mutex;
use protos::{
node_service_time_out_client,
proto_gen::node_service::{
node_service_client::NodeServiceClient, DeletePathsRequest, DeleteRequest, DeleteVersionRequest, DeleteVersionsRequest,
DeleteVolumeRequest, ListDirRequest, ListVolumesRequest, MakeVolumeRequest, MakeVolumesRequest, ReadAllRequest,
ReadMultipleRequest, ReadVersionRequest, ReadXlRequest, RenameDataRequest, RenameFileRequst, RenamePartRequst,
StatVolumeRequest, UpdateMetadataRequest, WalkDirRequest, WriteAllRequest, WriteMetadataRequest,
DeletePathsRequest, DeleteRequest, DeleteVersionRequest, DeleteVersionsRequest, DeleteVolumeRequest, ListDirRequest,
ListVolumesRequest, MakeVolumeRequest, MakeVolumesRequest, ReadAllRequest, ReadMultipleRequest, ReadVersionRequest,
ReadXlRequest, RenameDataRequest, RenameFileRequst, StatVolumeRequest, UpdateMetadataRequest, WalkDirRequest,
WriteAllRequest, WriteMetadataRequest,
},
DEFAULT_GRPC_SERVER_MESSAGE_LEN,
};
use tokio::{fs, sync::RwLock};
use tonic::{
transport::{Channel, Endpoint as tonic_Endpoint},
Request,
};
use tower::timeout::Timeout;
use tonic::Request;
use tracing::info;
use uuid::Uuid;
use crate::{
disk::error::DiskError,
error::{Error, Result},
store_api::{FileInfo, RawFileInfo},
};
use super::{
endpoint::Endpoint, DeleteOptions, DiskAPI, DiskLocation, DiskOption, FileInfoVersions, FileReader, FileWriter,
MetaCacheEntry, ReadMultipleReq, ReadMultipleResp, ReadOptions, RemoteFileReader, RemoteFileWriter, RenameDataResp,
UpdateMetadataOpts, VolumeInfo, WalkDirOptions,
};
use crate::{
disk::error::DiskError,
error::{Error, Result},
store_api::{FileInfo, RawFileInfo},
};
use protos::proto_gen::node_service::RenamePartRequst;
#[derive(Debug)]
pub struct RemoteDisk {
id: Mutex<Option<Uuid>>,
channel: Arc<RwLock<Option<Channel>>>,
url: url::Url,
pub id: Mutex<Option<Uuid>>,
pub addr: String,
pub url: url::Url,
pub root: PathBuf,
endpoint: Endpoint,
}
impl RemoteDisk {
pub async fn new(ep: &Endpoint, _opt: &DiskOption) -> Result<Self> {
let root = fs::canonicalize(ep.url.path()).await?;
// let root = fs::canonicalize(ep.url.path()).await?;
let root = PathBuf::from(ep.url.path());
let addr = format!("{}://{}:{}", ep.url.scheme(), ep.url.host_str().unwrap(), ep.url.port().unwrap());
Ok(Self {
channel: Arc::new(RwLock::new(None)),
id: Mutex::new(None),
addr,
url: ep.url.clone(),
root,
id: Mutex::new(None),
endpoint: ep.clone(),
})
}
#[allow(dead_code)]
async fn get_client(&self) -> Result<NodeServiceClient<Timeout<Channel>>> {
let channel_clone = self.channel.clone();
let channel = {
let read_lock = channel_clone.read().await;
if let Some(ref channel) = *read_lock {
channel.clone()
} else {
let addr = format!("{}://{}:{}", self.url.scheme(), self.url.host_str().unwrap(), self.url.port().unwrap());
info!("disk url: {}", addr);
let connector = tonic_Endpoint::from_shared(addr.clone())?;
let new_channel = connector.connect().await.map_err(|_err| DiskError::DiskNotFound)?;
info!("get channel success");
*self.channel.write().await = Some(new_channel.clone());
new_channel
}
};
Ok(node_service_time_out_client(
channel,
Duration::new(30, 0), // TODO: use config setting
DEFAULT_GRPC_SERVER_MESSAGE_LEN,
// grpc_enable_gzip,
false, // TODO: use config setting
))
}
async fn get_client_v2(&self) -> Result<NodeServiceClient<tonic::transport::Channel>> {
// Ok(NodeServiceClient::connect("http://220.181.1.138:9000").await?)
let addr = format!("{}://{}:{}", self.url.scheme(), self.url.host_str().unwrap(), self.url.port().unwrap());
Ok(NodeServiceClient::connect(addr).await?)
}
}
// TODO: all api need to handle errors
@@ -109,7 +66,7 @@ impl DiskAPI for RemoteDisk {
}
async fn is_online(&self) -> bool {
        // TODO: connection status
if let Ok(_) = self.get_client_v2().await {
if let Ok(_) = node_service_time_out_client(&self.addr).await {
return true;
}
false
@@ -144,7 +101,9 @@ impl DiskAPI for RemoteDisk {
async fn read_all(&self, volume: &str, path: &str) -> Result<Vec<u8>> {
info!("read_all");
let mut client = self.get_client_v2().await?;
let mut client = node_service_time_out_client(&self.addr)
.await
.map_err(|err| Error::from_string(format!("can not get client, err: {}", err.to_string())))?;
let request = Request::new(ReadAllRequest {
disk: self.root.to_string_lossy().to_string(),
volume: volume.to_string(),
@@ -164,7 +123,9 @@ impl DiskAPI for RemoteDisk {
async fn write_all(&self, volume: &str, path: &str, data: Vec<u8>) -> Result<()> {
info!("write_all");
let mut client = self.get_client_v2().await?;
let mut client = node_service_time_out_client(&self.addr)
.await
.map_err(|err| Error::from_string(format!("can not get client, err: {}", err.to_string())))?;
let request = Request::new(WriteAllRequest {
disk: self.root.to_string_lossy().to_string(),
volume: volume.to_string(),
@@ -184,7 +145,9 @@ impl DiskAPI for RemoteDisk {
async fn delete(&self, volume: &str, path: &str, opt: DeleteOptions) -> Result<()> {
info!("delete");
let options = serde_json::to_string(&opt)?;
let mut client = self.get_client_v2().await?;
let mut client = node_service_time_out_client(&self.addr)
.await
.map_err(|err| Error::from_string(format!("can not get client, err: {}", err.to_string())))?;
let request = Request::new(DeleteRequest {
disk: self.root.to_string_lossy().to_string(),
volume: volume.to_string(),
@@ -202,7 +165,9 @@ impl DiskAPI for RemoteDisk {
}
async fn rename_part(&self, src_volume: &str, src_path: &str, dst_volume: &str, dst_path: &str, meta: Vec<u8>) -> Result<()> {
info!("rename_part");
let mut client = self.get_client_v2().await?;
let mut client = node_service_time_out_client(&self.addr)
.await
.map_err(|err| Error::from_string(format!("can not get client, err: {}", err.to_string())))?;
let request = Request::new(RenamePartRequst {
disk: self.root.to_string_lossy().to_string(),
src_volume: src_volume.to_string(),
@@ -222,7 +187,9 @@ impl DiskAPI for RemoteDisk {
}
async fn rename_file(&self, src_volume: &str, src_path: &str, dst_volume: &str, dst_path: &str) -> Result<()> {
info!("rename_file");
let mut client = self.get_client_v2().await?;
let mut client = node_service_time_out_client(&self.addr)
.await
.map_err(|err| Error::from_string(format!("can not get client, err: {}", err.to_string())))?;
let request = Request::new(RenameFileRequst {
disk: self.root.to_string_lossy().to_string(),
src_volume: src_volume.to_string(),
@@ -242,39 +209,56 @@ impl DiskAPI for RemoteDisk {
async fn create_file(&self, _origvolume: &str, volume: &str, path: &str, _file_size: usize) -> Result<FileWriter> {
info!("create_file");
Ok(FileWriter::Remote(RemoteFileWriter::new(
self.root.clone(),
volume.to_string(),
path.to_string(),
false,
self.get_client_v2().await?,
)))
Ok(FileWriter::Remote(
RemoteFileWriter::new(
self.root.clone(),
volume.to_string(),
path.to_string(),
false,
node_service_time_out_client(&self.addr)
.await
.map_err(|err| Error::from_string(format!("can not get client, err: {}", err.to_string())))?,
)
.await?,
))
}
async fn append_file(&self, volume: &str, path: &str) -> Result<FileWriter> {
info!("append_file");
Ok(FileWriter::Remote(RemoteFileWriter::new(
self.root.clone(),
volume.to_string(),
path.to_string(),
true,
self.get_client_v2().await?,
)))
Ok(FileWriter::Remote(
RemoteFileWriter::new(
self.root.clone(),
volume.to_string(),
path.to_string(),
true,
node_service_time_out_client(&self.addr)
.await
.map_err(|err| Error::from_string(format!("can not get client, err: {}", err.to_string())))?,
)
.await?,
))
}
async fn read_file(&self, volume: &str, path: &str) -> Result<FileReader> {
info!("read_file");
Ok(FileReader::Remote(RemoteFileReader::new(
self.root.clone(),
volume.to_string(),
path.to_string(),
self.get_client_v2().await?,
)))
Ok(FileReader::Remote(
RemoteFileReader::new(
self.root.clone(),
volume.to_string(),
path.to_string(),
node_service_time_out_client(&self.addr)
.await
.map_err(|err| Error::from_string(format!("can not get client, err: {}", err.to_string())))?,
)
.await?,
))
}
async fn list_dir(&self, _origvolume: &str, volume: &str, _dir_path: &str, _count: i32) -> Result<Vec<String>> {
info!("list_dir");
let mut client = self.get_client_v2().await?;
let mut client = node_service_time_out_client(&self.addr)
.await
.map_err(|err| Error::from_string(format!("can not get client, err: {}", err.to_string())))?;
let request = Request::new(ListDirRequest {
disk: self.root.to_string_lossy().to_string(),
volume: volume.to_string(),
@@ -292,7 +276,9 @@ impl DiskAPI for RemoteDisk {
async fn walk_dir(&self, opts: WalkDirOptions) -> Result<Vec<MetaCacheEntry>> {
info!("walk_dir");
let walk_dir_options = serde_json::to_string(&opts)?;
let mut client = self.get_client_v2().await?;
let mut client = node_service_time_out_client(&self.addr)
.await
.map_err(|err| Error::from_string(format!("can not get client, err: {}", err.to_string())))?;
let request = Request::new(WalkDirRequest {
disk: self.root.to_string_lossy().to_string(),
walk_dir_options,
@@ -323,7 +309,9 @@ impl DiskAPI for RemoteDisk {
) -> Result<RenameDataResp> {
info!("rename_data");
let file_info = serde_json::to_string(&fi)?;
let mut client = self.get_client_v2().await?;
let mut client = node_service_time_out_client(&self.addr)
.await
.map_err(|err| Error::from_string(format!("can not get client, err: {}", err.to_string())))?;
let request = Request::new(RenameDataRequest {
disk: self.root.to_string_lossy().to_string(),
src_volume: src_volume.to_string(),
@@ -346,7 +334,9 @@ impl DiskAPI for RemoteDisk {
async fn make_volumes(&self, volumes: Vec<&str>) -> Result<()> {
info!("make_volumes");
let mut client = self.get_client_v2().await?;
let mut client = node_service_time_out_client(&self.addr)
.await
.map_err(|err| Error::from_string(format!("can not get client, err: {}", err.to_string())))?;
let request = Request::new(MakeVolumesRequest {
disk: self.root.to_string_lossy().to_string(),
volumes: volumes.iter().map(|s| (*s).to_string()).collect(),
@@ -363,7 +353,9 @@ impl DiskAPI for RemoteDisk {
async fn make_volume(&self, volume: &str) -> Result<()> {
info!("make_volume");
let mut client = self.get_client_v2().await?;
let mut client = node_service_time_out_client(&self.addr)
.await
.map_err(|err| Error::from_string(format!("can not get client, err: {}", err.to_string())))?;
let request = Request::new(MakeVolumeRequest {
disk: self.root.to_string_lossy().to_string(),
volume: volume.to_string(),
@@ -380,7 +372,9 @@ impl DiskAPI for RemoteDisk {
async fn list_volumes(&self) -> Result<Vec<VolumeInfo>> {
info!("list_volumes");
let mut client = self.get_client_v2().await?;
let mut client = node_service_time_out_client(&self.addr)
.await
.map_err(|err| Error::from_string(format!("can not get client, err: {}", err.to_string())))?;
let request = Request::new(ListVolumesRequest {
disk: self.root.to_string_lossy().to_string(),
});
@@ -402,7 +396,9 @@ impl DiskAPI for RemoteDisk {
async fn stat_volume(&self, volume: &str) -> Result<VolumeInfo> {
info!("stat_volume");
let mut client = self.get_client_v2().await?;
let mut client = node_service_time_out_client(&self.addr)
.await
.map_err(|err| Error::from_string(format!("can not get client, err: {}", err.to_string())))?;
let request = Request::new(StatVolumeRequest {
disk: self.root.to_string_lossy().to_string(),
volume: volume.to_string(),
@@ -422,7 +418,9 @@ impl DiskAPI for RemoteDisk {
async fn delete_paths(&self, volume: &str, paths: &[&str]) -> Result<()> {
info!("delete_paths");
let paths = paths.iter().map(|s| s.to_string()).collect::<Vec<String>>();
let mut client = self.get_client_v2().await?;
let mut client = node_service_time_out_client(&self.addr)
.await
.map_err(|err| Error::from_string(format!("can not get client, err: {}", err.to_string())))?;
let request = Request::new(DeletePathsRequest {
disk: self.root.to_string_lossy().to_string(),
volume: volume.to_string(),
@@ -442,7 +440,9 @@ impl DiskAPI for RemoteDisk {
let file_info = serde_json::to_string(&fi)?;
let opts = serde_json::to_string(&opts)?;
let mut client = self.get_client_v2().await?;
let mut client = node_service_time_out_client(&self.addr)
.await
.map_err(|err| Error::from_string(format!("can not get client, err: {}", err.to_string())))?;
let request = Request::new(UpdateMetadataRequest {
disk: self.root.to_string_lossy().to_string(),
volume: volume.to_string(),
@@ -463,7 +463,9 @@ impl DiskAPI for RemoteDisk {
async fn write_metadata(&self, _org_volume: &str, volume: &str, path: &str, fi: FileInfo) -> Result<()> {
info!("write_metadata");
let file_info = serde_json::to_string(&fi)?;
let mut client = self.get_client_v2().await?;
let mut client = node_service_time_out_client(&self.addr)
.await
.map_err(|err| Error::from_string(format!("can not get client, err: {}", err.to_string())))?;
let request = Request::new(WriteMetadataRequest {
disk: self.root.to_string_lossy().to_string(),
volume: volume.to_string(),
@@ -490,7 +492,9 @@ impl DiskAPI for RemoteDisk {
) -> Result<FileInfo> {
info!("read_version");
let opts = serde_json::to_string(opts)?;
let mut client = self.get_client_v2().await?;
let mut client = node_service_time_out_client(&self.addr)
.await
.map_err(|err| Error::from_string(format!("can not get client, err: {}", err.to_string())))?;
let request = Request::new(ReadVersionRequest {
disk: self.root.to_string_lossy().to_string(),
volume: volume.to_string(),
@@ -512,7 +516,9 @@ impl DiskAPI for RemoteDisk {
async fn read_xl(&self, volume: &str, path: &str, read_data: bool) -> Result<RawFileInfo> {
info!("read_xl");
let mut client = self.get_client_v2().await?;
let mut client = node_service_time_out_client(&self.addr)
.await
.map_err(|err| Error::from_string(format!("can not get client, err: {}", err.to_string())))?;
let request = Request::new(ReadXlRequest {
disk: self.root.to_string_lossy().to_string(),
volume: volume.to_string(),
@@ -542,7 +548,9 @@ impl DiskAPI for RemoteDisk {
let file_info = serde_json::to_string(&fi)?;
let opts = serde_json::to_string(&opts)?;
let mut client = self.get_client_v2().await?;
let mut client = node_service_time_out_client(&self.addr)
.await
.map_err(|err| Error::from_string(format!("can not get client, err: {}", err.to_string())))?;
let request = Request::new(DeleteVersionRequest {
disk: self.root.to_string_lossy().to_string(),
volume: volume.to_string(),
@@ -574,7 +582,9 @@ impl DiskAPI for RemoteDisk {
for file_info_versions in versions.iter() {
versions_str.push(serde_json::to_string(file_info_versions)?);
}
let mut client = self.get_client_v2().await?;
let mut client = node_service_time_out_client(&self.addr)
.await
.map_err(|err| Error::from_string(format!("can not get client, err: {}", err.to_string())))?;
let request = Request::new(DeleteVersionsRequest {
disk: self.root.to_string_lossy().to_string(),
volume: volume.to_string(),
@@ -607,7 +617,9 @@ impl DiskAPI for RemoteDisk {
async fn read_multiple(&self, req: ReadMultipleReq) -> Result<Vec<ReadMultipleResp>> {
info!("read_multiple");
let read_multiple_req = serde_json::to_string(&req)?;
let mut client = self.get_client_v2().await?;
let mut client = node_service_time_out_client(&self.addr)
.await
.map_err(|err| Error::from_string(format!("can not get client, err: {}", err.to_string())))?;
let request = Request::new(ReadMultipleRequest {
disk: self.root.to_string_lossy().to_string(),
read_multiple_req,
@@ -630,7 +642,9 @@ impl DiskAPI for RemoteDisk {
async fn delete_volume(&self, volume: &str) -> Result<()> {
info!("delete_volume");
let mut client = self.get_client_v2().await?;
let mut client = node_service_time_out_client(&self.addr)
.await
.map_err(|err| Error::from_string(format!("can not get client, err: {}", err.to_string())))?;
let request = Request::new(DeleteVolumeRequest {
disk: self.root.to_string_lossy().to_string(),
volume: volume.to_string(),

View File

@@ -1,15 +1,11 @@
use async_trait::async_trait;
use futures::future::join_all;
use protos::proto_gen::node_service::node_service_client::NodeServiceClient;
use protos::node_service_time_out_client;
use protos::proto_gen::node_service::{DeleteBucketRequest, GetBucketInfoRequest, ListBucketRequest, MakeBucketRequest};
use protos::{node_service_time_out_client, DEFAULT_GRPC_SERVER_MESSAGE_LEN};
use regex::Regex;
use std::{collections::HashMap, fmt::Debug, sync::Arc, time::Duration};
use tokio::sync::RwLock;
use tonic::transport::{Channel, Endpoint};
use std::{collections::HashMap, fmt::Debug, sync::Arc};
use tonic::Request;
use tower::timeout::Timeout;
use tracing::{info, warn};
use tracing::warn;
use crate::store::all_local_disk;
use crate::{
@@ -412,55 +408,13 @@ impl PeerS3Client for LocalPeerS3Client {
pub struct RemotePeerS3Client {
pub node: Option<Node>,
pub pools: Option<Vec<usize>>,
connector: Endpoint,
channel: Arc<RwLock<Option<Channel>>>,
addr: String,
}
impl RemotePeerS3Client {
fn new(node: Option<Node>, pools: Option<Vec<usize>>) -> Self {
let connector =
Endpoint::from_shared(format!("{}", node.as_ref().map(|v| { v.url.to_string() }).unwrap_or_default())).unwrap();
Self {
node,
pools,
connector,
channel: Arc::new(RwLock::new(None)),
}
}
#[allow(dead_code)]
async fn get_client(&self) -> Result<NodeServiceClient<Timeout<Channel>>> {
let channel_clone = self.channel.clone();
let channel = {
let read_lock = channel_clone.read().await;
if let Some(ref channel) = *read_lock {
channel.clone()
} else {
let new_channel = self.connector.connect().await?;
info!("get channel success");
*self.channel.write().await = Some(new_channel.clone());
new_channel
}
};
Ok(node_service_time_out_client(
channel,
Duration::new(30, 0), // TODO: use config setting
DEFAULT_GRPC_SERVER_MESSAGE_LEN,
// grpc_enable_gzip,
false, // TODO: use config setting
))
}
async fn get_client_v2(&self) -> Result<NodeServiceClient<tonic::transport::Channel>> {
// Ok(NodeServiceClient::connect("http://220.181.1.138:9000").await?)
// let addr = format!("{}://{}:{}", self.url.scheme(), self.url.host_str().unwrap(), self.url.port().unwrap());
let addr = format!("{}", self.node.as_ref().map(|v| { v.url.to_string() }).unwrap_or_default());
Ok(NodeServiceClient::connect(addr).await?)
let addr = format!("{}", node.as_ref().map(|v| { v.url.to_string() }).unwrap_or_default());
Self { node, pools, addr }
}
}
@@ -471,7 +425,9 @@ impl PeerS3Client for RemotePeerS3Client {
}
async fn list_bucket(&self, opts: &BucketOptions) -> Result<Vec<BucketInfo>> {
let options = serde_json::to_string(opts)?;
let mut client = self.get_client_v2().await?;
let mut client = node_service_time_out_client(&self.addr)
.await
.map_err(|err| Error::from_string(format!("can not get client, err: {}", err.to_string())))?;
let request = Request::new(ListBucketRequest { options });
let response = client.list_bucket(request).await?.into_inner();
let bucket_infos = response
@@ -484,7 +440,9 @@ impl PeerS3Client for RemotePeerS3Client {
}
async fn make_bucket(&self, bucket: &str, opts: &MakeBucketOptions) -> Result<()> {
let options = serde_json::to_string(opts)?;
let mut client = self.get_client_v2().await?;
let mut client = node_service_time_out_client(&self.addr)
.await
.map_err(|err| Error::from_string(format!("can not get client, err: {}", err.to_string())))?;
let request = Request::new(MakeBucketRequest {
name: bucket.to_string(),
options,
@@ -500,7 +458,9 @@ impl PeerS3Client for RemotePeerS3Client {
}
async fn get_bucket_info(&self, bucket: &str, opts: &BucketOptions) -> Result<BucketInfo> {
let options = serde_json::to_string(opts)?;
let mut client = self.get_client_v2().await?;
let mut client = node_service_time_out_client(&self.addr)
.await
.map_err(|err| Error::from_string(format!("can not get client, err: {}", err.to_string())))?;
let request = Request::new(GetBucketInfoRequest {
bucket: bucket.to_string(),
options,
@@ -512,7 +472,10 @@ impl PeerS3Client for RemotePeerS3Client {
}
async fn delete_bucket(&self, bucket: &str, _opts: &DeleteBucketOptions) -> Result<()> {
let mut client = self.get_client_v2().await?;
let mut client = node_service_time_out_client(&self.addr)
.await
.map_err(|err| Error::from_string(format!("can not get client, err: {}", err.to_string())))?;
let request = Request::new(DeleteBucketRequest {
bucket: bucket.to_string(),
});

View File

@@ -1,6 +1,13 @@
#![allow(clippy::map_entry)]
use std::{collections::HashMap, sync::Arc};
use common::globals::GLOBAL_Local_Node_Name;
use futures::future::join_all;
use http::HeaderMap;
use lock::{namespace_lock::NsLockMap, new_lock_api, LockApi};
use tokio::sync::RwLock;
use uuid::Uuid;
use crate::{
disk::{
format::{DistributionAlgoVersion, FormatV3},
@@ -17,20 +24,18 @@ use crate::{
},
utils::hash,
};
use futures::future::join_all;
use http::HeaderMap;
use tokio::sync::RwLock;
use tokio::time::Duration;
use tokio_util::sync::CancellationToken;
use tracing::info;
use tracing::warn;
use uuid::Uuid;
#[derive(Debug, Clone)]
pub struct Sets {
pub id: Uuid,
// pub sets: Vec<Objects>,
// pub disk_set: Vec<Vec<Option<DiskStore>>>, // [set_count_idx][set_drive_count_idx] = disk_idx
pub lockers: Vec<Vec<LockApi>>,
pub disk_set: Vec<Arc<SetDisks>>, // [set_count_idx][set_drive_count_idx] = disk_idx
pub pool_idx: usize,
pub endpoints: PoolEndpoints,
@@ -53,6 +58,24 @@ impl Sets {
let set_count = fm.erasure.sets.len();
let set_drive_count = fm.erasure.sets[0].len();
let mut unique: Vec<Vec<String>> = vec![vec![]; set_count];
let mut lockers: Vec<Vec<LockApi>> = vec![vec![]; set_count];
endpoints.endpoints.as_ref().iter().enumerate().for_each(|(idx, endpoint)| {
let set_idx = idx / set_drive_count;
if endpoint.is_local && !unique[set_idx].contains(&"local".to_string()) {
unique[set_idx].push("local".to_string());
lockers[set_idx].push(new_lock_api(true, None));
}
if !endpoint.is_local {
let host_port = format!("{}:{}", endpoint.url.host_str().unwrap(), endpoint.url.port().unwrap().to_string());
if !unique[set_idx].contains(&host_port) {
unique[set_idx].push(host_port);
lockers[set_idx].push(new_lock_api(false, Some(endpoint.url.clone())));
}
}
});
let mut disk_set = Vec::with_capacity(set_count);
for i in 0..set_count {
@@ -99,6 +122,9 @@ impl Sets {
// warn!("sets new set_drive {:?}", &set_drive);
let set_disks = SetDisks {
lockers: lockers[i].clone(),
locker_owner: GLOBAL_Local_Node_Name.read().await.to_string(),
ns_mutex: Arc::new(RwLock::new(NsLockMap::new(*GLOBAL_IsDistErasure.read().await))),
disks: RwLock::new(set_drive),
set_drive_count,
default_parity_count: partiy_count,
@@ -115,6 +141,7 @@ impl Sets {
id: fm.id,
// sets: todo!(),
disk_set,
lockers,
pool_idx,
endpoints: endpoints.clone(),
format: fm.clone(),

View File

@@ -1,4 +1,5 @@
#![allow(clippy::map_entry)]
use crate::disk::endpoint::EndpointType;
use crate::{
bucket_meta::BucketMetadata,
disk::{error::DiskError, new_disk, DiskOption, DiskStore, WalkDirOptions, BUCKET_META_PREFIX, RUSTFS_META_BUCKET},
@@ -15,6 +16,7 @@ use crate::{
store_init, utils,
};
use backon::{ExponentialBuilder, Retryable};
use common::globals::{GLOBAL_Local_Node_Name, GLOBAL_Rustfs_Host, GLOBAL_Rustfs_Port};
use futures::future::join_all;
use http::HeaderMap;
use s3s::{dto::StreamingBlob, Body};
@@ -173,6 +175,13 @@ impl ECStore {
let mut local_disks = Vec::new();
init_local_peer(
&endpoint_pools,
&GLOBAL_Rustfs_Host.read().await.to_string(),
&GLOBAL_Rustfs_Port.read().await.to_string(),
)
.await;
debug!("endpoint_pools: {:?}", endpoint_pools);
for (i, pool_eps) in endpoint_pools.as_ref().iter().enumerate() {
@@ -805,3 +814,26 @@ impl StorageAPI for ECStore {
unimplemented!()
}
}
async fn init_local_peer(endpoint_pools: &EndpointServerPools, host: &String, port: &String) {
let mut peer_set = Vec::new();
endpoint_pools.as_ref().iter().for_each(|endpoints| {
endpoints.endpoints.as_ref().iter().for_each(|endpoint| {
if endpoint.get_type() == EndpointType::Url && endpoint.is_local && endpoint.url.has_host() {
peer_set.push(endpoint.url.host_str().unwrap().to_string());
}
});
});
if peer_set.is_empty() {
if !host.is_empty() {
*GLOBAL_Local_Node_Name.write().await = format!("{}:{}", host, port);
return;
}
*GLOBAL_Local_Node_Name.write().await = format!("127.0.0.1:{}", port);
return;
}
*GLOBAL_Local_Node_Name.write().await = peer_set[0].clone();
}
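To make the fallback order above concrete (illustrative values only):
    // 1. A local URL-type endpoint such as http://node1:9000/data wins first:
    //    GLOBAL_Local_Node_Name = "node1"   (host only, no port)
    // 2. Otherwise, with host "10.0.0.5" and port "9000":
    //    GLOBAL_Local_Node_Name = "10.0.0.5:9000"
    // 3. With an empty host: GLOBAL_Local_Node_Name = "127.0.0.1:9000"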

View File

@@ -385,6 +385,7 @@ pub struct ObjectOptions {
pub delete_prefix: bool,
pub version_id: String,
pub no_lock: bool,
}
// impl Default for ObjectOptions {

View File

@@ -13,14 +13,17 @@ log.workspace = true
async-trait.workspace = true
bytes.workspace = true
clap.workspace = true
common.workspace = true
ecstore.workspace = true
flatbuffers.workspace = true
futures.workspace = true
futures-util.workspace = true
h2 = "0.4.6"
hyper.workspace = true
hyper-util.workspace = true
http.workspace = true
http-body.workspace = true
lock.workspace = true
mime.workspace = true
netif.workspace = true
pin-project-lite.workspace = true
@@ -38,6 +41,7 @@ tokio = { workspace = true, features = [
"net",
"signal",
] }
tokio-stream.workspace = true
tonic = { version = "0.12.1", features = ["gzip"] }
tonic-reflection.workspace = true
tower.workspace = true

View File

@@ -1,3 +1,5 @@
use std::{error::Error, io::ErrorKind, pin::Pin};
use ecstore::{
disk::{DeleteOptions, DiskStore, FileInfoVersions, ReadMultipleReq, ReadOptions, UpdateMetadataOpts, WalkDirOptions},
erasure::{ReadAt, Write},
@@ -5,54 +7,72 @@ use ecstore::{
store::{all_local_disk_path, find_local_disk},
store_api::{BucketOptions, DeleteBucketOptions, FileInfo, MakeBucketOptions},
};
use tonic::{Request, Response, Status};
use tracing::{debug, error, info};
use futures::{Stream, StreamExt};
use lock::{lock_args::LockArgs, Locker, GLOBAL_LOCAL_SERVER};
use protos::{
models::{PingBody, PingBodyBuilder},
proto_gen::node_service::{
node_service_server::{NodeService as Node, NodeServiceServer as NodeServer},
DeleteBucketRequest, DeleteBucketResponse, DeletePathsRequest, DeletePathsResponse, DeleteRequest, DeleteResponse,
DeleteVersionRequest, DeleteVersionResponse, DeleteVersionsRequest, DeleteVersionsResponse, DeleteVolumeRequest,
DeleteVolumeResponse, GetBucketInfoRequest, GetBucketInfoResponse, ListBucketRequest, ListBucketResponse, ListDirRequest,
ListDirResponse, ListVolumesRequest, ListVolumesResponse, MakeBucketRequest, MakeBucketResponse, MakeVolumeRequest,
MakeVolumeResponse, MakeVolumesRequest, MakeVolumesResponse, PingRequest, PingResponse, ReadAllRequest, ReadAllResponse,
ReadAtRequest, ReadAtResponse, ReadMultipleRequest, ReadMultipleResponse, ReadVersionRequest, ReadVersionResponse,
ReadXlRequest, ReadXlResponse, RenameDataRequest, RenameDataResponse, RenameFileRequst, RenameFileResponse,
RenamePartRequst, RenamePartResponse, StatVolumeRequest, StatVolumeResponse, UpdateMetadataRequest,
UpdateMetadataResponse, WalkDirRequest, WalkDirResponse, WriteAllRequest, WriteAllResponse, WriteMetadataRequest,
WriteMetadataResponse, WriteRequest, WriteResponse,
node_service_server::NodeService as Node, DeleteBucketRequest, DeleteBucketResponse, DeletePathsRequest,
DeletePathsResponse, DeleteRequest, DeleteResponse, DeleteVersionRequest, DeleteVersionResponse, DeleteVersionsRequest,
DeleteVersionsResponse, DeleteVolumeRequest, DeleteVolumeResponse, GenerallyLockRequest, GenerallyLockResponse,
GetBucketInfoRequest, GetBucketInfoResponse, ListBucketRequest, ListBucketResponse, ListDirRequest, ListDirResponse,
ListVolumesRequest, ListVolumesResponse, MakeBucketRequest, MakeBucketResponse, MakeVolumeRequest, MakeVolumeResponse,
MakeVolumesRequest, MakeVolumesResponse, PingRequest, PingResponse, ReadAllRequest, ReadAllResponse, ReadAtRequest,
ReadAtResponse, ReadMultipleRequest, ReadMultipleResponse, ReadVersionRequest, ReadVersionResponse, ReadXlRequest,
ReadXlResponse, RenameDataRequest, RenameDataResponse, RenameFileRequst, RenameFileResponse, RenamePartRequst,
RenamePartResponse, StatVolumeRequest, StatVolumeResponse, UpdateMetadataRequest, UpdateMetadataResponse, WalkDirRequest,
WalkDirResponse, WriteAllRequest, WriteAllResponse, WriteMetadataRequest, WriteMetadataResponse, WriteRequest,
WriteResponse,
},
};
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use tonic::{Request, Response, Status, Streaming};
use tracing::{debug, error, info};
#[derive(Debug)]
struct NodeService {
pub local_peer: LocalPeerS3Client,
type ResponseStream<T> = Pin<Box<dyn Stream<Item = Result<T, tonic::Status>> + Send>>;
fn match_for_io_error(err_status: &Status) -> Option<&std::io::Error> {
let mut err: &(dyn Error + 'static) = err_status;
loop {
if let Some(io_err) = err.downcast_ref::<std::io::Error>() {
return Some(io_err);
}
// h2::Error do not expose std::io::Error with `source()`
// https://github.com/hyperium/h2/pull/462
if let Some(h2_err) = err.downcast_ref::<h2::Error>() {
if let Some(io_err) = h2_err.get_io() {
return Some(io_err);
}
}
err = match err.source() {
Some(err) => err,
None => return None,
};
}
}
pub fn make_server() -> NodeServer<impl Node> {
// let local_disks = all_local_disk().await;
#[derive(Debug)]
pub struct NodeService {
local_peer: LocalPeerS3Client,
}
pub fn make_server() -> NodeService {
let local_peer = LocalPeerS3Client::new(None, None);
NodeServer::new(NodeService { local_peer })
NodeService { local_peer }
}
impl NodeService {
async fn find_disk(&self, disk_path: &String) -> Option<DiskStore> {
find_local_disk(disk_path).await
// let disk_path = match fs::canonicalize(disk_path).await {
// Ok(disk_path) => disk_path,
// Err(_) => return None,
// };
// self.local_peer.local_disks.iter().find(|&x| x.path() == disk_path).cloned()
}
async fn all_disk(&self) -> Vec<String> {
all_local_disk_path().await
// self.local_peer
// .local_disks
// .iter()
// .map(|disk| disk.path().to_string_lossy().to_string())
// .collect()
}
}
@@ -370,44 +390,183 @@ impl Node for NodeService {
}
}
async fn read_at(&self, request: Request<ReadAtRequest>) -> Result<Response<ReadAtResponse>, Status> {
let request = request.into_inner();
if let Some(disk) = self.find_disk(&request.disk).await {
match disk.read_file(&request.volume, &request.path).await {
Ok(mut file_reader) => {
match file_reader
.read_at(request.offset.try_into().unwrap(), request.length.try_into().unwrap())
type WriteStreamStream = ResponseStream<WriteResponse>;
async fn write_stream(&self, request: Request<Streaming<WriteRequest>>) -> Result<Response<Self::WriteStreamStream>, Status> {
info!("write_stream");
let mut in_stream = request.into_inner();
let (tx, rx) = mpsc::channel(128);
tokio::spawn(async move {
let mut file_ref = None;
while let Some(result) = in_stream.next().await {
match result {
// Ok(v) => tx
// .send(Ok(EchoResponse { message: v.message }))
// .await
// .expect("working rx"),
Ok(v) => {
match file_ref.as_ref() {
Some(_) => (),
None => {
if let Some(disk) = find_local_disk(&v.disk).await {
let file_writer = if v.is_append {
disk.append_file(&v.volume, &v.path).await
} else {
disk.create_file("", &v.volume, &v.path, 0).await
};
match file_writer {
Ok(file_writer) => file_ref = Some(file_writer),
Err(err) => {
tx.send(Ok(WriteResponse {
success: false,
error_info: Some(err.to_string()),
}))
.await
.expect("working rx");
break;
}
}
} else {
tx.send(Ok(WriteResponse {
success: false,
error_info: Some("can not find disk".to_string()),
}))
.await
.expect("working rx");
break;
}
}
};
match file_ref.as_mut().unwrap().write(&v.data).await {
Ok(_) => tx.send(Ok(WriteResponse {
success: true,
error_info: None,
})),
Err(err) => tx.send(Ok(WriteResponse {
success: false,
error_info: Some(err.to_string()),
})),
}
.await
{
Ok((data, read_size)) => Ok(tonic::Response::new(ReadAtResponse {
success: true,
data,
read_size: read_size.try_into().unwrap(),
error_info: None,
})),
Err(err) => Ok(tonic::Response::new(ReadAtResponse {
success: false,
data: Vec::new(),
read_size: -1,
error_info: Some(err.to_string()),
})),
.unwrap();
}
Err(err) => {
if let Some(io_err) = match_for_io_error(&err) {
if io_err.kind() == ErrorKind::BrokenPipe {
// here you can handle special case when client
// disconnected in unexpected way
eprintln!("\tclient disconnected: broken pipe");
break;
}
}
match tx.send(Err(err)).await {
Ok(_) => (),
Err(_err) => break, // response was dropped
}
}
}
Err(err) => Ok(tonic::Response::new(ReadAtResponse {
success: false,
data: Vec::new(),
read_size: -1,
error_info: Some(err.to_string()),
})),
}
} else {
Ok(tonic::Response::new(ReadAtResponse {
success: false,
data: Vec::new(),
read_size: -1,
error_info: Some("can not find disk".to_string()),
}))
}
println!("\tstream ended");
});
let out_stream = ReceiverStream::new(rx);
Ok(tonic::Response::new(Box::pin(out_stream)))
}
type ReadAtStream = ResponseStream<ReadAtResponse>;
async fn read_at(&self, request: Request<Streaming<ReadAtRequest>>) -> Result<Response<Self::ReadAtStream>, Status> {
info!("read_at");
let mut in_stream = request.into_inner();
let (tx, rx) = mpsc::channel(128);
tokio::spawn(async move {
let mut file_ref = None;
while let Some(result) = in_stream.next().await {
match result {
Ok(v) => {
match file_ref.as_ref() {
Some(_) => (),
None => {
if let Some(disk) = find_local_disk(&v.disk).await {
match disk.read_file(&v.volume, &v.path).await {
Ok(file_reader) => file_ref = Some(file_reader),
Err(err) => {
tx.send(Ok(ReadAtResponse {
success: false,
data: Vec::new(),
error_info: Some(err.to_string()),
read_size: -1,
}))
.await
.expect("working rx");
break;
}
}
} else {
tx.send(Ok(ReadAtResponse {
success: false,
data: Vec::new(),
error_info: Some("can not find disk".to_string()),
read_size: -1,
}))
.await
.expect("working rx");
break;
}
}
};
match file_ref
.as_mut()
.unwrap()
.read_at(v.offset.try_into().unwrap(), v.length.try_into().unwrap())
.await
{
Ok((data, read_size)) => tx.send(Ok(ReadAtResponse {
success: true,
data,
read_size: read_size.try_into().unwrap(),
error_info: None,
})),
Err(err) => tx.send(Ok(ReadAtResponse {
success: false,
data: Vec::new(),
error_info: Some(err.to_string()),
read_size: -1,
})),
}
.await
.unwrap();
}
Err(err) => {
if let Some(io_err) = match_for_io_error(&err) {
if io_err.kind() == ErrorKind::BrokenPipe {
// here you can handle special case when client
// disconnected in unexpected way
eprintln!("\tclient disconnected: broken pipe");
break;
}
}
match tx.send(Err(err)).await {
Ok(_) => (),
Err(_err) => break, // response was dropped
}
}
}
}
println!("\tstream ended");
});
let out_stream = ReceiverStream::new(rx);
Ok(tonic::Response::new(Box::pin(out_stream)))
}
async fn list_dir(&self, request: Request<ListDirRequest>) -> Result<Response<ListDirResponse>, Status> {
@@ -965,4 +1124,124 @@ impl Node for NodeService {
}))
}
}
async fn lock(&self, request: Request<GenerallyLockRequest>) -> Result<Response<GenerallyLockResponse>, Status> {
let request = request.into_inner();
match &serde_json::from_str::<LockArgs>(&request.args) {
Ok(args) => match GLOBAL_LOCAL_SERVER.write().await.lock(args).await {
Ok(result) => Ok(tonic::Response::new(GenerallyLockResponse {
success: result,
error_info: None,
})),
Err(err) => Ok(tonic::Response::new(GenerallyLockResponse {
success: false,
error_info: Some(format!("can not lock, args: {}, err: {}", args, err.to_string())),
})),
},
Err(err) => Ok(tonic::Response::new(GenerallyLockResponse {
success: false,
error_info: Some(format!("can not decode args, err: {}", err.to_string())),
})),
}
}
async fn un_lock(&self, request: Request<GenerallyLockRequest>) -> Result<Response<GenerallyLockResponse>, Status> {
let request = request.into_inner();
match &serde_json::from_str::<LockArgs>(&request.args) {
Ok(args) => match GLOBAL_LOCAL_SERVER.write().await.unlock(args).await {
Ok(result) => Ok(tonic::Response::new(GenerallyLockResponse {
success: result,
error_info: None,
})),
Err(err) => Ok(tonic::Response::new(GenerallyLockResponse {
success: false,
error_info: Some(format!("can not unlock, args: {}, err: {}", args, err.to_string())),
})),
},
Err(err) => Ok(tonic::Response::new(GenerallyLockResponse {
success: false,
error_info: Some(format!("can not decode args, err: {}", err.to_string())),
})),
}
}
async fn r_lock(&self, request: Request<GenerallyLockRequest>) -> Result<Response<GenerallyLockResponse>, Status> {
let request = request.into_inner();
match &serde_json::from_str::<LockArgs>(&request.args) {
Ok(args) => match GLOBAL_LOCAL_SERVER.write().await.rlock(args).await {
Ok(result) => Ok(tonic::Response::new(GenerallyLockResponse {
success: result,
error_info: None,
})),
Err(err) => Ok(tonic::Response::new(GenerallyLockResponse {
success: false,
error_info: Some(format!("can not rlock, args: {}, err: {}", args, err.to_string())),
})),
},
Err(err) => Ok(tonic::Response::new(GenerallyLockResponse {
success: false,
error_info: Some(format!("can not decode args, err: {}", err.to_string())),
})),
}
}
async fn r_un_lock(&self, request: Request<GenerallyLockRequest>) -> Result<Response<GenerallyLockResponse>, Status> {
let request = request.into_inner();
match &serde_json::from_str::<LockArgs>(&request.args) {
Ok(args) => match GLOBAL_LOCAL_SERVER.write().await.runlock(args).await {
Ok(result) => Ok(tonic::Response::new(GenerallyLockResponse {
success: result,
error_info: None,
})),
Err(err) => Ok(tonic::Response::new(GenerallyLockResponse {
success: false,
error_info: Some(format!("can not runlock, args: {}, err: {}", args, err.to_string())),
})),
},
Err(err) => Ok(tonic::Response::new(GenerallyLockResponse {
success: false,
error_info: Some(format!("can not decode args, err: {}", err.to_string())),
})),
}
}
async fn force_un_lock(&self, request: Request<GenerallyLockRequest>) -> Result<Response<GenerallyLockResponse>, Status> {
let request = request.into_inner();
match &serde_json::from_str::<LockArgs>(&request.args) {
Ok(args) => match GLOBAL_LOCAL_SERVER.write().await.force_unlock(args).await {
Ok(result) => Ok(tonic::Response::new(GenerallyLockResponse {
success: result,
error_info: None,
})),
Err(err) => Ok(tonic::Response::new(GenerallyLockResponse {
success: false,
error_info: Some(format!("can not force_unlock, args: {}, err: {}", args, err.to_string())),
})),
},
Err(err) => Ok(tonic::Response::new(GenerallyLockResponse {
success: false,
error_info: Some(format!("can not decode args, err: {}", err.to_string())),
})),
}
}
async fn refresh(&self, request: Request<GenerallyLockRequest>) -> Result<Response<GenerallyLockResponse>, Status> {
let request = request.into_inner();
match &serde_json::from_str::<LockArgs>(&request.args) {
Ok(args) => match GLOBAL_LOCAL_SERVER.write().await.refresh(args).await {
Ok(result) => Ok(tonic::Response::new(GenerallyLockResponse {
success: result,
error_info: None,
})),
Err(err) => Ok(tonic::Response::new(GenerallyLockResponse {
success: false,
error_info: Some(format!("can not refresh, args: {}, err: {}", args, err.to_string())),
})),
},
Err(err) => Ok(tonic::Response::new(GenerallyLockResponse {
success: false,
error_info: Some(format!("can not decode args, err: {}", err.to_string())),
})),
}
}
}
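All six lock handlers above follow the same pattern: decode `args` into LockArgs, call the matching GLOBAL_LOCAL_SERVER method, and wrap the result in a GenerallyLockResponse. A hypothetical consolidation (a sketch, not part of this commit) could factor that out:
    async fn run_lock_op<F, Fut, E>(args_json: &str, op: &str, f: F) -> GenerallyLockResponse
    where
        F: FnOnce(LockArgs) -> Fut,
        Fut: std::future::Future<Output = Result<bool, E>>,
        E: std::fmt::Display,
    {
        match serde_json::from_str::<LockArgs>(args_json) {
            // On success, run the supplied lock operation and report its outcome.
            Ok(args) => match f(args).await {
                Ok(success) => GenerallyLockResponse { success, error_info: None },
                Err(err) => GenerallyLockResponse {
                    success: false,
                    error_info: Some(format!("can not {}, err: {}", op, err)),
                },
            },
            // Bad JSON: mirror the existing "can not decode args" responses.
            Err(err) => GenerallyLockResponse {
                success: false,
                error_info: Some(format!("can not decode args, err: {}", err)),
            },
        }
    }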

View File

@@ -4,9 +4,9 @@ mod service;
mod storage;
use clap::Parser;
use common::error::{Error, Result};
use ecstore::{
endpoints::EndpointServerPools,
error::Result,
store::{init_local_disks, update_erasure_type, ECStore},
};
use grpc::make_server;
@@ -15,10 +15,12 @@ use hyper_util::{
server::conn::auto::Builder as ConnBuilder,
service::TowerToHyperService,
};
use protos::proto_gen::node_service::node_service_server::NodeServiceServer;
use s3s::{auth::SimpleAuth, service::S3ServiceBuilder};
use service::hybrid;
use std::{io::IsTerminal, net::SocketAddr, str::FromStr};
use tokio::net::TcpListener;
use tonic::{metadata::MetadataValue, Request, Status};
use tracing::{debug, info};
use tracing_error::ErrorLayer;
use tracing_subscriber::{fmt, layer::SubscriberExt, util::SubscriberInitExt};
@@ -39,6 +41,15 @@ fn setup_tracing() {
subscriber.try_init().expect("failed to set global default subscriber");
}
fn check_auth(req: Request<()>) -> Result<Request<()>, Status> {
let token: MetadataValue<_> = "rustfs rpc".parse().unwrap();
match req.metadata().get("authorization") {
Some(t) if token == t => Ok(req),
_ => Err(Status::unauthenticated("No valid auth token")),
}
}
fn main() -> Result<()> {
    // Parse the command-line arguments
let opt = config::Opt::parse();
@@ -76,12 +87,15 @@ async fn run(opt: config::Opt) -> Result<()> {
// };
    // Used for RPC
let (endpoint_pools, setup_type) = EndpointServerPools::from_volumes(opt.address.clone().as_str(), opt.volumes.clone())?;
let (endpoint_pools, setup_type) = EndpointServerPools::from_volumes(opt.address.clone().as_str(), opt.volumes.clone())
.map_err(|err| Error::from_string(err.to_string()))?;
update_erasure_type(setup_type).await;
    // Initialize local disks
init_local_disks(endpoint_pools.clone()).await?;
init_local_disks(endpoint_pools.clone())
.await
.map_err(|err| Error::from_string(err.to_string()))?;
// Setup S3 service
    // This project uses the s3s library to implement the S3 service
@@ -119,7 +133,7 @@ async fn run(opt: config::Opt) -> Result<()> {
b.build()
};
let rpc_service = make_server();
let rpc_service = NodeServiceServer::with_interceptor(make_server(), check_auth);
tokio::spawn(async move {
let hyper_service = service.into_shared();
@@ -165,7 +179,9 @@ async fn run(opt: config::Opt) -> Result<()> {
});
// init store
ECStore::new(opt.address.clone(), endpoint_pools.clone()).await?;
ECStore::new(opt.address.clone(), endpoint_pools.clone())
.await
.map_err(|err| Error::from_string(err.to_string()))?;
info!(" init store success!");
tokio::select! {

View File

@@ -30,7 +30,7 @@ use std::str::FromStr;
use transform_stream::AsyncTryStream;
use uuid::Uuid;
use ecstore::error::Result;
use common::error::Result;
use tracing::debug;
macro_rules! try_ {