mirror of
https://github.com/rustfs/rustfs.git
synced 2026-01-17 01:30:33 +00:00
@@ -33,8 +33,8 @@ impl Granted {
|
||||
}
|
||||
}
|
||||
|
||||
fn is_locked(uid: &String) -> bool {
|
||||
uid.len() > 0
|
||||
fn is_locked(uid: &str) -> bool {
|
||||
!uid.is_empty()
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
@@ -59,6 +59,14 @@ impl DRWMutex {
|
||||
lock_retry_min_interval: LOCK_RETRY_MIN_INTERVAL,
|
||||
}
|
||||
}
|
||||
|
||||
fn is_locked(&self) -> bool {
|
||||
self.write_locks.iter().any(|w_lock| is_locked(w_lock))
|
||||
}
|
||||
|
||||
fn is_r_locked(&self) -> bool {
|
||||
self.read_locks.iter().any(|r_lock| is_locked(r_lock))
|
||||
}
|
||||
}
|
||||
|
||||
impl DRWMutex {
|
||||
@@ -134,7 +142,7 @@ impl DRWMutex {
|
||||
|
||||
async fn inner_lock(
|
||||
&mut self,
|
||||
locks: &mut Vec<String>,
|
||||
locks: &mut [String],
|
||||
id: &String,
|
||||
source: &String,
|
||||
is_read_lock: bool,
|
||||
@@ -201,7 +209,7 @@ impl DRWMutex {
|
||||
}
|
||||
|
||||
pub async fn un_lock(&mut self) {
|
||||
if self.write_locks.is_empty() || !self.write_locks.iter().any(|w_lock| is_locked(w_lock)) {
|
||||
if self.write_locks.is_empty() || !self.is_locked() {
|
||||
panic!("Trying to un_lock() while no lock() is active, write_locks: {:?}", self.write_locks)
|
||||
}
|
||||
|
||||
@@ -222,7 +230,7 @@ impl DRWMutex {
|
||||
}
|
||||
|
||||
pub async fn un_r_lock(&mut self) {
|
||||
if self.read_locks.is_empty() || !self.read_locks.iter().any(|r_lock| is_locked(r_lock)) {
|
||||
if self.read_locks.is_empty() || !self.is_r_locked() {
|
||||
panic!("Trying to un_r_lock() while no r_lock() is active, read_locks: {:?}", self.read_locks)
|
||||
}
|
||||
|
||||
@@ -242,14 +250,14 @@ impl DRWMutex {
|
||||
}
|
||||
}
|
||||
|
||||
async fn release_all(&mut self, tolerance: usize, locks: &mut Vec<String>, is_read_lock: bool) -> bool {
|
||||
async fn release_all(&mut self, tolerance: usize, locks: &mut [String], is_read_lock: bool) -> bool {
|
||||
for (index, locker) in self.lockers.iter_mut().enumerate() {
|
||||
if send_release(locker, &locks[index], &self.owner, &self.names, is_read_lock).await {
|
||||
locks[index] = "".to_string();
|
||||
}
|
||||
}
|
||||
|
||||
!check_failed_unlocks(&locks, tolerance)
|
||||
!check_failed_unlocks(locks, tolerance)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -277,7 +285,7 @@ impl DRWMutex {
|
||||
// });
|
||||
// }
|
||||
|
||||
fn check_failed_unlocks(locks: &Vec<String>, tolerance: usize) -> bool {
|
||||
fn check_failed_unlocks(locks: &[String], tolerance: usize) -> bool {
|
||||
let mut un_locks_failed = 0;
|
||||
locks.iter().for_each(|lock| {
|
||||
if is_locked(lock) {
|
||||
@@ -292,15 +300,15 @@ fn check_failed_unlocks(locks: &Vec<String>, tolerance: usize) -> bool {
|
||||
un_locks_failed > tolerance
|
||||
}
|
||||
|
||||
async fn send_release(locker: &mut LockApi, uid: &String, owner: &String, names: &Vec<String>, is_read_lock: bool) -> bool {
|
||||
async fn send_release(locker: &mut LockApi, uid: &String, owner: &str, names: &[String], is_read_lock: bool) -> bool {
|
||||
if uid.is_empty() {
|
||||
return false;
|
||||
}
|
||||
|
||||
let args = LockArgs {
|
||||
uid: uid.to_string(),
|
||||
owner: owner.clone(),
|
||||
resources: names.clone(),
|
||||
owner: owner.to_owned(),
|
||||
resources: names.to_owned(),
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
@@ -335,7 +343,7 @@ async fn send_release(locker: &mut LockApi, uid: &String, owner: &String, names:
|
||||
true
|
||||
}
|
||||
|
||||
fn check_quorum_locked(locks: &Vec<String>, quorum: usize) -> bool {
|
||||
fn check_quorum_locked(locks: &[String], quorum: usize) -> bool {
|
||||
let mut count = 0;
|
||||
locks.iter().for_each(|lock| {
|
||||
if is_locked(lock) {
|
||||
|
||||
@@ -18,7 +18,7 @@ pub mod namespace_lock;
|
||||
pub mod remote_client;
|
||||
|
||||
lazy_static! {
|
||||
pub static ref GLOBAL_LOCAL_SERVER: Arc<Box<RwLock<LocalLocker>>> = Arc::new(Box::new(RwLock::new(LocalLocker::new())));
|
||||
pub static ref GLOBAL_LOCAL_SERVER: Arc<RwLock<LocalLocker>> = Arc::new(RwLock::new(LocalLocker::new()));
|
||||
}
|
||||
|
||||
type LockClient = dyn Locker;
|
||||
|
||||
@@ -75,7 +75,7 @@ impl LocalLocker {
|
||||
};
|
||||
|
||||
self.lock_map.iter().for_each(|(_, value)| {
|
||||
if value.len() > 0 {
|
||||
if !value.is_empty() {
|
||||
if value[0].writer {
|
||||
st.writes += 1;
|
||||
} else {
|
||||
@@ -84,7 +84,7 @@ impl LocalLocker {
|
||||
}
|
||||
});
|
||||
|
||||
return st;
|
||||
st
|
||||
}
|
||||
|
||||
fn dump_lock_map(&mut self) -> HashMap<String, Vec<LockRequesterInfo>> {
|
||||
@@ -93,7 +93,7 @@ impl LocalLocker {
|
||||
lock_copy.insert(key.to_string(), value.to_vec());
|
||||
});
|
||||
|
||||
return lock_copy;
|
||||
lock_copy
|
||||
}
|
||||
|
||||
fn expire_old_locks(&mut self, interval: Duration) {
|
||||
@@ -109,8 +109,6 @@ impl LocalLocker {
|
||||
true
|
||||
});
|
||||
});
|
||||
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -165,9 +163,9 @@ impl Locker for LocalLocker {
|
||||
for resource in args.resources.iter() {
|
||||
match self.lock_map.get_mut(resource) {
|
||||
Some(lris) => {
|
||||
if !is_write_lock(&lris) {
|
||||
if !is_write_lock(lris) {
|
||||
if err_info.is_empty() {
|
||||
err_info = String::from(format!("unlock attempted on a read locked entity: {}", resource));
|
||||
err_info = format!("unlock attempted on a read locked entity: {}", resource);
|
||||
} else {
|
||||
err_info.push_str(&format!(", {}", resource));
|
||||
}
|
||||
@@ -184,7 +182,7 @@ impl Locker for LocalLocker {
|
||||
true
|
||||
});
|
||||
}
|
||||
if lris.len() == 0 {
|
||||
if lris.is_empty() {
|
||||
self.lock_map.remove(resource);
|
||||
}
|
||||
}
|
||||
@@ -250,7 +248,7 @@ impl Locker for LocalLocker {
|
||||
let resource = &args.resources[0];
|
||||
match self.lock_map.get_mut(resource) {
|
||||
Some(lris) => {
|
||||
if is_write_lock(&lris) {
|
||||
if is_write_lock(lris) {
|
||||
return Err(Error::from_string(format!("runlock attempted on a write locked entity: {}", resource)));
|
||||
} else {
|
||||
lris.retain(|lri| {
|
||||
@@ -265,12 +263,12 @@ impl Locker for LocalLocker {
|
||||
true
|
||||
});
|
||||
}
|
||||
if lris.len() == 0 {
|
||||
if lris.is_empty() {
|
||||
self.lock_map.remove(resource);
|
||||
}
|
||||
}
|
||||
None => {
|
||||
return Ok(reply || true);
|
||||
return Ok(reply);
|
||||
}
|
||||
};
|
||||
|
||||
@@ -291,18 +289,17 @@ impl Locker for LocalLocker {
|
||||
async fn force_unlock(&mut self, args: &LockArgs) -> Result<bool> {
|
||||
let mut reply: bool;
|
||||
if args.uid.is_empty() {
|
||||
args.resources.iter().for_each(|resource| match self.lock_map.get(resource) {
|
||||
Some(lris) => {
|
||||
args.resources.iter().for_each(|resource| {
|
||||
if let Some(lris) = self.lock_map.get(resource) {
|
||||
lris.iter().for_each(|lri| {
|
||||
let mut key = lri.uid.to_string();
|
||||
format_uuid(&mut key, &lri.idx);
|
||||
self.lock_uid.remove(&key);
|
||||
});
|
||||
if lris.len() == 0 {
|
||||
if lris.is_empty() {
|
||||
self.lock_map.remove(resource);
|
||||
}
|
||||
}
|
||||
None => (),
|
||||
});
|
||||
|
||||
return Ok(true);
|
||||
@@ -330,7 +327,7 @@ impl Locker for LocalLocker {
|
||||
});
|
||||
}
|
||||
idx += 1;
|
||||
if lris.len() == 0 {
|
||||
if lris.is_empty() {
|
||||
need_remove_resource.push(resource.to_string());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -17,13 +17,11 @@ impl LRWMutex {
|
||||
let id = self.id.read().await.clone();
|
||||
let source = self.source.read().await.clone();
|
||||
let timeout = Duration::from_secs(10000);
|
||||
let x = self
|
||||
.look_loop(
|
||||
&id, &source, &timeout, // big enough
|
||||
is_write,
|
||||
)
|
||||
.await;
|
||||
x
|
||||
self.look_loop(
|
||||
&id, &source, &timeout, // big enough
|
||||
is_write,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn get_lock(&self, id: &str, source: &str, timeout: &Duration) -> bool {
|
||||
@@ -36,13 +34,11 @@ impl LRWMutex {
|
||||
let id = self.id.read().await.clone();
|
||||
let source = self.source.read().await.clone();
|
||||
let timeout = Duration::from_secs(10000);
|
||||
let x = self
|
||||
.look_loop(
|
||||
&id, &source, &timeout, // big enough
|
||||
is_write,
|
||||
)
|
||||
.await;
|
||||
x
|
||||
self.look_loop(
|
||||
&id, &source, &timeout, // big enough
|
||||
is_write,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn get_r_lock(&self, id: &str, source: &str, timeout: &Duration) -> bool {
|
||||
@@ -61,11 +57,9 @@ impl LRWMutex {
|
||||
*self.is_write.write().await = true;
|
||||
locked = true;
|
||||
}
|
||||
} else {
|
||||
if !*self.is_write.read().await {
|
||||
*self.refrence.write().await += 1;
|
||||
locked = true;
|
||||
}
|
||||
} else if !*self.is_write.read().await {
|
||||
*self.refrence.write().await += 1;
|
||||
locked = true;
|
||||
}
|
||||
|
||||
locked
|
||||
@@ -112,13 +106,9 @@ impl LRWMutex {
|
||||
*self.is_write.write().await = false;
|
||||
unlocked = true;
|
||||
}
|
||||
} else {
|
||||
if !*self.is_write.read().await {
|
||||
if *self.refrence.read().await > 0 {
|
||||
*self.refrence.write().await -= 1;
|
||||
unlocked = true;
|
||||
}
|
||||
}
|
||||
} else if !*self.is_write.read().await && *self.refrence.read().await > 0 {
|
||||
*self.refrence.write().await -= 1;
|
||||
unlocked = true;
|
||||
}
|
||||
|
||||
unlocked
|
||||
|
||||
@@ -45,8 +45,8 @@ impl NsLockMap {
|
||||
&mut self,
|
||||
volume: &String,
|
||||
path: &String,
|
||||
lock_source: &String,
|
||||
ops_id: &String,
|
||||
lock_source: &str,
|
||||
ops_id: &str,
|
||||
read_lock: bool,
|
||||
timeout: Duration,
|
||||
) -> bool {
|
||||
@@ -58,12 +58,11 @@ impl NsLockMap {
|
||||
});
|
||||
nslk.reference += 1;
|
||||
|
||||
let locked: bool;
|
||||
if read_lock {
|
||||
locked = nslk.lock.get_r_lock(ops_id, lock_source, &timeout).await;
|
||||
let locked = if read_lock {
|
||||
nslk.lock.get_r_lock(ops_id, lock_source, &timeout).await
|
||||
} else {
|
||||
locked = nslk.lock.get_lock(ops_id, lock_source, &timeout).await;
|
||||
}
|
||||
nslk.lock.get_lock(ops_id, lock_source, &timeout).await
|
||||
};
|
||||
|
||||
if !locked {
|
||||
nslk.reference -= 1;
|
||||
@@ -72,7 +71,7 @@ impl NsLockMap {
|
||||
}
|
||||
}
|
||||
|
||||
return locked;
|
||||
locked
|
||||
}
|
||||
|
||||
async fn un_lock(&mut self, volume: &String, path: &String, read_lock: bool) {
|
||||
@@ -90,8 +89,6 @@ impl NsLockMap {
|
||||
if nslk.reference == 0 {
|
||||
w_lock_map.remove(&resource);
|
||||
}
|
||||
} else {
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -138,7 +135,8 @@ impl RWLocker for DistLockInstance {
|
||||
}
|
||||
|
||||
async fn un_lock(&mut self) -> Result<()> {
|
||||
Ok(self.lock.un_lock().await)
|
||||
self.lock.un_lock().await;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn get_u_lock(&mut self, opts: &Options) -> Result<bool> {
|
||||
@@ -148,7 +146,8 @@ impl RWLocker for DistLockInstance {
|
||||
}
|
||||
|
||||
async fn un_r_lock(&mut self) -> Result<()> {
|
||||
Ok(self.lock.un_r_lock().await)
|
||||
self.lock.un_r_lock().await;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -25,7 +25,7 @@ impl Locker for RemoteClient {
|
||||
let args = serde_json::to_string(args)?;
|
||||
let mut client = node_service_time_out_client(&self.addr)
|
||||
.await
|
||||
.map_err(|err| Error::from_string(format!("can not get client, err: {}", err.to_string())))?;
|
||||
.map_err(|err| Error::from_string(format!("can not get client, err: {}", err)))?;
|
||||
let request = Request::new(GenerallyLockRequest { args });
|
||||
|
||||
let response = client.lock(request).await?.into_inner();
|
||||
@@ -42,7 +42,7 @@ impl Locker for RemoteClient {
|
||||
let args = serde_json::to_string(args)?;
|
||||
let mut client = node_service_time_out_client(&self.addr)
|
||||
.await
|
||||
.map_err(|err| Error::from_string(format!("can not get client, err: {}", err.to_string())))?;
|
||||
.map_err(|err| Error::from_string(format!("can not get client, err: {}", err)))?;
|
||||
let request = Request::new(GenerallyLockRequest { args });
|
||||
|
||||
let response = client.un_lock(request).await?.into_inner();
|
||||
@@ -59,7 +59,7 @@ impl Locker for RemoteClient {
|
||||
let args = serde_json::to_string(args)?;
|
||||
let mut client = node_service_time_out_client(&self.addr)
|
||||
.await
|
||||
.map_err(|err| Error::from_string(format!("can not get client, err: {}", err.to_string())))?;
|
||||
.map_err(|err| Error::from_string(format!("can not get client, err: {}", err)))?;
|
||||
let request = Request::new(GenerallyLockRequest { args });
|
||||
|
||||
let response = client.r_lock(request).await?.into_inner();
|
||||
@@ -76,7 +76,7 @@ impl Locker for RemoteClient {
|
||||
let args = serde_json::to_string(args)?;
|
||||
let mut client = node_service_time_out_client(&self.addr)
|
||||
.await
|
||||
.map_err(|err| Error::from_string(format!("can not get client, err: {}", err.to_string())))?;
|
||||
.map_err(|err| Error::from_string(format!("can not get client, err: {}", err)))?;
|
||||
let request = Request::new(GenerallyLockRequest { args });
|
||||
|
||||
let response = client.r_un_lock(request).await?.into_inner();
|
||||
@@ -93,7 +93,7 @@ impl Locker for RemoteClient {
|
||||
let args = serde_json::to_string(args)?;
|
||||
let mut client = node_service_time_out_client(&self.addr)
|
||||
.await
|
||||
.map_err(|err| Error::from_string(format!("can not get client, err: {}", err.to_string())))?;
|
||||
.map_err(|err| Error::from_string(format!("can not get client, err: {}", err)))?;
|
||||
let request = Request::new(GenerallyLockRequest { args });
|
||||
|
||||
let response = client.force_un_lock(request).await?.into_inner();
|
||||
@@ -110,7 +110,7 @@ impl Locker for RemoteClient {
|
||||
let args = serde_json::to_string(args)?;
|
||||
let mut client = node_service_time_out_client(&self.addr)
|
||||
.await
|
||||
.map_err(|err| Error::from_string(format!("can not get client, err: {}", err.to_string())))?;
|
||||
.map_err(|err| Error::from_string(format!("can not get client, err: {}", err)))?;
|
||||
let request = Request::new(GenerallyLockRequest { args });
|
||||
|
||||
let response = client.refresh(request).await?.into_inner();
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
// This file is @generated by prost-build.
|
||||
/// --------------------------------------------------------------------
|
||||
#[allow(clippy::derive_partial_eq_without_eq)]
|
||||
#[derive(Clone, PartialEq, ::prost::Message)]
|
||||
pub struct PingRequest {
|
||||
#[prost(uint64, tag = "1")]
|
||||
@@ -8,7 +7,6 @@ pub struct PingRequest {
|
||||
#[prost(bytes = "vec", tag = "2")]
|
||||
pub body: ::prost::alloc::vec::Vec<u8>,
|
||||
}
|
||||
#[allow(clippy::derive_partial_eq_without_eq)]
|
||||
#[derive(Clone, PartialEq, ::prost::Message)]
|
||||
pub struct PingResponse {
|
||||
#[prost(uint64, tag = "1")]
|
||||
@@ -16,13 +14,11 @@ pub struct PingResponse {
|
||||
#[prost(bytes = "vec", tag = "2")]
|
||||
pub body: ::prost::alloc::vec::Vec<u8>,
|
||||
}
|
||||
#[allow(clippy::derive_partial_eq_without_eq)]
|
||||
#[derive(Clone, PartialEq, ::prost::Message)]
|
||||
pub struct ListBucketRequest {
|
||||
#[prost(string, tag = "1")]
|
||||
pub options: ::prost::alloc::string::String,
|
||||
}
|
||||
#[allow(clippy::derive_partial_eq_without_eq)]
|
||||
#[derive(Clone, PartialEq, ::prost::Message)]
|
||||
pub struct ListBucketResponse {
|
||||
#[prost(bool, tag = "1")]
|
||||
@@ -32,7 +28,6 @@ pub struct ListBucketResponse {
|
||||
#[prost(string, optional, tag = "3")]
|
||||
pub error_info: ::core::option::Option<::prost::alloc::string::String>,
|
||||
}
|
||||
#[allow(clippy::derive_partial_eq_without_eq)]
|
||||
#[derive(Clone, PartialEq, ::prost::Message)]
|
||||
pub struct MakeBucketRequest {
|
||||
#[prost(string, tag = "1")]
|
||||
@@ -40,7 +35,6 @@ pub struct MakeBucketRequest {
|
||||
#[prost(string, tag = "2")]
|
||||
pub options: ::prost::alloc::string::String,
|
||||
}
|
||||
#[allow(clippy::derive_partial_eq_without_eq)]
|
||||
#[derive(Clone, PartialEq, ::prost::Message)]
|
||||
pub struct MakeBucketResponse {
|
||||
#[prost(bool, tag = "1")]
|
||||
@@ -48,7 +42,6 @@ pub struct MakeBucketResponse {
|
||||
#[prost(string, optional, tag = "2")]
|
||||
pub error_info: ::core::option::Option<::prost::alloc::string::String>,
|
||||
}
|
||||
#[allow(clippy::derive_partial_eq_without_eq)]
|
||||
#[derive(Clone, PartialEq, ::prost::Message)]
|
||||
pub struct GetBucketInfoRequest {
|
||||
#[prost(string, tag = "1")]
|
||||
@@ -56,7 +49,6 @@ pub struct GetBucketInfoRequest {
|
||||
#[prost(string, tag = "2")]
|
||||
pub options: ::prost::alloc::string::String,
|
||||
}
|
||||
#[allow(clippy::derive_partial_eq_without_eq)]
|
||||
#[derive(Clone, PartialEq, ::prost::Message)]
|
||||
pub struct GetBucketInfoResponse {
|
||||
#[prost(bool, tag = "1")]
|
||||
@@ -66,13 +58,11 @@ pub struct GetBucketInfoResponse {
|
||||
#[prost(string, optional, tag = "3")]
|
||||
pub error_info: ::core::option::Option<::prost::alloc::string::String>,
|
||||
}
|
||||
#[allow(clippy::derive_partial_eq_without_eq)]
|
||||
#[derive(Clone, PartialEq, ::prost::Message)]
|
||||
pub struct DeleteBucketRequest {
|
||||
#[prost(string, tag = "1")]
|
||||
pub bucket: ::prost::alloc::string::String,
|
||||
}
|
||||
#[allow(clippy::derive_partial_eq_without_eq)]
|
||||
#[derive(Clone, PartialEq, ::prost::Message)]
|
||||
pub struct DeleteBucketResponse {
|
||||
#[prost(bool, tag = "1")]
|
||||
@@ -80,7 +70,6 @@ pub struct DeleteBucketResponse {
|
||||
#[prost(string, optional, tag = "2")]
|
||||
pub error_info: ::core::option::Option<::prost::alloc::string::String>,
|
||||
}
|
||||
#[allow(clippy::derive_partial_eq_without_eq)]
|
||||
#[derive(Clone, PartialEq, ::prost::Message)]
|
||||
pub struct ReadAllRequest {
|
||||
/// indicate which one in the disks
|
||||
@@ -91,7 +80,6 @@ pub struct ReadAllRequest {
|
||||
#[prost(string, tag = "3")]
|
||||
pub path: ::prost::alloc::string::String,
|
||||
}
|
||||
#[allow(clippy::derive_partial_eq_without_eq)]
|
||||
#[derive(Clone, PartialEq, ::prost::Message)]
|
||||
pub struct ReadAllResponse {
|
||||
#[prost(bool, tag = "1")]
|
||||
@@ -101,7 +89,6 @@ pub struct ReadAllResponse {
|
||||
#[prost(string, optional, tag = "3")]
|
||||
pub error_info: ::core::option::Option<::prost::alloc::string::String>,
|
||||
}
|
||||
#[allow(clippy::derive_partial_eq_without_eq)]
|
||||
#[derive(Clone, PartialEq, ::prost::Message)]
|
||||
pub struct WriteAllRequest {
|
||||
/// indicate which one in the disks
|
||||
@@ -114,7 +101,6 @@ pub struct WriteAllRequest {
|
||||
#[prost(bytes = "vec", tag = "4")]
|
||||
pub data: ::prost::alloc::vec::Vec<u8>,
|
||||
}
|
||||
#[allow(clippy::derive_partial_eq_without_eq)]
|
||||
#[derive(Clone, PartialEq, ::prost::Message)]
|
||||
pub struct WriteAllResponse {
|
||||
#[prost(bool, tag = "1")]
|
||||
@@ -122,7 +108,6 @@ pub struct WriteAllResponse {
|
||||
#[prost(string, optional, tag = "2")]
|
||||
pub error_info: ::core::option::Option<::prost::alloc::string::String>,
|
||||
}
|
||||
#[allow(clippy::derive_partial_eq_without_eq)]
|
||||
#[derive(Clone, PartialEq, ::prost::Message)]
|
||||
pub struct DeleteRequest {
|
||||
/// indicate which one in the disks
|
||||
@@ -135,7 +120,6 @@ pub struct DeleteRequest {
|
||||
#[prost(string, tag = "4")]
|
||||
pub options: ::prost::alloc::string::String,
|
||||
}
|
||||
#[allow(clippy::derive_partial_eq_without_eq)]
|
||||
#[derive(Clone, PartialEq, ::prost::Message)]
|
||||
pub struct DeleteResponse {
|
||||
#[prost(bool, tag = "1")]
|
||||
@@ -143,7 +127,6 @@ pub struct DeleteResponse {
|
||||
#[prost(string, optional, tag = "2")]
|
||||
pub error_info: ::core::option::Option<::prost::alloc::string::String>,
|
||||
}
|
||||
#[allow(clippy::derive_partial_eq_without_eq)]
|
||||
#[derive(Clone, PartialEq, ::prost::Message)]
|
||||
pub struct RenamePartRequst {
|
||||
#[prost(string, tag = "1")]
|
||||
@@ -159,7 +142,6 @@ pub struct RenamePartRequst {
|
||||
#[prost(bytes = "vec", tag = "6")]
|
||||
pub meta: ::prost::alloc::vec::Vec<u8>,
|
||||
}
|
||||
#[allow(clippy::derive_partial_eq_without_eq)]
|
||||
#[derive(Clone, PartialEq, ::prost::Message)]
|
||||
pub struct RenamePartResponse {
|
||||
#[prost(bool, tag = "1")]
|
||||
@@ -167,7 +149,6 @@ pub struct RenamePartResponse {
|
||||
#[prost(string, optional, tag = "2")]
|
||||
pub error_info: ::core::option::Option<::prost::alloc::string::String>,
|
||||
}
|
||||
#[allow(clippy::derive_partial_eq_without_eq)]
|
||||
#[derive(Clone, PartialEq, ::prost::Message)]
|
||||
pub struct RenameFileRequst {
|
||||
#[prost(string, tag = "1")]
|
||||
@@ -181,7 +162,6 @@ pub struct RenameFileRequst {
|
||||
#[prost(string, tag = "5")]
|
||||
pub dst_path: ::prost::alloc::string::String,
|
||||
}
|
||||
#[allow(clippy::derive_partial_eq_without_eq)]
|
||||
#[derive(Clone, PartialEq, ::prost::Message)]
|
||||
pub struct RenameFileResponse {
|
||||
#[prost(bool, tag = "1")]
|
||||
@@ -189,7 +169,6 @@ pub struct RenameFileResponse {
|
||||
#[prost(string, optional, tag = "2")]
|
||||
pub error_info: ::core::option::Option<::prost::alloc::string::String>,
|
||||
}
|
||||
#[allow(clippy::derive_partial_eq_without_eq)]
|
||||
#[derive(Clone, PartialEq, ::prost::Message)]
|
||||
pub struct WriteRequest {
|
||||
/// indicate which one in the disks
|
||||
@@ -204,7 +183,6 @@ pub struct WriteRequest {
|
||||
#[prost(bytes = "vec", tag = "5")]
|
||||
pub data: ::prost::alloc::vec::Vec<u8>,
|
||||
}
|
||||
#[allow(clippy::derive_partial_eq_without_eq)]
|
||||
#[derive(Clone, PartialEq, ::prost::Message)]
|
||||
pub struct WriteResponse {
|
||||
#[prost(bool, tag = "1")]
|
||||
@@ -212,7 +190,6 @@ pub struct WriteResponse {
|
||||
#[prost(string, optional, tag = "2")]
|
||||
pub error_info: ::core::option::Option<::prost::alloc::string::String>,
|
||||
}
|
||||
#[allow(clippy::derive_partial_eq_without_eq)]
|
||||
#[derive(Clone, PartialEq, ::prost::Message)]
|
||||
pub struct ReadAtRequest {
|
||||
/// indicate which one in the disks
|
||||
@@ -227,7 +204,6 @@ pub struct ReadAtRequest {
|
||||
#[prost(int64, tag = "5")]
|
||||
pub length: i64,
|
||||
}
|
||||
#[allow(clippy::derive_partial_eq_without_eq)]
|
||||
#[derive(Clone, PartialEq, ::prost::Message)]
|
||||
pub struct ReadAtResponse {
|
||||
#[prost(bool, tag = "1")]
|
||||
@@ -239,7 +215,6 @@ pub struct ReadAtResponse {
|
||||
#[prost(string, optional, tag = "4")]
|
||||
pub error_info: ::core::option::Option<::prost::alloc::string::String>,
|
||||
}
|
||||
#[allow(clippy::derive_partial_eq_without_eq)]
|
||||
#[derive(Clone, PartialEq, ::prost::Message)]
|
||||
pub struct ListDirRequest {
|
||||
/// indicate which one in the disks
|
||||
@@ -248,7 +223,6 @@ pub struct ListDirRequest {
|
||||
#[prost(string, tag = "2")]
|
||||
pub volume: ::prost::alloc::string::String,
|
||||
}
|
||||
#[allow(clippy::derive_partial_eq_without_eq)]
|
||||
#[derive(Clone, PartialEq, ::prost::Message)]
|
||||
pub struct ListDirResponse {
|
||||
#[prost(bool, tag = "1")]
|
||||
@@ -258,7 +232,6 @@ pub struct ListDirResponse {
|
||||
#[prost(string, optional, tag = "3")]
|
||||
pub error_info: ::core::option::Option<::prost::alloc::string::String>,
|
||||
}
|
||||
#[allow(clippy::derive_partial_eq_without_eq)]
|
||||
#[derive(Clone, PartialEq, ::prost::Message)]
|
||||
pub struct WalkDirRequest {
|
||||
/// indicate which one in the disks
|
||||
@@ -267,7 +240,6 @@ pub struct WalkDirRequest {
|
||||
#[prost(string, tag = "2")]
|
||||
pub walk_dir_options: ::prost::alloc::string::String,
|
||||
}
|
||||
#[allow(clippy::derive_partial_eq_without_eq)]
|
||||
#[derive(Clone, PartialEq, ::prost::Message)]
|
||||
pub struct WalkDirResponse {
|
||||
#[prost(bool, tag = "1")]
|
||||
@@ -277,7 +249,6 @@ pub struct WalkDirResponse {
|
||||
#[prost(string, optional, tag = "3")]
|
||||
pub error_info: ::core::option::Option<::prost::alloc::string::String>,
|
||||
}
|
||||
#[allow(clippy::derive_partial_eq_without_eq)]
|
||||
#[derive(Clone, PartialEq, ::prost::Message)]
|
||||
pub struct RenameDataRequest {
|
||||
/// indicate which one in the disks
|
||||
@@ -294,7 +265,6 @@ pub struct RenameDataRequest {
|
||||
#[prost(string, tag = "6")]
|
||||
pub dst_path: ::prost::alloc::string::String,
|
||||
}
|
||||
#[allow(clippy::derive_partial_eq_without_eq)]
|
||||
#[derive(Clone, PartialEq, ::prost::Message)]
|
||||
pub struct RenameDataResponse {
|
||||
#[prost(bool, tag = "1")]
|
||||
@@ -304,7 +274,6 @@ pub struct RenameDataResponse {
|
||||
#[prost(string, optional, tag = "3")]
|
||||
pub error_info: ::core::option::Option<::prost::alloc::string::String>,
|
||||
}
|
||||
#[allow(clippy::derive_partial_eq_without_eq)]
|
||||
#[derive(Clone, PartialEq, ::prost::Message)]
|
||||
pub struct MakeVolumesRequest {
|
||||
/// indicate which one in the disks
|
||||
@@ -313,7 +282,6 @@ pub struct MakeVolumesRequest {
|
||||
#[prost(string, repeated, tag = "2")]
|
||||
pub volumes: ::prost::alloc::vec::Vec<::prost::alloc::string::String>,
|
||||
}
|
||||
#[allow(clippy::derive_partial_eq_without_eq)]
|
||||
#[derive(Clone, PartialEq, ::prost::Message)]
|
||||
pub struct MakeVolumesResponse {
|
||||
#[prost(bool, tag = "1")]
|
||||
@@ -321,7 +289,6 @@ pub struct MakeVolumesResponse {
|
||||
#[prost(string, optional, tag = "2")]
|
||||
pub error_info: ::core::option::Option<::prost::alloc::string::String>,
|
||||
}
|
||||
#[allow(clippy::derive_partial_eq_without_eq)]
|
||||
#[derive(Clone, PartialEq, ::prost::Message)]
|
||||
pub struct MakeVolumeRequest {
|
||||
/// indicate which one in the disks
|
||||
@@ -330,7 +297,6 @@ pub struct MakeVolumeRequest {
|
||||
#[prost(string, tag = "2")]
|
||||
pub volume: ::prost::alloc::string::String,
|
||||
}
|
||||
#[allow(clippy::derive_partial_eq_without_eq)]
|
||||
#[derive(Clone, PartialEq, ::prost::Message)]
|
||||
pub struct MakeVolumeResponse {
|
||||
#[prost(bool, tag = "1")]
|
||||
@@ -338,14 +304,12 @@ pub struct MakeVolumeResponse {
|
||||
#[prost(string, optional, tag = "2")]
|
||||
pub error_info: ::core::option::Option<::prost::alloc::string::String>,
|
||||
}
|
||||
#[allow(clippy::derive_partial_eq_without_eq)]
|
||||
#[derive(Clone, PartialEq, ::prost::Message)]
|
||||
pub struct ListVolumesRequest {
|
||||
/// indicate which one in the disks
|
||||
#[prost(string, tag = "1")]
|
||||
pub disk: ::prost::alloc::string::String,
|
||||
}
|
||||
#[allow(clippy::derive_partial_eq_without_eq)]
|
||||
#[derive(Clone, PartialEq, ::prost::Message)]
|
||||
pub struct ListVolumesResponse {
|
||||
#[prost(bool, tag = "1")]
|
||||
@@ -355,7 +319,6 @@ pub struct ListVolumesResponse {
|
||||
#[prost(string, optional, tag = "3")]
|
||||
pub error_info: ::core::option::Option<::prost::alloc::string::String>,
|
||||
}
|
||||
#[allow(clippy::derive_partial_eq_without_eq)]
|
||||
#[derive(Clone, PartialEq, ::prost::Message)]
|
||||
pub struct StatVolumeRequest {
|
||||
/// indicate which one in the disks
|
||||
@@ -364,7 +327,6 @@ pub struct StatVolumeRequest {
|
||||
#[prost(string, tag = "2")]
|
||||
pub volume: ::prost::alloc::string::String,
|
||||
}
|
||||
#[allow(clippy::derive_partial_eq_without_eq)]
|
||||
#[derive(Clone, PartialEq, ::prost::Message)]
|
||||
pub struct StatVolumeResponse {
|
||||
#[prost(bool, tag = "1")]
|
||||
@@ -374,7 +336,6 @@ pub struct StatVolumeResponse {
|
||||
#[prost(string, optional, tag = "3")]
|
||||
pub error_info: ::core::option::Option<::prost::alloc::string::String>,
|
||||
}
|
||||
#[allow(clippy::derive_partial_eq_without_eq)]
|
||||
#[derive(Clone, PartialEq, ::prost::Message)]
|
||||
pub struct DeletePathsRequest {
|
||||
#[prost(string, tag = "1")]
|
||||
@@ -384,7 +345,6 @@ pub struct DeletePathsRequest {
|
||||
#[prost(string, repeated, tag = "3")]
|
||||
pub paths: ::prost::alloc::vec::Vec<::prost::alloc::string::String>,
|
||||
}
|
||||
#[allow(clippy::derive_partial_eq_without_eq)]
|
||||
#[derive(Clone, PartialEq, ::prost::Message)]
|
||||
pub struct DeletePathsResponse {
|
||||
#[prost(bool, tag = "1")]
|
||||
@@ -392,7 +352,6 @@ pub struct DeletePathsResponse {
|
||||
#[prost(string, optional, tag = "2")]
|
||||
pub error_info: ::core::option::Option<::prost::alloc::string::String>,
|
||||
}
|
||||
#[allow(clippy::derive_partial_eq_without_eq)]
|
||||
#[derive(Clone, PartialEq, ::prost::Message)]
|
||||
pub struct UpdateMetadataRequest {
|
||||
#[prost(string, tag = "1")]
|
||||
@@ -406,7 +365,6 @@ pub struct UpdateMetadataRequest {
|
||||
#[prost(string, tag = "5")]
|
||||
pub opts: ::prost::alloc::string::String,
|
||||
}
|
||||
#[allow(clippy::derive_partial_eq_without_eq)]
|
||||
#[derive(Clone, PartialEq, ::prost::Message)]
|
||||
pub struct UpdateMetadataResponse {
|
||||
#[prost(bool, tag = "1")]
|
||||
@@ -414,7 +372,6 @@ pub struct UpdateMetadataResponse {
|
||||
#[prost(string, optional, tag = "2")]
|
||||
pub error_info: ::core::option::Option<::prost::alloc::string::String>,
|
||||
}
|
||||
#[allow(clippy::derive_partial_eq_without_eq)]
|
||||
#[derive(Clone, PartialEq, ::prost::Message)]
|
||||
pub struct WriteMetadataRequest {
|
||||
/// indicate which one in the disks
|
||||
@@ -427,7 +384,6 @@ pub struct WriteMetadataRequest {
|
||||
#[prost(string, tag = "4")]
|
||||
pub file_info: ::prost::alloc::string::String,
|
||||
}
|
||||
#[allow(clippy::derive_partial_eq_without_eq)]
|
||||
#[derive(Clone, PartialEq, ::prost::Message)]
|
||||
pub struct WriteMetadataResponse {
|
||||
#[prost(bool, tag = "1")]
|
||||
@@ -435,7 +391,6 @@ pub struct WriteMetadataResponse {
|
||||
#[prost(string, optional, tag = "2")]
|
||||
pub error_info: ::core::option::Option<::prost::alloc::string::String>,
|
||||
}
|
||||
#[allow(clippy::derive_partial_eq_without_eq)]
|
||||
#[derive(Clone, PartialEq, ::prost::Message)]
|
||||
pub struct ReadVersionRequest {
|
||||
#[prost(string, tag = "1")]
|
||||
@@ -449,7 +404,6 @@ pub struct ReadVersionRequest {
|
||||
#[prost(string, tag = "5")]
|
||||
pub opts: ::prost::alloc::string::String,
|
||||
}
|
||||
#[allow(clippy::derive_partial_eq_without_eq)]
|
||||
#[derive(Clone, PartialEq, ::prost::Message)]
|
||||
pub struct ReadVersionResponse {
|
||||
#[prost(bool, tag = "1")]
|
||||
@@ -459,7 +413,6 @@ pub struct ReadVersionResponse {
|
||||
#[prost(string, optional, tag = "3")]
|
||||
pub error_info: ::core::option::Option<::prost::alloc::string::String>,
|
||||
}
|
||||
#[allow(clippy::derive_partial_eq_without_eq)]
|
||||
#[derive(Clone, PartialEq, ::prost::Message)]
|
||||
pub struct ReadXlRequest {
|
||||
#[prost(string, tag = "1")]
|
||||
@@ -471,7 +424,6 @@ pub struct ReadXlRequest {
|
||||
#[prost(bool, tag = "4")]
|
||||
pub read_data: bool,
|
||||
}
|
||||
#[allow(clippy::derive_partial_eq_without_eq)]
|
||||
#[derive(Clone, PartialEq, ::prost::Message)]
|
||||
pub struct ReadXlResponse {
|
||||
#[prost(bool, tag = "1")]
|
||||
@@ -481,7 +433,6 @@ pub struct ReadXlResponse {
|
||||
#[prost(string, optional, tag = "3")]
|
||||
pub error_info: ::core::option::Option<::prost::alloc::string::String>,
|
||||
}
|
||||
#[allow(clippy::derive_partial_eq_without_eq)]
|
||||
#[derive(Clone, PartialEq, ::prost::Message)]
|
||||
pub struct DeleteVersionRequest {
|
||||
#[prost(string, tag = "1")]
|
||||
@@ -497,7 +448,6 @@ pub struct DeleteVersionRequest {
|
||||
#[prost(string, tag = "6")]
|
||||
pub opts: ::prost::alloc::string::String,
|
||||
}
|
||||
#[allow(clippy::derive_partial_eq_without_eq)]
|
||||
#[derive(Clone, PartialEq, ::prost::Message)]
|
||||
pub struct DeleteVersionResponse {
|
||||
#[prost(bool, tag = "1")]
|
||||
@@ -507,7 +457,6 @@ pub struct DeleteVersionResponse {
|
||||
#[prost(string, optional, tag = "3")]
|
||||
pub error_info: ::core::option::Option<::prost::alloc::string::String>,
|
||||
}
|
||||
#[allow(clippy::derive_partial_eq_without_eq)]
|
||||
#[derive(Clone, PartialEq, ::prost::Message)]
|
||||
pub struct DeleteVersionsRequest {
|
||||
#[prost(string, tag = "1")]
|
||||
@@ -519,7 +468,6 @@ pub struct DeleteVersionsRequest {
|
||||
#[prost(string, tag = "4")]
|
||||
pub opts: ::prost::alloc::string::String,
|
||||
}
|
||||
#[allow(clippy::derive_partial_eq_without_eq)]
|
||||
#[derive(Clone, PartialEq, ::prost::Message)]
|
||||
pub struct DeleteVersionsResponse {
|
||||
#[prost(bool, tag = "1")]
|
||||
@@ -529,7 +477,6 @@ pub struct DeleteVersionsResponse {
|
||||
#[prost(string, optional, tag = "3")]
|
||||
pub error_info: ::core::option::Option<::prost::alloc::string::String>,
|
||||
}
|
||||
#[allow(clippy::derive_partial_eq_without_eq)]
|
||||
#[derive(Clone, PartialEq, ::prost::Message)]
|
||||
pub struct ReadMultipleRequest {
|
||||
#[prost(string, tag = "1")]
|
||||
@@ -537,7 +484,6 @@ pub struct ReadMultipleRequest {
|
||||
#[prost(string, tag = "2")]
|
||||
pub read_multiple_req: ::prost::alloc::string::String,
|
||||
}
|
||||
#[allow(clippy::derive_partial_eq_without_eq)]
|
||||
#[derive(Clone, PartialEq, ::prost::Message)]
|
||||
pub struct ReadMultipleResponse {
|
||||
#[prost(bool, tag = "1")]
|
||||
@@ -547,7 +493,6 @@ pub struct ReadMultipleResponse {
|
||||
#[prost(string, optional, tag = "3")]
|
||||
pub error_info: ::core::option::Option<::prost::alloc::string::String>,
|
||||
}
|
||||
#[allow(clippy::derive_partial_eq_without_eq)]
|
||||
#[derive(Clone, PartialEq, ::prost::Message)]
|
||||
pub struct DeleteVolumeRequest {
|
||||
#[prost(string, tag = "1")]
|
||||
@@ -555,7 +500,6 @@ pub struct DeleteVolumeRequest {
|
||||
#[prost(string, tag = "2")]
|
||||
pub volume: ::prost::alloc::string::String,
|
||||
}
|
||||
#[allow(clippy::derive_partial_eq_without_eq)]
|
||||
#[derive(Clone, PartialEq, ::prost::Message)]
|
||||
pub struct DeleteVolumeResponse {
|
||||
#[prost(bool, tag = "1")]
|
||||
@@ -564,13 +508,11 @@ pub struct DeleteVolumeResponse {
|
||||
pub error_info: ::core::option::Option<::prost::alloc::string::String>,
|
||||
}
|
||||
/// lock api have same argument type
|
||||
#[allow(clippy::derive_partial_eq_without_eq)]
|
||||
#[derive(Clone, PartialEq, ::prost::Message)]
|
||||
pub struct GenerallyLockRequest {
|
||||
#[prost(string, tag = "1")]
|
||||
pub args: ::prost::alloc::string::String,
|
||||
}
|
||||
#[allow(clippy::derive_partial_eq_without_eq)]
|
||||
#[derive(Clone, PartialEq, ::prost::Message)]
|
||||
pub struct GenerallyLockResponse {
|
||||
#[prost(bool, tag = "1")]
|
||||
|
||||
@@ -27,8 +27,7 @@ pub async fn node_service_time_out_client(
|
||||
Some(channel) => channel.clone(),
|
||||
None => {
|
||||
let connector = Endpoint::from_shared(addr.to_string())?.connect_timeout(Duration::from_secs(60));
|
||||
let channel = connector.connect().await?;
|
||||
channel
|
||||
connector.connect().await?
|
||||
}
|
||||
};
|
||||
GLOBAL_Conn_Map.write().await.insert(addr.to_string(), channel.clone());
|
||||
|
||||
@@ -1,55 +1,18 @@
|
||||
#![cfg(test)]
|
||||
|
||||
use std::{collections::HashMap, error::Error, sync::Arc, time::Duration, vec};
|
||||
use std::{error::Error, sync::Arc, time::Duration};
|
||||
|
||||
use lazy_static::lazy_static;
|
||||
use lock::{
|
||||
drwmutex::Options,
|
||||
lock_args::LockArgs,
|
||||
namespace_lock::{new_nslock, NsLockMap},
|
||||
new_lock_api,
|
||||
};
|
||||
use protos::proto_gen::node_service::{node_service_client::NodeServiceClient, GenerallyLockRequest};
|
||||
use protos::{node_service_time_out_client, proto_gen::node_service::GenerallyLockRequest};
|
||||
use tokio::sync::RwLock;
|
||||
use tonic::{
|
||||
metadata::MetadataValue,
|
||||
service::interceptor::InterceptedService,
|
||||
transport::{Channel, Endpoint},
|
||||
Request, Status,
|
||||
};
|
||||
use tonic::Request;
|
||||
|
||||
lazy_static! {
|
||||
pub static ref GLOBAL_Conn_Map: RwLock<HashMap<String, Channel>> = RwLock::new(HashMap::new());
|
||||
}
|
||||
|
||||
async fn get_client() -> Result<
|
||||
NodeServiceClient<
|
||||
InterceptedService<Channel, Box<dyn Fn(Request<()>) -> Result<Request<()>, Status> + Send + Sync + 'static>>,
|
||||
>,
|
||||
Box<dyn Error>,
|
||||
> {
|
||||
let token: MetadataValue<_> = "rustfs rpc".parse()?;
|
||||
let channel = match GLOBAL_Conn_Map.read().await.get("local") {
|
||||
Some(channel) => channel.clone(),
|
||||
None => {
|
||||
println!("get channel start");
|
||||
let connector = Endpoint::from_static("http://localhost:9000").connect_timeout(Duration::from_secs(60));
|
||||
let channel = connector.connect().await?;
|
||||
// let channel = Channel::from_static("http://localhost:9000").connect().await?;
|
||||
channel
|
||||
}
|
||||
};
|
||||
GLOBAL_Conn_Map.write().await.insert("local".to_owned(), channel.clone());
|
||||
|
||||
// let timeout_channel = Timeout::new(channel, Duration::from_secs(60));
|
||||
Ok(NodeServiceClient::with_interceptor(
|
||||
channel,
|
||||
Box::new(move |mut req: Request<()>| {
|
||||
req.metadata_mut().insert("authorization", token.clone());
|
||||
Ok(req)
|
||||
}),
|
||||
))
|
||||
}
|
||||
const CLUSTER_ADDR: &str = "http://localhost:9000";
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_lock_unlock_rpc() -> Result<(), Box<dyn Error>> {
|
||||
@@ -62,7 +25,7 @@ async fn test_lock_unlock_rpc() -> Result<(), Box<dyn Error>> {
|
||||
};
|
||||
let args = serde_json::to_string(&args)?;
|
||||
|
||||
let mut client = get_client().await?;
|
||||
let mut client = node_service_time_out_client(&CLUSTER_ADDR.to_string()).await?;
|
||||
println!("got client");
|
||||
let request = Request::new(GenerallyLockRequest { args: args.clone() });
|
||||
|
||||
|
||||
@@ -3,17 +3,13 @@
|
||||
use ecstore::disk::VolumeInfo;
|
||||
use protos::{
|
||||
models::{PingBody, PingBodyBuilder},
|
||||
proto_gen::node_service::{
|
||||
node_service_client::NodeServiceClient, ListVolumesRequest, MakeVolumeRequest, PingRequest, PingResponse, ReadAllRequest,
|
||||
},
|
||||
node_service_time_out_client,
|
||||
proto_gen::node_service::{ListVolumesRequest, MakeVolumeRequest, PingRequest, PingResponse, ReadAllRequest},
|
||||
};
|
||||
use std::error::Error;
|
||||
use tonic::Request;
|
||||
|
||||
async fn get_client() -> Result<NodeServiceClient<tonic::transport::Channel>, Box<dyn Error>> {
|
||||
// Ok(NodeServiceClient::connect("http://220.181.1.138:9000").await?)
|
||||
Ok(NodeServiceClient::connect("http://localhost:9000").await?)
|
||||
}
|
||||
const CLUSTER_ADDR: &str = "http://localhost:9000";
|
||||
|
||||
#[tokio::test]
|
||||
async fn ping() -> Result<(), Box<dyn Error>> {
|
||||
@@ -31,7 +27,7 @@ async fn ping() -> Result<(), Box<dyn Error>> {
|
||||
assert!(decoded_payload.is_ok());
|
||||
|
||||
// 创建客户端
|
||||
let mut client = get_client().await?;
|
||||
let mut client = node_service_time_out_client(&CLUSTER_ADDR.to_string()).await?;
|
||||
|
||||
// 构造 PingRequest
|
||||
let request = Request::new(PingRequest {
|
||||
@@ -55,7 +51,7 @@ async fn ping() -> Result<(), Box<dyn Error>> {
|
||||
|
||||
#[tokio::test]
|
||||
async fn make_volume() -> Result<(), Box<dyn Error>> {
|
||||
let mut client = get_client().await?;
|
||||
let mut client = node_service_time_out_client(&CLUSTER_ADDR.to_string()).await?;
|
||||
let request = Request::new(MakeVolumeRequest {
|
||||
disk: "data".to_string(),
|
||||
volume: "dandan".to_string(),
|
||||
@@ -72,7 +68,7 @@ async fn make_volume() -> Result<(), Box<dyn Error>> {
|
||||
|
||||
#[tokio::test]
|
||||
async fn list_volumes() -> Result<(), Box<dyn Error>> {
|
||||
let mut client = get_client().await?;
|
||||
let mut client = node_service_time_out_client(&CLUSTER_ADDR.to_string()).await?;
|
||||
let request = Request::new(ListVolumesRequest {
|
||||
disk: "data".to_string(),
|
||||
});
|
||||
@@ -90,7 +86,7 @@ async fn list_volumes() -> Result<(), Box<dyn Error>> {
|
||||
|
||||
#[tokio::test]
|
||||
async fn read_all() -> Result<(), Box<dyn Error>> {
|
||||
let mut client = get_client().await?;
|
||||
let mut client = node_service_time_out_client(&CLUSTER_ADDR.to_string()).await?;
|
||||
let request = Request::new(ReadAllRequest {
|
||||
disk: "data".to_string(),
|
||||
volume: "ff".to_string(),
|
||||
|
||||
@@ -17,12 +17,12 @@ struct Common {
|
||||
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
|
||||
struct Queue {
|
||||
pub common: Common,
|
||||
pub arn: ARN,
|
||||
pub arn: Arn,
|
||||
}
|
||||
|
||||
// 定义ARN结构体
|
||||
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
|
||||
pub struct ARN {
|
||||
pub struct Arn {
|
||||
pub target_id: TargetID,
|
||||
pub region: String,
|
||||
}
|
||||
|
||||
@@ -367,7 +367,7 @@ async fn read_bucket_metadata(api: &ECStore, bucket: &str) -> Result<BucketMetad
|
||||
return Err(Error::msg("invalid argument"));
|
||||
}
|
||||
|
||||
let bm = BucketMetadata::new(&bucket);
|
||||
let bm = BucketMetadata::new(bucket);
|
||||
let file_path = bm.save_file_path();
|
||||
|
||||
let data = read_config(api, &file_path).await?;
|
||||
@@ -391,7 +391,7 @@ where
|
||||
buf[1] = 0x0c; // 长度
|
||||
buf[2] = 0x05; // 时间扩展类型
|
||||
BigEndian::write_u64(&mut buf[3..], sec as u64);
|
||||
BigEndian::write_u32(&mut buf[11..], nsec as u32);
|
||||
BigEndian::write_u32(&mut buf[11..], nsec);
|
||||
s.serialize_bytes(&buf)
|
||||
}
|
||||
|
||||
|
||||
@@ -167,10 +167,11 @@ impl BucketMetadataSys {
|
||||
|
||||
if !meta.lifecycle_config_xml.is_empty() {
|
||||
let cfg = Lifecycle::unmarshal(&meta.lifecycle_config_xml)?;
|
||||
for _v in cfg.rules.iter() {
|
||||
// TODO: FIXME:
|
||||
break;
|
||||
}
|
||||
// TODO: FIXME:
|
||||
// for _v in cfg.rules.iter() {
|
||||
// break;
|
||||
// }
|
||||
if let Some(_v) = cfg.rules.first() {}
|
||||
}
|
||||
|
||||
// TODO: other lifecycle handle
|
||||
@@ -187,15 +188,15 @@ impl BucketMetadataSys {
|
||||
None => return Err(Error::msg("errServerNotInitialized")),
|
||||
};
|
||||
|
||||
if is_meta_bucketname(&bucket) {
|
||||
if is_meta_bucketname(bucket) {
|
||||
return Err(Error::msg("errInvalidArgument"));
|
||||
}
|
||||
|
||||
let mut bm = match load_bucket_metadata_parse(store, &bucket, parse).await {
|
||||
let mut bm = match load_bucket_metadata_parse(store, bucket, parse).await {
|
||||
Ok(res) => res,
|
||||
Err(err) => {
|
||||
if !is_erasure().await && !is_dist_erasure().await && DiskError::VolumeNotFound.is(&err) {
|
||||
BucketMetadata::new(&bucket)
|
||||
BucketMetadata::new(bucket)
|
||||
} else {
|
||||
return Err(err);
|
||||
}
|
||||
@@ -238,7 +239,7 @@ impl BucketMetadataSys {
|
||||
}
|
||||
|
||||
if let Some(api) = self.api.as_ref() {
|
||||
load_bucket_metadata(&api, bucket).await
|
||||
load_bucket_metadata(api, bucket).await
|
||||
} else {
|
||||
Err(Error::msg("errBucketMetadataNotInitialized"))
|
||||
}
|
||||
@@ -248,17 +249,13 @@ impl BucketMetadataSys {
|
||||
if let Some(api) = self.api.as_ref() {
|
||||
let has_bm = {
|
||||
let map = self.metadata_map.read().await;
|
||||
if let Some(bm) = map.get(&bucket.to_string()) {
|
||||
Some(bm.clone())
|
||||
} else {
|
||||
None
|
||||
}
|
||||
map.get(&bucket.to_string()).cloned()
|
||||
};
|
||||
|
||||
if let Some(bm) = has_bm {
|
||||
return Ok((bm, false));
|
||||
Ok((bm, false))
|
||||
} else {
|
||||
let bm = match load_bucket_metadata(&api, bucket).await {
|
||||
let bm = match load_bucket_metadata(api, bucket).await {
|
||||
Ok(res) => res,
|
||||
Err(err) => {
|
||||
if *self.initialized.read().await {
|
||||
|
||||
@@ -16,19 +16,14 @@ use super::condition::{
|
||||
pub struct ActionSet(HashSet<Action>);
|
||||
|
||||
impl ActionSet {
|
||||
pub fn as_ref(&self) -> &HashSet<Action> {
|
||||
&self.0
|
||||
}
|
||||
pub fn is_match(&self, act: &Action) -> bool {
|
||||
for item in self.0.iter() {
|
||||
if item.is_match(act) {
|
||||
return true;
|
||||
}
|
||||
|
||||
if item == &Action::GetObjectVersion {
|
||||
if act == &Action::GetObjectVersion {
|
||||
return true;
|
||||
}
|
||||
if item == &Action::GetObjectVersion && act == &Action::GetObjectVersion {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -40,6 +35,12 @@ impl ActionSet {
|
||||
}
|
||||
}
|
||||
|
||||
impl AsRef<HashSet<Action>> for ActionSet {
|
||||
fn as_ref(&self) -> &HashSet<Action> {
|
||||
&self.0
|
||||
}
|
||||
}
|
||||
|
||||
// 定义Action枚举类型
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize, Default, Hash)]
|
||||
pub enum Action {
|
||||
@@ -212,10 +213,10 @@ impl Action {
|
||||
false
|
||||
}
|
||||
pub fn is_match(&self, a: &Action) -> bool {
|
||||
utils::wildcard::match_pattern(&self.clone().as_str(), &a.clone().as_str())
|
||||
utils::wildcard::match_pattern(self.clone().as_str(), a.clone().as_str())
|
||||
}
|
||||
|
||||
fn as_str(self) -> &'static str {
|
||||
fn as_str(&self) -> &'static str {
|
||||
match self {
|
||||
Action::AbortMultipartUpload => "s3:AbortMultipartUpload",
|
||||
Action::CreateBucket => "s3:CreateBucket",
|
||||
|
||||
@@ -97,13 +97,11 @@ impl BPStatement {
|
||||
self.resources, act
|
||||
)));
|
||||
}
|
||||
} else {
|
||||
if !self.resources.bucket_resource_exists() {
|
||||
return Err(Error::msg(format!(
|
||||
"unsupported bucket Resource found {:?} for action {:?}",
|
||||
self.resources, act
|
||||
)));
|
||||
}
|
||||
} else if !self.resources.bucket_resource_exists() {
|
||||
return Err(Error::msg(format!(
|
||||
"unsupported bucket Resource found {:?} for action {:?}",
|
||||
self.resources, act
|
||||
)));
|
||||
}
|
||||
|
||||
let key_diff = self.conditions.keys().difference(&IAMActionConditionKeyMap.lookup(act));
|
||||
@@ -129,7 +127,7 @@ impl BPStatement {
|
||||
let mut resource = args.bucket_name.clone();
|
||||
if !args.object_name.is_empty() {
|
||||
if !args.object_name.starts_with("/") {
|
||||
resource.push_str("/");
|
||||
resource.push('/');
|
||||
}
|
||||
|
||||
resource.push_str(&args.object_name);
|
||||
@@ -160,10 +158,8 @@ pub struct BucketPolicy {
|
||||
impl BucketPolicy {
|
||||
pub fn is_allowed(&self, args: &BucketPolicyArgs) -> bool {
|
||||
for statement in self.statements.iter() {
|
||||
if statement.effect == Effect::Deny {
|
||||
if !statement.is_allowed(args) {
|
||||
return false;
|
||||
}
|
||||
if statement.effect == Effect::Deny && !statement.is_allowed(args) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -172,10 +168,8 @@ impl BucketPolicy {
|
||||
}
|
||||
|
||||
for statement in self.statements.iter() {
|
||||
if statement.effect == Effect::Allow {
|
||||
if statement.is_allowed(args) {
|
||||
return true;
|
||||
}
|
||||
if statement.effect == Effect::Allow && statement.is_allowed(args) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -196,9 +190,7 @@ impl BucketPolicy {
|
||||
}
|
||||
|
||||
for statement in self.statements.iter() {
|
||||
if let Err(err) = statement.is_valid() {
|
||||
return Err(err);
|
||||
}
|
||||
statement.is_valid()?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
@@ -219,7 +211,7 @@ impl BucketPolicy {
|
||||
}
|
||||
|
||||
pub fn unmarshal(buf: &[u8]) -> Result<Self> {
|
||||
let mut p = serde_json::from_slice::<BucketPolicy>(&buf)?;
|
||||
let mut p = serde_json::from_slice::<BucketPolicy>(buf)?;
|
||||
p.drop_duplicate_statements();
|
||||
Ok(p)
|
||||
|
||||
|
||||
@@ -24,14 +24,6 @@ impl Key {
|
||||
self.name == *name
|
||||
}
|
||||
|
||||
pub fn to_string(&self) -> String {
|
||||
if !self.variable.is_empty() {
|
||||
format!("{}/{}", self.name.as_str(), self.variable)
|
||||
} else {
|
||||
self.name.to_string()
|
||||
}
|
||||
}
|
||||
|
||||
// VarName - returns variable key name, such as "${aws:username}"
|
||||
pub fn var_name(&self) -> String {
|
||||
self.name.var_name()
|
||||
@@ -93,7 +85,11 @@ impl<'de> Deserialize<'de> for Key {
|
||||
|
||||
impl fmt::Display for Key {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
write!(f, "{}", self.to_string())
|
||||
if !self.variable.is_empty() {
|
||||
write!(f, "{}/{}", self.name.as_str(), self.variable)
|
||||
} else {
|
||||
write!(f, "{}", self.name)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -16,10 +16,10 @@ impl Principal {
|
||||
}
|
||||
pub fn is_match(&self, parincipal: &str) -> bool {
|
||||
for pattern in self.aws.iter() {
|
||||
if utils::wildcard::match_simple(&pattern, parincipal) {
|
||||
if utils::wildcard::match_simple(pattern, parincipal) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3,6 +3,7 @@ use crate::{
|
||||
bucket::policy::condition::keyname::COMMOM_KEYS,
|
||||
utils::{self, wildcard},
|
||||
};
|
||||
use core::fmt;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::{
|
||||
collections::{HashMap, HashSet},
|
||||
@@ -18,12 +19,12 @@ pub enum ResourceARNType {
|
||||
ResourceARNKMS,
|
||||
}
|
||||
|
||||
impl ResourceARNType {
|
||||
pub fn to_string(&self) -> String {
|
||||
impl fmt::Display for ResourceARNType {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
match self {
|
||||
ResourceARNType::UnknownARN => "".to_string(),
|
||||
ResourceARNType::ResourceARNS3 => RESOURCE_ARN_PREFIX.to_string(),
|
||||
ResourceARNType::ResourceARNKMS => RESOURCE_ARN_KMS_PREFIX.to_string(),
|
||||
ResourceARNType::UnknownARN => write!(f, ""),
|
||||
ResourceARNType::ResourceARNS3 => write!(f, "{}", RESOURCE_ARN_PREFIX),
|
||||
ResourceARNType::ResourceARNKMS => write!(f, "{}", RESOURCE_ARN_KMS_PREFIX),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -60,15 +61,11 @@ impl Resource {
|
||||
if self.rtype == ResourceARNType::UnknownARN {
|
||||
return false;
|
||||
}
|
||||
if self.is_s3() {
|
||||
if self.pattern.starts_with("/") {
|
||||
return false;
|
||||
}
|
||||
if self.is_s3() && self.pattern.starts_with("/") {
|
||||
return false;
|
||||
}
|
||||
if self.is_kms() {
|
||||
if self.pattern.as_bytes().iter().any(|&v| v == b'/' || v == b'\\' || v == b'.') {
|
||||
return false;
|
||||
}
|
||||
if self.is_kms() && self.pattern.as_bytes().iter().any(|&v| v == b'/' || v == b'\\' || v == b'.') {
|
||||
return false;
|
||||
}
|
||||
|
||||
!self.pattern.is_empty()
|
||||
@@ -90,8 +87,8 @@ impl Resource {
|
||||
if !condition_values.is_empty() {
|
||||
for key in COMMOM_KEYS.iter() {
|
||||
if let Some(vals) = condition_values.get(key.name()) {
|
||||
if let Some(v0) = vals.get(0) {
|
||||
pattern = pattern.replace(key.name(), &v0);
|
||||
if let Some(v0) = vals.first() {
|
||||
pattern = pattern.replace(key.name(), v0);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -105,9 +102,11 @@ impl Resource {
|
||||
|
||||
wildcard::match_pattern(&pattern, res)
|
||||
}
|
||||
}
|
||||
|
||||
pub fn to_string(&self) -> String {
|
||||
format!("{}{}", self.rtype.to_string(), self.pattern)
|
||||
impl fmt::Display for Resource {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(f, "{}{}", self.rtype, self.pattern)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -193,15 +192,11 @@ pub struct ResourceSet(HashSet<Resource>);
|
||||
impl ResourceSet {
|
||||
pub fn validate_bucket(&self, bucket: &str) -> Result<()> {
|
||||
for res in self.0.iter() {
|
||||
if let Err(err) = res.validate_bucket(bucket) {
|
||||
return Err(err);
|
||||
}
|
||||
res.validate_bucket(bucket)?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
pub fn as_ref(&self) -> &HashSet<Resource> {
|
||||
&self.0
|
||||
}
|
||||
|
||||
pub fn is_match(&self, res: &str, condition_values: &HashMap<String, Vec<String>>) -> bool {
|
||||
for item in self.0.iter() {
|
||||
if item.is_match(res, condition_values) {
|
||||
@@ -229,6 +224,12 @@ impl ResourceSet {
|
||||
}
|
||||
}
|
||||
|
||||
impl AsRef<HashSet<Resource>> for ResourceSet {
|
||||
fn as_ref(&self) -> &HashSet<Resource> {
|
||||
&self.0
|
||||
}
|
||||
}
|
||||
|
||||
// impl Serialize for ResourceSet {
|
||||
// fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
|
||||
// where
|
||||
|
||||
@@ -16,7 +16,6 @@ impl PolicySys {
|
||||
if !BucketMetadataError::BucketPolicyNotFound.is(&err) {
|
||||
warn!("config get err {:?}", err);
|
||||
}
|
||||
()
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -63,7 +63,7 @@ impl Versioning {
|
||||
pub fn validate(&self) -> Result<()> {
|
||||
match self.status {
|
||||
State::Suspended => {
|
||||
if self.excluded_prefixes.len() > 0 {
|
||||
if !self.excluded_prefixes.is_empty() {
|
||||
return Err(Error::new(VersioningErr::ExcludedPrefixNotSupported));
|
||||
}
|
||||
}
|
||||
@@ -103,7 +103,7 @@ impl Versioning {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
true
|
||||
}
|
||||
|
||||
pub fn suspended(&self) -> bool {
|
||||
@@ -131,10 +131,10 @@ impl Versioning {
|
||||
}
|
||||
}
|
||||
}
|
||||
return false;
|
||||
false
|
||||
}
|
||||
|
||||
pub fn prefixes_excluded(&self) -> bool {
|
||||
self.excluded_prefixes.len() > 0 || self.exclude_folders
|
||||
!self.excluded_prefixes.is_empty() || self.exclude_folders
|
||||
}
|
||||
}
|
||||
|
||||
@@ -17,7 +17,7 @@ pub async fn read_config(api: &ECStore, file: &str) -> Result<Vec<u8>> {
|
||||
async fn read_config_with_metadata(api: &ECStore, file: &str, opts: &ObjectOptions) -> Result<(Vec<u8>, ObjectInfo)> {
|
||||
let range = HTTPRangeSpec::nil();
|
||||
let h = HeaderMap::new();
|
||||
let mut rd = api.get_object_reader(RUSTFS_META_BUCKET, file, range, h, &opts).await?;
|
||||
let mut rd = api.get_object_reader(RUSTFS_META_BUCKET, file, range, h, opts).await?;
|
||||
|
||||
let data = rd.read_all().await?;
|
||||
|
||||
|
||||
@@ -18,7 +18,7 @@ impl ConfigError {
|
||||
|
||||
pub fn is_not_found(err: &Error) -> bool {
|
||||
if let Some(e) = err.downcast_ref::<ConfigError>() {
|
||||
ConfigError::is_not_found(&e)
|
||||
ConfigError::is_not_found(e)
|
||||
} else {
|
||||
false
|
||||
}
|
||||
|
||||
@@ -324,10 +324,8 @@ pub fn is_sys_err_path_not_found(e: &io::Error) -> bool {
|
||||
if no == 3 {
|
||||
return true;
|
||||
}
|
||||
} else {
|
||||
if no == 2 {
|
||||
return true;
|
||||
}
|
||||
} else if no == 2 {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
false
|
||||
|
||||
@@ -206,7 +206,7 @@ impl FormatV3 {
|
||||
}
|
||||
|
||||
for j in 0..self.erasure.sets[i].len() {
|
||||
if self.erasure.sets[i][j] != self.erasure.sets[i][j] {
|
||||
if self.erasure.sets[i][j] != other.erasure.sets[i][j] {
|
||||
return Err(Error::from_string(format!(
|
||||
"UUID on positions {}:{} do not match with, expected {:?} got {:?}: (%w)",
|
||||
i,
|
||||
|
||||
@@ -105,7 +105,7 @@ impl LocalDisk {
|
||||
let disk = Self {
|
||||
root,
|
||||
endpoint: ep.clone(),
|
||||
format_path: format_path,
|
||||
format_path,
|
||||
format_info: RwLock::new(format_info),
|
||||
// // format_legacy,
|
||||
// format_file_info: Mutex::new(format_meta),
|
||||
@@ -260,35 +260,31 @@ impl LocalDisk {
|
||||
|
||||
if recursive {
|
||||
self.move_to_trash(delete_path, recursive, immediate_purge).await?;
|
||||
} else {
|
||||
if delete_path.is_dir() {
|
||||
// debug!("delete_file remove_dir {:?}", &delete_path);
|
||||
if let Err(err) = fs::remove_dir(&delete_path).await {
|
||||
// debug!("remove_dir err {:?} when {:?}", &err, &delete_path);
|
||||
match err.kind() {
|
||||
ErrorKind::NotFound => (),
|
||||
// ErrorKind::DirectoryNotEmpty => (),
|
||||
kind => {
|
||||
if kind.to_string() != "directory not empty" {
|
||||
warn!("delete_file remove_dir {:?} err {}", &delete_path, kind.to_string());
|
||||
return Err(Error::from(err));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
// debug!("delete_file remove_dir done {:?}", &delete_path);
|
||||
} else {
|
||||
if let Err(err) = fs::remove_file(&delete_path).await {
|
||||
// debug!("remove_file err {:?} when {:?}", &err, &delete_path);
|
||||
match err.kind() {
|
||||
ErrorKind::NotFound => (),
|
||||
_ => {
|
||||
warn!("delete_file remove_file {:?} err {:?}", &delete_path, &err);
|
||||
} else if delete_path.is_dir() {
|
||||
// debug!("delete_file remove_dir {:?}", &delete_path);
|
||||
if let Err(err) = fs::remove_dir(&delete_path).await {
|
||||
// debug!("remove_dir err {:?} when {:?}", &err, &delete_path);
|
||||
match err.kind() {
|
||||
ErrorKind::NotFound => (),
|
||||
// ErrorKind::DirectoryNotEmpty => (),
|
||||
kind => {
|
||||
if kind.to_string() != "directory not empty" {
|
||||
warn!("delete_file remove_dir {:?} err {}", &delete_path, kind.to_string());
|
||||
return Err(Error::from(err));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
// debug!("delete_file remove_dir done {:?}", &delete_path);
|
||||
} else if let Err(err) = fs::remove_file(&delete_path).await {
|
||||
// debug!("remove_file err {:?} when {:?}", &err, &delete_path);
|
||||
match err.kind() {
|
||||
ErrorKind::NotFound => (),
|
||||
_ => {
|
||||
warn!("delete_file remove_file {:?} err {:?}", &delete_path, &err);
|
||||
return Err(Error::from(err));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(dir_path) = delete_path.parent() {
|
||||
@@ -325,7 +321,7 @@ impl LocalDisk {
|
||||
|
||||
// FIXME: read_metadata only suport
|
||||
async fn read_metadata_with_dmtime(&self, file_path: impl AsRef<Path>) -> Result<(Vec<u8>, Option<OffsetDateTime>)> {
|
||||
check_path_length(&file_path.as_ref().to_string_lossy().to_string())?;
|
||||
check_path_length(file_path.as_ref().to_string_lossy().as_ref())?;
|
||||
|
||||
let mut f = utils::fs::open_file(file_path, O_RDONLY).await?;
|
||||
|
||||
@@ -382,9 +378,7 @@ impl LocalDisk {
|
||||
return Err(Error::new(DiskError::FileNotFound));
|
||||
} else if os_is_permission(&e) {
|
||||
return Err(Error::new(DiskError::FileAccessDenied));
|
||||
} else if is_sys_err_not_dir(&e) || is_sys_err_is_dir(&e) {
|
||||
return Err(Error::new(DiskError::FileNotFound));
|
||||
} else if is_sys_err_handle_invalid(&e) {
|
||||
} else if is_sys_err_not_dir(&e) || is_sys_err_is_dir(&e) || is_sys_err_handle_invalid(&e) {
|
||||
return Err(Error::new(DiskError::FileNotFound));
|
||||
} else if is_sys_err_io(&e) {
|
||||
return Err(Error::new(DiskError::FaultyDisk));
|
||||
@@ -444,7 +438,7 @@ impl LocalDisk {
|
||||
let data_dir = fm.delete_version(fi)?;
|
||||
warn!("删除版本号 对应data_dir {:?}", &data_dir);
|
||||
if data_dir.is_some() {
|
||||
let dir_path = self.get_object_path(volume, format!("{}/{}", path, data_dir.unwrap().to_string()).as_str())?;
|
||||
let dir_path = self.get_object_path(volume, format!("{}/{}", path, data_dir.unwrap()).as_str())?;
|
||||
self.move_to_trash(&dir_path, true, false).await?;
|
||||
}
|
||||
}
|
||||
@@ -460,7 +454,7 @@ impl LocalDisk {
|
||||
// 更新xl.meta
|
||||
let buf = fm.marshal_msg()?;
|
||||
|
||||
let volume_dir = self.get_bucket_path(&volume)?;
|
||||
let volume_dir = self.get_bucket_path(volume)?;
|
||||
|
||||
self.write_all_private(
|
||||
volume,
|
||||
@@ -475,9 +469,9 @@ impl LocalDisk {
|
||||
}
|
||||
|
||||
async fn write_all_meta(&self, volume: &str, path: &str, buf: &[u8], sync: bool) -> Result<()> {
|
||||
let volume_dir = self.get_bucket_path(&volume)?;
|
||||
let volume_dir = self.get_bucket_path(volume)?;
|
||||
let file_path = volume_dir.join(Path::new(&path));
|
||||
check_path_length(&file_path.to_string_lossy().to_string())?;
|
||||
check_path_length(file_path.to_string_lossy().as_ref())?;
|
||||
|
||||
let tmp_volume_dir = self.get_bucket_path(super::RUSTFS_META_TMP_BUCKET)?;
|
||||
let tmp_file_path = tmp_volume_dir.join(Path::new(Uuid::new_v4().to_string().as_str()));
|
||||
@@ -494,9 +488,9 @@ impl LocalDisk {
|
||||
format_info.data = data.clone();
|
||||
}
|
||||
|
||||
let volume_dir = self.get_bucket_path(&volume)?;
|
||||
let volume_dir = self.get_bucket_path(volume)?;
|
||||
|
||||
self.write_all_private(&volume, &path, &data, true, volume_dir).await?;
|
||||
self.write_all_private(volume, path, &data, true, volume_dir).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -510,9 +504,9 @@ impl LocalDisk {
|
||||
sync: bool,
|
||||
skip_parent: impl AsRef<Path>,
|
||||
) -> Result<()> {
|
||||
let volume_dir = self.get_bucket_path(&volume)?;
|
||||
let volume_dir = self.get_bucket_path(volume)?;
|
||||
let file_path = volume_dir.join(Path::new(&path));
|
||||
check_path_length(&file_path.to_string_lossy().to_string())?;
|
||||
check_path_length(file_path.to_string_lossy().as_ref())?;
|
||||
|
||||
self.write_all_internal(file_path, buf, sync, skip_parent).await
|
||||
}
|
||||
@@ -553,9 +547,7 @@ impl LocalDisk {
|
||||
let f = utils::fs::open_file(path.as_ref(), mode).await.map_err(|e| {
|
||||
if is_sys_err_io(&e) {
|
||||
Error::new(DiskError::IsNotRegular)
|
||||
} else if os_is_permission(&e) {
|
||||
Error::new(DiskError::FileAccessDenied)
|
||||
} else if is_sys_err_not_dir(&e) {
|
||||
} else if os_is_permission(&e) || is_sys_err_not_dir(&e) {
|
||||
Error::new(DiskError::FileAccessDenied)
|
||||
} else if is_sys_err_io(&e) {
|
||||
Error::new(DiskError::FaultyDisk)
|
||||
@@ -669,7 +661,7 @@ impl DiskAPI for LocalDisk {
|
||||
async fn get_disk_id(&self) -> Result<Option<Uuid>> {
|
||||
let mut format_info = self.format_info.write().await;
|
||||
|
||||
let id = format_info.id.clone();
|
||||
let id = format_info.id;
|
||||
|
||||
if format_info.last_check_valid() {
|
||||
return Ok(id);
|
||||
@@ -732,7 +724,7 @@ impl DiskAPI for LocalDisk {
|
||||
async fn read_all(&self, volume: &str, path: &str) -> Result<Vec<u8>> {
|
||||
if volume == super::RUSTFS_META_BUCKET && path == super::FORMAT_CONFIG_FILE {
|
||||
let format_info = self.format_info.read().await;
|
||||
if format_info.data.len() > 0 {
|
||||
if !format_info.data.is_empty() {
|
||||
return Ok(format_info.data.clone());
|
||||
}
|
||||
}
|
||||
@@ -774,18 +766,18 @@ impl DiskAPI for LocalDisk {
|
||||
utils::fs::access(&dst_volume_dir).await.map_err(map_err_not_exists)?
|
||||
}
|
||||
|
||||
let src_is_dir = has_suffix(&src_path, SLASH_SEPARATOR);
|
||||
let dst_is_dir = has_suffix(&dst_path, SLASH_SEPARATOR);
|
||||
let src_is_dir = has_suffix(src_path, SLASH_SEPARATOR);
|
||||
let dst_is_dir = has_suffix(dst_path, SLASH_SEPARATOR);
|
||||
|
||||
if !(src_is_dir && dst_is_dir || !src_is_dir && !dst_is_dir) {
|
||||
if !src_is_dir && dst_is_dir || src_is_dir && !dst_is_dir {
|
||||
return Err(Error::from(DiskError::FileAccessDenied));
|
||||
}
|
||||
|
||||
let src_file_path = src_volume_dir.join(Path::new(src_path));
|
||||
let dst_file_path = dst_volume_dir.join(Path::new(dst_path));
|
||||
|
||||
check_path_length(&src_file_path.to_string_lossy().to_string())?;
|
||||
check_path_length(&dst_file_path.to_string_lossy().to_string())?;
|
||||
check_path_length(src_file_path.to_string_lossy().as_ref())?;
|
||||
check_path_length(dst_file_path.to_string_lossy().as_ref())?;
|
||||
|
||||
if src_is_dir {
|
||||
let meta_op = match lstat(&src_file_path).await {
|
||||
@@ -831,7 +823,7 @@ impl DiskAPI for LocalDisk {
|
||||
return Err(err);
|
||||
}
|
||||
|
||||
if let Err(err) = self.write_all(&dst_volume, format!("{}.meta", dst_path).as_str(), meta).await {
|
||||
if let Err(err) = self.write_all(dst_volume, format!("{}.meta", dst_path).as_str(), meta).await {
|
||||
if let Some(e) = err.to_io_err() {
|
||||
return Err(os_err_to_file_err(e));
|
||||
}
|
||||
@@ -871,9 +863,9 @@ impl DiskAPI for LocalDisk {
|
||||
}
|
||||
}
|
||||
|
||||
let src_is_dir = has_suffix(&src_path, SLASH_SEPARATOR);
|
||||
let dst_is_dir = has_suffix(&dst_path, SLASH_SEPARATOR);
|
||||
if !(src_is_dir && dst_is_dir || !src_is_dir && !dst_is_dir) {
|
||||
let src_is_dir = has_suffix(src_path, SLASH_SEPARATOR);
|
||||
let dst_is_dir = has_suffix(dst_path, SLASH_SEPARATOR);
|
||||
if (dst_is_dir || src_is_dir) && (!dst_is_dir || !src_is_dir) {
|
||||
return Err(Error::from(DiskError::FileAccessDenied));
|
||||
}
|
||||
|
||||
@@ -937,7 +929,7 @@ impl DiskAPI for LocalDisk {
|
||||
// TODO: use io.reader
|
||||
async fn create_file(&self, origvolume: &str, volume: &str, path: &str, _file_size: usize) -> Result<FileWriter> {
|
||||
if !origvolume.is_empty() {
|
||||
let origvolume_dir = self.get_bucket_path(&origvolume)?;
|
||||
let origvolume_dir = self.get_bucket_path(origvolume)?;
|
||||
if !skip_access_checks(origvolume) {
|
||||
if let Err(e) = utils::fs::access(origvolume_dir).await {
|
||||
return Err(convert_access_error(e, DiskError::VolumeAccessDenied));
|
||||
@@ -945,7 +937,7 @@ impl DiskAPI for LocalDisk {
|
||||
}
|
||||
}
|
||||
|
||||
let volume_dir = self.get_bucket_path(&volume)?;
|
||||
let volume_dir = self.get_bucket_path(volume)?;
|
||||
let file_path = volume_dir.join(Path::new(&path));
|
||||
check_path_length(file_path.to_string_lossy().to_string().as_str())?;
|
||||
|
||||
@@ -963,7 +955,7 @@ impl DiskAPI for LocalDisk {
|
||||
}
|
||||
// async fn append_file(&self, volume: &str, path: &str, mut r: DuplexStream) -> Result<File> {
|
||||
async fn append_file(&self, volume: &str, path: &str) -> Result<FileWriter> {
|
||||
let volume_dir = self.get_bucket_path(&volume)?;
|
||||
let volume_dir = self.get_bucket_path(volume)?;
|
||||
if !skip_access_checks(volume) {
|
||||
if let Err(e) = utils::fs::access(&volume_dir).await {
|
||||
return Err(convert_access_error(e, DiskError::VolumeAccessDenied));
|
||||
@@ -980,7 +972,7 @@ impl DiskAPI for LocalDisk {
|
||||
|
||||
// TODO: io verifier
|
||||
async fn read_file(&self, volume: &str, path: &str) -> Result<FileReader> {
|
||||
let volume_dir = self.get_bucket_path(&volume)?;
|
||||
let volume_dir = self.get_bucket_path(volume)?;
|
||||
if !skip_access_checks(volume) {
|
||||
if let Err(e) = utils::fs::access(&volume_dir).await {
|
||||
return Err(convert_access_error(e, DiskError::VolumeAccessDenied));
|
||||
@@ -994,9 +986,7 @@ impl DiskAPI for LocalDisk {
|
||||
if let Some(e) = err.to_io_err() {
|
||||
if os_is_not_exist(&e) {
|
||||
Error::new(DiskError::FileNotFound)
|
||||
} else if os_is_permission(&e) {
|
||||
Error::new(DiskError::FileAccessDenied)
|
||||
} else if is_sys_err_not_dir(&e) {
|
||||
} else if os_is_permission(&e) || is_sys_err_not_dir(&e) {
|
||||
Error::new(DiskError::FileAccessDenied)
|
||||
} else if is_sys_err_io(&e) {
|
||||
Error::new(DiskError::FaultyDisk)
|
||||
@@ -1015,7 +1005,7 @@ impl DiskAPI for LocalDisk {
|
||||
|
||||
async fn list_dir(&self, origvolume: &str, volume: &str, dir_path: &str, count: i32) -> Result<Vec<String>> {
|
||||
if !origvolume.is_empty() {
|
||||
let origvolume_dir = self.get_bucket_path(&origvolume)?;
|
||||
let origvolume_dir = self.get_bucket_path(origvolume)?;
|
||||
if !skip_access_checks(origvolume) {
|
||||
if let Err(e) = utils::fs::access(origvolume_dir).await {
|
||||
return Err(convert_access_error(e, DiskError::VolumeAccessDenied));
|
||||
@@ -1023,13 +1013,13 @@ impl DiskAPI for LocalDisk {
|
||||
}
|
||||
}
|
||||
|
||||
let volume_dir = self.get_bucket_path(&volume)?;
|
||||
let volume_dir = self.get_bucket_path(volume)?;
|
||||
let dir_path_abs = volume_dir.join(Path::new(&dir_path));
|
||||
|
||||
let entries = match os::read_dir(&dir_path_abs, count).await {
|
||||
Ok(res) => res,
|
||||
Err(e) => {
|
||||
if DiskError::FileNotFound.is(&e) && !skip_access_checks(&volume) {
|
||||
if DiskError::FileNotFound.is(&e) && !skip_access_checks(volume) {
|
||||
if let Err(e) = utils::fs::access(&volume_dir).await {
|
||||
return Err(convert_access_error(e, DiskError::VolumeAccessDenied));
|
||||
}
|
||||
@@ -1129,11 +1119,7 @@ impl DiskAPI for LocalDisk {
|
||||
let has_data_dir_path = {
|
||||
let has_data_dir = {
|
||||
if !fi.is_remote() {
|
||||
if let Some(dir) = fi.data_dir {
|
||||
Some(utils::path::retain_slash(dir.to_string().as_str()))
|
||||
} else {
|
||||
None
|
||||
}
|
||||
fi.data_dir.map(|dir| utils::path::retain_slash(dir.to_string().as_str()))
|
||||
} else {
|
||||
None
|
||||
}
|
||||
@@ -1153,8 +1139,8 @@ impl DiskAPI for LocalDisk {
|
||||
}
|
||||
};
|
||||
|
||||
check_path_length(&src_file_path.to_string_lossy().to_string().as_str())?;
|
||||
check_path_length(&dst_file_path.to_string_lossy().to_string().as_str())?;
|
||||
check_path_length(src_file_path.to_string_lossy().to_string().as_str())?;
|
||||
check_path_length(dst_file_path.to_string_lossy().to_string().as_str())?;
|
||||
|
||||
// 读旧xl.meta
|
||||
|
||||
@@ -1178,8 +1164,8 @@ impl DiskAPI for LocalDisk {
|
||||
let mut xlmeta = FileMeta::new();
|
||||
|
||||
if let Some(dst_buf) = has_dst_buf.as_ref() {
|
||||
if FileMeta::is_xl_format(&dst_buf) {
|
||||
if let Ok(nmeta) = FileMeta::load(&dst_buf) {
|
||||
if FileMeta::is_xl_format(dst_buf) {
|
||||
if let Ok(nmeta) = FileMeta::load(dst_buf) {
|
||||
xlmeta = nmeta
|
||||
}
|
||||
}
|
||||
@@ -1198,7 +1184,7 @@ impl DiskAPI for LocalDisk {
|
||||
if let Ok((_, ver)) = xlmeta.find_version(fi.version_id) {
|
||||
let has_data_dir = ver.get_data_dir();
|
||||
if let Some(data_dir) = has_data_dir {
|
||||
if xlmeta.shard_data_dir_count(&fi.version_id, &Some(data_dir.clone())) == 0 {
|
||||
if xlmeta.shard_data_dir_count(&fi.version_id, &Some(data_dir)) == 0 {
|
||||
// TODO: Healing
|
||||
// remove inlinedata\
|
||||
Some(data_dir)
|
||||
@@ -1221,7 +1207,7 @@ impl DiskAPI for LocalDisk {
|
||||
|
||||
let new_dst_buf = xlmeta.marshal_msg()?;
|
||||
|
||||
self.write_all(&src_volume, format!("{}/{}", &src_path, super::STORAGE_FORMAT_FILE).as_str(), new_dst_buf)
|
||||
self.write_all(src_volume, format!("{}/{}", &src_path, super::STORAGE_FORMAT_FILE).as_str(), new_dst_buf)
|
||||
.await
|
||||
.map_err(|err| {
|
||||
if let Some(e) = err.to_io_err() {
|
||||
@@ -1235,7 +1221,7 @@ impl DiskAPI for LocalDisk {
|
||||
let no_inline = fi.data.is_none() && fi.size > 0;
|
||||
if no_inline {
|
||||
if let Err(err) = os::rename_all(&src_data_path, &dst_data_path, &skip_parent).await {
|
||||
let _ = self.delete_file(&dst_volume_dir, &dst_data_path, false, false).await;
|
||||
let _ = self.delete_file(&dst_volume_dir, dst_data_path, false, false).await;
|
||||
|
||||
return Err({
|
||||
if let Some(e) = err.to_io_err() {
|
||||
@@ -1253,7 +1239,7 @@ impl DiskAPI for LocalDisk {
|
||||
if let Some(dst_buf) = has_dst_buf {
|
||||
if let Err(err) = self
|
||||
.write_all_private(
|
||||
&dst_volume,
|
||||
dst_volume,
|
||||
format!("{}/{}/{}", &dst_path, &old_data_dir.to_string(), super::STORAGE_FORMAT_FILE).as_str(),
|
||||
&dst_buf,
|
||||
true,
|
||||
@@ -1274,7 +1260,7 @@ impl DiskAPI for LocalDisk {
|
||||
|
||||
if let Err(err) = os::rename_all(&src_file_path, &dst_file_path, &skip_parent).await {
|
||||
if let Some((_, dst_data_path)) = has_data_dir_path.as_ref() {
|
||||
let _ = self.delete_file(&dst_volume_dir, &dst_data_path, false, false).await;
|
||||
let _ = self.delete_file(&dst_volume_dir, dst_data_path, false, false).await;
|
||||
}
|
||||
return Err({
|
||||
if let Some(e) = err.to_io_err() {
|
||||
@@ -1339,9 +1325,7 @@ impl DiskAPI for LocalDisk {
|
||||
let mut volumes = Vec::new();
|
||||
|
||||
let entries = os::read_dir(&self.root, -1).await.map_err(|e| {
|
||||
if DiskError::FileAccessDenied.is(&e) {
|
||||
Error::new(DiskError::DiskAccessDenied)
|
||||
} else if DiskError::FileNotFound.is(&e) {
|
||||
if DiskError::FileAccessDenied.is(&e) || DiskError::FileNotFound.is(&e) {
|
||||
Error::new(DiskError::DiskAccessDenied)
|
||||
} else {
|
||||
e
|
||||
@@ -1399,7 +1383,7 @@ impl DiskAPI for LocalDisk {
|
||||
for path in paths.iter() {
|
||||
let file_path = volume_dir.join(Path::new(path));
|
||||
|
||||
check_path_length(&file_path.to_string_lossy().to_string())?;
|
||||
check_path_length(file_path.to_string_lossy().as_ref())?;
|
||||
|
||||
self.move_to_trash(&file_path, false, false).await?;
|
||||
}
|
||||
@@ -1407,14 +1391,14 @@ impl DiskAPI for LocalDisk {
|
||||
Ok(())
|
||||
}
|
||||
async fn update_metadata(&self, volume: &str, path: &str, fi: FileInfo, opts: UpdateMetadataOpts) -> Result<()> {
|
||||
if let Some(_) = &fi.metadata {
|
||||
let volume_dir = self.get_bucket_path(&volume)?;
|
||||
if fi.metadata.is_some() {
|
||||
let volume_dir = self.get_bucket_path(volume)?;
|
||||
let file_path = volume_dir.join(Path::new(&path));
|
||||
|
||||
check_path_length(&file_path.to_string_lossy().to_string())?;
|
||||
check_path_length(file_path.to_string_lossy().as_ref())?;
|
||||
|
||||
let buf = self
|
||||
.read_all(&volume, format!("{}/{}", &path, super::STORAGE_FORMAT_FILE).as_str())
|
||||
.read_all(volume, format!("{}/{}", &path, super::STORAGE_FORMAT_FILE).as_str())
|
||||
.await
|
||||
.map_err(|e| {
|
||||
if DiskError::FileNotFound.is(&e) && fi.version_id.is_some() {
|
||||
@@ -1436,7 +1420,7 @@ impl DiskAPI for LocalDisk {
|
||||
|
||||
return self
|
||||
.write_all_meta(
|
||||
&volume,
|
||||
volume,
|
||||
format!("{}/{}", path, super::STORAGE_FORMAT_FILE).as_str(),
|
||||
&wbuf,
|
||||
!opts.no_persistence,
|
||||
@@ -1457,7 +1441,6 @@ impl DiskAPI for LocalDisk {
|
||||
if !buf.is_empty() {
|
||||
let _ = meta.unmarshal_msg(&buf).map_err(|_| {
|
||||
meta = FileMeta::new();
|
||||
()
|
||||
});
|
||||
}
|
||||
}
|
||||
@@ -1509,7 +1492,7 @@ impl DiskAPI for LocalDisk {
|
||||
_force_del_marker: bool,
|
||||
_opts: DeleteOptions,
|
||||
) -> Result<RawFileInfo> {
|
||||
let _volume_dir = self.get_bucket_path(&volume)?;
|
||||
let _volume_dir = self.get_bucket_path(volume)?;
|
||||
|
||||
// self.read_all_data(bucket, volume_dir, path);
|
||||
|
||||
|
||||
@@ -399,7 +399,7 @@ impl LocalFileWriter {
|
||||
#[async_trait::async_trait]
|
||||
impl Write for LocalFileWriter {
|
||||
async fn write(&mut self, buf: &[u8]) -> Result<()> {
|
||||
self.inner.write(buf).await?;
|
||||
let _ = self.inner.write(buf).await?;
|
||||
self.inner.flush().await?;
|
||||
|
||||
Ok(())
|
||||
|
||||
@@ -55,9 +55,7 @@ pub async fn make_dir_all(path: impl AsRef<Path>, base_dir: impl AsRef<Path>) ->
|
||||
check_path_length(path.as_ref().to_string_lossy().to_string().as_str())?;
|
||||
|
||||
if let Err(e) = reliable_mkdir_all(path.as_ref(), base_dir.as_ref()).await {
|
||||
if is_sys_err_not_dir(&e) {
|
||||
return Err(Error::new(DiskError::FileAccessDenied));
|
||||
} else if is_sys_err_path_not_found(&e) {
|
||||
if is_sys_err_not_dir(&e) || is_sys_err_path_not_found(&e) {
|
||||
return Err(Error::new(DiskError::FileAccessDenied));
|
||||
}
|
||||
|
||||
@@ -78,7 +76,7 @@ pub async fn read_dir(path: impl AsRef<Path>, count: i32) -> Result<Vec<String>>
|
||||
while let Some(entry) = entries.next_entry().await? {
|
||||
let name = entry.file_name().to_string_lossy().to_string();
|
||||
|
||||
if name == "" || name == "." || name == ".." {
|
||||
if name.is_empty() || name == "." || name == ".." {
|
||||
continue;
|
||||
}
|
||||
|
||||
@@ -103,9 +101,7 @@ pub async fn rename_all(
|
||||
base_dir: impl AsRef<Path>,
|
||||
) -> Result<()> {
|
||||
reliable_rename(src_file_path, dst_file_path, base_dir).await.map_err(|e| {
|
||||
if is_sys_err_not_dir(&e) || !os_is_not_exist(&e) {
|
||||
Error::new(DiskError::FileAccessDenied)
|
||||
} else if is_sys_err_path_not_found(&e) {
|
||||
if is_sys_err_not_dir(&e) || !os_is_not_exist(&e) || is_sys_err_path_not_found(&e) {
|
||||
Error::new(DiskError::FileAccessDenied)
|
||||
} else if os_is_not_exist(&e) {
|
||||
Error::new(DiskError::FileNotFound)
|
||||
@@ -174,10 +170,8 @@ pub async fn reliable_mkdir_all(path: impl AsRef<Path>, base_dir: impl AsRef<Pat
|
||||
}
|
||||
|
||||
pub async fn os_mkdir_all(dir_path: impl AsRef<Path>, base_dir: impl AsRef<Path>) -> io::Result<()> {
|
||||
if !base_dir.as_ref().to_string_lossy().is_empty() {
|
||||
if base_dir.as_ref().starts_with(dir_path.as_ref()) {
|
||||
return Ok(());
|
||||
}
|
||||
if !base_dir.as_ref().to_string_lossy().is_empty() && base_dir.as_ref().starts_with(dir_path.as_ref()) {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
if let Some(parent) = dir_path.as_ref().parent() {
|
||||
|
||||
@@ -66,7 +66,7 @@ impl DiskAPI for RemoteDisk {
|
||||
}
|
||||
async fn is_online(&self) -> bool {
|
||||
// TODO: 连接状态
|
||||
if let Ok(_) = node_service_time_out_client(&self.addr).await {
|
||||
if (node_service_time_out_client(&self.addr).await).is_ok() {
|
||||
return true;
|
||||
}
|
||||
false
|
||||
@@ -90,7 +90,7 @@ impl DiskAPI for RemoteDisk {
|
||||
}
|
||||
|
||||
async fn get_disk_id(&self) -> Result<Option<Uuid>> {
|
||||
Ok(self.id.lock().await.clone())
|
||||
Ok(*self.id.lock().await)
|
||||
}
|
||||
async fn set_disk_id(&self, id: Option<Uuid>) -> Result<()> {
|
||||
let mut lock = self.id.lock().await;
|
||||
@@ -103,7 +103,7 @@ impl DiskAPI for RemoteDisk {
|
||||
info!("read_all");
|
||||
let mut client = node_service_time_out_client(&self.addr)
|
||||
.await
|
||||
.map_err(|err| Error::from_string(format!("can not get client, err: {}", err.to_string())))?;
|
||||
.map_err(|err| Error::from_string(format!("can not get client, err: {}", err)))?;
|
||||
let request = Request::new(ReadAllRequest {
|
||||
disk: self.root.to_string_lossy().to_string(),
|
||||
volume: volume.to_string(),
|
||||
@@ -125,7 +125,7 @@ impl DiskAPI for RemoteDisk {
|
||||
info!("write_all");
|
||||
let mut client = node_service_time_out_client(&self.addr)
|
||||
.await
|
||||
.map_err(|err| Error::from_string(format!("can not get client, err: {}", err.to_string())))?;
|
||||
.map_err(|err| Error::from_string(format!("can not get client, err: {}", err)))?;
|
||||
let request = Request::new(WriteAllRequest {
|
||||
disk: self.root.to_string_lossy().to_string(),
|
||||
volume: volume.to_string(),
|
||||
@@ -147,7 +147,7 @@ impl DiskAPI for RemoteDisk {
|
||||
let options = serde_json::to_string(&opt)?;
|
||||
let mut client = node_service_time_out_client(&self.addr)
|
||||
.await
|
||||
.map_err(|err| Error::from_string(format!("can not get client, err: {}", err.to_string())))?;
|
||||
.map_err(|err| Error::from_string(format!("can not get client, err: {}", err)))?;
|
||||
let request = Request::new(DeleteRequest {
|
||||
disk: self.root.to_string_lossy().to_string(),
|
||||
volume: volume.to_string(),
|
||||
@@ -167,7 +167,7 @@ impl DiskAPI for RemoteDisk {
|
||||
info!("rename_part");
|
||||
let mut client = node_service_time_out_client(&self.addr)
|
||||
.await
|
||||
.map_err(|err| Error::from_string(format!("can not get client, err: {}", err.to_string())))?;
|
||||
.map_err(|err| Error::from_string(format!("can not get client, err: {}", err)))?;
|
||||
let request = Request::new(RenamePartRequst {
|
||||
disk: self.root.to_string_lossy().to_string(),
|
||||
src_volume: src_volume.to_string(),
|
||||
@@ -189,7 +189,7 @@ impl DiskAPI for RemoteDisk {
|
||||
info!("rename_file");
|
||||
let mut client = node_service_time_out_client(&self.addr)
|
||||
.await
|
||||
.map_err(|err| Error::from_string(format!("can not get client, err: {}", err.to_string())))?;
|
||||
.map_err(|err| Error::from_string(format!("can not get client, err: {}", err)))?;
|
||||
let request = Request::new(RenameFileRequst {
|
||||
disk: self.root.to_string_lossy().to_string(),
|
||||
src_volume: src_volume.to_string(),
|
||||
@@ -217,7 +217,7 @@ impl DiskAPI for RemoteDisk {
|
||||
false,
|
||||
node_service_time_out_client(&self.addr)
|
||||
.await
|
||||
.map_err(|err| Error::from_string(format!("can not get client, err: {}", err.to_string())))?,
|
||||
.map_err(|err| Error::from_string(format!("can not get client, err: {}", err)))?,
|
||||
)
|
||||
.await?,
|
||||
))
|
||||
@@ -233,7 +233,7 @@ impl DiskAPI for RemoteDisk {
|
||||
true,
|
||||
node_service_time_out_client(&self.addr)
|
||||
.await
|
||||
.map_err(|err| Error::from_string(format!("can not get client, err: {}", err.to_string())))?,
|
||||
.map_err(|err| Error::from_string(format!("can not get client, err: {}", err)))?,
|
||||
)
|
||||
.await?,
|
||||
))
|
||||
@@ -248,7 +248,7 @@ impl DiskAPI for RemoteDisk {
|
||||
path.to_string(),
|
||||
node_service_time_out_client(&self.addr)
|
||||
.await
|
||||
.map_err(|err| Error::from_string(format!("can not get client, err: {}", err.to_string())))?,
|
||||
.map_err(|err| Error::from_string(format!("can not get client, err: {}", err)))?,
|
||||
)
|
||||
.await?,
|
||||
))
|
||||
@@ -258,7 +258,7 @@ impl DiskAPI for RemoteDisk {
|
||||
info!("list_dir");
|
||||
let mut client = node_service_time_out_client(&self.addr)
|
||||
.await
|
||||
.map_err(|err| Error::from_string(format!("can not get client, err: {}", err.to_string())))?;
|
||||
.map_err(|err| Error::from_string(format!("can not get client, err: {}", err)))?;
|
||||
let request = Request::new(ListDirRequest {
|
||||
disk: self.root.to_string_lossy().to_string(),
|
||||
volume: volume.to_string(),
|
||||
@@ -278,7 +278,7 @@ impl DiskAPI for RemoteDisk {
|
||||
let walk_dir_options = serde_json::to_string(&opts)?;
|
||||
let mut client = node_service_time_out_client(&self.addr)
|
||||
.await
|
||||
.map_err(|err| Error::from_string(format!("can not get client, err: {}", err.to_string())))?;
|
||||
.map_err(|err| Error::from_string(format!("can not get client, err: {}", err)))?;
|
||||
let request = Request::new(WalkDirRequest {
|
||||
disk: self.root.to_string_lossy().to_string(),
|
||||
walk_dir_options,
|
||||
@@ -311,7 +311,7 @@ impl DiskAPI for RemoteDisk {
|
||||
let file_info = serde_json::to_string(&fi)?;
|
||||
let mut client = node_service_time_out_client(&self.addr)
|
||||
.await
|
||||
.map_err(|err| Error::from_string(format!("can not get client, err: {}", err.to_string())))?;
|
||||
.map_err(|err| Error::from_string(format!("can not get client, err: {}", err)))?;
|
||||
let request = Request::new(RenameDataRequest {
|
||||
disk: self.root.to_string_lossy().to_string(),
|
||||
src_volume: src_volume.to_string(),
|
||||
@@ -336,7 +336,7 @@ impl DiskAPI for RemoteDisk {
|
||||
info!("make_volumes");
|
||||
let mut client = node_service_time_out_client(&self.addr)
|
||||
.await
|
||||
.map_err(|err| Error::from_string(format!("can not get client, err: {}", err.to_string())))?;
|
||||
.map_err(|err| Error::from_string(format!("can not get client, err: {}", err)))?;
|
||||
let request = Request::new(MakeVolumesRequest {
|
||||
disk: self.root.to_string_lossy().to_string(),
|
||||
volumes: volumes.iter().map(|s| (*s).to_string()).collect(),
|
||||
@@ -355,7 +355,7 @@ impl DiskAPI for RemoteDisk {
|
||||
info!("make_volume");
|
||||
let mut client = node_service_time_out_client(&self.addr)
|
||||
.await
|
||||
.map_err(|err| Error::from_string(format!("can not get client, err: {}", err.to_string())))?;
|
||||
.map_err(|err| Error::from_string(format!("can not get client, err: {}", err)))?;
|
||||
let request = Request::new(MakeVolumeRequest {
|
||||
disk: self.root.to_string_lossy().to_string(),
|
||||
volume: volume.to_string(),
|
||||
@@ -374,7 +374,7 @@ impl DiskAPI for RemoteDisk {
|
||||
info!("list_volumes");
|
||||
let mut client = node_service_time_out_client(&self.addr)
|
||||
.await
|
||||
.map_err(|err| Error::from_string(format!("can not get client, err: {}", err.to_string())))?;
|
||||
.map_err(|err| Error::from_string(format!("can not get client, err: {}", err)))?;
|
||||
let request = Request::new(ListVolumesRequest {
|
||||
disk: self.root.to_string_lossy().to_string(),
|
||||
});
|
||||
@@ -398,7 +398,7 @@ impl DiskAPI for RemoteDisk {
|
||||
info!("stat_volume");
|
||||
let mut client = node_service_time_out_client(&self.addr)
|
||||
.await
|
||||
.map_err(|err| Error::from_string(format!("can not get client, err: {}", err.to_string())))?;
|
||||
.map_err(|err| Error::from_string(format!("can not get client, err: {}", err)))?;
|
||||
let request = Request::new(StatVolumeRequest {
|
||||
disk: self.root.to_string_lossy().to_string(),
|
||||
volume: volume.to_string(),
|
||||
@@ -420,7 +420,7 @@ impl DiskAPI for RemoteDisk {
|
||||
let paths = paths.iter().map(|s| s.to_string()).collect::<Vec<String>>();
|
||||
let mut client = node_service_time_out_client(&self.addr)
|
||||
.await
|
||||
.map_err(|err| Error::from_string(format!("can not get client, err: {}", err.to_string())))?;
|
||||
.map_err(|err| Error::from_string(format!("can not get client, err: {}", err)))?;
|
||||
let request = Request::new(DeletePathsRequest {
|
||||
disk: self.root.to_string_lossy().to_string(),
|
||||
volume: volume.to_string(),
|
||||
@@ -442,7 +442,7 @@ impl DiskAPI for RemoteDisk {
|
||||
|
||||
let mut client = node_service_time_out_client(&self.addr)
|
||||
.await
|
||||
.map_err(|err| Error::from_string(format!("can not get client, err: {}", err.to_string())))?;
|
||||
.map_err(|err| Error::from_string(format!("can not get client, err: {}", err)))?;
|
||||
let request = Request::new(UpdateMetadataRequest {
|
||||
disk: self.root.to_string_lossy().to_string(),
|
||||
volume: volume.to_string(),
|
||||
@@ -465,7 +465,7 @@ impl DiskAPI for RemoteDisk {
|
||||
let file_info = serde_json::to_string(&fi)?;
|
||||
let mut client = node_service_time_out_client(&self.addr)
|
||||
.await
|
||||
.map_err(|err| Error::from_string(format!("can not get client, err: {}", err.to_string())))?;
|
||||
.map_err(|err| Error::from_string(format!("can not get client, err: {}", err)))?;
|
||||
let request = Request::new(WriteMetadataRequest {
|
||||
disk: self.root.to_string_lossy().to_string(),
|
||||
volume: volume.to_string(),
|
||||
@@ -494,7 +494,7 @@ impl DiskAPI for RemoteDisk {
|
||||
let opts = serde_json::to_string(opts)?;
|
||||
let mut client = node_service_time_out_client(&self.addr)
|
||||
.await
|
||||
.map_err(|err| Error::from_string(format!("can not get client, err: {}", err.to_string())))?;
|
||||
.map_err(|err| Error::from_string(format!("can not get client, err: {}", err)))?;
|
||||
let request = Request::new(ReadVersionRequest {
|
||||
disk: self.root.to_string_lossy().to_string(),
|
||||
volume: volume.to_string(),
|
||||
@@ -518,7 +518,7 @@ impl DiskAPI for RemoteDisk {
|
||||
info!("read_xl");
|
||||
let mut client = node_service_time_out_client(&self.addr)
|
||||
.await
|
||||
.map_err(|err| Error::from_string(format!("can not get client, err: {}", err.to_string())))?;
|
||||
.map_err(|err| Error::from_string(format!("can not get client, err: {}", err)))?;
|
||||
let request = Request::new(ReadXlRequest {
|
||||
disk: self.root.to_string_lossy().to_string(),
|
||||
volume: volume.to_string(),
|
||||
@@ -550,7 +550,7 @@ impl DiskAPI for RemoteDisk {
|
||||
|
||||
let mut client = node_service_time_out_client(&self.addr)
|
||||
.await
|
||||
.map_err(|err| Error::from_string(format!("can not get client, err: {}", err.to_string())))?;
|
||||
.map_err(|err| Error::from_string(format!("can not get client, err: {}", err)))?;
|
||||
let request = Request::new(DeleteVersionRequest {
|
||||
disk: self.root.to_string_lossy().to_string(),
|
||||
volume: volume.to_string(),
|
||||
@@ -584,7 +584,7 @@ impl DiskAPI for RemoteDisk {
|
||||
}
|
||||
let mut client = node_service_time_out_client(&self.addr)
|
||||
.await
|
||||
.map_err(|err| Error::from_string(format!("can not get client, err: {}", err.to_string())))?;
|
||||
.map_err(|err| Error::from_string(format!("can not get client, err: {}", err)))?;
|
||||
let request = Request::new(DeleteVersionsRequest {
|
||||
disk: self.root.to_string_lossy().to_string(),
|
||||
volume: volume.to_string(),
|
||||
@@ -619,7 +619,7 @@ impl DiskAPI for RemoteDisk {
|
||||
let read_multiple_req = serde_json::to_string(&req)?;
|
||||
let mut client = node_service_time_out_client(&self.addr)
|
||||
.await
|
||||
.map_err(|err| Error::from_string(format!("can not get client, err: {}", err.to_string())))?;
|
||||
.map_err(|err| Error::from_string(format!("can not get client, err: {}", err)))?;
|
||||
let request = Request::new(ReadMultipleRequest {
|
||||
disk: self.root.to_string_lossy().to_string(),
|
||||
read_multiple_req,
|
||||
@@ -644,7 +644,7 @@ impl DiskAPI for RemoteDisk {
|
||||
info!("delete_volume");
|
||||
let mut client = node_service_time_out_client(&self.addr)
|
||||
.await
|
||||
.map_err(|err| Error::from_string(format!("can not get client, err: {}", err.to_string())))?;
|
||||
.map_err(|err| Error::from_string(format!("can not get client, err: {}", err)))?;
|
||||
let request = Request::new(DeleteVolumeRequest {
|
||||
disk: self.root.to_string_lossy().to_string(),
|
||||
volume: volume.to_string(),
|
||||
|
||||
@@ -98,7 +98,7 @@ impl DisksLayout {
|
||||
Ok(res) => res,
|
||||
Err(err) => {
|
||||
debug!("{} not set use default:0, {:?}", ENV_RUSTFS_ERASURE_SET_DRIVE_COUNT, err);
|
||||
format!("0")
|
||||
"0".to_string()
|
||||
}
|
||||
};
|
||||
let set_drive_count: usize = set_drive_count_env.parse()?;
|
||||
@@ -425,8 +425,7 @@ fn get_set_indexes<T: AsRef<str>>(
|
||||
)));
|
||||
}
|
||||
// Final set size with all the symmetry accounted for.
|
||||
let set_size = common_set_drive_count(common_size, &set_counts);
|
||||
set_size
|
||||
common_set_drive_count(common_size, &set_counts)
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
@@ -444,7 +444,7 @@ impl EndpointServerPools {
|
||||
}
|
||||
|
||||
pub fn es_count(&self) -> usize {
|
||||
self.0.iter().map(|v| v.set_count).count()
|
||||
self.0.iter().map(|v| v.set_count).sum()
|
||||
}
|
||||
|
||||
/// add pool endpoints
|
||||
|
||||
@@ -67,11 +67,8 @@ impl Error {
|
||||
}
|
||||
|
||||
pub fn to_io_err(&self) -> Option<io::Error> {
|
||||
if let Some(e) = self.downcast_ref::<io::Error>() {
|
||||
Some(io::Error::new(e.kind(), e.to_string()))
|
||||
} else {
|
||||
None
|
||||
}
|
||||
self.downcast_ref::<io::Error>()
|
||||
.map(|e| io::Error::new(e.kind(), e.to_string()))
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -35,12 +35,12 @@ pub async fn set_object_layer(o: ECStore) {
|
||||
|
||||
pub async fn is_dist_erasure() -> bool {
|
||||
let lock = GLOBAL_IsDistErasure.read().await;
|
||||
*lock == true
|
||||
*lock
|
||||
}
|
||||
|
||||
pub async fn is_erasure() -> bool {
|
||||
let lock = GLOBAL_IsErasure.read().await;
|
||||
*lock == true
|
||||
*lock
|
||||
}
|
||||
|
||||
pub async fn update_erasure_type(setup_type: SetupType) {
|
||||
|
||||
@@ -145,7 +145,7 @@ impl S3PeerSys {
|
||||
pub async fn delete_bucket(&self, bucket: &str, opts: &DeleteBucketOptions) -> Result<()> {
|
||||
let mut futures = Vec::with_capacity(self.clients.len());
|
||||
for cli in self.clients.iter() {
|
||||
futures.push(cli.delete_bucket(bucket, &opts));
|
||||
futures.push(cli.delete_bucket(bucket, opts));
|
||||
}
|
||||
|
||||
let mut errors = Vec::with_capacity(self.clients.len());
|
||||
@@ -385,13 +385,10 @@ impl PeerS3Client for LocalPeerS3Client {
|
||||
|
||||
// errVolumeNotEmpty 不删除,把已经删除的重新创建
|
||||
|
||||
let mut idx = 0;
|
||||
for err in errs {
|
||||
for (idx, err) in errs.into_iter().enumerate() {
|
||||
if err.is_none() && recreate {
|
||||
let _ = local_disks[idx].make_volume(bucket).await;
|
||||
}
|
||||
|
||||
idx += 1;
|
||||
}
|
||||
|
||||
if recreate {
|
||||
@@ -413,7 +410,7 @@ pub struct RemotePeerS3Client {
|
||||
|
||||
impl RemotePeerS3Client {
|
||||
fn new(node: Option<Node>, pools: Option<Vec<usize>>) -> Self {
|
||||
let addr = format!("{}", node.as_ref().map(|v| { v.url.to_string() }).unwrap_or_default());
|
||||
let addr = node.as_ref().map(|v| v.url.to_string()).unwrap_or_default().to_string();
|
||||
Self { node, pools, addr }
|
||||
}
|
||||
}
|
||||
@@ -427,7 +424,7 @@ impl PeerS3Client for RemotePeerS3Client {
|
||||
let options = serde_json::to_string(opts)?;
|
||||
let mut client = node_service_time_out_client(&self.addr)
|
||||
.await
|
||||
.map_err(|err| Error::from_string(format!("can not get client, err: {}", err.to_string())))?;
|
||||
.map_err(|err| Error::from_string(format!("can not get client, err: {}", err)))?;
|
||||
let request = Request::new(ListBucketRequest { options });
|
||||
let response = client.list_bucket(request).await?.into_inner();
|
||||
let bucket_infos = response
|
||||
@@ -442,7 +439,7 @@ impl PeerS3Client for RemotePeerS3Client {
|
||||
let options = serde_json::to_string(opts)?;
|
||||
let mut client = node_service_time_out_client(&self.addr)
|
||||
.await
|
||||
.map_err(|err| Error::from_string(format!("can not get client, err: {}", err.to_string())))?;
|
||||
.map_err(|err| Error::from_string(format!("can not get client, err: {}", err)))?;
|
||||
let request = Request::new(MakeBucketRequest {
|
||||
name: bucket.to_string(),
|
||||
options,
|
||||
@@ -460,7 +457,7 @@ impl PeerS3Client for RemotePeerS3Client {
|
||||
let options = serde_json::to_string(opts)?;
|
||||
let mut client = node_service_time_out_client(&self.addr)
|
||||
.await
|
||||
.map_err(|err| Error::from_string(format!("can not get client, err: {}", err.to_string())))?;
|
||||
.map_err(|err| Error::from_string(format!("can not get client, err: {}", err)))?;
|
||||
let request = Request::new(GetBucketInfoRequest {
|
||||
bucket: bucket.to_string(),
|
||||
options,
|
||||
@@ -474,7 +471,7 @@ impl PeerS3Client for RemotePeerS3Client {
|
||||
async fn delete_bucket(&self, bucket: &str, _opts: &DeleteBucketOptions) -> Result<()> {
|
||||
let mut client = node_service_time_out_client(&self.addr)
|
||||
.await
|
||||
.map_err(|err| Error::from_string(format!("can not get client, err: {}", err.to_string())))?;
|
||||
.map_err(|err| Error::from_string(format!("can not get client, err: {}", err)))?;
|
||||
|
||||
let request = Request::new(DeleteBucketRequest {
|
||||
bucket: bucket.to_string(),
|
||||
|
||||
@@ -41,12 +41,12 @@ pub fn object_op_ignored_errs() -> Vec<Box<dyn CheckErrorFn>> {
|
||||
}
|
||||
|
||||
// 用于检查错误是否被忽略的函数
|
||||
fn is_err_ignored(err: &Error, ignored_errs: &Vec<Box<dyn CheckErrorFn>>) -> bool {
|
||||
fn is_err_ignored(err: &Error, ignored_errs: &[Box<dyn CheckErrorFn>]) -> bool {
|
||||
ignored_errs.iter().any(|ignored_err| ignored_err.is(err))
|
||||
}
|
||||
|
||||
// 减少错误数量并返回出现次数最多的错误
|
||||
fn reduce_errs(errs: &Vec<Option<Error>>, ignored_errs: &Vec<Box<dyn CheckErrorFn>>) -> (usize, Option<Error>) {
|
||||
fn reduce_errs(errs: &[Option<Error>], ignored_errs: &[Box<dyn CheckErrorFn>]) -> (usize, Option<Error>) {
|
||||
let mut error_counts: HashMap<String, usize> = HashMap::new();
|
||||
let mut error_map: HashMap<String, usize> = HashMap::new(); // 存err位置
|
||||
let nil = "nil".to_string();
|
||||
@@ -59,7 +59,7 @@ fn reduce_errs(errs: &Vec<Option<Error>>, ignored_errs: &Vec<Box<dyn CheckErrorF
|
||||
|
||||
let err = operr.as_ref().unwrap();
|
||||
|
||||
if is_err_ignored(err, &ignored_errs) {
|
||||
if is_err_ignored(err, ignored_errs) {
|
||||
continue;
|
||||
}
|
||||
|
||||
@@ -71,7 +71,7 @@ fn reduce_errs(errs: &Vec<Option<Error>>, ignored_errs: &Vec<Box<dyn CheckErrorF
|
||||
|
||||
let mut max = 0;
|
||||
let mut max_err = nil.clone();
|
||||
for (&ref err, &count) in error_counts.iter() {
|
||||
for (err, &count) in error_counts.iter() {
|
||||
if count > max || (count == max && *err == nil) {
|
||||
max = count;
|
||||
max_err = err.clone();
|
||||
@@ -93,8 +93,8 @@ fn reduce_errs(errs: &Vec<Option<Error>>, ignored_errs: &Vec<Box<dyn CheckErrorF
|
||||
|
||||
// 根据quorum验证错误数量
|
||||
fn reduce_quorum_errs(
|
||||
errs: &Vec<Option<Error>>,
|
||||
ignored_errs: &Vec<Box<dyn CheckErrorFn>>,
|
||||
errs: &[Option<Error>],
|
||||
ignored_errs: &[Box<dyn CheckErrorFn>],
|
||||
quorum: usize,
|
||||
quorum_err: QuorumError,
|
||||
) -> Option<Error> {
|
||||
@@ -109,8 +109,8 @@ fn reduce_quorum_errs(
|
||||
// 根据读quorum验证错误数量
|
||||
// 返回最大错误数量的下标,或QuorumError
|
||||
pub fn reduce_read_quorum_errs(
|
||||
errs: &Vec<Option<Error>>,
|
||||
ignored_errs: &Vec<Box<dyn CheckErrorFn>>,
|
||||
errs: &[Option<Error>],
|
||||
ignored_errs: &[Box<dyn CheckErrorFn>],
|
||||
read_quorum: usize,
|
||||
) -> Option<Error> {
|
||||
reduce_quorum_errs(errs, ignored_errs, read_quorum, QuorumError::Read)
|
||||
@@ -119,8 +119,8 @@ pub fn reduce_read_quorum_errs(
|
||||
// 根据写quorum验证错误数量
|
||||
// 返回最大错误数量的下标,或QuorumError
|
||||
pub fn reduce_write_quorum_errs(
|
||||
errs: &Vec<Option<Error>>,
|
||||
ignored_errs: &Vec<Box<dyn CheckErrorFn>>,
|
||||
errs: &[Option<Error>],
|
||||
ignored_errs: &[Box<dyn CheckErrorFn>],
|
||||
write_quorum: usize,
|
||||
) -> Option<Error> {
|
||||
reduce_quorum_errs(errs, ignored_errs, write_quorum, QuorumError::Write)
|
||||
|
||||
@@ -68,7 +68,7 @@ impl Sets {
|
||||
}
|
||||
|
||||
if !endpoint.is_local {
|
||||
let host_port = format!("{}:{}", endpoint.url.host_str().unwrap(), endpoint.url.port().unwrap().to_string());
|
||||
let host_port = format!("{}:{}", endpoint.url.host_str().unwrap(), endpoint.url.port().unwrap());
|
||||
if !unique[set_idx].contains(&host_port) {
|
||||
unique[set_idx].push(host_port);
|
||||
lockers[set_idx].push(new_lock_api(false, Some(endpoint.url.clone())));
|
||||
|
||||
@@ -194,7 +194,7 @@ impl ECStore {
|
||||
let mut uniq = HashSet::new();
|
||||
|
||||
for (disks_ress, _disks_errs) in results {
|
||||
for (_i, disks_res) in disks_ress.iter().enumerate() {
|
||||
for disks_res in disks_ress.iter() {
|
||||
if disks_res.is_none() {
|
||||
// TODO handle errs
|
||||
continue;
|
||||
|
||||
@@ -198,12 +198,11 @@ async fn load_format_erasure_all(disks: &[Option<DiskStore>], heal: bool) -> (Ve
|
||||
}
|
||||
|
||||
let results = join_all(futures).await;
|
||||
let mut i = 0;
|
||||
for result in results {
|
||||
for (i, result) in results.into_iter().enumerate() {
|
||||
match result {
|
||||
Ok(s) => {
|
||||
if !heal {
|
||||
let _ = disks[i].as_ref().unwrap().set_disk_id(Some(s.erasure.this.clone())).await;
|
||||
let _ = disks[i].as_ref().unwrap().set_disk_id(Some(s.erasure.this)).await;
|
||||
}
|
||||
|
||||
datas.push(Some(s));
|
||||
@@ -214,8 +213,6 @@ async fn load_format_erasure_all(disks: &[Option<DiskStore>], heal: bool) -> (Ve
|
||||
errors.push(Some(e));
|
||||
}
|
||||
}
|
||||
|
||||
i += 1;
|
||||
}
|
||||
|
||||
(datas, errors)
|
||||
|
||||
@@ -27,13 +27,10 @@ fn deep_match_rune(str_: &[u8], pattern: &[u8], simple: bool) -> bool {
|
||||
'*' => {
|
||||
return if pattern.len() == 1 {
|
||||
true
|
||||
} else if deep_match_rune(&str_[..], &pattern[1..], simple)
|
||||
|| (!str_.is_empty() && deep_match_rune(&str_[1..], pattern, simple))
|
||||
{
|
||||
true
|
||||
} else {
|
||||
false
|
||||
};
|
||||
deep_match_rune(str_, &pattern[1..], simple)
|
||||
|| (!str_.is_empty() && deep_match_rune(&str_[1..], pattern, simple))
|
||||
}
|
||||
}
|
||||
'?' => {
|
||||
if str_.is_empty() {
|
||||
|
||||
@@ -115,7 +115,7 @@ impl Node for NodeService {
|
||||
return Ok(tonic::Response::new(ListBucketResponse {
|
||||
success: false,
|
||||
bucket_infos: Vec::new(),
|
||||
error_info: Some(format!("decode BucketOptions failed: {}", err.to_string())),
|
||||
error_info: Some(format!("decode BucketOptions failed: {}", err)),
|
||||
}))
|
||||
}
|
||||
};
|
||||
@@ -135,7 +135,7 @@ impl Node for NodeService {
|
||||
Err(err) => Ok(tonic::Response::new(ListBucketResponse {
|
||||
success: false,
|
||||
bucket_infos: Vec::new(),
|
||||
error_info: Some(format!("make failed: {}", err.to_string())),
|
||||
error_info: Some(format!("make failed: {}", err)),
|
||||
})),
|
||||
}
|
||||
}
|
||||
@@ -149,7 +149,7 @@ impl Node for NodeService {
|
||||
Err(err) => {
|
||||
return Ok(tonic::Response::new(MakeBucketResponse {
|
||||
success: false,
|
||||
error_info: Some(format!("decode MakeBucketOptions failed: {}", err.to_string())),
|
||||
error_info: Some(format!("decode MakeBucketOptions failed: {}", err)),
|
||||
}))
|
||||
}
|
||||
};
|
||||
@@ -160,7 +160,7 @@ impl Node for NodeService {
|
||||
})),
|
||||
Err(err) => Ok(tonic::Response::new(MakeBucketResponse {
|
||||
success: false,
|
||||
error_info: Some(format!("make failed: {}", err.to_string())),
|
||||
error_info: Some(format!("make failed: {}", err)),
|
||||
})),
|
||||
}
|
||||
}
|
||||
@@ -175,7 +175,7 @@ impl Node for NodeService {
|
||||
return Ok(tonic::Response::new(GetBucketInfoResponse {
|
||||
success: false,
|
||||
bucket_info: String::new(),
|
||||
error_info: Some(format!("decode BucketOptions failed: {}", err.to_string())),
|
||||
error_info: Some(format!("decode BucketOptions failed: {}", err)),
|
||||
}))
|
||||
}
|
||||
};
|
||||
@@ -187,7 +187,7 @@ impl Node for NodeService {
|
||||
return Ok(tonic::Response::new(GetBucketInfoResponse {
|
||||
success: false,
|
||||
bucket_info: String::new(),
|
||||
error_info: Some(format!("encode BucketInfo failed: {}", err.to_string())),
|
||||
error_info: Some(format!("encode BucketInfo failed: {}", err)),
|
||||
}));
|
||||
}
|
||||
};
|
||||
@@ -201,7 +201,7 @@ impl Node for NodeService {
|
||||
Err(err) => Ok(tonic::Response::new(GetBucketInfoResponse {
|
||||
success: false,
|
||||
bucket_info: String::new(),
|
||||
error_info: Some(format!("make failed: {}", err.to_string())),
|
||||
error_info: Some(format!("make failed: {}", err)),
|
||||
})),
|
||||
}
|
||||
}
|
||||
@@ -221,7 +221,7 @@ impl Node for NodeService {
|
||||
})),
|
||||
Err(err) => Ok(tonic::Response::new(DeleteBucketResponse {
|
||||
success: false,
|
||||
error_info: Some(format!("make failed: {}", err.to_string())),
|
||||
error_info: Some(format!("make failed: {}", err)),
|
||||
})),
|
||||
}
|
||||
}
|
||||
@@ -767,7 +767,7 @@ impl Node for NodeService {
|
||||
Err(err) => Ok(tonic::Response::new(StatVolumeResponse {
|
||||
success: false,
|
||||
volume_info: String::new(),
|
||||
error_info: Some(format!("encode VolumeInfo failed, {}", err.to_string())),
|
||||
error_info: Some(format!("encode VolumeInfo failed, {}", err)),
|
||||
})),
|
||||
},
|
||||
Err(err) => Ok(tonic::Response::new(StatVolumeResponse {
|
||||
@@ -855,7 +855,7 @@ impl Node for NodeService {
|
||||
Err(err) => {
|
||||
return Ok(tonic::Response::new(WriteMetadataResponse {
|
||||
success: false,
|
||||
error_info: Some(format!("decode FileInfo failed, {}", err.to_string())),
|
||||
error_info: Some(format!("decode FileInfo failed, {}", err)),
|
||||
}));
|
||||
}
|
||||
};
|
||||
@@ -903,7 +903,7 @@ impl Node for NodeService {
|
||||
Err(err) => Ok(tonic::Response::new(ReadVersionResponse {
|
||||
success: false,
|
||||
file_info: String::new(),
|
||||
error_info: Some(format!("encode VolumeInfo failed, {}", err.to_string())),
|
||||
error_info: Some(format!("encode VolumeInfo failed, {}", err)),
|
||||
})),
|
||||
},
|
||||
Err(err) => Ok(tonic::Response::new(ReadVersionResponse {
|
||||
@@ -934,7 +934,7 @@ impl Node for NodeService {
|
||||
Err(err) => Ok(tonic::Response::new(ReadXlResponse {
|
||||
success: false,
|
||||
raw_file_info: String::new(),
|
||||
error_info: Some(format!("encode RawFileInfo failed, {}", err.to_string())),
|
||||
error_info: Some(format!("encode RawFileInfo failed, {}", err)),
|
||||
})),
|
||||
},
|
||||
Err(err) => Ok(tonic::Response::new(ReadXlResponse {
|
||||
@@ -1011,7 +1011,7 @@ impl Node for NodeService {
|
||||
if let Some(disk) = self.find_disk(&request.disk).await {
|
||||
let mut versions = Vec::with_capacity(request.versions.len());
|
||||
for version in request.versions.iter() {
|
||||
match serde_json::from_str::<FileInfoVersions>(&version) {
|
||||
match serde_json::from_str::<FileInfoVersions>(version) {
|
||||
Ok(version) => versions.push(version),
|
||||
Err(_) => {
|
||||
return Ok(tonic::Response::new(DeleteVersionsResponse {
|
||||
@@ -1135,12 +1135,12 @@ impl Node for NodeService {
|
||||
})),
|
||||
Err(err) => Ok(tonic::Response::new(GenerallyLockResponse {
|
||||
success: false,
|
||||
error_info: Some(format!("can not lock, args: {}, err: {}", args, err.to_string())),
|
||||
error_info: Some(format!("can not lock, args: {}, err: {}", args, err)),
|
||||
})),
|
||||
},
|
||||
Err(err) => Ok(tonic::Response::new(GenerallyLockResponse {
|
||||
success: false,
|
||||
error_info: Some(format!("can not decode args, err: {}", err.to_string())),
|
||||
error_info: Some(format!("can not decode args, err: {}", err)),
|
||||
})),
|
||||
}
|
||||
}
|
||||
@@ -1155,12 +1155,12 @@ impl Node for NodeService {
|
||||
})),
|
||||
Err(err) => Ok(tonic::Response::new(GenerallyLockResponse {
|
||||
success: false,
|
||||
error_info: Some(format!("can not unlock, args: {}, err: {}", args, err.to_string())),
|
||||
error_info: Some(format!("can not unlock, args: {}, err: {}", args, err)),
|
||||
})),
|
||||
},
|
||||
Err(err) => Ok(tonic::Response::new(GenerallyLockResponse {
|
||||
success: false,
|
||||
error_info: Some(format!("can not decode args, err: {}", err.to_string())),
|
||||
error_info: Some(format!("can not decode args, err: {}", err)),
|
||||
})),
|
||||
}
|
||||
}
|
||||
@@ -1175,12 +1175,12 @@ impl Node for NodeService {
|
||||
})),
|
||||
Err(err) => Ok(tonic::Response::new(GenerallyLockResponse {
|
||||
success: false,
|
||||
error_info: Some(format!("can not rlock, args: {}, err: {}", args, err.to_string())),
|
||||
error_info: Some(format!("can not rlock, args: {}, err: {}", args, err)),
|
||||
})),
|
||||
},
|
||||
Err(err) => Ok(tonic::Response::new(GenerallyLockResponse {
|
||||
success: false,
|
||||
error_info: Some(format!("can not decode args, err: {}", err.to_string())),
|
||||
error_info: Some(format!("can not decode args, err: {}", err)),
|
||||
})),
|
||||
}
|
||||
}
|
||||
@@ -1195,12 +1195,12 @@ impl Node for NodeService {
|
||||
})),
|
||||
Err(err) => Ok(tonic::Response::new(GenerallyLockResponse {
|
||||
success: false,
|
||||
error_info: Some(format!("can not runlock, args: {}, err: {}", args, err.to_string())),
|
||||
error_info: Some(format!("can not runlock, args: {}, err: {}", args, err)),
|
||||
})),
|
||||
},
|
||||
Err(err) => Ok(tonic::Response::new(GenerallyLockResponse {
|
||||
success: false,
|
||||
error_info: Some(format!("can not decode args, err: {}", err.to_string())),
|
||||
error_info: Some(format!("can not decode args, err: {}", err)),
|
||||
})),
|
||||
}
|
||||
}
|
||||
@@ -1215,12 +1215,12 @@ impl Node for NodeService {
|
||||
})),
|
||||
Err(err) => Ok(tonic::Response::new(GenerallyLockResponse {
|
||||
success: false,
|
||||
error_info: Some(format!("can not force_unlock, args: {}, err: {}", args, err.to_string())),
|
||||
error_info: Some(format!("can not force_unlock, args: {}, err: {}", args, err)),
|
||||
})),
|
||||
},
|
||||
Err(err) => Ok(tonic::Response::new(GenerallyLockResponse {
|
||||
success: false,
|
||||
error_info: Some(format!("can not decode args, err: {}", err.to_string())),
|
||||
error_info: Some(format!("can not decode args, err: {}", err)),
|
||||
})),
|
||||
}
|
||||
}
|
||||
@@ -1235,12 +1235,12 @@ impl Node for NodeService {
|
||||
})),
|
||||
Err(err) => Ok(tonic::Response::new(GenerallyLockResponse {
|
||||
success: false,
|
||||
error_info: Some(format!("can not refresh, args: {}, err: {}", args, err.to_string())),
|
||||
error_info: Some(format!("can not refresh, args: {}, err: {}", args, err)),
|
||||
})),
|
||||
},
|
||||
Err(err) => Ok(tonic::Response::new(GenerallyLockResponse {
|
||||
success: false,
|
||||
error_info: Some(format!("can not decode args, err: {}", err.to_string())),
|
||||
error_info: Some(format!("can not decode args, err: {}", err)),
|
||||
})),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -79,7 +79,7 @@ impl S3 for FS {
|
||||
let lock = layer.read().await;
|
||||
let store = match lock.as_ref() {
|
||||
Some(s) => s,
|
||||
None => return Err(S3Error::with_message(S3ErrorCode::InternalError, format!("Not init",))),
|
||||
None => return Err(S3Error::with_message(S3ErrorCode::InternalError, "Not init".to_string())),
|
||||
};
|
||||
|
||||
try_!(
|
||||
@@ -112,7 +112,7 @@ impl S3 for FS {
|
||||
let lock = layer.read().await;
|
||||
let store = match lock.as_ref() {
|
||||
Some(s) => s,
|
||||
None => return Err(S3Error::with_message(S3ErrorCode::InternalError, format!("Not init",))),
|
||||
None => return Err(S3Error::with_message(S3ErrorCode::InternalError, "Not init".to_string())),
|
||||
};
|
||||
try_!(
|
||||
store
|
||||
@@ -147,7 +147,7 @@ impl S3 for FS {
|
||||
let lock = layer.read().await;
|
||||
let store = match lock.as_ref() {
|
||||
Some(s) => s,
|
||||
None => return Err(S3Error::with_message(S3ErrorCode::InternalError, format!("Not init",))),
|
||||
None => return Err(S3Error::with_message(S3ErrorCode::InternalError, "Not init".to_string())),
|
||||
};
|
||||
let (dobjs, _errs) = try_!(store.delete_objects(&bucket, objects, ObjectOptions::default()).await);
|
||||
|
||||
@@ -205,7 +205,7 @@ impl S3 for FS {
|
||||
.unwrap_or_default();
|
||||
ObjectToDelete {
|
||||
object_name: v.key.clone(),
|
||||
version_id: version_id,
|
||||
version_id,
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
@@ -214,7 +214,7 @@ impl S3 for FS {
|
||||
let lock = layer.read().await;
|
||||
let store = match lock.as_ref() {
|
||||
Some(s) => s,
|
||||
None => return Err(S3Error::with_message(S3ErrorCode::InternalError, format!("Not init",))),
|
||||
None => return Err(S3Error::with_message(S3ErrorCode::InternalError, "Not init".to_string())),
|
||||
};
|
||||
|
||||
let (dobjs, _errs) = try_!(store.delete_objects(&bucket, objects, ObjectOptions::default()).await);
|
||||
@@ -255,7 +255,7 @@ impl S3 for FS {
|
||||
let lock = layer.read().await;
|
||||
let store = match lock.as_ref() {
|
||||
Some(s) => s,
|
||||
None => return Err(S3Error::with_message(S3ErrorCode::InternalError, format!("Not init",))),
|
||||
None => return Err(S3Error::with_message(S3ErrorCode::InternalError, "Not init".to_string())),
|
||||
};
|
||||
|
||||
if let Err(e) = store.get_bucket_info(&input.bucket, &BucketOptions::default()).await {
|
||||
@@ -289,7 +289,7 @@ impl S3 for FS {
|
||||
let lock = layer.read().await;
|
||||
let store = match lock.as_ref() {
|
||||
Some(s) => s,
|
||||
None => return Err(S3Error::with_message(S3ErrorCode::InternalError, format!("Not init",))),
|
||||
None => return Err(S3Error::with_message(S3ErrorCode::InternalError, "Not init".to_string())),
|
||||
};
|
||||
|
||||
let reader = try_!(store.get_object_reader(bucket.as_str(), key.as_str(), range, h, opts).await);
|
||||
@@ -319,7 +319,7 @@ impl S3 for FS {
|
||||
let lock = layer.read().await;
|
||||
let store = match lock.as_ref() {
|
||||
Some(s) => s,
|
||||
None => return Err(S3Error::with_message(S3ErrorCode::InternalError, format!("Not init",))),
|
||||
None => return Err(S3Error::with_message(S3ErrorCode::InternalError, "Not init".to_string())),
|
||||
};
|
||||
|
||||
if let Err(e) = store.get_bucket_info(&input.bucket, &BucketOptions::default()).await {
|
||||
@@ -343,7 +343,7 @@ impl S3 for FS {
|
||||
let lock = layer.read().await;
|
||||
let store = match lock.as_ref() {
|
||||
Some(s) => s,
|
||||
None => return Err(S3Error::with_message(S3ErrorCode::InternalError, format!("Not init",))),
|
||||
None => return Err(S3Error::with_message(S3ErrorCode::InternalError, "Not init".to_string())),
|
||||
};
|
||||
|
||||
let info = try_!(store.get_object_info(&bucket, &key, &ObjectOptions::default()).await);
|
||||
@@ -370,7 +370,7 @@ impl S3 for FS {
|
||||
let lock = layer.read().await;
|
||||
let store = match lock.as_ref() {
|
||||
Some(s) => s,
|
||||
None => return Err(S3Error::with_message(S3ErrorCode::InternalError, format!("Not init",))),
|
||||
None => return Err(S3Error::with_message(S3ErrorCode::InternalError, "Not init".to_string())),
|
||||
};
|
||||
|
||||
let bucket_infos = try_!(store.list_bucket(&BucketOptions::default()).await);
|
||||
@@ -425,7 +425,7 @@ impl S3 for FS {
|
||||
let lock = layer.read().await;
|
||||
let store = match lock.as_ref() {
|
||||
Some(s) => s,
|
||||
None => return Err(S3Error::with_message(S3ErrorCode::InternalError, format!("Not init",))),
|
||||
None => return Err(S3Error::with_message(S3ErrorCode::InternalError, "Not init".to_string())),
|
||||
};
|
||||
|
||||
let object_infos = try_!(
|
||||
@@ -449,10 +449,12 @@ impl S3 for FS {
|
||||
.iter()
|
||||
.filter(|v| !v.name.is_empty())
|
||||
.map(|v| {
|
||||
let mut obj = Object::default();
|
||||
obj.key = Some(v.name.to_owned());
|
||||
obj.last_modified = v.mod_time.map(Timestamp::from);
|
||||
obj.size = Some(v.size as i64);
|
||||
let mut obj = Object {
|
||||
key: Some(v.name.to_owned()),
|
||||
last_modified: v.mod_time.map(Timestamp::from),
|
||||
size: Some(v.size as i64),
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
if fetch_owner.is_some_and(|v| v) {
|
||||
obj.owner = Some(Owner {
|
||||
@@ -512,7 +514,7 @@ impl S3 for FS {
|
||||
let lock = layer.read().await;
|
||||
let store = match lock.as_ref() {
|
||||
Some(s) => s,
|
||||
None => return Err(S3Error::with_message(S3ErrorCode::InternalError, format!("Not init",))),
|
||||
None => return Err(S3Error::with_message(S3ErrorCode::InternalError, "Not init".to_string())),
|
||||
};
|
||||
|
||||
try_!(store.put_object(&bucket, &key, reader, &ObjectOptions::default()).await);
|
||||
@@ -540,7 +542,7 @@ impl S3 for FS {
|
||||
let lock = layer.read().await;
|
||||
let store = match lock.as_ref() {
|
||||
Some(s) => s,
|
||||
None => return Err(S3Error::with_message(S3ErrorCode::InternalError, format!("Not init",))),
|
||||
None => return Err(S3Error::with_message(S3ErrorCode::InternalError, "Not init".to_string())),
|
||||
};
|
||||
|
||||
let MultipartUploadResult { upload_id, .. } =
|
||||
@@ -583,7 +585,7 @@ impl S3 for FS {
|
||||
let lock = layer.read().await;
|
||||
let store = match lock.as_ref() {
|
||||
Some(s) => s,
|
||||
None => return Err(S3Error::with_message(S3ErrorCode::InternalError, format!("Not init",))),
|
||||
None => return Err(S3Error::with_message(S3ErrorCode::InternalError, "Not init".to_string())),
|
||||
};
|
||||
|
||||
try_!(store.put_object_part(&bucket, &key, &upload_id, part_id, data, &opts).await);
|
||||
@@ -646,7 +648,7 @@ impl S3 for FS {
|
||||
let lock = layer.read().await;
|
||||
let store = match lock.as_ref() {
|
||||
Some(s) => s,
|
||||
None => return Err(S3Error::with_message(S3ErrorCode::InternalError, format!("Not init",))),
|
||||
None => return Err(S3Error::with_message(S3ErrorCode::InternalError, "Not init".to_string())),
|
||||
};
|
||||
|
||||
try_!(
|
||||
@@ -676,7 +678,7 @@ impl S3 for FS {
|
||||
let lock = layer.read().await;
|
||||
let store = match lock.as_ref() {
|
||||
Some(s) => s,
|
||||
None => return Err(S3Error::with_message(S3ErrorCode::InternalError, format!("Not init",))),
|
||||
None => return Err(S3Error::with_message(S3ErrorCode::InternalError, "Not init".to_string())),
|
||||
};
|
||||
|
||||
let opts = &ObjectOptions::default();
|
||||
@@ -819,7 +821,7 @@ impl S3 for FS {
|
||||
tag_set: object_info
|
||||
.tags
|
||||
.map(|tags| tags.into_iter().map(|(key, value)| Tag { key, value }).collect())
|
||||
.unwrap_or_else(|| vec![]),
|
||||
.unwrap_or_else(Vec::new),
|
||||
version_id: None,
|
||||
}))
|
||||
}
|
||||
@@ -859,7 +861,7 @@ impl S3 for FS {
|
||||
let lock = layer.read().await;
|
||||
let store = match lock.as_ref() {
|
||||
Some(s) => s,
|
||||
None => return Err(S3Error::with_message(S3ErrorCode::InternalError, format!("Not init",))),
|
||||
None => return Err(S3Error::with_message(S3ErrorCode::InternalError, "Not init".to_string())),
|
||||
};
|
||||
|
||||
if let Err(e) = store.get_bucket_info(&bucket, &BucketOptions::default()).await {
|
||||
|
||||
Reference in New Issue
Block a user