From ca8f3998323a3109a29c14b3b471906dc6ab46c2 Mon Sep 17 00:00:00 2001
From: houseme
Date: Tue, 27 May 2025 13:56:19 +0800
Subject: [PATCH] format comment

---
 TODO.md                                    |  32 ++---
 appauth/src/token.rs                       |  16 +--
 crates/config/src/constants/app.rs         |   4 +-
 crates/event-notifier/src/error.rs         |  10 +-
 crates/zip/src/lib.rs                      |  12 +-
 deploy/config/obs.example.toml             |   2 +-
 ecstore/src/bucket/quota/mod.rs            |   6 +-
 ecstore/src/bucket/target/mod.rs           |   2 +-
 ecstore/src/chunk_stream.rs                |  10 +-
 ecstore/src/disk/error.rs                  |   4 +-
 ecstore/src/disk/mod.rs                    |   2 +-
 ecstore/src/disk/os.rs                     |   2 +-
 ecstore/src/erasure.rs                     |   6 +-
 ecstore/src/file_meta.rs                   | 157 ++++++++++-----------
 ecstore/src/pools.rs                       |   2 +-
 ecstore/src/quorum.rs                      |  12 +-
 ecstore/src/rebalance.rs                   |   4 +-
 ecstore/src/set_disk.rs                    |   2 +-
 ecstore/src/sets.rs                        |   2 +-
 ecstore/src/store_api.rs                   |   6 +-
 ecstore/src/store_init.rs                  |   2 +-
 ecstore/src/utils/hash.rs                  |   4 +-
 iam/src/store/object.rs                    |   4 +-
 rustfs/src/main.rs                         |  95 +++++++------
 s3select/query/src/sql/physical/planner.rs |   6 +-
 scripts/dev.sh                             |   6 +-
 scripts/run.sh                             |   5 +-
 27 files changed, 204 insertions(+), 211 deletions(-)

diff --git a/TODO.md b/TODO.md
index 9c8e4653..6c98a121 100644
--- a/TODO.md
+++ b/TODO.md
@@ -2,18 +2,18 @@

 ## 基础存储

-- [x] EC可用读写数量判断 Read/WriteQuorum
-- [ ] 优化后台并发执行,可中断, 传引用?
-- [x] 小文件存储到metafile, inlinedata
-- [x] 完善bucketmeta
+- [x] EC 可用读写数量判断 Read/WriteQuorum
+- [ ] 优化后台并发执行,可中断,传引用?
+- [x] 小文件存储到 metafile, inlinedata
+- [x] 完善 bucketmeta
 - [x] 对象锁
-- [x] 边读写边hash,实现reader嵌套
-- [x] 远程rpc
-- [x] 错误类型判断,程序中判断错误类型,如何统一错误
-- [x] 优化xlmeta, 自定义msg数据结构
-- [ ] 优化io.reader 参考 GetObjectNInfo 方便io copy 如果 异步写,再平衡
+- [x] 边读写边 hash,实现 reader 嵌套
+- [x] 远程 rpc
+- [x] 错误类型判断,程序中判断错误类型,如何统一错误
+- [x] 优化 xlmeta, 自定义 msg 数据结构
+- [ ] 优化 io.reader 参考 GetObjectNInfo 方便 io copy 如果 异步写,再平衡
 - [ ] 代码优化 使用范型?
-- [ ] 抽象出metafile存储
+- [ ] 抽象出 metafile 存储

 ## 基础功能

@@ -43,26 +43,26 @@

 ## 扩展功能

 - [ ] 用户管理
-- [ ] Policy管理
+- [ ] Policy 管理
 - [ ] AK/SK分配管理
-- [ ] data scanner统计和对象修复
+- [ ] data scanner 统计和对象修复
 - [ ] 桶配额
 - [ ] 桶只读
 - [ ] 桶复制
 - [ ] 桶事件通知
 - [ ] 桶公开、桶私有
 - [ ] 对象生命周期管理
-- [ ] prometheus对接
+- [ ] prometheus 对接
 - [ ] 日志收集和日志外发
 - [ ] 对象压缩
 - [ ] STS
-- [ ] 分层(阿里云、腾讯云、S3远程对接)
+- [ ] 分层(阿里云、腾讯云、S3 远程对接)

 ## 性能优化

 - [ ] bitrot impl AsyncRead/AsyncWrite
 - [ ] erasure 并发读写
-- [x] 完善删除逻辑, 并发处理,先移动到回收站,
+- [x] 完善删除逻辑,并发处理,先移动到回收站,
 - [ ] 空间不足时清空回收站
-- [ ] list_object 使用reader传输
\ No newline at end of file
+- [ ] list_object 使用 reader 传输
\ No newline at end of file
diff --git a/appauth/src/token.rs b/appauth/src/token.rs
index f18ae57f..4276e45d 100644
--- a/appauth/src/token.rs
+++ b/appauth/src/token.rs
@@ -8,14 +8,14 @@ use serde::{Deserialize, Serialize};

 #[derive(Serialize, Deserialize, Debug, Default, Clone)]
 pub struct Token {
-    pub name: String, // 应用ID
-    pub expired: u64, // 到期时间 (UNIX时间戳)
+    pub name: String, // 应用 ID
+    pub expired: u64, // 到期时间 (UNIX 时间戳)
 }

-// 公钥生成Token
-// [token] Token对象
+// 公钥生成 Token
+// [token] Token 对象
 // [key] 公钥字符串
-// 返回base64处理的加密字符串
+// 返回 base64 处理的加密字符串
 pub fn gencode(token: &Token, key: &str) -> Result<String> {
     let data = serde_json::to_vec(token)?;
     let public_key = RsaPublicKey::from_public_key_pem(key)?;
@@ -23,10 +23,10 @@ pub fn gencode(token: &Token, key: &str) -> Result<String> {
     Ok(base64_simd::URL_SAFE_NO_PAD.encode_to_string(&encrypted_data))
 }

-// 私钥解析Token
-// [token] base64处理的加密字符串
+// 私钥解析 Token
+// [token] base64 处理的加密字符串
 // [key] 私钥字符串
-// 返回Token对象
+// 返回 Token 对象
 pub fn parse(token: &str, key: &str) -> Result<Token> {
     let encrypted_data = base64_simd::URL_SAFE_NO_PAD.decode_to_vec(token.as_bytes())?;
     let private_key = RsaPrivateKey::from_pkcs8_pem(key)?;
diff --git a/crates/config/src/constants/app.rs b/crates/config/src/constants/app.rs
index 008647e9..17e3598a 100644
--- a/crates/config/src/constants/app.rs
+++ b/crates/config/src/constants/app.rs
@@ -15,8 +15,8 @@ pub const VERSION: &str = "0.0.1";
 pub const DEFAULT_LOG_LEVEL: &str = "info";

 /// Default configuration use stdout
-/// Default value: true
-pub const USE_STDOUT: bool = true;
+/// Default value: false
+pub const USE_STDOUT: bool = false;

 /// Default configuration sample ratio
 /// Default value: 1.0
diff --git a/crates/event-notifier/src/error.rs b/crates/event-notifier/src/error.rs
index 6434764f..cd1ead24 100644
--- a/crates/event-notifier/src/error.rs
+++ b/crates/event-notifier/src/error.rs
@@ -286,10 +286,10 @@ mod tests {
         use std::mem;

         let size = mem::size_of::<Error>();
-        // 错误类型应该相对紧凑,考虑到包含多种错误类型,96字节是合理的
+        // 错误类型应该相对紧凑,考虑到包含多种错误类型,96 字节是合理的
         assert!(size <= 128, "Error size should be reasonable, got {} bytes", size);

-        // 测试Option<Error>的大小
+        // 测试 Option<Error> 的大小
         let option_size = mem::size_of::<Option<Error>>();
         assert!(option_size <= 136, "Option<Error> should be efficient, got {} bytes", option_size);
     }
@@ -321,7 +321,7 @@
             _ => panic!("Expected Custom error variant"),
         }

-        // 测试包含Unicode字符的消息
+        // 测试包含 Unicode 字符的消息
         let unicode_error = Error::custom("🚀 Unicode test 测试 🎉");
         match unicode_error {
             Error::Custom(msg) => assert!(msg.contains('🚀')),
             _ => panic!("Expected Custom error variant"),
         }
@@ -405,11 +405,11 @@
         let display_str = error.to_string();
         let debug_str = format!("{:?}", error);

-        // Display和Debug都不应该为空
+        // Display 和 Debug 都不应该为空
         assert!(!display_str.is_empty());
         assert!(!debug_str.is_empty());

-        // Debug输出通常包含更多信息,但不是绝对的
+        // Debug 输出通常包含更多信息,但不是绝对的
         // 这里我们只验证两者都有内容即可
         assert!(debug_str.len() > 0);
         assert!(display_str.len() > 0);
     }
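The token.rs hunk above documents an RSA encrypt-then-base64 flow: gencode serializes the Token to JSON, encrypts it with a PEM public key, and emits URL-safe unpadded base64; parse reverses the steps with the private key. A minimal round-trip sketch, assuming the `rsa` crate's key-generation API (the patch only shows the PEM-parsing side); key names and sizes here are illustrative:

    use rsa::pkcs8::{EncodePrivateKey, EncodePublicKey, LineEnding};
    use rsa::{RsaPrivateKey, RsaPublicKey};

    fn roundtrip_demo() {
        // Throwaway key pair for the demo; real keys would be provisioned.
        let mut rng = rand::thread_rng();
        let private_key = RsaPrivateKey::new(&mut rng, 2048).expect("generate key");
        let public_pem = RsaPublicKey::from(&private_key)
            .to_public_key_pem(LineEnding::LF)
            .expect("encode public key");
        let private_pem = private_key.to_pkcs8_pem(LineEnding::LF).expect("encode private key");

        let token = Token { name: "app-123".into(), expired: 1_900_000_000 };
        let encoded = gencode(&token, &public_pem).expect("encrypt + base64");
        let decoded = parse(&encoded, private_pem.as_str()).expect("base64 + decrypt");
        assert_eq!(decoded.name, token.name);
    }
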
diff --git a/crates/zip/src/lib.rs b/crates/zip/src/lib.rs
index 163e07b9..379a9fe5 100644
--- a/crates/zip/src/lib.rs
+++ b/crates/zip/src/lib.rs
@@ -716,7 +716,7 @@ mod tests {

     #[test]
     fn test_compression_format_clone_and_copy() {
-        // 测试CompressionFormat是否可以被复制
+        // 测试 CompressionFormat 是否可以被复制
         let format = CompressionFormat::Gzip;
         let format_copy = format;
@@ -729,7 +729,7 @@

     #[test]
     fn test_compression_format_match_exhaustiveness() {
-        // 测试match语句的完整性
+        // 测试 match 语句的完整性
         fn handle_format(format: CompressionFormat) -> &'static str {
             match format {
                 CompressionFormat::Gzip => "gzip",
@@ -906,7 +906,7 @@

     #[test]
     fn test_zip_entry_creation() {
-        // 测试ZIP条目信息创建
+        // 测试 ZIP 条目信息创建
         let entry = ZipEntry {
             name: "test.txt".to_string(),
             size: 1024,
@@ -934,7 +934,7 @@
         ];

         for level in levels {
-            // 验证每个级别都有对应的Debug实现
+            // 验证每个级别都有对应的 Debug 实现
             let _debug_str = format!("{:?}", level);
         }
     }
@@ -960,7 +960,7 @@
             // 验证支持状态检查
             let _supported = format.is_supported();

-            // 验证Debug实现
+            // 验证 Debug 实现
             let _debug = format!("{:?}", format);
         }
     }
@@ -991,7 +991,7 @@
 //     .await
 //     {
 //         Ok(_) => println!("解压成功!"),
-//         Err(e) => println!("解压失败: {}", e),
+//         Err(e) => println!("解压失败:{}", e),
 //     }

 //     Ok(())
diff --git a/deploy/config/obs.example.toml b/deploy/config/obs.example.toml
index e6c89833..0733b229 100644
--- a/deploy/config/obs.example.toml
+++ b/deploy/config/obs.example.toml
@@ -5,7 +5,7 @@ sample_ratio = 2.0
 meter_interval = 30
 service_name = "rustfs"
 service_version = "0.0.1"
-environment = "develop"
+environment = "production" # Default is "production" if not specified
 logger_level = "info"
 local_logging_enabled = true
diff --git a/ecstore/src/bucket/quota/mod.rs b/ecstore/src/bucket/quota/mod.rs
index c3f38e84..39c7ebd0 100644
--- a/ecstore/src/bucket/quota/mod.rs
+++ b/ecstore/src/bucket/quota/mod.rs
@@ -2,16 +2,16 @@ use common::error::Result;
 use rmp_serde::Serializer as rmpSerializer;
 use serde::{Deserialize, Serialize};

-// 定义QuotaType枚举类型
+// 定义 QuotaType 枚举类型
 #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
 pub enum QuotaType {
     Hard,
 }

-// 定义BucketQuota结构体
+// 定义 BucketQuota 结构体
 #[derive(Debug, Deserialize, Serialize, Default, Clone)]
 pub struct BucketQuota {
-    quota: Option<u64>, // 使用Option来表示可能不存在的字段
+    quota: Option<u64>, // 使用 Option 来表示可能不存在的字段

     size: u64,
diff --git a/ecstore/src/bucket/target/mod.rs b/ecstore/src/bucket/target/mod.rs
index cb8797f2..d3305517 100644
--- a/ecstore/src/bucket/target/mod.rs
+++ b/ecstore/src/bucket/target/mod.rs
@@ -25,7 +25,7 @@ pub struct LatencyStat {
     max: Duration, // 最大延迟
 }

-// 定义BucketTarget结构体
+// 定义 BucketTarget 结构体
 #[derive(Debug, Deserialize, Serialize, Default, Clone)]
 pub struct BucketTarget {
     source_bucket: String,
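BucketQuota and BucketTarget above derive Serialize/Deserialize, and the quota module imports rmp_serde's Serializer, so these structs are evidently persisted as MessagePack. A minimal round-trip sketch under that assumption (struct-map mode keeps field names instead of positional tuples; the function name is illustrative):

    use rmp_serde::Serializer;
    use serde::Serialize;

    fn quota_roundtrip(quota: &BucketQuota) -> BucketQuota {
        let mut buf = Vec::new();
        // Serialize with named fields so added/renamed fields stay readable.
        quota
            .serialize(&mut Serializer::new(&mut buf).with_struct_map())
            .expect("msgpack encode");
        rmp_serde::from_slice(&buf).expect("msgpack decode")
    }
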
diff --git a/ecstore/src/chunk_stream.rs b/ecstore/src/chunk_stream.rs
index 886ee165..c08aa2f7 100644
--- a/ecstore/src/chunk_stream.rs
+++ b/ecstore/src/chunk_stream.rs
@@ -64,7 +64,7 @@
 //             //     content_length,
 //             // );

-//             // 填充0?
+//             // 填充 0?
 //             if !need_padding {
 //                 y.yield_ok(prev_bytes).await;
 //                 break;
@@ -129,7 +129,7 @@
 //                     combined.extend_from_slice(&data);

 //                     // debug!(
-//                     //     "取到的长度大于所需,取出需要的长度:{},与上一次合并得到:{},bytes剩余:{}",
+//                     //     "取到的长度大于所需,取出需要的长度:{},与上一次合并得到:{},bytes 剩余:{}",
 //                     //     need_size,
 //                     //     combined.len(),
 //                     //     bytes.len(),
@@ -142,7 +142,7 @@
 //                     combined.extend_from_slice(&bytes);

 //                     // debug!(
-//                     //     "取到的长度小于所需,取出需要的长度:{},与上一次合并得到:{},bytes剩余:{},直接返回",
+//                     //     "取到的长度小于所需,取出需要的长度:{},与上一次合并得到:{},bytes 剩余:{},直接返回",
 //                     //     need_size,
 //                     //     combined.len(),
 //                     //     bytes.len(),
@@ -152,14 +152,14 @@
 //             }
 //         }

-//         // 取到的数据比需要的块大,从bytes中截取需要的块大小
+//         // 取到的数据比需要的块大,从 bytes 中截取需要的块大小
 //         if data_size <= bytes.len() {
 //             let n = bytes.len() / data_size;

 //             for _ in 0..n {
 //                 let data = bytes.split_to(data_size);
-//                 // println!("bytes_buffer.push: {}, 剩余:{}", data.len(), bytes.len());
+//                 // println!("bytes_buffer.push: {},剩余:{}", data.len(), bytes.len());
 //                 bytes_buffer.push(data);
 //             }
diff --git a/ecstore/src/disk/error.rs b/ecstore/src/disk/error.rs
index 0d422b10..febf67d7 100644
--- a/ecstore/src/disk/error.rs
+++ b/ecstore/src/disk/error.rs
@@ -341,7 +341,7 @@ pub fn os_err_to_file_err(e: io::Error) -> Error {
         // io::ErrorKind::UnexpectedEof => todo!(),
         // io::ErrorKind::OutOfMemory => todo!(),
         // io::ErrorKind::Other => todo!(),
-        // TODO: 把不支持的king用字符串处理
+        // TODO: 把不支持的 kind 用字符串处理
         _ => Error::new(e),
     }
 }
@@ -355,7 +355,7 @@ pub struct FileAccessDeniedWithContext {

 impl std::fmt::Display for FileAccessDeniedWithContext {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        write!(f, "访问文件 '{}' 被拒绝: {}", self.path.display(), self.source)
+        write!(f, "访问文件 '{}' 被拒绝:{}", self.path.display(), self.source)
     }
 }
diff --git a/ecstore/src/disk/mod.rs b/ecstore/src/disk/mod.rs
index cda289c2..8cbef777 100644
--- a/ecstore/src/disk/mod.rs
+++ b/ecstore/src/disk/mod.rs
@@ -237,7 +237,7 @@ impl DiskAPI for Disk {
         }
     }

-    #[tracing::instrument(skip(self))]
+    #[tracing::instrument(skip(self, fi))]
     async fn rename_data(
         &self,
         src_volume: &str,
diff --git a/ecstore/src/disk/os.rs b/ecstore/src/disk/os.rs
index 41056a45..c04b5903 100644
--- a/ecstore/src/disk/os.rs
+++ b/ecstore/src/disk/os.rs
@@ -199,7 +199,7 @@ pub async fn os_mkdir_all(dir_path: impl AsRef<Path>, base_dir: impl AsRef<Path>
     }

     if let Some(parent) = dir_path.as_ref().parent() {
-        // 不支持递归,直接create_dir_all了
+        // 不支持递归,直接 create_dir_all 了
         if let Err(e) = utils::fs::make_dir_all(&parent).await {
             if os_is_exist(&e) {
                 return Ok(());
diff --git a/ecstore/src/erasure.rs b/ecstore/src/erasure.rs
index 702d393e..7329e77c 100644
--- a/ecstore/src/erasure.rs
+++ b/ecstore/src/erasure.rs
@@ -302,7 +302,7 @@ impl Erasure {
         // ec encode, 结果会写进 data_buffer
         let data_slices: SmallVec<[&mut [u8]; 16]> = data_buffer.chunks_exact_mut(shard_size).collect();

-        // partiy 数量大于0 才ec
+        // parity 数量大于 0 才 ec
         if self.parity_shards > 0 {
             self.encoder.as_ref().unwrap().encode(data_slices)?;
         }
@@ -348,7 +348,7 @@
         let last_shard_size = last_block_size.div_ceil(self.data_shards);

         num_shards * self.shard_size(self.block_size) + last_shard_size

-        // // 因为写入的时候ec需要补全,所以最后一个长度应该也是一样的
+        // // 因为写入的时候 ec 需要补全,所以最后一个长度应该也是一样的
         // if last_block_size != 0 {
         //     num_shards += 1
         // }
@@ -446,7 +446,7 @@ pub struct ShardReader {
     parity_block_count: usize,
     shard_size: usize,      // 每个分片的块大小 一次读取一块
     shard_file_size: usize, // 分片文件总长度
-    offset: usize,          // 在分片中的offset
+    offset: usize,          // 在分片中的 offset
 }

 impl ShardReader {
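The erasure hunks above compute a shard's on-disk length with div_ceil: every full block contributes ceil(block_size / data_shards) bytes to each shard, and the trailing partial block contributes ceil(last_block_size / data_shards). A standalone sketch of that arithmetic (names are illustrative, not the crate's API):

    fn shard_file_size(total_size: usize, block_size: usize, data_shards: usize) -> usize {
        if total_size == 0 {
            return 0;
        }
        let full_blocks = total_size / block_size;
        let last_block = total_size % block_size;
        // Each full block is split into `data_shards` equal shards, rounded up.
        full_blocks * block_size.div_ceil(data_shards) + last_block.div_ceil(data_shards)
    }

    // For example, 10 MiB in 1 MiB blocks over 4 data shards:
    // 10 * ceil(1048576 / 4) + 0 = 2_621_440 bytes per shard.
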
diff --git a/ecstore/src/file_meta.rs b/ecstore/src/file_meta.rs
index cf9052b4..336b11e3 100644
--- a/ecstore/src/file_meta.rs
+++ b/ecstore/src/file_meta.rs
@@ -71,9 +71,9 @@ impl FileMeta {
         Ok(xl)
     }

-    // check_xl2_v1 读xl文件头,返回后续内容,版本信息
+    // check_xl2_v1 读 xl 文件头,返回后续内容,版本信息
     // checkXL2V1
-    #[tracing::instrument]
+    #[tracing::instrument(level = "debug", skip_all)]
     pub fn check_xl2_v1(buf: &[u8]) -> Result<(&[u8], u16, u16)> {
         if buf.len() < 8 {
             return Err(Error::msg("xl file header not exists"));
@@ -92,11 +92,11 @@
         Ok((&buf[8..], major, minor))
     }

-    // 固定u32
+    // 固定 u32
     pub fn read_bytes_header(buf: &[u8]) -> Result<(u32, &[u8])> {
         let (mut size_buf, _) = buf.split_at(5);

-        // 取meta数据,buf = crc + data
+        // 取 meta 数据,buf = crc + data
         let bin_len = rmp::decode::read_bin_len(&mut size_buf)?;

         Ok((bin_len, &buf[5..]))
@@ -110,7 +110,7 @@

         let (mut size_buf, buf) = buf.split_at(5);

-        // 取meta数据,buf = crc + data
+        // 取 meta 数据,buf = crc + data
         let bin_len = rmp::decode::read_bin_len(&mut size_buf)?;

         let (meta, buf) = buf.split_at(bin_len as usize);
@@ -130,7 +130,7 @@
             self.data.validate()?;
         }

-        // 解析meta
+        // 解析 meta
         if !meta.is_empty() {
             let (versions_len, _, meta_ver, meta) = Self::decode_xl_headers(meta)?;
@@ -168,8 +168,8 @@
         Ok(i)
     }

-    // decode_xl_headers 解析 meta 头,返回 (versions数量,xl_header_version, xl_meta_version, 已读数据长度)
-    #[tracing::instrument]
+    // decode_xl_headers 解析 meta 头,返回 (versions 数量,xl_header_version, xl_meta_version, 已读数据长度)
+    #[tracing::instrument(level = "debug", skip_all)]
     fn decode_xl_headers(buf: &[u8]) -> Result<(usize, u8, u8, &[u8])> {
         let mut cur = Cursor::new(buf);
@@ -280,7 +280,7 @@
             rmp::encode::write_bin(&mut wr, &ver.meta)?;
         }

-        // 更新bin长度
+        // 更新 bin 长度
         let data_len = wr.len() - offset;
         byteorder::BigEndian::write_u32(&mut wr[offset - 4..offset], data_len as u32);
@@ -368,7 +368,7 @@
         Err(Error::new(DiskError::FileVersionNotFound))
     }

-    // shard_data_dir_count 查询 vid下data_dir的数量
+    // shard_data_dir_count 查询 vid 下 data_dir 的数量
     #[tracing::instrument(level = "debug", skip_all)]
     pub fn shard_data_dir_count(&self, vid: &Option<Uuid>, data_dir: &Option<Uuid>) -> usize {
         self.versions
@@ -494,7 +494,7 @@
         Err(Error::msg("add_version failed"))
     }

-    // delete_version 删除版本,返回data_dir
+    // delete_version 删除版本,返回 data_dir
     pub fn delete_version(&mut self, fi: &FileInfo) -> Result<Option<Uuid>> {
         let mut ventry = FileMetaVersion::default();
         if fi.deleted {
@@ -710,7 +710,7 @@ impl FileMetaVersion {
         }
     }

-    // decode_data_dir_from_meta 从 meta中读取data_dir TODO: 直接从meta buf中只解析出data_dir, msg.skip
+    // decode_data_dir_from_meta 从 meta 中读取 data_dir TODO: 直接从 meta buf 中只解析出 data_dir, msg.skip
     pub fn decode_data_dir_from_meta(buf: &[u8]) -> Result<Option<Uuid>> {
         let mut ver = Self::default();
         ver.unmarshal_msg(buf)?;
@@ -733,7 +733,7 @@

         // println!("unmarshal_msg fields name len() {}", &str_len);

-        // !!! Vec::with_capacity(str_len) 失败,vec!正常
+        // !!!Vec::with_capacity(str_len) 失败,vec! 正常
         let mut field_buff = vec![0u8; str_len as usize];
         cur.read_exact(&mut field_buff)?;
@@ -1143,7 +1143,7 @@ impl From<FileMetaVersion> for FileMetaVersionHeader {
 }

 #[derive(Serialize, Deserialize, Debug, Clone, Default, PartialEq)]
-// 因为自定义message_pack,所以一定要保证字段顺序
+// 因为自定义 message_pack,所以一定要保证字段顺序
 pub struct MetaObject {
     pub version_id: Option<Uuid>, // Version ID
     pub data_dir: Option<Uuid>,   // Data dir ID
@@ -1182,7 +1182,7 @@ impl MetaObject {

         // println!("unmarshal_msg fields name len() {}", &str_len);

-        // !!! Vec::with_capacity(str_len) 失败,vec!正常
+        // !!!Vec::with_capacity(str_len) 失败,vec! 正常
         let mut field_buff = vec![0u8; str_len as usize];
         cur.read_exact(&mut field_buff)?;
@@ -1413,7 +1413,7 @@
         Ok(cur.position())
     }

-    // marshal_msg 自定义 messagepack 命名与go一致
+    // marshal_msg 自定义 messagepack 命名与 go 一致
     pub fn marshal_msg(&self) -> Result<Vec<u8>> {
         let mut len: u32 = 18;
         let mut mask: u32 = 0;
@@ -1682,7 +1682,7 @@ impl MetaDeleteMarker {

         let str_len = rmp::decode::read_str_len(&mut cur)?;

-        // !!! Vec::with_capacity(str_len) 失败,vec!正常
+        // !!!Vec::with_capacity(str_len) 失败,vec! 正常
         let mut field_buff = vec![0u8; str_len as usize];
         cur.read_exact(&mut field_buff)?;
@@ -2175,7 +2175,6 @@ pub async fn read_xl_meta_no_data<R>(reader: &mut R, size: usize)
 }

 #[cfg(test)]
 mod test {
-
     use super::*;

     #[test]
@@ -2242,7 +2241,7 @@ mod test {
     #[test]
     #[tracing::instrument]
     fn test_marshal_metaversion() {
-        let mut fi = FileInfo::new("tset", 3, 2);
+        let mut fi = FileInfo::new("test", 3, 2);
         fi.version_id = Some(Uuid::new_v4());
         fi.mod_time = Some(OffsetDateTime::from_unix_timestamp(OffsetDateTime::now_utc().unix_timestamp()).unwrap());
         let mut obj = FileMetaVersion::from(fi);
@@ -2257,7 +2256,7 @@

         // println!("obj2 {:?}", &obj2);

-        // 时间截不一致- -
+        // 时间戳不一致 - -
         assert_eq!(obj, obj2);
         assert_eq!(obj.get_version_id(), obj2.get_version_id());
         assert_eq!(obj.write_version, obj2.write_version);
@@ -2276,7 +2275,7 @@
         let mut obj2 = FileMetaVersionHeader::default();
         obj2.unmarshal_msg(&encoded).unwrap();

-        // 时间截不一致- -
+        // 时间戳不一致 - -
         assert_eq!(obj, obj2);
         assert_eq!(obj.version_id, obj2.version_id);
         assert_eq!(obj.version_id, vid);
@@ -2520,7 +2519,7 @@
         fi3.mod_time = Some(time3);
         fm.add_version(fi3).unwrap();

-        // Sort first to ensure latest is at the front 
+        // Sort first to ensure latest is at the front
         fm.sort_by_mod_time();

         // Should return the first version's mod time (lastest_mod_time returns first version's time)
@@ -2690,7 +2689,7 @@
         assert!(result.is_err());
     }

-    #[test] 
+    #[test]
     fn test_is_latest_delete_marker() {
         // Test the is_latest_delete_marker function with simple data
         // Since the function is complex and requires specific XL format,
@@ -2798,9 +2797,7 @@ async fn test_file_info_from_raw() {

     let encoded = fm.marshal_msg().unwrap();

-    let raw_info = RawFileInfo {
-        buf: encoded,
-    };
+    let raw_info = RawFileInfo { buf: encoded };

     let result = file_info_from_raw(raw_info, "test-bucket", "test-object", false).await;
     assert!(result.is_ok());
@@ -2833,26 +2830,26 @@ fn test_file_meta_load_function() {
     assert!(result.is_err());
 }

-    #[test]
-    fn test_file_meta_read_bytes_header() {
-        // Test read_bytes_header function - it expects the first 5 bytes to be msgpack bin length
-        // Create a buffer with proper msgpack bin format for a 9-byte binary
-        let mut buf = vec![0xc4, 0x09]; // msgpack bin8 format for 9 bytes
-        buf.extend_from_slice(b"test data"); // 9 bytes of data
-        buf.extend_from_slice(b"extra"); // additional data
+#[test]
+fn test_file_meta_read_bytes_header() {
+    // Test read_bytes_header function - it expects the first 5 bytes to be msgpack bin length
+    // Create a buffer with proper msgpack bin format for a 9-byte binary
+    let mut buf = vec![0xc4, 0x09]; // msgpack bin8 format for 9 bytes
+    buf.extend_from_slice(b"test data"); // 9 bytes of data
+    buf.extend_from_slice(b"extra"); // additional data

-        let result = FileMeta::read_bytes_header(&buf);
-        assert!(result.is_ok());
-        let (length, remaining) = result.unwrap();
-        assert_eq!(length, 9); // "test data" length
-        // remaining should be everything after the 5-byte header (but we only have 2-byte header)
-        assert_eq!(remaining.len(), buf.len() - 5);
+    let result = FileMeta::read_bytes_header(&buf);
+    assert!(result.is_ok());
+    let (length, remaining) = result.unwrap();
+    assert_eq!(length, 9); // "test data" length
+    // remaining should be everything after the 5-byte header (but we only have 2-byte header)
+    assert_eq!(remaining.len(), buf.len() - 5);

-        // Test with buffer too small
-        let small_buf = vec![0u8; 2];
-        let result = FileMeta::read_bytes_header(&small_buf);
-        assert!(result.is_err());
-    }
+    // Test with buffer too small
+    let small_buf = vec![0u8; 2];
+    let result = FileMeta::read_bytes_header(&small_buf);
+    assert!(result.is_err());
+}

 #[test]
 fn test_file_meta_get_set_idx() {
@@ -3080,11 +3077,11 @@ fn test_file_meta_version_header_ordering() {
     // Test partial_cmp
     assert!(header1.partial_cmp(&header2).is_some());

-        // Test cmp - header2 should be greater (newer)
-        use std::cmp::Ordering;
-        assert_eq!(header1.cmp(&header2), Ordering::Less); // header1 has earlier time
-        assert_eq!(header2.cmp(&header1), Ordering::Greater); // header2 has later time
-        assert_eq!(header1.cmp(&header1), Ordering::Equal);
+    // Test cmp - header2 should be greater (newer)
+    use std::cmp::Ordering;
+    assert_eq!(header1.cmp(&header2), Ordering::Less); // header1 has earlier time
+    assert_eq!(header2.cmp(&header1), Ordering::Greater); // header2 has later time
+    assert_eq!(header1.cmp(&header1), Ordering::Equal);
 }

 #[test]
@@ -3110,10 +3107,7 @@
     version2.header.version_id = Some(Uuid::new_v4());
     version2.header.mod_time = Some(OffsetDateTime::from_unix_timestamp(2000).unwrap());

-    let versions = vec![
-        vec![version1.clone()],
-        vec![version2.clone()],
-    ];
+    let versions = vec![vec![version1.clone()], vec![version2.clone()]];

     let _merged_strict = merge_file_meta_versions(1, true, 10, &versions);
     let merged_non_strict = merge_file_meta_versions(1, false, 10, &versions);
@@ -3191,9 +3185,7 @@ async fn test_get_file_info_edge_cases() {

 #[tokio::test]
 async fn test_file_info_from_raw_edge_cases() {
     // Test with empty buffer
-    let empty_raw = RawFileInfo {
-        buf: vec![],
-    };
+    let empty_raw = RawFileInfo { buf: vec![] };

     let result = file_info_from_raw(empty_raw, "bucket", "object", false).await;
     assert!(result.is_err());
@@ -3227,12 +3219,12 @@ fn test_meta_object_edge_cases() {
     obj.data_dir = None;
     assert!(obj.use_data_dir());

-        // Test use_inlinedata (always returns false in current implementation)
-        obj.size = 128 * 1024; // 128KB threshold
-        assert!(!obj.use_inlinedata()); // Should be false
+    // Test use_inlinedata (always returns false in current implementation)
+    obj.size = 128 * 1024; // 128KB threshold
+    assert!(!obj.use_inlinedata()); // Should be false

-        obj.size = 128 * 1024 - 1;
-        assert!(!obj.use_inlinedata()); // Should also be false (always false)
+    obj.size = 128 * 1024 - 1;
+    assert!(!obj.use_inlinedata()); // Should also be false (always false)
 }

 #[test]
@@ -3244,17 +3236,17 @@ fn test_file_meta_version_header_edge_cases() {
     header.ec_m = 0;
     assert!(!header.has_ec());

-        // Test matches_not_strict with different signatures but same version_id
-        let mut other = FileMetaVersionHeader::default();
-        let version_id = Some(Uuid::new_v4());
-        header.version_id = version_id;
-        other.version_id = version_id;
-        header.version_type = VersionType::Object;
-        other.version_type = VersionType::Object;
-        header.signature = [1, 2, 3, 4];
-        other.signature = [5, 6, 7, 8];
-        // Should match because they have same version_id and type
-        assert!(header.matches_not_strict(&other));
+    // Test matches_not_strict with different signatures but same version_id
+    let mut other = FileMetaVersionHeader::default();
+    let version_id = Some(Uuid::new_v4());
+    header.version_id = version_id;
+    other.version_id = version_id;
+    header.version_type = VersionType::Object;
+    other.version_type = VersionType::Object;
+    header.signature = [1, 2, 3, 4];
+    other.signature = [5, 6, 7, 8];
+    // Should match because they have same version_id and type
+    assert!(header.matches_not_strict(&other));

     // Test sorts_before with same mod_time but different version_id
     let time = OffsetDateTime::from_unix_timestamp(1000).unwrap();
@@ -3286,12 +3278,12 @@ fn test_file_meta_add_version_edge_cases() {
     fi2.mod_time = Some(OffsetDateTime::now_utc());
     fm.add_version(fi2).unwrap();

-        // Should still have only one version, but updated
-        assert_eq!(fm.versions.len(), 1);
-        let (_, version) = fm.find_version(version_id).unwrap();
-        if let Some(obj) = version.object {
-            assert_eq!(obj.size, 2048); // Size gets updated when adding same version_id
-        }
+    // Should still have only one version, but updated
+    assert_eq!(fm.versions.len(), 1);
+    let (_, version) = fm.find_version(version_id).unwrap();
+    if let Some(obj) = version.object {
+        assert_eq!(obj.size, 2048); // Size gets updated when adding same version_id
+    }
 }

 #[test]
@@ -3324,12 +3316,11 @@ fn test_file_meta_shard_data_dir_count_edge_cases() {
     fi.mod_time = Some(OffsetDateTime::now_utc());
     fm.add_version(fi).unwrap();

-        let count = fm.shard_data_dir_count(&version_id, &data_dir);
-        assert_eq!(count, 0); // Should be 0 because user_data_dir() requires flag
+    let count = fm.shard_data_dir_count(&version_id, &data_dir);
+    assert_eq!(count, 0); // Should be 0 because user_data_dir() requires flag

     // Test with different version_id
     let other_version_id = Some(Uuid::new_v4());
-        let count = fm.shard_data_dir_count(&other_version_id, &data_dir);
-        assert_eq!(count, 1); // Should be 1 because the version has matching data_dir and user_data_dir() is true
+    let count = fm.shard_data_dir_count(&other_version_id, &data_dir);
+    assert_eq!(count, 1); // Should be 1 because the version has matching data_dir and user_data_dir() is true
 }
-
diff --git a/ecstore/src/pools.rs b/ecstore/src/pools.rs
index 8a076100..97615a6a 100644
--- a/ecstore/src/pools.rs
+++ b/ecstore/src/pools.rs
@@ -1220,7 +1220,7 @@ impl ECStore {

             reader.read_exact(&mut chunk).await?;

-            // 每次从reader中读取一个part上传
+            // 每次从 reader 中读取一个 part 上传
             let rd = Box::new(Cursor::new(chunk));

             let mut data = PutObjReader::new(rd, part.size);
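The pools.rs hunk above (and the rebalance.rs hunk further down) reads one part at a time with read_exact before re-uploading it. A hedged sketch of that loop, with `part_sizes` standing in for object_info.parts and the Cursor standing in for what the real code wraps in PutObjReader:

    use std::io::Cursor;
    use tokio::io::{AsyncRead, AsyncReadExt};

    async fn upload_parts<R: AsyncRead + Unpin>(mut reader: R, part_sizes: &[usize]) -> std::io::Result<()> {
        for &size in part_sizes {
            let mut chunk = vec![0u8; size];
            reader.read_exact(&mut chunk).await?; // errors out if the stream ends early
            let _part_reader = Cursor::new(chunk); // handed to the uploader in the real code
            // ... upload this part, then move on to the next one
        }
        Ok(())
    }
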
diff --git a/ecstore/src/quorum.rs b/ecstore/src/quorum.rs
index d38177ca..40ad7a34 100644
--- a/ecstore/src/quorum.rs
+++ b/ecstore/src/quorum.rs
@@ -75,7 +75,7 @@ fn is_err_ignored(err: &Error, ignored_errs: &[Box<Error>]) -> bool {
 // 减少错误数量并返回出现次数最多的错误
 fn reduce_errs(errs: &[Option<Error>], ignored_errs: &[Box<Error>]) -> (usize, Option<Error>) {
     let mut error_counts: HashMap<String, usize> = HashMap::new();
-    let mut error_map: HashMap<String, usize> = HashMap::new(); // 存err位置
+    let mut error_map: HashMap<String, usize> = HashMap::new(); // 存 err 位置
     let nil = "nil".to_string();
     for (i, operr) in errs.iter().enumerate() {
         if let Some(err) = operr {
@@ -120,7 +120,7 @@
     }
 }

-// 根据quorum验证错误数量
+// 根据 quorum 验证错误数量
 fn reduce_quorum_errs(
     errs: &[Option<Error>],
     ignored_errs: &[Box<Error>],
@@ -135,8 +135,8 @@
     }
 }

-// 根据读quorum验证错误数量
-// 返回最大错误数量的下标,或QuorumError
+// 根据读 quorum 验证错误数量
+// 返回最大错误数量的下标,或 QuorumError
 pub fn reduce_read_quorum_errs(
     errs: &[Option<Error>],
     ignored_errs: &[Box<Error>],
@@ -145,8 +145,8 @@
     reduce_quorum_errs(errs, ignored_errs, read_quorum, QuorumError::Read)
 }

-// 根据写quorum验证错误数量
-// 返回最大错误数量的下标,或QuorumError
+// 根据写 quorum 验证错误数量
+// 返回最大错误数量的下标,或 QuorumError
 #[tracing::instrument(level = "info", skip_all)]
 pub fn reduce_write_quorum_errs(
     errs: &[Option<Error>],
diff --git a/ecstore/src/rebalance.rs b/ecstore/src/rebalance.rs
index 007cf7a4..1f95042f 100644
--- a/ecstore/src/rebalance.rs
+++ b/ecstore/src/rebalance.rs
@@ -858,13 +858,13 @@ impl ECStore {
         let mut reader = rd.stream;

         for (i, part) in object_info.parts.iter().enumerate() {
-            // 每次从reader中读取一个part上传
+            // 每次从 reader 中读取一个 part 上传
             let mut chunk = vec![0u8; part.size];

             reader.read_exact(&mut chunk).await?;

-            // 每次从reader中读取一个part上传
+            // 每次从 reader 中读取一个 part 上传
             let rd = Box::new(Cursor::new(chunk));

             let mut data = PutObjReader::new(rd, part.size);
diff --git a/ecstore/src/set_disk.rs b/ecstore/src/set_disk.rs
index 68d78ee2..9fadd7b0 100644
--- a/ecstore/src/set_disk.rs
+++ b/ecstore/src/set_disk.rs
@@ -1452,7 +1452,7 @@ impl SetDisks {
             }
         };

-        // check endpoint是否一致
+        // check endpoint 是否一致

         let _ = new_disk.set_disk_id(Some(fm.erasure.this)).await;
diff --git a/ecstore/src/sets.rs b/ecstore/src/sets.rs
index 7a4cf93a..22ed4471 100644
--- a/ecstore/src/sets.rs
+++ b/ecstore/src/sets.rs
@@ -287,7 +287,7 @@ struct DelObj {

 #[async_trait::async_trait]
 impl ObjectIO for Sets {
-    #[tracing::instrument(level = "debug", skip(self))]
+    #[tracing::instrument(level = "debug", skip(self, object, h, opts))]
     async fn get_object_reader(
         &self,
         bucket: &str,
diff --git a/ecstore/src/store_api.rs b/ecstore/src/store_api.rs
index 8c8f4f00..e61ff578 100644
--- a/ecstore/src/store_api.rs
+++ b/ecstore/src/store_api.rs
@@ -229,7 +229,7 @@ impl FileInfo {
         }
     }

-    // to_part_offset 取offset 所在的part index, 返回part index, offset
+    // to_part_offset 取 offset 所在的 part index, 返回 part index, offset
     pub fn to_part_offset(&self, offset: usize) -> Result<(usize, usize)> {
         if offset == 0 {
             return Ok((0, 0));
@@ -356,7 +356,7 @@ impl ErasureInfo {
         let last_shard_size = last_block_size.div_ceil(self.data_blocks);

         num_shards * self.shard_size(self.block_size) + last_shard_size

-        // // 因为写入的时候ec需要补全,所以最后一个长度应该也是一样的
+        // // 因为写入的时候 ec 需要补全,所以最后一个长度应该也是一样的
         // if last_block_size != 0 {
        //     num_shards += 1
         // }
@@ -1246,7 +1246,7 @@ mod tests {
         assert_eq!(object_info.etag, Some("test-etag".to_string()));
     }

-    // to_part_offset 取offset 所在的part index, 返回part index, offset
+    // to_part_offset 取 offset 所在的 part index, 返回 part index, offset
     #[test]
     fn test_file_info_to_part_offset() {
         let mut file_info = FileInfo::new("test", 4, 2);
diff --git a/ecstore/src/store_init.rs b/ecstore/src/store_init.rs
index 2e44db00..b27fa462 100644
--- a/ecstore/src/store_init.rs
+++ b/ecstore/src/store_init.rs
@@ -192,7 +192,7 @@ fn check_format_erasure_value(format: &FormatV3) -> Result<()> {
     Ok(())
 }

-// load_format_erasure_all 读取所有foramt.json
+// load_format_erasure_all 读取所有 format.json
 pub async fn load_format_erasure_all(disks: &[Option<DiskStore>], heal: bool) -> (Vec<Option<FormatV3>>, Vec<Option<Error>>) {
     let mut futures = Vec::with_capacity(disks.len());
     let mut datas = Vec::with_capacity(disks.len());
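reduce_errs and reduce_quorum_errs above count how often each error occurs across disks and accept the most frequent one only if its count reaches the quorum. A self-contained sketch of that idea, using strings in place of the crate's Error type:

    use std::collections::HashMap;

    fn reduce_errs_sketch(errs: &[Option<String>], quorum: usize) -> Option<(usize, Option<String>)> {
        let mut counts: HashMap<String, usize> = HashMap::new();
        for e in errs {
            // `None` means the disk succeeded; successes are counted under "nil".
            let key = e.clone().unwrap_or_else(|| "nil".to_string());
            *counts.entry(key).or_default() += 1;
        }
        let (err, n) = counts.into_iter().max_by_key(|&(_, n)| n)?;
        let max_err = if err == "nil" { None } else { Some(err) };
        // Only report the dominant outcome if it meets the quorum threshold.
        (n >= quorum).then_some((n, max_err))
    }
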
diff --git a/ecstore/src/utils/hash.rs b/ecstore/src/utils/hash.rs
index 1aae48e1..7f99478d 100644
--- a/ecstore/src/utils/hash.rs
+++ b/ecstore/src/utils/hash.rs
@@ -2,9 +2,9 @@ use crc32fast::Hasher;
 use siphasher::sip::SipHasher;

 pub fn sip_hash(key: &str, cardinality: usize, id: &[u8; 16]) -> usize {
-    // 你的密钥,必须是16字节
+    // 你的密钥,必须是 16 字节

-    // 计算字符串的SipHash值
+    // 计算字符串的 SipHash 值
     let result = SipHasher::new_with_key(id).hash(key.as_bytes());

     result as usize % cardinality
diff --git a/iam/src/store/object.rs b/iam/src/store/object.rs
index e956f10c..b0cd63a1 100644
--- a/iam/src/store/object.rs
+++ b/iam/src/store/object.rs
@@ -349,7 +349,7 @@ impl ObjectStore {
     //         user.credentials.access_key = name.to_owned();
     //     }

-    //     // todo, 校验session token
+    //     // todo, 校验 session token

     //     Ok(Some(user))
     // }
@@ -932,7 +932,7 @@ impl Store for ObjectStore {
         //     Arc::new(tokio::sync::Mutex::new(CacheEntity::default())),
         // );

-        // // 一次读取32个元素
+        // // 一次读取 32 个元素
         // let iter = items
         //     .iter()
         //     .map(|item| item.trim_start_matches("config/iam/"))
diff --git a/rustfs/src/main.rs b/rustfs/src/main.rs
index 75316f38..ca7070f1 100644
--- a/rustfs/src/main.rs
+++ b/rustfs/src/main.rs
@@ -14,7 +14,7 @@ use crate::auth::IAMAuth;
 use crate::console::{init_console_cfg, CONSOLE_CONFIG};
 // Ensure the correct path for parse_license is imported
 use crate::server::{wait_for_shutdown, ServiceState, ServiceStateManager, ShutdownSignal, SHUTDOWN_TIMEOUT};
-use bytes::Bytes;
+// use bytes::Bytes;
 use chrono::Datelike;
 use clap::Parser;
 use common::{
@@ -37,7 +37,7 @@ use ecstore::{
 };
 use ecstore::{global::set_global_rustfs_port, notification_sys::new_global_notification_sys};
 use grpc::make_server;
-use http::{HeaderMap, Request as HttpRequest, Response};
+// use http::{HeaderMap, Request as HttpRequest, Response};
 use hyper_util::server::graceful::GracefulShutdown;
 use hyper_util::{
     rt::{TokioExecutor, TokioIo},
@@ -61,9 +61,10 @@
 use tokio::signal::unix::{signal, SignalKind};
 use tokio_rustls::TlsAcceptor;
 use tonic::{metadata::MetadataValue, Request, Status};
 use tower_http::cors::CorsLayer;
-use tower_http::trace::TraceLayer;
+// use tracing::{instrument, Span};
+use tracing::instrument;
+// use tower_http::trace::TraceLayer;
 use tracing::{debug, error, info, warn};
-use tracing::{instrument, Span};

 const MI_B: usize = 1024 * 1024;
@@ -323,49 +324,49 @@ async fn run(opt: config::Opt) -> Result<()> {
     let mut sigint_inner = sigint_inner;
     let hybrid_service = TowerToHyperService::new(
         tower::ServiceBuilder::new()
-            .layer(
-                TraceLayer::new_for_http()
-                    .make_span_with(|request: &HttpRequest<_>| {
-                        let span = tracing::info_span!("http-request",
-                            status_code = tracing::field::Empty,
-                            method = %request.method(),
-                            uri = %request.uri(),
-                            version = ?request.version(),
-                        );
-                        for (header_name, header_value) in request.headers() {
-                            if header_name == "user-agent" || header_name == "content-type" || header_name == "content-length"
-                            {
-                                span.record(header_name.as_str(), header_value.to_str().unwrap_or("invalid"));
-                            }
-                        }
-
-                        span
-                    })
-                    .on_request(|request: &HttpRequest<_>, _span: &Span| {
-                        info!(
-                            counter.rustfs_api_requests_total = 1_u64,
-                            key_request_method = %request.method().to_string(),
-                            key_request_uri_path = %request.uri().path().to_owned(),
-                            "handle request api total",
-                        );
-                        debug!("http started method: {}, url path: {}", request.method(), request.uri().path())
-                    })
-                    .on_response(|response: &Response<_>, latency: Duration, _span: &Span| {
-                        _span.record("http response status_code", tracing::field::display(response.status()));
-                        debug!("http response generated in {:?}", latency)
-                    })
-                    .on_body_chunk(|chunk: &Bytes, latency: Duration, _span: &Span| {
-                        info!(histogram.request.body.len = chunk.len(), "histogram request body length",);
-                        debug!("http body sending {} bytes in {:?}", chunk.len(), latency)
-                    })
-                    .on_eos(|_trailers: Option<&HeaderMap>, stream_duration: Duration, _span: &Span| {
-                        debug!("http stream closed after {:?}", stream_duration)
-                    })
-                    .on_failure(|_error, latency: Duration, _span: &Span| {
-                        info!(counter.rustfs_api_requests_failure_total = 1_u64, "handle request api failure total");
-                        debug!("http request failure error: {:?} in {:?}", _error, latency)
-                    }),
-            )
+            //.layer(
+            //    TraceLayer::new_for_http()
+            //        .make_span_with(|request: &HttpRequest<_>| {
+            //            let span = tracing::info_span!("http-request",
+            //                status_code = tracing::field::Empty,
+            //                method = %request.method(),
+            //                uri = %request.uri(),
+            //                version = ?request.version(),
+            //            );
+            //            for (header_name, header_value) in request.headers() {
+            //                if header_name == "user-agent" || header_name == "content-type" || header_name == "content-length"
+            //                {
+            //                    span.record(header_name.as_str(), header_value.to_str().unwrap_or("invalid"));
+            //                }
+            //            }
+            //
+            //            span
+            //        })
+            //        .on_request(|request: &HttpRequest<_>, _span: &Span| {
+            //            info!(
+            //                counter.rustfs_api_requests_total = 1_u64,
+            //                key_request_method = %request.method().to_string(),
+            //                key_request_uri_path = %request.uri().path().to_owned(),
+            //                "handle request api total",
+            //            );
+            //            debug!("http started method: {}, url path: {}", request.method(), request.uri().path())
+            //        })
+            //        .on_response(|response: &Response<_>, latency: Duration, _span: &Span| {
+            //            _span.record("http response status_code", tracing::field::display(response.status()));
+            //            debug!("http response generated in {:?}", latency)
+            //        })
+            //        .on_body_chunk(|chunk: &Bytes, latency: Duration, _span: &Span| {
+            //            info!(histogram.request.body.len = chunk.len(), "histogram request body length",);
+            //            debug!("http body sending {} bytes in {:?}", chunk.len(), latency)
+            //        })
+            //        .on_eos(|_trailers: Option<&HeaderMap>, stream_duration: Duration, _span: &Span| {
+            //            debug!("http stream closed after {:?}", stream_duration)
+            //        })
+            //        .on_failure(|_error, latency: Duration, _span: &Span| {
+            //            info!(counter.rustfs_api_requests_failure_total = 1_u64, "handle request api failure total");
+            //            debug!("http request failure error: {:?} in {:?}", _error, latency)
+            //        }),
+            // )
             .layer(CorsLayer::permissive())
             .service(hybrid(s3_service, rpc_service)),
     );
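If the tracing middleware is ever switched back on, a trimmed re-enabling of the commented-out layer above could look like the following (same tower-http API as in the hunk, reduced to span creation and response logging; the request/body/eos/failure hooks would be restored the same way):

    tower::ServiceBuilder::new()
        .layer(
            TraceLayer::new_for_http()
                .make_span_with(|request: &HttpRequest<_>| {
                    tracing::info_span!("http-request", method = %request.method(), uri = %request.uri())
                })
                .on_response(|response: &Response<_>, latency: Duration, _span: &Span| {
                    debug!("http response {} generated in {:?}", response.status(), latency)
                }),
        )
        .layer(CorsLayer::permissive())
        .service(hybrid(s3_service, rpc_service));

Note that re-enabling it also means restoring the commented-out `bytes::Bytes`, `http::{HeaderMap, Request, Response}`, `TraceLayer`, and `Span` imports removed earlier in this diff.
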
diff --git a/s3select/query/src/sql/physical/planner.rs b/s3select/query/src/sql/physical/planner.rs
index 254c198d..5857b6b6 100644
--- a/s3select/query/src/sql/physical/planner.rs
+++ b/s3select/query/src/sql/physical/planner.rs
@@ -73,15 +73,15 @@ impl PhysicalPlanner for DefaultPhysicalPlanner {
         logical_plan: &LogicalPlan,
         session: &SessionCtx,
     ) -> QueryResult<Arc<dyn ExecutionPlan>> {
-        // 将扩展的物理计划优化规则注入df 的 session state
+        // 将扩展的物理计划优化规则注入 df 的 session state
         let new_state = SessionStateBuilder::new_from_existing(session.inner().clone())
             .with_physical_optimizer_rules(self.ext_physical_optimizer_rules.clone())
             .build();

-        // 通过扩展的物理计划转换规则构造df 的 Physical Planner
+        // 通过扩展的物理计划转换规则构造 df 的 Physical Planner
         let planner = DFDefaultPhysicalPlanner::with_extension_planners(self.ext_physical_transform_rules.clone());

-        // 执行df的物理计划规划及优化
+        // 执行 df 的物理计划规划及优化
         planner
             .create_physical_plan(logical_plan, &new_state)
             .await
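The planner above injects `ext_physical_optimizer_rules` into the session state before planning. A hedged sketch of what one such rule could look like — a no-op PhysicalOptimizerRule (trait and module paths per recent DataFusion releases; adjust to the version this crate pins):

    use std::sync::Arc;
    use datafusion::config::ConfigOptions;
    use datafusion::error::Result;
    use datafusion::physical_optimizer::PhysicalOptimizerRule;
    use datafusion::physical_plan::ExecutionPlan;

    #[derive(Debug)]
    struct PassThroughRule;

    impl PhysicalOptimizerRule for PassThroughRule {
        fn optimize(&self, plan: Arc<dyn ExecutionPlan>, _config: &ConfigOptions) -> Result<Arc<dyn ExecutionPlan>> {
            Ok(plan) // a real rule would rewrite the plan tree here
        }

        fn name(&self) -> &str {
            "pass_through"
        }

        fn schema_check(&self) -> bool {
            true
        }
    }
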
diff --git a/scripts/dev.sh b/scripts/dev.sh
index 0df907a8..eb72331c 100755
--- a/scripts/dev.sh
+++ b/scripts/dev.sh
@@ -1,6 +1,6 @@
 #!/bin/bash

-# 脚本名称: scp_to_servers.sh
+# 脚本名称:scp_to_servers.sh

 rm ./target/x86_64-unknown-linux-musl/release/rustfs.zip
 # 压缩./target/x86_64-unknown-linux-musl/release/rustfs
@@ -12,14 +12,14 @@
 LOCAL_FILE="./target/x86_64-unknown-linux-musl/release/rustfs.zip"
 REMOTE_PATH="~"

 # 定义服务器列表数组
-# 格式:服务器IP 用户名 目标路径
+# 格式:服务器 IP 用户名 目标路径
 SERVER_LIST=(
     "root@121.89.80.13"
 )

 # 遍历服务器列表
 for SERVER in "${SERVER_LIST[@]}"; do
-    echo "正在将文件复制到服务器: $SERVER 目标路径: $REMOTE_PATH"
+    echo "正在将文件复制到服务器:$SERVER 目标路径:$REMOTE_PATH"
     scp "$LOCAL_FILE" "${SERVER}:${REMOTE_PATH}"
     if [ $? -eq 0 ]; then
         echo "成功复制到 $SERVER"
diff --git a/scripts/run.sh b/scripts/run.sh
index 58d0f264..2004e05a 100755
--- a/scripts/run.sh
+++ b/scripts/run.sh
@@ -19,7 +19,8 @@ mkdir -p ./target/volume/test{0..4}

 if [ -z "$RUST_LOG" ]; then
     export RUST_BACKTRACE=1
-    export RUST_LOG="rustfs=debug,ecstore=debug,s3s=debug,iam=debug"
+#    export RUST_LOG="rustfs=debug,ecstore=debug,s3s=debug,iam=debug"
+    export RUST_LOG="rustfs=info,ecstore=info,s3s=info,iam=info,rustfs-obs=info"
 fi

 # export RUSTFS_ERASURE_SET_DRIVE_COUNT=5
@@ -35,7 +36,7 @@ export RUSTFS_CONSOLE_ADDRESS=":9002"
 # HTTPS 证书目录
 # export RUSTFS_TLS_PATH="./deploy/certs"

-# 具体路径修改为配置文件真实路径,obs.example.toml 仅供参考 其中`RUSTFS_OBS_CONFIG` 和下面变量二选一
+# 具体路径修改为配置文件真实路径,obs.example.toml 仅供参考 其中 `RUSTFS_OBS_CONFIG` 和下面变量二选一
 export RUSTFS_OBS_CONFIG="./deploy/config/obs.example.toml"

 # 如下变量需要必须参数都有值才可以,以及会覆盖配置文件中的值