mirror of
https://github.com/rustfs/rustfs.git
synced 2026-01-17 01:30:33 +00:00
fix: simplify filemeta MessagePack serialization and improve code quality
- Refactor `unmarshal_msg` to use `rmp_serde::from_slice` instead of manual parsing
- Add serde field renaming attributes to `FileMetaVersion` struct
- Remove 428 lines of manual MessagePack parsing code
- Improve string comparison using `!object.is_empty()` instead of `object != ""`
- Update volume directory numbering in run script from test{0..4} to test{1..4}
- Clean up unused imports and code
Fixes #517
This commit is contained in:
@@ -7,7 +7,6 @@ use crate::headers::{
|
||||
};
|
||||
use byteorder::ByteOrder;
|
||||
use bytes::Bytes;
|
||||
use rmp::Marker;
|
||||
use s3s::header::X_AMZ_RESTORE;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::cmp::Ordering;
|
||||
@@ -901,9 +900,13 @@ impl TryFrom<FileMetaVersion> for FileMetaShallowVersion {
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Default, Clone, PartialEq)]
|
||||
pub struct FileMetaVersion {
|
||||
#[serde(rename = "Type")]
|
||||
pub version_type: VersionType,
|
||||
#[serde(rename = "V2Obj")]
|
||||
pub object: Option<MetaObject>,
|
||||
#[serde(rename = "DelObj")]
|
||||
pub delete_marker: Option<MetaDeleteMarker>,
|
||||
#[serde(rename = "v")]
|
||||
pub write_version: u64, // rustfs version
|
||||
}
|
||||
|
||||
@@ -967,125 +970,16 @@ impl FileMetaVersion {
|
||||
}
|
||||
|
||||
pub fn unmarshal_msg(&mut self, buf: &[u8]) -> Result<u64> {
|
||||
let mut cur = Cursor::new(buf);
|
||||
let ret: Self = rmp_serde::from_slice(buf)?;
|
||||
|
||||
let mut fields_len = rmp::decode::read_map_len(&mut cur)?;
|
||||
*self = ret;
|
||||
|
||||
while fields_len > 0 {
|
||||
fields_len -= 1;
|
||||
|
||||
// println!("unmarshal_msg fields idx {}", fields_len);
|
||||
|
||||
let str_len = rmp::decode::read_str_len(&mut cur)?;
|
||||
|
||||
// println!("unmarshal_msg fields name len() {}", &str_len);
|
||||
|
||||
// !!! Vec::with_capacity(str_len) fails, vec! works normally
|
||||
let mut field_buff = vec![0u8; str_len as usize];
|
||||
|
||||
cur.read_exact(&mut field_buff)?;
|
||||
|
||||
let field = String::from_utf8(field_buff)?;
|
||||
|
||||
// println!("unmarshal_msg fields name {}", &field);
|
||||
|
||||
match field.as_str() {
|
||||
"Type" => {
|
||||
let u: u8 = rmp::decode::read_int(&mut cur)?;
|
||||
self.version_type = VersionType::from_u8(u);
|
||||
}
|
||||
|
||||
"V2Obj" => {
|
||||
// is_nil()
|
||||
if buf[cur.position() as usize] == 0xc0 {
|
||||
rmp::decode::read_nil(&mut cur)?;
|
||||
} else {
|
||||
// let buf = unsafe { cur.position() };
|
||||
let mut obj = MetaObject::default();
|
||||
// let start = cur.position();
|
||||
|
||||
let (_, remain) = buf.split_at(cur.position() as usize);
|
||||
|
||||
let read_len = obj.unmarshal_msg(remain)?;
|
||||
cur.set_position(cur.position() + read_len);
|
||||
|
||||
self.object = Some(obj);
|
||||
}
|
||||
}
|
||||
"DelObj" => {
|
||||
if buf[cur.position() as usize] == 0xc0 {
|
||||
rmp::decode::read_nil(&mut cur)?;
|
||||
} else {
|
||||
// let buf = unsafe { cur.position() };
|
||||
let mut obj = MetaDeleteMarker::default();
|
||||
// let start = cur.position();
|
||||
|
||||
let (_, remain) = buf.split_at(cur.position() as usize);
|
||||
let read_len = obj.unmarshal_msg(remain)?;
|
||||
cur.set_position(cur.position() + read_len);
|
||||
|
||||
self.delete_marker = Some(obj);
|
||||
}
|
||||
}
|
||||
"v" => {
|
||||
self.write_version = rmp::decode::read_int(&mut cur)?;
|
||||
}
|
||||
name => return Err(Error::other(format!("not suport field name {name}"))),
|
||||
}
|
||||
}
|
||||
|
||||
Ok(cur.position())
|
||||
Ok(buf.len() as u64)
|
||||
}
|
||||
|
||||
pub fn marshal_msg(&self) -> Result<Vec<u8>> {
|
||||
let mut len: u32 = 4;
|
||||
let mut mask: u8 = 0;
|
||||
|
||||
if self.object.is_none() {
|
||||
len -= 1;
|
||||
mask |= 0x2;
|
||||
}
|
||||
if self.delete_marker.is_none() {
|
||||
len -= 1;
|
||||
mask |= 0x4;
|
||||
}
|
||||
|
||||
let mut wr = Vec::new();
|
||||
|
||||
// Field count
|
||||
rmp::encode::write_map_len(&mut wr, len)?;
|
||||
|
||||
// write "Type"
|
||||
rmp::encode::write_str(&mut wr, "Type")?;
|
||||
rmp::encode::write_uint(&mut wr, self.version_type.to_u8() as u64)?;
|
||||
|
||||
if (mask & 0x2) == 0 {
|
||||
// write V2Obj
|
||||
rmp::encode::write_str(&mut wr, "V2Obj")?;
|
||||
if self.object.is_none() {
|
||||
rmp::encode::write_nil(&mut wr)?;
|
||||
} else {
|
||||
let buf = self.object.as_ref().unwrap().marshal_msg()?;
|
||||
wr.write_all(&buf)?;
|
||||
}
|
||||
}
|
||||
|
||||
if (mask & 0x4) == 0 {
|
||||
// write "DelObj"
|
||||
rmp::encode::write_str(&mut wr, "DelObj")?;
|
||||
if self.delete_marker.is_none() {
|
||||
rmp::encode::write_nil(&mut wr)?;
|
||||
} else {
|
||||
let buf = self.delete_marker.as_ref().unwrap().marshal_msg()?;
|
||||
wr.write_all(&buf)?;
|
||||
}
|
||||
}
|
||||
|
||||
// write "v"
|
||||
rmp::encode::write_str(&mut wr, "v")?;
|
||||
rmp::encode::write_uint(&mut wr, self.write_version)?;
|
||||
|
||||
Ok(wr)
|
||||
let buf = rmp_serde::to_vec(self)?;
|
||||
Ok(buf)
|
||||
}
|
||||
|
||||
pub fn free_version(&self) -> bool {
|
||||
@@ -1457,405 +1351,56 @@ impl From<FileMetaVersion> for FileMetaVersionHeader {
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, Default, PartialEq)]
|
||||
// Because of custom message_pack, field order must be guaranteed
|
||||
pub struct MetaObject {
|
||||
pub version_id: Option<Uuid>, // Version ID
|
||||
pub data_dir: Option<Uuid>, // Data dir ID
|
||||
pub erasure_algorithm: ErasureAlgo, // Erasure coding algorithm
|
||||
pub erasure_m: usize, // Erasure data blocks
|
||||
pub erasure_n: usize, // Erasure parity blocks
|
||||
pub erasure_block_size: usize, // Erasure block size
|
||||
pub erasure_index: usize, // Erasure disk index
|
||||
pub erasure_dist: Vec<u8>, // Erasure distribution
|
||||
#[serde(rename = "ID")]
|
||||
pub version_id: Option<Uuid>, // Version ID
|
||||
#[serde(rename = "DDir")]
|
||||
pub data_dir: Option<Uuid>, // Data dir ID
|
||||
#[serde(rename = "EcAlgo")]
|
||||
pub erasure_algorithm: ErasureAlgo, // Erasure coding algorithm
|
||||
#[serde(rename = "EcM")]
|
||||
pub erasure_m: usize, // Erasure data blocks
|
||||
#[serde(rename = "EcN")]
|
||||
pub erasure_n: usize, // Erasure parity blocks
|
||||
#[serde(rename = "EcBSize")]
|
||||
pub erasure_block_size: usize, // Erasure block size
|
||||
#[serde(rename = "EcIndex")]
|
||||
pub erasure_index: usize, // Erasure disk index
|
||||
#[serde(rename = "EcDist")]
|
||||
pub erasure_dist: Vec<u8>, // Erasure distribution
|
||||
#[serde(rename = "CSumAlgo")]
|
||||
pub bitrot_checksum_algo: ChecksumAlgo, // Bitrot checksum algo
|
||||
pub part_numbers: Vec<usize>, // Part Numbers
|
||||
pub part_etags: Vec<String>, // Part ETags
|
||||
pub part_sizes: Vec<usize>, // Part Sizes
|
||||
pub part_actual_sizes: Vec<i64>, // Part ActualSizes (compression)
|
||||
pub part_indices: Vec<Bytes>, // Part Indexes (compression)
|
||||
pub size: i64, // Object version size
|
||||
pub mod_time: Option<OffsetDateTime>, // Object version modified time
|
||||
#[serde(rename = "PartNums")]
|
||||
pub part_numbers: Vec<usize>, // Part Numbers
|
||||
#[serde(rename = "PartETags")]
|
||||
pub part_etags: Vec<String>, // Part ETags
|
||||
#[serde(rename = "PartSizes")]
|
||||
pub part_sizes: Vec<usize>, // Part Sizes
|
||||
#[serde(rename = "PartASizes")]
|
||||
pub part_actual_sizes: Vec<i64>, // Part ActualSizes (compression)
|
||||
#[serde(rename = "PartIdx")]
|
||||
pub part_indices: Vec<Bytes>, // Part Indexes (compression)
|
||||
#[serde(rename = "Size")]
|
||||
pub size: i64, // Object version size
|
||||
#[serde(rename = "MTime")]
|
||||
pub mod_time: Option<OffsetDateTime>, // Object version modified time
|
||||
#[serde(rename = "MetaSys")]
|
||||
pub meta_sys: HashMap<String, Vec<u8>>, // Object version internal metadata
|
||||
#[serde(rename = "MetaUsr")]
|
||||
pub meta_user: HashMap<String, String>, // Object version metadata set by user
|
||||
}
|
||||
|
||||
impl MetaObject {
|
||||
pub fn unmarshal_msg(&mut self, buf: &[u8]) -> Result<u64> {
|
||||
let mut cur = Cursor::new(buf);
|
||||
let ret: Self = rmp_serde::from_slice(buf)?;
|
||||
|
||||
let mut fields_len = rmp::decode::read_map_len(&mut cur)?;
|
||||
*self = ret;
|
||||
|
||||
// let mut ret = Self::default();
|
||||
|
||||
while fields_len > 0 {
|
||||
fields_len -= 1;
|
||||
|
||||
// println!("unmarshal_msg fields idx {}", fields_len);
|
||||
|
||||
let str_len = rmp::decode::read_str_len(&mut cur)?;
|
||||
|
||||
// println!("unmarshal_msg fields name len() {}", &str_len);
|
||||
|
||||
// !!! Vec::with_capacity(str_len) fails, vec! works normally
|
||||
let mut field_buff = vec![0u8; str_len as usize];
|
||||
|
||||
cur.read_exact(&mut field_buff)?;
|
||||
|
||||
let field = String::from_utf8(field_buff)?;
|
||||
|
||||
// println!("unmarshal_msg fields name {}", &field);
|
||||
|
||||
match field.as_str() {
|
||||
"ID" => {
|
||||
rmp::decode::read_bin_len(&mut cur)?;
|
||||
let mut buf = [0u8; 16];
|
||||
cur.read_exact(&mut buf)?;
|
||||
self.version_id = {
|
||||
let id = Uuid::from_bytes(buf);
|
||||
if id.is_nil() { None } else { Some(id) }
|
||||
};
|
||||
}
|
||||
"DDir" => {
|
||||
rmp::decode::read_bin_len(&mut cur)?;
|
||||
let mut buf = [0u8; 16];
|
||||
cur.read_exact(&mut buf)?;
|
||||
self.data_dir = {
|
||||
let id = Uuid::from_bytes(buf);
|
||||
if id.is_nil() { None } else { Some(id) }
|
||||
};
|
||||
}
|
||||
"EcAlgo" => {
|
||||
let u: u8 = rmp::decode::read_int(&mut cur)?;
|
||||
self.erasure_algorithm = ErasureAlgo::from_u8(u)
|
||||
}
|
||||
"EcM" => {
|
||||
self.erasure_m = rmp::decode::read_int(&mut cur)?;
|
||||
}
|
||||
"EcN" => {
|
||||
self.erasure_n = rmp::decode::read_int(&mut cur)?;
|
||||
}
|
||||
"EcBSize" => {
|
||||
self.erasure_block_size = rmp::decode::read_int(&mut cur)?;
|
||||
}
|
||||
"EcIndex" => {
|
||||
self.erasure_index = rmp::decode::read_int(&mut cur)?;
|
||||
}
|
||||
"EcDist" => {
|
||||
let alen = rmp::decode::read_array_len(&mut cur)? as usize;
|
||||
self.erasure_dist = vec![0u8; alen];
|
||||
for i in 0..alen {
|
||||
self.erasure_dist[i] = rmp::decode::read_int(&mut cur)?;
|
||||
}
|
||||
}
|
||||
"CSumAlgo" => {
|
||||
let u: u8 = rmp::decode::read_int(&mut cur)?;
|
||||
self.bitrot_checksum_algo = ChecksumAlgo::from_u8(u)
|
||||
}
|
||||
"PartNums" => {
|
||||
let alen = rmp::decode::read_array_len(&mut cur)? as usize;
|
||||
self.part_numbers = vec![0; alen];
|
||||
for i in 0..alen {
|
||||
self.part_numbers[i] = rmp::decode::read_int(&mut cur)?;
|
||||
}
|
||||
}
|
||||
"PartETags" => {
|
||||
let array_len = match rmp::decode::read_nil(&mut cur) {
|
||||
Ok(_) => None,
|
||||
Err(e) => match e {
|
||||
rmp::decode::ValueReadError::TypeMismatch(marker) => match marker {
|
||||
Marker::FixArray(l) => Some(l as usize),
|
||||
Marker::Array16 => Some(rmp::decode::read_u16(&mut cur)? as usize),
|
||||
Marker::Array32 => Some(rmp::decode::read_u16(&mut cur)? as usize),
|
||||
_ => return Err(Error::other("PartETags parse failed")),
|
||||
},
|
||||
_ => return Err(Error::other("PartETags parse failed.")),
|
||||
},
|
||||
};
|
||||
|
||||
if array_len.is_some() {
|
||||
let l = array_len.unwrap();
|
||||
let mut etags = Vec::with_capacity(l);
|
||||
for _ in 0..l {
|
||||
let str_len = rmp::decode::read_str_len(&mut cur)?;
|
||||
let mut field_buff = vec![0u8; str_len as usize];
|
||||
cur.read_exact(&mut field_buff)?;
|
||||
etags.push(String::from_utf8(field_buff)?);
|
||||
}
|
||||
self.part_etags = etags;
|
||||
}
|
||||
}
|
||||
"PartSizes" => {
|
||||
let alen = rmp::decode::read_array_len(&mut cur)? as usize;
|
||||
self.part_sizes = vec![0; alen];
|
||||
for i in 0..alen {
|
||||
self.part_sizes[i] = rmp::decode::read_int(&mut cur)?;
|
||||
}
|
||||
}
|
||||
"PartASizes" => {
|
||||
let array_len = match rmp::decode::read_nil(&mut cur) {
|
||||
Ok(_) => None,
|
||||
Err(e) => match e {
|
||||
rmp::decode::ValueReadError::TypeMismatch(marker) => match marker {
|
||||
Marker::FixArray(l) => Some(l as usize),
|
||||
Marker::Array16 => Some(rmp::decode::read_u16(&mut cur)? as usize),
|
||||
Marker::Array32 => Some(rmp::decode::read_u16(&mut cur)? as usize),
|
||||
_ => return Err(Error::other("PartETags parse failed")),
|
||||
},
|
||||
_ => return Err(Error::other("PartETags parse failed.")),
|
||||
},
|
||||
};
|
||||
if let Some(l) = array_len {
|
||||
let mut sizes = vec![0; l];
|
||||
for size in sizes.iter_mut().take(l) {
|
||||
*size = rmp::decode::read_int(&mut cur)?;
|
||||
}
|
||||
// for size in sizes.iter_mut().take(l) {
|
||||
// let tmp = rmp::decode::read_int(&mut cur)?;
|
||||
// size = tmp;
|
||||
// }
|
||||
self.part_actual_sizes = sizes;
|
||||
}
|
||||
}
|
||||
"PartIdx" => {
|
||||
let alen = rmp::decode::read_array_len(&mut cur)? as usize;
|
||||
|
||||
if alen == 0 {
|
||||
self.part_indices = Vec::new();
|
||||
continue;
|
||||
}
|
||||
|
||||
let mut indices = Vec::with_capacity(alen);
|
||||
for _ in 0..alen {
|
||||
let blen = rmp::decode::read_bin_len(&mut cur)?;
|
||||
let mut buf = vec![0u8; blen as usize];
|
||||
cur.read_exact(&mut buf)?;
|
||||
|
||||
indices.push(Bytes::from(buf));
|
||||
}
|
||||
|
||||
self.part_indices = indices;
|
||||
}
|
||||
"Size" => {
|
||||
self.size = rmp::decode::read_int(&mut cur)?;
|
||||
}
|
||||
"MTime" => {
|
||||
let unix: i128 = rmp::decode::read_int(&mut cur)?;
|
||||
let time = OffsetDateTime::from_unix_timestamp_nanos(unix)?;
|
||||
if time == OffsetDateTime::UNIX_EPOCH {
|
||||
self.mod_time = None;
|
||||
} else {
|
||||
self.mod_time = Some(time);
|
||||
}
|
||||
}
|
||||
"MetaSys" => {
|
||||
let len = match rmp::decode::read_nil(&mut cur) {
|
||||
Ok(_) => None,
|
||||
Err(e) => match e {
|
||||
rmp::decode::ValueReadError::TypeMismatch(marker) => match marker {
|
||||
Marker::FixMap(l) => Some(l as usize),
|
||||
Marker::Map16 => Some(rmp::decode::read_u16(&mut cur)? as usize),
|
||||
Marker::Map32 => Some(rmp::decode::read_u16(&mut cur)? as usize),
|
||||
_ => return Err(Error::other("MetaSys parse failed")),
|
||||
},
|
||||
_ => return Err(Error::other("MetaSys parse failed.")),
|
||||
},
|
||||
};
|
||||
if len.is_some() {
|
||||
let l = len.unwrap();
|
||||
let mut map = HashMap::new();
|
||||
for _ in 0..l {
|
||||
let str_len = rmp::decode::read_str_len(&mut cur)?;
|
||||
let mut field_buff = vec![0u8; str_len as usize];
|
||||
cur.read_exact(&mut field_buff)?;
|
||||
let key = String::from_utf8(field_buff)?;
|
||||
|
||||
let blen = rmp::decode::read_bin_len(&mut cur)?;
|
||||
let mut val = vec![0u8; blen as usize];
|
||||
cur.read_exact(&mut val)?;
|
||||
|
||||
map.insert(key, val);
|
||||
}
|
||||
|
||||
self.meta_sys = map;
|
||||
}
|
||||
}
|
||||
"MetaUsr" => {
|
||||
let len = match rmp::decode::read_nil(&mut cur) {
|
||||
Ok(_) => None,
|
||||
Err(e) => match e {
|
||||
rmp::decode::ValueReadError::TypeMismatch(marker) => match marker {
|
||||
Marker::FixMap(l) => Some(l as usize),
|
||||
Marker::Map16 => Some(rmp::decode::read_u16(&mut cur)? as usize),
|
||||
Marker::Map32 => Some(rmp::decode::read_u16(&mut cur)? as usize),
|
||||
_ => return Err(Error::other("MetaUsr parse failed")),
|
||||
},
|
||||
_ => return Err(Error::other("MetaUsr parse failed.")),
|
||||
},
|
||||
};
|
||||
if len.is_some() {
|
||||
let l = len.unwrap();
|
||||
let mut map = HashMap::new();
|
||||
for _ in 0..l {
|
||||
let str_len = rmp::decode::read_str_len(&mut cur)?;
|
||||
let mut field_buff = vec![0u8; str_len as usize];
|
||||
cur.read_exact(&mut field_buff)?;
|
||||
let key = String::from_utf8(field_buff)?;
|
||||
|
||||
let blen = rmp::decode::read_str_len(&mut cur)?;
|
||||
let mut val_buf = vec![0u8; blen as usize];
|
||||
cur.read_exact(&mut val_buf)?;
|
||||
let val = String::from_utf8(val_buf)?;
|
||||
|
||||
map.insert(key, val);
|
||||
}
|
||||
|
||||
self.meta_user = map;
|
||||
}
|
||||
}
|
||||
|
||||
name => return Err(Error::other(format!("not suport field name {name}"))),
|
||||
}
|
||||
}
|
||||
|
||||
Ok(cur.position())
|
||||
Ok(buf.len() as u64)
|
||||
}
|
||||
// marshal_msg custom messagepack naming consistent with go
|
||||
pub fn marshal_msg(&self) -> Result<Vec<u8>> {
|
||||
let mut len: u32 = 18;
|
||||
let mut mask: u32 = 0;
|
||||
|
||||
if self.part_indices.is_empty() {
|
||||
len -= 1;
|
||||
mask |= 0x2000;
|
||||
}
|
||||
|
||||
let mut wr = Vec::new();
|
||||
|
||||
// Field count
|
||||
rmp::encode::write_map_len(&mut wr, len)?;
|
||||
|
||||
// string "ID"
|
||||
rmp::encode::write_str(&mut wr, "ID")?;
|
||||
rmp::encode::write_bin(&mut wr, self.version_id.unwrap_or_default().as_bytes())?;
|
||||
|
||||
// string "DDir"
|
||||
rmp::encode::write_str(&mut wr, "DDir")?;
|
||||
rmp::encode::write_bin(&mut wr, self.data_dir.unwrap_or_default().as_bytes())?;
|
||||
|
||||
// string "EcAlgo"
|
||||
rmp::encode::write_str(&mut wr, "EcAlgo")?;
|
||||
rmp::encode::write_uint(&mut wr, self.erasure_algorithm.to_u8() as u64)?;
|
||||
|
||||
// string "EcM"
|
||||
rmp::encode::write_str(&mut wr, "EcM")?;
|
||||
rmp::encode::write_uint(&mut wr, self.erasure_m.try_into().unwrap())?;
|
||||
|
||||
// string "EcN"
|
||||
rmp::encode::write_str(&mut wr, "EcN")?;
|
||||
rmp::encode::write_uint(&mut wr, self.erasure_n.try_into().unwrap())?;
|
||||
|
||||
// string "EcBSize"
|
||||
rmp::encode::write_str(&mut wr, "EcBSize")?;
|
||||
rmp::encode::write_uint(&mut wr, self.erasure_block_size.try_into().unwrap())?;
|
||||
|
||||
// string "EcIndex"
|
||||
rmp::encode::write_str(&mut wr, "EcIndex")?;
|
||||
rmp::encode::write_uint(&mut wr, self.erasure_index.try_into().unwrap())?;
|
||||
|
||||
// string "EcDist"
|
||||
rmp::encode::write_str(&mut wr, "EcDist")?;
|
||||
rmp::encode::write_array_len(&mut wr, self.erasure_dist.len() as u32)?;
|
||||
for v in self.erasure_dist.iter() {
|
||||
rmp::encode::write_uint(&mut wr, *v as _)?;
|
||||
}
|
||||
|
||||
// string "CSumAlgo"
|
||||
rmp::encode::write_str(&mut wr, "CSumAlgo")?;
|
||||
rmp::encode::write_uint(&mut wr, self.bitrot_checksum_algo.to_u8() as u64)?;
|
||||
|
||||
// string "PartNums"
|
||||
rmp::encode::write_str(&mut wr, "PartNums")?;
|
||||
rmp::encode::write_array_len(&mut wr, self.part_numbers.len() as u32)?;
|
||||
for v in self.part_numbers.iter() {
|
||||
rmp::encode::write_uint(&mut wr, *v as _)?;
|
||||
}
|
||||
|
||||
// string "PartETags"
|
||||
rmp::encode::write_str(&mut wr, "PartETags")?;
|
||||
if self.part_etags.is_empty() {
|
||||
rmp::encode::write_nil(&mut wr)?;
|
||||
} else {
|
||||
rmp::encode::write_array_len(&mut wr, self.part_etags.len() as u32)?;
|
||||
for v in self.part_etags.iter() {
|
||||
rmp::encode::write_str(&mut wr, v.as_str())?;
|
||||
}
|
||||
}
|
||||
|
||||
// string "PartSizes"
|
||||
rmp::encode::write_str(&mut wr, "PartSizes")?;
|
||||
rmp::encode::write_array_len(&mut wr, self.part_sizes.len() as u32)?;
|
||||
for v in self.part_sizes.iter() {
|
||||
rmp::encode::write_uint(&mut wr, *v as _)?;
|
||||
}
|
||||
|
||||
// string "PartASizes"
|
||||
rmp::encode::write_str(&mut wr, "PartASizes")?;
|
||||
if self.part_actual_sizes.is_empty() {
|
||||
rmp::encode::write_nil(&mut wr)?;
|
||||
} else {
|
||||
rmp::encode::write_array_len(&mut wr, self.part_actual_sizes.len() as u32)?;
|
||||
for v in self.part_actual_sizes.iter() {
|
||||
rmp::encode::write_uint(&mut wr, *v as _)?;
|
||||
}
|
||||
}
|
||||
|
||||
if (mask & 0x2000) == 0 {
|
||||
// string "PartIdx"
|
||||
rmp::encode::write_str(&mut wr, "PartIdx")?;
|
||||
rmp::encode::write_array_len(&mut wr, self.part_indices.len() as u32)?;
|
||||
for v in self.part_indices.iter() {
|
||||
rmp::encode::write_bin(&mut wr, v)?;
|
||||
}
|
||||
}
|
||||
|
||||
// string "Size"
|
||||
rmp::encode::write_str(&mut wr, "Size")?;
|
||||
rmp::encode::write_uint(&mut wr, self.size.try_into().unwrap())?;
|
||||
|
||||
// string "MTime"
|
||||
rmp::encode::write_str(&mut wr, "MTime")?;
|
||||
rmp::encode::write_uint(
|
||||
&mut wr,
|
||||
self.mod_time
|
||||
.unwrap_or(OffsetDateTime::UNIX_EPOCH)
|
||||
.unix_timestamp_nanos()
|
||||
.try_into()
|
||||
.unwrap(),
|
||||
)?;
|
||||
|
||||
// string "MetaSys"
|
||||
rmp::encode::write_str(&mut wr, "MetaSys")?;
|
||||
if self.meta_sys.is_empty() {
|
||||
rmp::encode::write_nil(&mut wr)?;
|
||||
} else {
|
||||
rmp::encode::write_map_len(&mut wr, self.meta_sys.len() as u32)?;
|
||||
for (k, v) in &self.meta_sys {
|
||||
rmp::encode::write_str(&mut wr, k.as_str())?;
|
||||
rmp::encode::write_bin(&mut wr, v)?;
|
||||
}
|
||||
}
|
||||
|
||||
// string "MetaUsr"
|
||||
rmp::encode::write_str(&mut wr, "MetaUsr")?;
|
||||
if self.meta_user.is_empty() {
|
||||
rmp::encode::write_nil(&mut wr)?;
|
||||
} else {
|
||||
rmp::encode::write_map_len(&mut wr, self.meta_user.len() as u32)?;
|
||||
for (k, v) in &self.meta_user {
|
||||
rmp::encode::write_str(&mut wr, k.as_str())?;
|
||||
rmp::encode::write_str(&mut wr, v.as_str())?;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(wr)
|
||||
let buf = rmp_serde::to_vec(self)?;
|
||||
Ok(buf)
|
||||
}
|
||||
|
||||
pub fn into_fileinfo(&self, volume: &str, path: &str, all_parts: bool) -> FileInfo {
|
||||
@@ -2109,8 +1654,11 @@ impl From<FileInfo> for MetaObject {
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, Default, PartialEq)]
|
||||
pub struct MetaDeleteMarker {
|
||||
pub version_id: Option<Uuid>, // Version ID for delete marker
|
||||
pub mod_time: Option<OffsetDateTime>, // Object delete marker modified time
|
||||
#[serde(rename = "ID")]
|
||||
pub version_id: Option<Uuid>, // Version ID for delete marker
|
||||
#[serde(rename = "MTime")]
|
||||
pub mod_time: Option<OffsetDateTime>, // Object delete marker modified time
|
||||
#[serde(rename = "MetaSys")]
|
||||
pub meta_sys: Option<HashMap<String, Vec<u8>>>, // Delete marker internal metadata
|
||||
}
|
||||
|
||||
@@ -2140,106 +1688,115 @@ impl MetaDeleteMarker {
|
||||
}
|
||||
|
||||
pub fn unmarshal_msg(&mut self, buf: &[u8]) -> Result<u64> {
|
||||
let mut cur = Cursor::new(buf);
|
||||
let ret: Self = rmp_serde::from_slice(buf)?;
|
||||
|
||||
let mut fields_len = rmp::decode::read_map_len(&mut cur)?;
|
||||
*self = ret;
|
||||
|
||||
while fields_len > 0 {
|
||||
fields_len -= 1;
|
||||
Ok(buf.len() as u64)
|
||||
|
||||
let str_len = rmp::decode::read_str_len(&mut cur)?;
|
||||
// let mut cur = Cursor::new(buf);
|
||||
|
||||
// !!! Vec::with_capacity(str_len) fails, vec! works normally
|
||||
let mut field_buff = vec![0u8; str_len as usize];
|
||||
// let mut fields_len = rmp::decode::read_map_len(&mut cur)?;
|
||||
|
||||
cur.read_exact(&mut field_buff)?;
|
||||
// while fields_len > 0 {
|
||||
// fields_len -= 1;
|
||||
|
||||
let field = String::from_utf8(field_buff)?;
|
||||
// let str_len = rmp::decode::read_str_len(&mut cur)?;
|
||||
|
||||
match field.as_str() {
|
||||
"ID" => {
|
||||
rmp::decode::read_bin_len(&mut cur)?;
|
||||
let mut buf = [0u8; 16];
|
||||
cur.read_exact(&mut buf)?;
|
||||
self.version_id = {
|
||||
let id = Uuid::from_bytes(buf);
|
||||
if id.is_nil() { None } else { Some(id) }
|
||||
};
|
||||
}
|
||||
// // !!! Vec::with_capacity(str_len) fails, vec! works normally
|
||||
// let mut field_buff = vec![0u8; str_len as usize];
|
||||
|
||||
"MTime" => {
|
||||
let unix: i64 = rmp::decode::read_int(&mut cur)?;
|
||||
let time = OffsetDateTime::from_unix_timestamp(unix)?;
|
||||
if time == OffsetDateTime::UNIX_EPOCH {
|
||||
self.mod_time = None;
|
||||
} else {
|
||||
self.mod_time = Some(time);
|
||||
}
|
||||
}
|
||||
"MetaSys" => {
|
||||
let l = rmp::decode::read_map_len(&mut cur)?;
|
||||
let mut map = HashMap::new();
|
||||
for _ in 0..l {
|
||||
let str_len = rmp::decode::read_str_len(&mut cur)?;
|
||||
let mut field_buff = vec![0u8; str_len as usize];
|
||||
cur.read_exact(&mut field_buff)?;
|
||||
let key = String::from_utf8(field_buff)?;
|
||||
// cur.read_exact(&mut field_buff)?;
|
||||
|
||||
let blen = rmp::decode::read_bin_len(&mut cur)?;
|
||||
let mut val = vec![0u8; blen as usize];
|
||||
cur.read_exact(&mut val)?;
|
||||
// let field = String::from_utf8(field_buff)?;
|
||||
|
||||
map.insert(key, val);
|
||||
}
|
||||
// match field.as_str() {
|
||||
// "ID" => {
|
||||
// rmp::decode::read_bin_len(&mut cur)?;
|
||||
// let mut buf = [0u8; 16];
|
||||
// cur.read_exact(&mut buf)?;
|
||||
// self.version_id = {
|
||||
// let id = Uuid::from_bytes(buf);
|
||||
// if id.is_nil() { None } else { Some(id) }
|
||||
// };
|
||||
// }
|
||||
|
||||
self.meta_sys = Some(map);
|
||||
}
|
||||
name => return Err(Error::other(format!("not suport field name {name}"))),
|
||||
}
|
||||
}
|
||||
// "MTime" => {
|
||||
// let unix: i64 = rmp::decode::read_int(&mut cur)?;
|
||||
// let time = OffsetDateTime::from_unix_timestamp(unix)?;
|
||||
// if time == OffsetDateTime::UNIX_EPOCH {
|
||||
// self.mod_time = None;
|
||||
// } else {
|
||||
// self.mod_time = Some(time);
|
||||
// }
|
||||
// }
|
||||
// "MetaSys" => {
|
||||
// let l = rmp::decode::read_map_len(&mut cur)?;
|
||||
// let mut map = HashMap::new();
|
||||
// for _ in 0..l {
|
||||
// let str_len = rmp::decode::read_str_len(&mut cur)?;
|
||||
// let mut field_buff = vec![0u8; str_len as usize];
|
||||
// cur.read_exact(&mut field_buff)?;
|
||||
// let key = String::from_utf8(field_buff)?;
|
||||
|
||||
Ok(cur.position())
|
||||
// let blen = rmp::decode::read_bin_len(&mut cur)?;
|
||||
// let mut val = vec![0u8; blen as usize];
|
||||
// cur.read_exact(&mut val)?;
|
||||
|
||||
// map.insert(key, val);
|
||||
// }
|
||||
|
||||
// self.meta_sys = Some(map);
|
||||
// }
|
||||
// name => return Err(Error::other(format!("not suport field name {name}"))),
|
||||
// }
|
||||
// }
|
||||
|
||||
// Ok(cur.position())
|
||||
}
|
||||
|
||||
pub fn marshal_msg(&self) -> Result<Vec<u8>> {
|
||||
let mut len: u32 = 3;
|
||||
let mut mask: u8 = 0;
|
||||
let buf = rmp_serde::to_vec(self)?;
|
||||
Ok(buf)
|
||||
|
||||
if self.meta_sys.is_none() {
|
||||
len -= 1;
|
||||
mask |= 0x4;
|
||||
}
|
||||
// let mut len: u32 = 3;
|
||||
// let mut mask: u8 = 0;
|
||||
|
||||
let mut wr = Vec::new();
|
||||
// if self.meta_sys.is_none() {
|
||||
// len -= 1;
|
||||
// mask |= 0x4;
|
||||
// }
|
||||
|
||||
// Field count
|
||||
rmp::encode::write_map_len(&mut wr, len)?;
|
||||
// let mut wr = Vec::new();
|
||||
|
||||
// string "ID"
|
||||
rmp::encode::write_str(&mut wr, "ID")?;
|
||||
rmp::encode::write_bin(&mut wr, self.version_id.unwrap_or_default().as_bytes())?;
|
||||
// // Field count
|
||||
// rmp::encode::write_map_len(&mut wr, len)?;
|
||||
|
||||
// string "MTime"
|
||||
rmp::encode::write_str(&mut wr, "MTime")?;
|
||||
rmp::encode::write_uint(
|
||||
&mut wr,
|
||||
self.mod_time
|
||||
.unwrap_or(OffsetDateTime::UNIX_EPOCH)
|
||||
.unix_timestamp()
|
||||
.try_into()
|
||||
.unwrap(),
|
||||
)?;
|
||||
// // string "ID"
|
||||
// rmp::encode::write_str(&mut wr, "ID")?;
|
||||
// rmp::encode::write_bin(&mut wr, self.version_id.unwrap_or_default().as_bytes())?;
|
||||
|
||||
if (mask & 0x4) == 0 {
|
||||
let metas = self.meta_sys.as_ref().unwrap();
|
||||
rmp::encode::write_map_len(&mut wr, metas.len() as u32)?;
|
||||
for (k, v) in metas {
|
||||
rmp::encode::write_str(&mut wr, k.as_str())?;
|
||||
rmp::encode::write_bin(&mut wr, v)?;
|
||||
}
|
||||
}
|
||||
// // string "MTime"
|
||||
// rmp::encode::write_str(&mut wr, "MTime")?;
|
||||
// rmp::encode::write_uint(
|
||||
// &mut wr,
|
||||
// self.mod_time
|
||||
// .unwrap_or(OffsetDateTime::UNIX_EPOCH)
|
||||
// .unix_timestamp()
|
||||
// .try_into()
|
||||
// .unwrap(),
|
||||
// )?;
|
||||
|
||||
Ok(wr)
|
||||
// if (mask & 0x4) == 0 {
|
||||
// let metas = self.meta_sys.as_ref().unwrap();
|
||||
// rmp::encode::write_map_len(&mut wr, metas.len() as u32)?;
|
||||
// for (k, v) in metas {
|
||||
// rmp::encode::write_str(&mut wr, k.as_str())?;
|
||||
// rmp::encode::write_bin(&mut wr, v)?;
|
||||
// }
|
||||
// }
|
||||
|
||||
// Ok(wr)
|
||||
}
|
||||
|
||||
/// Get delete marker signature
|
||||
|
||||
@@ -407,7 +407,6 @@ impl LocalDisk {
|
||||
};
|
||||
|
||||
if immediate_purge || delete_path.to_string_lossy().ends_with(SLASH_SEPARATOR) {
|
||||
warn!("move_to_trash immediate_purge {:?}", &delete_path.to_string_lossy());
|
||||
let trash_path2 = self.get_object_path(super::RUSTFS_META_TMP_DELETED_BUCKET, Uuid::new_v4().to_string().as_str())?;
|
||||
let _ = rename_all(
|
||||
encode_dir_object(delete_path.to_string_lossy().as_ref()),
|
||||
|
||||
@@ -851,14 +851,14 @@ pub fn error_resp_to_object_err(err: ErrorResponse, params: Vec<&str>) -> std::i
|
||||
err = std::io::Error::other(StorageError::BucketNotFound(bucket));
|
||||
}
|
||||
S3ErrorCode::NoSuchKey => {
|
||||
if object != "" {
|
||||
if !object.is_empty() {
|
||||
err = std::io::Error::other(StorageError::ObjectNotFound(bucket, object));
|
||||
} else {
|
||||
err = std::io::Error::other(StorageError::BucketNotFound(bucket));
|
||||
}
|
||||
}
|
||||
S3ErrorCode::NoSuchVersion => {
|
||||
if object != "" {
|
||||
if !object.is_empty() {
|
||||
err = std::io::Error::other(StorageError::ObjectNotFound(bucket, object)); //, version_id);
|
||||
} else {
|
||||
err = std::io::Error::other(StorageError::BucketNotFound(bucket));
|
||||
|
||||
@@ -146,13 +146,10 @@ impl TierStats {
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
#[allow(dead_code)]
|
||||
struct AllTierStats {
|
||||
tiers: HashMap<String, TierStats>,
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
impl AllTierStats {
|
||||
pub fn new() -> Self {
|
||||
Self { tiers: HashMap::new() }
|
||||
|
||||
@@ -15,7 +15,7 @@ current_dir=$(pwd)
|
||||
echo "Current directory: $current_dir"
|
||||
|
||||
# mkdir -p ./target/volume/test
|
||||
mkdir -p ./target/volume/test{0..4}
|
||||
mkdir -p ./target/volume/test{1..4}
|
||||
|
||||
|
||||
if [ -z "$RUST_LOG" ]; then
|
||||
@@ -27,7 +27,7 @@ fi
|
||||
|
||||
# export RUSTFS_STORAGE_CLASS_INLINE_BLOCK="512 KB"
|
||||
|
||||
export RUSTFS_VOLUMES="./target/volume/test{0...4}"
|
||||
export RUSTFS_VOLUMES="./target/volume/test{1...4}"
|
||||
# export RUSTFS_VOLUMES="./target/volume/test"
|
||||
export RUSTFS_ADDRESS=":9000"
|
||||
export RUSTFS_CONSOLE_ENABLE=true
|
||||
|
||||
Reference in New Issue
Block a user