diff --git a/Cargo.lock b/Cargo.lock index c8a44c21..a814da0d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -435,10 +435,12 @@ version = "0.0.1" dependencies = [ "ecstore", "flatbuffers", + "lock", "protos", "serde_json", "tokio", "tonic", + "url", ] [[package]] diff --git a/common/lock/src/drwmutex.rs b/common/lock/src/drwmutex.rs index 086a488c..e1c66460 100644 --- a/common/lock/src/drwmutex.rs +++ b/common/lock/src/drwmutex.rs @@ -107,7 +107,7 @@ impl DRWMutex { tolerance = locker_len - quorum; let mut attempt = 0; - let mut locks = Vec::with_capacity(self.lockers.len()); + let mut locks = vec!["".to_string(); self.lockers.len()]; loop { if self.inner_lock(&mut locks, id, source, is_read_lock, tolerance, quorum).await { @@ -202,12 +202,12 @@ impl DRWMutex { pub async fn un_lock(&mut self) { if self.write_locks.is_empty() || !self.write_locks.iter().any(|w_lock| is_locked(w_lock)) { - panic!("Trying to un_lock() while no lock() is active") + panic!("Trying to un_lock() while no lock() is active, write_locks: {:?}", self.write_locks) } let tolerance = self.lockers.len() / 2; let is_read_lock = false; - let mut locks = self.write_locks.clone(); + let mut locks = std::mem::take(&mut self.write_locks); let start = Instant::now(); loop { if self.release_all(tolerance, &mut locks, is_read_lock).await { @@ -222,13 +222,13 @@ impl DRWMutex { } pub async fn un_r_lock(&mut self) { - if self.write_locks.is_empty() || !self.write_locks.iter().any(|w_lock| is_locked(w_lock)) { - panic!("Trying to un_r_lock() while no r_lock() is active") + if self.read_locks.is_empty() || !self.read_locks.iter().any(|r_lock| is_locked(r_lock)) { + panic!("Trying to un_r_lock() while no r_lock() is active, read_locks: {:?}", self.read_locks) } let tolerance = self.lockers.len() / 2; let is_read_lock = true; - let mut locks = self.write_locks.clone(); + let mut locks = std::mem::take(&mut self.read_locks); let start = Instant::now(); loop { if self.release_all(tolerance, &mut locks, is_read_lock).await { @@ -249,7 +249,7 @@ impl DRWMutex { } } - check_failed_unlocks(&locks, tolerance) + !check_failed_unlocks(&locks, tolerance) } } diff --git a/common/lock/src/lib.rs b/common/lock/src/lib.rs index 8e3cd730..02aa3235 100644 --- a/common/lock/src/lib.rs +++ b/common/lock/src/lib.rs @@ -1,3 +1,5 @@ +#![allow(dead_code)] + use std::sync::Arc; use async_trait::async_trait; @@ -5,7 +7,7 @@ use common::error::Result; use lazy_static::lazy_static; use local_locker::LocalLocker; use lock_args::LockArgs; -use remote_client::RemoteClinet; +use remote_client::RemoteClient; use tokio::sync::RwLock; pub mod drwmutex; @@ -37,7 +39,7 @@ pub trait Locker { #[derive(Debug, Clone)] pub enum LockApi { Local, - Remote(RemoteClinet), + Remote(RemoteClient), } #[async_trait] @@ -111,5 +113,5 @@ pub fn new_lock_api(is_local: bool, url: Option) -> LockApi { return LockApi::Local; } - LockApi::Remote(RemoteClinet::new(url.unwrap())) + LockApi::Remote(RemoteClient::new(url.unwrap())) } diff --git a/common/lock/src/local_locker.rs b/common/lock/src/local_locker.rs index 2e47dcd2..0d22ce1b 100644 --- a/common/lock/src/local_locker.rs +++ b/common/lock/src/local_locker.rs @@ -289,7 +289,7 @@ impl Locker for LocalLocker { // TODO: need add timeout mechanism async fn force_unlock(&mut self, args: &LockArgs) -> Result { - let mut reply = false; + let mut reply: bool; if args.uid.is_empty() { args.resources.iter().for_each(|resource| match self.lock_map.get(resource) { Some(lris) => { diff --git a/common/lock/src/namespace_lock.rs b/common/lock/src/namespace_lock.rs index 6b8a43e4..dd612039 100644 --- a/common/lock/src/namespace_lock.rs +++ b/common/lock/src/namespace_lock.rs @@ -34,6 +34,13 @@ pub struct NsLockMap { } impl NsLockMap { + pub fn new(is_dist_erasure: bool) -> Self { + Self { + is_dist_erasure, + ..Default::default() + } + } + async fn lock( &mut self, volume: &String, diff --git a/common/lock/src/remote_client.rs b/common/lock/src/remote_client.rs index 09118761..731cb923 100644 --- a/common/lock/src/remote_client.rs +++ b/common/lock/src/remote_client.rs @@ -7,11 +7,11 @@ use tracing::info; use crate::{lock_args::LockArgs, Locker}; #[derive(Debug, Clone)] -pub struct RemoteClinet { +pub struct RemoteClient { url: url::Url, } -impl RemoteClinet { +impl RemoteClient { pub fn new(url: url::Url) -> Self { Self { url } } @@ -24,7 +24,7 @@ impl RemoteClinet { } #[async_trait] -impl Locker for RemoteClinet { +impl Locker for RemoteClient { async fn lock(&mut self, args: &LockArgs) -> Result { info!("remote lock"); let args = serde_json::to_string(args)?; diff --git a/e2e_test/Cargo.toml b/e2e_test/Cargo.toml index 90f99158..114ea8c1 100644 --- a/e2e_test/Cargo.toml +++ b/e2e_test/Cargo.toml @@ -11,7 +11,9 @@ rust-version.workspace = true [dependencies] ecstore.workspace = true flatbuffers.workspace = true +lock.workspace = true protos.workspace = true serde_json.workspace = true tonic = { version = "0.12.1", features = ["gzip"] } -tokio = { workspace = true } \ No newline at end of file +tokio = { workspace = true } +url.workspace = true \ No newline at end of file diff --git a/e2e_test/src/reliant/lock.rs b/e2e_test/src/reliant/lock.rs new file mode 100644 index 00000000..f88830f9 --- /dev/null +++ b/e2e_test/src/reliant/lock.rs @@ -0,0 +1,66 @@ +#![cfg(test)] + +use std::{error::Error, sync::Arc, time::Duration, vec}; + +use lock::{ + drwmutex::Options, + lock_args::LockArgs, + namespace_lock::{new_nslock, NsLockMap}, + new_lock_api, +}; +use protos::proto_gen::node_service::{node_service_client::NodeServiceClient, GenerallyLockRequest}; +use tokio::sync::RwLock; +use tonic::Request; + +async fn get_client() -> Result, Box> { + // Ok(NodeServiceClient::connect("http://220.181.1.138:9000").await?) + Ok(NodeServiceClient::connect("http://localhost:9000").await?) +} + +#[tokio::test] +async fn test_lock_unlock_rpc() -> Result<(), Box> { + let args = LockArgs { + uid: "1111".to_string(), + resources: vec!["dandan".to_string()], + owner: "dd".to_string(), + source: "".to_string(), + quorum: 3, + }; + let args = serde_json::to_string(&args)?; + + let mut client = get_client().await?; + let request = Request::new(GenerallyLockRequest { args }); + + let response = client.lock(request).await?.into_inner(); + if let Some(error_info) = response.error_info { + assert!(false, "can not get lock: {}", error_info); + } + Ok(()) +} + +#[tokio::test] +async fn test_lock_unlock_ns_lock() -> Result<(), Box> { + let url = url::Url::parse("http://127.0.0.1:9000/data")?; + let locker = new_lock_api(false, Some(url)); + let ns_mutex = Arc::new(RwLock::new(NsLockMap::new(true))); + let mut ns = new_nslock( + Arc::clone(&ns_mutex), + "local".to_string(), + "dandan".to_string(), + vec!["foo".to_string()], + vec![locker], + ) + .await; + assert_eq!( + ns.get_lock(&Options { + timeout: Duration::from_secs(5), + retry_interval: Duration::from_secs(1), + }) + .await + .unwrap(), + true + ); + + ns.un_lock().await.unwrap(); + Ok(()) +} diff --git a/e2e_test/src/reliant/mod.rs b/e2e_test/src/reliant/mod.rs index 67d6e260..897f8ffc 100644 --- a/e2e_test/src/reliant/mod.rs +++ b/e2e_test/src/reliant/mod.rs @@ -1 +1,2 @@ +mod lock; mod node_interact_test; diff --git a/ecstore/src/sets.rs b/ecstore/src/sets.rs index 69aefce4..3102db44 100644 --- a/ecstore/src/sets.rs +++ b/ecstore/src/sets.rs @@ -54,11 +54,15 @@ impl Sets { let mut lockers: Vec> = vec![vec![]; set_count]; endpoints.endpoints.as_ref().iter().enumerate().for_each(|(idx, endpoint)| { let set_idx = idx / set_drive_count; - if !unique[set_idx].contains(&endpoint.url.host_str().unwrap().to_string()) { - if endpoint.is_local { - unique[set_idx].push(endpoint.url.host_str().unwrap().to_string()); - lockers[set_idx].push(new_lock_api(true, None)); - } else { + if endpoint.is_local && !unique[set_idx].contains(&"local".to_string()) { + unique[set_idx].push("local".to_string()); + lockers[set_idx].push(new_lock_api(true, None)); + } + + if !endpoint.is_local { + let host_port = format!("{}:{}", endpoint.url.host_str().unwrap(), endpoint.url.port().unwrap().to_string()); + if !unique[set_idx].contains(&host_port) { + unique[set_idx].push(host_port); lockers[set_idx].push(new_lock_api(false, Some(endpoint.url.clone()))); } } @@ -103,7 +107,7 @@ impl Sets { disks: set_drive, lockers: lockers[i].clone(), locker_owner: GLOBAL_Local_Node_Name.read().await.to_string(), - ns_mutex: Arc::new(RwLock::new(NsLockMap::default())), + ns_mutex: Arc::new(RwLock::new(NsLockMap::new(*GLOBAL_IsDistErasure.read().await))), set_drive_count, parity_count: partiy_count, set_index: i, diff --git a/ecstore/src/store.rs b/ecstore/src/store.rs index 58b924e3..6f6f2302 100644 --- a/ecstore/src/store.rs +++ b/ecstore/src/store.rs @@ -814,9 +814,11 @@ async fn init_local_peer(endpoint_pools: &EndpointServerPools, host: &String, po if peer_set.is_empty() { if !host.is_empty() { *GLOBAL_Local_Node_Name.write().await = format!("{}:{}", host, port); + return; } *GLOBAL_Local_Node_Name.write().await = format!("127.0.0.1:{}", port); + return; } *GLOBAL_Local_Node_Name.write().await = peer_set[0].clone();