put_object func add lock

Signed-off-by: junxiang Mu <1948535941@qq.com>
This commit is contained in:
junxiang Mu
2024-09-27 10:31:16 +08:00
parent ca3dac9a25
commit a65b074e9f
11 changed files with 107 additions and 21 deletions

2
Cargo.lock generated
View File

@@ -435,10 +435,12 @@ version = "0.0.1"
dependencies = [
"ecstore",
"flatbuffers",
"lock",
"protos",
"serde_json",
"tokio",
"tonic",
"url",
]
[[package]]

View File

@@ -107,7 +107,7 @@ impl DRWMutex {
tolerance = locker_len - quorum;
let mut attempt = 0;
let mut locks = Vec::with_capacity(self.lockers.len());
let mut locks = vec!["".to_string(); self.lockers.len()];
loop {
if self.inner_lock(&mut locks, id, source, is_read_lock, tolerance, quorum).await {
@@ -202,12 +202,12 @@ impl DRWMutex {
pub async fn un_lock(&mut self) {
if self.write_locks.is_empty() || !self.write_locks.iter().any(|w_lock| is_locked(w_lock)) {
panic!("Trying to un_lock() while no lock() is active")
panic!("Trying to un_lock() while no lock() is active, write_locks: {:?}", self.write_locks)
}
let tolerance = self.lockers.len() / 2;
let is_read_lock = false;
let mut locks = self.write_locks.clone();
let mut locks = std::mem::take(&mut self.write_locks);
let start = Instant::now();
loop {
if self.release_all(tolerance, &mut locks, is_read_lock).await {
@@ -222,13 +222,13 @@ impl DRWMutex {
}
pub async fn un_r_lock(&mut self) {
if self.write_locks.is_empty() || !self.write_locks.iter().any(|w_lock| is_locked(w_lock)) {
panic!("Trying to un_r_lock() while no r_lock() is active")
if self.read_locks.is_empty() || !self.read_locks.iter().any(|r_lock| is_locked(r_lock)) {
panic!("Trying to un_r_lock() while no r_lock() is active, read_locks: {:?}", self.read_locks)
}
let tolerance = self.lockers.len() / 2;
let is_read_lock = true;
let mut locks = self.write_locks.clone();
let mut locks = std::mem::take(&mut self.read_locks);
let start = Instant::now();
loop {
if self.release_all(tolerance, &mut locks, is_read_lock).await {
@@ -249,7 +249,7 @@ impl DRWMutex {
}
}
check_failed_unlocks(&locks, tolerance)
!check_failed_unlocks(&locks, tolerance)
}
}

View File

@@ -1,3 +1,5 @@
#![allow(dead_code)]
use std::sync::Arc;
use async_trait::async_trait;
@@ -5,7 +7,7 @@ use common::error::Result;
use lazy_static::lazy_static;
use local_locker::LocalLocker;
use lock_args::LockArgs;
use remote_client::RemoteClinet;
use remote_client::RemoteClient;
use tokio::sync::RwLock;
pub mod drwmutex;
@@ -37,7 +39,7 @@ pub trait Locker {
#[derive(Debug, Clone)]
pub enum LockApi {
Local,
Remote(RemoteClinet),
Remote(RemoteClient),
}
#[async_trait]
@@ -111,5 +113,5 @@ pub fn new_lock_api(is_local: bool, url: Option<url::Url>) -> LockApi {
return LockApi::Local;
}
LockApi::Remote(RemoteClinet::new(url.unwrap()))
LockApi::Remote(RemoteClient::new(url.unwrap()))
}

View File

@@ -289,7 +289,7 @@ impl Locker for LocalLocker {
// TODO: need add timeout mechanism
async fn force_unlock(&mut self, args: &LockArgs) -> Result<bool> {
let mut reply = false;
let mut reply: bool;
if args.uid.is_empty() {
args.resources.iter().for_each(|resource| match self.lock_map.get(resource) {
Some(lris) => {

View File

@@ -34,6 +34,13 @@ pub struct NsLockMap {
}
impl NsLockMap {
pub fn new(is_dist_erasure: bool) -> Self {
Self {
is_dist_erasure,
..Default::default()
}
}
async fn lock(
&mut self,
volume: &String,

View File

@@ -7,11 +7,11 @@ use tracing::info;
use crate::{lock_args::LockArgs, Locker};
#[derive(Debug, Clone)]
pub struct RemoteClinet {
pub struct RemoteClient {
url: url::Url,
}
impl RemoteClinet {
impl RemoteClient {
pub fn new(url: url::Url) -> Self {
Self { url }
}
@@ -24,7 +24,7 @@ impl RemoteClinet {
}
#[async_trait]
impl Locker for RemoteClinet {
impl Locker for RemoteClient {
async fn lock(&mut self, args: &LockArgs) -> Result<bool> {
info!("remote lock");
let args = serde_json::to_string(args)?;

View File

@@ -11,7 +11,9 @@ rust-version.workspace = true
[dependencies]
ecstore.workspace = true
flatbuffers.workspace = true
lock.workspace = true
protos.workspace = true
serde_json.workspace = true
tonic = { version = "0.12.1", features = ["gzip"] }
tokio = { workspace = true }
tokio = { workspace = true }
url.workspace = true

View File

@@ -0,0 +1,66 @@
#![cfg(test)]
use std::{error::Error, sync::Arc, time::Duration, vec};
use lock::{
drwmutex::Options,
lock_args::LockArgs,
namespace_lock::{new_nslock, NsLockMap},
new_lock_api,
};
use protos::proto_gen::node_service::{node_service_client::NodeServiceClient, GenerallyLockRequest};
use tokio::sync::RwLock;
use tonic::Request;
async fn get_client() -> Result<NodeServiceClient<tonic::transport::Channel>, Box<dyn Error>> {
// Ok(NodeServiceClient::connect("http://220.181.1.138:9000").await?)
Ok(NodeServiceClient::connect("http://localhost:9000").await?)
}
#[tokio::test]
async fn test_lock_unlock_rpc() -> Result<(), Box<dyn Error>> {
let args = LockArgs {
uid: "1111".to_string(),
resources: vec!["dandan".to_string()],
owner: "dd".to_string(),
source: "".to_string(),
quorum: 3,
};
let args = serde_json::to_string(&args)?;
let mut client = get_client().await?;
let request = Request::new(GenerallyLockRequest { args });
let response = client.lock(request).await?.into_inner();
if let Some(error_info) = response.error_info {
assert!(false, "can not get lock: {}", error_info);
}
Ok(())
}
#[tokio::test]
async fn test_lock_unlock_ns_lock() -> Result<(), Box<dyn Error>> {
let url = url::Url::parse("http://127.0.0.1:9000/data")?;
let locker = new_lock_api(false, Some(url));
let ns_mutex = Arc::new(RwLock::new(NsLockMap::new(true)));
let mut ns = new_nslock(
Arc::clone(&ns_mutex),
"local".to_string(),
"dandan".to_string(),
vec!["foo".to_string()],
vec![locker],
)
.await;
assert_eq!(
ns.get_lock(&Options {
timeout: Duration::from_secs(5),
retry_interval: Duration::from_secs(1),
})
.await
.unwrap(),
true
);
ns.un_lock().await.unwrap();
Ok(())
}

View File

@@ -1 +1,2 @@
mod lock;
mod node_interact_test;

View File

@@ -54,11 +54,15 @@ impl Sets {
let mut lockers: Vec<Vec<LockApi>> = vec![vec![]; set_count];
endpoints.endpoints.as_ref().iter().enumerate().for_each(|(idx, endpoint)| {
let set_idx = idx / set_drive_count;
if !unique[set_idx].contains(&endpoint.url.host_str().unwrap().to_string()) {
if endpoint.is_local {
unique[set_idx].push(endpoint.url.host_str().unwrap().to_string());
lockers[set_idx].push(new_lock_api(true, None));
} else {
if endpoint.is_local && !unique[set_idx].contains(&"local".to_string()) {
unique[set_idx].push("local".to_string());
lockers[set_idx].push(new_lock_api(true, None));
}
if !endpoint.is_local {
let host_port = format!("{}:{}", endpoint.url.host_str().unwrap(), endpoint.url.port().unwrap().to_string());
if !unique[set_idx].contains(&host_port) {
unique[set_idx].push(host_port);
lockers[set_idx].push(new_lock_api(false, Some(endpoint.url.clone())));
}
}
@@ -103,7 +107,7 @@ impl Sets {
disks: set_drive,
lockers: lockers[i].clone(),
locker_owner: GLOBAL_Local_Node_Name.read().await.to_string(),
ns_mutex: Arc::new(RwLock::new(NsLockMap::default())),
ns_mutex: Arc::new(RwLock::new(NsLockMap::new(*GLOBAL_IsDistErasure.read().await))),
set_drive_count,
parity_count: partiy_count,
set_index: i,

View File

@@ -814,9 +814,11 @@ async fn init_local_peer(endpoint_pools: &EndpointServerPools, host: &String, po
if peer_set.is_empty() {
if !host.is_empty() {
*GLOBAL_Local_Node_Name.write().await = format!("{}:{}", host, port);
return;
}
*GLOBAL_Local_Node_Name.write().await = format!("127.0.0.1:{}", port);
return;
}
*GLOBAL_Local_Node_Name.write().await = peer_set[0].clone();