diff --git a/crates/ecstore/src/checksum.rs b/crates/ecstore/src/checksum.rs deleted file mode 100644 index dd8be1e6..00000000 --- a/crates/ecstore/src/checksum.rs +++ /dev/null @@ -1,350 +0,0 @@ -#![allow(clippy::map_entry)] -// Copyright 2024 RustFS Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -#![allow(unused_imports)] -#![allow(unused_variables)] -#![allow(unused_mut)] -#![allow(unused_assignments)] -#![allow(unused_must_use)] -#![allow(clippy::all)] - -use lazy_static::lazy_static; -use rustfs_checksums::ChecksumAlgorithm; -use std::collections::HashMap; - -use crate::client::{api_put_object::PutObjectOptions, api_s3_datatypes::ObjectPart}; -use crate::{disk::DiskAPI, store_api::GetObjectReader}; -use rustfs_utils::crypto::{base64_decode, base64_encode}; -use s3s::header::{ - X_AMZ_CHECKSUM_ALGORITHM, X_AMZ_CHECKSUM_CRC32, X_AMZ_CHECKSUM_CRC32C, X_AMZ_CHECKSUM_SHA1, X_AMZ_CHECKSUM_SHA256, -}; - -use enumset::{EnumSet, EnumSetType, enum_set}; - -#[derive(Debug, EnumSetType, Default)] -#[enumset(repr = "u8")] -pub enum ChecksumMode { - #[default] - ChecksumNone, - ChecksumSHA256, - ChecksumSHA1, - ChecksumCRC32, - ChecksumCRC32C, - ChecksumCRC64NVME, - ChecksumFullObject, -} - -lazy_static! { - static ref C_ChecksumMask: EnumSet = { - let mut s = EnumSet::all(); - s.remove(ChecksumMode::ChecksumFullObject); - s - }; - static ref C_ChecksumFullObjectCRC32: EnumSet = - enum_set!(ChecksumMode::ChecksumCRC32 | ChecksumMode::ChecksumFullObject); - static ref C_ChecksumFullObjectCRC32C: EnumSet = - enum_set!(ChecksumMode::ChecksumCRC32C | ChecksumMode::ChecksumFullObject); -} -const AMZ_CHECKSUM_CRC64NVME: &str = "x-amz-checksum-crc64nvme"; - -impl ChecksumMode { - //pub const CRC64_NVME_POLYNOMIAL: i64 = 0xad93d23594c93659; - - pub fn base(&self) -> ChecksumMode { - let s = EnumSet::from(*self).intersection(*C_ChecksumMask); - match s.as_u8() { - 1_u8 => ChecksumMode::ChecksumNone, - 2_u8 => ChecksumMode::ChecksumSHA256, - 4_u8 => ChecksumMode::ChecksumSHA1, - 8_u8 => ChecksumMode::ChecksumCRC32, - 16_u8 => ChecksumMode::ChecksumCRC32C, - 32_u8 => ChecksumMode::ChecksumCRC64NVME, - _ => panic!("enum err."), - } - } - - pub fn is(&self, t: ChecksumMode) -> bool { - *self & t == t - } - - pub fn key(&self) -> String { - //match c & checksumMask { - match self { - ChecksumMode::ChecksumCRC32 => { - return X_AMZ_CHECKSUM_CRC32.to_string(); - } - ChecksumMode::ChecksumCRC32C => { - return X_AMZ_CHECKSUM_CRC32C.to_string(); - } - ChecksumMode::ChecksumSHA1 => { - return X_AMZ_CHECKSUM_SHA1.to_string(); - } - ChecksumMode::ChecksumSHA256 => { - return X_AMZ_CHECKSUM_SHA256.to_string(); - } - ChecksumMode::ChecksumCRC64NVME => { - return AMZ_CHECKSUM_CRC64NVME.to_string(); - } - _ => { - return "".to_string(); - } - } - } - - pub fn can_composite(&self) -> bool { - let s = EnumSet::from(*self).intersection(*C_ChecksumMask); - match s.as_u8() { - 2_u8 => true, - 4_u8 => true, - 8_u8 => true, - 16_u8 => true, - _ => false, - } - } - - pub fn can_merge_crc(&self) -> bool { - let s = EnumSet::from(*self).intersection(*C_ChecksumMask); - match s.as_u8() { - 8_u8 => true, - 16_u8 => true, - 32_u8 => true, - _ => false, - } - } - - pub fn full_object_requested(&self) -> bool { - let s = EnumSet::from(*self).intersection(*C_ChecksumMask); - match s.as_u8() { - //C_ChecksumFullObjectCRC32 as u8 => true, - //C_ChecksumFullObjectCRC32C as u8 => true, - 32_u8 => true, - _ => false, - } - } - - pub fn key_capitalized(&self) -> String { - self.key() - } - - pub fn raw_byte_len(&self) -> usize { - let u = EnumSet::from(*self).intersection(*C_ChecksumMask).as_u8(); - if u == ChecksumMode::ChecksumCRC32 as u8 || u == ChecksumMode::ChecksumCRC32C as u8 { - 4 - } else if u == ChecksumMode::ChecksumSHA1 as u8 { - use sha1::Digest; - sha1::Sha1::output_size() as usize - } else if u == ChecksumMode::ChecksumSHA256 as u8 { - use sha2::Digest; - sha2::Sha256::output_size() as usize - } else if u == ChecksumMode::ChecksumCRC64NVME as u8 { - 8 - } else { - 0 - } - } - - pub fn hasher(&self) -> Result, std::io::Error> { - match /*C_ChecksumMask & **/self { - ChecksumMode::ChecksumCRC32 => { - return Ok(ChecksumAlgorithm::Crc32.into_impl()); - } - ChecksumMode::ChecksumCRC32C => { - return Ok(ChecksumAlgorithm::Crc32c.into_impl()); - } - ChecksumMode::ChecksumSHA1 => { - return Ok(ChecksumAlgorithm::Sha1.into_impl()); - } - ChecksumMode::ChecksumSHA256 => { - return Ok(ChecksumAlgorithm::Sha256.into_impl()); - } - ChecksumMode::ChecksumCRC64NVME => { - return Ok(ChecksumAlgorithm::Crc64Nvme.into_impl()); - } - _ => return Err(std::io::Error::other("unsupported checksum type")), - } - } - - pub fn is_set(&self) -> bool { - let s = EnumSet::from(*self).intersection(*C_ChecksumMask); - s.len() == 1 - } - - pub fn set_default(&mut self, t: ChecksumMode) { - if !self.is_set() { - *self = t; - } - } - - pub fn encode_to_string(&self, b: &[u8]) -> Result { - if !self.is_set() { - return Ok("".to_string()); - } - let mut h = self.hasher()?; - h.update(b); - let hash = h.finalize(); - Ok(base64_encode(hash.as_ref())) - } - - pub fn to_string(&self) -> String { - //match c & checksumMask { - match self { - ChecksumMode::ChecksumCRC32 => { - return "CRC32".to_string(); - } - ChecksumMode::ChecksumCRC32C => { - return "CRC32C".to_string(); - } - ChecksumMode::ChecksumSHA1 => { - return "SHA1".to_string(); - } - ChecksumMode::ChecksumSHA256 => { - return "SHA256".to_string(); - } - ChecksumMode::ChecksumNone => { - return "".to_string(); - } - ChecksumMode::ChecksumCRC64NVME => { - return "CRC64NVME".to_string(); - } - _ => { - return "".to_string(); - } - } - } - - // pub fn check_sum_reader(&self, r: GetObjectReader) -> Result { - // let mut h = self.hasher()?; - // Ok(Checksum::new(self.clone(), h.sum().as_bytes())) - // } - - // pub fn check_sum_bytes(&self, b: &[u8]) -> Result { - // let mut h = self.hasher()?; - // Ok(Checksum::new(self.clone(), h.sum().as_bytes())) - // } - - pub fn composite_checksum(&self, p: &mut [ObjectPart]) -> Result { - if !self.can_composite() { - return Err(std::io::Error::other("cannot do composite checksum")); - } - p.sort_by(|i, j| { - if i.part_num < j.part_num { - std::cmp::Ordering::Less - } else if i.part_num > j.part_num { - std::cmp::Ordering::Greater - } else { - std::cmp::Ordering::Equal - } - }); - let c = self.base(); - let crc_bytes = Vec::::with_capacity(p.len() * self.raw_byte_len() as usize); - let mut h = self.hasher()?; - h.update(crc_bytes.as_ref()); - let hash = h.finalize(); - Ok(Checksum { - checksum_type: self.clone(), - r: hash.as_ref().to_vec(), - computed: false, - }) - } - - pub fn full_object_checksum(&self, p: &mut [ObjectPart]) -> Result { - todo!(); - } -} - -#[derive(Default)] -pub struct Checksum { - checksum_type: ChecksumMode, - r: Vec, - computed: bool, -} - -#[allow(dead_code)] -impl Checksum { - fn new(t: ChecksumMode, b: &[u8]) -> Checksum { - if t.is_set() && b.len() == t.raw_byte_len() { - return Checksum { - checksum_type: t, - r: b.to_vec(), - computed: false, - }; - } - Checksum::default() - } - - #[allow(dead_code)] - fn new_checksum_string(t: ChecksumMode, s: &str) -> Result { - let b = match base64_decode(s.as_bytes()) { - Ok(b) => b, - Err(err) => return Err(std::io::Error::other(err.to_string())), - }; - if t.is_set() && b.len() == t.raw_byte_len() { - return Ok(Checksum { - checksum_type: t, - r: b, - computed: false, - }); - } - Ok(Checksum::default()) - } - - fn is_set(&self) -> bool { - self.checksum_type.is_set() && self.r.len() == self.checksum_type.raw_byte_len() - } - - fn encoded(&self) -> String { - if !self.is_set() { - return "".to_string(); - } - base64_encode(&self.r) - } - - #[allow(dead_code)] - fn raw(&self) -> Option> { - if !self.is_set() { - return None; - } - Some(self.r.clone()) - } -} - -pub fn add_auto_checksum_headers(opts: &mut PutObjectOptions) { - opts.user_metadata - .insert("X-Amz-Checksum-Algorithm".to_string(), opts.auto_checksum.to_string()); - if opts.auto_checksum.full_object_requested() { - opts.user_metadata - .insert("X-Amz-Checksum-Type".to_string(), "FULL_OBJECT".to_string()); - } -} - -pub fn apply_auto_checksum(opts: &mut PutObjectOptions, all_parts: &mut [ObjectPart]) -> Result<(), std::io::Error> { - if opts.auto_checksum.can_composite() && !opts.auto_checksum.is(ChecksumMode::ChecksumFullObject) { - let crc = opts.auto_checksum.composite_checksum(all_parts)?; - opts.user_metadata = { - let mut hm = HashMap::new(); - hm.insert(opts.auto_checksum.key(), crc.encoded()); - hm - } - } else if opts.auto_checksum.can_merge_crc() { - let crc = opts.auto_checksum.full_object_checksum(all_parts)?; - opts.user_metadata = { - let mut hm = HashMap::new(); - hm.insert(opts.auto_checksum.key_capitalized(), crc.encoded()); - hm.insert("X-Amz-Checksum-Type".to_string(), "FULL_OBJECT".to_string()); - hm - } - } - - Ok(()) -} diff --git a/crates/ecstore/src/chunk_stream.rs b/crates/ecstore/src/chunk_stream.rs deleted file mode 100644 index 41b3b2d9..00000000 --- a/crates/ecstore/src/chunk_stream.rs +++ /dev/null @@ -1,270 +0,0 @@ -// Copyright 2024 RustFS Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -// use crate::error::StdError; -// use bytes::Bytes; -// use futures::pin_mut; -// use futures::stream::{Stream, StreamExt}; -// use std::future::Future; -// use std::pin::Pin; -// use std::task::{Context, Poll}; -// use transform_stream::AsyncTryStream; - -// pub type SyncBoxFuture<'a, T> = Pin + Send + Sync + 'a>>; - -// pub struct ChunkedStream<'a> { -// /// inner -// inner: AsyncTryStream>>, - -// remaining_length: usize, -// } - -// impl<'a> ChunkedStream<'a> { -// pub fn new(body: S, content_length: usize, chunk_size: usize, need_padding: bool) -> Self -// where -// S: Stream> + Send + Sync + 'a, -// { -// let inner = AsyncTryStream::<_, _, SyncBoxFuture<'a, Result<(), StdError>>>::new(|mut y| { -// #[allow(clippy::shadow_same)] // necessary for `pin_mut!` -// Box::pin(async move { -// pin_mut!(body); -// // Data left over from the previous call -// let mut prev_bytes = Bytes::new(); -// let mut read_size = 0; - -// loop { -// let data: Vec = { -// // Read a fixed-size chunk -// match Self::read_data(body.as_mut(), prev_bytes, chunk_size).await { -// None => break, -// Some(Err(e)) => return Err(e), -// Some(Ok((data, remaining_bytes))) => { -// // debug!( -// // "content_length:{},read_size:{}, read_data data:{}, remaining_bytes: {} ", -// // content_length, -// // read_size, -// // data.len(), -// // remaining_bytes.len() -// // ); - -// prev_bytes = remaining_bytes; -// data -// } -// } -// }; - -// for bytes in data { -// read_size += bytes.len(); -// // debug!("read_size {}, content_length {}", read_size, content_length,); -// y.yield_ok(bytes).await; -// } - -// if read_size + prev_bytes.len() >= content_length { -// // debug!( -// // "Finished reading: read_size:{} + prev_bytes.len({}) == content_length {}", -// // read_size, -// // prev_bytes.len(), -// // content_length, -// // ); - -// // Pad with zeros? -// if !need_padding { -// y.yield_ok(prev_bytes).await; -// break; -// } - -// let mut bytes = vec![0u8; chunk_size]; -// let (left, _) = bytes.split_at_mut(prev_bytes.len()); -// left.copy_from_slice(&prev_bytes); - -// y.yield_ok(Bytes::from(bytes)).await; - -// break; -// } -// } - -// // debug!("chunked stream exit"); - -// Ok(()) -// }) -// }); -// Self { -// inner, -// remaining_length: content_length, -// } -// } -// /// read data and return remaining bytes -// async fn read_data( -// mut body: Pin<&mut S>, -// prev_bytes: Bytes, -// data_size: usize, -// ) -> Option, Bytes), StdError>> -// where -// S: Stream> + Send, -// { -// let mut bytes_buffer = Vec::new(); - -// // Run only once -// let mut push_data_bytes = |mut bytes: Bytes| { -// // debug!("read from body {} split per {}, prev_bytes: {}", bytes.len(), data_size, prev_bytes.len()); - -// if bytes.is_empty() { -// return None; -// } - -// if data_size == 0 { -// return Some(bytes); -// } - -// // Merge with the previous data -// if !prev_bytes.is_empty() { -// let need_size = data_size.wrapping_sub(prev_bytes.len()); -// // debug!( -// // "Previous leftover {}, take {} now, total: {}", -// // prev_bytes.len(), -// // need_size, -// // prev_bytes.len() + need_size -// // ); -// if bytes.len() >= need_size { -// let data = bytes.split_to(need_size); -// let mut combined = Vec::new(); -// combined.extend_from_slice(&prev_bytes); -// combined.extend_from_slice(&data); - -// // debug!( -// // "Fetched more bytes than needed: {}, merged result {}, remaining bytes {}", -// // need_size, -// // combined.len(), -// // bytes.len(), -// // ); - -// bytes_buffer.push(Bytes::from(combined)); -// } else { -// let mut combined = Vec::new(); -// combined.extend_from_slice(&prev_bytes); -// combined.extend_from_slice(&bytes); - -// // debug!( -// // "Fetched fewer bytes than needed: {}, merged result {}, remaining bytes {}, return immediately", -// // need_size, -// // combined.len(), -// // bytes.len(), -// // ); - -// return Some(Bytes::from(combined)); -// } -// } - -// // If the fetched data exceeds the chunk, slice the required size -// if data_size <= bytes.len() { -// let n = bytes.len() / data_size; - -// for _ in 0..n { -// let data = bytes.split_to(data_size); - -// // println!("bytes_buffer.push: {}, remaining: {}", data.len(), bytes.len()); -// bytes_buffer.push(data); -// } - -// Some(bytes) -// } else { -// // Insufficient data -// Some(bytes) -// } -// }; - -// // Remaining data -// let remaining_bytes = 'outer: { -// // // Exit if the previous data was sufficient -// // if let Some(remaining_bytes) = push_data_bytes(prev_bytes) { -// // println!("Consuming leftovers"); -// // break 'outer remaining_bytes; -// // } - -// loop { -// match body.next().await? { -// Err(e) => return Some(Err(e)), -// Ok(bytes) => { -// if let Some(remaining_bytes) = push_data_bytes(bytes) { -// break 'outer remaining_bytes; -// } -// } -// } -// } -// }; - -// Some(Ok((bytes_buffer, remaining_bytes))) -// } - -// fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll>> { -// let ans = Pin::new(&mut self.inner).poll_next(cx); -// if let Poll::Ready(Some(Ok(ref bytes))) = ans { -// self.remaining_length = self.remaining_length.saturating_sub(bytes.len()); -// } -// ans -// } - -// // pub fn exact_remaining_length(&self) -> usize { -// // self.remaining_length -// // } -// } - -// impl Stream for ChunkedStream<'_> { -// type Item = Result; - -// fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { -// self.poll(cx) -// } - -// fn size_hint(&self) -> (usize, Option) { -// (0, None) -// } -// } - -// #[cfg(test)] -// mod test { - -// use super::*; - -// #[tokio::test] -// async fn test_chunked_stream() { -// let chunk_size = 4; - -// let data1 = vec![1u8; 7777]; // 65536 -// let data2 = vec![1u8; 7777]; // 65536 - -// let content_length = data1.len() + data2.len(); - -// let chunk1 = Bytes::from(data1); -// let chunk2 = Bytes::from(data2); - -// let chunk_results: Vec> = vec![Ok(chunk1), Ok(chunk2)]; - -// let stream = futures::stream::iter(chunk_results); - -// let mut chunked_stream = ChunkedStream::new(stream, content_length, chunk_size, true); - -// loop { -// let ans1 = chunked_stream.next().await; -// if ans1.is_none() { -// break; -// } - -// let bytes = ans1.unwrap().unwrap(); -// assert!(bytes.len() == chunk_size) -// } - -// // assert_eq!(ans1.unwrap(), chunk1_data.as_slice()); -// } -// } diff --git a/crates/ecstore/src/client/hook_reader.rs b/crates/ecstore/src/client/hook_reader.rs deleted file mode 100644 index 38d2c3f8..00000000 --- a/crates/ecstore/src/client/hook_reader.rs +++ /dev/null @@ -1,59 +0,0 @@ -#![allow(clippy::map_entry)] -// Copyright 2024 RustFS Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::{collections::HashMap, sync::Arc}; - -use crate::{ - disk::{ - error::{is_unformatted_disk, DiskError}, - format::{DistributionAlgoVersion, FormatV3}, - new_disk, DiskAPI, DiskInfo, DiskOption, DiskStore, - }, - store_api::{ - BucketInfo, BucketOptions, CompletePart, DeleteBucketOptions, DeletedObject, GetObjectReader, HTTPRangeSpec, - ListMultipartsInfo, ListObjectVersionsInfo, ListObjectsV2Info, MakeBucketOptions, MultipartInfo, MultipartUploadResult, - ObjectIO, ObjectInfo, ObjectOptions, ObjectToDelete, PartInfo, PutObjReader, StorageAPI, - }, - credentials::{Credentials, SignatureType,}, - api_put_object_multipart::UploadPartParams, -}; - -use http::HeaderMap; -use tokio_util::sync::CancellationToken; -use tracing::warn; -use tracing::{error, info}; -use url::Url; - -struct HookReader { - source: GetObjectReader, - hook: GetObjectReader, -} - -impl HookReader { - pub fn new(source: GetObjectReader, hook: GetObjectReader) -> HookReader { - HookReader { - source, - hook, - } - } - - fn seek(&self, offset: i64, whence: i64) -> Result { - todo!(); - } - - fn read(&self, b: &[u8]) -> Result { - todo!(); - } -} \ No newline at end of file diff --git a/crates/ecstore/src/erasure.rs b/crates/ecstore/src/erasure.rs deleted file mode 100644 index 2939fe13..00000000 --- a/crates/ecstore/src/erasure.rs +++ /dev/null @@ -1,586 +0,0 @@ -// Copyright 2024 RustFS Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use crate::bitrot::{BitrotReader, BitrotWriter}; -use crate::disk::error::{Error, Result}; -use crate::disk::error_reduce::{reduce_write_quorum_errs, OBJECT_OP_IGNORED_ERRS}; -use crate::io::Etag; -use bytes::{Bytes, BytesMut}; -use futures::future::join_all; -use reed_solomon_erasure::galois_8::ReedSolomon; -use smallvec::SmallVec; -use std::any::Any; -use std::io::ErrorKind; -use std::sync::{mpsc, Arc}; -use tokio::io::{AsyncRead, AsyncWrite}; -use tokio::io::{AsyncReadExt, AsyncWriteExt}; -use tokio::sync::mpsc; -use tracing::warn; -use tracing::{error, info}; -use uuid::Uuid; - -use crate::disk::error::DiskError; - -#[derive(Default)] -pub struct Erasure { - data_shards: usize, - parity_shards: usize, - encoder: Option, - pub block_size: usize, - _id: Uuid, - _buf: Vec, -} - -impl Erasure { - pub fn new(data_shards: usize, parity_shards: usize, block_size: usize) -> Self { - // debug!( - // "Erasure new data_shards {},parity_shards {} block_size {} ", - // data_shards, parity_shards, block_size - // ); - let mut encoder = None; - if parity_shards > 0 { - encoder = Some(ReedSolomon::new(data_shards, parity_shards).unwrap()); - } - - Erasure { - data_shards, - parity_shards, - block_size, - encoder, - _id: Uuid::new_v4(), - _buf: vec![0u8; block_size], - } - } - - #[tracing::instrument(level = "info", skip(self, reader, writers))] - pub async fn encode( - self: Arc, - mut reader: S, - writers: &mut [Option], - // block_size: usize, - total_size: usize, - write_quorum: usize, - ) -> Result<(usize, String)> - where - S: AsyncRead + Etag + Unpin + Send + 'static, - { - let (tx, mut rx) = mpsc::channel(5); - let task = tokio::spawn(async move { - let mut buf = vec![0u8; self.block_size]; - let mut total: usize = 0; - loop { - if total_size > 0 { - let new_len = { - let remain = total_size - total; - if remain > self.block_size { self.block_size } else { remain } - }; - - if new_len == 0 && total > 0 { - break; - } - - buf.resize(new_len, 0u8); - match reader.read_exact(&mut buf).await { - Ok(res) => res, - Err(e) => { - if let ErrorKind::UnexpectedEof = e.kind() { - break; - } else { - return Err(e.into()); - } - } - }; - total += buf.len(); - } - let blocks = Arc::new(Box::pin(self.clone().encode_data(&buf)?)); - let _ = tx.send(blocks).await; - if total_size == 0 { - break; - } - } - let etag = reader.etag().await; - Ok((total, etag)) - }); - - while let Some(blocks) = rx.recv().await { - let write_futures = writers.iter_mut().enumerate().map(|(i, w_op)| { - let i_inner = i; - let blocks_inner = blocks.clone(); - async move { - if let Some(w) = w_op { - w.write(blocks_inner[i_inner].clone()).await.err() - } else { - Some(DiskError::DiskNotFound) - } - } - }); - let errs = join_all(write_futures).await; - let none_count = errs.iter().filter(|&x| x.is_none()).count(); - if none_count >= write_quorum { - if total_size == 0 { - break; - } - continue; - } - - if let Some(err) = reduce_write_quorum_errs(&errs, OBJECT_OP_IGNORED_ERRS, write_quorum) { - warn!("Erasure encode errs {:?}", &errs); - return Err(err); - } - } - task.await? - } - - pub async fn decode( - &self, - writer: &mut W, - readers: Vec>, - offset: usize, - length: usize, - total_length: usize, - ) -> (usize, Option) - where - W: AsyncWriteExt + Send + Unpin + 'static, - { - if length == 0 { - return (0, None); - } - - let mut reader = ShardReader::new(readers, self, offset, total_length); - - // debug!("ShardReader {:?}", &reader); - - let start_block = offset / self.block_size; - let end_block = (offset + length) / self.block_size; - - // debug!("decode block from {} to {}", start_block, end_block); - - let mut bytes_written = 0; - - for block_idx in start_block..=end_block { - let (block_offset, block_length) = if start_block == end_block { - (offset % self.block_size, length) - } else if block_idx == start_block { - let block_offset = offset % self.block_size; - (block_offset, self.block_size - block_offset) - } else if block_idx == end_block { - (0, (offset + length) % self.block_size) - } else { - (0, self.block_size) - }; - - if block_length == 0 { - // debug!("block_length == 0 break"); - break; - } - - // debug!("decode {} block_offset {},block_length {} ", block_idx, block_offset, block_length); - - let mut bufs = match reader.read().await { - Ok(bufs) => bufs, - Err(err) => return (bytes_written, Some(err)), - }; - - if self.parity_shards > 0 { - if let Err(err) = self.decode_data(&mut bufs) { - return (bytes_written, Some(err)); - } - } - - let written_n = match self - .write_data_blocks(writer, bufs, self.data_shards, block_offset, block_length) - .await - { - Ok(n) => n, - Err(err) => { - error!("write_data_blocks err {:?}", &err); - return (bytes_written, Some(err)); - } - }; - - bytes_written += written_n; - - // debug!("decode {} written_n {}, total_written: {} ", block_idx, written_n, bytes_written); - } - - if bytes_written != length { - // debug!("bytes_written != length: {} != {} ", bytes_written, length); - return (bytes_written, Some(Error::other("erasure decode less data"))); - } - - (bytes_written, None) - } - - async fn write_data_blocks( - &self, - writer: &mut W, - bufs: Vec>>, - data_blocks: usize, - offset: usize, - length: usize, - ) -> Result - where - W: AsyncWrite + Send + Unpin + 'static, - { - if bufs.len() < data_blocks { - return Err(Error::other("read bufs not match data_blocks")); - } - - let data_len: usize = bufs - .iter() - .take(data_blocks) - .filter(|v| v.is_some()) - .map(|v| v.as_ref().unwrap().len()) - .sum(); - if data_len < length { - return Err(Error::other(format!("write_data_blocks data_len < length {} < {}", data_len, length))); - } - - let mut offset = offset; - - // debug!("write_data_blocks offset {}, length {}", offset, length); - - let mut write = length; - let mut total_written = 0; - - for opt_buf in bufs.iter().take(data_blocks) { - let buf = opt_buf.as_ref().unwrap(); - - if offset >= buf.len() { - offset -= buf.len(); - continue; - } - - let buf = &buf[offset..]; - - offset = 0; - - // debug!("write_data_blocks write buf len {}", buf.len()); - - if write < buf.len() { - let buf = &buf[..write]; - - // debug!("write_data_blocks write buf less len {}", buf.len()); - writer.write_all(buf).await?; - // debug!("write_data_blocks write done len {}", buf.len()); - total_written += buf.len(); - break; - } - - writer.write_all(buf).await?; - let n = buf.len(); - - // debug!("write_data_blocks write done len {}", n); - write -= n; - total_written += n; - } - - Ok(total_written) - } - - pub fn total_shard_count(&self) -> usize { - self.data_shards + self.parity_shards - } - - #[tracing::instrument(level = "info", skip_all, fields(data_len=data.len()))] - pub fn encode_data(self: Arc, data: &[u8]) -> Result> { - let (shard_size, total_size) = self.need_size(data.len()); - - // Generate the total length required for all shards - let mut data_buffer = BytesMut::with_capacity(total_size); - - // Copy the source data - data_buffer.extend_from_slice(data); - data_buffer.resize(total_size, 0u8); - - { - // Perform EC encoding; the results go into data_buffer - let data_slices: SmallVec<[&mut [u8]; 16]> = data_buffer.chunks_exact_mut(shard_size).collect(); - - // Only perform EC encoding when parity shards are present - if self.parity_shards > 0 { - self.encoder.as_ref().unwrap().encode(data_slices).map_err(Error::other)?; - } - } - - // Zero-copy shards: every shard references data_buffer - let mut data_buffer = data_buffer.freeze(); - let mut shards = Vec::with_capacity(self.total_shard_count()); - for _ in 0..self.total_shard_count() { - let shard = data_buffer.split_to(shard_size); - shards.push(shard); - } - - Ok(shards) - } - - pub fn decode_data(&self, shards: &mut [Option>]) -> Result<()> { - if self.parity_shards > 0 { - self.encoder.as_ref().unwrap().reconstruct(shards).map_err(Error::other)?; - } - - Ok(()) - } - - // The length per shard and the total required length - fn need_size(&self, data_size: usize) -> (usize, usize) { - let shard_size = self.shard_size(data_size); - (shard_size, shard_size * (self.total_shard_count())) - } - - // Compute each shard size - pub fn shard_size(&self, data_size: usize) -> usize { - data_size.div_ceil(self.data_shards) - } - // returns final erasure size from original size. - pub fn shard_file_size(&self, total_size: usize) -> usize { - if total_size == 0 { - return 0; - } - - let num_shards = total_size / self.block_size; - let last_block_size = total_size % self.block_size; - let last_shard_size = last_block_size.div_ceil(self.data_shards); - num_shards * self.shard_size(self.block_size) + last_shard_size - - // When writing, EC pads the data so the last shard length should match - // if last_block_size != 0 { - // num_shards += 1 - // } - // num_shards * self.shard_size(self.block_size) - } - - // where erasure reading begins. - pub fn shard_file_offset(&self, start_offset: usize, length: usize, total_length: usize) -> usize { - let shard_size = self.shard_size(self.block_size); - let shard_file_size = self.shard_file_size(total_length); - let end_shard = (start_offset + length) / self.block_size; - let mut till_offset = end_shard * shard_size + shard_size; - if till_offset > shard_file_size { - till_offset = shard_file_size; - } - - till_offset - } - - pub async fn heal( - &self, - writers: &mut [Option], - readers: Vec>, - total_length: usize, - _prefer: &[bool], - ) -> Result<()> { - info!( - "Erasure heal, writers len: {}, readers len: {}, total_length: {}", - writers.len(), - readers.len(), - total_length - ); - if writers.len() != self.parity_shards + self.data_shards { - return Err(Error::other("invalid argument")); - } - let mut reader = ShardReader::new(readers, self, 0, total_length); - - let start_block = 0; - let mut end_block = total_length / self.block_size; - if total_length % self.block_size != 0 { - end_block += 1; - } - - let mut errs = Vec::new(); - for _ in start_block..end_block { - let mut bufs = reader.read().await?; - - if self.parity_shards > 0 { - self.encoder.as_ref().unwrap().reconstruct(&mut bufs).map_err(Error::other)?; - } - - let shards = bufs.into_iter().flatten().map(Bytes::from).collect::>(); - if shards.len() != self.parity_shards + self.data_shards { - return Err(Error::other("can not reconstruct data")); - } - - for (i, w) in writers.iter_mut().enumerate() { - if w.is_none() { - continue; - } - match w.as_mut().unwrap().write(shards[i].clone()).await { - Ok(_) => {} - Err(e) => { - info!("write failed, err: {:?}", e); - errs.push(e); - } - } - } - } - if !errs.is_empty() { - return Err(errs[0].clone().into()); - } - - Ok(()) - } -} - -#[async_trait::async_trait] -pub trait Writer { - fn as_any(&self) -> &dyn Any; - async fn write(&mut self, buf: Bytes) -> Result<()>; - async fn close(&mut self) -> Result<()> { - Ok(()) - } -} - -#[async_trait::async_trait] -pub trait ReadAt { - async fn read_at(&mut self, offset: usize, length: usize) -> Result<(Vec, usize)>; -} - -pub struct ShardReader { - readers: Vec>, // Disk readers - data_block_count: usize, // Total number of shards - parity_block_count: usize, - shard_size: usize, // Block size per shard (read one block at a time) - shard_file_size: usize, // Total size of the shard file - offset: usize, // Offset within the shard -} - -impl ShardReader { - pub fn new(readers: Vec>, ec: &Erasure, offset: usize, total_length: usize) -> Self { - Self { - readers, - data_block_count: ec.data_shards, - parity_block_count: ec.parity_shards, - shard_size: ec.shard_size(ec.block_size), - shard_file_size: ec.shard_file_size(total_length), - offset: (offset / ec.block_size) * ec.shard_size(ec.block_size), - } - } - - pub async fn read(&mut self) -> Result>>> { - // let mut disks = self.readers; - let reader_length = self.readers.len(); - // Length of the block to read - let mut read_length = self.shard_size; - if self.offset + read_length > self.shard_file_size { - read_length = self.shard_file_size - self.offset - } - - if read_length == 0 { - return Ok(vec![None; reader_length]); - } - - // debug!("shard reader read offset {}, shard_size {}", self.offset, read_length); - - let mut futures = Vec::with_capacity(reader_length); - let mut errors = Vec::with_capacity(reader_length); - - let mut ress = Vec::with_capacity(reader_length); - - for disk in self.readers.iter_mut() { - // if disk.is_none() { - // ress.push(None); - // errors.push(Some(Error::new(DiskError::DiskNotFound))); - // continue; - // } - - // let disk: &mut BitrotReader = disk.as_mut().unwrap(); - let offset = self.offset; - futures.push(async move { - if let Some(disk) = disk { - disk.read_at(offset, read_length).await - } else { - Err(DiskError::DiskNotFound) - } - }); - } - - let results = join_all(futures).await; - for result in results { - match result { - Ok((res, _)) => { - ress.push(Some(res)); - errors.push(None); - } - Err(e) => { - ress.push(None); - errors.push(Some(e)); - } - } - } - - if !self.can_decode(&ress) { - warn!("ec decode read ress {:?}", &ress); - warn!("ec decode read errors {:?}", &errors); - - return Err(Error::other("shard reader read failed")); - } - - self.offset += self.shard_size; - - Ok(ress) - } - - fn can_decode(&self, bufs: &[Option>]) -> bool { - let c = bufs.iter().filter(|v| v.is_some()).count(); - if self.parity_block_count > 0 { - c >= self.data_block_count - } else { - c == self.data_block_count - } - } -} - -// fn shards_to_option_shards(shards: &[Vec]) -> Vec>> { -// let mut result = Vec::with_capacity(shards.len()); - -// for v in shards.iter() { -// let inner: Vec = v.clone(); -// result.push(Some(inner)); -// } -// result -// } - -#[cfg(test)] -mod test { - use super::*; - - #[test] - fn test_erasure() { - let data_shards = 3; - let parity_shards = 2; - let data: &[u8] = &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]; - let ec = Erasure::new(data_shards, parity_shards, 1); - let shards = Arc::new(ec).encode_data(data).unwrap(); - println!("shards:{:?}", shards); - - let mut s: Vec<_> = shards - .iter() - .map(|d| if d.is_empty() { None } else { Some(d.to_vec()) }) - .collect(); - - // let mut s = shards_to_option_shards(&shards); - - // s[0] = None; - s[4] = None; - s[3] = None; - - println!("sss:{:?}", &s); - - let ec = Erasure::new(data_shards, parity_shards, 1); - ec.decode_data(&mut s).unwrap(); - // ec.encoder.reconstruct(&mut s).unwrap(); - - println!("sss:{:?}", &s); - } -} diff --git a/crates/ecstore/src/lib.rs b/crates/ecstore/src/lib.rs index 3194f2b8..d8ea3440 100644 --- a/crates/ecstore/src/lib.rs +++ b/crates/ecstore/src/lib.rs @@ -20,7 +20,6 @@ pub mod batch_processor; pub mod bitrot; pub mod bucket; pub mod cache_value; -mod chunk_stream; pub mod compress; pub mod config; pub mod data_usage; diff --git a/crates/ecstore/src/tier/warm_backend_azure2.rs b/crates/ecstore/src/tier/warm_backend_azure2.rs deleted file mode 100644 index 338a475d..00000000 --- a/crates/ecstore/src/tier/warm_backend_azure2.rs +++ /dev/null @@ -1,231 +0,0 @@ -// Copyright 2024 RustFS Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -#![allow(unused_imports)] -#![allow(unused_variables)] -#![allow(unused_mut)] -#![allow(unused_assignments)] -#![allow(unused_must_use)] -#![allow(clippy::all)] - -use std::collections::HashMap; -use std::sync::Arc; - -use azure_core::http::{Body, ClientOptions, RequestContent}; -use azure_storage::StorageCredentials; -use azure_storage_blobs::prelude::*; - -use crate::client::{ - admin_handler_utils::AdminError, - api_put_object::PutObjectOptions, - transition_api::{Options, ReadCloser, ReaderImpl}, -}; -use crate::tier::{ - tier_config::TierAzure, - warm_backend::{WarmBackend, WarmBackendGetOpts}, -}; -use tracing::warn; - -const MAX_MULTIPART_PUT_OBJECT_SIZE: i64 = 1024 * 1024 * 1024 * 1024 * 5; -const MAX_PARTS_COUNT: i64 = 10000; -const _MAX_PART_SIZE: i64 = 1024 * 1024 * 1024 * 5; -const MIN_PART_SIZE: i64 = 1024 * 1024 * 128; - -pub struct WarmBackendAzure { - pub client: Arc, - pub bucket: String, - pub prefix: String, - pub storage_class: String, -} - -impl WarmBackendAzure { - pub async fn new(conf: &TierAzure, tier: &str) -> Result { - if conf.access_key == "" || conf.secret_key == "" { - return Err(std::io::Error::other("both access and secret keys are required")); - } - - if conf.bucket == "" { - return Err(std::io::Error::other("no bucket name was provided")); - } - - let creds = StorageCredentials::access_key(conf.access_key.clone(), conf.secret_key.clone()); - let client = ClientBuilder::new(conf.access_key.clone(), creds) - //.endpoint(conf.endpoint) - .blob_service_client(); - let client = Arc::new(client); - Ok(Self { - client, - bucket: conf.bucket.clone(), - prefix: conf.prefix.strip_suffix("/").unwrap_or(&conf.prefix).to_owned(), - storage_class: "".to_string(), - }) - } - - /*pub fn tier(&self) -> *blob.AccessTier { - if self.storage_class == "" { - return None; - } - for t in blob.PossibleAccessTierValues() { - if strings.EqualFold(self.storage_class, t) { - return &t - } - } - None - }*/ - - pub fn get_dest(&self, object: &str) -> String { - let mut dest_obj = object.to_string(); - if self.prefix != "" { - dest_obj = format!("{}/{}", &self.prefix, object); - } - return dest_obj; - } -} - -#[async_trait::async_trait] -impl WarmBackend for WarmBackendAzure { - async fn put_with_meta( - &self, - object: &str, - r: ReaderImpl, - length: i64, - meta: HashMap, - ) -> Result { - let part_size = length; - let client = self.client.clone(); - let container_client = client.container_client(self.bucket.clone()); - let blob_client = container_client.blob_client(self.get_dest(object)); - /*let res = blob_client - .upload( - RequestContent::from(match r { - ReaderImpl::Body(content_body) => content_body.to_vec(), - ReaderImpl::ObjectBody(mut content_body) => content_body.read_all().await?, - }), - false, - length as u64, - None, - ) - .await - else { - return Err(std::io::Error::other("upload error")); - };*/ - - let Ok(res) = blob_client - .put_block_blob(match r { - ReaderImpl::Body(content_body) => content_body.to_vec(), - ReaderImpl::ObjectBody(mut content_body) => content_body.read_all().await?, - }) - .content_type("text/plain") - .into_future() - .await - else { - return Err(std::io::Error::other("put_block_blob error")); - }; - - //self.ToObjectError(err, object) - Ok(res.request_id.to_string()) - } - - async fn put(&self, object: &str, r: ReaderImpl, length: i64) -> Result { - self.put_with_meta(object, r, length, HashMap::new()).await - } - - async fn get(&self, object: &str, rv: &str, opts: WarmBackendGetOpts) -> Result { - let client = self.client.clone(); - let container_client = client.container_client(self.bucket.clone()); - let blob_client = container_client.blob_client(self.get_dest(object)); - blob_client.get(); - todo!(); - } - - async fn remove(&self, object: &str, rv: &str) -> Result<(), std::io::Error> { - let client = self.client.clone(); - let container_client = client.container_client(self.bucket.clone()); - let blob_client = container_client.blob_client(self.get_dest(object)); - blob_client.delete(); - todo!(); - } - - async fn in_use(&self) -> Result { - /*let result = self.client - .list_objects_v2(&self.bucket, &self.prefix, "", "", SLASH_SEPARATOR, 1) - .await?; - - Ok(result.common_prefixes.len() > 0 || result.contents.len() > 0)*/ - Ok(false) - } -} - -/*fn azure_to_object_error(err: Error, params: Vec) -> Option { - if err == nil { - return nil - } - - bucket := "" - object := "" - if len(params) >= 1 { - bucket = params[0] - } - if len(params) == 2 { - object = params[1] - } - - azureErr, ok := err.(*azcore.ResponseError) - if !ok { - // We don't interpret non Azure errors. As azure errors will - // have StatusCode to help to convert to object errors. - return err - } - - serviceCode := azureErr.ErrorCode - statusCode := azureErr.StatusCode - - azureCodesToObjectError(err, serviceCode, statusCode, bucket, object) -}*/ - -/*fn azure_codes_to_object_error(err: Error, service_code: String, status_code: i32, bucket: String, object: String) -> Option { - switch serviceCode { - case "ContainerNotFound", "ContainerBeingDeleted": - err = BucketNotFound{Bucket: bucket} - case "ContainerAlreadyExists": - err = BucketExists{Bucket: bucket} - case "InvalidResourceName": - err = BucketNameInvalid{Bucket: bucket} - case "RequestBodyTooLarge": - err = PartTooBig{} - case "InvalidMetadata": - err = UnsupportedMetadata{} - case "BlobAccessTierNotSupportedForAccountType": - err = NotImplemented{} - case "OutOfRangeInput": - err = ObjectNameInvalid{ - Bucket: bucket, - Object: object, - } - default: - switch statusCode { - case http.StatusNotFound: - if object != "" { - err = ObjectNotFound{ - Bucket: bucket, - Object: object, - } - } else { - err = BucketNotFound{Bucket: bucket} - } - case http.StatusBadRequest: - err = BucketNameInvalid{Bucket: bucket} - } - } - return err -}*/ diff --git a/crates/filemeta/src/headers.rs b/crates/filemeta/src/headers.rs deleted file mode 100644 index 687198a0..00000000 --- a/crates/filemeta/src/headers.rs +++ /dev/null @@ -1,52 +0,0 @@ -// Copyright 2024 RustFS Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -pub const AMZ_META_UNENCRYPTED_CONTENT_LENGTH: &str = "X-Amz-Meta-X-Amz-Unencrypted-Content-Length"; -pub const AMZ_META_UNENCRYPTED_CONTENT_MD5: &str = "X-Amz-Meta-X-Amz-Unencrypted-Content-Md5"; - -pub const AMZ_STORAGE_CLASS: &str = "x-amz-storage-class"; - -pub const RESERVED_METADATA_PREFIX: &str = "X-RustFS-Internal-"; -pub const RESERVED_METADATA_PREFIX_LOWER: &str = "x-rustfs-internal-"; - -pub const RUSTFS_HEALING: &str = "X-Rustfs-Internal-healing"; -// pub const RUSTFS_DATA_MOVE: &str = "X-Rustfs-Internal-data-mov"; - -// pub const X_RUSTFS_INLINE_DATA: &str = "x-rustfs-inline-data"; - -pub const VERSION_PURGE_STATUS_KEY: &str = "X-Rustfs-Internal-purgestatus"; - -pub const X_RUSTFS_HEALING: &str = "X-Rustfs-Internal-healing"; -pub const X_RUSTFS_DATA_MOV: &str = "X-Rustfs-Internal-data-mov"; - -pub const AMZ_OBJECT_TAGGING: &str = "X-Amz-Tagging"; -pub const AMZ_BUCKET_REPLICATION_STATUS: &str = "X-Amz-Replication-Status"; -pub const AMZ_DECODED_CONTENT_LENGTH: &str = "X-Amz-Decoded-Content-Length"; - -pub const RUSTFS_DATA_MOVE: &str = "X-Rustfs-Internal-data-mov"; - -// Server-side encryption headers -pub const AMZ_SERVER_SIDE_ENCRYPTION: &str = "x-amz-server-side-encryption"; -pub const AMZ_SERVER_SIDE_ENCRYPTION_AWS_KMS_KEY_ID: &str = "x-amz-server-side-encryption-aws-kms-key-id"; -pub const AMZ_SERVER_SIDE_ENCRYPTION_CONTEXT: &str = "x-amz-server-side-encryption-context"; -pub const AMZ_SERVER_SIDE_ENCRYPTION_CUSTOMER_ALGORITHM: &str = "x-amz-server-side-encryption-customer-algorithm"; -pub const AMZ_SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY: &str = "x-amz-server-side-encryption-customer-key"; -pub const AMZ_SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY_MD5: &str = "x-amz-server-side-encryption-customer-key-md5"; - -// SSE-C copy source headers -pub const AMZ_COPY_SOURCE_SERVER_SIDE_ENCRYPTION_CUSTOMER_ALGORITHM: &str = - "x-amz-copy-source-server-side-encryption-customer-algorithm"; -pub const AMZ_COPY_SOURCE_SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY: &str = "x-amz-copy-source-server-side-encryption-customer-key"; -pub const AMZ_COPY_SOURCE_SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY_MD5: &str = - "x-amz-copy-source-server-side-encryption-customer-key-md5"; diff --git a/crates/lock/src/fast_lock/benchmarks.rs b/crates/lock/src/fast_lock/benchmarks.rs deleted file mode 100644 index 930a5a81..00000000 --- a/crates/lock/src/fast_lock/benchmarks.rs +++ /dev/null @@ -1,325 +0,0 @@ -// Copyright 2024 RustFS Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -// Benchmarks comparing fast lock vs old lock performance - -#[cfg(test)] -#[allow(dead_code)] // Temporarily disable benchmark tests -mod benchmarks { - use super::super::*; - use std::sync::Arc; - use std::time::{Duration, Instant}; - use tokio::task; - - /// Benchmark single-threaded lock operations - #[tokio::test] - async fn bench_single_threaded_fast_locks() { - let manager = Arc::new(FastObjectLockManager::new()); - let iterations = 10000; - - // Warm up - for i in 0..100 { - let _guard = manager - .acquire_write_lock("bucket", &format!("warm_{}", i), "owner") - .await - .unwrap(); - } - - // Benchmark write locks - let start = Instant::now(); - for i in 0..iterations { - let _guard = manager - .acquire_write_lock("bucket", &format!("object_{}", i), "owner") - .await - .unwrap(); - } - let duration = start.elapsed(); - - println!("Fast locks: {} write locks in {:?}", iterations, duration); - println!("Average: {:?} per lock", duration / iterations); - - let metrics = manager.get_metrics(); - println!("Fast path rate: {:.2}%", metrics.shard_metrics.fast_path_rate() * 100.0); - - // Should be much faster than old implementation - assert!(duration.as_millis() < 1000, "Should complete 10k locks in <1s"); - assert!(metrics.shard_metrics.fast_path_rate() > 0.95, "Should have >95% fast path rate"); - } - - /// Benchmark concurrent lock operations - #[tokio::test] - async fn bench_concurrent_fast_locks() { - let manager = Arc::new(FastObjectLockManager::new()); - let concurrent_tasks = 100; - let iterations_per_task = 100; - - let start = Instant::now(); - - let mut handles = Vec::new(); - for task_id in 0..concurrent_tasks { - let manager_clone = manager.clone(); - let handle = task::spawn(async move { - for i in 0..iterations_per_task { - let object_name = format!("obj_{}_{}", task_id, i); - let _guard = manager_clone - .acquire_write_lock("bucket", &object_name, &format!("owner_{}", task_id)) - .await - .unwrap(); - - // Simulate some work - tokio::task::yield_now().await; - } - }); - handles.push(handle); - } - - // Wait for all tasks - for handle in handles { - handle.await.unwrap(); - } - - let duration = start.elapsed(); - let total_ops = concurrent_tasks * iterations_per_task; - - println!("Concurrent fast locks: {} operations across {} tasks in {:?}", - total_ops, concurrent_tasks, duration); - println!("Throughput: {:.2} ops/sec", total_ops as f64 / duration.as_secs_f64()); - - let metrics = manager.get_metrics(); - println!("Fast path rate: {:.2}%", metrics.shard_metrics.fast_path_rate() * 100.0); - println!("Contention events: {}", metrics.shard_metrics.contention_events); - - // Should maintain high throughput even with concurrency - assert!(duration.as_millis() < 5000, "Should complete concurrent ops in <5s"); - } - - /// Benchmark contended lock operations - #[tokio::test] - async fn bench_contended_locks() { - let manager = Arc::new(FastObjectLockManager::new()); - let concurrent_tasks = 50; - let shared_objects = 10; // High contention on few objects - let iterations_per_task = 50; - - let start = Instant::now(); - - let mut handles = Vec::new(); - for task_id in 0..concurrent_tasks { - let manager_clone = manager.clone(); - let handle = task::spawn(async move { - for i in 0..iterations_per_task { - let object_name = format!("shared_{}", i % shared_objects); - - // Mix of read and write operations - if i % 3 == 0 { - // Write operation - if let Ok(_guard) = manager_clone - .acquire_write_lock("bucket", &object_name, &format!("owner_{}", task_id)) - .await - { - tokio::task::yield_now().await; - } - } else { - // Read operation - if let Ok(_guard) = manager_clone - .acquire_read_lock("bucket", &object_name, &format!("owner_{}", task_id)) - .await - { - tokio::task::yield_now().await; - } - } - } - }); - handles.push(handle); - } - - // Wait for all tasks - for handle in handles { - handle.await.unwrap(); - } - - let duration = start.elapsed(); - - println!("Contended locks: {} tasks on {} objects in {:?}", - concurrent_tasks, shared_objects, duration); - - let metrics = manager.get_metrics(); - println!("Total acquisitions: {}", metrics.shard_metrics.total_acquisitions()); - println!("Fast path rate: {:.2}%", metrics.shard_metrics.fast_path_rate() * 100.0); - println!("Average wait time: {:?}", metrics.shard_metrics.avg_wait_time()); - println!("Timeout rate: {:.2}%", metrics.shard_metrics.timeout_rate() * 100.0); - - // Even with contention, should maintain reasonable performance - assert!(metrics.shard_metrics.timeout_rate() < 0.1, "Should have <10% timeout rate"); - assert!(metrics.shard_metrics.avg_wait_time() < Duration::from_millis(100), "Avg wait should be <100ms"); - } - - /// Benchmark batch operations - #[tokio::test] - async fn bench_batch_operations() { - let manager = FastObjectLockManager::new(); - let batch_sizes = vec![10, 50, 100, 500]; - - for batch_size in batch_sizes { - // Create batch request - let mut batch = BatchLockRequest::new("batch_owner"); - for i in 0..batch_size { - batch = batch.add_write_lock("bucket", &format!("batch_obj_{}", i)); - } - - let start = Instant::now(); - let result = manager.acquire_locks_batch(batch).await; - let duration = start.elapsed(); - - assert!(result.all_acquired, "Batch should succeed"); - println!("Batch size {}: {:?} ({:.2} μs per lock)", - batch_size, - duration, - duration.as_micros() as f64 / batch_size as f64); - - // Batch should be much faster than individual acquisitions - assert!(duration.as_millis() < batch_size as u128 / 10, - "Batch should be 10x+ faster than individual locks"); - } - } - - /// Benchmark version-specific locks - #[tokio::test] - async fn bench_versioned_locks() { - let manager = Arc::new(FastObjectLockManager::new()); - let objects = 100; - let versions_per_object = 10; - - let start = Instant::now(); - - let mut handles = Vec::new(); - for obj_id in 0..objects { - let manager_clone = manager.clone(); - let handle = task::spawn(async move { - for version in 0..versions_per_object { - let _guard = manager_clone - .acquire_write_lock_versioned( - "bucket", - &format!("obj_{}", obj_id), - &format!("v{}", version), - "version_owner" - ) - .await - .unwrap(); - } - }); - handles.push(handle); - } - - for handle in handles { - handle.await.unwrap(); - } - - let duration = start.elapsed(); - let total_ops = objects * versions_per_object; - - println!("Versioned locks: {} version locks in {:?}", total_ops, duration); - println!("Throughput: {:.2} locks/sec", total_ops as f64 / duration.as_secs_f64()); - - let metrics = manager.get_metrics(); - println!("Fast path rate: {:.2}%", metrics.shard_metrics.fast_path_rate() * 100.0); - - // Versioned locks should not interfere with each other - assert!(metrics.shard_metrics.fast_path_rate() > 0.9, "Should maintain high fast path rate"); - } - - /// Compare with theoretical maximum performance - #[tokio::test] - async fn bench_theoretical_maximum() { - let manager = Arc::new(FastObjectLockManager::new()); - let iterations = 100000; - - // Measure pure fast path performance (no contention) - let start = Instant::now(); - for i in 0..iterations { - let _guard = manager - .acquire_write_lock("bucket", &format!("unique_{}", i), "owner") - .await - .unwrap(); - } - let duration = start.elapsed(); - - println!("Theoretical maximum: {} unique locks in {:?}", iterations, duration); - println!("Rate: {:.2} locks/sec", iterations as f64 / duration.as_secs_f64()); - println!("Latency: {:?} per lock", duration / iterations); - - let metrics = manager.get_metrics(); - println!("Fast path rate: {:.2}%", metrics.shard_metrics.fast_path_rate() * 100.0); - - // Should achieve very high performance with no contention - assert!(metrics.shard_metrics.fast_path_rate() > 0.99, "Should be nearly 100% fast path"); - assert!(duration.as_secs_f64() / (iterations as f64) < 0.0001, "Should be <100μs per lock"); - } - - /// Performance regression test - #[tokio::test] - async fn performance_regression_test() { - let manager = Arc::new(FastObjectLockManager::new()); - - // This test ensures we maintain performance targets - let test_cases = vec![ - ("single_thread", 1, 10000), - ("low_contention", 10, 1000), - ("high_contention", 100, 100), - ]; - - for (test_name, threads, ops_per_thread) in test_cases { - let start = Instant::now(); - - let mut handles = Vec::new(); - for thread_id in 0..threads { - let manager_clone = manager.clone(); - let handle = task::spawn(async move { - for op_id in 0..ops_per_thread { - let object = if threads == 1 { - format!("obj_{}_{}", thread_id, op_id) - } else { - format!("obj_{}", op_id % 100) // Create contention - }; - - let owner = format!("owner_{}", thread_id); - let _guard = manager_clone - .acquire_write_lock("bucket", object, owner) - .await - .unwrap(); - } - }); - handles.push(handle); - } - - for handle in handles { - handle.await.unwrap(); - } - - let duration = start.elapsed(); - let total_ops = threads * ops_per_thread; - let ops_per_sec = total_ops as f64 / duration.as_secs_f64(); - - println!("{}: {:.2} ops/sec", test_name, ops_per_sec); - - // Performance targets (adjust based on requirements) - match test_name { - "single_thread" => assert!(ops_per_sec > 50000.0, "Single thread should exceed 50k ops/sec"), - "low_contention" => assert!(ops_per_sec > 20000.0, "Low contention should exceed 20k ops/sec"), - "high_contention" => assert!(ops_per_sec > 5000.0, "High contention should exceed 5k ops/sec"), - _ => {} - } - } - } -} \ No newline at end of file diff --git a/crates/lock/src/fast_lock/mod.rs b/crates/lock/src/fast_lock/mod.rs index d6e89243..3cd4b9c9 100644 --- a/crates/lock/src/fast_lock/mod.rs +++ b/crates/lock/src/fast_lock/mod.rs @@ -37,9 +37,6 @@ pub mod shard; pub mod state; pub mod types; -// #[cfg(test)] -// pub mod benchmarks; // Temporarily disabled due to compilation issues - // Re-export main types pub use disabled_manager::DisabledLockManager; pub use guard::FastLockGuard; diff --git a/crates/s3select-api/src/query/datasource/mod.rs b/crates/s3select-api/src/query/datasource/mod.rs deleted file mode 100644 index 6238cfff..00000000 --- a/crates/s3select-api/src/query/datasource/mod.rs +++ /dev/null @@ -1,13 +0,0 @@ -// Copyright 2024 RustFS Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. diff --git a/crates/s3select-api/src/query/mod.rs b/crates/s3select-api/src/query/mod.rs index f21da83a..6e1529f9 100644 --- a/crates/s3select-api/src/query/mod.rs +++ b/crates/s3select-api/src/query/mod.rs @@ -18,7 +18,6 @@ use s3s::dto::SelectObjectContentInput; pub mod analyzer; pub mod ast; -pub mod datasource; pub mod dispatcher; pub mod execution; pub mod function; diff --git a/rustfs/src/storage/error.rs b/rustfs/src/storage/error.rs deleted file mode 100644 index e3b10cde..00000000 --- a/rustfs/src/storage/error.rs +++ /dev/null @@ -1,499 +0,0 @@ -// Copyright 2024 RustFS Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use ecstore::error::StorageError; -use rustfs_common::error::Error; -use s3s::{s3_error, S3Error, S3ErrorCode}; -pub fn to_s3_error(err: Error) -> S3Error { - if let Some(storage_err) = err.downcast_ref::() { - return match storage_err { - StorageError::NotImplemented => s3_error!(NotImplemented), - StorageError::InvalidArgument(bucket, object, version_id) => { - s3_error!(InvalidArgument, "Invalid arguments provided for {}/{}-{}", bucket, object, version_id) - } - StorageError::MethodNotAllowed => s3_error!(MethodNotAllowed), - StorageError::BucketNotFound(bucket) => { - s3_error!(NoSuchBucket, "bucket not found {}", bucket) - } - StorageError::BucketNotEmpty(bucket) => s3_error!(BucketNotEmpty, "bucket not empty {}", bucket), - StorageError::BucketNameInvalid(bucket) => s3_error!(InvalidBucketName, "invalid bucket name {}", bucket), - StorageError::ObjectNameInvalid(bucket, object) => { - s3_error!(InvalidArgument, "invalid object name {}/{}", bucket, object) - } - StorageError::BucketExists(bucket) => s3_error!(BucketAlreadyExists, "{}", bucket), - StorageError::StorageFull => s3_error!(ServiceUnavailable, "Storage reached its minimum free drive threshold."), - StorageError::SlowDown => s3_error!(SlowDown, "Please reduce your request rate"), - StorageError::PrefixAccessDenied(bucket, object) => { - s3_error!(AccessDenied, "PrefixAccessDenied {}/{}", bucket, object) - } - StorageError::InvalidUploadIDKeyCombination(bucket, object) => { - s3_error!(InvalidArgument, "Invalid UploadID KeyCombination: {}/{}", bucket, object) - } - StorageError::MalformedUploadID(bucket) => s3_error!(InvalidArgument, "Malformed UploadID: {}", bucket), - StorageError::ObjectNameTooLong(bucket, object) => { - s3_error!(InvalidArgument, "Object name too long: {}/{}", bucket, object) - } - StorageError::ObjectNamePrefixAsSlash(bucket, object) => { - s3_error!(InvalidArgument, "Object name contains forward slash as prefix: {}/{}", bucket, object) - } - StorageError::ObjectNotFound(bucket, object) => s3_error!(NoSuchKey, "{}/{}", bucket, object), - StorageError::VersionNotFound(bucket, object, version_id) => { - s3_error!(NoSuchVersion, "{}/{}/{}", bucket, object, version_id) - } - StorageError::InvalidUploadID(bucket, object, version_id) => { - s3_error!(InvalidPart, "Invalid upload id: {}/{}-{}", bucket, object, version_id) - } - StorageError::InvalidVersionID(bucket, object, version_id) => { - s3_error!(InvalidArgument, "Invalid version id: {}/{}-{}", bucket, object, version_id) - } - // extended - StorageError::DataMovementOverwriteErr(bucket, object, version_id) => s3_error!( - InvalidArgument, - "invalid data movement operation, source and destination pool are the same for : {}/{}-{}", - bucket, - object, - version_id - ), - - // extended - StorageError::ObjectExistsAsDirectory(bucket, object) => { - s3_error!(InvalidArgument, "Object exists on :{} as directory {}", bucket, object) - } - StorageError::InvalidPart(bucket, object, version_id) => { - s3_error!( - InvalidPart, - "Specified part could not be found. PartNumber {}, Expected {}, got {}", - bucket, - object, - version_id - ) - } - StorageError::DoneForNow => s3_error!(InternalError, "DoneForNow"), - }; - } - - if is_err_file_not_found(&err) { - return S3Error::with_message(S3ErrorCode::NoSuchKey, format!(" ec err {}", err)); - } - - S3Error::with_message(S3ErrorCode::InternalError, format!(" ec err {}", err)) -} - -#[cfg(test)] -mod tests { - use super::*; - use s3s::S3ErrorCode; - - #[test] - fn test_to_s3_error_not_implemented() { - let storage_err = StorageError::NotImplemented; - let err = Error::new(storage_err); - let s3_err = to_s3_error(err); - - assert_eq!(*s3_err.code(), S3ErrorCode::NotImplemented); - } - - #[test] - fn test_to_s3_error_invalid_argument() { - let storage_err = - StorageError::InvalidArgument("test-bucket".to_string(), "test-object".to_string(), "test-version".to_string()); - let err = Error::new(storage_err); - let s3_err = to_s3_error(err); - - assert_eq!(*s3_err.code(), S3ErrorCode::InvalidArgument); - assert!(s3_err.message().unwrap().contains("Invalid arguments provided")); - assert!(s3_err.message().unwrap().contains("test-bucket")); - assert!(s3_err.message().unwrap().contains("test-object")); - assert!(s3_err.message().unwrap().contains("test-version")); - } - - #[test] - fn test_to_s3_error_method_not_allowed() { - let storage_err = StorageError::MethodNotAllowed; - let err = Error::new(storage_err); - let s3_err = to_s3_error(err); - - assert_eq!(*s3_err.code(), S3ErrorCode::MethodNotAllowed); - } - - #[test] - fn test_to_s3_error_bucket_not_found() { - let storage_err = StorageError::BucketNotFound("test-bucket".to_string()); - let err = Error::new(storage_err); - let s3_err = to_s3_error(err); - - assert_eq!(*s3_err.code(), S3ErrorCode::NoSuchBucket); - assert!(s3_err.message().unwrap().contains("bucket not found")); - assert!(s3_err.message().unwrap().contains("test-bucket")); - } - - #[test] - fn test_to_s3_error_bucket_not_empty() { - let storage_err = StorageError::BucketNotEmpty("test-bucket".to_string()); - let err = Error::new(storage_err); - let s3_err = to_s3_error(err); - - assert_eq!(*s3_err.code(), S3ErrorCode::BucketNotEmpty); - assert!(s3_err.message().unwrap().contains("bucket not empty")); - assert!(s3_err.message().unwrap().contains("test-bucket")); - } - - #[test] - fn test_to_s3_error_bucket_name_invalid() { - let storage_err = StorageError::BucketNameInvalid("invalid-bucket-name".to_string()); - let err = Error::new(storage_err); - let s3_err = to_s3_error(err); - - assert_eq!(*s3_err.code(), S3ErrorCode::InvalidBucketName); - assert!(s3_err.message().unwrap().contains("invalid bucket name")); - assert!(s3_err.message().unwrap().contains("invalid-bucket-name")); - } - - #[test] - fn test_to_s3_error_object_name_invalid() { - let storage_err = StorageError::ObjectNameInvalid("test-bucket".to_string(), "invalid-object".to_string()); - let err = Error::new(storage_err); - let s3_err = to_s3_error(err); - - assert_eq!(*s3_err.code(), S3ErrorCode::InvalidArgument); - assert!(s3_err.message().unwrap().contains("invalid object name")); - assert!(s3_err.message().unwrap().contains("test-bucket")); - assert!(s3_err.message().unwrap().contains("invalid-object")); - } - - #[test] - fn test_to_s3_error_bucket_exists() { - let storage_err = StorageError::BucketExists("existing-bucket".to_string()); - let err = Error::new(storage_err); - let s3_err = to_s3_error(err); - - assert_eq!(*s3_err.code(), S3ErrorCode::BucketAlreadyExists); - assert!(s3_err.message().unwrap().contains("existing-bucket")); - } - - #[test] - fn test_to_s3_error_storage_full() { - let storage_err = StorageError::StorageFull; - let err = Error::new(storage_err); - let s3_err = to_s3_error(err); - - assert_eq!(*s3_err.code(), S3ErrorCode::ServiceUnavailable); - assert!( - s3_err - .message() - .unwrap() - .contains("Storage reached its minimum free drive threshold") - ); - } - - #[test] - fn test_to_s3_error_slow_down() { - let storage_err = StorageError::SlowDown; - let err = Error::new(storage_err); - let s3_err = to_s3_error(err); - - assert_eq!(*s3_err.code(), S3ErrorCode::SlowDown); - assert!(s3_err.message().unwrap().contains("Please reduce your request rate")); - } - - #[test] - fn test_to_s3_error_prefix_access_denied() { - let storage_err = StorageError::PrefixAccessDenied("test-bucket".to_string(), "test-prefix".to_string()); - let err = Error::new(storage_err); - let s3_err = to_s3_error(err); - - assert_eq!(*s3_err.code(), S3ErrorCode::AccessDenied); - assert!(s3_err.message().unwrap().contains("PrefixAccessDenied")); - assert!(s3_err.message().unwrap().contains("test-bucket")); - assert!(s3_err.message().unwrap().contains("test-prefix")); - } - - #[test] - fn test_to_s3_error_invalid_upload_id_key_combination() { - let storage_err = StorageError::InvalidUploadIDKeyCombination("test-bucket".to_string(), "test-object".to_string()); - let err = Error::new(storage_err); - let s3_err = to_s3_error(err); - - assert_eq!(*s3_err.code(), S3ErrorCode::InvalidArgument); - assert!(s3_err.message().unwrap().contains("Invalid UploadID KeyCombination")); - assert!(s3_err.message().unwrap().contains("test-bucket")); - assert!(s3_err.message().unwrap().contains("test-object")); - } - - #[test] - fn test_to_s3_error_malformed_upload_id() { - let storage_err = StorageError::MalformedUploadID("malformed-id".to_string()); - let err = Error::new(storage_err); - let s3_err = to_s3_error(err); - - assert_eq!(*s3_err.code(), S3ErrorCode::InvalidArgument); - assert!(s3_err.message().unwrap().contains("Malformed UploadID")); - assert!(s3_err.message().unwrap().contains("malformed-id")); - } - - #[test] - fn test_to_s3_error_object_name_too_long() { - let storage_err = StorageError::ObjectNameTooLong("test-bucket".to_string(), "very-long-object-name".to_string()); - let err = Error::new(storage_err); - let s3_err = to_s3_error(err); - - assert_eq!(*s3_err.code(), S3ErrorCode::InvalidArgument); - assert!(s3_err.message().unwrap().contains("Object name too long")); - assert!(s3_err.message().unwrap().contains("test-bucket")); - assert!(s3_err.message().unwrap().contains("very-long-object-name")); - } - - #[test] - fn test_to_s3_error_object_name_prefix_as_slash() { - let storage_err = StorageError::ObjectNamePrefixAsSlash("test-bucket".to_string(), "/invalid-object".to_string()); - let err = Error::new(storage_err); - let s3_err = to_s3_error(err); - - assert_eq!(*s3_err.code(), S3ErrorCode::InvalidArgument); - assert!( - s3_err - .message() - .unwrap() - .contains("Object name contains forward slash as prefix") - ); - assert!(s3_err.message().unwrap().contains("test-bucket")); - assert!(s3_err.message().unwrap().contains("/invalid-object")); - } - - #[test] - fn test_to_s3_error_object_not_found() { - let storage_err = StorageError::ObjectNotFound("test-bucket".to_string(), "missing-object".to_string()); - let err = Error::new(storage_err); - let s3_err = to_s3_error(err); - - assert_eq!(*s3_err.code(), S3ErrorCode::NoSuchKey); - assert!(s3_err.message().unwrap().contains("test-bucket")); - assert!(s3_err.message().unwrap().contains("missing-object")); - } - - #[test] - fn test_to_s3_error_version_not_found() { - let storage_err = - StorageError::VersionNotFound("test-bucket".to_string(), "test-object".to_string(), "missing-version".to_string()); - let err = Error::new(storage_err); - let s3_err = to_s3_error(err); - - assert_eq!(*s3_err.code(), S3ErrorCode::NoSuchVersion); - assert!(s3_err.message().unwrap().contains("test-bucket")); - assert!(s3_err.message().unwrap().contains("test-object")); - assert!(s3_err.message().unwrap().contains("missing-version")); - } - - #[test] - fn test_to_s3_error_invalid_upload_id() { - let storage_err = - StorageError::InvalidUploadID("test-bucket".to_string(), "test-object".to_string(), "invalid-upload-id".to_string()); - let err = Error::new(storage_err); - let s3_err = to_s3_error(err); - - assert_eq!(*s3_err.code(), S3ErrorCode::InvalidPart); - assert!(s3_err.message().unwrap().contains("Invalid upload id")); - assert!(s3_err.message().unwrap().contains("test-bucket")); - assert!(s3_err.message().unwrap().contains("test-object")); - assert!(s3_err.message().unwrap().contains("invalid-upload-id")); - } - - #[test] - fn test_to_s3_error_invalid_version_id() { - let storage_err = StorageError::InvalidVersionID( - "test-bucket".to_string(), - "test-object".to_string(), - "invalid-version-id".to_string(), - ); - let err = Error::new(storage_err); - let s3_err = to_s3_error(err); - - assert_eq!(*s3_err.code(), S3ErrorCode::InvalidArgument); - assert!(s3_err.message().unwrap().contains("Invalid version id")); - assert!(s3_err.message().unwrap().contains("test-bucket")); - assert!(s3_err.message().unwrap().contains("test-object")); - assert!(s3_err.message().unwrap().contains("invalid-version-id")); - } - - #[test] - fn test_to_s3_error_data_movement_overwrite_err() { - let storage_err = StorageError::DataMovementOverwriteErr( - "test-bucket".to_string(), - "test-object".to_string(), - "test-version".to_string(), - ); - let err = Error::new(storage_err); - let s3_err = to_s3_error(err); - - assert_eq!(*s3_err.code(), S3ErrorCode::InvalidArgument); - assert!(s3_err.message().unwrap().contains("invalid data movement operation")); - assert!(s3_err.message().unwrap().contains("source and destination pool are the same")); - assert!(s3_err.message().unwrap().contains("test-bucket")); - assert!(s3_err.message().unwrap().contains("test-object")); - assert!(s3_err.message().unwrap().contains("test-version")); - } - - #[test] - fn test_to_s3_error_object_exists_as_directory() { - let storage_err = StorageError::ObjectExistsAsDirectory("test-bucket".to_string(), "directory-object".to_string()); - let err = Error::new(storage_err); - let s3_err = to_s3_error(err); - - assert_eq!(*s3_err.code(), S3ErrorCode::InvalidArgument); - assert!(s3_err.message().unwrap().contains("Object exists on")); - assert!(s3_err.message().unwrap().contains("as directory")); - assert!(s3_err.message().unwrap().contains("test-bucket")); - assert!(s3_err.message().unwrap().contains("directory-object")); - } - - #[test] - fn test_to_s3_error_insufficient_read_quorum() { - let storage_err = StorageError::InsufficientReadQuorum; - let err = Error::new(storage_err); - let s3_err = to_s3_error(err); - - assert_eq!(*s3_err.code(), S3ErrorCode::SlowDown); - assert!( - s3_err - .message() - .unwrap() - .contains("Storage resources are insufficient for the read operation") - ); - } - - #[test] - fn test_to_s3_error_insufficient_write_quorum() { - let storage_err = StorageError::InsufficientWriteQuorum; - let err = Error::new(storage_err); - let s3_err = to_s3_error(err); - - assert_eq!(*s3_err.code(), S3ErrorCode::SlowDown); - assert!( - s3_err - .message() - .unwrap() - .contains("Storage resources are insufficient for the write operation") - ); - } - - #[test] - fn test_to_s3_error_decommission_not_started() { - let storage_err = StorageError::DecommissionNotStarted; - let err = Error::new(storage_err); - let s3_err = to_s3_error(err); - - assert_eq!(*s3_err.code(), S3ErrorCode::InvalidArgument); - assert!(s3_err.message().unwrap().contains("Decommission Not Started")); - } - - #[test] - fn test_to_s3_error_decommission_already_running() { - let storage_err = StorageError::DecommissionAlreadyRunning; - let err = Error::new(storage_err); - let s3_err = to_s3_error(err); - - assert_eq!(*s3_err.code(), S3ErrorCode::InternalError); - assert!(s3_err.message().unwrap().contains("Decommission already running")); - } - - #[test] - fn test_to_s3_error_volume_not_found() { - let storage_err = StorageError::VolumeNotFound("test-volume".to_string()); - let err = Error::new(storage_err); - let s3_err = to_s3_error(err); - - assert_eq!(*s3_err.code(), S3ErrorCode::NoSuchBucket); - assert!(s3_err.message().unwrap().contains("bucket not found")); - assert!(s3_err.message().unwrap().contains("test-volume")); - } - - #[test] - fn test_to_s3_error_invalid_part() { - let storage_err = StorageError::InvalidPart(1, "expected-part".to_string(), "got-part".to_string()); - let err = Error::new(storage_err); - let s3_err = to_s3_error(err); - - assert_eq!(*s3_err.code(), S3ErrorCode::InvalidPart); - assert!(s3_err.message().unwrap().contains("Specified part could not be found")); - assert!(s3_err.message().unwrap().contains("PartNumber")); - assert!(s3_err.message().unwrap().contains("expected-part")); - assert!(s3_err.message().unwrap().contains("got-part")); - } - - #[test] - fn test_to_s3_error_done_for_now() { - let storage_err = StorageError::DoneForNow; - let err = Error::new(storage_err); - let s3_err = to_s3_error(err); - - assert_eq!(*s3_err.code(), S3ErrorCode::InternalError); - assert!(s3_err.message().unwrap().contains("DoneForNow")); - } - - #[test] - fn test_to_s3_error_non_storage_error() { - // Test with a non-StorageError - let err = Error::from_string("Generic error message".to_string()); - let s3_err = to_s3_error(err); - - assert_eq!(*s3_err.code(), S3ErrorCode::InternalError); - assert!(s3_err.message().unwrap().contains("ec err")); - assert!(s3_err.message().unwrap().contains("Generic error message")); - } - - #[test] - fn test_to_s3_error_with_unicode_strings() { - let storage_err = StorageError::BucketNotFound("test-bucket".to_string()); - let err = Error::new(storage_err); - let s3_err = to_s3_error(err); - - assert_eq!(*s3_err.code(), S3ErrorCode::NoSuchBucket); - assert!(s3_err.message().unwrap().contains("bucket not found")); - assert!(s3_err.message().unwrap().contains("test-bucket")); - } - - #[test] - fn test_to_s3_error_with_special_characters() { - let storage_err = StorageError::ObjectNameInvalid("bucket-with-@#$%".to_string(), "object-with-!@#$%^&*()".to_string()); - let err = Error::new(storage_err); - let s3_err = to_s3_error(err); - - assert_eq!(*s3_err.code(), S3ErrorCode::InvalidArgument); - assert!(s3_err.message().unwrap().contains("invalid object name")); - assert!(s3_err.message().unwrap().contains("bucket-with-@#$%")); - assert!(s3_err.message().unwrap().contains("object-with-!@#$%^&*()")); - } - - #[test] - fn test_to_s3_error_with_empty_strings() { - let storage_err = StorageError::BucketNotFound("".to_string()); - let err = Error::new(storage_err); - let s3_err = to_s3_error(err); - - assert_eq!(*s3_err.code(), S3ErrorCode::NoSuchBucket); - assert!(s3_err.message().unwrap().contains("bucket not found")); - } - - #[test] - fn test_to_s3_error_with_very_long_strings() { - let long_bucket_name = "a".repeat(1000); - let storage_err = StorageError::BucketNotFound(long_bucket_name.clone()); - let err = Error::new(storage_err); - let s3_err = to_s3_error(err); - - assert_eq!(*s3_err.code(), S3ErrorCode::NoSuchBucket); - assert!(s3_err.message().unwrap().contains("bucket not found")); - assert!(s3_err.message().unwrap().contains(&long_bucket_name)); - } -}