diff --git a/common/protos/src/generated/proto_gen/node_service.rs b/common/protos/src/generated/proto_gen/node_service.rs index c2f95718..9d13aa58 100644 --- a/common/protos/src/generated/proto_gen/node_service.rs +++ b/common/protos/src/generated/proto_gen/node_service.rs @@ -128,6 +128,48 @@ pub struct DeleteResponse { pub error_info: ::core::option::Option<::prost::alloc::string::String>, } #[derive(Clone, PartialEq, ::prost::Message)] +pub struct VerifyFileRequest { + /// indicate which one in the disks + #[prost(string, tag = "1")] + pub disk: ::prost::alloc::string::String, + #[prost(string, tag = "2")] + pub volume: ::prost::alloc::string::String, + #[prost(string, tag = "3")] + pub path: ::prost::alloc::string::String, + #[prost(string, tag = "4")] + pub file_info: ::prost::alloc::string::String, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct VerifyFileResponse { + #[prost(bool, tag = "1")] + pub success: bool, + #[prost(string, tag = "2")] + pub check_parts_resp: ::prost::alloc::string::String, + #[prost(string, optional, tag = "3")] + pub error_info: ::core::option::Option<::prost::alloc::string::String>, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct CheckPartsRequest { + /// indicate which one in the disks + #[prost(string, tag = "1")] + pub disk: ::prost::alloc::string::String, + #[prost(string, tag = "2")] + pub volume: ::prost::alloc::string::String, + #[prost(string, tag = "3")] + pub path: ::prost::alloc::string::String, + #[prost(string, tag = "4")] + pub file_info: ::prost::alloc::string::String, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct CheckPartsResponse { + #[prost(bool, tag = "1")] + pub success: bool, + #[prost(string, tag = "2")] + pub check_parts_resp: ::prost::alloc::string::String, + #[prost(string, optional, tag = "3")] + pub error_info: ::core::option::Option<::prost::alloc::string::String>, +} +#[derive(Clone, PartialEq, ::prost::Message)] pub struct RenamePartRequst { #[prost(string, 
tag = "1")] pub disk: ::prost::alloc::string::String, @@ -814,6 +856,54 @@ pub mod node_service_client { .insert(GrpcMethod::new("node_service.NodeService", "Delete")); self.inner.unary(req, path, codec).await } + pub async fn verify_file( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/node_service.NodeService/VerifyFile", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert(GrpcMethod::new("node_service.NodeService", "VerifyFile")); + self.inner.unary(req, path, codec).await + } + pub async fn check_parts( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/node_service.NodeService/CheckParts", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert(GrpcMethod::new("node_service.NodeService", "CheckParts")); + self.inner.unary(req, path, codec).await + } pub async fn rename_part( &mut self, request: impl tonic::IntoRequest, @@ -1544,6 +1634,20 @@ pub mod node_service_server { &self, request: tonic::Request, ) -> std::result::Result, tonic::Status>; + async fn verify_file( + &self, + request: tonic::Request, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; + async fn check_parts( + &self, + request: tonic::Request, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; async fn rename_part( &self, request: tonic::Request, @@ -2173,6 +2277,96 @@ pub mod node_service_server 
{ }; Box::pin(fut) } + "/node_service.NodeService/VerifyFile" => { + #[allow(non_camel_case_types)] + struct VerifyFileSvc(pub Arc); + impl< + T: NodeService, + > tonic::server::UnaryService + for VerifyFileSvc { + type Response = super::VerifyFileResponse; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::verify_file(&inner, request).await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let method = VerifyFileSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + "/node_service.NodeService/CheckParts" => { + #[allow(non_camel_case_types)] + struct CheckPartsSvc(pub Arc); + impl< + T: NodeService, + > tonic::server::UnaryService + for CheckPartsSvc { + type Response = super::CheckPartsResponse; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::check_parts(&inner, request).await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let 
max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let method = CheckPartsSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } "/node_service.NodeService/RenamePart" => { #[allow(non_camel_case_types)] struct RenamePartSvc(pub Arc); diff --git a/common/protos/src/node.proto b/common/protos/src/node.proto index 51ac6cbc..f789a302 100644 --- a/common/protos/src/node.proto +++ b/common/protos/src/node.proto @@ -88,6 +88,32 @@ message DeleteResponse { optional string error_info = 2; } +message VerifyFileRequest { + string disk = 1; // indicate which one in the disks + string volume = 2; + string path = 3; + string file_info = 4; +} + +message VerifyFileResponse { + bool success = 1; + string check_parts_resp = 2; + optional string error_info = 3; +} + +message CheckPartsRequest { + string disk = 1; // indicate which one in the disks + string volume = 2; + string path = 3; + string file_info = 4; +} + +message CheckPartsResponse { + bool success = 1; + string check_parts_resp = 2; + optional string error_info = 3; +} + message RenamePartRequst { string disk = 1; string src_volume = 2; @@ -381,6 +407,8 @@ service NodeService { rpc ReadAll(ReadAllRequest) returns (ReadAllResponse) {}; rpc WriteAll(WriteAllRequest) returns (WriteAllResponse) {}; rpc Delete(DeleteRequest) returns (DeleteResponse) {}; + rpc VerifyFile(VerifyFileRequest) returns (VerifyFileResponse) {}; + rpc CheckParts(CheckPartsRequest) returns (CheckPartsResponse) {}; rpc RenamePart(RenamePartRequst) returns (RenamePartResponse) {}; rpc RenameFile(RenameFileRequst) returns (RenameFileResponse) {}; rpc 
Write(WriteRequest) returns (WriteResponse) {}; diff --git a/ecstore/src/bitrot.rs b/ecstore/src/bitrot.rs index 57744c6d..7d24aefa 100644 --- a/ecstore/src/bitrot.rs +++ b/ecstore/src/bitrot.rs @@ -1,4 +1,8 @@ -use std::{any::Any, collections::HashMap, io::Cursor}; +use std::{ + any::Any, + collections::HashMap, + io::{Cursor, Read}, +}; use blake2::Blake2b512; use highway::{HighwayHash, HighwayHasher, Key}; @@ -205,21 +209,49 @@ pub fn bitrot_shard_file_size(size: usize, shard_size: usize, algo: BitrotAlgori } pub fn bitrot_verify( - r: Cursor>, - _want_size: usize, - _part_size: usize, + r: &mut Cursor>, + want_size: usize, + part_size: usize, algo: BitrotAlgorithm, want: Vec, - _shard_size: usize, + mut shard_size: usize, ) -> Result<()> { if algo != BitrotAlgorithm::HighwayHash256S { let mut h = algo.new(); - h.update(r.into_inner()); + h.update(r.get_ref()); if h.finalize() != want { return Err(Error::new(DiskError::FileCorrupt)); } + + return Ok(()); } - todo!() + let mut h = algo.new(); + let mut hash_buf = vec![0; h.size()]; + let mut left = want_size; + + if left != bitrot_shard_file_size(part_size, shard_size, algo) { + return Err(Error::new(DiskError::FileCorrupt)); + } + + while left > 0 { + h.reset(); + let n = r.read(&mut hash_buf)?; + left -= n; + + if left < shard_size { + shard_size = left; + } + + let mut buf = vec![0; shard_size]; + let read = r.read(&mut buf)?; + h.update(buf); + left -= read; + if h.clone().finalize() != hash_buf[0..n] { + return Err(Error::new(DiskError::FileCorrupt)); + } + } + + Ok(()) } pub struct WholeBitrotWriter { @@ -295,7 +327,7 @@ impl ReadAt for WholeBitrotReader { return Err(Error::new(DiskError::LessData)); } - return Ok((buf.split_off(length), length)); + return Ok((buf.drain(0..length).collect::>(), length)); } Err(Error::new(DiskError::LessData)) @@ -453,10 +485,7 @@ mod test { use tempfile::TempDir; use crate::{ - bitrot::{new_bitrot_writer, BITROT_ALGORITHMS}, - disk::{endpoint::Endpoint, error::DiskError, 
new_disk, DiskOption}, - error::{Error, Result}, - store_api::BitrotAlgorithm, + bitrot::{new_bitrot_writer, BITROT_ALGORITHMS}, disk::{endpoint::Endpoint, error::DiskError, new_disk, DiskOption}, error::{Error, Result}, store_api::BitrotAlgorithm }; use super::{bitrot_writer_sum, new_bitrot_reader}; @@ -512,9 +541,6 @@ mod test { #[tokio::test] async fn test_all_bitrot_algorithms() -> Result<()> { for algo in BITROT_ALGORITHMS.keys() { - if *algo != BitrotAlgorithm::HighwayHash256S { - continue; - } test_bitrot_reader_writer_algo(algo.clone()).await?; } diff --git a/ecstore/src/disk/error.rs b/ecstore/src/disk/error.rs index da8c7063..8fbfbf47 100644 --- a/ecstore/src/disk/error.rs +++ b/ecstore/src/disk/error.rs @@ -100,6 +100,12 @@ pub enum DiskError { #[error("more data was sent than what was advertised")] MoreData, + + #[error("outdated XL meta")] + OutdatedXLMeta, + + #[error("part missing or corrupt")] + PartMissingOrCorrupt, } impl DiskError { @@ -202,6 +208,8 @@ pub fn clone_disk_err(e: &DiskError) -> Error { DiskError::CrossDeviceLink => Error::new(DiskError::CrossDeviceLink), DiskError::LessData => Error::new(DiskError::LessData), DiskError::MoreData => Error::new(DiskError::MoreData), + DiskError::OutdatedXLMeta => Error::new(DiskError::OutdatedXLMeta), + DiskError::PartMissingOrCorrupt => Error::new(DiskError::PartMissingOrCorrupt), } } diff --git a/ecstore/src/disk/local.rs b/ecstore/src/disk/local.rs index 31bb69a9..1450d0b4 100644 --- a/ecstore/src/disk/local.rs +++ b/ecstore/src/disk/local.rs @@ -6,6 +6,7 @@ use super::{ FileReader, FileWriter, Info, MetaCacheEntry, ReadMultipleReq, ReadMultipleResp, ReadOptions, RenameDataResp, UpdateMetadataOpts, VolumeInfo, WalkDirOptions, }; +use crate::bitrot::bitrot_verify; use crate::cache_value::cache::{Cache, Opts}; use crate::disk::error::{ convert_access_error, is_sys_err_handle_invalid, is_sys_err_invalid_arg, is_sys_err_is_dir, is_sys_err_not_dir, @@ -15,8 +16,9 @@ use 
crate::disk::os::check_path_length; use crate::disk::{LocalFileReader, LocalFileWriter, STORAGE_FORMAT_FILE}; use crate::error::{Error, Result}; use crate::global::{GLOBAL_IsErasureSD, GLOBAL_RootDiskThreshold}; +use crate::set_disk::{conv_part_err_to_int, CHECK_PART_FILE_CORRUPT, CHECK_PART_FILE_NOT_FOUND, CHECK_PART_SUCCESS, CHECK_PART_UNKNOWN, CHECK_PART_VOLUME_NOT_FOUND}; use crate::store_api::BitrotAlgorithm; -use crate::utils::fs::{lstat, O_APPEND, O_CREATE, O_RDONLY, O_WRONLY}; +use crate::utils::fs::{access, lstat, O_APPEND, O_CREATE, O_RDONLY, O_WRONLY}; use crate::utils::os::get_info; use crate::utils::path::{clean, has_suffix, SLASH_SEPARATOR}; use crate::{ @@ -26,6 +28,8 @@ use crate::{ }; use path_absolutize::Absolutize; use std::fmt::Debug; +use std::io::Cursor; +use std::os::unix::fs::MetadataExt; use std::sync::atomic::{AtomicU32, Ordering}; use std::sync::Arc; use std::time::Duration; @@ -38,7 +42,7 @@ use tokio::fs::{self, File}; use tokio::io::{AsyncReadExt, AsyncWriteExt, ErrorKind}; use tokio::runtime::Runtime; use tokio::sync::RwLock; -use tracing::{error, warn}; +use tracing::{error, info, warn}; use uuid::Uuid; #[derive(Debug)] @@ -180,7 +184,7 @@ impl LocalDisk { // format_data: Mutex::new(format_data), // format_last_check: Mutex::new(format_last_check), }; - let (info, root) = get_disk_info(root_clone).await?; + let (info, _root) = get_disk_info(root_clone).await?; disk.major = info.major; disk.minor = info.minor; disk.fstype = info.fstype; @@ -653,17 +657,18 @@ impl LocalDisk { async fn bitrot_verify( &self, part_path: &PathBuf, - part_size: u64, + part_size: usize, algo: BitrotAlgorithm, sum: &[u8], - shard_size: u64, + shard_size: usize, ) -> Result<()> { - let file = utils::fs::open_file(part_path, O_CREATE | O_WRONLY) + let mut file = utils::fs::open_file(part_path, O_RDONLY) .await .map_err(os_err_to_file_err)?; - - todo!() + let mut data = Vec::new(); + let n = file.read_to_end(&mut data).await?; + bitrot_verify(&mut
Cursor::new(data), n, part_size, algo, sum.to_vec(), shard_size) } } @@ -861,7 +866,7 @@ impl DiskAPI for LocalDisk { Ok(()) } - async fn verify_file(&self, volume: &str, path: &str, fi: FileInfo) -> Result { + async fn verify_file(&self, volume: &str, path: &str, fi: &FileInfo) -> Result { let volume_dir = self.get_bucket_path(volume)?; if !skip_access_checks(volume) { if let Err(e) = utils::fs::access(&volume_dir).await { @@ -873,17 +878,92 @@ results: Vec::with_capacity(fi.parts.len()), }; - let erasure = fi.erasure; - fi.parts.iter().enumerate().for_each(|(i, part)| { + let erasure = &fi.erasure; + for (i, part) in fi.parts.iter().enumerate() { let checksum_info = erasure.get_checksum_info(part.number); let part_path = Path::new(&volume_dir) .join(path) .join(fi.data_dir.map_or("".to_string(), |dir| dir.to_string())) .join(format!("part.{}", part.number)); + let err = match self + .bitrot_verify( + &part_path, + erasure.shard_file_size(part.size), + checksum_info.algorithm, + &checksum_info.hash, + erasure.shard_size(erasure.block_size), + ) + .await + { + Ok(_) => None, + Err(err) => Some(err), + }; + resp.results.push(conv_part_err_to_int(&err)); + if resp.results[i] == CHECK_PART_UNKNOWN { + if let Some(err) = err { + match err.downcast_ref::() { + Some(DiskError::FileAccessDenied) => {} + _ => { + info!("part unknown, disk: {}, path: {:?}", self.to_string(), part_path); + } + } + } + } + } - // self.bi - }); - todo!() + Ok(resp) + } + + async fn check_parts(&self, volume: &str, path: &str, fi: &FileInfo) -> Result { + let volume_dir = self.get_bucket_path(&volume)?; + check_path_length(volume_dir.join(path).to_string_lossy().as_ref())?; + let mut resp = CheckPartsResp { + results: vec![0; fi.parts.len()], + }; + + for (i, part) in fi.parts.iter().enumerate() { + let file_path = Path::new(&volume_dir) + .join(path) + .join(fi.data_dir.map_or("".to_string(), |dir| dir.to_string())) + .join(format!("part.{}", part.number)); + 
match lstat(file_path).await { + Ok(st) => { + if st.is_dir() { + resp.results[i] = CHECK_PART_FILE_NOT_FOUND; + continue; + } + if (st.size() as usize) < fi.erasure.shard_file_size(part.size) { + resp.results[i] = CHECK_PART_FILE_CORRUPT; + continue; + } + + resp.results[i] = CHECK_PART_SUCCESS; + }, + Err(err) => { + match os_err_to_file_err(err).downcast_ref() { + Some(DiskError::FileNotFound) => { + if !skip_access_checks(volume) { + if let Err(err) = access(&volume_dir).await { + match err.kind() { + ErrorKind::NotFound => { + resp.results[i] = CHECK_PART_VOLUME_NOT_FOUND; + continue; + }, + _ => {}, + } + } + } + resp.results[i] = CHECK_PART_FILE_NOT_FOUND; + }, + _ => {}, + } + continue; + } + } + } + + Ok(resp) } async fn rename_part(&self, src_volume: &str, src_path: &str, dst_volume: &str, dst_path: &str, meta: Vec) -> Result<()> { diff --git a/ecstore/src/disk/mod.rs b/ecstore/src/disk/mod.rs index 6408e875..7ce541ee 100644 --- a/ecstore/src/disk/mod.rs +++ b/ecstore/src/disk/mod.rs @@ -124,10 +124,11 @@ pub trait DiskAPI: Debug + Send + Sync + 'static { // ReadFileStream async fn rename_file(&self, src_volume: &str, src_path: &str, dst_volume: &str, dst_path: &str) -> Result<()>; async fn rename_part(&self, src_volume: &str, src_path: &str, dst_volume: &str, dst_path: &str, meta: Vec) -> Result<()>; - // CheckParts async fn delete(&self, volume: &str, path: &str, opt: DeleteOptions) -> Result<()>; // VerifyFile - async fn verify_file(&self, volume: &str, path: &str, fi: FileInfo) -> Result; + async fn verify_file(&self, volume: &str, path: &str, fi: &FileInfo) -> Result; + // CheckParts + async fn check_parts(&self, volume: &str, path: &str, fi: &FileInfo) -> Result; // StatInfoFile // ReadParts async fn read_multiple(&self, req: ReadMultipleReq) -> Result>; @@ -137,6 +138,7 @@ pub trait DiskAPI: Debug + Send + Sync + 'static { async fn disk_info(&self, opts: &DiskInfoOptions) -> Result; } +#[derive(Debug, Default, Serialize, Deserialize)] pub struct 
CheckPartsResp { pub results: Vec, } diff --git a/ecstore/src/disk/remote.rs b/ecstore/src/disk/remote.rs index 8d120a06..496aa575 100644 --- a/ecstore/src/disk/remote.rs +++ b/ecstore/src/disk/remote.rs @@ -4,10 +4,7 @@ use futures::lock::Mutex; use protos::{ node_service_time_out_client, proto_gen::node_service::{ - DeletePathsRequest, DeleteRequest, DeleteVersionRequest, DeleteVersionsRequest, DeleteVolumeRequest, DiskInfoRequest, - ListDirRequest, ListVolumesRequest, MakeVolumeRequest, MakeVolumesRequest, ReadAllRequest, ReadMultipleRequest, - ReadVersionRequest, ReadXlRequest, RenameDataRequest, RenameFileRequst, StatVolumeRequest, UpdateMetadataRequest, - WalkDirRequest, WriteAllRequest, WriteMetadataRequest, + CheckPartsRequest, DeletePathsRequest, DeleteRequest, DeleteVersionRequest, DeleteVersionsRequest, DeleteVolumeRequest, DiskInfoRequest, ListDirRequest, ListVolumesRequest, MakeVolumeRequest, MakeVolumesRequest, ReadAllRequest, ReadMultipleRequest, ReadVersionRequest, ReadXlRequest, RenameDataRequest, RenameFileRequst, StatVolumeRequest, UpdateMetadataRequest, VerifyFileRequest, WalkDirRequest, WriteAllRequest, WriteMetadataRequest }, }; use tonic::Request; @@ -162,8 +159,52 @@ impl DiskAPI for RemoteDisk { Ok(()) } - async fn verify_file(&self, volume: &str, path: &str, fi: FileInfo) -> Result { - unimplemented!() + async fn verify_file(&self, volume: &str, path: &str, fi: &FileInfo) -> Result { + info!("verify_file"); + let file_info = serde_json::to_string(&fi)?; + let mut client = node_service_time_out_client(&self.addr) + .await + .map_err(|err| Error::from_string(format!("can not get client, err: {}", err)))?; + let request = Request::new(VerifyFileRequest { + disk: self.root.to_string_lossy().to_string(), + volume: volume.to_string(), + path: path.to_string(), + file_info, + }); + + let response = client.verify_file(request).await?.into_inner(); + + if !response.success { + return 
Err(Error::from_string(response.error_info.unwrap_or("".to_string()))); + } + + let check_parts_resp = serde_json::from_str::(&response.check_parts_resp)?; + + Ok(check_parts_resp) + } + + async fn check_parts(&self, volume: &str, path: &str, fi: &FileInfo) -> Result { + info!("check_parts"); + let file_info = serde_json::to_string(&fi)?; + let mut client = node_service_time_out_client(&self.addr) + .await + .map_err(|err| Error::from_string(format!("can not get client, err: {}", err)))?; + let request = Request::new(CheckPartsRequest { + disk: self.root.to_string_lossy().to_string(), + volume: volume.to_string(), + path: path.to_string(), + file_info, + }); + + let response = client.check_parts(request).await?.into_inner(); + + if !response.success { + return Err(Error::from_string(response.error_info.unwrap_or("".to_string()))); + } + + let check_parts_resp = serde_json::from_str::(&response.check_parts_resp)?; + + Ok(check_parts_resp) } async fn rename_part(&self, src_volume: &str, src_path: &str, dst_volume: &str, dst_path: &str, meta: Vec) -> Result<()> { diff --git a/ecstore/src/store_api.rs b/ecstore/src/store_api.rs index 44910a4d..ab53e802 100644 --- a/ecstore/src/store_api.rs +++ b/ecstore/src/store_api.rs @@ -1,11 +1,10 @@ use std::collections::HashMap; use crate::{ - error::{Error, Result}, - heal::{ + disk::error::DiskError, error::{Error, Result}, heal::{ heal_commands::{HealOpts, HealResultItem}, heal_ops::HealObjectFn, - }, + } }; use futures::StreamExt; use http::HeaderMap; @@ -17,6 +16,8 @@ use uuid::Uuid; pub const ERASURE_ALGORITHM: &str = "rs-vandermonde"; pub const BLOCK_SIZE_V2: usize = 1048576; // 1M +pub const RESERVED_METADATA_PREFIX: &str = "X-Rustfs-Internal-"; +pub const RESERVED_METADATA_PREFIX_LOWER: &str = "X-Rustfs-Internal-"; // #[derive(Debug, Clone)] #[derive(Serialize, Deserialize, Debug, PartialEq, Clone, Default)] @@ -172,6 +173,7 @@ impl FileInfo { parts: self.parts.clone(), is_latest: self.is_latest, tags: self.tags.clone(), 
+ ..Default::default() } } // to_part_offset 取offset 所在的part index, 返回part index, offset @@ -250,6 +252,29 @@ impl ErasureInfo { ChecksumInfo {algorithm: DEFAULT_BITROT_ALGO, ..Default::default()} } + + // 算出每个分片大小 + pub fn shard_size(&self, data_size: usize) -> usize { + (data_size + self.data_blocks - 1) / self.data_blocks + } + + // returns final erasure size from original size. + pub fn shard_file_size(&self, total_size: usize) -> usize { + if total_size == 0 { + return 0; + } + + let num_shards = total_size / self.block_size; + let last_block_size = total_size % self.block_size; + let last_shard_size = (last_block_size + self.data_blocks - 1) / self.data_blocks; + num_shards * self.shard_size(self.block_size) + last_shard_size + + // // 因为写入的时候ec需要补全,所以最后一个长度应该也是一样的 + // if last_block_size != 0 { + // num_shards += 1 + // } + // num_shards * self.shard_size(self.block_size) + } } #[derive(Serialize, Deserialize, Debug, PartialEq, Default, Clone)] @@ -481,7 +506,10 @@ pub struct ObjectInfo { pub name: String, pub mod_time: Option, pub size: usize, + // Actual size is the real size of the object uploaded by client. 
+ pub actual_size: Option, pub is_dir: bool, + pub user_defined: HashMap, pub parity_blocks: usize, pub data_blocks: usize, pub version_id: Option, @@ -491,6 +519,40 @@ pub struct ObjectInfo { pub tags: Option>, } +impl ObjectInfo { + pub fn is_compressed(&self) -> bool { + self.user_defined.contains_key(&format!("{}compression", RESERVED_METADATA_PREFIX)) + } + + pub fn get_actual_size(&self) -> Result { + if let Some(actual_size) = self.actual_size { + return Ok(actual_size); + } + + if self.is_compressed() { + if let Some(size_str) = self.user_defined.get(&format!("{}actual-size", RESERVED_METADATA_PREFIX)) { + if !size_str.is_empty() { + // Todo: deal with error + let size = size_str.parse::()?; + return Ok(size); + } + } + let mut actual_size = 0; + self.parts.iter().for_each(|part| { + actual_size += part.actual_size; + }); + if actual_size == 0 && actual_size != self.size { + return Err(Error::from_string("invalid decompressed size")); + } + return Ok(actual_size); + } + + // TODO: IsEncrypted + + Ok(self.size) + } +} + #[derive(Debug, Default)] pub struct ListObjectsInfo { // Indicates whether the returned list objects response is truncated. 
A diff --git a/rustfs/src/grpc.rs b/rustfs/src/grpc.rs index e809a885..e5caf61c 100644 --- a/rustfs/src/grpc.rs +++ b/rustfs/src/grpc.rs @@ -16,17 +16,7 @@ use lock::{lock_args::LockArgs, Locker, GLOBAL_LOCAL_SERVER}; use protos::{ models::{PingBody, PingBodyBuilder}, proto_gen::node_service::{ - node_service_server::NodeService as Node, DeleteBucketRequest, DeleteBucketResponse, DeletePathsRequest, - DeletePathsResponse, DeleteRequest, DeleteResponse, DeleteVersionRequest, DeleteVersionResponse, DeleteVersionsRequest, - DeleteVersionsResponse, DeleteVolumeRequest, DeleteVolumeResponse, DiskInfoRequest, DiskInfoResponse, - GenerallyLockRequest, GenerallyLockResponse, GetBucketInfoRequest, GetBucketInfoResponse, ListBucketRequest, - ListBucketResponse, ListDirRequest, ListDirResponse, ListVolumesRequest, ListVolumesResponse, MakeBucketRequest, - MakeBucketResponse, MakeVolumeRequest, MakeVolumeResponse, MakeVolumesRequest, MakeVolumesResponse, PingRequest, - PingResponse, ReadAllRequest, ReadAllResponse, ReadAtRequest, ReadAtResponse, ReadMultipleRequest, ReadMultipleResponse, - ReadVersionRequest, ReadVersionResponse, ReadXlRequest, ReadXlResponse, RenameDataRequest, RenameDataResponse, - RenameFileRequst, RenameFileResponse, RenamePartRequst, RenamePartResponse, StatVolumeRequest, StatVolumeResponse, - UpdateMetadataRequest, UpdateMetadataResponse, WalkDirRequest, WalkDirResponse, WriteAllRequest, WriteAllResponse, - WriteMetadataRequest, WriteMetadataResponse, WriteRequest, WriteResponse, + node_service_server::NodeService as Node, CheckPartsRequest, CheckPartsResponse, DeleteBucketRequest, DeleteBucketResponse, DeletePathsRequest, DeletePathsResponse, DeleteRequest, DeleteResponse, DeleteVersionRequest, DeleteVersionResponse, DeleteVersionsRequest, DeleteVersionsResponse, DeleteVolumeRequest, DeleteVolumeResponse, DiskInfoRequest, DiskInfoResponse, GenerallyLockRequest, GenerallyLockResponse, GetBucketInfoRequest, GetBucketInfoResponse, ListBucketRequest, 
ListBucketResponse, ListDirRequest, ListDirResponse, ListVolumesRequest, ListVolumesResponse, MakeBucketRequest, MakeBucketResponse, MakeVolumeRequest, MakeVolumeResponse, MakeVolumesRequest, MakeVolumesResponse, PingRequest, PingResponse, ReadAllRequest, ReadAllResponse, ReadAtRequest, ReadAtResponse, ReadMultipleRequest, ReadMultipleResponse, ReadVersionRequest, ReadVersionResponse, ReadXlRequest, ReadXlResponse, RenameDataRequest, RenameDataResponse, RenameFileRequst, RenameFileResponse, RenamePartRequst, RenamePartResponse, StatVolumeRequest, StatVolumeResponse, UpdateMetadataRequest, UpdateMetadataResponse, VerifyFileRequest, VerifyFileResponse, WalkDirRequest, WalkDirResponse, WriteAllRequest, WriteAllResponse, WriteMetadataRequest, WriteMetadataResponse, WriteRequest, WriteResponse }, }; use tokio::sync::mpsc; @@ -306,6 +296,98 @@ impl Node for NodeService { } } + async fn verify_file(&self, request: Request) -> Result, Status> { + let request = request.into_inner(); + if let Some(disk) = self.find_disk(&request.disk).await { + let file_info = match serde_json::from_str::(&request.file_info) { + Ok(file_info) => file_info, + Err(_) => { + return Ok(tonic::Response::new(VerifyFileResponse { + success: false, + check_parts_resp: "".to_string(), + error_info: Some("can not decode FileInfo".to_string()), + })); + } + }; + match disk.verify_file(&request.volume, &request.path, &file_info).await { + Ok(check_parts_resp) => { + let check_parts_resp = match serde_json::to_string(&check_parts_resp) { + Ok(check_parts_resp) => check_parts_resp, + Err(_) => { + return Ok(tonic::Response::new(VerifyFileResponse { + success: false, + check_parts_resp: String::new(), + error_info: Some("can not encode RenameDataResp".to_string()), + })); + } + }; + Ok(tonic::Response::new(VerifyFileResponse { + success: true, + check_parts_resp, + error_info: None, + })) + }, + Err(err) => Ok(tonic::Response::new(VerifyFileResponse { + success: false, + check_parts_resp: "".to_string(), + 
error_info: Some(err.to_string()), + })), + } + } else { + Ok(tonic::Response::new(VerifyFileResponse { + success: false, + check_parts_resp: "".to_string(), + error_info: Some("can not find disk".to_string()), + })) + } + } + + async fn check_parts(&self, request: Request) -> Result, Status> { + let request = request.into_inner(); + if let Some(disk) = self.find_disk(&request.disk).await { + let file_info = match serde_json::from_str::(&request.file_info) { + Ok(file_info) => file_info, + Err(_) => { + return Ok(tonic::Response::new(CheckPartsResponse { + success: false, + check_parts_resp: "".to_string(), + error_info: Some("can not decode FileInfo".to_string()), + })); + } + }; + match disk.check_parts(&request.volume, &request.path, &file_info).await { + Ok(check_parts_resp) => { + let check_parts_resp = match serde_json::to_string(&check_parts_resp) { + Ok(check_parts_resp) => check_parts_resp, + Err(_) => { + return Ok(tonic::Response::new(CheckPartsResponse { + success: false, + check_parts_resp: String::new(), + error_info: Some("can not encode CheckPartsResp".to_string()), + })); + } + }; + Ok(tonic::Response::new(CheckPartsResponse { + success: true, + check_parts_resp, + error_info: None, + })) + }, + Err(err) => Ok(tonic::Response::new(CheckPartsResponse { + success: false, + check_parts_resp: "".to_string(), + error_info: Some(err.to_string()), + })), + } + } else { + Ok(tonic::Response::new(CheckPartsResponse { + success: false, + check_parts_resp: "".to_string(), + error_info: Some("can not find disk".to_string()), + })) + } + } + async fn rename_part(&self, request: Request) -> Result, Status> { let request = request.into_inner(); if let Some(disk) = self.find_disk(&request.disk).await {