format comment

This commit is contained in:
houseme
2025-05-27 13:56:19 +08:00
parent f1ef7149e3
commit 366fd98aeb
27 changed files with 204 additions and 211 deletions

TODO.md (32 lines changed)
View File

@@ -2,18 +2,18 @@
## Basic storage
- [x] EC read/write quorum checks (Read/WriteQuorum)
- [ ] Optimize background concurrent execution: make it interruptible; pass by reference?
- [x] Store small files in the metafile (inline data)
- [x] Flesh out bucketmeta
- [x] Object locking
- [x] Hash while reading/writing via nested readers
- [x] Remote RPC
- [x] Error-type checks: decide error types inside the program; how to unify errors
- [x] Optimize xlmeta with a custom msg data structure
- [ ] Optimize io.reader (see GetObjectNInfo) to ease io copy; if writes are async, rebalance
- [ ] Code cleanup: use generics?
- [ ] Abstract out metafile storage
## Basic features
@@ -43,26 +43,26 @@
## Extended features
- [ ] User management
- [ ] Policy management
- [ ] AK/SK allocation and management
- [ ] Data-scanner statistics and object repair
- [ ] Bucket quotas
- [ ] Read-only buckets
- [ ] Bucket replication
- [ ] Bucket event notifications
- [ ] Public and private buckets
- [ ] Object lifecycle management
- [ ] Prometheus integration
- [ ] Log collection and log shipping
- [ ] Object compression
- [ ] STS
- [ ] Tiering (remote integration with Alibaba Cloud, Tencent Cloud, S3)
## Performance
- [ ] bitrot impl AsyncRead/AsyncWrite
- [ ] Concurrent erasure reads and writes
- [x] Improve deletion logic: process concurrently, move to the trash first
- [ ] Empty the trash when space runs low
- [ ] list_object: stream results via a reader

View File

@@ -8,14 +8,14 @@ use serde::{Deserialize, Serialize};
#[derive(Serialize, Deserialize, Debug, Default, Clone)]
pub struct Token {
pub name: String, // application ID
pub expired: u64, // expiry time (UNIX timestamp)
}
// Generate a token with the public key
// [token] the Token object
// [key] the public-key string
// Returns the encrypted string, base64-encoded
pub fn gencode(token: &Token, key: &str) -> Result<String> {
let data = serde_json::to_vec(token)?;
let public_key = RsaPublicKey::from_public_key_pem(key)?;
@@ -23,10 +23,10 @@ pub fn gencode(token: &Token, key: &str) -> Result<String> {
Ok(base64_simd::URL_SAFE_NO_PAD.encode_to_string(&encrypted_data))
}
// Parse a token with the private key
// [token] the base64-encoded encrypted string
// [key] the private-key string
// Returns the Token object
pub fn parse(token: &str, key: &str) -> Result<Token> {
let encrypted_data = base64_simd::URL_SAFE_NO_PAD.decode_to_vec(token.as_bytes())?;
let private_key = RsaPrivateKey::from_pkcs8_pem(key)?;
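A minimal round-trip sketch (illustrative only: the key pair below is generated with the rsa crate for testing, and the helper name is an assumption, not code from this repo):

// Hypothetical round trip: encrypt with the public key, decrypt with the private key.
use rsa::pkcs8::{EncodePrivateKey, EncodePublicKey, LineEnding};
use rsa::{RsaPrivateKey, RsaPublicKey};

fn token_roundtrip() -> Result<()> {
    let mut rng = rand::thread_rng();
    // Test-only key; real deployments load PEM strings from configuration.
    let private_key = RsaPrivateKey::new(&mut rng, 2048).expect("generate test key");
    let public_pem = RsaPublicKey::from(&private_key)
        .to_public_key_pem(LineEnding::LF)
        .expect("encode public key");
    let private_pem = private_key.to_pkcs8_pem(LineEnding::LF).expect("encode private key");

    let token = Token { name: "app-001".into(), expired: 1_735_689_600 };
    let code = gencode(&token, &public_pem)?; // URL-safe base64, no padding
    let parsed = parse(&code, private_pem.as_str())?;
    assert_eq!(parsed.name, token.name);
    Ok(())
}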

View File

@@ -15,8 +15,8 @@ pub const VERSION: &str = "0.0.1";
pub const DEFAULT_LOG_LEVEL: &str = "info";
/// Default configuration use stdout
/// Default value: true
pub const USE_STDOUT: bool = true;
/// Default value: false
pub const USE_STDOUT: bool = false;
/// Default configuration sample ratio
/// Default value: 1.0

View File

@@ -286,10 +286,10 @@ mod tests {
use std::mem;
let size = mem::size_of::<Error>();
// The error type should stay compact; given how many error kinds it wraps, 96 bytes is reasonable
assert!(size <= 128, "Error size should be reasonable, got {} bytes", size);
// Check the size of Option<Error>
let option_size = mem::size_of::<Option<Error>>();
assert!(option_size <= 136, "Option<Error> should be efficient, got {} bytes", option_size);
}
@@ -321,7 +321,7 @@ mod tests {
_ => panic!("Expected Custom error variant"),
}
// Test a message containing Unicode characters
let unicode_error = Error::custom("🚀 Unicode test 测试 🎉");
match unicode_error {
Error::Custom(msg) => assert!(msg.contains('🚀')),
@@ -405,11 +405,11 @@ mod tests {
let display_str = error.to_string();
let debug_str = format!("{:?}", error);
// Neither Display nor Debug should be empty
assert!(!display_str.is_empty());
assert!(!debug_str.is_empty());
// Debug output usually carries more detail, though that's not guaranteed
// Here we only verify that both have content
assert!(debug_str.len() > 0);
assert!(display_str.len() > 0);

View File

@@ -716,7 +716,7 @@ mod tests {
#[test]
fn test_compression_format_clone_and_copy() {
// Verify that CompressionFormat can be copied
let format = CompressionFormat::Gzip;
let format_copy = format;
@@ -729,7 +729,7 @@ mod tests {
#[test]
fn test_compression_format_match_exhaustiveness() {
// Verify the match statement is exhaustive
fn handle_format(format: CompressionFormat) -> &'static str {
match format {
CompressionFormat::Gzip => "gzip",
@@ -906,7 +906,7 @@ mod tests {
#[test]
fn test_zip_entry_creation() {
// Test creating a ZIP entry record
let entry = ZipEntry {
name: "test.txt".to_string(),
size: 1024,
@@ -934,7 +934,7 @@ mod tests {
];
for level in levels {
// Verify every level has a Debug implementation
let _debug_str = format!("{:?}", level);
}
}
@@ -960,7 +960,7 @@ mod tests {
// Verify the supported-status check
let _supported = format.is_supported();
// Verify the Debug implementation
let _debug = format!("{:?}", format);
}
}
@@ -991,7 +991,7 @@ mod tests {
// .await
// {
// Ok(_) => println!("extraction succeeded!"),
// Err(e) => println!("extraction failed: {}", e),
// }
// Ok(())

View File

@@ -5,7 +5,7 @@ sample_ratio = 2.0
meter_interval = 30
service_name = "rustfs"
service_version = "0.0.1"
environment = "develop"
environment = "production" # Default is "production" if not specified
logger_level = "info"
local_logging_enabled = true

View File

@@ -2,16 +2,16 @@ use common::error::Result;
use rmp_serde::Serializer as rmpSerializer;
use serde::{Deserialize, Serialize};
// Defines the QuotaType enum
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum QuotaType {
Hard,
}
// Defines the BucketQuota struct
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct BucketQuota {
quota: Option<u64>, // Option marks a field that may be absent
size: u64,
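The struct is cut off above; as a sketch of the MessagePack round-trip the rmp_serde imports suggest (assumed helpers, not code from this repo):

fn encode_quota(q: &BucketQuota) -> Vec<u8> {
    // Serialize to compact MessagePack bytes.
    let mut buf = Vec::new();
    q.serialize(&mut rmpSerializer::new(&mut buf)).expect("msgpack encode");
    buf
}

fn decode_quota(buf: &[u8]) -> BucketQuota {
    rmp_serde::from_slice(buf).expect("msgpack decode")
}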

View File

@@ -25,7 +25,7 @@ pub struct LatencyStat {
max: Duration, // maximum latency
}
// Defines the BucketTarget struct
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct BucketTarget {
source_bucket: String,

View File

@@ -64,7 +64,7 @@
// // content_length,
// // );
// // pad with zeros
// if !need_padding {
// y.yield_ok(prev_bytes).await;
// break;
@@ -129,7 +129,7 @@
// combined.extend_from_slice(&data);
// // debug!(
// // "取到的长度大于所需,取出需要的长度:{},与上一次合并得到:{}bytes剩余{}",
// // "取到的长度大于所需,取出需要的长度:{},与上一次合并得到:{}bytes 剩余:{}",
// // need_size,
// // combined.len(),
// // bytes.len(),
@@ -142,7 +142,7 @@
// combined.extend_from_slice(&bytes);
// // debug!(
// // "取到的长度小于所需,取出需要的长度:{},与上一次合并得到:{}bytes剩余{},直接返回",
// // "取到的长度小于所需,取出需要的长度:{},与上一次合并得到:{}bytes 剩余:{},直接返回",
// // need_size,
// // combined.len(),
// // bytes.len(),
@@ -152,14 +152,14 @@
// }
// }
// // The data read exceeds the needed block size; slice the required block out of bytes
// if data_size <= bytes.len() {
// let n = bytes.len() / data_size;
// for _ in 0..n {
// let data = bytes.split_to(data_size);
// // println!("bytes_buffer.push: {} 剩余:{}", data.len(), bytes.len());
// // println!("bytes_buffer.push: {},剩余:{}", data.len(), bytes.len());
// bytes_buffer.push(data);
// }

View File

@@ -341,7 +341,7 @@ pub fn os_err_to_file_err(e: io::Error) -> Error {
// io::ErrorKind::UnexpectedEof => todo!(),
// io::ErrorKind::OutOfMemory => todo!(),
// io::ErrorKind::Other => todo!(),
// TODO: handle unsupported kinds as strings
_ => Error::new(e),
}
}
@@ -355,7 +355,7 @@ pub struct FileAccessDeniedWithContext {
impl std::fmt::Display for FileAccessDeniedWithContext {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "访问文件 '{}' 被拒绝: {}", self.path.display(), self.source)
write!(f, "访问文件 '{}' 被拒绝{}", self.path.display(), self.source)
}
}

View File

@@ -237,7 +237,7 @@ impl DiskAPI for Disk {
}
}
#[tracing::instrument(skip(self))]
#[tracing::instrument(skip(self, fi))]
async fn rename_data(
&self,
src_volume: &str,

View File

@@ -199,7 +199,7 @@ pub async fn os_mkdir_all(dir_path: impl AsRef<Path>, base_dir: impl AsRef<Path>
}
if let Some(parent) = dir_path.as_ref().parent() {
// Recursion isn't supported; just call create_dir_all
if let Err(e) = utils::fs::make_dir_all(&parent).await {
if os_is_exist(&e) {
return Ok(());

View File

@@ -302,7 +302,7 @@ impl Erasure {
// EC-encode; the result is written into data_buffer
let data_slices: SmallVec<[&mut [u8]; 16]> = data_buffer.chunks_exact_mut(shard_size).collect();
// Only run EC when the parity count is greater than 0
if self.parity_shards > 0 {
self.encoder.as_ref().unwrap().encode(data_slices)?;
}
@@ -348,7 +348,7 @@ impl Erasure {
let last_shard_size = last_block_size.div_ceil(self.data_shards);
num_shards * self.shard_size(self.block_size) + last_shard_size
// // EC pads shards on write, so the last one should be the same length
// if last_block_size != 0 {
// num_shards += 1
// }
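A standalone sketch of that size arithmetic (hypothetical helper; assumes the same ceiling division and even striping shown above):

fn shard_file_size_sketch(total_size: usize, block_size: usize, data_shards: usize) -> usize {
    // Every full block contributes ceil(block_size / data_shards) bytes to each shard.
    let full_blocks = total_size / block_size;
    let per_block_shard = block_size.div_ceil(data_shards);
    // The trailing partial block shrinks each shard proportionally.
    let last_shard = (total_size % block_size).div_ceil(data_shards);
    full_blocks * per_block_shard + last_shard
}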
@@ -446,7 +446,7 @@ pub struct ShardReader {
parity_block_count: usize,
shard_size: usize, // block size of each shard; one block is read at a time
shard_file_size: usize, // total length of the shard file
offset: usize, // offset within the shard
}
impl ShardReader {

View File

@@ -71,9 +71,9 @@ impl FileMeta {
Ok(xl)
}
// check_xl2_v1 reads the xl file header and returns the remaining bytes plus the version info
// checkXL2V1
#[tracing::instrument]
#[tracing::instrument(level = "debug", skip_all)]
pub fn check_xl2_v1(buf: &[u8]) -> Result<(&[u8], u16, u16)> {
if buf.len() < 8 {
return Err(Error::msg("xl file header not exists"));
@@ -92,11 +92,11 @@ impl FileMeta {
Ok((&buf[8..], major, minor))
}
// a fixed u32
pub fn read_bytes_header(buf: &[u8]) -> Result<(u32, &[u8])> {
let (mut size_buf, _) = buf.split_at(5);
// read the meta data; buf = crc + data
let bin_len = rmp::decode::read_bin_len(&mut size_buf)?;
Ok((bin_len, &buf[5..]))
@@ -110,7 +110,7 @@ impl FileMeta {
let (mut size_buf, buf) = buf.split_at(5);
// read the meta data; buf = crc + data
let bin_len = rmp::decode::read_bin_len(&mut size_buf)?;
let (meta, buf) = buf.split_at(bin_len as usize);
@@ -130,7 +130,7 @@ impl FileMeta {
self.data.validate()?;
}
// parse the meta
if !meta.is_empty() {
let (versions_len, _, meta_ver, meta) = Self::decode_xl_headers(meta)?;
@@ -168,8 +168,8 @@ impl FileMeta {
Ok(i)
}
// decode_xl_headers parses the meta header and returns (number of versions, xl_header_version, xl_meta_version, bytes already read)
#[tracing::instrument]
#[tracing::instrument(level = "debug", skip_all)]
fn decode_xl_headers(buf: &[u8]) -> Result<(usize, u8, u8, &[u8])> {
let mut cur = Cursor::new(buf);
@@ -280,7 +280,7 @@ impl FileMeta {
rmp::encode::write_bin(&mut wr, &ver.meta)?;
}
// update the bin length
let data_len = wr.len() - offset;
byteorder::BigEndian::write_u32(&mut wr[offset - 4..offset], data_len as u32);
@@ -368,7 +368,7 @@ impl FileMeta {
Err(Error::new(DiskError::FileVersionNotFound))
}
// shard_data_dir_count counts versions matching vid and data_dir
#[tracing::instrument(level = "debug", skip_all)]
pub fn shard_data_dir_count(&self, vid: &Option<Uuid>, data_dir: &Option<Uuid>) -> usize {
self.versions
@@ -494,7 +494,7 @@ impl FileMeta {
Err(Error::msg("add_version failed"))
}
// delete_version removes a version and returns its data_dir
pub fn delete_version(&mut self, fi: &FileInfo) -> Result<Option<Uuid>> {
let mut ventry = FileMetaVersion::default();
if fi.deleted {
@@ -710,7 +710,7 @@ impl FileMetaVersion {
}
}
// decode_data_dir_from_meta reads data_dir from the meta. TODO: parse only data_dir straight from the meta buf via msg.skip
pub fn decode_data_dir_from_meta(buf: &[u8]) -> Result<Option<Uuid>> {
let mut ver = Self::default();
ver.unmarshal_msg(buf)?;
@@ -733,7 +733,7 @@ impl FileMetaVersion {
// println!("unmarshal_msg fields name len() {}", &str_len);
// Vec::with_capacity(str_len) fails; vec! works
let mut field_buff = vec![0u8; str_len as usize];
cur.read_exact(&mut field_buff)?;
@@ -1143,7 +1143,7 @@ impl From<FileMetaVersion> for FileMetaVersionHeader {
}
#[derive(Serialize, Deserialize, Debug, Clone, Default, PartialEq)]
// message_pack is hand-rolled, so field order must stay fixed
pub struct MetaObject {
pub version_id: Option<Uuid>, // Version ID
pub data_dir: Option<Uuid>, // Data dir ID
@@ -1182,7 +1182,7 @@ impl MetaObject {
// println!("unmarshal_msg fields name len() {}", &str_len);
// Vec::with_capacity(str_len) fails; vec! works
let mut field_buff = vec![0u8; str_len as usize];
cur.read_exact(&mut field_buff)?;
@@ -1413,7 +1413,7 @@ impl MetaObject {
Ok(cur.position())
}
// marshal_msg: hand-rolled messagepack; naming matches the Go implementation
pub fn marshal_msg(&self) -> Result<Vec<u8>> {
let mut len: u32 = 18;
let mut mask: u32 = 0;
@@ -1682,7 +1682,7 @@ impl MetaDeleteMarker {
let str_len = rmp::decode::read_str_len(&mut cur)?;
// Vec::with_capacity(str_len) fails; vec! works
let mut field_buff = vec![0u8; str_len as usize];
cur.read_exact(&mut field_buff)?;
@@ -2175,7 +2175,6 @@ pub async fn read_xl_meta_no_data<R: AsyncRead + Unpin>(reader: &mut R, size: us
}
#[cfg(test)]
mod test {
use super::*;
#[test]
@@ -2242,7 +2241,7 @@ mod test {
#[test]
#[tracing::instrument]
fn test_marshal_metaversion() {
let mut fi = FileInfo::new("tset", 3, 2);
let mut fi = FileInfo::new("test", 3, 2);
fi.version_id = Some(Uuid::new_v4());
fi.mod_time = Some(OffsetDateTime::from_unix_timestamp(OffsetDateTime::now_utc().unix_timestamp()).unwrap());
let mut obj = FileMetaVersion::from(fi);
@@ -2257,7 +2256,7 @@ mod test {
// println!("obj2 {:?}", &obj2);
// the timestamps don't match exactly - -
assert_eq!(obj, obj2);
assert_eq!(obj.get_version_id(), obj2.get_version_id());
assert_eq!(obj.write_version, obj2.write_version);
@@ -2276,7 +2275,7 @@ mod test {
let mut obj2 = FileMetaVersionHeader::default();
obj2.unmarshal_msg(&encoded).unwrap();
// the timestamps don't match exactly - -
assert_eq!(obj, obj2);
assert_eq!(obj.version_id, obj2.version_id);
assert_eq!(obj.version_id, vid);
@@ -2520,7 +2519,7 @@ mod test {
fi3.mod_time = Some(time3);
fm.add_version(fi3).unwrap();
// Sort first to ensure latest is at the front
fm.sort_by_mod_time();
// Should return the first version's mod time (lastest_mod_time returns first version's time)
@@ -2690,7 +2689,7 @@ mod test {
assert!(result.is_err());
}
#[test]
fn test_is_latest_delete_marker() {
// Test the is_latest_delete_marker function with simple data
// Since the function is complex and requires specific XL format,
@@ -2798,9 +2797,7 @@ async fn test_file_info_from_raw() {
let encoded = fm.marshal_msg().unwrap();
let raw_info = RawFileInfo {
buf: encoded,
};
let raw_info = RawFileInfo { buf: encoded };
let result = file_info_from_raw(raw_info, "test-bucket", "test-object", false).await;
assert!(result.is_ok());
@@ -2833,26 +2830,26 @@ fn test_file_meta_load_function() {
assert!(result.is_err());
}
#[test]
fn test_file_meta_read_bytes_header() {
// Test read_bytes_header function - it expects the first 5 bytes to be msgpack bin length
// Create a buffer with proper msgpack bin format for a 9-byte binary
let mut buf = vec![0xc4, 0x09]; // msgpack bin8 format for 9 bytes
buf.extend_from_slice(b"test data"); // 9 bytes of data
buf.extend_from_slice(b"extra"); // additional data
let result = FileMeta::read_bytes_header(&buf);
assert!(result.is_ok());
let (length, remaining) = result.unwrap();
assert_eq!(length, 9); // "test data" length
// remaining should be everything after the 5-byte header (but we only have 2-byte header)
assert_eq!(remaining.len(), buf.len() - 5);
// Test with buffer too small
let small_buf = vec![0u8; 2];
let result = FileMeta::read_bytes_header(&small_buf);
assert!(result.is_err());
}
#[test]
fn test_file_meta_get_set_idx() {
@@ -3080,11 +3077,11 @@ fn test_file_meta_version_header_ordering() {
// Test partial_cmp
assert!(header1.partial_cmp(&header2).is_some());
// Test cmp - header2 should be greater (newer)
use std::cmp::Ordering;
assert_eq!(header1.cmp(&header2), Ordering::Less); // header1 has earlier time
assert_eq!(header2.cmp(&header1), Ordering::Greater); // header2 has later time
assert_eq!(header1.cmp(&header1), Ordering::Equal);
}
#[test]
@@ -3110,10 +3107,7 @@ fn test_merge_file_meta_versions_edge_cases() {
version2.header.version_id = Some(Uuid::new_v4());
version2.header.mod_time = Some(OffsetDateTime::from_unix_timestamp(2000).unwrap());
let versions = vec![vec![version1.clone()], vec![version2.clone()]];
let _merged_strict = merge_file_meta_versions(1, true, 10, &versions);
let merged_non_strict = merge_file_meta_versions(1, false, 10, &versions);
@@ -3191,9 +3185,7 @@ async fn test_get_file_info_edge_cases() {
#[tokio::test]
async fn test_file_info_from_raw_edge_cases() {
// Test with empty buffer
let empty_raw = RawFileInfo { buf: vec![] };
let result = file_info_from_raw(empty_raw, "bucket", "object", false).await;
assert!(result.is_err());
@@ -3227,12 +3219,12 @@ fn test_meta_object_edge_cases() {
obj.data_dir = None;
assert!(obj.use_data_dir());
// Test use_inlinedata (always returns false in current implementation)
obj.size = 128 * 1024; // 128KB threshold
assert!(!obj.use_inlinedata()); // Should be false
obj.size = 128 * 1024 - 1;
assert!(!obj.use_inlinedata()); // Should also be false (always false)
}
#[test]
@@ -3244,17 +3236,17 @@ fn test_file_meta_version_header_edge_cases() {
header.ec_m = 0;
assert!(!header.has_ec());
// Test matches_not_strict with different signatures but same version_id
let mut other = FileMetaVersionHeader::default();
let version_id = Some(Uuid::new_v4());
header.version_id = version_id;
other.version_id = version_id;
header.version_type = VersionType::Object;
other.version_type = VersionType::Object;
header.signature = [1, 2, 3, 4];
other.signature = [5, 6, 7, 8];
// Should match because they have same version_id and type
assert!(header.matches_not_strict(&other));
// Test sorts_before with same mod_time but different version_id
let time = OffsetDateTime::from_unix_timestamp(1000).unwrap();
@@ -3286,12 +3278,12 @@ fn test_file_meta_add_version_edge_cases() {
fi2.mod_time = Some(OffsetDateTime::now_utc());
fm.add_version(fi2).unwrap();
// Should still have only one version, but updated
assert_eq!(fm.versions.len(), 1);
let (_, version) = fm.find_version(version_id).unwrap();
if let Some(obj) = version.object {
assert_eq!(obj.size, 2048); // Size gets updated when adding same version_id
}
}
#[test]
@@ -3324,12 +3316,11 @@ fn test_file_meta_shard_data_dir_count_edge_cases() {
fi.mod_time = Some(OffsetDateTime::now_utc());
fm.add_version(fi).unwrap();
let count = fm.shard_data_dir_count(&version_id, &data_dir);
assert_eq!(count, 0); // Should be 0 because user_data_dir() requires flag
// Test with different version_id
let other_version_id = Some(Uuid::new_v4());
let count = fm.shard_data_dir_count(&other_version_id, &data_dir);
assert_eq!(count, 1); // Should be 1 because the version has matching data_dir and user_data_dir() is true
}

View File

@@ -1220,7 +1220,7 @@ impl ECStore {
reader.read_exact(&mut chunk).await?;
// Read one part at a time from the reader and upload it
let rd = Box::new(Cursor::new(chunk));
let mut data = PutObjReader::new(rd, part.size);

View File

@@ -75,7 +75,7 @@ fn is_err_ignored(err: &Error, ignored_errs: &[Box<dyn CheckErrorFn>]) -> bool {
// Reduce the errors and return the one that occurs most often
fn reduce_errs(errs: &[Option<Error>], ignored_errs: &[Box<dyn CheckErrorFn>]) -> (usize, Option<Error>) {
let mut error_counts: HashMap<String, usize> = HashMap::new();
let mut error_map: HashMap<String, usize> = HashMap::new(); // stores each err's position
let nil = "nil".to_string();
for (i, operr) in errs.iter().enumerate() {
if let Some(err) = operr {
@@ -120,7 +120,7 @@ fn reduce_errs(errs: &[Option<Error>], ignored_errs: &[Box<dyn CheckErrorFn>]) -
}
}
// Validate the error count against the quorum
fn reduce_quorum_errs(
errs: &[Option<Error>],
ignored_errs: &[Box<dyn CheckErrorFn>],
@@ -135,8 +135,8 @@ fn reduce_quorum_errs(
}
}
// Validate the error count against the read quorum
// Returns the index of the most frequent error, or a QuorumError
pub fn reduce_read_quorum_errs(
errs: &[Option<Error>],
ignored_errs: &[Box<dyn CheckErrorFn>],
@@ -145,8 +145,8 @@ pub fn reduce_read_quorum_errs(
reduce_quorum_errs(errs, ignored_errs, read_quorum, QuorumError::Read)
}
// Validate the error count against the write quorum
// Returns the index of the most frequent error, or a QuorumError
#[tracing::instrument(level = "info", skip_all)]
pub fn reduce_write_quorum_errs(
errs: &[Option<Error>],
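A simplified, standalone sketch of the counting idea behind reduce_errs (String stands in for the error type here; the real code also consults ignored_errs):

use std::collections::HashMap;

fn reduce_errs_sketch(errs: &[Option<String>]) -> (usize, Option<String>) {
    let mut counts: HashMap<String, usize> = HashMap::new();
    let mut first_pos: HashMap<String, usize> = HashMap::new(); // where each error class first appeared
    for (i, e) in errs.iter().enumerate() {
        let key = e.clone().unwrap_or_else(|| "nil".to_string());
        *counts.entry(key.clone()).or_insert(0) += 1;
        first_pos.entry(key).or_insert(i);
    }
    // Pick the most frequent error class; "nil" (no error) counts like any other class.
    let (key, max) = counts
        .into_iter()
        .max_by_key(|(_, c)| *c)
        .unwrap_or(("nil".to_string(), 0));
    let err = if key == "nil" { None } else { errs[first_pos[&key]].clone() };
    (max, err)
}

The read/write quorum helpers then just compare the winning count against read_quorum or write_quorum and return the error or a QuorumError accordingly.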

View File

@@ -858,13 +858,13 @@ impl ECStore {
let mut reader = rd.stream;
for (i, part) in object_info.parts.iter().enumerate() {
// Read one part at a time from the reader and upload it
let mut chunk = vec![0u8; part.size];
reader.read_exact(&mut chunk).await?;
// Read one part at a time from the reader and upload it
let rd = Box::new(Cursor::new(chunk));
let mut data = PutObjReader::new(rd, part.size);
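A minimal sketch of that per-part chunking pattern in isolation (hypothetical helper; assumes part sizes are known up front, as in the loop above):

use tokio::io::{AsyncRead, AsyncReadExt};

async fn read_parts<R: AsyncRead + Unpin>(reader: &mut R, sizes: &[usize]) -> std::io::Result<Vec<Vec<u8>>> {
    let mut parts = Vec::with_capacity(sizes.len());
    for &size in sizes {
        let mut chunk = vec![0u8; size]; // one part's worth of bytes
        reader.read_exact(&mut chunk).await?; // errors if the stream ends early
        parts.push(chunk);
    }
    Ok(parts)
}

Each chunk can then be wrapped in a Cursor and handed to PutObjReader, as the snippet above does.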

View File

@@ -1452,7 +1452,7 @@ impl SetDisks {
}
};
// Check that the endpoints match
let _ = new_disk.set_disk_id(Some(fm.erasure.this)).await;

View File

@@ -287,7 +287,7 @@ struct DelObj {
#[async_trait::async_trait]
impl ObjectIO for Sets {
#[tracing::instrument(level = "debug", skip(self))]
#[tracing::instrument(level = "debug", skip(self, object, h, opts))]
async fn get_object_reader(
&self,
bucket: &str,

View File

@@ -229,7 +229,7 @@ impl FileInfo {
}
}
// to_part_offset finds the part that contains offset; returns (part index, offset within that part)
pub fn to_part_offset(&self, offset: usize) -> Result<(usize, usize)> {
if offset == 0 {
return Ok((0, 0));
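The visible body cuts off after the zero-offset fast path; a sketch of the usual remainder of the walk (hypothetical, using plain part sizes):

fn to_part_offset_sketch(part_sizes: &[usize], mut offset: usize) -> Option<(usize, usize)> {
    // Walk the parts, subtracting each part's size until the offset falls inside one.
    for (i, &size) in part_sizes.iter().enumerate() {
        if offset < size {
            return Some((i, offset));
        }
        offset -= size;
    }
    None // offset lies past the end of the object
}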
@@ -356,7 +356,7 @@ impl ErasureInfo {
let last_shard_size = last_block_size.div_ceil(self.data_blocks);
num_shards * self.shard_size(self.block_size) + last_shard_size
// // EC pads shards on write, so the last one should be the same length
// if last_block_size != 0 {
// num_shards += 1
// }
@@ -1246,7 +1246,7 @@ mod tests {
assert_eq!(object_info.etag, Some("test-etag".to_string()));
}
// to_part_offset finds the part that contains offset; returns (part index, offset within that part)
#[test]
fn test_file_info_to_part_offset() {
let mut file_info = FileInfo::new("test", 4, 2);

View File

@@ -192,7 +192,7 @@ fn check_format_erasure_value(format: &FormatV3) -> Result<()> {
Ok(())
}
// load_format_erasure_all reads every format.json
pub async fn load_format_erasure_all(disks: &[Option<DiskStore>], heal: bool) -> (Vec<Option<FormatV3>>, Vec<Option<Error>>) {
let mut futures = Vec::with_capacity(disks.len());
let mut datas = Vec::with_capacity(disks.len());

View File

@@ -2,9 +2,9 @@ use crc32fast::Hasher;
use siphasher::sip::SipHasher;
pub fn sip_hash(key: &str, cardinality: usize, id: &[u8; 16]) -> usize {
// The key must be 16 bytes
// Compute the SipHash of the string
let result = SipHasher::new_with_key(id).hash(key.as_bytes());
result as usize % cardinality
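For example, deterministically routing an object key to one of 8 erasure sets (illustrative values; the 16-byte ID would normally be the deployment ID):

let set_id: [u8; 16] = *b"0123456789abcdef"; // assumed fixed 16-byte ID
let idx = sip_hash("bucket/object.txt", 8, &set_id);
assert!(idx < 8); // the same key and ID always land on the same set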

View File

@@ -349,7 +349,7 @@ impl ObjectStore {
// user.credentials.access_key = name.to_owned();
// }
// // todo: validate the session token
// Ok(Some(user))
// }
@@ -932,7 +932,7 @@ impl Store for ObjectStore {
// Arc::new(tokio::sync::Mutex::new(CacheEntity::default())),
// );
// // read 32 items at a time
// let iter = items
// .iter()
// .map(|item| item.trim_start_matches("config/iam/"))

View File

@@ -14,7 +14,7 @@ use crate::auth::IAMAuth;
use crate::console::{init_console_cfg, CONSOLE_CONFIG};
// Ensure the correct path for parse_license is imported
use crate::server::{wait_for_shutdown, ServiceState, ServiceStateManager, ShutdownSignal, SHUTDOWN_TIMEOUT};
use bytes::Bytes;
// use bytes::Bytes;
use chrono::Datelike;
use clap::Parser;
use common::{
@@ -37,7 +37,7 @@ use ecstore::{
};
use ecstore::{global::set_global_rustfs_port, notification_sys::new_global_notification_sys};
use grpc::make_server;
use http::{HeaderMap, Request as HttpRequest, Response};
// use http::{HeaderMap, Request as HttpRequest, Response};
use hyper_util::server::graceful::GracefulShutdown;
use hyper_util::{
rt::{TokioExecutor, TokioIo},
@@ -61,9 +61,10 @@ use tokio::signal::unix::{signal, SignalKind};
use tokio_rustls::TlsAcceptor;
use tonic::{metadata::MetadataValue, Request, Status};
use tower_http::cors::CorsLayer;
use tower_http::trace::TraceLayer;
// use tracing::{instrument, Span};
use tracing::instrument;
// use tower_http::trace::TraceLayer;
use tracing::{debug, error, info, warn};
use tracing::{instrument, Span};
const MI_B: usize = 1024 * 1024;
@@ -323,49 +324,49 @@ async fn run(opt: config::Opt) -> Result<()> {
let mut sigint_inner = sigint_inner;
let hybrid_service = TowerToHyperService::new(
tower::ServiceBuilder::new()
.layer(
TraceLayer::new_for_http()
.make_span_with(|request: &HttpRequest<_>| {
let span = tracing::info_span!("http-request",
status_code = tracing::field::Empty,
method = %request.method(),
uri = %request.uri(),
version = ?request.version(),
);
for (header_name, header_value) in request.headers() {
if header_name == "user-agent" || header_name == "content-type" || header_name == "content-length"
{
span.record(header_name.as_str(), header_value.to_str().unwrap_or("invalid"));
}
}
span
})
.on_request(|request: &HttpRequest<_>, _span: &Span| {
info!(
counter.rustfs_api_requests_total = 1_u64,
key_request_method = %request.method().to_string(),
key_request_uri_path = %request.uri().path().to_owned(),
"handle request api total",
);
debug!("http started method: {}, url path: {}", request.method(), request.uri().path())
})
.on_response(|response: &Response<_>, latency: Duration, _span: &Span| {
_span.record("http response status_code", tracing::field::display(response.status()));
debug!("http response generated in {:?}", latency)
})
.on_body_chunk(|chunk: &Bytes, latency: Duration, _span: &Span| {
info!(histogram.request.body.len = chunk.len(), "histogram request body length",);
debug!("http body sending {} bytes in {:?}", chunk.len(), latency)
})
.on_eos(|_trailers: Option<&HeaderMap>, stream_duration: Duration, _span: &Span| {
debug!("http stream closed after {:?}", stream_duration)
})
.on_failure(|_error, latency: Duration, _span: &Span| {
info!(counter.rustfs_api_requests_failure_total = 1_u64, "handle request api failure total");
debug!("http request failure error: {:?} in {:?}", _error, latency)
}),
)
//.layer(
// TraceLayer::new_for_http()
// .make_span_with(|request: &HttpRequest<_>| {
// let span = tracing::info_span!("http-request",
// status_code = tracing::field::Empty,
// method = %request.method(),
// uri = %request.uri(),
// version = ?request.version(),
// );
// for (header_name, header_value) in request.headers() {
// if header_name == "user-agent" || header_name == "content-type" || header_name == "content-length"
// {
// span.record(header_name.as_str(), header_value.to_str().unwrap_or("invalid"));
// }
// }
//
// span
// })
// .on_request(|request: &HttpRequest<_>, _span: &Span| {
// info!(
// counter.rustfs_api_requests_total = 1_u64,
// key_request_method = %request.method().to_string(),
// key_request_uri_path = %request.uri().path().to_owned(),
// "handle request api total",
// );
// debug!("http started method: {}, url path: {}", request.method(), request.uri().path())
// })
// .on_response(|response: &Response<_>, latency: Duration, _span: &Span| {
// _span.record("http response status_code", tracing::field::display(response.status()));
// debug!("http response generated in {:?}", latency)
// })
// .on_body_chunk(|chunk: &Bytes, latency: Duration, _span: &Span| {
// info!(histogram.request.body.len = chunk.len(), "histogram request body length",);
// debug!("http body sending {} bytes in {:?}", chunk.len(), latency)
// })
// .on_eos(|_trailers: Option<&HeaderMap>, stream_duration: Duration, _span: &Span| {
// debug!("http stream closed after {:?}", stream_duration)
// })
// .on_failure(|_error, latency: Duration, _span: &Span| {
// info!(counter.rustfs_api_requests_failure_total = 1_u64, "handle request api failure total");
// debug!("http request failure error: {:?} in {:?}", _error, latency)
// }),
// )
.layer(CorsLayer::permissive())
.service(hybrid(s3_service, rpc_service)),
);

View File

@@ -73,15 +73,15 @@ impl PhysicalPlanner for DefaultPhysicalPlanner {
logical_plan: &LogicalPlan,
session: &SessionCtx,
) -> QueryResult<Arc<dyn ExecutionPlan>> {
// Inject the extended physical-plan optimizer rules into df's session state
let new_state = SessionStateBuilder::new_from_existing(session.inner().clone())
.with_physical_optimizer_rules(self.ext_physical_optimizer_rules.clone())
.build();
// Build df's PhysicalPlanner from the extended physical-plan transform rules
let planner = DFDefaultPhysicalPlanner::with_extension_planners(self.ext_physical_transform_rules.clone());
// Run df's physical planning and optimization
planner
.create_physical_plan(logical_plan, &new_state)
.await

View File

@@ -1,6 +1,6 @@
#!/bin/bash
# Script name: scp_to_servers.sh
rm ./target/x86_64-unknown-linux-musl/release/rustfs.zip
# Zip ./target/x86_64-unknown-linux-musl/release/rustfs
@@ -12,14 +12,14 @@ LOCAL_FILE="./target/x86_64-unknown-linux-musl/release/rustfs.zip"
REMOTE_PATH="~"
# Define the server list array
# Format: server IP, username, target path
SERVER_LIST=(
"root@121.89.80.13"
)
# Iterate over the server list
for SERVER in "${SERVER_LIST[@]}"; do
echo "正在将文件复制到服务器: $SERVER 目标路径: $REMOTE_PATH"
echo "正在将文件复制到服务器$SERVER 目标路径$REMOTE_PATH"
scp "$LOCAL_FILE" "${SERVER}:${REMOTE_PATH}"
if [ $? -eq 0 ]; then
echo "成功复制到 $SERVER"

View File

@@ -19,7 +19,8 @@ mkdir -p ./target/volume/test{0..4}
if [ -z "$RUST_LOG" ]; then
export RUST_BACKTRACE=1
export RUST_LOG="rustfs=debug,ecstore=debug,s3s=debug,iam=debug"
# export RUST_LOG="rustfs=debug,ecstore=debug,s3s=debug,iam=debug"
export RUST_LOG="rustfs=info,ecstore=info,s3s=info,iam=info,rustfs-obs=info"
fi
# export RUSTFS_ERASURE_SET_DRIVE_COUNT=5
@@ -35,7 +36,7 @@ export RUSTFS_CONSOLE_ADDRESS=":9002"
# HTTPS certificate directory
# export RUSTFS_TLS_PATH="./deploy/certs"
# Point this at the real path of your config file; obs.example.toml is only a reference. Use either `RUSTFS_OBS_CONFIG` or the variables below, not both
export RUSTFS_OBS_CONFIG="./deploy/config/obs.example.toml"
# The variables below require every parameter to be set, and they override values from the config file