diff --git a/common/common/src/last_minute.rs b/common/common/src/last_minute.rs new file mode 100644 index 00000000..6141a318 --- /dev/null +++ b/common/common/src/last_minute.rs @@ -0,0 +1,106 @@ +use std::time::{Duration, SystemTime, UNIX_EPOCH}; + +#[derive(Clone, Debug, Default)] +pub struct AccElem { + pub total: u64, + pub size: u64, + pub n: u64, +} + +impl AccElem { + pub fn add(&mut self, dur: &Duration) { + let dur = dur.as_secs(); + self.total += dur; + self.n += 1; + } + + pub fn merge(&mut self, b: &AccElem) { + self.n += b.n; + self.total += b.total; + self.size += b.size; + } + + pub fn avg(&self) -> Duration { + if self.n >= 1 && self.total > 0 { + return Duration::from_secs(self.total / self.n); + } + Duration::from_secs(0) + } +} + +#[derive(Clone)] +pub struct LastMinuteLatency { + pub totals: Vec, + pub last_sec: u64, +} + +impl Default for LastMinuteLatency { + fn default() -> Self { + Self { + totals: vec![AccElem::default(); 60], + last_sec: Default::default(), + } + } +} + +impl LastMinuteLatency { + pub fn merge(&mut self, o: &mut LastMinuteLatency) -> LastMinuteLatency { + let mut merged = LastMinuteLatency::default(); + if self.last_sec > o.last_sec { + o.forward_to(self.last_sec); + merged.last_sec = self.last_sec; + } else { + self.forward_to(o.last_sec); + merged.last_sec = o.last_sec; + } + + for i in 0..merged.totals.len() { + merged.totals[i] = AccElem { + total: self.totals[i].total + o.totals[i].total, + n: self.totals[i].n + o.totals[i].n, + size: self.totals[i].size + o.totals[i].size, + } + } + merged + } + + pub fn add(&mut self, t: &Duration) { + let sec = SystemTime::now().duration_since(UNIX_EPOCH).expect("Time went backwards").as_secs(); + self.forward_to(sec); + let win_idx = sec % 60; + self.totals[win_idx as usize].add(t); + self.last_sec = sec; + } + + pub fn add_all(&mut self, sec: u64, a: &AccElem) { + self.forward_to(sec); + let win_idx = sec % 60; + self.totals[win_idx as usize].merge(a); + self.last_sec = sec; + } + + pub fn get_total(&mut self) -> AccElem { + let mut res = AccElem::default(); + let sec = SystemTime::now().duration_since(UNIX_EPOCH).expect("Time went backwards").as_secs(); + self.forward_to(sec); + for elem in self.totals.iter() { + res.merge(elem); + } + res + } + + pub fn forward_to(&mut self, t: u64) { + if self.last_sec >= t { + return; + } + if t - self.last_sec >= 60 { + self.totals = vec![AccElem::default(); 60]; + return; + } + while self.last_sec != t { + let idx = (self.last_sec + 1) % 60; + self.totals[idx as usize] = AccElem::default(); + self.last_sec += 1; + } + } +} diff --git a/common/common/src/lib.rs b/common/common/src/lib.rs index dc9ee425..a8998fee 100644 --- a/common/common/src/lib.rs +++ b/common/common/src/lib.rs @@ -1,5 +1,6 @@ pub mod error; pub mod globals; +pub mod last_minute; /// Defers evaluation of a block of code until the end of the scope. #[macro_export] diff --git a/common/protos/src/generated/flatbuffers_generated/models.rs b/common/protos/src/generated/flatbuffers_generated/models.rs index e4949fdc..aa1f6ae2 100644 --- a/common/protos/src/generated/flatbuffers_generated/models.rs +++ b/common/protos/src/generated/flatbuffers_generated/models.rs @@ -1,9 +1,10 @@ // automatically generated by the FlatBuffers compiler, do not modify + // @generated -use core::cmp::Ordering; use core::mem; +use core::cmp::Ordering; extern crate flatbuffers; use self::flatbuffers::{EndianScalar, Follow}; @@ -11,114 +12,112 @@ use self::flatbuffers::{EndianScalar, Follow}; #[allow(unused_imports, dead_code)] pub mod models { - use core::cmp::Ordering; - use core::mem; + use core::mem; + use core::cmp::Ordering; - extern crate flatbuffers; - use self::flatbuffers::{EndianScalar, Follow}; + extern crate flatbuffers; + use self::flatbuffers::{EndianScalar, Follow}; - pub enum PingBodyOffset {} - #[derive(Copy, Clone, PartialEq)] +pub enum PingBodyOffset {} +#[derive(Copy, Clone, PartialEq)] - pub struct PingBody<'a> { - pub _tab: flatbuffers::Table<'a>, +pub struct PingBody<'a> { + pub _tab: flatbuffers::Table<'a>, +} + +impl<'a> flatbuffers::Follow<'a> for PingBody<'a> { + type Inner = PingBody<'a>; + #[inline] + unsafe fn follow(buf: &'a [u8], loc: usize) -> Self::Inner { + Self { _tab: flatbuffers::Table::new(buf, loc) } + } +} + +impl<'a> PingBody<'a> { + pub const VT_PAYLOAD: flatbuffers::VOffsetT = 4; + + pub const fn get_fully_qualified_name() -> &'static str { + "models.PingBody" + } + + #[inline] + pub unsafe fn init_from_table(table: flatbuffers::Table<'a>) -> Self { + PingBody { _tab: table } + } + #[allow(unused_mut)] + pub fn create<'bldr: 'args, 'args: 'mut_bldr, 'mut_bldr, A: flatbuffers::Allocator + 'bldr>( + _fbb: &'mut_bldr mut flatbuffers::FlatBufferBuilder<'bldr, A>, + args: &'args PingBodyArgs<'args> + ) -> flatbuffers::WIPOffset> { + let mut builder = PingBodyBuilder::new(_fbb); + if let Some(x) = args.payload { builder.add_payload(x); } + builder.finish() + } + + + #[inline] + pub fn payload(&self) -> Option> { + // Safety: + // Created from valid Table for this object + // which contains a valid value in this slot + unsafe { self._tab.get::>>(PingBody::VT_PAYLOAD, None)} + } +} + +impl flatbuffers::Verifiable for PingBody<'_> { + #[inline] + fn run_verifier( + v: &mut flatbuffers::Verifier, pos: usize + ) -> Result<(), flatbuffers::InvalidFlatbuffer> { + use self::flatbuffers::Verifiable; + v.visit_table(pos)? + .visit_field::>>("payload", Self::VT_PAYLOAD, false)? + .finish(); + Ok(()) + } +} +pub struct PingBodyArgs<'a> { + pub payload: Option>>, +} +impl<'a> Default for PingBodyArgs<'a> { + #[inline] + fn default() -> Self { + PingBodyArgs { + payload: None, } + } +} - impl<'a> flatbuffers::Follow<'a> for PingBody<'a> { - type Inner = PingBody<'a>; - #[inline] - unsafe fn follow(buf: &'a [u8], loc: usize) -> Self::Inner { - Self { - _tab: flatbuffers::Table::new(buf, loc), - } - } +pub struct PingBodyBuilder<'a: 'b, 'b, A: flatbuffers::Allocator + 'a> { + fbb_: &'b mut flatbuffers::FlatBufferBuilder<'a, A>, + start_: flatbuffers::WIPOffset, +} +impl<'a: 'b, 'b, A: flatbuffers::Allocator + 'a> PingBodyBuilder<'a, 'b, A> { + #[inline] + pub fn add_payload(&mut self, payload: flatbuffers::WIPOffset>) { + self.fbb_.push_slot_always::>(PingBody::VT_PAYLOAD, payload); + } + #[inline] + pub fn new(_fbb: &'b mut flatbuffers::FlatBufferBuilder<'a, A>) -> PingBodyBuilder<'a, 'b, A> { + let start = _fbb.start_table(); + PingBodyBuilder { + fbb_: _fbb, + start_: start, } + } + #[inline] + pub fn finish(self) -> flatbuffers::WIPOffset> { + let o = self.fbb_.end_table(self.start_); + flatbuffers::WIPOffset::new(o.value()) + } +} - impl<'a> PingBody<'a> { - pub const VT_PAYLOAD: flatbuffers::VOffsetT = 4; +impl core::fmt::Debug for PingBody<'_> { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + let mut ds = f.debug_struct("PingBody"); + ds.field("payload", &self.payload()); + ds.finish() + } +} +} // pub mod models - pub const fn get_fully_qualified_name() -> &'static str { - "models.PingBody" - } - - #[inline] - pub unsafe fn init_from_table(table: flatbuffers::Table<'a>) -> Self { - PingBody { _tab: table } - } - #[allow(unused_mut)] - pub fn create<'bldr: 'args, 'args: 'mut_bldr, 'mut_bldr, A: flatbuffers::Allocator + 'bldr>( - _fbb: &'mut_bldr mut flatbuffers::FlatBufferBuilder<'bldr, A>, - args: &'args PingBodyArgs<'args>, - ) -> flatbuffers::WIPOffset> { - let mut builder = PingBodyBuilder::new(_fbb); - if let Some(x) = args.payload { - builder.add_payload(x); - } - builder.finish() - } - - #[inline] - pub fn payload(&self) -> Option> { - // Safety: - // Created from valid Table for this object - // which contains a valid value in this slot - unsafe { - self._tab - .get::>>(PingBody::VT_PAYLOAD, None) - } - } - } - - impl flatbuffers::Verifiable for PingBody<'_> { - #[inline] - fn run_verifier(v: &mut flatbuffers::Verifier, pos: usize) -> Result<(), flatbuffers::InvalidFlatbuffer> { - use self::flatbuffers::Verifiable; - v.visit_table(pos)? - .visit_field::>>("payload", Self::VT_PAYLOAD, false)? - .finish(); - Ok(()) - } - } - pub struct PingBodyArgs<'a> { - pub payload: Option>>, - } - impl<'a> Default for PingBodyArgs<'a> { - #[inline] - fn default() -> Self { - PingBodyArgs { payload: None } - } - } - - pub struct PingBodyBuilder<'a: 'b, 'b, A: flatbuffers::Allocator + 'a> { - fbb_: &'b mut flatbuffers::FlatBufferBuilder<'a, A>, - start_: flatbuffers::WIPOffset, - } - impl<'a: 'b, 'b, A: flatbuffers::Allocator + 'a> PingBodyBuilder<'a, 'b, A> { - #[inline] - pub fn add_payload(&mut self, payload: flatbuffers::WIPOffset>) { - self.fbb_ - .push_slot_always::>(PingBody::VT_PAYLOAD, payload); - } - #[inline] - pub fn new(_fbb: &'b mut flatbuffers::FlatBufferBuilder<'a, A>) -> PingBodyBuilder<'a, 'b, A> { - let start = _fbb.start_table(); - PingBodyBuilder { - fbb_: _fbb, - start_: start, - } - } - #[inline] - pub fn finish(self) -> flatbuffers::WIPOffset> { - let o = self.fbb_.end_table(self.start_); - flatbuffers::WIPOffset::new(o.value()) - } - } - - impl core::fmt::Debug for PingBody<'_> { - fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { - let mut ds = f.debug_struct("PingBody"); - ds.field("payload", &self.payload()); - ds.finish() - } - } -} // pub mod models diff --git a/common/protos/src/generated/proto_gen/node_service.rs b/common/protos/src/generated/proto_gen/node_service.rs index 2a4fcf8e..9d13aa58 100644 --- a/common/protos/src/generated/proto_gen/node_service.rs +++ b/common/protos/src/generated/proto_gen/node_service.rs @@ -580,9 +580,15 @@ pub struct GenerallyLockResponse { } /// Generated client implementations. pub mod node_service_client { - #![allow(unused_variables, dead_code, missing_docs, clippy::wildcard_imports, clippy::let_unit_value)] - use tonic::codegen::http::Uri; + #![allow( + unused_variables, + dead_code, + missing_docs, + clippy::wildcard_imports, + clippy::let_unit_value, + )] use tonic::codegen::*; + use tonic::codegen::http::Uri; #[derive(Debug, Clone)] pub struct NodeServiceClient { inner: tonic::client::Grpc, @@ -613,16 +619,22 @@ pub mod node_service_client { let inner = tonic::client::Grpc::with_origin(inner, origin); Self { inner } } - pub fn with_interceptor(inner: T, interceptor: F) -> NodeServiceClient> + pub fn with_interceptor( + inner: T, + interceptor: F, + ) -> NodeServiceClient> where F: tonic::service::Interceptor, T::ResponseBody: Default, T: tonic::codegen::Service< http::Request, - Response = http::Response<>::ResponseBody>, + Response = http::Response< + >::ResponseBody, + >, >, - >>::Error: - Into + std::marker::Send + std::marker::Sync, + , + >>::Error: Into + std::marker::Send + std::marker::Sync, { NodeServiceClient::new(InterceptedService::new(inner, interceptor)) } @@ -665,9 +677,15 @@ pub mod node_service_client { self.inner .ready() .await - .map_err(|e| tonic::Status::unknown(format!("Service was not ready: {}", e.into())))?; + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static("/node_service.NodeService/Ping"); + let path = http::uri::PathAndQuery::from_static( + "/node_service.NodeService/Ping", + ); let mut req = request.into_request(); req.extensions_mut() .insert(GrpcMethod::new("node_service.NodeService", "Ping")); @@ -676,13 +694,22 @@ pub mod node_service_client { pub async fn list_bucket( &mut self, request: impl tonic::IntoRequest, - ) -> std::result::Result, tonic::Status> { + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { self.inner .ready() .await - .map_err(|e| tonic::Status::unknown(format!("Service was not ready: {}", e.into())))?; + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static("/node_service.NodeService/ListBucket"); + let path = http::uri::PathAndQuery::from_static( + "/node_service.NodeService/ListBucket", + ); let mut req = request.into_request(); req.extensions_mut() .insert(GrpcMethod::new("node_service.NodeService", "ListBucket")); @@ -691,13 +718,22 @@ pub mod node_service_client { pub async fn make_bucket( &mut self, request: impl tonic::IntoRequest, - ) -> std::result::Result, tonic::Status> { + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { self.inner .ready() .await - .map_err(|e| tonic::Status::unknown(format!("Service was not ready: {}", e.into())))?; + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static("/node_service.NodeService/MakeBucket"); + let path = http::uri::PathAndQuery::from_static( + "/node_service.NodeService/MakeBucket", + ); let mut req = request.into_request(); req.extensions_mut() .insert(GrpcMethod::new("node_service.NodeService", "MakeBucket")); @@ -706,13 +742,22 @@ pub mod node_service_client { pub async fn get_bucket_info( &mut self, request: impl tonic::IntoRequest, - ) -> std::result::Result, tonic::Status> { + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { self.inner .ready() .await - .map_err(|e| tonic::Status::unknown(format!("Service was not ready: {}", e.into())))?; + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static("/node_service.NodeService/GetBucketInfo"); + let path = http::uri::PathAndQuery::from_static( + "/node_service.NodeService/GetBucketInfo", + ); let mut req = request.into_request(); req.extensions_mut() .insert(GrpcMethod::new("node_service.NodeService", "GetBucketInfo")); @@ -721,13 +766,22 @@ pub mod node_service_client { pub async fn delete_bucket( &mut self, request: impl tonic::IntoRequest, - ) -> std::result::Result, tonic::Status> { + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { self.inner .ready() .await - .map_err(|e| tonic::Status::unknown(format!("Service was not ready: {}", e.into())))?; + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static("/node_service.NodeService/DeleteBucket"); + let path = http::uri::PathAndQuery::from_static( + "/node_service.NodeService/DeleteBucket", + ); let mut req = request.into_request(); req.extensions_mut() .insert(GrpcMethod::new("node_service.NodeService", "DeleteBucket")); @@ -736,13 +790,22 @@ pub mod node_service_client { pub async fn read_all( &mut self, request: impl tonic::IntoRequest, - ) -> std::result::Result, tonic::Status> { + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { self.inner .ready() .await - .map_err(|e| tonic::Status::unknown(format!("Service was not ready: {}", e.into())))?; + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static("/node_service.NodeService/ReadAll"); + let path = http::uri::PathAndQuery::from_static( + "/node_service.NodeService/ReadAll", + ); let mut req = request.into_request(); req.extensions_mut() .insert(GrpcMethod::new("node_service.NodeService", "ReadAll")); @@ -751,13 +814,22 @@ pub mod node_service_client { pub async fn write_all( &mut self, request: impl tonic::IntoRequest, - ) -> std::result::Result, tonic::Status> { + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { self.inner .ready() .await - .map_err(|e| tonic::Status::unknown(format!("Service was not ready: {}", e.into())))?; + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static("/node_service.NodeService/WriteAll"); + let path = http::uri::PathAndQuery::from_static( + "/node_service.NodeService/WriteAll", + ); let mut req = request.into_request(); req.extensions_mut() .insert(GrpcMethod::new("node_service.NodeService", "WriteAll")); @@ -770,9 +842,15 @@ pub mod node_service_client { self.inner .ready() .await - .map_err(|e| tonic::Status::unknown(format!("Service was not ready: {}", e.into())))?; + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static("/node_service.NodeService/Delete"); + let path = http::uri::PathAndQuery::from_static( + "/node_service.NodeService/Delete", + ); let mut req = request.into_request(); req.extensions_mut() .insert(GrpcMethod::new("node_service.NodeService", "Delete")); @@ -781,13 +859,22 @@ pub mod node_service_client { pub async fn verify_file( &mut self, request: impl tonic::IntoRequest, - ) -> std::result::Result, tonic::Status> { + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { self.inner .ready() .await - .map_err(|e| tonic::Status::unknown(format!("Service was not ready: {}", e.into())))?; + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static("/node_service.NodeService/VerifyFile"); + let path = http::uri::PathAndQuery::from_static( + "/node_service.NodeService/VerifyFile", + ); let mut req = request.into_request(); req.extensions_mut() .insert(GrpcMethod::new("node_service.NodeService", "VerifyFile")); @@ -796,13 +883,22 @@ pub mod node_service_client { pub async fn check_parts( &mut self, request: impl tonic::IntoRequest, - ) -> std::result::Result, tonic::Status> { + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { self.inner .ready() .await - .map_err(|e| tonic::Status::unknown(format!("Service was not ready: {}", e.into())))?; + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static("/node_service.NodeService/CheckParts"); + let path = http::uri::PathAndQuery::from_static( + "/node_service.NodeService/CheckParts", + ); let mut req = request.into_request(); req.extensions_mut() .insert(GrpcMethod::new("node_service.NodeService", "CheckParts")); @@ -811,13 +907,22 @@ pub mod node_service_client { pub async fn rename_part( &mut self, request: impl tonic::IntoRequest, - ) -> std::result::Result, tonic::Status> { + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { self.inner .ready() .await - .map_err(|e| tonic::Status::unknown(format!("Service was not ready: {}", e.into())))?; + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static("/node_service.NodeService/RenamePart"); + let path = http::uri::PathAndQuery::from_static( + "/node_service.NodeService/RenamePart", + ); let mut req = request.into_request(); req.extensions_mut() .insert(GrpcMethod::new("node_service.NodeService", "RenamePart")); @@ -826,13 +931,22 @@ pub mod node_service_client { pub async fn rename_file( &mut self, request: impl tonic::IntoRequest, - ) -> std::result::Result, tonic::Status> { + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { self.inner .ready() .await - .map_err(|e| tonic::Status::unknown(format!("Service was not ready: {}", e.into())))?; + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static("/node_service.NodeService/RenameFile"); + let path = http::uri::PathAndQuery::from_static( + "/node_service.NodeService/RenameFile", + ); let mut req = request.into_request(); req.extensions_mut() .insert(GrpcMethod::new("node_service.NodeService", "RenameFile")); @@ -845,9 +959,15 @@ pub mod node_service_client { self.inner .ready() .await - .map_err(|e| tonic::Status::unknown(format!("Service was not ready: {}", e.into())))?; + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static("/node_service.NodeService/Write"); + let path = http::uri::PathAndQuery::from_static( + "/node_service.NodeService/Write", + ); let mut req = request.into_request(); req.extensions_mut() .insert(GrpcMethod::new("node_service.NodeService", "Write")); @@ -856,13 +976,22 @@ pub mod node_service_client { pub async fn write_stream( &mut self, request: impl tonic::IntoStreamingRequest, - ) -> std::result::Result>, tonic::Status> { + ) -> std::result::Result< + tonic::Response>, + tonic::Status, + > { self.inner .ready() .await - .map_err(|e| tonic::Status::unknown(format!("Service was not ready: {}", e.into())))?; + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static("/node_service.NodeService/WriteStream"); + let path = http::uri::PathAndQuery::from_static( + "/node_service.NodeService/WriteStream", + ); let mut req = request.into_streaming_request(); req.extensions_mut() .insert(GrpcMethod::new("node_service.NodeService", "WriteStream")); @@ -872,13 +1001,22 @@ pub mod node_service_client { pub async fn read_at( &mut self, request: impl tonic::IntoStreamingRequest, - ) -> std::result::Result>, tonic::Status> { + ) -> std::result::Result< + tonic::Response>, + tonic::Status, + > { self.inner .ready() .await - .map_err(|e| tonic::Status::unknown(format!("Service was not ready: {}", e.into())))?; + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static("/node_service.NodeService/ReadAt"); + let path = http::uri::PathAndQuery::from_static( + "/node_service.NodeService/ReadAt", + ); let mut req = request.into_streaming_request(); req.extensions_mut() .insert(GrpcMethod::new("node_service.NodeService", "ReadAt")); @@ -887,13 +1025,22 @@ pub mod node_service_client { pub async fn list_dir( &mut self, request: impl tonic::IntoRequest, - ) -> std::result::Result, tonic::Status> { + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { self.inner .ready() .await - .map_err(|e| tonic::Status::unknown(format!("Service was not ready: {}", e.into())))?; + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static("/node_service.NodeService/ListDir"); + let path = http::uri::PathAndQuery::from_static( + "/node_service.NodeService/ListDir", + ); let mut req = request.into_request(); req.extensions_mut() .insert(GrpcMethod::new("node_service.NodeService", "ListDir")); @@ -902,13 +1049,22 @@ pub mod node_service_client { pub async fn walk_dir( &mut self, request: impl tonic::IntoRequest, - ) -> std::result::Result, tonic::Status> { + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { self.inner .ready() .await - .map_err(|e| tonic::Status::unknown(format!("Service was not ready: {}", e.into())))?; + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static("/node_service.NodeService/WalkDir"); + let path = http::uri::PathAndQuery::from_static( + "/node_service.NodeService/WalkDir", + ); let mut req = request.into_request(); req.extensions_mut() .insert(GrpcMethod::new("node_service.NodeService", "WalkDir")); @@ -917,13 +1073,22 @@ pub mod node_service_client { pub async fn rename_data( &mut self, request: impl tonic::IntoRequest, - ) -> std::result::Result, tonic::Status> { + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { self.inner .ready() .await - .map_err(|e| tonic::Status::unknown(format!("Service was not ready: {}", e.into())))?; + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static("/node_service.NodeService/RenameData"); + let path = http::uri::PathAndQuery::from_static( + "/node_service.NodeService/RenameData", + ); let mut req = request.into_request(); req.extensions_mut() .insert(GrpcMethod::new("node_service.NodeService", "RenameData")); @@ -932,13 +1097,22 @@ pub mod node_service_client { pub async fn make_volumes( &mut self, request: impl tonic::IntoRequest, - ) -> std::result::Result, tonic::Status> { + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { self.inner .ready() .await - .map_err(|e| tonic::Status::unknown(format!("Service was not ready: {}", e.into())))?; + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static("/node_service.NodeService/MakeVolumes"); + let path = http::uri::PathAndQuery::from_static( + "/node_service.NodeService/MakeVolumes", + ); let mut req = request.into_request(); req.extensions_mut() .insert(GrpcMethod::new("node_service.NodeService", "MakeVolumes")); @@ -947,13 +1121,22 @@ pub mod node_service_client { pub async fn make_volume( &mut self, request: impl tonic::IntoRequest, - ) -> std::result::Result, tonic::Status> { + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { self.inner .ready() .await - .map_err(|e| tonic::Status::unknown(format!("Service was not ready: {}", e.into())))?; + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static("/node_service.NodeService/MakeVolume"); + let path = http::uri::PathAndQuery::from_static( + "/node_service.NodeService/MakeVolume", + ); let mut req = request.into_request(); req.extensions_mut() .insert(GrpcMethod::new("node_service.NodeService", "MakeVolume")); @@ -962,13 +1145,22 @@ pub mod node_service_client { pub async fn list_volumes( &mut self, request: impl tonic::IntoRequest, - ) -> std::result::Result, tonic::Status> { + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { self.inner .ready() .await - .map_err(|e| tonic::Status::unknown(format!("Service was not ready: {}", e.into())))?; + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static("/node_service.NodeService/ListVolumes"); + let path = http::uri::PathAndQuery::from_static( + "/node_service.NodeService/ListVolumes", + ); let mut req = request.into_request(); req.extensions_mut() .insert(GrpcMethod::new("node_service.NodeService", "ListVolumes")); @@ -977,13 +1169,22 @@ pub mod node_service_client { pub async fn stat_volume( &mut self, request: impl tonic::IntoRequest, - ) -> std::result::Result, tonic::Status> { + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { self.inner .ready() .await - .map_err(|e| tonic::Status::unknown(format!("Service was not ready: {}", e.into())))?; + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static("/node_service.NodeService/StatVolume"); + let path = http::uri::PathAndQuery::from_static( + "/node_service.NodeService/StatVolume", + ); let mut req = request.into_request(); req.extensions_mut() .insert(GrpcMethod::new("node_service.NodeService", "StatVolume")); @@ -992,13 +1193,22 @@ pub mod node_service_client { pub async fn delete_paths( &mut self, request: impl tonic::IntoRequest, - ) -> std::result::Result, tonic::Status> { + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { self.inner .ready() .await - .map_err(|e| tonic::Status::unknown(format!("Service was not ready: {}", e.into())))?; + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static("/node_service.NodeService/DeletePaths"); + let path = http::uri::PathAndQuery::from_static( + "/node_service.NodeService/DeletePaths", + ); let mut req = request.into_request(); req.extensions_mut() .insert(GrpcMethod::new("node_service.NodeService", "DeletePaths")); @@ -1007,13 +1217,22 @@ pub mod node_service_client { pub async fn update_metadata( &mut self, request: impl tonic::IntoRequest, - ) -> std::result::Result, tonic::Status> { + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { self.inner .ready() .await - .map_err(|e| tonic::Status::unknown(format!("Service was not ready: {}", e.into())))?; + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static("/node_service.NodeService/UpdateMetadata"); + let path = http::uri::PathAndQuery::from_static( + "/node_service.NodeService/UpdateMetadata", + ); let mut req = request.into_request(); req.extensions_mut() .insert(GrpcMethod::new("node_service.NodeService", "UpdateMetadata")); @@ -1022,13 +1241,22 @@ pub mod node_service_client { pub async fn write_metadata( &mut self, request: impl tonic::IntoRequest, - ) -> std::result::Result, tonic::Status> { + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { self.inner .ready() .await - .map_err(|e| tonic::Status::unknown(format!("Service was not ready: {}", e.into())))?; + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static("/node_service.NodeService/WriteMetadata"); + let path = http::uri::PathAndQuery::from_static( + "/node_service.NodeService/WriteMetadata", + ); let mut req = request.into_request(); req.extensions_mut() .insert(GrpcMethod::new("node_service.NodeService", "WriteMetadata")); @@ -1037,13 +1265,22 @@ pub mod node_service_client { pub async fn read_version( &mut self, request: impl tonic::IntoRequest, - ) -> std::result::Result, tonic::Status> { + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { self.inner .ready() .await - .map_err(|e| tonic::Status::unknown(format!("Service was not ready: {}", e.into())))?; + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static("/node_service.NodeService/ReadVersion"); + let path = http::uri::PathAndQuery::from_static( + "/node_service.NodeService/ReadVersion", + ); let mut req = request.into_request(); req.extensions_mut() .insert(GrpcMethod::new("node_service.NodeService", "ReadVersion")); @@ -1056,9 +1293,15 @@ pub mod node_service_client { self.inner .ready() .await - .map_err(|e| tonic::Status::unknown(format!("Service was not ready: {}", e.into())))?; + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static("/node_service.NodeService/ReadXL"); + let path = http::uri::PathAndQuery::from_static( + "/node_service.NodeService/ReadXL", + ); let mut req = request.into_request(); req.extensions_mut() .insert(GrpcMethod::new("node_service.NodeService", "ReadXL")); @@ -1067,13 +1310,22 @@ pub mod node_service_client { pub async fn delete_version( &mut self, request: impl tonic::IntoRequest, - ) -> std::result::Result, tonic::Status> { + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { self.inner .ready() .await - .map_err(|e| tonic::Status::unknown(format!("Service was not ready: {}", e.into())))?; + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static("/node_service.NodeService/DeleteVersion"); + let path = http::uri::PathAndQuery::from_static( + "/node_service.NodeService/DeleteVersion", + ); let mut req = request.into_request(); req.extensions_mut() .insert(GrpcMethod::new("node_service.NodeService", "DeleteVersion")); @@ -1082,13 +1334,22 @@ pub mod node_service_client { pub async fn delete_versions( &mut self, request: impl tonic::IntoRequest, - ) -> std::result::Result, tonic::Status> { + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { self.inner .ready() .await - .map_err(|e| tonic::Status::unknown(format!("Service was not ready: {}", e.into())))?; + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static("/node_service.NodeService/DeleteVersions"); + let path = http::uri::PathAndQuery::from_static( + "/node_service.NodeService/DeleteVersions", + ); let mut req = request.into_request(); req.extensions_mut() .insert(GrpcMethod::new("node_service.NodeService", "DeleteVersions")); @@ -1097,13 +1358,22 @@ pub mod node_service_client { pub async fn read_multiple( &mut self, request: impl tonic::IntoRequest, - ) -> std::result::Result, tonic::Status> { + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { self.inner .ready() .await - .map_err(|e| tonic::Status::unknown(format!("Service was not ready: {}", e.into())))?; + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static("/node_service.NodeService/ReadMultiple"); + let path = http::uri::PathAndQuery::from_static( + "/node_service.NodeService/ReadMultiple", + ); let mut req = request.into_request(); req.extensions_mut() .insert(GrpcMethod::new("node_service.NodeService", "ReadMultiple")); @@ -1112,13 +1382,22 @@ pub mod node_service_client { pub async fn delete_volume( &mut self, request: impl tonic::IntoRequest, - ) -> std::result::Result, tonic::Status> { + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { self.inner .ready() .await - .map_err(|e| tonic::Status::unknown(format!("Service was not ready: {}", e.into())))?; + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static("/node_service.NodeService/DeleteVolume"); + let path = http::uri::PathAndQuery::from_static( + "/node_service.NodeService/DeleteVolume", + ); let mut req = request.into_request(); req.extensions_mut() .insert(GrpcMethod::new("node_service.NodeService", "DeleteVolume")); @@ -1127,13 +1406,22 @@ pub mod node_service_client { pub async fn disk_info( &mut self, request: impl tonic::IntoRequest, - ) -> std::result::Result, tonic::Status> { + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { self.inner .ready() .await - .map_err(|e| tonic::Status::unknown(format!("Service was not ready: {}", e.into())))?; + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static("/node_service.NodeService/DiskInfo"); + let path = http::uri::PathAndQuery::from_static( + "/node_service.NodeService/DiskInfo", + ); let mut req = request.into_request(); req.extensions_mut() .insert(GrpcMethod::new("node_service.NodeService", "DiskInfo")); @@ -1142,13 +1430,22 @@ pub mod node_service_client { pub async fn lock( &mut self, request: impl tonic::IntoRequest, - ) -> std::result::Result, tonic::Status> { + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { self.inner .ready() .await - .map_err(|e| tonic::Status::unknown(format!("Service was not ready: {}", e.into())))?; + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static("/node_service.NodeService/Lock"); + let path = http::uri::PathAndQuery::from_static( + "/node_service.NodeService/Lock", + ); let mut req = request.into_request(); req.extensions_mut() .insert(GrpcMethod::new("node_service.NodeService", "Lock")); @@ -1157,13 +1454,22 @@ pub mod node_service_client { pub async fn un_lock( &mut self, request: impl tonic::IntoRequest, - ) -> std::result::Result, tonic::Status> { + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { self.inner .ready() .await - .map_err(|e| tonic::Status::unknown(format!("Service was not ready: {}", e.into())))?; + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static("/node_service.NodeService/UnLock"); + let path = http::uri::PathAndQuery::from_static( + "/node_service.NodeService/UnLock", + ); let mut req = request.into_request(); req.extensions_mut() .insert(GrpcMethod::new("node_service.NodeService", "UnLock")); @@ -1172,13 +1478,22 @@ pub mod node_service_client { pub async fn r_lock( &mut self, request: impl tonic::IntoRequest, - ) -> std::result::Result, tonic::Status> { + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { self.inner .ready() .await - .map_err(|e| tonic::Status::unknown(format!("Service was not ready: {}", e.into())))?; + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static("/node_service.NodeService/RLock"); + let path = http::uri::PathAndQuery::from_static( + "/node_service.NodeService/RLock", + ); let mut req = request.into_request(); req.extensions_mut() .insert(GrpcMethod::new("node_service.NodeService", "RLock")); @@ -1187,13 +1502,22 @@ pub mod node_service_client { pub async fn r_un_lock( &mut self, request: impl tonic::IntoRequest, - ) -> std::result::Result, tonic::Status> { + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { self.inner .ready() .await - .map_err(|e| tonic::Status::unknown(format!("Service was not ready: {}", e.into())))?; + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static("/node_service.NodeService/RUnLock"); + let path = http::uri::PathAndQuery::from_static( + "/node_service.NodeService/RUnLock", + ); let mut req = request.into_request(); req.extensions_mut() .insert(GrpcMethod::new("node_service.NodeService", "RUnLock")); @@ -1202,13 +1526,22 @@ pub mod node_service_client { pub async fn force_un_lock( &mut self, request: impl tonic::IntoRequest, - ) -> std::result::Result, tonic::Status> { + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { self.inner .ready() .await - .map_err(|e| tonic::Status::unknown(format!("Service was not ready: {}", e.into())))?; + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static("/node_service.NodeService/ForceUnLock"); + let path = http::uri::PathAndQuery::from_static( + "/node_service.NodeService/ForceUnLock", + ); let mut req = request.into_request(); req.extensions_mut() .insert(GrpcMethod::new("node_service.NodeService", "ForceUnLock")); @@ -1217,13 +1550,22 @@ pub mod node_service_client { pub async fn refresh( &mut self, request: impl tonic::IntoRequest, - ) -> std::result::Result, tonic::Status> { + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { self.inner .ready() .await - .map_err(|e| tonic::Status::unknown(format!("Service was not ready: {}", e.into())))?; + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static("/node_service.NodeService/Refresh"); + let path = http::uri::PathAndQuery::from_static( + "/node_service.NodeService/Refresh", + ); let mut req = request.into_request(); req.extensions_mut() .insert(GrpcMethod::new("node_service.NodeService", "Refresh")); @@ -1233,7 +1575,13 @@ pub mod node_service_client { } /// Generated server implementations. pub mod node_service_server { - #![allow(unused_variables, dead_code, missing_docs, clippy::wildcard_imports, clippy::let_unit_value)] + #![allow( + unused_variables, + dead_code, + missing_docs, + clippy::wildcard_imports, + clippy::let_unit_value, + )] use tonic::codegen::*; /// Generated trait containing gRPC methods that should be implemented for use with NodeServiceServer. #[async_trait] @@ -1246,19 +1594,31 @@ pub mod node_service_server { async fn list_bucket( &self, request: tonic::Request, - ) -> std::result::Result, tonic::Status>; + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; async fn make_bucket( &self, request: tonic::Request, - ) -> std::result::Result, tonic::Status>; + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; async fn get_bucket_info( &self, request: tonic::Request, - ) -> std::result::Result, tonic::Status>; + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; async fn delete_bucket( &self, request: tonic::Request, - ) -> std::result::Result, tonic::Status>; + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; async fn read_all( &self, request: tonic::Request, @@ -1266,7 +1626,10 @@ pub mod node_service_server { async fn write_all( &self, request: tonic::Request, - ) -> std::result::Result, tonic::Status>; + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; async fn delete( &self, request: tonic::Request, @@ -1274,33 +1637,52 @@ pub mod node_service_server { async fn verify_file( &self, request: tonic::Request, - ) -> std::result::Result, tonic::Status>; + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; async fn check_parts( &self, request: tonic::Request, - ) -> std::result::Result, tonic::Status>; + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; async fn rename_part( &self, request: tonic::Request, - ) -> std::result::Result, tonic::Status>; + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; async fn rename_file( &self, request: tonic::Request, - ) -> std::result::Result, tonic::Status>; + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; async fn write( &self, request: tonic::Request, ) -> std::result::Result, tonic::Status>; /// Server streaming response type for the WriteStream method. - type WriteStreamStream: tonic::codegen::tokio_stream::Stream> + type WriteStreamStream: tonic::codegen::tokio_stream::Stream< + Item = std::result::Result, + > + std::marker::Send + 'static; async fn write_stream( &self, request: tonic::Request>, - ) -> std::result::Result, tonic::Status>; + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; /// Server streaming response type for the ReadAt method. - type ReadAtStream: tonic::codegen::tokio_stream::Stream> + type ReadAtStream: tonic::codegen::tokio_stream::Stream< + Item = std::result::Result, + > + std::marker::Send + 'static; /// rpc Append(AppendRequest) returns (AppendResponse) {}; @@ -1319,39 +1701,66 @@ pub mod node_service_server { async fn rename_data( &self, request: tonic::Request, - ) -> std::result::Result, tonic::Status>; + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; async fn make_volumes( &self, request: tonic::Request, - ) -> std::result::Result, tonic::Status>; + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; async fn make_volume( &self, request: tonic::Request, - ) -> std::result::Result, tonic::Status>; + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; async fn list_volumes( &self, request: tonic::Request, - ) -> std::result::Result, tonic::Status>; + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; async fn stat_volume( &self, request: tonic::Request, - ) -> std::result::Result, tonic::Status>; + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; async fn delete_paths( &self, request: tonic::Request, - ) -> std::result::Result, tonic::Status>; + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; async fn update_metadata( &self, request: tonic::Request, - ) -> std::result::Result, tonic::Status>; + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; async fn write_metadata( &self, request: tonic::Request, - ) -> std::result::Result, tonic::Status>; + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; async fn read_version( &self, request: tonic::Request, - ) -> std::result::Result, tonic::Status>; + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; async fn read_xl( &self, request: tonic::Request, @@ -1359,47 +1768,80 @@ pub mod node_service_server { async fn delete_version( &self, request: tonic::Request, - ) -> std::result::Result, tonic::Status>; + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; async fn delete_versions( &self, request: tonic::Request, - ) -> std::result::Result, tonic::Status>; + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; async fn read_multiple( &self, request: tonic::Request, - ) -> std::result::Result, tonic::Status>; + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; async fn delete_volume( &self, request: tonic::Request, - ) -> std::result::Result, tonic::Status>; + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; async fn disk_info( &self, request: tonic::Request, - ) -> std::result::Result, tonic::Status>; + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; async fn lock( &self, request: tonic::Request, - ) -> std::result::Result, tonic::Status>; + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; async fn un_lock( &self, request: tonic::Request, - ) -> std::result::Result, tonic::Status>; + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; async fn r_lock( &self, request: tonic::Request, - ) -> std::result::Result, tonic::Status>; + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; async fn r_un_lock( &self, request: tonic::Request, - ) -> std::result::Result, tonic::Status>; + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; async fn force_un_lock( &self, request: tonic::Request, - ) -> std::result::Result, tonic::Status>; + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; async fn refresh( &self, request: tonic::Request, - ) -> std::result::Result, tonic::Status>; + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; } #[derive(Debug)] pub struct NodeServiceServer { @@ -1422,7 +1864,10 @@ pub mod node_service_server { max_encoding_message_size: None, } } - pub fn with_interceptor(inner: T, interceptor: F) -> InterceptedService + pub fn with_interceptor( + inner: T, + interceptor: F, + ) -> InterceptedService where F: tonic::service::Interceptor, { @@ -1466,7 +1911,10 @@ pub mod node_service_server { type Response = http::Response; type Error = std::convert::Infallible; type Future = BoxFuture; - fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { + fn poll_ready( + &mut self, + _cx: &mut Context<'_>, + ) -> Poll> { Poll::Ready(Ok(())) } fn call(&mut self, req: http::Request) -> Self::Future { @@ -1474,12 +1922,21 @@ pub mod node_service_server { "/node_service.NodeService/Ping" => { #[allow(non_camel_case_types)] struct PingSvc(pub Arc); - impl tonic::server::UnaryService for PingSvc { + impl tonic::server::UnaryService + for PingSvc { type Response = super::PingResponse; - type Future = BoxFuture, tonic::Status>; - fn call(&mut self, request: tonic::Request) -> Self::Future { + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { let inner = Arc::clone(&self.0); - let fut = async move { ::ping(&inner, request).await }; + let fut = async move { + ::ping(&inner, request).await + }; Box::pin(fut) } } @@ -1492,8 +1949,14 @@ pub mod node_service_server { let method = PingSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) - .apply_compression_config(accept_compression_encodings, send_compression_encodings) - .apply_max_message_size_config(max_decoding_message_size, max_encoding_message_size); + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); let res = grpc.unary(method, req).await; Ok(res) }; @@ -1502,12 +1965,23 @@ pub mod node_service_server { "/node_service.NodeService/ListBucket" => { #[allow(non_camel_case_types)] struct ListBucketSvc(pub Arc); - impl tonic::server::UnaryService for ListBucketSvc { + impl< + T: NodeService, + > tonic::server::UnaryService + for ListBucketSvc { type Response = super::ListBucketResponse; - type Future = BoxFuture, tonic::Status>; - fn call(&mut self, request: tonic::Request) -> Self::Future { + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { let inner = Arc::clone(&self.0); - let fut = async move { ::list_bucket(&inner, request).await }; + let fut = async move { + ::list_bucket(&inner, request).await + }; Box::pin(fut) } } @@ -1520,8 +1994,14 @@ pub mod node_service_server { let method = ListBucketSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) - .apply_compression_config(accept_compression_encodings, send_compression_encodings) - .apply_max_message_size_config(max_decoding_message_size, max_encoding_message_size); + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); let res = grpc.unary(method, req).await; Ok(res) }; @@ -1530,12 +2010,23 @@ pub mod node_service_server { "/node_service.NodeService/MakeBucket" => { #[allow(non_camel_case_types)] struct MakeBucketSvc(pub Arc); - impl tonic::server::UnaryService for MakeBucketSvc { + impl< + T: NodeService, + > tonic::server::UnaryService + for MakeBucketSvc { type Response = super::MakeBucketResponse; - type Future = BoxFuture, tonic::Status>; - fn call(&mut self, request: tonic::Request) -> Self::Future { + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { let inner = Arc::clone(&self.0); - let fut = async move { ::make_bucket(&inner, request).await }; + let fut = async move { + ::make_bucket(&inner, request).await + }; Box::pin(fut) } } @@ -1548,8 +2039,14 @@ pub mod node_service_server { let method = MakeBucketSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) - .apply_compression_config(accept_compression_encodings, send_compression_encodings) - .apply_max_message_size_config(max_decoding_message_size, max_encoding_message_size); + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); let res = grpc.unary(method, req).await; Ok(res) }; @@ -1558,12 +2055,23 @@ pub mod node_service_server { "/node_service.NodeService/GetBucketInfo" => { #[allow(non_camel_case_types)] struct GetBucketInfoSvc(pub Arc); - impl tonic::server::UnaryService for GetBucketInfoSvc { + impl< + T: NodeService, + > tonic::server::UnaryService + for GetBucketInfoSvc { type Response = super::GetBucketInfoResponse; - type Future = BoxFuture, tonic::Status>; - fn call(&mut self, request: tonic::Request) -> Self::Future { + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { let inner = Arc::clone(&self.0); - let fut = async move { ::get_bucket_info(&inner, request).await }; + let fut = async move { + ::get_bucket_info(&inner, request).await + }; Box::pin(fut) } } @@ -1576,8 +2084,14 @@ pub mod node_service_server { let method = GetBucketInfoSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) - .apply_compression_config(accept_compression_encodings, send_compression_encodings) - .apply_max_message_size_config(max_decoding_message_size, max_encoding_message_size); + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); let res = grpc.unary(method, req).await; Ok(res) }; @@ -1586,12 +2100,23 @@ pub mod node_service_server { "/node_service.NodeService/DeleteBucket" => { #[allow(non_camel_case_types)] struct DeleteBucketSvc(pub Arc); - impl tonic::server::UnaryService for DeleteBucketSvc { + impl< + T: NodeService, + > tonic::server::UnaryService + for DeleteBucketSvc { type Response = super::DeleteBucketResponse; - type Future = BoxFuture, tonic::Status>; - fn call(&mut self, request: tonic::Request) -> Self::Future { + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { let inner = Arc::clone(&self.0); - let fut = async move { ::delete_bucket(&inner, request).await }; + let fut = async move { + ::delete_bucket(&inner, request).await + }; Box::pin(fut) } } @@ -1604,8 +2129,14 @@ pub mod node_service_server { let method = DeleteBucketSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) - .apply_compression_config(accept_compression_encodings, send_compression_encodings) - .apply_max_message_size_config(max_decoding_message_size, max_encoding_message_size); + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); let res = grpc.unary(method, req).await; Ok(res) }; @@ -1614,12 +2145,23 @@ pub mod node_service_server { "/node_service.NodeService/ReadAll" => { #[allow(non_camel_case_types)] struct ReadAllSvc(pub Arc); - impl tonic::server::UnaryService for ReadAllSvc { + impl< + T: NodeService, + > tonic::server::UnaryService + for ReadAllSvc { type Response = super::ReadAllResponse; - type Future = BoxFuture, tonic::Status>; - fn call(&mut self, request: tonic::Request) -> Self::Future { + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { let inner = Arc::clone(&self.0); - let fut = async move { ::read_all(&inner, request).await }; + let fut = async move { + ::read_all(&inner, request).await + }; Box::pin(fut) } } @@ -1632,8 +2174,14 @@ pub mod node_service_server { let method = ReadAllSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) - .apply_compression_config(accept_compression_encodings, send_compression_encodings) - .apply_max_message_size_config(max_decoding_message_size, max_encoding_message_size); + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); let res = grpc.unary(method, req).await; Ok(res) }; @@ -1642,12 +2190,23 @@ pub mod node_service_server { "/node_service.NodeService/WriteAll" => { #[allow(non_camel_case_types)] struct WriteAllSvc(pub Arc); - impl tonic::server::UnaryService for WriteAllSvc { + impl< + T: NodeService, + > tonic::server::UnaryService + for WriteAllSvc { type Response = super::WriteAllResponse; - type Future = BoxFuture, tonic::Status>; - fn call(&mut self, request: tonic::Request) -> Self::Future { + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { let inner = Arc::clone(&self.0); - let fut = async move { ::write_all(&inner, request).await }; + let fut = async move { + ::write_all(&inner, request).await + }; Box::pin(fut) } } @@ -1660,8 +2219,14 @@ pub mod node_service_server { let method = WriteAllSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) - .apply_compression_config(accept_compression_encodings, send_compression_encodings) - .apply_max_message_size_config(max_decoding_message_size, max_encoding_message_size); + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); let res = grpc.unary(method, req).await; Ok(res) }; @@ -1670,12 +2235,23 @@ pub mod node_service_server { "/node_service.NodeService/Delete" => { #[allow(non_camel_case_types)] struct DeleteSvc(pub Arc); - impl tonic::server::UnaryService for DeleteSvc { + impl< + T: NodeService, + > tonic::server::UnaryService + for DeleteSvc { type Response = super::DeleteResponse; - type Future = BoxFuture, tonic::Status>; - fn call(&mut self, request: tonic::Request) -> Self::Future { + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { let inner = Arc::clone(&self.0); - let fut = async move { ::delete(&inner, request).await }; + let fut = async move { + ::delete(&inner, request).await + }; Box::pin(fut) } } @@ -1688,8 +2264,14 @@ pub mod node_service_server { let method = DeleteSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) - .apply_compression_config(accept_compression_encodings, send_compression_encodings) - .apply_max_message_size_config(max_decoding_message_size, max_encoding_message_size); + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); let res = grpc.unary(method, req).await; Ok(res) }; @@ -1698,12 +2280,23 @@ pub mod node_service_server { "/node_service.NodeService/VerifyFile" => { #[allow(non_camel_case_types)] struct VerifyFileSvc(pub Arc); - impl tonic::server::UnaryService for VerifyFileSvc { + impl< + T: NodeService, + > tonic::server::UnaryService + for VerifyFileSvc { type Response = super::VerifyFileResponse; - type Future = BoxFuture, tonic::Status>; - fn call(&mut self, request: tonic::Request) -> Self::Future { + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { let inner = Arc::clone(&self.0); - let fut = async move { ::verify_file(&inner, request).await }; + let fut = async move { + ::verify_file(&inner, request).await + }; Box::pin(fut) } } @@ -1716,8 +2309,14 @@ pub mod node_service_server { let method = VerifyFileSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) - .apply_compression_config(accept_compression_encodings, send_compression_encodings) - .apply_max_message_size_config(max_decoding_message_size, max_encoding_message_size); + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); let res = grpc.unary(method, req).await; Ok(res) }; @@ -1726,12 +2325,23 @@ pub mod node_service_server { "/node_service.NodeService/CheckParts" => { #[allow(non_camel_case_types)] struct CheckPartsSvc(pub Arc); - impl tonic::server::UnaryService for CheckPartsSvc { + impl< + T: NodeService, + > tonic::server::UnaryService + for CheckPartsSvc { type Response = super::CheckPartsResponse; - type Future = BoxFuture, tonic::Status>; - fn call(&mut self, request: tonic::Request) -> Self::Future { + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { let inner = Arc::clone(&self.0); - let fut = async move { ::check_parts(&inner, request).await }; + let fut = async move { + ::check_parts(&inner, request).await + }; Box::pin(fut) } } @@ -1744,8 +2354,14 @@ pub mod node_service_server { let method = CheckPartsSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) - .apply_compression_config(accept_compression_encodings, send_compression_encodings) - .apply_max_message_size_config(max_decoding_message_size, max_encoding_message_size); + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); let res = grpc.unary(method, req).await; Ok(res) }; @@ -1754,12 +2370,23 @@ pub mod node_service_server { "/node_service.NodeService/RenamePart" => { #[allow(non_camel_case_types)] struct RenamePartSvc(pub Arc); - impl tonic::server::UnaryService for RenamePartSvc { + impl< + T: NodeService, + > tonic::server::UnaryService + for RenamePartSvc { type Response = super::RenamePartResponse; - type Future = BoxFuture, tonic::Status>; - fn call(&mut self, request: tonic::Request) -> Self::Future { + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { let inner = Arc::clone(&self.0); - let fut = async move { ::rename_part(&inner, request).await }; + let fut = async move { + ::rename_part(&inner, request).await + }; Box::pin(fut) } } @@ -1772,8 +2399,14 @@ pub mod node_service_server { let method = RenamePartSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) - .apply_compression_config(accept_compression_encodings, send_compression_encodings) - .apply_max_message_size_config(max_decoding_message_size, max_encoding_message_size); + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); let res = grpc.unary(method, req).await; Ok(res) }; @@ -1782,12 +2415,23 @@ pub mod node_service_server { "/node_service.NodeService/RenameFile" => { #[allow(non_camel_case_types)] struct RenameFileSvc(pub Arc); - impl tonic::server::UnaryService for RenameFileSvc { + impl< + T: NodeService, + > tonic::server::UnaryService + for RenameFileSvc { type Response = super::RenameFileResponse; - type Future = BoxFuture, tonic::Status>; - fn call(&mut self, request: tonic::Request) -> Self::Future { + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { let inner = Arc::clone(&self.0); - let fut = async move { ::rename_file(&inner, request).await }; + let fut = async move { + ::rename_file(&inner, request).await + }; Box::pin(fut) } } @@ -1800,8 +2444,14 @@ pub mod node_service_server { let method = RenameFileSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) - .apply_compression_config(accept_compression_encodings, send_compression_encodings) - .apply_max_message_size_config(max_decoding_message_size, max_encoding_message_size); + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); let res = grpc.unary(method, req).await; Ok(res) }; @@ -1810,12 +2460,21 @@ pub mod node_service_server { "/node_service.NodeService/Write" => { #[allow(non_camel_case_types)] struct WriteSvc(pub Arc); - impl tonic::server::UnaryService for WriteSvc { + impl tonic::server::UnaryService + for WriteSvc { type Response = super::WriteResponse; - type Future = BoxFuture, tonic::Status>; - fn call(&mut self, request: tonic::Request) -> Self::Future { + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { let inner = Arc::clone(&self.0); - let fut = async move { ::write(&inner, request).await }; + let fut = async move { + ::write(&inner, request).await + }; Box::pin(fut) } } @@ -1828,8 +2487,14 @@ pub mod node_service_server { let method = WriteSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) - .apply_compression_config(accept_compression_encodings, send_compression_encodings) - .apply_max_message_size_config(max_decoding_message_size, max_encoding_message_size); + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); let res = grpc.unary(method, req).await; Ok(res) }; @@ -1838,13 +2503,26 @@ pub mod node_service_server { "/node_service.NodeService/WriteStream" => { #[allow(non_camel_case_types)] struct WriteStreamSvc(pub Arc); - impl tonic::server::StreamingService for WriteStreamSvc { + impl< + T: NodeService, + > tonic::server::StreamingService + for WriteStreamSvc { type Response = super::WriteResponse; type ResponseStream = T::WriteStreamStream; - type Future = BoxFuture, tonic::Status>; - fn call(&mut self, request: tonic::Request>) -> Self::Future { + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request< + tonic::Streaming, + >, + ) -> Self::Future { let inner = Arc::clone(&self.0); - let fut = async move { ::write_stream(&inner, request).await }; + let fut = async move { + ::write_stream(&inner, request).await + }; Box::pin(fut) } } @@ -1857,8 +2535,14 @@ pub mod node_service_server { let method = WriteStreamSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) - .apply_compression_config(accept_compression_encodings, send_compression_encodings) - .apply_max_message_size_config(max_decoding_message_size, max_encoding_message_size); + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); let res = grpc.streaming(method, req).await; Ok(res) }; @@ -1867,13 +2551,26 @@ pub mod node_service_server { "/node_service.NodeService/ReadAt" => { #[allow(non_camel_case_types)] struct ReadAtSvc(pub Arc); - impl tonic::server::StreamingService for ReadAtSvc { + impl< + T: NodeService, + > tonic::server::StreamingService + for ReadAtSvc { type Response = super::ReadAtResponse; type ResponseStream = T::ReadAtStream; - type Future = BoxFuture, tonic::Status>; - fn call(&mut self, request: tonic::Request>) -> Self::Future { + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request< + tonic::Streaming, + >, + ) -> Self::Future { let inner = Arc::clone(&self.0); - let fut = async move { ::read_at(&inner, request).await }; + let fut = async move { + ::read_at(&inner, request).await + }; Box::pin(fut) } } @@ -1886,8 +2583,14 @@ pub mod node_service_server { let method = ReadAtSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) - .apply_compression_config(accept_compression_encodings, send_compression_encodings) - .apply_max_message_size_config(max_decoding_message_size, max_encoding_message_size); + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); let res = grpc.streaming(method, req).await; Ok(res) }; @@ -1896,12 +2599,23 @@ pub mod node_service_server { "/node_service.NodeService/ListDir" => { #[allow(non_camel_case_types)] struct ListDirSvc(pub Arc); - impl tonic::server::UnaryService for ListDirSvc { + impl< + T: NodeService, + > tonic::server::UnaryService + for ListDirSvc { type Response = super::ListDirResponse; - type Future = BoxFuture, tonic::Status>; - fn call(&mut self, request: tonic::Request) -> Self::Future { + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { let inner = Arc::clone(&self.0); - let fut = async move { ::list_dir(&inner, request).await }; + let fut = async move { + ::list_dir(&inner, request).await + }; Box::pin(fut) } } @@ -1914,8 +2628,14 @@ pub mod node_service_server { let method = ListDirSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) - .apply_compression_config(accept_compression_encodings, send_compression_encodings) - .apply_max_message_size_config(max_decoding_message_size, max_encoding_message_size); + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); let res = grpc.unary(method, req).await; Ok(res) }; @@ -1924,12 +2644,23 @@ pub mod node_service_server { "/node_service.NodeService/WalkDir" => { #[allow(non_camel_case_types)] struct WalkDirSvc(pub Arc); - impl tonic::server::UnaryService for WalkDirSvc { + impl< + T: NodeService, + > tonic::server::UnaryService + for WalkDirSvc { type Response = super::WalkDirResponse; - type Future = BoxFuture, tonic::Status>; - fn call(&mut self, request: tonic::Request) -> Self::Future { + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { let inner = Arc::clone(&self.0); - let fut = async move { ::walk_dir(&inner, request).await }; + let fut = async move { + ::walk_dir(&inner, request).await + }; Box::pin(fut) } } @@ -1942,8 +2673,14 @@ pub mod node_service_server { let method = WalkDirSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) - .apply_compression_config(accept_compression_encodings, send_compression_encodings) - .apply_max_message_size_config(max_decoding_message_size, max_encoding_message_size); + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); let res = grpc.unary(method, req).await; Ok(res) }; @@ -1952,12 +2689,23 @@ pub mod node_service_server { "/node_service.NodeService/RenameData" => { #[allow(non_camel_case_types)] struct RenameDataSvc(pub Arc); - impl tonic::server::UnaryService for RenameDataSvc { + impl< + T: NodeService, + > tonic::server::UnaryService + for RenameDataSvc { type Response = super::RenameDataResponse; - type Future = BoxFuture, tonic::Status>; - fn call(&mut self, request: tonic::Request) -> Self::Future { + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { let inner = Arc::clone(&self.0); - let fut = async move { ::rename_data(&inner, request).await }; + let fut = async move { + ::rename_data(&inner, request).await + }; Box::pin(fut) } } @@ -1970,8 +2718,14 @@ pub mod node_service_server { let method = RenameDataSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) - .apply_compression_config(accept_compression_encodings, send_compression_encodings) - .apply_max_message_size_config(max_decoding_message_size, max_encoding_message_size); + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); let res = grpc.unary(method, req).await; Ok(res) }; @@ -1980,12 +2734,23 @@ pub mod node_service_server { "/node_service.NodeService/MakeVolumes" => { #[allow(non_camel_case_types)] struct MakeVolumesSvc(pub Arc); - impl tonic::server::UnaryService for MakeVolumesSvc { + impl< + T: NodeService, + > tonic::server::UnaryService + for MakeVolumesSvc { type Response = super::MakeVolumesResponse; - type Future = BoxFuture, tonic::Status>; - fn call(&mut self, request: tonic::Request) -> Self::Future { + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { let inner = Arc::clone(&self.0); - let fut = async move { ::make_volumes(&inner, request).await }; + let fut = async move { + ::make_volumes(&inner, request).await + }; Box::pin(fut) } } @@ -1998,8 +2763,14 @@ pub mod node_service_server { let method = MakeVolumesSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) - .apply_compression_config(accept_compression_encodings, send_compression_encodings) - .apply_max_message_size_config(max_decoding_message_size, max_encoding_message_size); + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); let res = grpc.unary(method, req).await; Ok(res) }; @@ -2008,12 +2779,23 @@ pub mod node_service_server { "/node_service.NodeService/MakeVolume" => { #[allow(non_camel_case_types)] struct MakeVolumeSvc(pub Arc); - impl tonic::server::UnaryService for MakeVolumeSvc { + impl< + T: NodeService, + > tonic::server::UnaryService + for MakeVolumeSvc { type Response = super::MakeVolumeResponse; - type Future = BoxFuture, tonic::Status>; - fn call(&mut self, request: tonic::Request) -> Self::Future { + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { let inner = Arc::clone(&self.0); - let fut = async move { ::make_volume(&inner, request).await }; + let fut = async move { + ::make_volume(&inner, request).await + }; Box::pin(fut) } } @@ -2026,8 +2808,14 @@ pub mod node_service_server { let method = MakeVolumeSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) - .apply_compression_config(accept_compression_encodings, send_compression_encodings) - .apply_max_message_size_config(max_decoding_message_size, max_encoding_message_size); + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); let res = grpc.unary(method, req).await; Ok(res) }; @@ -2036,12 +2824,23 @@ pub mod node_service_server { "/node_service.NodeService/ListVolumes" => { #[allow(non_camel_case_types)] struct ListVolumesSvc(pub Arc); - impl tonic::server::UnaryService for ListVolumesSvc { + impl< + T: NodeService, + > tonic::server::UnaryService + for ListVolumesSvc { type Response = super::ListVolumesResponse; - type Future = BoxFuture, tonic::Status>; - fn call(&mut self, request: tonic::Request) -> Self::Future { + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { let inner = Arc::clone(&self.0); - let fut = async move { ::list_volumes(&inner, request).await }; + let fut = async move { + ::list_volumes(&inner, request).await + }; Box::pin(fut) } } @@ -2054,8 +2853,14 @@ pub mod node_service_server { let method = ListVolumesSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) - .apply_compression_config(accept_compression_encodings, send_compression_encodings) - .apply_max_message_size_config(max_decoding_message_size, max_encoding_message_size); + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); let res = grpc.unary(method, req).await; Ok(res) }; @@ -2064,12 +2869,23 @@ pub mod node_service_server { "/node_service.NodeService/StatVolume" => { #[allow(non_camel_case_types)] struct StatVolumeSvc(pub Arc); - impl tonic::server::UnaryService for StatVolumeSvc { + impl< + T: NodeService, + > tonic::server::UnaryService + for StatVolumeSvc { type Response = super::StatVolumeResponse; - type Future = BoxFuture, tonic::Status>; - fn call(&mut self, request: tonic::Request) -> Self::Future { + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { let inner = Arc::clone(&self.0); - let fut = async move { ::stat_volume(&inner, request).await }; + let fut = async move { + ::stat_volume(&inner, request).await + }; Box::pin(fut) } } @@ -2082,8 +2898,14 @@ pub mod node_service_server { let method = StatVolumeSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) - .apply_compression_config(accept_compression_encodings, send_compression_encodings) - .apply_max_message_size_config(max_decoding_message_size, max_encoding_message_size); + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); let res = grpc.unary(method, req).await; Ok(res) }; @@ -2092,12 +2914,23 @@ pub mod node_service_server { "/node_service.NodeService/DeletePaths" => { #[allow(non_camel_case_types)] struct DeletePathsSvc(pub Arc); - impl tonic::server::UnaryService for DeletePathsSvc { + impl< + T: NodeService, + > tonic::server::UnaryService + for DeletePathsSvc { type Response = super::DeletePathsResponse; - type Future = BoxFuture, tonic::Status>; - fn call(&mut self, request: tonic::Request) -> Self::Future { + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { let inner = Arc::clone(&self.0); - let fut = async move { ::delete_paths(&inner, request).await }; + let fut = async move { + ::delete_paths(&inner, request).await + }; Box::pin(fut) } } @@ -2110,8 +2943,14 @@ pub mod node_service_server { let method = DeletePathsSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) - .apply_compression_config(accept_compression_encodings, send_compression_encodings) - .apply_max_message_size_config(max_decoding_message_size, max_encoding_message_size); + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); let res = grpc.unary(method, req).await; Ok(res) }; @@ -2120,12 +2959,23 @@ pub mod node_service_server { "/node_service.NodeService/UpdateMetadata" => { #[allow(non_camel_case_types)] struct UpdateMetadataSvc(pub Arc); - impl tonic::server::UnaryService for UpdateMetadataSvc { + impl< + T: NodeService, + > tonic::server::UnaryService + for UpdateMetadataSvc { type Response = super::UpdateMetadataResponse; - type Future = BoxFuture, tonic::Status>; - fn call(&mut self, request: tonic::Request) -> Self::Future { + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { let inner = Arc::clone(&self.0); - let fut = async move { ::update_metadata(&inner, request).await }; + let fut = async move { + ::update_metadata(&inner, request).await + }; Box::pin(fut) } } @@ -2138,8 +2988,14 @@ pub mod node_service_server { let method = UpdateMetadataSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) - .apply_compression_config(accept_compression_encodings, send_compression_encodings) - .apply_max_message_size_config(max_decoding_message_size, max_encoding_message_size); + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); let res = grpc.unary(method, req).await; Ok(res) }; @@ -2148,12 +3004,23 @@ pub mod node_service_server { "/node_service.NodeService/WriteMetadata" => { #[allow(non_camel_case_types)] struct WriteMetadataSvc(pub Arc); - impl tonic::server::UnaryService for WriteMetadataSvc { + impl< + T: NodeService, + > tonic::server::UnaryService + for WriteMetadataSvc { type Response = super::WriteMetadataResponse; - type Future = BoxFuture, tonic::Status>; - fn call(&mut self, request: tonic::Request) -> Self::Future { + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { let inner = Arc::clone(&self.0); - let fut = async move { ::write_metadata(&inner, request).await }; + let fut = async move { + ::write_metadata(&inner, request).await + }; Box::pin(fut) } } @@ -2166,8 +3033,14 @@ pub mod node_service_server { let method = WriteMetadataSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) - .apply_compression_config(accept_compression_encodings, send_compression_encodings) - .apply_max_message_size_config(max_decoding_message_size, max_encoding_message_size); + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); let res = grpc.unary(method, req).await; Ok(res) }; @@ -2176,12 +3049,23 @@ pub mod node_service_server { "/node_service.NodeService/ReadVersion" => { #[allow(non_camel_case_types)] struct ReadVersionSvc(pub Arc); - impl tonic::server::UnaryService for ReadVersionSvc { + impl< + T: NodeService, + > tonic::server::UnaryService + for ReadVersionSvc { type Response = super::ReadVersionResponse; - type Future = BoxFuture, tonic::Status>; - fn call(&mut self, request: tonic::Request) -> Self::Future { + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { let inner = Arc::clone(&self.0); - let fut = async move { ::read_version(&inner, request).await }; + let fut = async move { + ::read_version(&inner, request).await + }; Box::pin(fut) } } @@ -2194,8 +3078,14 @@ pub mod node_service_server { let method = ReadVersionSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) - .apply_compression_config(accept_compression_encodings, send_compression_encodings) - .apply_max_message_size_config(max_decoding_message_size, max_encoding_message_size); + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); let res = grpc.unary(method, req).await; Ok(res) }; @@ -2204,12 +3094,23 @@ pub mod node_service_server { "/node_service.NodeService/ReadXL" => { #[allow(non_camel_case_types)] struct ReadXLSvc(pub Arc); - impl tonic::server::UnaryService for ReadXLSvc { + impl< + T: NodeService, + > tonic::server::UnaryService + for ReadXLSvc { type Response = super::ReadXlResponse; - type Future = BoxFuture, tonic::Status>; - fn call(&mut self, request: tonic::Request) -> Self::Future { + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { let inner = Arc::clone(&self.0); - let fut = async move { ::read_xl(&inner, request).await }; + let fut = async move { + ::read_xl(&inner, request).await + }; Box::pin(fut) } } @@ -2222,8 +3123,14 @@ pub mod node_service_server { let method = ReadXLSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) - .apply_compression_config(accept_compression_encodings, send_compression_encodings) - .apply_max_message_size_config(max_decoding_message_size, max_encoding_message_size); + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); let res = grpc.unary(method, req).await; Ok(res) }; @@ -2232,12 +3139,23 @@ pub mod node_service_server { "/node_service.NodeService/DeleteVersion" => { #[allow(non_camel_case_types)] struct DeleteVersionSvc(pub Arc); - impl tonic::server::UnaryService for DeleteVersionSvc { + impl< + T: NodeService, + > tonic::server::UnaryService + for DeleteVersionSvc { type Response = super::DeleteVersionResponse; - type Future = BoxFuture, tonic::Status>; - fn call(&mut self, request: tonic::Request) -> Self::Future { + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { let inner = Arc::clone(&self.0); - let fut = async move { ::delete_version(&inner, request).await }; + let fut = async move { + ::delete_version(&inner, request).await + }; Box::pin(fut) } } @@ -2250,8 +3168,14 @@ pub mod node_service_server { let method = DeleteVersionSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) - .apply_compression_config(accept_compression_encodings, send_compression_encodings) - .apply_max_message_size_config(max_decoding_message_size, max_encoding_message_size); + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); let res = grpc.unary(method, req).await; Ok(res) }; @@ -2260,12 +3184,23 @@ pub mod node_service_server { "/node_service.NodeService/DeleteVersions" => { #[allow(non_camel_case_types)] struct DeleteVersionsSvc(pub Arc); - impl tonic::server::UnaryService for DeleteVersionsSvc { + impl< + T: NodeService, + > tonic::server::UnaryService + for DeleteVersionsSvc { type Response = super::DeleteVersionsResponse; - type Future = BoxFuture, tonic::Status>; - fn call(&mut self, request: tonic::Request) -> Self::Future { + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { let inner = Arc::clone(&self.0); - let fut = async move { ::delete_versions(&inner, request).await }; + let fut = async move { + ::delete_versions(&inner, request).await + }; Box::pin(fut) } } @@ -2278,8 +3213,14 @@ pub mod node_service_server { let method = DeleteVersionsSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) - .apply_compression_config(accept_compression_encodings, send_compression_encodings) - .apply_max_message_size_config(max_decoding_message_size, max_encoding_message_size); + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); let res = grpc.unary(method, req).await; Ok(res) }; @@ -2288,12 +3229,23 @@ pub mod node_service_server { "/node_service.NodeService/ReadMultiple" => { #[allow(non_camel_case_types)] struct ReadMultipleSvc(pub Arc); - impl tonic::server::UnaryService for ReadMultipleSvc { + impl< + T: NodeService, + > tonic::server::UnaryService + for ReadMultipleSvc { type Response = super::ReadMultipleResponse; - type Future = BoxFuture, tonic::Status>; - fn call(&mut self, request: tonic::Request) -> Self::Future { + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { let inner = Arc::clone(&self.0); - let fut = async move { ::read_multiple(&inner, request).await }; + let fut = async move { + ::read_multiple(&inner, request).await + }; Box::pin(fut) } } @@ -2306,8 +3258,14 @@ pub mod node_service_server { let method = ReadMultipleSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) - .apply_compression_config(accept_compression_encodings, send_compression_encodings) - .apply_max_message_size_config(max_decoding_message_size, max_encoding_message_size); + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); let res = grpc.unary(method, req).await; Ok(res) }; @@ -2316,12 +3274,23 @@ pub mod node_service_server { "/node_service.NodeService/DeleteVolume" => { #[allow(non_camel_case_types)] struct DeleteVolumeSvc(pub Arc); - impl tonic::server::UnaryService for DeleteVolumeSvc { + impl< + T: NodeService, + > tonic::server::UnaryService + for DeleteVolumeSvc { type Response = super::DeleteVolumeResponse; - type Future = BoxFuture, tonic::Status>; - fn call(&mut self, request: tonic::Request) -> Self::Future { + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { let inner = Arc::clone(&self.0); - let fut = async move { ::delete_volume(&inner, request).await }; + let fut = async move { + ::delete_volume(&inner, request).await + }; Box::pin(fut) } } @@ -2334,8 +3303,14 @@ pub mod node_service_server { let method = DeleteVolumeSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) - .apply_compression_config(accept_compression_encodings, send_compression_encodings) - .apply_max_message_size_config(max_decoding_message_size, max_encoding_message_size); + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); let res = grpc.unary(method, req).await; Ok(res) }; @@ -2344,12 +3319,23 @@ pub mod node_service_server { "/node_service.NodeService/DiskInfo" => { #[allow(non_camel_case_types)] struct DiskInfoSvc(pub Arc); - impl tonic::server::UnaryService for DiskInfoSvc { + impl< + T: NodeService, + > tonic::server::UnaryService + for DiskInfoSvc { type Response = super::DiskInfoResponse; - type Future = BoxFuture, tonic::Status>; - fn call(&mut self, request: tonic::Request) -> Self::Future { + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { let inner = Arc::clone(&self.0); - let fut = async move { ::disk_info(&inner, request).await }; + let fut = async move { + ::disk_info(&inner, request).await + }; Box::pin(fut) } } @@ -2362,8 +3348,14 @@ pub mod node_service_server { let method = DiskInfoSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) - .apply_compression_config(accept_compression_encodings, send_compression_encodings) - .apply_max_message_size_config(max_decoding_message_size, max_encoding_message_size); + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); let res = grpc.unary(method, req).await; Ok(res) }; @@ -2372,12 +3364,23 @@ pub mod node_service_server { "/node_service.NodeService/Lock" => { #[allow(non_camel_case_types)] struct LockSvc(pub Arc); - impl tonic::server::UnaryService for LockSvc { + impl< + T: NodeService, + > tonic::server::UnaryService + for LockSvc { type Response = super::GenerallyLockResponse; - type Future = BoxFuture, tonic::Status>; - fn call(&mut self, request: tonic::Request) -> Self::Future { + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { let inner = Arc::clone(&self.0); - let fut = async move { ::lock(&inner, request).await }; + let fut = async move { + ::lock(&inner, request).await + }; Box::pin(fut) } } @@ -2390,8 +3393,14 @@ pub mod node_service_server { let method = LockSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) - .apply_compression_config(accept_compression_encodings, send_compression_encodings) - .apply_max_message_size_config(max_decoding_message_size, max_encoding_message_size); + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); let res = grpc.unary(method, req).await; Ok(res) }; @@ -2400,12 +3409,23 @@ pub mod node_service_server { "/node_service.NodeService/UnLock" => { #[allow(non_camel_case_types)] struct UnLockSvc(pub Arc); - impl tonic::server::UnaryService for UnLockSvc { + impl< + T: NodeService, + > tonic::server::UnaryService + for UnLockSvc { type Response = super::GenerallyLockResponse; - type Future = BoxFuture, tonic::Status>; - fn call(&mut self, request: tonic::Request) -> Self::Future { + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { let inner = Arc::clone(&self.0); - let fut = async move { ::un_lock(&inner, request).await }; + let fut = async move { + ::un_lock(&inner, request).await + }; Box::pin(fut) } } @@ -2418,8 +3438,14 @@ pub mod node_service_server { let method = UnLockSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) - .apply_compression_config(accept_compression_encodings, send_compression_encodings) - .apply_max_message_size_config(max_decoding_message_size, max_encoding_message_size); + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); let res = grpc.unary(method, req).await; Ok(res) }; @@ -2428,12 +3454,23 @@ pub mod node_service_server { "/node_service.NodeService/RLock" => { #[allow(non_camel_case_types)] struct RLockSvc(pub Arc); - impl tonic::server::UnaryService for RLockSvc { + impl< + T: NodeService, + > tonic::server::UnaryService + for RLockSvc { type Response = super::GenerallyLockResponse; - type Future = BoxFuture, tonic::Status>; - fn call(&mut self, request: tonic::Request) -> Self::Future { + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { let inner = Arc::clone(&self.0); - let fut = async move { ::r_lock(&inner, request).await }; + let fut = async move { + ::r_lock(&inner, request).await + }; Box::pin(fut) } } @@ -2446,8 +3483,14 @@ pub mod node_service_server { let method = RLockSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) - .apply_compression_config(accept_compression_encodings, send_compression_encodings) - .apply_max_message_size_config(max_decoding_message_size, max_encoding_message_size); + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); let res = grpc.unary(method, req).await; Ok(res) }; @@ -2456,12 +3499,23 @@ pub mod node_service_server { "/node_service.NodeService/RUnLock" => { #[allow(non_camel_case_types)] struct RUnLockSvc(pub Arc); - impl tonic::server::UnaryService for RUnLockSvc { + impl< + T: NodeService, + > tonic::server::UnaryService + for RUnLockSvc { type Response = super::GenerallyLockResponse; - type Future = BoxFuture, tonic::Status>; - fn call(&mut self, request: tonic::Request) -> Self::Future { + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { let inner = Arc::clone(&self.0); - let fut = async move { ::r_un_lock(&inner, request).await }; + let fut = async move { + ::r_un_lock(&inner, request).await + }; Box::pin(fut) } } @@ -2474,8 +3528,14 @@ pub mod node_service_server { let method = RUnLockSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) - .apply_compression_config(accept_compression_encodings, send_compression_encodings) - .apply_max_message_size_config(max_decoding_message_size, max_encoding_message_size); + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); let res = grpc.unary(method, req).await; Ok(res) }; @@ -2484,12 +3544,23 @@ pub mod node_service_server { "/node_service.NodeService/ForceUnLock" => { #[allow(non_camel_case_types)] struct ForceUnLockSvc(pub Arc); - impl tonic::server::UnaryService for ForceUnLockSvc { + impl< + T: NodeService, + > tonic::server::UnaryService + for ForceUnLockSvc { type Response = super::GenerallyLockResponse; - type Future = BoxFuture, tonic::Status>; - fn call(&mut self, request: tonic::Request) -> Self::Future { + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { let inner = Arc::clone(&self.0); - let fut = async move { ::force_un_lock(&inner, request).await }; + let fut = async move { + ::force_un_lock(&inner, request).await + }; Box::pin(fut) } } @@ -2502,8 +3573,14 @@ pub mod node_service_server { let method = ForceUnLockSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) - .apply_compression_config(accept_compression_encodings, send_compression_encodings) - .apply_max_message_size_config(max_decoding_message_size, max_encoding_message_size); + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); let res = grpc.unary(method, req).await; Ok(res) }; @@ -2512,12 +3589,23 @@ pub mod node_service_server { "/node_service.NodeService/Refresh" => { #[allow(non_camel_case_types)] struct RefreshSvc(pub Arc); - impl tonic::server::UnaryService for RefreshSvc { + impl< + T: NodeService, + > tonic::server::UnaryService + for RefreshSvc { type Response = super::GenerallyLockResponse; - type Future = BoxFuture, tonic::Status>; - fn call(&mut self, request: tonic::Request) -> Self::Future { + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { let inner = Arc::clone(&self.0); - let fut = async move { ::refresh(&inner, request).await }; + let fut = async move { + ::refresh(&inner, request).await + }; Box::pin(fut) } } @@ -2530,20 +3618,36 @@ pub mod node_service_server { let method = RefreshSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) - .apply_compression_config(accept_compression_encodings, send_compression_encodings) - .apply_max_message_size_config(max_decoding_message_size, max_encoding_message_size); + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); let res = grpc.unary(method, req).await; Ok(res) }; Box::pin(fut) } - _ => Box::pin(async move { - let mut response = http::Response::new(empty_body()); - let headers = response.headers_mut(); - headers.insert(tonic::Status::GRPC_STATUS, (tonic::Code::Unimplemented as i32).into()); - headers.insert(http::header::CONTENT_TYPE, tonic::metadata::GRPC_CONTENT_TYPE); - Ok(response) - }), + _ => { + Box::pin(async move { + let mut response = http::Response::new(empty_body()); + let headers = response.headers_mut(); + headers + .insert( + tonic::Status::GRPC_STATUS, + (tonic::Code::Unimplemented as i32).into(), + ); + headers + .insert( + http::header::CONTENT_TYPE, + tonic::metadata::GRPC_CONTENT_TYPE, + ); + Ok(response) + }) + } } } } diff --git a/ecstore/src/disk/local.rs b/ecstore/src/disk/local.rs index 75ebc264..d70c04a1 100644 --- a/ecstore/src/disk/local.rs +++ b/ecstore/src/disk/local.rs @@ -19,15 +19,17 @@ use crate::disk::os::{check_path_length, is_empty_dir}; use crate::disk::{LocalFileReader, LocalFileWriter, STORAGE_FORMAT_FILE}; use crate::error::{Error, Result}; use crate::global::{GLOBAL_IsErasureSD, GLOBAL_RootDiskThreshold}; -use crate::heal::data_scanner::has_active_rules; -use crate::heal::data_usage_cache::{DataUsageCache, DataUsageEntry}; +use crate::heal::data_scanner::{has_active_rules, scan_data_folder, ScannerItem, SizeSummary}; +use crate::heal::data_scanner_metric::{globalScannerMetrics, ScannerMetric, ScannerMetrics}; +use crate::heal::data_usage_cache::{self, DataUsageCache, DataUsageEntry}; +use crate::heal::error::{ERR_IGNORE_FILE_CONTRIB, ERR_SKIP_FILE}; use crate::heal::heal_commands::HealScanMode; use crate::new_object_layer_fn; use crate::set_disk::{ conv_part_err_to_int, CHECK_PART_FILE_CORRUPT, CHECK_PART_FILE_NOT_FOUND, CHECK_PART_SUCCESS, CHECK_PART_UNKNOWN, CHECK_PART_VOLUME_NOT_FOUND, }; -use crate::store_api::BitrotAlgorithm; +use crate::store_api::{BitrotAlgorithm, StorageAPI}; use crate::utils::fs::{access, lstat, O_APPEND, O_CREATE, O_RDONLY, O_WRONLY}; use crate::utils::os::get_info; use crate::utils::path::{clean, has_suffix, GLOBAL_DIR_SUFFIX_WITH_SLASH, SLASH_SEPARATOR}; @@ -39,13 +41,13 @@ use crate::{ use common::defer; use path_absolutize::Absolutize; use s3s::dto::{ReplicationConfiguration, ReplicationRuleStatus}; -use std::collections::HashSet; +use std::collections::{HashMap, HashSet}; use std::fmt::Debug; use std::io::Cursor; use std::os::unix::fs::MetadataExt; use std::sync::atomic::{AtomicU32, Ordering}; use std::sync::Arc; -use std::time::Duration; +use std::time::{Duration, SystemTime}; use std::{ fs::Metadata, path::{Path, PathBuf}, @@ -1878,7 +1880,7 @@ impl DiskAPI for LocalDisk { } async fn ns_scanner( - &self, + self: Arc, cache: &DataUsageCache, updates: Sender, scan_mode: HealScanMode, @@ -1904,7 +1906,112 @@ impl DiskAPI for LocalDisk { Some(s) => s, None => return Err(Error::msg("errServerNotInitialized")), }; - todo!() + let loc = self.get_disk_location(); + let disks = store.get_disks(loc.pool_idx.unwrap(), loc.disk_idx.unwrap()).await?; + let disk = self.clone(); + let disk_clone = disk.clone(); + let mut cache = cache.clone(); + cache.info.updates = Some(updates.clone()); + let mut data_usage_info = scan_data_folder( + &disks, + disk, + &cache, + Box::new(move |item: &ScannerItem| { + let mut item = item.clone(); + let disk = disk_clone.clone(); + Box::pin(async move { + if item.path.ends_with(&format!("{}{}", SLASH_SEPARATOR, STORAGE_FORMAT_FILE)) { + return Err(Error::from_string(ERR_SKIP_FILE)); + } + let stop_fn = ScannerMetrics::log(ScannerMetric::ScanObject); + let mut res = HashMap::new(); + let done_sz = ScannerMetrics::time_size(ScannerMetric::ReadMetadata); + let buf = match disk.read_metadata(item.path.clone()).await { + Ok(buf) => buf, + Err(err) => { + res.insert("err".to_string(), err.to_string()); + stop_fn(&res).await; + return Err(Error::from_string(ERR_SKIP_FILE)); + } + }; + done_sz(buf.len() as u64).await; + res.insert("metasize".to_string(), buf.len().to_string()); + item.transform_meda_dir(); + let meta_cache = MetaCacheEntry { + name: item.object_path().to_string_lossy().to_string(), + metadata: buf, + ..Default::default() + }; + let fivs = match meta_cache.file_info_versions(&item.bucket) { + Ok(fivs) => fivs, + Err(err) => { + res.insert("err".to_string(), err.to_string()); + stop_fn(&res).await; + return Err(Error::from_string(ERR_SKIP_FILE)); + } + }; + let mut size_s = SizeSummary::default(); + let done = ScannerMetrics::time(ScannerMetric::ApplyAll); + let obj_infos = match item.apply_versions_actions(&fivs.versions).await { + Ok(obj_infos) => obj_infos, + Err(err) => { + res.insert("err".to_string(), err.to_string()); + stop_fn(&res).await; + return Err(Error::from_string(ERR_SKIP_FILE)); + } + }; + + let versioned = false; + let mut obj_deleted = false; + for info in obj_infos.iter() { + let done = ScannerMetrics::time(ScannerMetric::ApplyVersion); + let mut sz = 0; + (obj_deleted, sz) = item.apply_actions(&info, &size_s).await; + done().await; + + if obj_deleted { + break; + } + + let actual_sz = match info.get_actual_size() { + Ok(size) => size, + Err(_) => continue, + }; + + if info.delete_marker { + size_s.delete_markers += 1; + } + + if info.version_id.is_some() && sz == actual_sz { + size_s.versions += 1; + } + + size_s.total_size += sz; + + if info.delete_marker { + continue; + } + } + + for frer_version in fivs.free_versions.iter() { + let _obj_info = frer_version.to_object_info(&item.bucket, &item.object_path().to_string_lossy(), versioned); + let done = ScannerMetrics::time(ScannerMetric::TierObjSweep); + done().await; + } + + // todo: global trace + if obj_deleted { + return Err(Error::from_string(ERR_IGNORE_FILE_CONTRIB)); + } + done().await; + Ok(size_s) + }) + }), + scan_mode, + ) + .await?; + data_usage_info.info.last_update = Some(SystemTime::now()); + Ok(data_usage_info) } } diff --git a/ecstore/src/disk/mod.rs b/ecstore/src/disk/mod.rs index a9056d52..880ffd59 100644 --- a/ecstore/src/disk/mod.rs +++ b/ecstore/src/disk/mod.rs @@ -16,7 +16,7 @@ const STORAGE_FORMAT_FILE: &str = "xl.meta"; use crate::{ erasure::Writer, error::{Error, Result}, - file_meta::{merge_file_meta_versions, FileMeta, FileMetaShallowVersion}, + file_meta::{merge_file_meta_versions, FileMeta, FileMetaShallowVersion, FileMetaVersion}, heal::{ data_usage_cache::{DataUsageCache, DataUsageEntry}, heal_commands::HealScanMode, @@ -344,14 +344,14 @@ impl DiskAPI for Disk { } async fn ns_scanner( - &self, + self: Arc, cache: &DataUsageCache, updates: Sender, scan_mode: HealScanMode, ) -> Result { - match self { - Disk::Local(local_disk) => local_disk.ns_scanner(cache, updates, scan_mode).await, - Disk::Remote(remote_disk) => remote_disk.ns_scanner(cache, updates, scan_mode).await, + match &*self { + Disk::Local(local_disk) => Arc::new(local_disk).ns_scanner(cache, updates, scan_mode).await, + Disk::Remote(remote_disk) => Arc::new(remote_disk).ns_scanner(cache, updates, scan_mode).await, } } } @@ -453,7 +453,7 @@ pub trait DiskAPI: Debug + Send + Sync + 'static { async fn read_all(&self, volume: &str, path: &str) -> Result>; async fn disk_info(&self, opts: &DiskInfoOptions) -> Result; async fn ns_scanner( - &self, + self: Arc, cache: &DataUsageCache, updates: Sender, scan_mode: HealScanMode, diff --git a/ecstore/src/disk/remote.rs b/ecstore/src/disk/remote.rs index 4eba96bf..fe7df93b 100644 --- a/ecstore/src/disk/remote.rs +++ b/ecstore/src/disk/remote.rs @@ -1,4 +1,4 @@ -use std::path::PathBuf; +use std::{path::PathBuf, sync::Arc}; use futures::lock::Mutex; use protos::{ @@ -736,7 +736,7 @@ impl DiskAPI for RemoteDisk { } async fn ns_scanner( - &self, + self: Arc, cache: &DataUsageCache, updates: Sender, scan_mode: HealScanMode, diff --git a/ecstore/src/heal/data_scanner.rs b/ecstore/src/heal/data_scanner.rs index b9d82f2e..0ad14e23 100644 --- a/ecstore/src/heal/data_scanner.rs +++ b/ecstore/src/heal/data_scanner.rs @@ -12,7 +12,7 @@ use std::{ time::{Duration, SystemTime, UNIX_EPOCH}, }; -use byteorder::{LittleEndian, ReadBytesExt}; +use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt}; use lazy_static::lazy_static; use rand::Rng; use rmp_serde::{Deserializer, Serializer}; @@ -29,12 +29,11 @@ use tokio::{ use tracing::{error, info}; use super::{ - data_scanner_metric::globalScannerMetrics, + data_scanner_metric::{globalScannerMetrics, ScannerMetric, ScannerMetrics}, data_usage::{store_data_usage_in_backend, DATA_USAGE_BLOOM_NAME_PATH}, data_usage_cache::{DataUsageCache, DataUsageEntry, DataUsageHash}, heal_commands::{HealScanMode, HEAL_DEEP_SCAN, HEAL_NORMAL_SCAN}, }; -use crate::disk::DiskAPI; use crate::heal::data_scanner_metric::current_path_updater; use crate::heal::data_usage::DATA_USAGE_ROOT; use crate::{ @@ -58,6 +57,10 @@ use crate::{ store::{ECStore, ListPathOptions}, utils::path::{path_join, path_to_bucket_object, path_to_bucket_object_with_base_path, SLASH_SEPARATOR}, }; +use crate::{ + disk::DiskAPI, + store_api::{FileInfo, ObjectInfo}, +}; const DATA_SCANNER_SLEEP_PER_FOLDER: Duration = Duration::from_millis(1); // Time to wait between folders. const DATA_USAGE_UPDATE_DIR_CYCLES: u32 = 16; // Visit all folders every n cycles. @@ -132,6 +135,7 @@ async fn run_data_scanner() { } loop { + let stop_fn = ScannerMetrics::log(ScannerMetric::ScanCycle); cycle_info.current = cycle_info.next; cycle_info.started = SystemTime::now(); { @@ -155,6 +159,26 @@ async fn run_data_scanner() { tokio::spawn(async { store_data_usage_in_backend(rx).await; }); + let mut res = HashMap::new(); + res.insert("cycle".to_string(), cycle_info.current.to_string()); + match store.ns_scanner(tx, cycle_info.current as usize, scan_mode).await { + Ok(_) => { + cycle_info.next += 1; + cycle_info.current = 0; + cycle_info.cycle_completed.push(SystemTime::now()); + if cycle_info.cycle_completed.len() > DATA_USAGE_UPDATE_DIR_CYCLES as usize { + cycle_info.cycle_completed = cycle_info.cycle_completed[cycle_info.cycle_completed.len() - DATA_USAGE_UPDATE_DIR_CYCLES as usize..].to_vec(); + } + globalScannerMetrics.write().await.set_cycle(Some(cycle_info.clone())).await; + let mut tmp = Vec::new(); + tmp.write_u64::(cycle_info.next).unwrap(); + let _ = save_config(store, &DATA_USAGE_BLOOM_NAME_PATH, &tmp).await; + }, + Err(err) => { + res.insert("error".to_string(), err.to_string()); + }, + } + stop_fn(&res).await; sleep(Duration::from_secs(SCANNER_CYCLE.load(std::sync::atomic::Ordering::SeqCst))).await; } } @@ -241,7 +265,7 @@ impl Default for CurrentScannerCycle { } impl CurrentScannerCycle { - pub fn marshal_msg(&self) -> Result> { + pub fn marshal_msg(&self, buf: &[u8]) -> Result> { let len: u32 = 4; let mut wr = Vec::new(); @@ -267,8 +291,9 @@ impl CurrentScannerCycle { .serialize(&mut Serializer::new(&mut buf)) .expect("Serialization failed"); rmp::encode::write_bin(&mut wr, &buf)?; - - Ok(wr) + let mut result = buf.to_vec(); + result.extend(wr.iter()); + Ok(result) } #[tracing::instrument] @@ -295,10 +320,10 @@ impl CurrentScannerCycle { self.current = u; } - "next" => { - let u: u64 = rmp::decode::read_int(&mut cur)?; - self.next = u; - } + // "next" => { + // let u: u64 = rmp::decode::read_int(&mut cur)?; + // self.next = u; + // } "started" => { let u: u64 = rmp::decode::read_int(&mut cur)?; let started = timestamp_to_system_time(u); @@ -329,22 +354,23 @@ fn timestamp_to_system_time(timestamp: u64) -> SystemTime { UNIX_EPOCH + std::time::Duration::new(timestamp, 0) } -#[derive(Debug, Default)] +#[derive(Clone, Debug, Default)] struct Heal { enabled: bool, bitrot: bool, } +#[derive(Clone)] pub struct ScannerItem { - path: String, - bucket: String, - prefix: String, - object_name: String, - replication: Option, + pub path: String, + pub bucket: String, + pub prefix: String, + pub object_name: String, + pub replication: Option, // todo: lifecycle // typ: fs::Permissions, - heal: Heal, - debug: bool, + pub heal: Heal, + pub debug: bool, } impl ScannerItem { @@ -365,6 +391,43 @@ impl ScannerItem { pub fn object_path(&self) -> PathBuf { path_join(&[PathBuf::from(self.prefix.clone()), PathBuf::from(self.object_name.clone())]) } + + pub async fn apply_versions_actions(&self, fivs: &[FileInfo]) -> Result> { + let obj_infos = self.apply_newer_noncurrent_version_limit(fivs).await?; + if obj_infos.len() >= SCANNER_EXCESS_OBJECT_VERSIONS.load(Ordering::SeqCst).try_into().unwrap() { + // todo + } + + let mut cumulative_size = 0; + for obj_info in obj_infos.iter() { + cumulative_size += obj_info.size; + } + + if cumulative_size >= SCANNER_EXCESS_OBJECT_VERSIONS_TOTAL_SIZE.load(Ordering::SeqCst).try_into().unwrap() { + //todo + } + + Ok(obj_infos) + } + + pub async fn apply_newer_noncurrent_version_limit(&self, fivs: &[FileInfo]) -> Result> { + let done = ScannerMetrics::time(ScannerMetric::ApplyNonCurrent); + let mut object_infos = Vec::new(); + for info in fivs.iter() { + object_infos.push(info.to_object_info(&self.bucket, &self.object_path().to_string_lossy(), false)); + } + done().await; + + Ok(object_infos) + } + + pub async fn apply_actions(&self, oi: &ObjectInfo, size_s: &SizeSummary) -> (bool, usize) { + let done = ScannerMetrics::time(ScannerMetric::Ilm); + //todo: lifecycle + done().await; + + (false, 0) + } } #[derive(Debug, Default)] @@ -420,7 +483,7 @@ struct FolderScanner { last_update: SystemTime, update_current_path: UpdateCurrentPathFn, skip_heal: AtomicBool, - drive: DiskStore, + drive: LocalDrive, } impl FolderScanner { @@ -951,9 +1014,10 @@ pub fn has_active_rules(config: &ReplicationConfiguration, prefix: &str, recursi false } +pub type LocalDrive = Arc; pub async fn scan_data_folder( disks: &[Option], - drive: &DiskStore, + drive: LocalDrive, cache: &DataUsageCache, get_size_fn: GetSizeFn, heal_scan_mode: HealScanMode, @@ -964,6 +1028,11 @@ pub async fn scan_data_folder( let base_path = drive.to_string(); let (update_path, close_disk) = current_path_updater(&base_path, &cache.info.name); + let skip_heal = if *GLOBAL_IsErasure.read().await || cache.info.skip_healing { + AtomicBool::new(true) + } else { + AtomicBool::new(false) + }; let mut s = FolderScanner { root: base_path, get_size: get_size_fn, @@ -978,11 +1047,7 @@ pub async fn scan_data_folder( update_current_path: update_path, disks: disks.to_vec(), disks_quorum: disks.len() / 2, - skip_heal: if *GLOBAL_IsErasure.read().await || cache.info.skip_healing { - AtomicBool::new(true) - } else { - AtomicBool::new(false) - }, + skip_heal, drive: drive.clone(), }; @@ -1002,7 +1067,7 @@ pub async fn scan_data_folder( } s.new_cache .force_compact(DATA_SCANNER_COMPACT_AT_CHILDREN.try_into().unwrap()); - s.new_cache.info.last_update = SystemTime::now(); + s.new_cache.info.last_update = Some(SystemTime::now()); s.new_cache.info.next_cycle = cache.info.next_cycle; close_disk().await; Ok(s.new_cache) diff --git a/ecstore/src/heal/data_scanner_metric.rs b/ecstore/src/heal/data_scanner_metric.rs index 4947aa81..325beaa5 100644 --- a/ecstore/src/heal/data_scanner_metric.rs +++ b/ecstore/src/heal/data_scanner_metric.rs @@ -1,6 +1,10 @@ +use common::last_minute::{AccElem, LastMinuteLatency}; use lazy_static::lazy_static; use std::future::Future; use std::pin::Pin; +use std::sync::atomic::AtomicU64; +use std::sync::Once; +use std::time::{Duration, UNIX_EPOCH}; use std::{ collections::HashMap, sync::{ @@ -53,8 +57,75 @@ pub enum ScannerMetric { Last, } +static INIT: Once = Once::new(); + +#[derive(Default)] +pub struct LockedLastMinuteLatency { + cached_sec: AtomicU64, + cached: AccElem, + mu: RwLock, + latency: LastMinuteLatency, +} + +impl Clone for LockedLastMinuteLatency { + fn clone(&self) -> Self { + Self { + cached_sec: AtomicU64::new(0), + cached: self.cached.clone(), + mu: RwLock::new(true), + latency: self.latency.clone(), + } + } +} + +impl LockedLastMinuteLatency { + pub fn add(&mut self, value: &Duration) { + self.add_size(value, 0); + } + + pub async fn add_size(&mut self, value: &Duration, sz: u64) { + let t = SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("Time went backwards") + .as_secs(); + INIT.call_once(|| { + self.cached = AccElem::default(); + self.cached_sec.store(t, Ordering::SeqCst); + }); + let last_t = self.cached_sec.load(Ordering::SeqCst); + if last_t != t + && self + .cached_sec + .compare_exchange(last_t, t, Ordering::SeqCst, Ordering::SeqCst) + .is_ok() + { + let old = self.cached.clone(); + self.cached = AccElem::default(); + let mut a = AccElem::default(); + a.size = old.size; + a.total = old.total; + a.n = old.n; + self.mu.write().await; + self.latency.add_all(t - 1, &a); + } + self.cached.n += 1; + self.cached.total += value.as_secs(); + self.cached.size != sz; + } + + pub async fn total(&mut self) -> AccElem { + self.mu.read().await; + self.latency.get_total() + } +} + +pub type LogFn = Arc) -> Pin + Send>> + Send + Sync + 'static>; +pub type TimeSizeFn = Arc Pin + Send>> + Send + Sync + 'static>; +pub type TimeFn = Arc Pin + Send>> + Send + Sync + 'static>; + pub struct ScannerMetrics { operations: Vec, + latency: Vec, cycle_info: RwLock>, current_paths: HashMap, } @@ -63,23 +134,63 @@ impl ScannerMetrics { pub fn new() -> Self { Self { operations: (0..ScannerMetric::Last as usize).map(|_| AtomicU32::new(0)).collect(), + latency: vec![LockedLastMinuteLatency::default(); ScannerMetric::LastRealtime as usize], cycle_info: RwLock::new(None), current_paths: HashMap::new(), } } - pub fn log(&mut self, s: ScannerMetric, _paths: &[String], _custom: &HashMap, _start_time: SystemTime) { - // let duration = start_time.duration_since(start_time); - self.operations[s.clone() as usize].fetch_add(1, Ordering::SeqCst); - // Dodo - } - pub async fn set_cycle(&mut self, c: Option) { *self.cycle_info.write().await = c; } + + pub fn log(s: ScannerMetric) -> LogFn { + let start = SystemTime::now(); + let s_clone = s as usize; + Arc::new(move |custom: &HashMap| { + Box::pin(async move { + let duration = SystemTime::now().duration_since(start).unwrap_or(Duration::from_secs(0)); + let mut sm_w = globalScannerMetrics.write().await; + sm_w.operations[s_clone].fetch_add(1, Ordering::SeqCst); + if s_clone < ScannerMetric::LastRealtime as usize { + sm_w.latency[s_clone].add(&duration); + } + }) + }) + } + + pub fn time_size(s: ScannerMetric) -> TimeSizeFn { + let start = SystemTime::now(); + let s_clone = s as usize; + Arc::new(move |sz: u64| { + Box::pin(async move { + let duration = SystemTime::now().duration_since(start).unwrap_or(Duration::from_secs(0)); + let mut sm_w = globalScannerMetrics.write().await; + sm_w.operations[s_clone].fetch_add(1, Ordering::SeqCst); + if s_clone < ScannerMetric::LastRealtime as usize { + sm_w.latency[s_clone].add_size(&duration, sz); + } + }) + }) + } + + pub fn time(s: ScannerMetric) -> TimeFn { + let start = SystemTime::now(); + let s_clone = s as usize; + Arc::new(move || { + Box::pin(async move { + let duration = SystemTime::now().duration_since(start).unwrap_or(Duration::from_secs(0)); + let mut sm_w = globalScannerMetrics.write().await; + sm_w.operations[s_clone].fetch_add(1, Ordering::SeqCst); + if s_clone < ScannerMetric::LastRealtime as usize { + sm_w.latency[s_clone].add(&duration); + } + }) + }) + } } -pub type CloseDiskFn = Arc Pin + Send>> + Send + 'static>; +pub type CloseDiskFn = Arc Pin + Send>> + Send + Sync + 'static>; pub fn current_path_updater(disk: &str, initial: &str) -> (UpdateCurrentPathFn, CloseDiskFn) { let disk_1 = disk.to_string(); let disk_2 = disk.to_string(); diff --git a/ecstore/src/heal/data_usage.rs b/ecstore/src/heal/data_usage.rs index 3ffa3453..75dce2bf 100644 --- a/ecstore/src/heal/data_usage.rs +++ b/ecstore/src/heal/data_usage.rs @@ -34,13 +34,13 @@ lazy_static! { // - replica failed count #[derive(Debug, Default, Serialize, Deserialize)] pub struct BucketTargetUsageInfo { - replication_pending_size: u64, - replication_failed_size: u64, - replicated_size: u64, - replica_size: u64, - replication_pending_count: u64, - replication_failed_count: u64, - replicated_count: u64, + pub replication_pending_size: u64, + pub replication_failed_size: u64, + pub replicated_size: u64, + pub replica_size: u64, + pub replication_pending_count: u64, + pub replication_failed_count: u64, + pub replicated_count: u64, } // BucketUsageInfo - bucket usage info provides @@ -49,82 +49,63 @@ pub struct BucketTargetUsageInfo { // - object size histogram per bucket #[derive(Debug, Default, Serialize, Deserialize)] pub struct BucketUsageInfo { - size: u64, + pub size: u64, // Following five fields suffixed with V1 are here for backward compatibility // Total Size for objects that have not yet been replicated - replication_pending_size_v1: u64, + pub replication_pending_size_v1: u64, // Total size for objects that have witness one or more failures and will be retried - replication_failed_size_v1: u64, + pub replication_failed_size_v1: u64, // Total size for objects that have been replicated to destination - replicated_size_v1: u64, + pub replicated_size_v1: u64, // Total number of objects pending replication - replication_pending_count_v1: u64, + pub replication_pending_count_v1: u64, // Total number of objects that failed replication - replication_failed_count_v1: u64, + pub replication_failed_count_v1: u64, - objects_count: u64, - object_size_histogram: HashMap, - object_versions_histogram: HashMap, - versions_count: u64, - delete_markers_count: u64, - replica_size: u64, - replica_count: u64, - replication_info: HashMap, + pub objects_count: u64, + pub object_size_histogram: HashMap, + pub object_versions_histogram: HashMap, + pub versions_count: u64, + pub delete_markers_count: u64, + pub replica_size: u64, + pub replica_count: u64, + pub replication_info: HashMap, } // DataUsageInfo represents data usage stats of the underlying Object API -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Default, Serialize, Deserialize)] pub struct DataUsageInfo { - total_capacity: u64, - total_used_capacity: u64, - total_free_capacity: u64, + pub total_capacity: u64, + pub total_used_capacity: u64, + pub total_free_capacity: u64, // LastUpdate is the timestamp of when the data usage info was last updated. // This does not indicate a full scan. - last_update: SystemTime, + pub last_update: Option, // Objects total count across all buckets - objects_total_count: u64, + pub objects_total_count: u64, // Versions total count across all buckets - versions_total_count: u64, + pub versions_total_count: u64, // Delete markers total count across all buckets - delete_markers_total_count: u64, + pub delete_markers_total_count: u64, // Objects total size across all buckets - objects_total_size: u64, - replication_info: HashMap, + pub objects_total_size: u64, + pub replication_info: HashMap, // Total number of buckets in this cluster - buckets_count: u64, + pub buckets_count: u64, // Buckets usage info provides following information across all buckets // - total size of the bucket // - total objects in a bucket // - object size histogram per bucket - buckets_usage: HashMap, + pub buckets_usage: HashMap, // Deprecated kept here for backward compatibility reasons. - bucket_sizes: HashMap, + pub bucket_sizes: HashMap, // Todo: TierStats // TierStats contains per-tier stats of all configured remote tiers } -impl Default for DataUsageInfo { - fn default() -> Self { - Self { - total_capacity: Default::default(), - total_used_capacity: Default::default(), - total_free_capacity: Default::default(), - last_update: SystemTime::now(), - objects_total_count: Default::default(), - versions_total_count: Default::default(), - delete_markers_total_count: Default::default(), - objects_total_size: Default::default(), - replication_info: Default::default(), - buckets_count: Default::default(), - buckets_usage: Default::default(), - bucket_sizes: Default::default(), - } - } -} - pub async fn store_data_usage_in_backend(mut rx: Receiver) { let layer = new_object_layer_fn(); let lock = layer.read().await; diff --git a/ecstore/src/heal/data_usage_cache.rs b/ecstore/src/heal/data_usage_cache.rs index 820bc858..9da7b635 100644 --- a/ecstore/src/heal/data_usage_cache.rs +++ b/ecstore/src/heal/data_usage_cache.rs @@ -4,7 +4,8 @@ use crate::disk::{BUCKET_META_PREFIX, RUSTFS_META_BUCKET}; use crate::error::{Error, Result}; use crate::new_object_layer_fn; use crate::set_disk::SetDisks; -use crate::store_api::{HTTPRangeSpec, ObjectIO, ObjectOptions}; +use crate::store_api::{BucketInfo, HTTPRangeSpec, ObjectIO, ObjectOptions}; +use bytes::Bytes; use bytesize::ByteSize; use http::HeaderMap; use path_clean::PathClean; @@ -23,7 +24,7 @@ use tokio::sync::mpsc::Sender; use tokio::time::sleep; use super::data_scanner::{SizeSummary, DATA_SCANNER_FORCE_COMPACT_AT_FOLDERS}; -use super::data_usage::DATA_USAGE_ROOT; +use super::data_usage::{BucketTargetUsageInfo, BucketUsageInfo, DataUsageInfo, DATA_USAGE_ROOT}; // DATA_USAGE_BUCKET_LEN must be length of ObjectsHistogramIntervals pub const DATA_USAGE_BUCKET_LEN: usize = 11; @@ -152,6 +153,22 @@ impl SizeHistogram { } } } + + pub fn to_map(&self) -> HashMap { + let mut res = HashMap::new(); + let mut spl_count = 0; + for (count, oh) in self.0.iter().zip(OBJECTS_HISTOGRAM_INTERVALS.iter()) { + if ByteSize::kib(1).as_u64() == oh.start && oh.end == ByteSize::mib(1).as_u64() - 1 { + res.insert(oh.name.to_string(), spl_count); + } else if ByteSize::kib(1).as_u64() <= oh.start && oh.end <= ByteSize::mib(1).as_u64() - 1 { + spl_count += count; + res.insert(oh.name.to_string(), *count); + } else { + res.insert(oh.name.to_string(), *count); + } + } + res + } } // versionsHistogram is a histogram of number of versions in an object. @@ -173,6 +190,14 @@ impl VersionsHistogram { } } } + + pub fn to_map(&self) -> HashMap { + let mut res = HashMap::new(); + for (count, ov) in self.0.iter().zip(OBJECTS_VERSION_COUNT_INTERVALS.iter()) { + res.insert(ov.name.to_string(), *count); + } + res + } } #[derive(Debug, Default, Clone, Serialize, Deserialize)] @@ -319,11 +344,11 @@ pub struct DataUsageEntryInfo { pub entry: DataUsageEntry, } -#[derive(Clone, Serialize, Deserialize)] +#[derive(Clone, Default, Serialize, Deserialize)] pub struct DataUsageCacheInfo { pub name: String, pub next_cycle: u32, - pub last_update: SystemTime, + pub last_update: Option, pub skip_healing: bool, // todo: life_cycle // pub life_cycle: @@ -333,18 +358,18 @@ pub struct DataUsageCacheInfo { pub replication: Option, } -impl Default for DataUsageCacheInfo { - fn default() -> Self { - Self { - name: Default::default(), - next_cycle: Default::default(), - last_update: SystemTime::now(), - skip_healing: Default::default(), - updates: Default::default(), - replication: Default::default(), - } - } -} +// impl Default for DataUsageCacheInfo { +// fn default() -> Self { +// Self { +// name: Default::default(), +// next_cycle: Default::default(), +// last_update: SystemTime::now(), +// skip_healing: Default::default(), +// updates: Default::default(), +// replication: Default::default(), +// } +// } +// } #[derive(Clone, Default, Serialize, Deserialize)] pub struct DataUsageCache { @@ -410,8 +435,11 @@ impl DataUsageCache { }, } retries += 1; - let mut rng = rand::thread_rng(); - sleep(Duration::from_millis(rng.gen_range(0..1_000))).await; + let dur = { + let mut rng = rand::thread_rng(); + rng.gen_range(0..1_000) + }; + sleep(Duration::from_millis(dur)).await; } Ok(d) } @@ -655,6 +683,100 @@ impl DataUsageCache { n } + pub fn merge(&mut self, o: &DataUsageCache) { + let mut existing_root = self.root(); + let other_root = o.root(); + if existing_root.is_none() && other_root.is_none() { + return; + } + if other_root.is_none() { + return; + } + if existing_root.is_none() { + *self = o.clone(); + return; + } + if o.info.last_update.gt(&self.info.last_update) { + self.info.last_update = o.info.last_update; + } + + existing_root.as_mut().unwrap().merge(other_root.as_ref().unwrap()); + self.cache.insert(hash_path(&self.info.name).key(), existing_root.unwrap()); + let e_hash = self.root_hash(); + for key in other_root.as_ref().unwrap().children.iter() { + let entry = &o.cache[key]; + let flat = o.flatten(entry); + let mut existing = self.cache[key].clone(); + existing.merge(&flat); + self.replace_hashed(&DataUsageHash(key.clone()), &Some(e_hash.clone()), &existing); + } + } + + pub fn root_hash(&self) -> DataUsageHash { + hash_path(&self.info.name) + } + + pub fn root(&self) -> Option { + self.find(&self.info.name) + } + + pub fn dui(&self, path: &str, buckets: &[BucketInfo]) -> DataUsageInfo { + let e = match self.find(path) { + Some(e) => e, + None => return DataUsageInfo::default(), + }; + let flat = self.flatten(&e); + let dui = DataUsageInfo { + last_update: self.info.last_update, + objects_total_count: flat.objects as u64, + versions_total_count: flat.versions as u64, + delete_markers_total_count: flat.delete_markers as u64, + objects_total_size: flat.size as u64, + buckets_count: e.children.len() as u64, + buckets_usage: self.buckets_usage_info(buckets), + ..Default::default() + }; + dui + } + + pub fn buckets_usage_info(&self, buckets: &[BucketInfo]) -> HashMap { + let mut dst = HashMap::new(); + for bucket in buckets.iter() { + let e = match self.find(&bucket.name) { + Some(e) => e, + None => continue, + }; + let flat = self.flatten(&e); + let mut bui = BucketUsageInfo { + size: flat.size as u64, + versions_count: flat.versions as u64, + objects_count: flat.objects as u64, + delete_markers_count: flat.delete_markers as u64, + object_size_histogram: flat.obj_sizes.to_map(), + object_versions_histogram: flat.obj_versions.to_map(), + ..Default::default() + }; + if let Some(rs) = &flat.replication_stats { + bui.replica_size = rs.replica_size; + bui.replica_count = rs.replica_count; + + for (arn, stat) in rs.targets.iter() { + bui.replication_info.insert(arn.clone(), BucketTargetUsageInfo { + replication_pending_size: stat.pending_size, + replicated_size: stat.replicated_size, + replication_failed_size: stat.failed_size, + replication_pending_count: stat.pending_count, + replication_failed_count: stat.failed_count, + replicated_count: stat.replicated_count, + ..Default::default() + }); + } + } + dst.insert(bucket.name.clone(), bui); + } + dst + } + pub fn marshal_msg(&self) -> Result> { let mut buf = Vec::new(); diff --git a/ecstore/src/heal/error.rs b/ecstore/src/heal/error.rs index 1b6cd674..ff6c90f7 100644 --- a/ecstore/src/heal/error.rs +++ b/ecstore/src/heal/error.rs @@ -1 +1,2 @@ pub const ERR_IGNORE_FILE_CONTRIB: &str = "ignore this file's contribution toward data-usage"; +pub const ERR_SKIP_FILE: &str = "skip this file"; diff --git a/ecstore/src/store.rs b/ecstore/src/store.rs index 27e663b9..3d0ddcac 100644 --- a/ecstore/src/store.rs +++ b/ecstore/src/store.rs @@ -10,7 +10,7 @@ use crate::global::{ is_dist_erasure, is_erasure_sd, set_global_deployment_id, set_object_layer, DISK_ASSUME_UNKNOWN_SIZE, DISK_FILL_FRACTION, DISK_MIN_INODES, DISK_RESERVE_FRACTION, GLOBAL_LOCAL_DISK_MAP, GLOBAL_LOCAL_DISK_SET_DRIVES, }; -use crate::heal::data_usage::DataUsageInfo; +use crate::heal::data_usage::{DataUsageInfo, DATA_USAGE_ROOT}; use crate::heal::heal_commands::{HealOpts, HealResultItem, HealScanMode, HEAL_ITEM_METADATA}; use crate::heal::heal_ops::HealObjectFn; use crate::new_object_layer_fn; @@ -44,19 +44,21 @@ use http::HeaderMap; use lazy_static::lazy_static; use rand::Rng; use s3s::dto::{BucketVersioningStatus, ObjectLockConfiguration, ObjectLockEnabled, VersioningConfiguration}; +use tokio::time::interval; use std::cmp::Ordering; use std::slice::Iter; +use std::time::SystemTime; use std::{ collections::{HashMap, HashSet}, sync::Arc, time::Duration, }; use time::OffsetDateTime; -use tokio::fs; +use tokio::{fs, select}; use tokio::sync::mpsc::Sender; -use tokio::sync::{mpsc, RwLock, Semaphore}; +use tokio::sync::{broadcast, mpsc, RwLock, Semaphore}; -use crate::heal::data_usage_cache::DataUsageCache; +use crate::heal::data_usage_cache::{DataUsageCache, DataUsageCacheInfo}; use tracing::{debug, info, warn}; use uuid::Uuid; @@ -509,11 +511,16 @@ impl ECStore { total_results += pool.disk_set.len(); }); let mut results = Arc::new(RwLock::new(vec![DataUsageCache::default(); total_results])); + let (cancel, _) = broadcast::channel(100); + let first_err = Arc::new(RwLock::new(None)); let mut futures = Vec::new(); for pool in self.pools.iter() { for set in pool.disk_set.iter() { let index = result_index; let results_clone = results.clone(); + let first_err_clone = first_err.clone(); + let cancel_clone = cancel.clone(); + let all_buckets_clone = all_buckets.clone(); futures.push(async move { let (tx, mut rx) = mpsc::channel(100); let task = tokio::spawn(async move { @@ -528,12 +535,61 @@ impl ECStore { } } }); - + if let Err(err) = set.ns_scanner(&all_buckets_clone, want_cycle.try_into().unwrap(), tx, heal_scan_mode).await { + let mut f_w = first_err_clone.write().await; + if f_w.is_none() { + *f_w = Some(err); + } + let _ = cancel_clone.send(true); + return ; + } let _ = task.await; }); result_index += 1; } } + let (update_closer_tx, mut update_close_rx) = mpsc::channel(10); + let mut ctx_clone = cancel.subscribe(); + let all_buckets_clone = all_buckets.clone(); + let task = tokio::spawn(async move { + let mut last_update = None; + let mut interval = interval(Duration::from_secs(30)); + let all_merged = Arc::new(RwLock::new(DataUsageCache::default())); + loop { + select! { + _ = ctx_clone.recv() => { + return; + } + _ = update_close_rx.recv() => { + last_update = match tokio::spawn(update_scan(all_merged.clone(), results.clone(), last_update.clone(), all_buckets_clone.clone(), updates.clone())).await { + Ok(v) => v, + Err(_) => return, + }; + return; + } + _ = interval.tick() => { + last_update = match tokio::spawn(update_scan(all_merged.clone(), results.clone(), last_update.clone(), all_buckets_clone.clone(), updates.clone())).await { + Ok(v) => v, + Err(_) => return, + }; + } + } + } + }); + let _ = join_all(futures).await; + let mut ctx_closer = cancel.subscribe(); + select! { + _ = update_closer_tx.send(true) => { + + } + _ = ctx_closer.recv() => { + + } + } + let _ = task.await; + if let Some(err) = first_err.read().await.as_ref() { + return Err(err.clone()); + } Ok(()) } @@ -632,6 +688,28 @@ impl ECStore { } } +async fn update_scan(all_merged: Arc>, results: Arc>>, last_update: Option, all_buckets: Vec, updates: Sender) -> Option { + let mut w = all_merged.write().await; + *w = DataUsageCache { + info: DataUsageCacheInfo { + name: DATA_USAGE_ROOT.to_string(), + ..Default::default() + }, + ..Default::default() + }; + for info in results.read().await.iter() { + if info.info.last_update.is_none() { + return last_update; + } + w.merge(info); + } + if w.info.last_update > last_update && w.root().is_none() { + let _ = updates.send(w.dui(&w.info.name, &all_buckets)).await; + return w.info.last_update; + } + last_update +} + pub async fn find_local_disk(disk_path: &String) -> Option { let disk_path = match fs::canonicalize(disk_path).await { Ok(disk_path) => disk_path,