add base lock

Signed-off-by: junxiang Mu <1948535941@qq.com>
This commit is contained in:
junxiang Mu
2024-09-16 13:08:27 +08:00
parent c25cf508ff
commit 5e70d8b841
8 changed files with 1542 additions and 359 deletions

8
Cargo.lock generated
View File

@@ -945,6 +945,14 @@ version = "0.4.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "78b3ae25bc7c8c38cec158d1f2757ee79e9b3740fbc7ccf0e59e4b08d793fa89"
[[package]]
name = "lock"
version = "0.0.1"
dependencies = [
"ecstore",
"tracing",
]
[[package]]
name = "lock_api"
version = "0.4.12"

View File

@@ -1,6 +1,6 @@
[workspace]
resolver = "2"
members = ["rustfs", "ecstore", "e2e_test", "common/protos"]
members = ["rustfs", "ecstore", "e2e_test", "common/lock", "common/protos"]
[workspace.package]
edition = "2021"

8
common/lock/Cargo.toml Normal file
View File

@@ -0,0 +1,8 @@
[package]
name = "lock"
version.workspace = true
edition.workspace = true
[dependencies]
ecstore.workspace = true
tracing.workspace = true

2
common/lock/src/lib.rs Normal file
View File

@@ -0,0 +1,2 @@
pub mod local_disk;
pub mod lock_args;

View File

@@ -0,0 +1,420 @@
use ecstore::error::{Error, Result};
use std::{collections::HashMap, time::{Duration, Instant}};
use crate::lock_args::LockArgs;
const MAX_DELETE_LIST: usize = 1000;
#[derive(Clone, Debug)]
struct LockRequesterInfo {
name: String,
writer: bool,
uid: String,
time_stamp: Instant,
time_last_refresh: Instant,
source: String,
group: bool,
owner: String,
quorum: usize,
idx: usize,
}
impl Default for LockRequesterInfo {
fn default() -> Self {
Self {
name: Default::default(),
writer: Default::default(),
uid: Default::default(),
time_stamp: Instant::now(),
time_last_refresh: Instant::now(),
source: Default::default(),
group: Default::default(),
owner: Default::default(),
quorum: Default::default(),
idx: Default::default(),
}
}
}
fn is_write_lock(lri: &[LockRequesterInfo]) -> bool {
lri.len() == 1 && lri[0].writer
}
#[derive(Debug, Default)]
pub struct LockStats {
total: usize,
writes: usize,
reads: usize,
}
#[derive(Debug, Default)]
pub struct LocalLocker {
lock_map: HashMap<String, Vec<LockRequesterInfo>>,
lock_uid: HashMap<String, String>,
}
impl LocalLocker {
fn new() -> Self {
LocalLocker::default()
}
}
impl LocalLocker {
fn can_take_lock(&self, resource: &[String]) -> bool {
resource.iter().fold(true, |acc, x| !self.lock_map.contains_key(x) && acc)
}
pub fn lock(&mut self, args: LockArgs) -> Result<bool> {
if args.resources.len() > MAX_DELETE_LIST {
return Err(Error::from_string(format!(
"internal error: LocalLocker.lock called with more than {} resources",
MAX_DELETE_LIST
)));
}
if !self.can_take_lock(&args.resources) {
return Ok(false);
}
args.resources.iter().enumerate().for_each(|(idx, resource)| {
self.lock_map.insert(
resource.to_string(),
vec![LockRequesterInfo {
name: resource.to_string(),
writer: true,
source: args.source.to_string(),
owner: args.owner.to_string(),
uid: args.uid.to_string(),
group: args.resources.len() > 1,
quorum: args.quorum,
idx,
..Default::default()
}],
);
let mut uuid = args.uid.to_string();
format_uuid(&mut uuid, &idx);
self.lock_uid.insert(uuid, resource.to_string());
});
Ok(true)
}
pub fn unlock(&mut self, args: LockArgs) -> Result<bool> {
if args.resources.len() > MAX_DELETE_LIST {
return Err(Error::from_string(format!(
"internal error: LocalLocker.unlock called with more than {} resources",
MAX_DELETE_LIST
)));
}
let mut reply = false;
let mut err_info = String::new();
for resource in args.resources.iter() {
match self.lock_map.get_mut(resource) {
Some(lris) => {
if !is_write_lock(&lris) {
if err_info.is_empty() {
err_info = String::from(format!("unlock attempted on a read locked entity: {}", resource));
} else {
err_info.push_str(&format!(", {}", resource));
}
} else {
lris.retain(|lri| {
if lri.uid == args.uid && (args.owner.is_empty() || lri.owner == args.owner) {
let mut key = args.uid.to_string();
format_uuid(&mut key, &lri.idx);
self.lock_uid.remove(&key).unwrap();
reply |= true;
return false;
}
true
});
}
if lris.len() == 0 {
self.lock_map.remove(resource);
}
},
None => {
continue;
}
};
};
Ok(reply)
}
pub fn rlock(&mut self, args: LockArgs) -> Result<bool> {
if args.resources.len() != 1 {
return Err(Error::from_string("internal error: localLocker.RLock called with more than one resource"));
}
let resource = &args.resources[0];
match self.lock_map.get_mut(resource) {
Some(lri) => {
if !is_write_lock(lri) {
lri.push(LockRequesterInfo {
name: resource.to_string(),
writer: false,
source: args.source.to_string(),
owner: args.owner.to_string(),
uid: args.uid.to_string(),
quorum: args.quorum,
..Default::default()
});
} else {
return Ok(false);
}
},
None => {
self.lock_map.insert(resource.to_string(), vec![LockRequesterInfo {
name: resource.to_string(),
writer: false,
source: args.source.to_string(),
owner: args.owner.to_string(),
uid: args.uid.to_string(),
quorum: args.quorum,
..Default::default()
}]);
}
}
let mut uuid = args.uid.to_string();
format_uuid(&mut uuid, &0);
self.lock_uid.insert(uuid, resource.to_string());
Ok(true)
}
pub fn runlock(&mut self, args: LockArgs) -> Result<bool> {
if args.resources.len() != 1 {
return Err(Error::from_string("internal error: localLocker.RLock called with more than one resource"));
}
let mut reply = false;
let resource = &args.resources[0];
match self.lock_map.get_mut(resource) {
Some(lris) => {
if is_write_lock(&lris) {
return Err(Error::from_string(format!("runlock attempted on a write locked entity: {}", resource)));
} else {
lris.retain(|lri| {
if lri.uid == args.uid && (args.owner.is_empty() || lri.owner == args.owner) {
let mut key = args.uid.to_string();
format_uuid(&mut key, &lri.idx);
self.lock_uid.remove(&key).unwrap();
reply |= true;
return false;
}
true
});
}
if lris.len() == 0 {
self.lock_map.remove(resource);
}
},
None => {
return Ok(reply || true);
}
};
Ok(reply)
}
pub fn stats(&self) -> LockStats {
let mut st = LockStats {
total: self.lock_map.len(),
..Default::default()
};
self.lock_map.iter().for_each(|(_, value)| {
if value.len() > 0 {
if value[0].writer {
st.writes += 1;
} else {
st.reads += 1;
}
}
});
return st;
}
pub fn dump_lock_map(&mut self) -> HashMap<String, Vec<LockRequesterInfo>> {
let mut lock_copy = HashMap::new();
self.lock_map.iter().for_each(|(key, value)| {
lock_copy.insert(key.to_string(), value.to_vec());
});
return lock_copy;
}
pub fn close(&self) {
}
pub fn is_online(&self) ->bool {
true
}
pub fn is_local(&self) -> bool {
true
}
// TODO: need add timeout mechanism
pub fn force_unlock(&mut self, args: LockArgs) -> Result<bool> {
let mut reply = false;
if args.uid.is_empty() {
args.resources.iter().for_each(|resource| {
match self.lock_map.get(resource) {
Some(lris) => {
lris.iter().for_each(|lri| {
let mut key = lri.uid.to_string();
format_uuid(&mut key, &lri.idx);
self.lock_uid.remove(&key);
});
if lris.len() == 0 {
self.lock_map.remove(resource);
}
},
None => (),
}
});
return Ok(true);
}
let mut idx = 0;
let mut need_remove_resource = Vec::new();
let mut need_remove_map_id = Vec::new();
loop {
let mut map_id = args.uid.to_string();
format_uuid(&mut map_id, &idx);
match self.lock_uid.get(&map_id) {
Some(resource) => {
match self.lock_map.get_mut(resource) {
Some(lris) => {
reply = true;
{
lris.retain(|lri| {
if lri.uid == args.uid && (args.owner.is_empty() || lri.owner == args.owner) {
let mut key = args.uid.to_string();
format_uuid(&mut key, &lri.idx);
need_remove_map_id.push(key);
return false;
}
true
});
}
idx += 1;
if lris.len() == 0 {
need_remove_resource.push(resource.to_string());
}
},
None => {
need_remove_map_id.push(map_id);
idx += 1;
continue;
}
}
},
None => {
reply = idx > 0;
break;
}
}
}
need_remove_resource.into_iter().for_each(|resource| {
self.lock_map.remove(&resource);
});
need_remove_map_id.into_iter().for_each(|map_id| {
self.lock_uid.remove(&map_id);
});
Ok(reply)
}
pub fn refresh(&mut self, args: LockArgs) -> Result<bool> {
let mut idx = 0;
let mut key = args.uid.to_string();
format_uuid(&mut key, &idx);
match self.lock_uid.get(&key) {
Some(resource) => {
let mut resource = resource;
loop {
match self.lock_map.get_mut(resource) {
Some(lris) => {
},
None => {
let mut key = args.uid.to_string();
format_uuid(&mut key, &0);
self.lock_uid.remove(&key);
return Ok(idx > 0);
}
}
idx += 1;
let mut key = args.uid.to_string();
format_uuid(&mut key, &idx);
resource = match self.lock_uid.get(&key) {
Some(resource) => resource,
None => return Ok(true),
};
}
},
None => {
return Ok(false);
}
}
}
fn expire_old_locks(&mut self, interval: Duration) {
self.lock_map.iter_mut().for_each(|(_, lris)| {
lris.retain(|lri| {
if Instant::now().duration_since(lri.time_last_refresh) > interval {
let mut key = lri.uid.to_string();
format_uuid(&mut key, &lri.idx);
self.lock_uid.remove(&key);
return false;
}
true
});
});
return;
}
}
fn format_uuid(s: &mut String, idx: &usize) {
s.push_str(&idx.to_string());
}
#[cfg(test)]
mod test {
use crate::lock_args::LockArgs;
use ecstore::error::Result;
use super::LocalLocker;
#[test]
fn test_lock_unlock() -> Result<()> {
let mut local_locker = LocalLocker::new();
let args = LockArgs {
uid: "1111".to_string(),
resources: vec!["dandan".to_string()],
owner: "dd".to_string(),
source: "".to_string(),
quorum: 3,
};
local_locker.lock(args.clone())?;
println!("lock local_locker: {:?} \n", local_locker);
local_locker.unlock(args)?;
println!("unlock local_locker: {:?}", local_locker);
Ok(())
}
}

View File

@@ -0,0 +1,8 @@
#[derive(Clone, Debug, Default)]
pub struct LockArgs {
pub uid: String,
pub resources: Vec<String>,
pub owner: String,
pub source: String,
pub quorum: usize,
}

View File

@@ -1,9 +1,10 @@
// automatically generated by the FlatBuffers compiler, do not modify
// @generated
use core::cmp::Ordering;
use core::mem;
use core::cmp::Ordering;
extern crate flatbuffers;
use self::flatbuffers::{EndianScalar, Follow};
@@ -11,114 +12,112 @@ use self::flatbuffers::{EndianScalar, Follow};
#[allow(unused_imports, dead_code)]
pub mod models {
use core::cmp::Ordering;
use core::mem;
use core::mem;
use core::cmp::Ordering;
extern crate flatbuffers;
use self::flatbuffers::{EndianScalar, Follow};
extern crate flatbuffers;
use self::flatbuffers::{EndianScalar, Follow};
pub enum PingBodyOffset {}
#[derive(Copy, Clone, PartialEq)]
pub enum PingBodyOffset {}
#[derive(Copy, Clone, PartialEq)]
pub struct PingBody<'a> {
pub _tab: flatbuffers::Table<'a>,
pub struct PingBody<'a> {
pub _tab: flatbuffers::Table<'a>,
}
impl<'a> flatbuffers::Follow<'a> for PingBody<'a> {
type Inner = PingBody<'a>;
#[inline]
unsafe fn follow(buf: &'a [u8], loc: usize) -> Self::Inner {
Self { _tab: flatbuffers::Table::new(buf, loc) }
}
}
impl<'a> PingBody<'a> {
pub const VT_PAYLOAD: flatbuffers::VOffsetT = 4;
pub const fn get_fully_qualified_name() -> &'static str {
"models.PingBody"
}
#[inline]
pub unsafe fn init_from_table(table: flatbuffers::Table<'a>) -> Self {
PingBody { _tab: table }
}
#[allow(unused_mut)]
pub fn create<'bldr: 'args, 'args: 'mut_bldr, 'mut_bldr, A: flatbuffers::Allocator + 'bldr>(
_fbb: &'mut_bldr mut flatbuffers::FlatBufferBuilder<'bldr, A>,
args: &'args PingBodyArgs<'args>
) -> flatbuffers::WIPOffset<PingBody<'bldr>> {
let mut builder = PingBodyBuilder::new(_fbb);
if let Some(x) = args.payload { builder.add_payload(x); }
builder.finish()
}
#[inline]
pub fn payload(&self) -> Option<flatbuffers::Vector<'a, u8>> {
// Safety:
// Created from valid Table for this object
// which contains a valid value in this slot
unsafe { self._tab.get::<flatbuffers::ForwardsUOffset<flatbuffers::Vector<'a, u8>>>(PingBody::VT_PAYLOAD, None)}
}
}
impl flatbuffers::Verifiable for PingBody<'_> {
#[inline]
fn run_verifier(
v: &mut flatbuffers::Verifier, pos: usize
) -> Result<(), flatbuffers::InvalidFlatbuffer> {
use self::flatbuffers::Verifiable;
v.visit_table(pos)?
.visit_field::<flatbuffers::ForwardsUOffset<flatbuffers::Vector<'_, u8>>>("payload", Self::VT_PAYLOAD, false)?
.finish();
Ok(())
}
}
pub struct PingBodyArgs<'a> {
pub payload: Option<flatbuffers::WIPOffset<flatbuffers::Vector<'a, u8>>>,
}
impl<'a> Default for PingBodyArgs<'a> {
#[inline]
fn default() -> Self {
PingBodyArgs {
payload: None,
}
}
}
impl<'a> flatbuffers::Follow<'a> for PingBody<'a> {
type Inner = PingBody<'a>;
#[inline]
unsafe fn follow(buf: &'a [u8], loc: usize) -> Self::Inner {
Self {
_tab: flatbuffers::Table::new(buf, loc),
}
}
pub struct PingBodyBuilder<'a: 'b, 'b, A: flatbuffers::Allocator + 'a> {
fbb_: &'b mut flatbuffers::FlatBufferBuilder<'a, A>,
start_: flatbuffers::WIPOffset<flatbuffers::TableUnfinishedWIPOffset>,
}
impl<'a: 'b, 'b, A: flatbuffers::Allocator + 'a> PingBodyBuilder<'a, 'b, A> {
#[inline]
pub fn add_payload(&mut self, payload: flatbuffers::WIPOffset<flatbuffers::Vector<'b , u8>>) {
self.fbb_.push_slot_always::<flatbuffers::WIPOffset<_>>(PingBody::VT_PAYLOAD, payload);
}
#[inline]
pub fn new(_fbb: &'b mut flatbuffers::FlatBufferBuilder<'a, A>) -> PingBodyBuilder<'a, 'b, A> {
let start = _fbb.start_table();
PingBodyBuilder {
fbb_: _fbb,
start_: start,
}
}
#[inline]
pub fn finish(self) -> flatbuffers::WIPOffset<PingBody<'a>> {
let o = self.fbb_.end_table(self.start_);
flatbuffers::WIPOffset::new(o.value())
}
}
impl<'a> PingBody<'a> {
pub const VT_PAYLOAD: flatbuffers::VOffsetT = 4;
impl core::fmt::Debug for PingBody<'_> {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
let mut ds = f.debug_struct("PingBody");
ds.field("payload", &self.payload());
ds.finish()
}
}
} // pub mod models
pub const fn get_fully_qualified_name() -> &'static str {
"models.PingBody"
}
#[inline]
pub unsafe fn init_from_table(table: flatbuffers::Table<'a>) -> Self {
PingBody { _tab: table }
}
#[allow(unused_mut)]
pub fn create<'bldr: 'args, 'args: 'mut_bldr, 'mut_bldr, A: flatbuffers::Allocator + 'bldr>(
_fbb: &'mut_bldr mut flatbuffers::FlatBufferBuilder<'bldr, A>,
args: &'args PingBodyArgs<'args>,
) -> flatbuffers::WIPOffset<PingBody<'bldr>> {
let mut builder = PingBodyBuilder::new(_fbb);
if let Some(x) = args.payload {
builder.add_payload(x);
}
builder.finish()
}
#[inline]
pub fn payload(&self) -> Option<flatbuffers::Vector<'a, u8>> {
// Safety:
// Created from valid Table for this object
// which contains a valid value in this slot
unsafe {
self._tab
.get::<flatbuffers::ForwardsUOffset<flatbuffers::Vector<'a, u8>>>(PingBody::VT_PAYLOAD, None)
}
}
}
impl flatbuffers::Verifiable for PingBody<'_> {
#[inline]
fn run_verifier(v: &mut flatbuffers::Verifier, pos: usize) -> Result<(), flatbuffers::InvalidFlatbuffer> {
use self::flatbuffers::Verifiable;
v.visit_table(pos)?
.visit_field::<flatbuffers::ForwardsUOffset<flatbuffers::Vector<'_, u8>>>("payload", Self::VT_PAYLOAD, false)?
.finish();
Ok(())
}
}
pub struct PingBodyArgs<'a> {
pub payload: Option<flatbuffers::WIPOffset<flatbuffers::Vector<'a, u8>>>,
}
impl<'a> Default for PingBodyArgs<'a> {
#[inline]
fn default() -> Self {
PingBodyArgs { payload: None }
}
}
pub struct PingBodyBuilder<'a: 'b, 'b, A: flatbuffers::Allocator + 'a> {
fbb_: &'b mut flatbuffers::FlatBufferBuilder<'a, A>,
start_: flatbuffers::WIPOffset<flatbuffers::TableUnfinishedWIPOffset>,
}
impl<'a: 'b, 'b, A: flatbuffers::Allocator + 'a> PingBodyBuilder<'a, 'b, A> {
#[inline]
pub fn add_payload(&mut self, payload: flatbuffers::WIPOffset<flatbuffers::Vector<'b, u8>>) {
self.fbb_
.push_slot_always::<flatbuffers::WIPOffset<_>>(PingBody::VT_PAYLOAD, payload);
}
#[inline]
pub fn new(_fbb: &'b mut flatbuffers::FlatBufferBuilder<'a, A>) -> PingBodyBuilder<'a, 'b, A> {
let start = _fbb.start_table();
PingBodyBuilder {
fbb_: _fbb,
start_: start,
}
}
#[inline]
pub fn finish(self) -> flatbuffers::WIPOffset<PingBody<'a>> {
let o = self.fbb_.end_table(self.start_);
flatbuffers::WIPOffset::new(o.value())
}
}
impl core::fmt::Debug for PingBody<'_> {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
let mut ds = f.debug_struct("PingBody");
ds.field("payload", &self.payload());
ds.finish()
}
}
} // pub mod models

File diff suppressed because it is too large Load Diff