remove unlinked file (#1258)

houseme
2025-12-24 23:37:43 +08:00
committed by GitHub
parent 8bdff3fbcb
commit 7e75c9b1f5
12 changed files with 0 additions and 2390 deletions

View File

@@ -1,350 +0,0 @@
#![allow(clippy::map_entry)]
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#![allow(unused_imports)]
#![allow(unused_variables)]
#![allow(unused_mut)]
#![allow(unused_assignments)]
#![allow(unused_must_use)]
#![allow(clippy::all)]
use lazy_static::lazy_static;
use rustfs_checksums::ChecksumAlgorithm;
use std::collections::HashMap;
use crate::client::{api_put_object::PutObjectOptions, api_s3_datatypes::ObjectPart};
use crate::{disk::DiskAPI, store_api::GetObjectReader};
use rustfs_utils::crypto::{base64_decode, base64_encode};
use s3s::header::{
X_AMZ_CHECKSUM_ALGORITHM, X_AMZ_CHECKSUM_CRC32, X_AMZ_CHECKSUM_CRC32C, X_AMZ_CHECKSUM_SHA1, X_AMZ_CHECKSUM_SHA256,
};
use enumset::{EnumSet, EnumSetType, enum_set};
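/// Checksum algorithm selection. The variants are bit flags (via `enumset`) so a
/// base algorithm can be combined with `ChecksumFullObject` to request a
/// full-object checksum instead of a composite, per-part one.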
#[derive(Debug, EnumSetType, Default)]
#[enumset(repr = "u8")]
pub enum ChecksumMode {
#[default]
ChecksumNone,
ChecksumSHA256,
ChecksumSHA1,
ChecksumCRC32,
ChecksumCRC32C,
ChecksumCRC64NVME,
ChecksumFullObject,
}
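// `C_ChecksumMask` covers every base-algorithm bit; intersecting with it strips the
// `ChecksumFullObject` flag so a combined value can be reduced to its base algorithm.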
lazy_static! {
static ref C_ChecksumMask: EnumSet<ChecksumMode> = {
let mut s = EnumSet::all();
s.remove(ChecksumMode::ChecksumFullObject);
s
};
static ref C_ChecksumFullObjectCRC32: EnumSet<ChecksumMode> =
enum_set!(ChecksumMode::ChecksumCRC32 | ChecksumMode::ChecksumFullObject);
static ref C_ChecksumFullObjectCRC32C: EnumSet<ChecksumMode> =
enum_set!(ChecksumMode::ChecksumCRC32C | ChecksumMode::ChecksumFullObject);
}
const AMZ_CHECKSUM_CRC64NVME: &str = "x-amz-checksum-crc64nvme";
impl ChecksumMode {
//pub const CRC64_NVME_POLYNOMIAL: i64 = 0xad93d23594c93659;
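/// Returns the base algorithm with any full-object flag stripped; panics if the value
/// does not reduce to exactly one known algorithm.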
pub fn base(&self) -> ChecksumMode {
let s = EnumSet::from(*self).intersection(*C_ChecksumMask);
match s.as_u8() {
1_u8 => ChecksumMode::ChecksumNone,
2_u8 => ChecksumMode::ChecksumSHA256,
4_u8 => ChecksumMode::ChecksumSHA1,
8_u8 => ChecksumMode::ChecksumCRC32,
16_u8 => ChecksumMode::ChecksumCRC32C,
32_u8 => ChecksumMode::ChecksumCRC64NVME,
_ => panic!("enum err."),
}
}
pub fn is(&self, t: ChecksumMode) -> bool {
*self & t == t
}
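/// HTTP header name carrying this checksum, e.g. `x-amz-checksum-crc32`.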
pub fn key(&self) -> String {
match self {
ChecksumMode::ChecksumCRC32 => X_AMZ_CHECKSUM_CRC32.to_string(),
ChecksumMode::ChecksumCRC32C => X_AMZ_CHECKSUM_CRC32C.to_string(),
ChecksumMode::ChecksumSHA1 => X_AMZ_CHECKSUM_SHA1.to_string(),
ChecksumMode::ChecksumSHA256 => X_AMZ_CHECKSUM_SHA256.to_string(),
ChecksumMode::ChecksumCRC64NVME => AMZ_CHECKSUM_CRC64NVME.to_string(),
_ => "".to_string(),
}
}
pub fn can_composite(&self) -> bool {
let s = EnumSet::from(*self).intersection(*C_ChecksumMask);
match s.as_u8() {
2_u8 => true,
4_u8 => true,
8_u8 => true,
16_u8 => true,
_ => false,
}
}
pub fn can_merge_crc(&self) -> bool {
let s = EnumSet::from(*self).intersection(*C_ChecksumMask);
match s.as_u8() {
8_u8 => true,
16_u8 => true,
32_u8 => true,
_ => false,
}
}
pub fn full_object_requested(&self) -> bool {
let s = EnumSet::from(*self).intersection(*C_ChecksumMask);
match s.as_u8() {
//C_ChecksumFullObjectCRC32 as u8 => true,
//C_ChecksumFullObjectCRC32C as u8 => true,
32_u8 => true,
_ => false,
}
}
pub fn key_capitalized(&self) -> String {
self.key()
}
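/// Length in bytes of the raw (un-encoded) digest for the base algorithm.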
pub fn raw_byte_len(&self) -> usize {
// Compare against single-variant bit masks; casting the enum itself to u8 would
// yield its discriminant, not its bit in the set representation.
let u = EnumSet::from(*self).intersection(*C_ChecksumMask).as_u8();
if u == EnumSet::only(ChecksumMode::ChecksumCRC32).as_u8() || u == EnumSet::only(ChecksumMode::ChecksumCRC32C).as_u8() {
4
} else if u == EnumSet::only(ChecksumMode::ChecksumSHA1).as_u8() {
use sha1::Digest;
sha1::Sha1::output_size()
} else if u == EnumSet::only(ChecksumMode::ChecksumSHA256).as_u8() {
use sha2::Digest;
sha2::Sha256::output_size()
} else if u == EnumSet::only(ChecksumMode::ChecksumCRC64NVME).as_u8() {
8
} else {
0
}
}
pub fn hasher(&self) -> Result<Box<dyn rustfs_checksums::http::HttpChecksum>, std::io::Error> {
match self {
ChecksumMode::ChecksumCRC32 => Ok(ChecksumAlgorithm::Crc32.into_impl()),
ChecksumMode::ChecksumCRC32C => Ok(ChecksumAlgorithm::Crc32c.into_impl()),
ChecksumMode::ChecksumSHA1 => Ok(ChecksumAlgorithm::Sha1.into_impl()),
ChecksumMode::ChecksumSHA256 => Ok(ChecksumAlgorithm::Sha256.into_impl()),
ChecksumMode::ChecksumCRC64NVME => Ok(ChecksumAlgorithm::Crc64Nvme.into_impl()),
_ => Err(std::io::Error::other("unsupported checksum type")),
}
}
pub fn is_set(&self) -> bool {
let s = EnumSet::from(*self).intersection(*C_ChecksumMask);
s.len() == 1
}
pub fn set_default(&mut self, t: ChecksumMode) {
if !self.is_set() {
*self = t;
}
}
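/// Hashes `b` with the selected algorithm and returns the digest base64-encoded,
/// as used in the `x-amz-checksum-*` headers.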
pub fn encode_to_string(&self, b: &[u8]) -> Result<String, std::io::Error> {
if !self.is_set() {
return Ok("".to_string());
}
let mut h = self.hasher()?;
h.update(b);
let hash = h.finalize();
Ok(base64_encode(hash.as_ref()))
}
pub fn to_string(&self) -> String {
match self {
ChecksumMode::ChecksumCRC32 => "CRC32".to_string(),
ChecksumMode::ChecksumCRC32C => "CRC32C".to_string(),
ChecksumMode::ChecksumSHA1 => "SHA1".to_string(),
ChecksumMode::ChecksumSHA256 => "SHA256".to_string(),
ChecksumMode::ChecksumCRC64NVME => "CRC64NVME".to_string(),
ChecksumMode::ChecksumNone => "".to_string(),
_ => "<invalid>".to_string(),
}
}
// pub fn check_sum_reader(&self, r: GetObjectReader) -> Result<Checksum, std::io::Error> {
// let mut h = self.hasher()?;
// Ok(Checksum::new(self.clone(), h.sum().as_bytes()))
// }
// pub fn check_sum_bytes(&self, b: &[u8]) -> Result<Checksum, std::io::Error> {
// let mut h = self.hasher()?;
// Ok(Checksum::new(self.clone(), h.sum().as_bytes()))
// }
pub fn composite_checksum(&self, p: &mut [ObjectPart]) -> Result<Checksum, std::io::Error> {
if !self.can_composite() {
return Err(std::io::Error::other("cannot do composite checksum"));
}
// Composite checksums are computed over the parts in part-number order.
p.sort_by(|i, j| i.part_num.cmp(&j.part_num));
let c = self.base();
let crc_bytes = Vec::<u8>::with_capacity(p.len() * c.raw_byte_len());
let mut h = self.hasher()?;
h.update(crc_bytes.as_ref());
let hash = h.finalize();
Ok(Checksum {
checksum_type: self.clone(),
r: hash.as_ref().to_vec(),
computed: false,
})
}
pub fn full_object_checksum(&self, p: &mut [ObjectPart]) -> Result<Checksum, std::io::Error> {
todo!();
}
}
#[derive(Default)]
pub struct Checksum {
checksum_type: ChecksumMode,
r: Vec<u8>,
computed: bool,
}
#[allow(dead_code)]
impl Checksum {
fn new(t: ChecksumMode, b: &[u8]) -> Checksum {
if t.is_set() && b.len() == t.raw_byte_len() {
return Checksum {
checksum_type: t,
r: b.to_vec(),
computed: false,
};
}
Checksum::default()
}
#[allow(dead_code)]
fn new_checksum_string(t: ChecksumMode, s: &str) -> Result<Checksum, std::io::Error> {
let b = match base64_decode(s.as_bytes()) {
Ok(b) => b,
Err(err) => return Err(std::io::Error::other(err.to_string())),
};
if t.is_set() && b.len() == t.raw_byte_len() {
return Ok(Checksum {
checksum_type: t,
r: b,
computed: false,
});
}
Ok(Checksum::default())
}
fn is_set(&self) -> bool {
self.checksum_type.is_set() && self.r.len() == self.checksum_type.raw_byte_len()
}
fn encoded(&self) -> String {
if !self.is_set() {
return "".to_string();
}
base64_encode(&self.r)
}
#[allow(dead_code)]
fn raw(&self) -> Option<Vec<u8>> {
if !self.is_set() {
return None;
}
Some(self.r.clone())
}
}
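// Auto-checksum helpers: `add_auto_checksum_headers` advertises the chosen algorithm
// (and FULL_OBJECT type when requested) via user metadata; `apply_auto_checksum`
// stores the aggregated per-part checksum once all parts are known.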
pub fn add_auto_checksum_headers(opts: &mut PutObjectOptions) {
opts.user_metadata
.insert("X-Amz-Checksum-Algorithm".to_string(), opts.auto_checksum.to_string());
if opts.auto_checksum.full_object_requested() {
opts.user_metadata
.insert("X-Amz-Checksum-Type".to_string(), "FULL_OBJECT".to_string());
}
}
pub fn apply_auto_checksum(opts: &mut PutObjectOptions, all_parts: &mut [ObjectPart]) -> Result<(), std::io::Error> {
if opts.auto_checksum.can_composite() && !opts.auto_checksum.is(ChecksumMode::ChecksumFullObject) {
let crc = opts.auto_checksum.composite_checksum(all_parts)?;
opts.user_metadata = {
let mut hm = HashMap::new();
hm.insert(opts.auto_checksum.key(), crc.encoded());
hm
}
} else if opts.auto_checksum.can_merge_crc() {
let crc = opts.auto_checksum.full_object_checksum(all_parts)?;
opts.user_metadata = {
let mut hm = HashMap::new();
hm.insert(opts.auto_checksum.key_capitalized(), crc.encoded());
hm.insert("X-Amz-Checksum-Type".to_string(), "FULL_OBJECT".to_string());
hm
}
}
Ok(())
}

View File

@@ -1,270 +0,0 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
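// ChunkedStream (disabled below): re-chunks an incoming byte stream into fixed-size
// pieces, optionally zero-padding the final chunk up to `chunk_size`.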
// use crate::error::StdError;
// use bytes::Bytes;
// use futures::pin_mut;
// use futures::stream::{Stream, StreamExt};
// use std::future::Future;
// use std::pin::Pin;
// use std::task::{Context, Poll};
// use transform_stream::AsyncTryStream;
// pub type SyncBoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + Sync + 'a>>;
// pub struct ChunkedStream<'a> {
// /// inner
// inner: AsyncTryStream<Bytes, StdError, SyncBoxFuture<'a, Result<(), StdError>>>,
// remaining_length: usize,
// }
// impl<'a> ChunkedStream<'a> {
// pub fn new<S>(body: S, content_length: usize, chunk_size: usize, need_padding: bool) -> Self
// where
// S: Stream<Item = Result<Bytes, StdError>> + Send + Sync + 'a,
// {
// let inner = AsyncTryStream::<_, _, SyncBoxFuture<'a, Result<(), StdError>>>::new(|mut y| {
// #[allow(clippy::shadow_same)] // necessary for `pin_mut!`
// Box::pin(async move {
// pin_mut!(body);
// // Data left over from the previous call
// let mut prev_bytes = Bytes::new();
// let mut read_size = 0;
// loop {
// let data: Vec<Bytes> = {
// // Read a fixed-size chunk
// match Self::read_data(body.as_mut(), prev_bytes, chunk_size).await {
// None => break,
// Some(Err(e)) => return Err(e),
// Some(Ok((data, remaining_bytes))) => {
// // debug!(
// // "content_length:{},read_size:{}, read_data data:{}, remaining_bytes: {} ",
// // content_length,
// // read_size,
// // data.len(),
// // remaining_bytes.len()
// // );
// prev_bytes = remaining_bytes;
// data
// }
// }
// };
// for bytes in data {
// read_size += bytes.len();
// // debug!("read_size {}, content_length {}", read_size, content_length,);
// y.yield_ok(bytes).await;
// }
// if read_size + prev_bytes.len() >= content_length {
// // debug!(
// // "Finished reading: read_size:{} + prev_bytes.len({}) == content_length {}",
// // read_size,
// // prev_bytes.len(),
// // content_length,
// // );
// // Pad with zeros?
// if !need_padding {
// y.yield_ok(prev_bytes).await;
// break;
// }
// let mut bytes = vec![0u8; chunk_size];
// let (left, _) = bytes.split_at_mut(prev_bytes.len());
// left.copy_from_slice(&prev_bytes);
// y.yield_ok(Bytes::from(bytes)).await;
// break;
// }
// }
// // debug!("chunked stream exit");
// Ok(())
// })
// });
// Self {
// inner,
// remaining_length: content_length,
// }
// }
// /// read data and return remaining bytes
// async fn read_data<S>(
// mut body: Pin<&mut S>,
// prev_bytes: Bytes,
// data_size: usize,
// ) -> Option<Result<(Vec<Bytes>, Bytes), StdError>>
// where
// S: Stream<Item = Result<Bytes, StdError>> + Send,
// {
// let mut bytes_buffer = Vec::new();
// // Run only once
// let mut push_data_bytes = |mut bytes: Bytes| {
// // debug!("read from body {} split per {}, prev_bytes: {}", bytes.len(), data_size, prev_bytes.len());
// if bytes.is_empty() {
// return None;
// }
// if data_size == 0 {
// return Some(bytes);
// }
// // Merge with the previous data
// if !prev_bytes.is_empty() {
// let need_size = data_size.wrapping_sub(prev_bytes.len());
// // debug!(
// // "Previous leftover {}, take {} now, total: {}",
// // prev_bytes.len(),
// // need_size,
// // prev_bytes.len() + need_size
// // );
// if bytes.len() >= need_size {
// let data = bytes.split_to(need_size);
// let mut combined = Vec::new();
// combined.extend_from_slice(&prev_bytes);
// combined.extend_from_slice(&data);
// // debug!(
// // "Fetched more bytes than needed: {}, merged result {}, remaining bytes {}",
// // need_size,
// // combined.len(),
// // bytes.len(),
// // );
// bytes_buffer.push(Bytes::from(combined));
// } else {
// let mut combined = Vec::new();
// combined.extend_from_slice(&prev_bytes);
// combined.extend_from_slice(&bytes);
// // debug!(
// // "Fetched fewer bytes than needed: {}, merged result {}, remaining bytes {}, return immediately",
// // need_size,
// // combined.len(),
// // bytes.len(),
// // );
// return Some(Bytes::from(combined));
// }
// }
// // If the fetched data exceeds the chunk, slice the required size
// if data_size <= bytes.len() {
// let n = bytes.len() / data_size;
// for _ in 0..n {
// let data = bytes.split_to(data_size);
// // println!("bytes_buffer.push: {}, remaining: {}", data.len(), bytes.len());
// bytes_buffer.push(data);
// }
// Some(bytes)
// } else {
// // Insufficient data
// Some(bytes)
// }
// };
// // Remaining data
// let remaining_bytes = 'outer: {
// // // Exit if the previous data was sufficient
// // if let Some(remaining_bytes) = push_data_bytes(prev_bytes) {
// // println!("Consuming leftovers");
// // break 'outer remaining_bytes;
// // }
// loop {
// match body.next().await? {
// Err(e) => return Some(Err(e)),
// Ok(bytes) => {
// if let Some(remaining_bytes) = push_data_bytes(bytes) {
// break 'outer remaining_bytes;
// }
// }
// }
// }
// };
// Some(Ok((bytes_buffer, remaining_bytes)))
// }
// fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes, StdError>>> {
// let ans = Pin::new(&mut self.inner).poll_next(cx);
// if let Poll::Ready(Some(Ok(ref bytes))) = ans {
// self.remaining_length = self.remaining_length.saturating_sub(bytes.len());
// }
// ans
// }
// // pub fn exact_remaining_length(&self) -> usize {
// // self.remaining_length
// // }
// }
// impl Stream for ChunkedStream<'_> {
// type Item = Result<Bytes, StdError>;
// fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
// self.poll(cx)
// }
// fn size_hint(&self) -> (usize, Option<usize>) {
// (0, None)
// }
// }
// #[cfg(test)]
// mod test {
// use super::*;
// #[tokio::test]
// async fn test_chunked_stream() {
// let chunk_size = 4;
// let data1 = vec![1u8; 7777]; // 65536
// let data2 = vec![1u8; 7777]; // 65536
// let content_length = data1.len() + data2.len();
// let chunk1 = Bytes::from(data1);
// let chunk2 = Bytes::from(data2);
// let chunk_results: Vec<Result<Bytes, _>> = vec![Ok(chunk1), Ok(chunk2)];
// let stream = futures::stream::iter(chunk_results);
// let mut chunked_stream = ChunkedStream::new(stream, content_length, chunk_size, true);
// loop {
// let ans1 = chunked_stream.next().await;
// if ans1.is_none() {
// break;
// }
// let bytes = ans1.unwrap().unwrap();
// assert!(bytes.len() == chunk_size)
// }
// // assert_eq!(ans1.unwrap(), chunk1_data.as_slice());
// }
// }

View File

@@ -1,59 +0,0 @@
#![allow(clippy::map_entry)]
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::{collections::HashMap, sync::Arc};
use crate::{
disk::{
error::{is_unformatted_disk, DiskError, Result},
format::{DistributionAlgoVersion, FormatV3},
new_disk, DiskAPI, DiskInfo, DiskOption, DiskStore,
},
store_api::{
BucketInfo, BucketOptions, CompletePart, DeleteBucketOptions, DeletedObject, GetObjectReader, HTTPRangeSpec,
ListMultipartsInfo, ListObjectVersionsInfo, ListObjectsV2Info, MakeBucketOptions, MultipartInfo, MultipartUploadResult,
ObjectIO, ObjectInfo, ObjectOptions, ObjectToDelete, PartInfo, PutObjReader, StorageAPI,
},
credentials::{Credentials, SignatureType,},
api_put_object_multipart::UploadPartParams,
};
use http::HeaderMap;
use tokio_util::sync::CancellationToken;
use tracing::warn;
use tracing::{error, info};
use url::Url;
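/// Pairs a primary reader with a hook reader (presumably for progress/accounting on
/// the side); both `seek` and `read` are unimplemented stubs.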
struct HookReader {
source: GetObjectReader,
hook: GetObjectReader,
}
impl HookReader {
pub fn new(source: GetObjectReader, hook: GetObjectReader) -> HookReader {
HookReader {
source,
hook,
}
}
fn seek(&self, offset: i64, whence: i64) -> Result<i64> {
todo!();
}
fn read(&self, b: &mut [u8]) -> Result<i64> {
todo!();
}
}

View File

@@ -1,586 +0,0 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::bitrot::{BitrotReader, BitrotWriter};
use crate::disk::error::{Error, Result};
use crate::disk::error_reduce::{reduce_write_quorum_errs, OBJECT_OP_IGNORED_ERRS};
use crate::io::Etag;
use bytes::{Bytes, BytesMut};
use futures::future::join_all;
use reed_solomon_erasure::galois_8::ReedSolomon;
use smallvec::SmallVec;
use std::any::Any;
use std::io::ErrorKind;
use std::sync::Arc;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::sync::mpsc;
use tracing::warn;
use tracing::{error, info};
use uuid::Uuid;
use crate::disk::error::DiskError;
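/// Reed-Solomon erasure coder: each `block_size` block is split into `data_shards`
/// data shards plus `parity_shards` parity shards (parity is skipped when
/// `parity_shards == 0`).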
#[derive(Default)]
pub struct Erasure {
data_shards: usize,
parity_shards: usize,
encoder: Option<ReedSolomon>,
pub block_size: usize,
_id: Uuid,
_buf: Vec<u8>,
}
impl Erasure {
pub fn new(data_shards: usize, parity_shards: usize, block_size: usize) -> Self {
// debug!(
// "Erasure new data_shards {},parity_shards {} block_size {} ",
// data_shards, parity_shards, block_size
// );
let mut encoder = None;
if parity_shards > 0 {
encoder = Some(ReedSolomon::new(data_shards, parity_shards).unwrap());
}
Erasure {
data_shards,
parity_shards,
block_size,
encoder,
_id: Uuid::new_v4(),
_buf: vec![0u8; block_size],
}
}
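// Streams the source in `block_size` chunks, erasure-encodes each chunk, and writes
// the resulting shards to the per-disk writers in parallel; errors out when the
// number of successful writes falls below `write_quorum`.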
#[tracing::instrument(level = "info", skip(self, reader, writers))]
pub async fn encode<S>(
self: Arc<Self>,
mut reader: S,
writers: &mut [Option<BitrotWriter>],
// block_size: usize,
total_size: usize,
write_quorum: usize,
) -> Result<(usize, String)>
where
S: AsyncRead + Etag + Unpin + Send + 'static,
{
let (tx, mut rx) = mpsc::channel(5);
let task = tokio::spawn(async move {
let mut buf = vec![0u8; self.block_size];
let mut total: usize = 0;
loop {
if total_size > 0 {
let new_len = {
let remain = total_size - total;
if remain > self.block_size { self.block_size } else { remain }
};
if new_len == 0 && total > 0 {
break;
}
buf.resize(new_len, 0u8);
match reader.read_exact(&mut buf).await {
Ok(res) => res,
Err(e) => {
if let ErrorKind::UnexpectedEof = e.kind() {
break;
} else {
return Err(e.into());
}
}
};
total += buf.len();
}
let blocks = Arc::new(Box::pin(self.clone().encode_data(&buf)?));
let _ = tx.send(blocks).await;
if total_size == 0 {
break;
}
}
let etag = reader.etag().await;
Ok((total, etag))
});
while let Some(blocks) = rx.recv().await {
let write_futures = writers.iter_mut().enumerate().map(|(i, w_op)| {
let i_inner = i;
let blocks_inner = blocks.clone();
async move {
if let Some(w) = w_op {
w.write(blocks_inner[i_inner].clone()).await.err()
} else {
Some(DiskError::DiskNotFound)
}
}
});
let errs = join_all(write_futures).await;
let none_count = errs.iter().filter(|&x| x.is_none()).count();
if none_count >= write_quorum {
if total_size == 0 {
break;
}
continue;
}
if let Some(err) = reduce_write_quorum_errs(&errs, OBJECT_OP_IGNORED_ERRS, write_quorum) {
warn!("Erasure encode errs {:?}", &errs);
return Err(err);
}
}
task.await?
}
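// Reconstructs `length` bytes starting at `offset` from the per-disk readers,
// recovering missing shards when parity is available, and streams the plain data
// into `writer`; returns the bytes written plus the first error, if any.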
pub async fn decode<W>(
&self,
writer: &mut W,
readers: Vec<Option<BitrotReader>>,
offset: usize,
length: usize,
total_length: usize,
) -> (usize, Option<Error>)
where
W: AsyncWriteExt + Send + Unpin + 'static,
{
if length == 0 {
return (0, None);
}
let mut reader = ShardReader::new(readers, self, offset, total_length);
// debug!("ShardReader {:?}", &reader);
let start_block = offset / self.block_size;
let end_block = (offset + length) / self.block_size;
// debug!("decode block from {} to {}", start_block, end_block);
let mut bytes_written = 0;
for block_idx in start_block..=end_block {
let (block_offset, block_length) = if start_block == end_block {
(offset % self.block_size, length)
} else if block_idx == start_block {
let block_offset = offset % self.block_size;
(block_offset, self.block_size - block_offset)
} else if block_idx == end_block {
(0, (offset + length) % self.block_size)
} else {
(0, self.block_size)
};
if block_length == 0 {
// debug!("block_length == 0 break");
break;
}
// debug!("decode {} block_offset {},block_length {} ", block_idx, block_offset, block_length);
let mut bufs = match reader.read().await {
Ok(bufs) => bufs,
Err(err) => return (bytes_written, Some(err)),
};
if self.parity_shards > 0 {
if let Err(err) = self.decode_data(&mut bufs) {
return (bytes_written, Some(err));
}
}
let written_n = match self
.write_data_blocks(writer, bufs, self.data_shards, block_offset, block_length)
.await
{
Ok(n) => n,
Err(err) => {
error!("write_data_blocks err {:?}", &err);
return (bytes_written, Some(err));
}
};
bytes_written += written_n;
// debug!("decode {} written_n {}, total_written: {} ", block_idx, written_n, bytes_written);
}
if bytes_written != length {
// debug!("bytes_written != length: {} != {} ", bytes_written, length);
return (bytes_written, Some(Error::other("erasure decode less data")));
}
(bytes_written, None)
}
async fn write_data_blocks<W>(
&self,
writer: &mut W,
bufs: Vec<Option<Vec<u8>>>,
data_blocks: usize,
offset: usize,
length: usize,
) -> Result<usize>
where
W: AsyncWrite + Send + Unpin + 'static,
{
if bufs.len() < data_blocks {
return Err(Error::other("read bufs not match data_blocks"));
}
let data_len: usize = bufs
.iter()
.take(data_blocks)
.filter(|v| v.is_some())
.map(|v| v.as_ref().unwrap().len())
.sum();
if data_len < length {
return Err(Error::other(format!("write_data_blocks data_len < length {} < {}", data_len, length)));
}
let mut offset = offset;
// debug!("write_data_blocks offset {}, length {}", offset, length);
let mut write = length;
let mut total_written = 0;
for opt_buf in bufs.iter().take(data_blocks) {
let buf = opt_buf.as_ref().unwrap();
if offset >= buf.len() {
offset -= buf.len();
continue;
}
let buf = &buf[offset..];
offset = 0;
// debug!("write_data_blocks write buf len {}", buf.len());
if write < buf.len() {
let buf = &buf[..write];
// debug!("write_data_blocks write buf less len {}", buf.len());
writer.write_all(buf).await?;
// debug!("write_data_blocks write done len {}", buf.len());
total_written += buf.len();
break;
}
writer.write_all(buf).await?;
let n = buf.len();
// debug!("write_data_blocks write done len {}", n);
write -= n;
total_written += n;
}
Ok(total_written)
}
pub fn total_shard_count(&self) -> usize {
self.data_shards + self.parity_shards
}
#[tracing::instrument(level = "info", skip_all, fields(data_len=data.len()))]
pub fn encode_data(self: Arc<Self>, data: &[u8]) -> Result<Vec<Bytes>> {
let (shard_size, total_size) = self.need_size(data.len());
// Generate the total length required for all shards
let mut data_buffer = BytesMut::with_capacity(total_size);
// Copy the source data
data_buffer.extend_from_slice(data);
data_buffer.resize(total_size, 0u8);
{
// Perform EC encoding; the results go into data_buffer
let data_slices: SmallVec<[&mut [u8]; 16]> = data_buffer.chunks_exact_mut(shard_size).collect();
// Only perform EC encoding when parity shards are present
if self.parity_shards > 0 {
self.encoder.as_ref().unwrap().encode(data_slices).map_err(Error::other)?;
}
}
// Zero-copy shards: every shard references data_buffer
let mut data_buffer = data_buffer.freeze();
let mut shards = Vec::with_capacity(self.total_shard_count());
for _ in 0..self.total_shard_count() {
let shard = data_buffer.split_to(shard_size);
shards.push(shard);
}
Ok(shards)
}
pub fn decode_data(&self, shards: &mut [Option<Vec<u8>>]) -> Result<()> {
if self.parity_shards > 0 {
self.encoder.as_ref().unwrap().reconstruct(shards).map_err(Error::other)?;
}
Ok(())
}
// The length per shard and the total required length
fn need_size(&self, data_size: usize) -> (usize, usize) {
let shard_size = self.shard_size(data_size);
(shard_size, shard_size * (self.total_shard_count()))
}
// Compute each shard size
pub fn shard_size(&self, data_size: usize) -> usize {
data_size.div_ceil(self.data_shards)
}
// returns final erasure size from original size.
pub fn shard_file_size(&self, total_size: usize) -> usize {
if total_size == 0 {
return 0;
}
let num_shards = total_size / self.block_size;
let last_block_size = total_size % self.block_size;
let last_shard_size = last_block_size.div_ceil(self.data_shards);
num_shards * self.shard_size(self.block_size) + last_shard_size
// When writing, EC pads the data so the last shard length should match
// if last_block_size != 0 {
// num_shards += 1
// }
// num_shards * self.shard_size(self.block_size)
}
// where erasure reading begins.
pub fn shard_file_offset(&self, start_offset: usize, length: usize, total_length: usize) -> usize {
let shard_size = self.shard_size(self.block_size);
let shard_file_size = self.shard_file_size(total_length);
let end_shard = (start_offset + length) / self.block_size;
let mut till_offset = end_shard * shard_size + shard_size;
if till_offset > shard_file_size {
till_offset = shard_file_size;
}
till_offset
}
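// Re-creates missing shards: reads every block through `ShardReader`, reconstructs
// it, and rewrites the shards to the provided writers.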
pub async fn heal(
&self,
writers: &mut [Option<BitrotWriter>],
readers: Vec<Option<BitrotReader>>,
total_length: usize,
_prefer: &[bool],
) -> Result<()> {
info!(
"Erasure heal, writers len: {}, readers len: {}, total_length: {}",
writers.len(),
readers.len(),
total_length
);
if writers.len() != self.parity_shards + self.data_shards {
return Err(Error::other("invalid argument"));
}
let mut reader = ShardReader::new(readers, self, 0, total_length);
let start_block = 0;
let mut end_block = total_length / self.block_size;
if total_length % self.block_size != 0 {
end_block += 1;
}
let mut errs = Vec::new();
for _ in start_block..end_block {
let mut bufs = reader.read().await?;
if self.parity_shards > 0 {
self.encoder.as_ref().unwrap().reconstruct(&mut bufs).map_err(Error::other)?;
}
let shards = bufs.into_iter().flatten().map(Bytes::from).collect::<Vec<_>>();
if shards.len() != self.parity_shards + self.data_shards {
return Err(Error::other("can not reconstruct data"));
}
for (i, w) in writers.iter_mut().enumerate() {
if w.is_none() {
continue;
}
match w.as_mut().unwrap().write(shards[i].clone()).await {
Ok(_) => {}
Err(e) => {
info!("write failed, err: {:?}", e);
errs.push(e);
}
}
}
}
if !errs.is_empty() {
return Err(errs[0].clone().into());
}
Ok(())
}
}
#[async_trait::async_trait]
pub trait Writer {
fn as_any(&self) -> &dyn Any;
async fn write(&mut self, buf: Bytes) -> Result<()>;
async fn close(&mut self) -> Result<()> {
Ok(())
}
}
#[async_trait::async_trait]
pub trait ReadAt {
async fn read_at(&mut self, offset: usize, length: usize) -> Result<(Vec<u8>, usize)>;
}
pub struct ShardReader {
readers: Vec<Option<BitrotReader>>, // Disk readers
data_block_count: usize, // Total number of shards
parity_block_count: usize,
shard_size: usize, // Block size per shard (read one block at a time)
shard_file_size: usize, // Total size of the shard file
offset: usize, // Offset within the shard
}
impl ShardReader {
pub fn new(readers: Vec<Option<BitrotReader>>, ec: &Erasure, offset: usize, total_length: usize) -> Self {
Self {
readers,
data_block_count: ec.data_shards,
parity_block_count: ec.parity_shards,
shard_size: ec.shard_size(ec.block_size),
shard_file_size: ec.shard_file_size(total_length),
offset: (offset / ec.block_size) * ec.shard_size(ec.block_size),
}
}
pub async fn read(&mut self) -> Result<Vec<Option<Vec<u8>>>> {
// let mut disks = self.readers;
let reader_length = self.readers.len();
// Length of the block to read
let mut read_length = self.shard_size;
if self.offset + read_length > self.shard_file_size {
read_length = self.shard_file_size - self.offset
}
if read_length == 0 {
return Ok(vec![None; reader_length]);
}
// debug!("shard reader read offset {}, shard_size {}", self.offset, read_length);
let mut futures = Vec::with_capacity(reader_length);
let mut errors = Vec::with_capacity(reader_length);
let mut ress = Vec::with_capacity(reader_length);
for disk in self.readers.iter_mut() {
// if disk.is_none() {
// ress.push(None);
// errors.push(Some(Error::new(DiskError::DiskNotFound)));
// continue;
// }
// let disk: &mut BitrotReader = disk.as_mut().unwrap();
let offset = self.offset;
futures.push(async move {
if let Some(disk) = disk {
disk.read_at(offset, read_length).await
} else {
Err(DiskError::DiskNotFound)
}
});
}
let results = join_all(futures).await;
for result in results {
match result {
Ok((res, _)) => {
ress.push(Some(res));
errors.push(None);
}
Err(e) => {
ress.push(None);
errors.push(Some(e));
}
}
}
if !self.can_decode(&ress) {
warn!("ec decode read ress {:?}", &ress);
warn!("ec decode read errors {:?}", &errors);
return Err(Error::other("shard reader read failed"));
}
self.offset += self.shard_size;
Ok(ress)
}
fn can_decode(&self, bufs: &[Option<Vec<u8>>]) -> bool {
let c = bufs.iter().filter(|v| v.is_some()).count();
if self.parity_block_count > 0 {
c >= self.data_block_count
} else {
c == self.data_block_count
}
}
}
// fn shards_to_option_shards<T: Clone>(shards: &[Vec<T>]) -> Vec<Option<Vec<T>>> {
// let mut result = Vec::with_capacity(shards.len());
// for v in shards.iter() {
// let inner: Vec<T> = v.clone();
// result.push(Some(inner));
// }
// result
// }
#[cfg(test)]
mod test {
use super::*;
#[test]
fn test_erasure() {
let data_shards = 3;
let parity_shards = 2;
let data: &[u8] = &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11];
let ec = Erasure::new(data_shards, parity_shards, 1);
let shards = Arc::new(ec).encode_data(data).unwrap();
println!("shards:{:?}", shards);
let mut s: Vec<_> = shards
.iter()
.map(|d| if d.is_empty() { None } else { Some(d.to_vec()) })
.collect();
// let mut s = shards_to_option_shards(&shards);
// s[0] = None;
s[4] = None;
s[3] = None;
println!("sss:{:?}", &s);
let ec = Erasure::new(data_shards, parity_shards, 1);
ec.decode_data(&mut s).unwrap();
// ec.encoder.reconstruct(&mut s).unwrap();
println!("sss:{:?}", &s);
}
}

View File

@@ -20,7 +20,6 @@ pub mod batch_processor;
pub mod bitrot;
pub mod bucket;
pub mod cache_value;
mod chunk_stream;
pub mod compress;
pub mod config;
pub mod data_usage;

View File

@@ -1,231 +0,0 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#![allow(unused_imports)]
#![allow(unused_variables)]
#![allow(unused_mut)]
#![allow(unused_assignments)]
#![allow(unused_must_use)]
#![allow(clippy::all)]
use std::collections::HashMap;
use std::sync::Arc;
use azure_core::http::{Body, ClientOptions, RequestContent};
use azure_storage::StorageCredentials;
use azure_storage_blobs::prelude::*;
use crate::client::{
admin_handler_utils::AdminError,
api_put_object::PutObjectOptions,
transition_api::{Options, ReadCloser, ReaderImpl},
};
use crate::tier::{
tier_config::TierAzure,
warm_backend::{WarmBackend, WarmBackendGetOpts},
};
use tracing::warn;
const MAX_MULTIPART_PUT_OBJECT_SIZE: i64 = 1024 * 1024 * 1024 * 1024 * 5;
const MAX_PARTS_COUNT: i64 = 10000;
const _MAX_PART_SIZE: i64 = 1024 * 1024 * 1024 * 5;
const MIN_PART_SIZE: i64 = 1024 * 1024 * 128;
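/// Warm-tier backend backed by Azure Blob Storage; objects are stored under
/// `prefix/` inside the configured container (`bucket`).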
pub struct WarmBackendAzure {
pub client: Arc<BlobServiceClient>,
pub bucket: String,
pub prefix: String,
pub storage_class: String,
}
impl WarmBackendAzure {
pub async fn new(conf: &TierAzure, tier: &str) -> Result<Self, std::io::Error> {
if conf.access_key == "" || conf.secret_key == "" {
return Err(std::io::Error::other("both access and secret keys are required"));
}
if conf.bucket == "" {
return Err(std::io::Error::other("no bucket name was provided"));
}
let creds = StorageCredentials::access_key(conf.access_key.clone(), conf.secret_key.clone());
let client = ClientBuilder::new(conf.access_key.clone(), creds)
//.endpoint(conf.endpoint)
.blob_service_client();
let client = Arc::new(client);
Ok(Self {
client,
bucket: conf.bucket.clone(),
prefix: conf.prefix.strip_suffix("/").unwrap_or(&conf.prefix).to_owned(),
storage_class: "".to_string(),
})
}
/*pub fn tier(&self) -> *blob.AccessTier {
if self.storage_class == "" {
return None;
}
for t in blob.PossibleAccessTierValues() {
if strings.EqualFold(self.storage_class, t) {
return &t
}
}
None
}*/
pub fn get_dest(&self, object: &str) -> String {
if self.prefix.is_empty() {
object.to_string()
} else {
format!("{}/{}", &self.prefix, object)
}
}
}
#[async_trait::async_trait]
impl WarmBackend for WarmBackendAzure {
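// Uploads buffer the whole object in memory and write it as a single block blob;
// `length` and `meta` are currently not forwarded to the request.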
async fn put_with_meta(
&self,
object: &str,
r: ReaderImpl,
length: i64,
meta: HashMap<String, String>,
) -> Result<String, std::io::Error> {
let part_size = length;
let client = self.client.clone();
let container_client = client.container_client(self.bucket.clone());
let blob_client = container_client.blob_client(self.get_dest(object));
/*let res = blob_client
.upload(
RequestContent::from(match r {
ReaderImpl::Body(content_body) => content_body.to_vec(),
ReaderImpl::ObjectBody(mut content_body) => content_body.read_all().await?,
}),
false,
length as u64,
None,
)
.await
else {
return Err(std::io::Error::other("upload error"));
};*/
let Ok(res) = blob_client
.put_block_blob(match r {
ReaderImpl::Body(content_body) => content_body.to_vec(),
ReaderImpl::ObjectBody(mut content_body) => content_body.read_all().await?,
})
.content_type("text/plain")
.into_future()
.await
else {
return Err(std::io::Error::other("put_block_blob error"));
};
//self.ToObjectError(err, object)
Ok(res.request_id.to_string())
}
async fn put(&self, object: &str, r: ReaderImpl, length: i64) -> Result<String, std::io::Error> {
self.put_with_meta(object, r, length, HashMap::new()).await
}
async fn get(&self, object: &str, rv: &str, opts: WarmBackendGetOpts) -> Result<ReadCloser, std::io::Error> {
let client = self.client.clone();
let container_client = client.container_client(self.bucket.clone());
let blob_client = container_client.blob_client(self.get_dest(object));
blob_client.get();
todo!();
}
async fn remove(&self, object: &str, rv: &str) -> Result<(), std::io::Error> {
let client = self.client.clone();
let container_client = client.container_client(self.bucket.clone());
let blob_client = container_client.blob_client(self.get_dest(object));
blob_client.delete();
todo!();
}
async fn in_use(&self) -> Result<bool, std::io::Error> {
/*let result = self.client
.list_objects_v2(&self.bucket, &self.prefix, "", "", SLASH_SEPARATOR, 1)
.await?;
Ok(result.common_prefixes.len() > 0 || result.contents.len() > 0)*/
Ok(false)
}
}
/*fn azure_to_object_error(err: Error, params: Vec<String>) -> Option<error> {
if err == nil {
return nil
}
bucket := ""
object := ""
if len(params) >= 1 {
bucket = params[0]
}
if len(params) == 2 {
object = params[1]
}
azureErr, ok := err.(*azcore.ResponseError)
if !ok {
// We don't interpret non Azure errors. As azure errors will
// have StatusCode to help to convert to object errors.
return err
}
serviceCode := azureErr.ErrorCode
statusCode := azureErr.StatusCode
azureCodesToObjectError(err, serviceCode, statusCode, bucket, object)
}*/
/*fn azure_codes_to_object_error(err: Error, service_code: String, status_code: i32, bucket: String, object: String) -> Option<Error> {
switch serviceCode {
case "ContainerNotFound", "ContainerBeingDeleted":
err = BucketNotFound{Bucket: bucket}
case "ContainerAlreadyExists":
err = BucketExists{Bucket: bucket}
case "InvalidResourceName":
err = BucketNameInvalid{Bucket: bucket}
case "RequestBodyTooLarge":
err = PartTooBig{}
case "InvalidMetadata":
err = UnsupportedMetadata{}
case "BlobAccessTierNotSupportedForAccountType":
err = NotImplemented{}
case "OutOfRangeInput":
err = ObjectNameInvalid{
Bucket: bucket,
Object: object,
}
default:
switch statusCode {
case http.StatusNotFound:
if object != "" {
err = ObjectNotFound{
Bucket: bucket,
Object: object,
}
} else {
err = BucketNotFound{Bucket: bucket}
}
case http.StatusBadRequest:
err = BucketNameInvalid{Bucket: bucket}
}
}
return err
}*/