From b48f273c7d3a2bfbdb08bd776670fa13487b30de Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=AE=89=E6=AD=A3=E8=B6=85?= Date: Wed, 25 Feb 2026 08:38:32 +0800 Subject: [PATCH] refactor(filemeta): split filemeta into focused submodules (#1946) --- crates/filemeta/src/filemeta.rs | 2012 +------------------ crates/filemeta/src/filemeta/codec.rs | 342 ++++ crates/filemeta/src/filemeta/inline_data.rs | 62 + crates/filemeta/src/filemeta/validation.rs | 181 ++ crates/filemeta/src/filemeta/version.rs | 1484 ++++++++++++++ 5 files changed, 2077 insertions(+), 2004 deletions(-) create mode 100644 crates/filemeta/src/filemeta/codec.rs create mode 100644 crates/filemeta/src/filemeta/inline_data.rs create mode 100644 crates/filemeta/src/filemeta/validation.rs create mode 100644 crates/filemeta/src/filemeta/version.rs diff --git a/crates/filemeta/src/filemeta.rs b/crates/filemeta/src/filemeta.rs index 1da04895..7e89ef91 100644 --- a/crates/filemeta/src/filemeta.rs +++ b/crates/filemeta/src/filemeta.rs @@ -68,6 +68,14 @@ pub const TRANSITIONED_OBJECTNAME: &str = "transitioned-object"; pub const TRANSITIONED_VERSION_ID: &str = "transitioned-versionID"; pub const TRANSITION_TIER: &str = "transition-tier"; +mod codec; +mod inline_data; +mod validation; +mod version; + +pub use validation::{DetailedVersionStats, VersionStats}; +pub use version::*; + // type ScanHeaderVersionFn = Box Result<()>>; #[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)] @@ -86,315 +94,6 @@ impl FileMeta { } } - pub fn is_xl2_v1_format(buf: &[u8]) -> bool { - !matches!(Self::check_xl2_v1(buf), Err(_e)) - } - - pub fn load(buf: &[u8]) -> Result { - let mut xl = FileMeta::default(); - xl.unmarshal_msg(buf)?; - - Ok(xl) - } - - pub fn check_xl2_v1(buf: &[u8]) -> Result<(&[u8], u16, u16)> { - if buf.len() < 8 { - return Err(Error::other("xl file header not exists")); - } - - if buf[0..4] != XL_FILE_HEADER { - return Err(Error::other("xl file header err")); - } - - let major = byteorder::LittleEndian::read_u16(&buf[4..6]); - let minor = byteorder::LittleEndian::read_u16(&buf[6..8]); - if major > XL_FILE_VERSION_MAJOR { - return Err(Error::other("xl file version err")); - } - - Ok((&buf[8..], major, minor)) - } - - // Returns (meta, inline_data) - pub fn is_indexed_meta(buf: &[u8]) -> Result<(&[u8], &[u8])> { - let (buf, major, minor) = Self::check_xl2_v1(buf)?; - if major != 1 || minor < 3 { - return Ok((&[], &[])); - } - - let (mut size_buf, buf) = buf.split_at(5); - - // Get meta data, buf = crc + data - let bin_len = rmp::decode::read_bin_len(&mut size_buf)?; - - if buf.len() < bin_len as usize { - return Ok((&[], &[])); - } - let (meta, buf) = buf.split_at(bin_len as usize); - - if buf.len() < 5 { - return Err(Error::other("insufficient data for CRC")); - } - let (mut crc_buf, inline_data) = buf.split_at(5); - - // crc check - let crc = rmp::decode::read_u32(&mut crc_buf)?; - let meta_crc = xxh64::xxh64(meta, XXHASH_SEED) as u32; - - if crc != meta_crc { - return Err(Error::other("xl file crc check failed")); - } - - Ok((meta, inline_data)) - } - - // Fixed u32 - pub fn read_bytes_header(buf: &[u8]) -> Result<(u32, &[u8])> { - let (mut size_buf, _) = buf.split_at(5); - - // Get meta data, buf = crc + data - let bin_len = rmp::decode::read_bin_len(&mut size_buf)?; - - Ok((bin_len, &buf[5..])) - } - - pub fn unmarshal_msg(&mut self, buf: &[u8]) -> Result { - let i = buf.len() as u64; - - // check version, buf = buf[8..] - let (buf, _, _) = Self::check_xl2_v1(buf).map_err(|e| { - error!("failed to check XL2 v1 format: {}", e); - e - })?; - - let (mut size_buf, buf) = buf.split_at(5); - - // Get meta data, buf = crc + data - let bin_len = rmp::decode::read_bin_len(&mut size_buf).map_err(|e| { - error!("failed to read binary length for metadata: {}", e); - Error::other(format!("failed to read binary length for metadata: {e}")) - })?; - - if buf.len() < bin_len as usize { - error!("insufficient data for metadata: expected {} bytes, got {} bytes", bin_len, buf.len()); - return Err(Error::other("insufficient data for metadata")); - } - let (meta, buf) = buf.split_at(bin_len as usize); - - if buf.len() < 5 { - error!("insufficient data for CRC: expected 5 bytes, got {} bytes", buf.len()); - return Err(Error::other("insufficient data for CRC")); - } - let (mut crc_buf, buf) = buf.split_at(5); - - // crc check - let crc = rmp::decode::read_u32(&mut crc_buf).map_err(|e| { - error!("failed to read CRC value: {}", e); - Error::other(format!("failed to read CRC value: {e}")) - })?; - let meta_crc = xxh64::xxh64(meta, XXHASH_SEED) as u32; - - if crc != meta_crc { - error!("xl file crc check failed: expected CRC {:#x}, got {:#x}", meta_crc, crc); - return Err(Error::other("xl file crc check failed")); - } - - if !buf.is_empty() { - self.data.update(buf); - self.data.validate().map_err(|e| { - error!("data validation failed: {}", e); - e - })?; - } - - // Parse meta - if !meta.is_empty() { - let (versions_len, _, meta_ver, meta) = Self::decode_xl_headers(meta).map_err(|e| { - error!("failed to decode XL headers: {}", e); - e - })?; - - // let (_, meta) = meta.split_at(read_size as usize); - - self.meta_ver = meta_ver; - - self.versions = Vec::with_capacity(versions_len); - - let mut cur: Cursor<&[u8]> = Cursor::new(meta); - for _ in 0..versions_len { - let bin_len = rmp::decode::read_bin_len(&mut cur).map_err(|e| { - error!("failed to read binary length for version header: {}", e); - Error::other(format!("failed to read binary length for version header: {e}")) - })? as usize; - - let mut header_buf = vec![0u8; bin_len]; - - cur.read_exact(&mut header_buf)?; - - let mut ver = FileMetaShallowVersion::default(); - ver.header.unmarshal_msg(&header_buf).map_err(|e| { - error!("failed to unmarshal version header: {}", e); - e - })?; - - let bin_len = rmp::decode::read_bin_len(&mut cur).map_err(|e| { - error!("failed to read binary length for version metadata: {}", e); - Error::other(format!("failed to read binary length for version metadata: {e}")) - })? as usize; - - let mut ver_meta_buf = vec![0u8; bin_len]; - cur.read_exact(&mut ver_meta_buf)?; - - ver.meta.extend_from_slice(&ver_meta_buf); - - self.versions.push(ver); - } - } - - Ok(i) - } - - // decode_xl_headers parses meta header, returns (versions count, xl_header_version, xl_meta_version, read data length) - fn decode_xl_headers(buf: &[u8]) -> Result<(usize, u8, u8, &[u8])> { - let mut cur = Cursor::new(buf); - - let header_ver: u8 = rmp::decode::read_int(&mut cur)?; - - if header_ver > XL_HEADER_VERSION { - return Err(Error::other("xl header version invalid")); - } - - let meta_ver: u8 = rmp::decode::read_int(&mut cur)?; - if meta_ver > XL_META_VERSION { - return Err(Error::other("xl meta version invalid")); - } - - let versions_len: usize = rmp::decode::read_int(&mut cur)?; - - Ok((versions_len, header_ver, meta_ver, &buf[cur.position() as usize..])) - } - - fn decode_versions Result<()>>(buf: &[u8], versions: usize, mut fnc: F) -> Result<()> { - let mut cur: Cursor<&[u8]> = Cursor::new(buf); - - for i in 0..versions { - let bin_len = rmp::decode::read_bin_len(&mut cur)? as usize; - let start = cur.position() as usize; - let end = start + bin_len; - let header_buf = &buf[start..end]; - - cur.set_position(end as u64); - - let bin_len = rmp::decode::read_bin_len(&mut cur)? as usize; - let start = cur.position() as usize; - let end = start + bin_len; - let ver_meta_buf = &buf[start..end]; - - cur.set_position(end as u64); - - if let Err(err) = fnc(i, header_buf, ver_meta_buf) { - if err == Error::DoneForNow { - return Ok(()); - } - - return Err(err); - } - } - - Ok(()) - } - - pub fn is_latest_delete_marker(buf: &[u8]) -> bool { - let header = Self::decode_xl_headers(buf).ok(); - if let Some((versions, _hdr_v, _meta_v, meta)) = header { - if versions == 0 { - return false; - } - - let mut is_delete_marker = false; - - let _ = Self::decode_versions(meta, versions, |_: usize, hdr: &[u8], _: &[u8]| { - let mut header = FileMetaVersionHeader::default(); - if header.unmarshal_msg(hdr).is_err() { - return Err(Error::DoneForNow); - } - - is_delete_marker = header.version_type == VersionType::Delete; - - Err(Error::DoneForNow) - }); - - is_delete_marker - } else { - false - } - } - - pub fn marshal_msg(&self) -> Result> { - let mut wr = Vec::new(); - - // header - wr.write_all(XL_FILE_HEADER.as_slice())?; - - let mut major = [0u8; 2]; - byteorder::LittleEndian::write_u16(&mut major, XL_FILE_VERSION_MAJOR); - wr.write_all(major.as_slice())?; - - let mut minor = [0u8; 2]; - byteorder::LittleEndian::write_u16(&mut minor, XL_FILE_VERSION_MINOR); - wr.write_all(minor.as_slice())?; - - // size bin32 reserved for write_bin_len - wr.write_all(&[0xc6, 0, 0, 0, 0])?; - - let offset = wr.len(); - - // xl header - rmp::encode::write_uint8(&mut wr, XL_HEADER_VERSION)?; - rmp::encode::write_uint8(&mut wr, XL_META_VERSION)?; - - // versions - rmp::encode::write_sint(&mut wr, self.versions.len() as i64)?; - - for ver in self.versions.iter() { - let hmsg = ver.header.marshal_msg()?; - rmp::encode::write_bin(&mut wr, &hmsg)?; - - rmp::encode::write_bin(&mut wr, &ver.meta)?; - } - - // Update bin length - let data_len = wr.len() - offset; - byteorder::BigEndian::write_u32(&mut wr[offset - 4..offset], data_len as u32); - - let crc = xxh64::xxh64(&wr[offset..], XXHASH_SEED) as u32; - let mut crc_buf = [0u8; 5]; - crc_buf[0] = 0xce; // u32 - byteorder::BigEndian::write_u32(&mut crc_buf[1..], crc); - - wr.write_all(&crc_buf)?; - - wr.write_all(self.data.as_slice())?; - - Ok(wr) - } - - // pub fn unmarshal(buf: &[u8]) -> Result { - // let mut s = Self::default(); - // s.unmarshal_msg(buf)?; - // Ok(s) - // // let t: FileMeta = rmp_serde::from_slice(buf)?; - // // Ok(t) - // } - - // pub fn marshal_msg(&self) -> Result> { - // let mut buf = Vec::new(); - - // self.serialize(&mut Serializer::new(&mut buf))?; - - // Ok(buf) - // } - fn get_idx(&self, idx: usize) -> Result { if idx > self.versions.len() { return Err(Error::FileNotFound); @@ -456,19 +155,6 @@ impl FileMeta { Err(Error::FileVersionNotFound) } - // shard_data_dir_count queries the count of data_dir under vid - pub fn shard_data_dir_count(&self, vid: &Option, data_dir: &Option) -> usize { - let vid = vid.unwrap_or_default(); - self.versions - .iter() - .filter(|v| { - v.header.version_type == VersionType::Object && v.header.version_id != Some(vid) && v.header.uses_data_dir() - }) - .map(|v| FileMetaVersion::decode_data_dir_from_meta(&v.meta).unwrap_or_default()) - .filter(|v| v == data_dir) - .count() - } - pub fn update_object_version(&mut self, fi: FileInfo) -> Result<()> { for version in self.versions.iter_mut() { match version.header.version_type { @@ -1084,77 +770,6 @@ impl FileMeta { self.versions.first().unwrap().header.mod_time } - /// Check if the metadata format is compatible - pub fn is_compatible_with_meta(&self) -> bool { - // Check version compatibility - if self.meta_ver != XL_META_VERSION { - return false; - } - - // For compatibility, we allow versions with different types - // Just check basic structure validity - true - } - - /// Validate metadata integrity - pub fn validate_integrity(&self) -> Result<()> { - // Check if versions are sorted by modification time - if !self.is_sorted_by_mod_time() { - return Err(Error::other("versions not sorted by modification time")); - } - - // Validate inline data if present - self.data.validate()?; - - Ok(()) - } - - /// Check if versions are sorted by modification time (newest first) - fn is_sorted_by_mod_time(&self) -> bool { - if self.versions.len() <= 1 { - return true; - } - - for i in 1..self.versions.len() { - let prev_time = self.versions[i - 1].header.mod_time; - let curr_time = self.versions[i].header.mod_time; - - match (prev_time, curr_time) { - (Some(prev), Some(curr)) => { - if prev < curr { - return false; - } - } - (None, Some(_)) => return false, - _ => continue, - } - } - - true - } - - /// Get statistics about versions - pub fn get_version_stats(&self) -> VersionStats { - let mut stats = VersionStats { - total_versions: self.versions.len(), - ..Default::default() - }; - - for version in &self.versions { - match version.header.version_type { - VersionType::Object => stats.object_versions += 1, - VersionType::Delete => stats.delete_markers += 1, - VersionType::Invalid | VersionType::Legacy => stats.invalid_versions += 1, - } - - if version.header.free_version() { - stats.free_versions += 1; - } - } - - stats - } - /// Load or convert from buffer pub fn load_or_convert(buf: &[u8]) -> Result { // Try to load as current format first @@ -1174,41 +789,6 @@ impl FileMeta { Err(Error::other("Legacy format not yet implemented")) } - /// Get all data directories used by versions - pub fn get_data_dirs(&self) -> Result>> { - let mut data_dirs = Vec::new(); - for version in &self.versions { - if version.header.version_type == VersionType::Object { - let ver = FileMetaVersion::try_from(version.meta.as_slice())?; - data_dirs.push(ver.get_data_dir()); - } - } - Ok(data_dirs) - } - - /// Count shared data directories - pub fn shared_data_dir_count(&self, version_id: Option, data_dir: Option) -> usize { - let version_id = version_id.unwrap_or_default(); - - if self.data.entries().unwrap_or_default() > 0 - && self.data.find(version_id.to_string().as_str()).unwrap_or_default().is_some() - { - return 0; - } - - self.versions - .iter() - .filter(|v| { - v.header.version_type == VersionType::Object - && v.header.version_id != Some(version_id) - && v.header.uses_data_dir() - }) - .filter_map(|v| FileMetaVersion::decode_data_dir_from_meta(&v.meta).ok()) - .filter(|&dir| dir.is_none() || dir != data_dir) - //.filter(|&dir| dir != data_dir) - .count() - } - /// Add legacy version pub fn add_legacy(&mut self, _legacy_obj: &str) -> Result<()> { // Implementation for adding legacy xl.meta v1 objects @@ -1260,1485 +840,6 @@ impl FileMeta { } } -// impl Display for FileMeta { -// fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { -// f.write_str("FileMeta:")?; -// for (i, ver) in self.versions.iter().enumerate() { -// let mut meta = FileMetaVersion::default(); -// meta.unmarshal_msg(&ver.meta).unwrap_or_default(); -// f.write_fmt(format_args!("ver:{} header {:?}, meta {:?}", i, ver.header, meta))?; -// } - -// f.write_str("\n") -// } -// } - -#[derive(Serialize, Deserialize, Debug, Default, PartialEq, Clone, Eq, PartialOrd, Ord)] -pub struct FileMetaShallowVersion { - pub header: FileMetaVersionHeader, - pub meta: Vec, // FileMetaVersion.marshal_msg -} - -impl FileMetaShallowVersion { - pub fn into_fileinfo(&self, volume: &str, path: &str, all_parts: bool) -> Result { - let file_version = FileMetaVersion::try_from(self.meta.as_slice())?; - - Ok(file_version.into_fileinfo(volume, path, all_parts)) - } -} - -impl TryFrom for FileMetaShallowVersion { - type Error = Error; - - fn try_from(value: FileMetaVersion) -> std::result::Result { - let header = value.header(); - let meta = value.marshal_msg()?; - Ok(Self { meta, header }) - } -} - -#[derive(Serialize, Deserialize, Debug, Default, Clone, PartialEq)] -pub struct FileMetaVersion { - #[serde(rename = "Type")] - pub version_type: VersionType, - #[serde(rename = "V2Obj")] - pub object: Option, - #[serde(rename = "DelObj")] - pub delete_marker: Option, - #[serde(rename = "v")] - pub write_version: u64, // rustfs version -} - -impl FileMetaVersion { - pub fn valid(&self) -> bool { - if !self.version_type.valid() { - return false; - } - - match self.version_type { - VersionType::Object => self - .object - .as_ref() - .map(|v| v.erasure_algorithm.valid() && v.bitrot_checksum_algo.valid() && v.mod_time.is_some()) - .unwrap_or_default(), - VersionType::Delete => self - .delete_marker - .as_ref() - .map(|v| v.mod_time.unwrap_or(OffsetDateTime::UNIX_EPOCH) > OffsetDateTime::UNIX_EPOCH) - .unwrap_or_default(), - _ => false, - } - } - - pub fn get_data_dir(&self) -> Option { - if self.valid() { - { - if self.version_type == VersionType::Object { - self.object.as_ref().map(|v| v.data_dir).unwrap_or_default() - } else { - None - } - } - } else { - Default::default() - } - } - - pub fn get_version_id(&self) -> Option { - match self.version_type { - VersionType::Object => self.object.as_ref().map(|v| v.version_id).unwrap_or_default(), - VersionType::Delete => self.delete_marker.as_ref().map(|v| v.version_id).unwrap_or_default(), - _ => None, - } - } - - pub fn get_mod_time(&self) -> Option { - match self.version_type { - VersionType::Object => self.object.as_ref().map(|v| v.mod_time).unwrap_or_default(), - VersionType::Delete => self.delete_marker.as_ref().map(|v| v.mod_time).unwrap_or_default(), - _ => None, - } - } - - // decode_data_dir_from_meta reads data_dir from meta TODO: directly parse only data_dir from meta buf, msg.skip - pub fn decode_data_dir_from_meta(buf: &[u8]) -> Result> { - let mut ver = Self::default(); - ver.unmarshal_msg(buf)?; - - let data_dir = ver.object.map(|v| v.data_dir).unwrap_or_default(); - Ok(data_dir) - } - - pub fn unmarshal_msg(&mut self, buf: &[u8]) -> Result { - let ret: Self = rmp_serde::from_slice(buf)?; - - *self = ret; - - Ok(buf.len() as u64) - } - - pub fn marshal_msg(&self) -> Result> { - let buf = rmp_serde::to_vec(self)?; - Ok(buf) - } - - pub fn free_version(&self) -> bool { - self.version_type == VersionType::Delete && self.delete_marker.as_ref().map(|m| m.free_version()).unwrap_or_default() - } - - pub fn header(&self) -> FileMetaVersionHeader { - FileMetaVersionHeader::from(self.clone()) - } - - pub fn into_fileinfo(&self, volume: &str, path: &str, all_parts: bool) -> FileInfo { - match self.version_type { - VersionType::Invalid | VersionType::Legacy => FileInfo { - name: path.to_string(), - volume: volume.to_string(), - ..Default::default() - }, - VersionType::Object => self - .object - .as_ref() - .unwrap_or(&MetaObject::default()) - .into_fileinfo(volume, path, all_parts), - VersionType::Delete => self - .delete_marker - .as_ref() - .unwrap_or(&MetaDeleteMarker::default()) - .into_fileinfo(volume, path, all_parts), - } - } - - /// Support for Legacy version type - pub fn is_legacy(&self) -> bool { - self.version_type == VersionType::Legacy - } - - /// Get signature for version - pub fn get_signature(&self) -> [u8; 4] { - match self.version_type { - VersionType::Object => { - if let Some(ref obj) = self.object { - // Calculate signature based on object metadata - let mut hasher = xxhash_rust::xxh64::Xxh64::new(XXHASH_SEED); - hasher.update(obj.version_id.unwrap_or_default().as_bytes()); - if let Some(mod_time) = obj.mod_time { - hasher.update(&mod_time.unix_timestamp_nanos().to_le_bytes()); - } - let hash = hasher.finish(); - let bytes = hash.to_le_bytes(); - [bytes[0], bytes[1], bytes[2], bytes[3]] - } else { - [0; 4] - } - } - VersionType::Delete => { - if let Some(ref dm) = self.delete_marker { - // Calculate signature for delete marker - let mut hasher = xxhash_rust::xxh64::Xxh64::new(XXHASH_SEED); - hasher.update(dm.version_id.unwrap_or_default().as_bytes()); - if let Some(mod_time) = dm.mod_time { - hasher.update(&mod_time.unix_timestamp_nanos().to_le_bytes()); - } - let hash = hasher.finish(); - let bytes = hash.to_le_bytes(); - [bytes[0], bytes[1], bytes[2], bytes[3]] - } else { - [0; 4] - } - } - _ => [0; 4], - } - } - - /// Check if this version uses data directory - pub fn uses_data_dir(&self) -> bool { - match self.version_type { - VersionType::Object => self.object.as_ref().map(|obj| obj.uses_data_dir()).unwrap_or(false), - _ => false, - } - } - - /// Check if this version uses inline data - pub fn uses_inline_data(&self) -> bool { - match self.version_type { - VersionType::Object => self.object.as_ref().map(|obj| obj.inlinedata()).unwrap_or(false), - _ => false, - } - } -} - -impl TryFrom<&[u8]> for FileMetaVersion { - type Error = Error; - - fn try_from(value: &[u8]) -> std::result::Result { - let mut ver = FileMetaVersion::default(); - ver.unmarshal_msg(value)?; - Ok(ver) - } -} - -impl From for FileMetaVersion { - fn from(value: FileInfo) -> Self { - { - if value.deleted { - FileMetaVersion { - version_type: VersionType::Delete, - delete_marker: Some(MetaDeleteMarker::from(value)), - object: None, - write_version: 0, - } - } else { - FileMetaVersion { - version_type: VersionType::Object, - delete_marker: None, - object: Some(MetaObject::from(value)), - write_version: 0, - } - } - } - } -} - -impl TryFrom for FileMetaVersion { - type Error = Error; - - fn try_from(value: FileMetaShallowVersion) -> std::result::Result { - FileMetaVersion::try_from(value.meta.as_slice()) - } -} - -#[derive(Serialize, Deserialize, Debug, PartialEq, Default, Clone, Eq, Hash)] -pub struct FileMetaVersionHeader { - pub version_id: Option, - pub mod_time: Option, - pub signature: [u8; 4], - pub version_type: VersionType, - pub flags: u8, - pub ec_n: u8, - pub ec_m: u8, -} - -impl FileMetaVersionHeader { - pub fn has_ec(&self) -> bool { - self.ec_m > 0 && self.ec_n > 0 - } - - pub fn matches_not_strict(&self, o: &FileMetaVersionHeader) -> bool { - let mut ok = self.version_id == o.version_id && self.version_type == o.version_type && self.matches_ec(o); - if self.version_id.is_none() { - ok = ok && self.mod_time == o.mod_time; - } - - ok - } - - pub fn matches_ec(&self, o: &FileMetaVersionHeader) -> bool { - if self.has_ec() && o.has_ec() { - return self.ec_n == o.ec_n && self.ec_m == o.ec_m; - } - - true - } - - pub fn free_version(&self) -> bool { - self.flags & XL_FLAG_FREE_VERSION != 0 - } - - pub fn sorts_before(&self, o: &FileMetaVersionHeader) -> bool { - if self == o { - return false; - } - - // Prefer newest modtime. - if self.mod_time != o.mod_time { - return self.mod_time > o.mod_time; - } - - match self.mod_time.cmp(&o.mod_time) { - Ordering::Greater => { - return true; - } - Ordering::Less => { - return false; - } - _ => {} - } - - // The following doesn't make too much sense, but we want sort to be consistent nonetheless. - // Prefer lower types - if self.version_type != o.version_type { - return self.version_type < o.version_type; - } - // Consistent sort on signature - match self.version_id.cmp(&o.version_id) { - Ordering::Greater => { - return true; - } - Ordering::Less => { - return false; - } - _ => {} - } - - if self.flags != o.flags { - return self.flags > o.flags; - } - - false - } - - pub fn uses_data_dir(&self) -> bool { - self.flags & Flags::UsesDataDir as u8 != 0 - } - - pub fn marshal_msg(&self) -> Result> { - let mut wr = Vec::new(); - - // array len 7 - rmp::encode::write_array_len(&mut wr, 7)?; - - // version_id - rmp::encode::write_bin(&mut wr, self.version_id.unwrap_or_default().as_bytes())?; - // mod_time - rmp::encode::write_i64(&mut wr, self.mod_time.unwrap_or(OffsetDateTime::UNIX_EPOCH).unix_timestamp_nanos() as i64)?; - // signature - rmp::encode::write_bin(&mut wr, self.signature.as_slice())?; - // version_type - rmp::encode::write_uint8(&mut wr, self.version_type.to_u8())?; - // flags - rmp::encode::write_uint8(&mut wr, self.flags)?; - // ec_n - rmp::encode::write_uint8(&mut wr, self.ec_n)?; - // ec_m - rmp::encode::write_uint8(&mut wr, self.ec_m)?; - - Ok(wr) - } - - pub fn unmarshal_msg(&mut self, buf: &[u8]) -> Result { - let mut cur = Cursor::new(buf); - let alen = rmp::decode::read_array_len(&mut cur)?; - if alen != 7 { - return Err(Error::other(format!("version header array len err need 7 got {alen}"))); - } - - // version_id - rmp::decode::read_bin_len(&mut cur)?; - let mut buf = [0u8; 16]; - cur.read_exact(&mut buf)?; - self.version_id = { - let id = Uuid::from_bytes(buf); - // if id.is_nil() { None } else { Some(id) } - Some(id) - }; - - // mod_time - let unix: i128 = rmp::decode::read_int(&mut cur)?; - - let time = OffsetDateTime::from_unix_timestamp_nanos(unix)?; - if time == OffsetDateTime::UNIX_EPOCH { - self.mod_time = None; - } else { - self.mod_time = Some(time); - } - - // signature - rmp::decode::read_bin_len(&mut cur)?; - cur.read_exact(&mut self.signature)?; - - // version_type - let typ: u8 = rmp::decode::read_int(&mut cur)?; - self.version_type = VersionType::from_u8(typ); - - // flags - self.flags = rmp::decode::read_int(&mut cur)?; - // ec_n - self.ec_n = rmp::decode::read_int(&mut cur)?; - // ec_m - self.ec_m = rmp::decode::read_int(&mut cur)?; - - Ok(cur.position()) - } - - /// Get signature for header - pub fn get_signature(&self) -> [u8; 4] { - self.signature - } - - /// Check if this header represents inline data - pub fn inline_data(&self) -> bool { - self.flags & Flags::InlineData as u8 != 0 - } - - /// Update signature based on version content - pub fn update_signature(&mut self, version: &FileMetaVersion) { - self.signature = version.get_signature(); - } -} - -impl PartialOrd for FileMetaVersionHeader { - fn partial_cmp(&self, other: &Self) -> Option { - Some(self.cmp(other)) - } -} - -impl Ord for FileMetaVersionHeader { - fn cmp(&self, other: &Self) -> Ordering { - match self.mod_time.cmp(&other.mod_time) { - core::cmp::Ordering::Equal => {} - ord => return ord, - } - - match self.version_type.cmp(&other.version_type) { - core::cmp::Ordering::Equal => {} - ord => return ord, - } - match self.signature.cmp(&other.signature) { - core::cmp::Ordering::Equal => {} - ord => return ord, - } - match self.version_id.cmp(&other.version_id) { - core::cmp::Ordering::Equal => {} - ord => return ord, - } - self.flags.cmp(&other.flags) - } -} - -impl From for FileMetaVersionHeader { - fn from(value: FileMetaVersion) -> Self { - let flags = { - let mut f: u8 = 0; - if value.free_version() { - f |= Flags::FreeVersion as u8; - } - - if value.version_type == VersionType::Object && value.object.as_ref().map(|v| v.uses_data_dir()).unwrap_or_default() { - f |= Flags::UsesDataDir as u8; - } - - if value.version_type == VersionType::Object && value.object.as_ref().map(|v| v.inlinedata()).unwrap_or_default() { - f |= Flags::InlineData as u8; - } - - f - }; - - let (ec_n, ec_m) = match (value.version_type == VersionType::Object, value.object.as_ref()) { - (true, Some(obj)) => (obj.erasure_n as u8, obj.erasure_m as u8), - _ => (0, 0), - }; - - Self { - version_id: value.get_version_id(), - mod_time: value.get_mod_time(), - signature: [0, 0, 0, 0], - version_type: value.version_type, - flags, - ec_n, - ec_m, - } - } -} - -#[derive(Serialize, Deserialize, Debug, Clone, Default, PartialEq)] -// Because of custom message_pack, field order must be guaranteed -pub struct MetaObject { - #[serde(rename = "ID")] - pub version_id: Option, // Version ID - #[serde(rename = "DDir")] - pub data_dir: Option, // Data dir ID - #[serde(rename = "EcAlgo")] - pub erasure_algorithm: ErasureAlgo, // Erasure coding algorithm - #[serde(rename = "EcM")] - pub erasure_m: usize, // Erasure data blocks - #[serde(rename = "EcN")] - pub erasure_n: usize, // Erasure parity blocks - #[serde(rename = "EcBSize")] - pub erasure_block_size: usize, // Erasure block size - #[serde(rename = "EcIndex")] - pub erasure_index: usize, // Erasure disk index - #[serde(rename = "EcDist")] - pub erasure_dist: Vec, // Erasure distribution - #[serde(rename = "CSumAlgo")] - pub bitrot_checksum_algo: ChecksumAlgo, // Bitrot checksum algo - #[serde(rename = "PartNums")] - pub part_numbers: Vec, // Part Numbers - #[serde(rename = "PartETags")] - pub part_etags: Vec, // Part ETags - #[serde(rename = "PartSizes")] - pub part_sizes: Vec, // Part Sizes - #[serde(rename = "PartASizes")] - pub part_actual_sizes: Vec, // Part ActualSizes (compression) - #[serde(rename = "PartIdx")] - pub part_indices: Vec, // Part Indexes (compression) - #[serde(rename = "Size")] - pub size: i64, // Object version size - #[serde(rename = "MTime")] - pub mod_time: Option, // Object version modified time - #[serde(rename = "MetaSys")] - pub meta_sys: HashMap>, // Object version internal metadata - #[serde(rename = "MetaUsr")] - pub meta_user: HashMap, // Object version metadata set by user -} - -impl MetaObject { - pub fn unmarshal_msg(&mut self, buf: &[u8]) -> Result { - let ret: Self = rmp_serde::from_slice(buf)?; - - *self = ret; - - Ok(buf.len() as u64) - } - // marshal_msg custom messagepack naming consistent with go - pub fn marshal_msg(&self) -> Result> { - let buf = rmp_serde::to_vec(self)?; - Ok(buf) - } - - pub fn into_fileinfo(&self, volume: &str, path: &str, all_parts: bool) -> FileInfo { - let version_id = self.version_id.filter(|&vid| !vid.is_nil()); - - let parts = if all_parts { - let mut parts = vec![ObjectPartInfo::default(); self.part_numbers.len()]; - - for (i, part) in parts.iter_mut().enumerate() { - part.number = self.part_numbers[i]; - part.size = self.part_sizes[i]; - part.actual_size = self.part_actual_sizes[i]; - - if self.part_etags.len() == self.part_numbers.len() { - part.etag = self.part_etags[i].clone(); - } - - if self.part_indices.len() == self.part_numbers.len() { - part.index = if self.part_indices[i].is_empty() { - None - } else { - Some(self.part_indices[i].clone()) - }; - } - } - parts - } else { - Vec::new() - }; - - let mut metadata = HashMap::with_capacity(self.meta_user.len() + self.meta_sys.len()); - for (k, v) in &self.meta_user { - if k == AMZ_META_UNENCRYPTED_CONTENT_LENGTH || k == AMZ_META_UNENCRYPTED_CONTENT_MD5 { - continue; - } - - if k == AMZ_STORAGE_CLASS && v == "STANDARD" { - continue; - } - - metadata.insert(k.to_owned(), v.to_owned()); - } - - let tier_fvidkey = format!("{RESERVED_METADATA_PREFIX_LOWER}{TIER_FV_ID}").to_lowercase(); - let tier_fvmarker_key = format!("{RESERVED_METADATA_PREFIX_LOWER}{TIER_FV_MARKER}").to_lowercase(); - - for (k, v) in &self.meta_sys { - let lower_k = k.to_lowercase(); - - if lower_k == tier_fvidkey || lower_k == tier_fvmarker_key { - continue; - } - - if lower_k == AMZ_STORAGE_CLASS.to_lowercase() && v == b"STANDARD" { - continue; - } - - if k.starts_with(RESERVED_METADATA_PREFIX) - || k.starts_with(RESERVED_METADATA_PREFIX_LOWER) - || lower_k == VERSION_PURGE_STATUS_KEY.to_lowercase() - { - metadata.insert(k.to_owned(), String::from_utf8(v.to_owned()).unwrap_or_default()); - } - } - - let replication_state_internal = get_internal_replication_state(&metadata); - - let mut deleted = false; - - if let Some(v) = replication_state_internal.as_ref() { - if !v.composite_version_purge_status().is_empty() { - deleted = true; - } - - let st = v.composite_replication_status(); - if !st.is_empty() { - metadata.insert(AMZ_BUCKET_REPLICATION_STATUS.to_string(), st.to_string()); - } - } - - let checksum = self - .meta_sys - .get(format!("{RESERVED_METADATA_PREFIX_LOWER}crc").as_str()) - .map(|v| Bytes::from(v.clone())); - - let erasure = ErasureInfo { - algorithm: self.erasure_algorithm.to_string(), - data_blocks: self.erasure_m, - parity_blocks: self.erasure_n, - block_size: self.erasure_block_size, - index: self.erasure_index, - distribution: self.erasure_dist.iter().map(|&v| v as usize).collect(), - ..Default::default() - }; - - let transition_status = self - .meta_sys - .get(format!("{RESERVED_METADATA_PREFIX_LOWER}{TRANSITION_STATUS}").as_str()) - .map(|v| String::from_utf8_lossy(v).to_string()) - .unwrap_or_default(); - let transitioned_objname = self - .meta_sys - .get(format!("{RESERVED_METADATA_PREFIX_LOWER}{TRANSITIONED_OBJECTNAME}").as_str()) - .map(|v| String::from_utf8_lossy(v).to_string()) - .unwrap_or_default(); - let transition_version_id = self - .meta_sys - .get(format!("{RESERVED_METADATA_PREFIX_LOWER}{TRANSITIONED_VERSION_ID}").as_str()) - .map(|v| Uuid::from_slice(v.as_slice()).unwrap_or_default()); - let transition_tier = self - .meta_sys - .get(format!("{RESERVED_METADATA_PREFIX_LOWER}{TRANSITION_TIER}").as_str()) - .map(|v| String::from_utf8_lossy(v).to_string()) - .unwrap_or_default(); - - FileInfo { - version_id, - erasure, - data_dir: self.data_dir, - mod_time: self.mod_time, - size: self.size, - name: path.to_string(), - volume: volume.to_string(), - parts, - metadata, - replication_state_internal, - deleted, - checksum, - transition_status, - transitioned_objname, - transition_version_id, - transition_tier, - ..Default::default() - } - } - - pub fn set_transition(&mut self, fi: &FileInfo) { - self.meta_sys.insert( - format!("{RESERVED_METADATA_PREFIX_LOWER}{TRANSITION_STATUS}"), - fi.transition_status.as_bytes().to_vec(), - ); - self.meta_sys.insert( - format!("{RESERVED_METADATA_PREFIX_LOWER}{TRANSITIONED_OBJECTNAME}"), - fi.transitioned_objname.as_bytes().to_vec(), - ); - self.meta_sys.insert( - format!("{RESERVED_METADATA_PREFIX_LOWER}{TRANSITIONED_VERSION_ID}"), - fi.transition_version_id.unwrap().as_bytes().to_vec(), - ); - self.meta_sys.insert( - format!("{RESERVED_METADATA_PREFIX_LOWER}{TRANSITION_TIER}"), - fi.transition_tier.as_bytes().to_vec(), - ); - } - - pub fn remove_restore_hdrs(&mut self) { - self.meta_user.remove(X_AMZ_RESTORE.as_str()); - self.meta_user.remove(AMZ_RESTORE_EXPIRY_DAYS); - self.meta_user.remove(AMZ_RESTORE_REQUEST_DATE); - } - - pub fn uses_data_dir(&self) -> bool { - if let Some(status) = self - .meta_sys - .get(&format!("{RESERVED_METADATA_PREFIX_LOWER}{TRANSITION_STATUS}")) - && *status == TRANSITION_COMPLETE.as_bytes().to_vec() - { - return false; - } - - is_restored_object_on_disk(&self.meta_user) - } - - pub fn inlinedata(&self) -> bool { - self.meta_sys - .contains_key(format!("{RESERVED_METADATA_PREFIX_LOWER}inline-data").as_str()) - } - - pub fn reset_inline_data(&mut self) { - self.meta_sys - .remove(format!("{RESERVED_METADATA_PREFIX_LOWER}inline-data").as_str()); - } - - /// Remove restore headers - pub fn remove_restore_headers(&mut self) { - // Remove any restore-related metadata - self.meta_sys.retain(|k, _| !k.starts_with("X-Amz-Restore")); - } - - /// Get object signature - pub fn get_signature(&self) -> [u8; 4] { - let mut hasher = xxhash_rust::xxh64::Xxh64::new(XXHASH_SEED); - hasher.update(self.version_id.unwrap_or_default().as_bytes()); - if let Some(mod_time) = self.mod_time { - hasher.update(&mod_time.unix_timestamp_nanos().to_le_bytes()); - } - hasher.update(&self.size.to_le_bytes()); - let hash = hasher.finish(); - let bytes = hash.to_le_bytes(); - [bytes[0], bytes[1], bytes[2], bytes[3]] - } - - pub fn init_free_version(&self, fi: &FileInfo) -> (FileMetaVersion, bool) { - if fi.skip_tier_free_version() { - return (FileMetaVersion::default(), false); - } - if let Some(status) = self - .meta_sys - .get(&format!("{RESERVED_METADATA_PREFIX_LOWER}{TRANSITION_STATUS}")) - && *status == TRANSITION_COMPLETE.as_bytes().to_vec() - { - let vid = Uuid::parse_str(&fi.tier_free_version_id()); - if let Err(err) = vid { - panic!("Invalid Tier Object delete marker versionId {} {}", fi.tier_free_version_id(), err); - } - let vid = vid.unwrap(); - let mut free_entry = FileMetaVersion { - version_type: VersionType::Delete, - write_version: 0, - ..Default::default() - }; - free_entry.delete_marker = Some(MetaDeleteMarker { - version_id: Some(vid), - mod_time: self.mod_time, - meta_sys: HashMap::>::new(), - }); - - let delete_marker = free_entry.delete_marker.as_mut().unwrap(); - - delete_marker - .meta_sys - .insert(format!("{RESERVED_METADATA_PREFIX_LOWER}{FREE_VERSION}"), vec![]); - - let tier_key = format!("{RESERVED_METADATA_PREFIX_LOWER}{TRANSITION_TIER}"); - let tier_obj_key = format!("{RESERVED_METADATA_PREFIX_LOWER}{TRANSITIONED_OBJECTNAME}"); - let tier_obj_vid_key = format!("{RESERVED_METADATA_PREFIX_LOWER}{TRANSITIONED_VERSION_ID}"); - - let aa = [tier_key, tier_obj_key, tier_obj_vid_key]; - for (k, v) in &self.meta_sys { - if aa.contains(k) { - delete_marker.meta_sys.insert(k.clone(), v.clone()); - } - } - return (free_entry, true); - } - (FileMetaVersion::default(), false) - } -} - -impl From for MetaObject { - fn from(value: FileInfo) -> Self { - let part_etags = if !value.parts.is_empty() { - value.parts.iter().map(|v| v.etag.clone()).collect() - } else { - vec![] - }; - - let part_indices = if !value.parts.is_empty() { - value.parts.iter().map(|v| v.index.clone().unwrap_or_default()).collect() - } else { - vec![] - }; - - let mut meta_sys = HashMap::new(); - let mut meta_user = HashMap::new(); - for (k, v) in value.metadata.iter() { - if k.len() > RESERVED_METADATA_PREFIX.len() - && (k.starts_with(RESERVED_METADATA_PREFIX) || k.starts_with(RESERVED_METADATA_PREFIX_LOWER)) - { - if k == headers::X_RUSTFS_HEALING || k == headers::X_RUSTFS_DATA_MOV { - continue; - } - - meta_sys.insert(k.to_owned(), v.as_bytes().to_vec()); - } else { - meta_user.insert(k.to_owned(), v.to_owned()); - } - } - - if !value.transition_status.is_empty() { - meta_sys.insert( - format!("{RESERVED_METADATA_PREFIX_LOWER}{TRANSITION_STATUS}"), - value.transition_status.as_bytes().to_vec(), - ); - } - - if !value.transitioned_objname.is_empty() { - meta_sys.insert( - format!("{RESERVED_METADATA_PREFIX_LOWER}{TRANSITIONED_OBJECTNAME}"), - value.transitioned_objname.as_bytes().to_vec(), - ); - } - - if let Some(vid) = &value.transition_version_id { - meta_sys.insert( - format!("{RESERVED_METADATA_PREFIX_LOWER}{TRANSITIONED_VERSION_ID}"), - vid.as_bytes().to_vec(), - ); - } - - if !value.transition_tier.is_empty() { - meta_sys.insert( - format!("{RESERVED_METADATA_PREFIX_LOWER}{TRANSITION_TIER}"), - value.transition_tier.as_bytes().to_vec(), - ); - } - - if let Some(content_hash) = value.checksum { - meta_sys.insert(format!("{RESERVED_METADATA_PREFIX_LOWER}crc"), content_hash.to_vec()); - } - - Self { - version_id: value.version_id, - data_dir: value.data_dir, - size: value.size, - mod_time: value.mod_time, - erasure_algorithm: ErasureAlgo::ReedSolomon, - erasure_m: value.erasure.data_blocks, - erasure_n: value.erasure.parity_blocks, - erasure_block_size: value.erasure.block_size, - erasure_index: value.erasure.index, - erasure_dist: value.erasure.distribution.iter().map(|x| *x as u8).collect(), - bitrot_checksum_algo: ChecksumAlgo::HighwayHash, - part_numbers: value.parts.iter().map(|v| v.number).collect(), - part_etags, - part_sizes: value.parts.iter().map(|v| v.size).collect(), - part_actual_sizes: value.parts.iter().map(|v| v.actual_size).collect(), - part_indices, - meta_sys, - meta_user, - } - } -} - -fn get_internal_replication_state(metadata: &HashMap) -> Option { - let mut rs = ReplicationState::default(); - let mut has = false; - - for (k, v) in metadata.iter() { - if k == VERSION_PURGE_STATUS_KEY { - rs.version_purge_status_internal = Some(v.clone()); - rs.purge_targets = version_purge_statuses_map(v.as_str()); - has = true; - continue; - } - - if let Some(sub_key) = k.strip_prefix(RESERVED_METADATA_PREFIX_LOWER) { - match sub_key { - "replica-timestamp" => { - has = true; - rs.replica_timestamp = Some(OffsetDateTime::parse(v, &Rfc3339).unwrap_or(OffsetDateTime::UNIX_EPOCH)); - } - "replica-status" => { - has = true; - rs.replica_status = ReplicationStatusType::from(v.as_str()); - } - "replication-timestamp" => { - has = true; - rs.replication_timestamp = Some(OffsetDateTime::parse(v, &Rfc3339).unwrap_or(OffsetDateTime::UNIX_EPOCH)) - } - "replication-status" => { - has = true; - rs.replication_status_internal = Some(v.clone()); - rs.targets = replication_statuses_map(v.as_str()); - } - _ => { - if let Some(arn) = sub_key.strip_prefix("replication-reset-") { - has = true; - rs.reset_statuses_map.insert(arn.to_string(), v.clone()); - } - } - } - } - } - - if has { Some(rs) } else { None } -} - -#[derive(Serialize, Deserialize, Debug, Clone, Default, PartialEq)] -pub struct MetaDeleteMarker { - #[serde(rename = "ID")] - pub version_id: Option, // Version ID for delete marker - #[serde(rename = "MTime")] - pub mod_time: Option, // Object delete marker modified time - #[serde(rename = "MetaSys")] - pub meta_sys: HashMap>, // Delete marker internal metadata -} - -impl MetaDeleteMarker { - pub fn free_version(&self) -> bool { - self.meta_sys - .contains_key(format!("{RESERVED_METADATA_PREFIX_LOWER}{FREE_VERSION}").as_str()) - } - - pub fn into_fileinfo(&self, volume: &str, path: &str, _all_parts: bool) -> FileInfo { - let metadata = self - .meta_sys - .clone() - .into_iter() - .map(|(k, v)| (k, String::from_utf8_lossy(&v).to_string())) - .collect(); - let replication_state_internal = get_internal_replication_state(&metadata); - - let mut fi = FileInfo { - version_id: self.version_id.filter(|&vid| !vid.is_nil()), - name: path.to_string(), - volume: volume.to_string(), - deleted: true, - mod_time: self.mod_time, - metadata, - replication_state_internal, - ..Default::default() - }; - - if self.free_version() { - fi.set_tier_free_version(); - fi.transition_tier = self - .meta_sys - .get(format!("{RESERVED_METADATA_PREFIX_LOWER}{TRANSITION_TIER}").as_str()) - .map(|v| String::from_utf8_lossy(v).to_string()) - .unwrap_or_default(); - - fi.transitioned_objname = self - .meta_sys - .get(format!("{RESERVED_METADATA_PREFIX_LOWER}{TRANSITIONED_OBJECTNAME}").as_str()) - .map(|v| String::from_utf8_lossy(v).to_string()) - .unwrap_or_default(); - - fi.transition_version_id = self - .meta_sys - .get(format!("{RESERVED_METADATA_PREFIX_LOWER}{TRANSITIONED_VERSION_ID}").as_str()) - .map(|v| Uuid::from_slice(v.as_slice()).unwrap_or_default()); - } - - fi - } - - pub fn unmarshal_msg(&mut self, buf: &[u8]) -> Result { - let ret: Self = rmp_serde::from_slice(buf)?; - - *self = ret; - - Ok(buf.len() as u64) - - // let mut cur = Cursor::new(buf); - - // let mut fields_len = rmp::decode::read_map_len(&mut cur)?; - - // while fields_len > 0 { - // fields_len -= 1; - - // let str_len = rmp::decode::read_str_len(&mut cur)?; - - // // !!! Vec::with_capacity(str_len) fails, vec! works normally - // let mut field_buff = vec![0u8; str_len as usize]; - - // cur.read_exact(&mut field_buff)?; - - // let field = String::from_utf8(field_buff)?; - - // match field.as_str() { - // "ID" => { - // rmp::decode::read_bin_len(&mut cur)?; - // let mut buf = [0u8; 16]; - // cur.read_exact(&mut buf)?; - // self.version_id = { - // let id = Uuid::from_bytes(buf); - // if id.is_nil() { None } else { Some(id) } - // }; - // } - - // "MTime" => { - // let unix: i64 = rmp::decode::read_int(&mut cur)?; - // let time = OffsetDateTime::from_unix_timestamp(unix)?; - // if time == OffsetDateTime::UNIX_EPOCH { - // self.mod_time = None; - // } else { - // self.mod_time = Some(time); - // } - // } - // "MetaSys" => { - // let l = rmp::decode::read_map_len(&mut cur)?; - // let mut map = HashMap::new(); - // for _ in 0..l { - // let str_len = rmp::decode::read_str_len(&mut cur)?; - // let mut field_buff = vec![0u8; str_len as usize]; - // cur.read_exact(&mut field_buff)?; - // let key = String::from_utf8(field_buff)?; - - // let blen = rmp::decode::read_bin_len(&mut cur)?; - // let mut val = vec![0u8; blen as usize]; - // cur.read_exact(&mut val)?; - - // map.insert(key, val); - // } - - // self.meta_sys = Some(map); - // } - // name => return Err(Error::other(format!("not support field name {name}"))), - // } - // } - - // Ok(cur.position()) - } - - pub fn marshal_msg(&self) -> Result> { - let buf = rmp_serde::to_vec(self)?; - Ok(buf) - - // let mut len: u32 = 3; - // let mut mask: u8 = 0; - - // if self.meta_sys.is_none() { - // len -= 1; - // mask |= 0x4; - // } - - // let mut wr = Vec::new(); - - // // Field count - // rmp::encode::write_map_len(&mut wr, len)?; - - // // string "ID" - // rmp::encode::write_str(&mut wr, "ID")?; - // rmp::encode::write_bin(&mut wr, self.version_id.unwrap_or_default().as_bytes())?; - - // // string "MTime" - // rmp::encode::write_str(&mut wr, "MTime")?; - // rmp::encode::write_uint( - // &mut wr, - // self.mod_time - // .unwrap_or(OffsetDateTime::UNIX_EPOCH) - // .unix_timestamp() - // .try_into() - // .unwrap(), - // )?; - - // if (mask & 0x4) == 0 { - // let metas = self.meta_sys.as_ref().unwrap(); - // rmp::encode::write_map_len(&mut wr, metas.len() as u32)?; - // for (k, v) in metas { - // rmp::encode::write_str(&mut wr, k.as_str())?; - // rmp::encode::write_bin(&mut wr, v)?; - // } - // } - - // Ok(wr) - } - - /// Get delete marker signature - pub fn get_signature(&self) -> [u8; 4] { - let mut hasher = xxhash_rust::xxh64::Xxh64::new(XXHASH_SEED); - hasher.update(self.version_id.unwrap_or_default().as_bytes()); - if let Some(mod_time) = self.mod_time { - hasher.update(&mod_time.unix_timestamp_nanos().to_le_bytes()); - } - let hash = hasher.finish(); - let bytes = hash.to_le_bytes(); - [bytes[0], bytes[1], bytes[2], bytes[3]] - } -} - -impl From for MetaDeleteMarker { - fn from(value: FileInfo) -> Self { - Self { - version_id: value.version_id, - mod_time: value.mod_time, - meta_sys: HashMap::new(), - } - } -} - -#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Default, Clone, PartialOrd, Ord, Hash)] -pub enum VersionType { - #[default] - Invalid = 0, - Object = 1, - Delete = 2, - Legacy = 3, -} - -impl VersionType { - pub fn valid(&self) -> bool { - matches!(*self, VersionType::Object | VersionType::Delete | VersionType::Legacy) - } - - pub fn to_u8(&self) -> u8 { - match self { - VersionType::Invalid => 0, - VersionType::Object => 1, - VersionType::Delete => 2, - VersionType::Legacy => 3, - } - } - - pub fn from_u8(n: u8) -> Self { - match n { - 1 => VersionType::Object, - 2 => VersionType::Delete, - 3 => VersionType::Legacy, - _ => VersionType::Invalid, - } - } -} - -#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Default, Clone)] -pub enum ChecksumAlgo { - #[default] - Invalid = 0, - HighwayHash = 1, -} - -impl ChecksumAlgo { - pub fn valid(&self) -> bool { - *self > ChecksumAlgo::Invalid - } - pub fn to_u8(&self) -> u8 { - match self { - ChecksumAlgo::Invalid => 0, - ChecksumAlgo::HighwayHash => 1, - } - } - pub fn from_u8(u: u8) -> Self { - match u { - 1 => ChecksumAlgo::HighwayHash, - _ => ChecksumAlgo::Invalid, - } - } -} - -#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Default, Clone)] -pub enum Flags { - #[default] - FreeVersion = 1 << 0, - UsesDataDir = 1 << 1, - InlineData = 1 << 2, -} - -// mergeXLV2Versions -pub fn merge_file_meta_versions( - mut quorum: usize, - mut strict: bool, - requested_versions: usize, - versions: &[Vec], -) -> Vec { - if quorum == 0 { - quorum = 1; - } - - if versions.len() < quorum || versions.is_empty() { - return Vec::new(); - } - - if versions.len() == 1 { - return versions[0].clone(); - } - - if quorum == 1 { - strict = true; - } - - let mut versions = versions.to_owned(); - - let mut n_versions = 0; - - let mut merged = Vec::new(); - loop { - let mut tops = Vec::new(); - let mut top_sig = FileMetaVersionHeader::default(); - let mut consistent = true; - for vers in versions.iter() { - if vers.is_empty() { - consistent = false; - continue; - } - if tops.is_empty() { - consistent = true; - top_sig = vers[0].header.clone(); - } else { - consistent = consistent && vers[0].header == top_sig; - } - tops.push(vers[0].clone()); - } - - // check if done... - if tops.len() < quorum { - break; - } - - let mut latest = FileMetaShallowVersion::default(); - if consistent { - merged.push(tops[0].clone()); - if tops[0].header.free_version() { - n_versions += 1; - } - } else { - let mut latest_count = 0; - for (i, ver) in tops.iter().enumerate() { - if ver.header == latest.header { - latest_count += 1; - continue; - } - - if i == 0 || ver.header.sorts_before(&latest.header) { - if i == 0 || latest_count == 0 { - latest_count = 1; - } else if !strict && ver.header.matches_not_strict(&latest.header) { - latest_count += 1; - } else { - latest_count = 1; - } - latest = ver.clone(); - continue; - } - - // Mismatch, but older. - if latest_count > 0 && !strict && ver.header.matches_not_strict(&latest.header) { - latest_count += 1; - continue; - } - - if latest_count > 0 && ver.header.version_id == latest.header.version_id { - let mut x: HashMap = HashMap::new(); - for a in tops.iter() { - if a.header.version_id != ver.header.version_id { - continue; - } - let mut a_clone = a.clone(); - if !strict { - a_clone.header.signature = [0; 4]; - } - *x.entry(a_clone.header).or_insert(1) += 1; - } - latest_count = 0; - for (k, v) in x.iter() { - if *v < latest_count { - continue; - } - if *v == latest_count && latest.header.sorts_before(k) { - continue; - } - tops.iter().for_each(|a| { - let mut hdr = a.header.clone(); - if !strict { - hdr.signature = [0; 4]; - } - if hdr == *k { - latest = a.clone(); - } - }); - - latest_count = *v; - } - break; - } - } - if latest_count >= quorum { - if !latest.header.free_version() { - n_versions += 1; - } - merged.push(latest.clone()); - } - } - - // Remove from all streams up until latest modtime or if selected. - versions.iter_mut().for_each(|vers| { - // // Keep top entry (and remaining)... - let mut bre = false; - vers.retain(|ver| { - if bre { - return true; - } - if let Ordering::Greater = ver.header.mod_time.cmp(&latest.header.mod_time) { - bre = true; - return false; - } - if ver.header == latest.header { - bre = true; - return false; - } - if let Ordering::Equal = latest.header.version_id.cmp(&ver.header.version_id) { - bre = true; - return false; - } - for merged_v in merged.iter() { - if let Ordering::Equal = ver.header.version_id.cmp(&merged_v.header.version_id) { - bre = true; - return false; - } - } - true - }); - }); - if requested_versions > 0 && requested_versions == n_versions { - merged.append(&mut versions[0]); - break; - } - } - - // Sanity check. Enable if duplicates show up. - // todo - merged -} - -pub fn file_info_from_raw( - ri: RawFileInfo, - bucket: &str, - object: &str, - read_data: bool, - include_free_versions: bool, -) -> Result { - get_file_info( - &ri.buf, - bucket, - object, - "", - FileInfoOpts { - data: read_data, - include_free_versions, - }, - ) -} - -pub struct FileInfoOpts { - pub data: bool, - pub include_free_versions: bool, -} - -pub fn get_file_info(buf: &[u8], volume: &str, path: &str, version_id: &str, opts: FileInfoOpts) -> Result { - let vid = { - if version_id.is_empty() { - None - } else { - Some(Uuid::parse_str(version_id)?) - } - }; - - let meta = FileMeta::load(buf)?; - if meta.versions.is_empty() { - return Ok(FileInfo { - volume: volume.to_owned(), - name: path.to_owned(), - version_id: vid, - is_latest: true, - deleted: true, - mod_time: Some(OffsetDateTime::from_unix_timestamp(1)?), - ..Default::default() - }); - } - - let fi = meta.into_fileinfo(volume, path, version_id, opts.data, opts.include_free_versions, true)?; - Ok(fi) -} - -async fn read_more( - reader: &mut R, - buf: &mut Vec, - total_size: usize, - read_size: usize, - has_full: bool, -) -> Result<()> { - use tokio::io::AsyncReadExt; - let has = buf.len(); - - if has >= read_size { - return Ok(()); - } - - if has_full || read_size > total_size { - return Err(Error::other(std::io::Error::new(std::io::ErrorKind::UnexpectedEof, "Unexpected EOF"))); - } - - let extra = read_size - has; - if buf.capacity() >= read_size { - // Extend the buffer if we have enough space. - buf.resize(read_size, 0); - } else { - buf.extend(vec![0u8; extra]); - } - - reader.read_exact(&mut buf[has..]).await?; - Ok(()) -} - -pub async fn read_xl_meta_no_data(reader: &mut R, size: usize) -> Result> { - use tokio::io::AsyncReadExt; - - let mut initial = size; - let mut has_full = true; - - if initial > META_DATA_READ_DEFAULT { - initial = META_DATA_READ_DEFAULT; - has_full = false; - } - - let mut buf = vec![0u8; initial]; - reader.read_exact(&mut buf).await?; - - let (tmp_buf, major, minor) = FileMeta::check_xl2_v1(&buf)?; - - match major { - 1 => match minor { - 0 => { - read_more(reader, &mut buf, size, size, has_full).await?; - Ok(buf) - } - 1..=3 => { - let (sz, tmp_buf) = FileMeta::read_bytes_header(tmp_buf)?; - let mut want = sz as usize + (buf.len() - tmp_buf.len()); - - if minor < 2 { - read_more(reader, &mut buf, size, want, has_full).await?; - buf.truncate(want); - return Ok(buf); - } - - let want_max = usize::min(want + MSGP_UINT32_SIZE, size); - read_more(reader, &mut buf, size, want_max, has_full).await?; - - if buf.len() < want { - return Err(Error::FileCorrupt); - } - - let tmp = &buf[want..]; - let crc_size = 5; - let other_size = tmp.len() - crc_size; - - want += tmp.len() - other_size; - - buf.truncate(want); - Ok(buf) - } - _ => Err(Error::other(std::io::Error::new( - std::io::ErrorKind::InvalidData, - "Unknown minor metadata version", - ))), - }, - _ => Err(Error::other(std::io::Error::new( - std::io::ErrorKind::InvalidData, - "Unknown major metadata version", - ))), - } -} #[cfg(test)] mod test { use super::*; @@ -3607,100 +1708,3 @@ async fn test_read_xl_meta_no_data() { assert_eq!(fm, newfm) } - -#[derive(Debug, Default, Clone)] -pub struct VersionStats { - pub total_versions: usize, - pub object_versions: usize, - pub delete_markers: usize, - pub invalid_versions: usize, - pub free_versions: usize, -} - -impl FileMetaVersionHeader { - // ... existing code ... - - pub fn is_valid(&self) -> bool { - // Check if version type is valid - if !self.version_type.valid() { - return false; - } - - // Check if modification time is reasonable (not too far in the future) - if let Some(mod_time) = self.mod_time { - let now = OffsetDateTime::now_utc(); - let future_limit = now + time::Duration::hours(24); // Allow 24 hours in future - if mod_time > future_limit { - return false; - } - } - - // Check erasure coding parameters - if self.has_ec() && (self.ec_n == 0 || self.ec_m == 0 || self.ec_m < self.ec_n) { - return false; - } - - true - } - - // ... existing code ... -} - -/// Enhanced version statistics with more detailed information -#[derive(Debug, Default, Clone)] -pub struct DetailedVersionStats { - pub total_versions: usize, - pub object_versions: usize, - pub delete_markers: usize, - pub invalid_versions: usize, - pub legacy_versions: usize, - pub free_versions: usize, - pub versions_with_data_dir: usize, - pub versions_with_inline_data: usize, - pub total_size: i64, - pub latest_mod_time: Option, -} - -impl FileMeta { - /// Get detailed statistics about versions - pub fn get_detailed_version_stats(&self) -> DetailedVersionStats { - let mut stats = DetailedVersionStats { - total_versions: self.versions.len(), - ..Default::default() - }; - - for version in &self.versions { - match version.header.version_type { - VersionType::Object => { - stats.object_versions += 1; - if let Ok(ver) = FileMetaVersion::try_from(version.meta.as_slice()) - && let Some(obj) = &ver.object - { - stats.total_size += obj.size; - if obj.uses_data_dir() { - stats.versions_with_data_dir += 1; - } - if obj.inlinedata() { - stats.versions_with_inline_data += 1; - } - } - } - VersionType::Delete => stats.delete_markers += 1, - VersionType::Legacy => stats.legacy_versions += 1, - VersionType::Invalid => stats.invalid_versions += 1, - } - - if version.header.free_version() { - stats.free_versions += 1; - } - - if stats.latest_mod_time.is_none() - || (version.header.mod_time.is_some() && version.header.mod_time > stats.latest_mod_time) - { - stats.latest_mod_time = version.header.mod_time; - } - } - - stats - } -} diff --git a/crates/filemeta/src/filemeta/codec.rs b/crates/filemeta/src/filemeta/codec.rs new file mode 100644 index 00000000..610f95ac --- /dev/null +++ b/crates/filemeta/src/filemeta/codec.rs @@ -0,0 +1,342 @@ +// Copyright 2024 RustFS Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use super::*; + +impl FileMeta { + pub fn is_xl2_v1_format(buf: &[u8]) -> bool { + !matches!(Self::check_xl2_v1(buf), Err(_e)) + } + + pub fn load(buf: &[u8]) -> Result { + let mut xl = FileMeta::default(); + xl.unmarshal_msg(buf)?; + + Ok(xl) + } + + pub fn check_xl2_v1(buf: &[u8]) -> Result<(&[u8], u16, u16)> { + if buf.len() < 8 { + return Err(Error::other("xl file header not exists")); + } + + if buf[0..4] != XL_FILE_HEADER { + return Err(Error::other("xl file header err")); + } + + let major = byteorder::LittleEndian::read_u16(&buf[4..6]); + let minor = byteorder::LittleEndian::read_u16(&buf[6..8]); + if major > XL_FILE_VERSION_MAJOR { + return Err(Error::other("xl file version err")); + } + + Ok((&buf[8..], major, minor)) + } + + // Returns (meta, inline_data) + pub fn is_indexed_meta(buf: &[u8]) -> Result<(&[u8], &[u8])> { + let (buf, major, minor) = Self::check_xl2_v1(buf)?; + if major != 1 || minor < 3 { + return Ok((&[], &[])); + } + + if buf.len() < 5 { + return Err(Error::other("insufficient data for meta length")); + } + + let (mut size_buf, buf) = buf.split_at(5); + + // Get meta data, buf = crc + data + let bin_len = rmp::decode::read_bin_len(&mut size_buf)?; + + if buf.len() < bin_len as usize { + return Ok((&[], &[])); + } + let (meta, buf) = buf.split_at(bin_len as usize); + + if buf.len() < 5 { + return Err(Error::other("insufficient data for CRC")); + } + let (mut crc_buf, inline_data) = buf.split_at(5); + + // crc check + let crc = rmp::decode::read_u32(&mut crc_buf)?; + let meta_crc = xxh64::xxh64(meta, XXHASH_SEED) as u32; + + if crc != meta_crc { + return Err(Error::other("xl file crc check failed")); + } + + Ok((meta, inline_data)) + } + + // Fixed u32 + pub fn read_bytes_header(buf: &[u8]) -> Result<(u32, &[u8])> { + if buf.len() < 5 { + return Err(Error::other("insufficient data for bytes header")); + } + + let (mut size_buf, remaining) = buf.split_at(5); + + // Get meta data, buf = crc + data + let bin_len = rmp::decode::read_bin_len(&mut size_buf)?; + + Ok((bin_len, remaining)) + } + + pub fn unmarshal_msg(&mut self, buf: &[u8]) -> Result { + let i = buf.len() as u64; + + // check version, buf = buf[8..] + let (buf, _, _) = Self::check_xl2_v1(buf).map_err(|e| { + error!("failed to check XL2 v1 format: {}", e); + e + })?; + + if buf.len() < 5 { + error!( + "insufficient data for metadata length prefix: expected at least 5 bytes, got {}", + buf.len() + ); + return Err(Error::other("insufficient data for metadata length prefix")); + } + + let (mut size_buf, buf) = buf.split_at(5); + + // Get meta data, buf = crc + data + let bin_len = rmp::decode::read_bin_len(&mut size_buf).map_err(|e| { + error!("failed to read binary length for metadata: {}", e); + Error::other(format!("failed to read binary length for metadata: {e}")) + })?; + + if buf.len() < bin_len as usize { + error!("insufficient data for metadata: expected {} bytes, got {} bytes", bin_len, buf.len()); + return Err(Error::other("insufficient data for metadata")); + } + let (meta, buf) = buf.split_at(bin_len as usize); + + if buf.len() < 5 { + error!("insufficient data for CRC: expected 5 bytes, got {} bytes", buf.len()); + return Err(Error::other("insufficient data for CRC")); + } + let (mut crc_buf, buf) = buf.split_at(5); + + // crc check + let crc = rmp::decode::read_u32(&mut crc_buf).map_err(|e| { + error!("failed to read CRC value: {}", e); + Error::other(format!("failed to read CRC value: {e}")) + })?; + let meta_crc = xxh64::xxh64(meta, XXHASH_SEED) as u32; + + if crc != meta_crc { + error!("xl file crc check failed: expected CRC {:#x}, got {:#x}", meta_crc, crc); + return Err(Error::other("xl file crc check failed")); + } + + if !buf.is_empty() { + self.data.update(buf); + self.data.validate().map_err(|e| { + error!("data validation failed: {}", e); + e + })?; + } + + // Parse meta + if !meta.is_empty() { + let (versions_len, _, meta_ver, meta) = Self::decode_xl_headers(meta).map_err(|e| { + error!("failed to decode XL headers: {}", e); + e + })?; + + // let (_, meta) = meta.split_at(read_size as usize); + + self.meta_ver = meta_ver; + + self.versions = Vec::with_capacity(versions_len); + + let mut cur: Cursor<&[u8]> = Cursor::new(meta); + for _ in 0..versions_len { + let bin_len = rmp::decode::read_bin_len(&mut cur).map_err(|e| { + error!("failed to read binary length for version header: {}", e); + Error::other(format!("failed to read binary length for version header: {e}")) + })? as usize; + + let mut header_buf = vec![0u8; bin_len]; + + cur.read_exact(&mut header_buf)?; + + let mut ver = FileMetaShallowVersion::default(); + ver.header.unmarshal_msg(&header_buf).map_err(|e| { + error!("failed to unmarshal version header: {}", e); + e + })?; + + let bin_len = rmp::decode::read_bin_len(&mut cur).map_err(|e| { + error!("failed to read binary length for version metadata: {}", e); + Error::other(format!("failed to read binary length for version metadata: {e}")) + })? as usize; + + let mut ver_meta_buf = vec![0u8; bin_len]; + cur.read_exact(&mut ver_meta_buf)?; + + ver.meta.extend_from_slice(&ver_meta_buf); + + self.versions.push(ver); + } + } + + Ok(i) + } + + // decode_xl_headers parses meta header, returns (versions count, xl_header_version, xl_meta_version, read data length) + fn decode_xl_headers(buf: &[u8]) -> Result<(usize, u8, u8, &[u8])> { + let mut cur = Cursor::new(buf); + + let header_ver: u8 = rmp::decode::read_int(&mut cur)?; + + if header_ver > XL_HEADER_VERSION { + return Err(Error::other("xl header version invalid")); + } + + let meta_ver: u8 = rmp::decode::read_int(&mut cur)?; + if meta_ver > XL_META_VERSION { + return Err(Error::other("xl meta version invalid")); + } + + let versions_len: usize = rmp::decode::read_int(&mut cur)?; + + Ok((versions_len, header_ver, meta_ver, &buf[cur.position() as usize..])) + } + + fn decode_versions Result<()>>(buf: &[u8], versions: usize, mut fnc: F) -> Result<()> { + let mut cur: Cursor<&[u8]> = Cursor::new(buf); + + for i in 0..versions { + let bin_len = rmp::decode::read_bin_len(&mut cur)? as usize; + let start = cur.position() as usize; + let end = start + bin_len; + let header_buf = &buf[start..end]; + + cur.set_position(end as u64); + + let bin_len = rmp::decode::read_bin_len(&mut cur)? as usize; + let start = cur.position() as usize; + let end = start + bin_len; + let ver_meta_buf = &buf[start..end]; + + cur.set_position(end as u64); + + if let Err(err) = fnc(i, header_buf, ver_meta_buf) { + if err == Error::DoneForNow { + return Ok(()); + } + + return Err(err); + } + } + + Ok(()) + } + + pub fn is_latest_delete_marker(buf: &[u8]) -> bool { + let header = Self::decode_xl_headers(buf).ok(); + if let Some((versions, _hdr_v, _meta_v, meta)) = header { + if versions == 0 { + return false; + } + + let mut is_delete_marker = false; + + let _ = Self::decode_versions(meta, versions, |_: usize, hdr: &[u8], _: &[u8]| { + let mut header = FileMetaVersionHeader::default(); + if header.unmarshal_msg(hdr).is_err() { + return Err(Error::DoneForNow); + } + + is_delete_marker = header.version_type == VersionType::Delete; + + Err(Error::DoneForNow) + }); + + is_delete_marker + } else { + false + } + } + + pub fn marshal_msg(&self) -> Result> { + let mut wr = Vec::new(); + + // header + wr.write_all(XL_FILE_HEADER.as_slice())?; + + let mut major = [0u8; 2]; + byteorder::LittleEndian::write_u16(&mut major, XL_FILE_VERSION_MAJOR); + wr.write_all(major.as_slice())?; + + let mut minor = [0u8; 2]; + byteorder::LittleEndian::write_u16(&mut minor, XL_FILE_VERSION_MINOR); + wr.write_all(minor.as_slice())?; + + // size bin32 reserved for write_bin_len + wr.write_all(&[0xc6, 0, 0, 0, 0])?; + + let offset = wr.len(); + + // xl header + rmp::encode::write_uint8(&mut wr, XL_HEADER_VERSION)?; + rmp::encode::write_uint8(&mut wr, XL_META_VERSION)?; + + // versions + rmp::encode::write_sint(&mut wr, self.versions.len() as i64)?; + + for ver in self.versions.iter() { + let hmsg = ver.header.marshal_msg()?; + rmp::encode::write_bin(&mut wr, &hmsg)?; + + rmp::encode::write_bin(&mut wr, &ver.meta)?; + } + + // Update bin length + let data_len = wr.len() - offset; + byteorder::BigEndian::write_u32(&mut wr[offset - 4..offset], data_len as u32); + + let crc = xxh64::xxh64(&wr[offset..], XXHASH_SEED) as u32; + let mut crc_buf = [0u8; 5]; + crc_buf[0] = 0xce; // u32 + byteorder::BigEndian::write_u32(&mut crc_buf[1..], crc); + + wr.write_all(&crc_buf)?; + + wr.write_all(self.data.as_slice())?; + + Ok(wr) + } + + // pub fn unmarshal(buf: &[u8]) -> Result { + // let mut s = Self::default(); + // s.unmarshal_msg(buf)?; + // Ok(s) + // // let t: FileMeta = rmp_serde::from_slice(buf)?; + // // Ok(t) + // } + + // pub fn marshal_msg(&self) -> Result> { + // let mut buf = Vec::new(); + + // self.serialize(&mut Serializer::new(&mut buf))?; + + // Ok(buf) + // } +} diff --git a/crates/filemeta/src/filemeta/inline_data.rs b/crates/filemeta/src/filemeta/inline_data.rs new file mode 100644 index 00000000..11ff6ccb --- /dev/null +++ b/crates/filemeta/src/filemeta/inline_data.rs @@ -0,0 +1,62 @@ +// Copyright 2024 RustFS Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use super::*; + +impl FileMeta { + pub fn shard_data_dir_count(&self, vid: &Option, data_dir: &Option) -> usize { + let vid = vid.unwrap_or_default(); + self.versions + .iter() + .filter(|v| { + v.header.version_type == VersionType::Object && v.header.version_id != Some(vid) && v.header.uses_data_dir() + }) + .map(|v| FileMetaVersion::decode_data_dir_from_meta(&v.meta).unwrap_or_default()) + .filter(|v| v == data_dir) + .count() + } + + pub fn get_data_dirs(&self) -> Result>> { + let mut data_dirs = Vec::new(); + for version in &self.versions { + if version.header.version_type == VersionType::Object { + let ver = FileMetaVersion::try_from(version.meta.as_slice())?; + data_dirs.push(ver.get_data_dir()); + } + } + Ok(data_dirs) + } + + /// Count shared data directories + pub fn shared_data_dir_count(&self, version_id: Option, data_dir: Option) -> usize { + let version_id = version_id.unwrap_or_default(); + + if self.data.entries().unwrap_or_default() > 0 + && self.data.find(version_id.to_string().as_str()).unwrap_or_default().is_some() + { + return 0; + } + + self.versions + .iter() + .filter(|v| { + v.header.version_type == VersionType::Object + && v.header.version_id != Some(version_id) + && v.header.uses_data_dir() + }) + .filter_map(|v| FileMetaVersion::decode_data_dir_from_meta(&v.meta).ok()) + .filter(|&dir| dir.is_some() && dir == data_dir) + .count() + } +} diff --git a/crates/filemeta/src/filemeta/validation.rs b/crates/filemeta/src/filemeta/validation.rs new file mode 100644 index 00000000..61f1a478 --- /dev/null +++ b/crates/filemeta/src/filemeta/validation.rs @@ -0,0 +1,181 @@ +// Copyright 2024 RustFS Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use super::*; + +impl FileMeta { + /// Check if the metadata format is compatible + pub fn is_compatible_with_meta(&self) -> bool { + // Check version compatibility + if self.meta_ver != XL_META_VERSION { + return false; + } + + // For compatibility, we allow versions with different types + // Just check basic structure validity + true + } + + /// Validate metadata integrity + pub fn validate_integrity(&self) -> Result<()> { + // Check if versions are sorted by modification time + if !self.is_sorted_by_mod_time() { + return Err(Error::other("versions not sorted by modification time")); + } + + // Validate inline data if present + self.data.validate()?; + + Ok(()) + } + + /// Check if versions are sorted by modification time (newest first) + pub(crate) fn is_sorted_by_mod_time(&self) -> bool { + if self.versions.len() <= 1 { + return true; + } + + for i in 1..self.versions.len() { + let prev_time = self.versions[i - 1].header.mod_time; + let curr_time = self.versions[i].header.mod_time; + + match (prev_time, curr_time) { + (Some(prev), Some(curr)) => { + if prev < curr { + return false; + } + } + (None, Some(_)) => return false, + _ => continue, + } + } + + true + } + + /// Get statistics about versions + pub fn get_version_stats(&self) -> VersionStats { + let mut stats = VersionStats { + total_versions: self.versions.len(), + ..Default::default() + }; + + for version in &self.versions { + match version.header.version_type { + VersionType::Object => stats.object_versions += 1, + VersionType::Delete => stats.delete_markers += 1, + VersionType::Invalid | VersionType::Legacy => stats.invalid_versions += 1, + } + + if version.header.free_version() { + stats.free_versions += 1; + } + } + + stats + } +} + +#[derive(Debug, Default, Clone)] +pub struct VersionStats { + pub total_versions: usize, + pub object_versions: usize, + pub delete_markers: usize, + pub invalid_versions: usize, + pub free_versions: usize, +} + +impl FileMetaVersionHeader { + pub fn is_valid(&self) -> bool { + // Check if version type is valid + if !self.version_type.valid() { + return false; + } + + // Check if modification time is reasonable (not too far in the future) + if let Some(mod_time) = self.mod_time { + let now = OffsetDateTime::now_utc(); + let future_limit = now + time::Duration::hours(24); // Allow 24 hours in future + if mod_time > future_limit { + return false; + } + } + + // Check erasure coding parameters + if self.has_ec() && (self.ec_n == 0 || self.ec_m == 0 || self.ec_m < self.ec_n) { + return false; + } + + true + } +} + +/// Enhanced version statistics with more detailed information +#[derive(Debug, Default, Clone)] +pub struct DetailedVersionStats { + pub total_versions: usize, + pub object_versions: usize, + pub delete_markers: usize, + pub invalid_versions: usize, + pub legacy_versions: usize, + pub free_versions: usize, + pub versions_with_data_dir: usize, + pub versions_with_inline_data: usize, + pub total_size: i64, + pub latest_mod_time: Option, +} + +impl FileMeta { + /// Get detailed statistics about versions + pub fn get_detailed_version_stats(&self) -> DetailedVersionStats { + let mut stats = DetailedVersionStats { + total_versions: self.versions.len(), + ..Default::default() + }; + + for version in &self.versions { + match version.header.version_type { + VersionType::Object => { + stats.object_versions += 1; + if let Ok(ver) = FileMetaVersion::try_from(version.meta.as_slice()) + && let Some(obj) = &ver.object + { + stats.total_size += obj.size; + if obj.uses_data_dir() { + stats.versions_with_data_dir += 1; + } + if obj.inlinedata() { + stats.versions_with_inline_data += 1; + } + } + } + VersionType::Delete => stats.delete_markers += 1, + VersionType::Legacy => stats.legacy_versions += 1, + VersionType::Invalid => stats.invalid_versions += 1, + } + + if version.header.free_version() { + stats.free_versions += 1; + } + + if stats.latest_mod_time.is_none() + || (version.header.mod_time.is_some() && version.header.mod_time > stats.latest_mod_time) + { + stats.latest_mod_time = version.header.mod_time; + } + } + + stats + } +} diff --git a/crates/filemeta/src/filemeta/version.rs b/crates/filemeta/src/filemeta/version.rs new file mode 100644 index 00000000..04850244 --- /dev/null +++ b/crates/filemeta/src/filemeta/version.rs @@ -0,0 +1,1484 @@ +// Copyright 2024 RustFS Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use super::*; + +#[derive(Serialize, Deserialize, Debug, Default, PartialEq, Clone, Eq, PartialOrd, Ord)] +pub struct FileMetaShallowVersion { + pub header: FileMetaVersionHeader, + pub meta: Vec, // FileMetaVersion.marshal_msg +} + +impl FileMetaShallowVersion { + pub fn into_fileinfo(&self, volume: &str, path: &str, all_parts: bool) -> Result { + let file_version = FileMetaVersion::try_from(self.meta.as_slice())?; + + Ok(file_version.into_fileinfo(volume, path, all_parts)) + } +} + +impl TryFrom for FileMetaShallowVersion { + type Error = Error; + + fn try_from(value: FileMetaVersion) -> std::result::Result { + let header = value.header(); + let meta = value.marshal_msg()?; + Ok(Self { meta, header }) + } +} + +#[derive(Serialize, Deserialize, Debug, Default, Clone, PartialEq)] +pub struct FileMetaVersion { + #[serde(rename = "Type")] + pub version_type: VersionType, + #[serde(rename = "V2Obj")] + pub object: Option, + #[serde(rename = "DelObj")] + pub delete_marker: Option, + #[serde(rename = "v")] + pub write_version: u64, // rustfs version +} + +impl FileMetaVersion { + pub fn valid(&self) -> bool { + if !self.version_type.valid() { + return false; + } + + match self.version_type { + VersionType::Object => self + .object + .as_ref() + .map(|v| v.erasure_algorithm.valid() && v.bitrot_checksum_algo.valid() && v.mod_time.is_some()) + .unwrap_or_default(), + VersionType::Delete => self + .delete_marker + .as_ref() + .map(|v| v.mod_time.unwrap_or(OffsetDateTime::UNIX_EPOCH) > OffsetDateTime::UNIX_EPOCH) + .unwrap_or_default(), + _ => false, + } + } + + pub fn get_data_dir(&self) -> Option { + if self.valid() { + { + if self.version_type == VersionType::Object { + self.object.as_ref().map(|v| v.data_dir).unwrap_or_default() + } else { + None + } + } + } else { + Default::default() + } + } + + pub fn get_version_id(&self) -> Option { + match self.version_type { + VersionType::Object => self.object.as_ref().map(|v| v.version_id).unwrap_or_default(), + VersionType::Delete => self.delete_marker.as_ref().map(|v| v.version_id).unwrap_or_default(), + _ => None, + } + } + + pub fn get_mod_time(&self) -> Option { + match self.version_type { + VersionType::Object => self.object.as_ref().map(|v| v.mod_time).unwrap_or_default(), + VersionType::Delete => self.delete_marker.as_ref().map(|v| v.mod_time).unwrap_or_default(), + _ => None, + } + } + + // decode_data_dir_from_meta reads data_dir from meta TODO: directly parse only data_dir from meta buf, msg.skip + pub fn decode_data_dir_from_meta(buf: &[u8]) -> Result> { + let mut ver = Self::default(); + ver.unmarshal_msg(buf)?; + + let data_dir = ver.object.map(|v| v.data_dir).unwrap_or_default(); + Ok(data_dir) + } + + pub fn unmarshal_msg(&mut self, buf: &[u8]) -> Result { + let ret: Self = rmp_serde::from_slice(buf)?; + + *self = ret; + + Ok(buf.len() as u64) + } + + pub fn marshal_msg(&self) -> Result> { + let buf = rmp_serde::to_vec(self)?; + Ok(buf) + } + + pub fn free_version(&self) -> bool { + self.version_type == VersionType::Delete && self.delete_marker.as_ref().map(|m| m.free_version()).unwrap_or_default() + } + + pub fn header(&self) -> FileMetaVersionHeader { + FileMetaVersionHeader::from(self.clone()) + } + + pub fn into_fileinfo(&self, volume: &str, path: &str, all_parts: bool) -> FileInfo { + match self.version_type { + VersionType::Invalid | VersionType::Legacy => FileInfo { + name: path.to_string(), + volume: volume.to_string(), + ..Default::default() + }, + VersionType::Object => self + .object + .as_ref() + .unwrap_or(&MetaObject::default()) + .into_fileinfo(volume, path, all_parts), + VersionType::Delete => self + .delete_marker + .as_ref() + .unwrap_or(&MetaDeleteMarker::default()) + .into_fileinfo(volume, path, all_parts), + } + } + + /// Support for Legacy version type + pub fn is_legacy(&self) -> bool { + self.version_type == VersionType::Legacy + } + + /// Get signature for version + pub fn get_signature(&self) -> [u8; 4] { + match self.version_type { + VersionType::Object => { + if let Some(ref obj) = self.object { + // Calculate signature based on object metadata + let mut hasher = xxhash_rust::xxh64::Xxh64::new(XXHASH_SEED); + hasher.update(obj.version_id.unwrap_or_default().as_bytes()); + if let Some(mod_time) = obj.mod_time { + hasher.update(&mod_time.unix_timestamp_nanos().to_le_bytes()); + } + let hash = hasher.finish(); + let bytes = hash.to_le_bytes(); + [bytes[0], bytes[1], bytes[2], bytes[3]] + } else { + [0; 4] + } + } + VersionType::Delete => { + if let Some(ref dm) = self.delete_marker { + // Calculate signature for delete marker + let mut hasher = xxhash_rust::xxh64::Xxh64::new(XXHASH_SEED); + hasher.update(dm.version_id.unwrap_or_default().as_bytes()); + if let Some(mod_time) = dm.mod_time { + hasher.update(&mod_time.unix_timestamp_nanos().to_le_bytes()); + } + let hash = hasher.finish(); + let bytes = hash.to_le_bytes(); + [bytes[0], bytes[1], bytes[2], bytes[3]] + } else { + [0; 4] + } + } + _ => [0; 4], + } + } + + /// Check if this version uses data directory + pub fn uses_data_dir(&self) -> bool { + match self.version_type { + VersionType::Object => self.object.as_ref().map(|obj| obj.uses_data_dir()).unwrap_or(false), + _ => false, + } + } + + /// Check if this version uses inline data + pub fn uses_inline_data(&self) -> bool { + match self.version_type { + VersionType::Object => self.object.as_ref().map(|obj| obj.inlinedata()).unwrap_or(false), + _ => false, + } + } +} + +impl TryFrom<&[u8]> for FileMetaVersion { + type Error = Error; + + fn try_from(value: &[u8]) -> std::result::Result { + let mut ver = FileMetaVersion::default(); + ver.unmarshal_msg(value)?; + Ok(ver) + } +} + +impl From for FileMetaVersion { + fn from(value: FileInfo) -> Self { + { + if value.deleted { + FileMetaVersion { + version_type: VersionType::Delete, + delete_marker: Some(MetaDeleteMarker::from(value)), + object: None, + write_version: 0, + } + } else { + FileMetaVersion { + version_type: VersionType::Object, + delete_marker: None, + object: Some(MetaObject::from(value)), + write_version: 0, + } + } + } + } +} + +impl TryFrom for FileMetaVersion { + type Error = Error; + + fn try_from(value: FileMetaShallowVersion) -> std::result::Result { + FileMetaVersion::try_from(value.meta.as_slice()) + } +} + +#[derive(Serialize, Deserialize, Debug, PartialEq, Default, Clone, Eq, Hash)] +pub struct FileMetaVersionHeader { + pub version_id: Option, + pub mod_time: Option, + pub signature: [u8; 4], + pub version_type: VersionType, + pub flags: u8, + pub ec_n: u8, + pub ec_m: u8, +} + +impl FileMetaVersionHeader { + pub fn has_ec(&self) -> bool { + self.ec_m > 0 && self.ec_n > 0 + } + + pub fn matches_not_strict(&self, o: &FileMetaVersionHeader) -> bool { + let mut ok = self.version_id == o.version_id && self.version_type == o.version_type && self.matches_ec(o); + if self.version_id.is_none() { + ok = ok && self.mod_time == o.mod_time; + } + + ok + } + + pub fn matches_ec(&self, o: &FileMetaVersionHeader) -> bool { + if self.has_ec() && o.has_ec() { + return self.ec_n == o.ec_n && self.ec_m == o.ec_m; + } + + true + } + + pub fn free_version(&self) -> bool { + self.flags & XL_FLAG_FREE_VERSION != 0 + } + + pub fn sorts_before(&self, o: &FileMetaVersionHeader) -> bool { + if self == o { + return false; + } + + // Prefer newest modtime. + if self.mod_time != o.mod_time { + return self.mod_time > o.mod_time; + } + + match self.mod_time.cmp(&o.mod_time) { + Ordering::Greater => { + return true; + } + Ordering::Less => { + return false; + } + _ => {} + } + + // The following doesn't make too much sense, but we want sort to be consistent nonetheless. + // Prefer lower types + if self.version_type != o.version_type { + return self.version_type < o.version_type; + } + // Consistent sort on signature + match self.version_id.cmp(&o.version_id) { + Ordering::Greater => { + return true; + } + Ordering::Less => { + return false; + } + _ => {} + } + + if self.flags != o.flags { + return self.flags > o.flags; + } + + false + } + + pub fn uses_data_dir(&self) -> bool { + self.flags & Flags::UsesDataDir as u8 != 0 + } + + pub fn marshal_msg(&self) -> Result> { + let mut wr = Vec::new(); + + // array len 7 + rmp::encode::write_array_len(&mut wr, 7)?; + + // version_id + rmp::encode::write_bin(&mut wr, self.version_id.unwrap_or_default().as_bytes())?; + // mod_time + rmp::encode::write_i64(&mut wr, self.mod_time.unwrap_or(OffsetDateTime::UNIX_EPOCH).unix_timestamp_nanos() as i64)?; + // signature + rmp::encode::write_bin(&mut wr, self.signature.as_slice())?; + // version_type + rmp::encode::write_uint8(&mut wr, self.version_type.to_u8())?; + // flags + rmp::encode::write_uint8(&mut wr, self.flags)?; + // ec_n + rmp::encode::write_uint8(&mut wr, self.ec_n)?; + // ec_m + rmp::encode::write_uint8(&mut wr, self.ec_m)?; + + Ok(wr) + } + + pub fn unmarshal_msg(&mut self, buf: &[u8]) -> Result { + let mut cur = Cursor::new(buf); + let alen = rmp::decode::read_array_len(&mut cur)?; + if alen != 7 { + return Err(Error::other(format!("version header array len err need 7 got {alen}"))); + } + + // version_id + rmp::decode::read_bin_len(&mut cur)?; + let mut buf = [0u8; 16]; + cur.read_exact(&mut buf)?; + self.version_id = { + let id = Uuid::from_bytes(buf); + // if id.is_nil() { None } else { Some(id) } + Some(id) + }; + + // mod_time + let unix: i128 = rmp::decode::read_int(&mut cur)?; + + let time = OffsetDateTime::from_unix_timestamp_nanos(unix)?; + if time == OffsetDateTime::UNIX_EPOCH { + self.mod_time = None; + } else { + self.mod_time = Some(time); + } + + // signature + rmp::decode::read_bin_len(&mut cur)?; + cur.read_exact(&mut self.signature)?; + + // version_type + let typ: u8 = rmp::decode::read_int(&mut cur)?; + self.version_type = VersionType::from_u8(typ); + + // flags + self.flags = rmp::decode::read_int(&mut cur)?; + // ec_n + self.ec_n = rmp::decode::read_int(&mut cur)?; + // ec_m + self.ec_m = rmp::decode::read_int(&mut cur)?; + + Ok(cur.position()) + } + + /// Get signature for header + pub fn get_signature(&self) -> [u8; 4] { + self.signature + } + + /// Check if this header represents inline data + pub fn inline_data(&self) -> bool { + self.flags & Flags::InlineData as u8 != 0 + } + + /// Update signature based on version content + pub fn update_signature(&mut self, version: &FileMetaVersion) { + self.signature = version.get_signature(); + } +} + +impl PartialOrd for FileMetaVersionHeader { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl Ord for FileMetaVersionHeader { + fn cmp(&self, other: &Self) -> Ordering { + match self.mod_time.cmp(&other.mod_time) { + core::cmp::Ordering::Equal => {} + ord => return ord, + } + + match self.version_type.cmp(&other.version_type) { + core::cmp::Ordering::Equal => {} + ord => return ord, + } + match self.signature.cmp(&other.signature) { + core::cmp::Ordering::Equal => {} + ord => return ord, + } + match self.version_id.cmp(&other.version_id) { + core::cmp::Ordering::Equal => {} + ord => return ord, + } + self.flags.cmp(&other.flags) + } +} + +impl From for FileMetaVersionHeader { + fn from(value: FileMetaVersion) -> Self { + let flags = { + let mut f: u8 = 0; + if value.free_version() { + f |= Flags::FreeVersion as u8; + } + + if value.version_type == VersionType::Object && value.object.as_ref().map(|v| v.uses_data_dir()).unwrap_or_default() { + f |= Flags::UsesDataDir as u8; + } + + if value.version_type == VersionType::Object && value.object.as_ref().map(|v| v.inlinedata()).unwrap_or_default() { + f |= Flags::InlineData as u8; + } + + f + }; + + let (ec_n, ec_m) = match (value.version_type == VersionType::Object, value.object.as_ref()) { + (true, Some(obj)) => (obj.erasure_n as u8, obj.erasure_m as u8), + _ => (0, 0), + }; + + Self { + version_id: value.get_version_id(), + mod_time: value.get_mod_time(), + signature: [0, 0, 0, 0], + version_type: value.version_type, + flags, + ec_n, + ec_m, + } + } +} + +#[derive(Serialize, Deserialize, Debug, Clone, Default, PartialEq)] +// Because of custom message_pack, field order must be guaranteed +pub struct MetaObject { + #[serde(rename = "ID")] + pub version_id: Option, // Version ID + #[serde(rename = "DDir")] + pub data_dir: Option, // Data dir ID + #[serde(rename = "EcAlgo")] + pub erasure_algorithm: ErasureAlgo, // Erasure coding algorithm + #[serde(rename = "EcM")] + pub erasure_m: usize, // Erasure data blocks + #[serde(rename = "EcN")] + pub erasure_n: usize, // Erasure parity blocks + #[serde(rename = "EcBSize")] + pub erasure_block_size: usize, // Erasure block size + #[serde(rename = "EcIndex")] + pub erasure_index: usize, // Erasure disk index + #[serde(rename = "EcDist")] + pub erasure_dist: Vec, // Erasure distribution + #[serde(rename = "CSumAlgo")] + pub bitrot_checksum_algo: ChecksumAlgo, // Bitrot checksum algo + #[serde(rename = "PartNums")] + pub part_numbers: Vec, // Part Numbers + #[serde(rename = "PartETags")] + pub part_etags: Vec, // Part ETags + #[serde(rename = "PartSizes")] + pub part_sizes: Vec, // Part Sizes + #[serde(rename = "PartASizes")] + pub part_actual_sizes: Vec, // Part ActualSizes (compression) + #[serde(rename = "PartIdx")] + pub part_indices: Vec, // Part Indexes (compression) + #[serde(rename = "Size")] + pub size: i64, // Object version size + #[serde(rename = "MTime")] + pub mod_time: Option, // Object version modified time + #[serde(rename = "MetaSys")] + pub meta_sys: HashMap>, // Object version internal metadata + #[serde(rename = "MetaUsr")] + pub meta_user: HashMap, // Object version metadata set by user +} + +impl MetaObject { + pub fn unmarshal_msg(&mut self, buf: &[u8]) -> Result { + let ret: Self = rmp_serde::from_slice(buf)?; + + *self = ret; + + Ok(buf.len() as u64) + } + // marshal_msg custom messagepack naming consistent with go + pub fn marshal_msg(&self) -> Result> { + let buf = rmp_serde::to_vec(self)?; + Ok(buf) + } + + pub fn into_fileinfo(&self, volume: &str, path: &str, all_parts: bool) -> FileInfo { + let version_id = self.version_id.filter(|&vid| !vid.is_nil()); + + let parts = if all_parts { + let mut parts = vec![ObjectPartInfo::default(); self.part_numbers.len()]; + + for (i, part) in parts.iter_mut().enumerate() { + part.number = self.part_numbers[i]; + part.size = self.part_sizes[i]; + part.actual_size = self.part_actual_sizes[i]; + + if self.part_etags.len() == self.part_numbers.len() { + part.etag = self.part_etags[i].clone(); + } + + if self.part_indices.len() == self.part_numbers.len() { + part.index = if self.part_indices[i].is_empty() { + None + } else { + Some(self.part_indices[i].clone()) + }; + } + } + parts + } else { + Vec::new() + }; + + let mut metadata = HashMap::with_capacity(self.meta_user.len() + self.meta_sys.len()); + for (k, v) in &self.meta_user { + if k == AMZ_META_UNENCRYPTED_CONTENT_LENGTH || k == AMZ_META_UNENCRYPTED_CONTENT_MD5 { + continue; + } + + if k == AMZ_STORAGE_CLASS && v == "STANDARD" { + continue; + } + + metadata.insert(k.to_owned(), v.to_owned()); + } + + let tier_fvidkey = format!("{RESERVED_METADATA_PREFIX_LOWER}{TIER_FV_ID}").to_lowercase(); + let tier_fvmarker_key = format!("{RESERVED_METADATA_PREFIX_LOWER}{TIER_FV_MARKER}").to_lowercase(); + + for (k, v) in &self.meta_sys { + let lower_k = k.to_lowercase(); + + if lower_k == tier_fvidkey || lower_k == tier_fvmarker_key { + continue; + } + + if lower_k == AMZ_STORAGE_CLASS.to_lowercase() && v == b"STANDARD" { + continue; + } + + if k.starts_with(RESERVED_METADATA_PREFIX) + || k.starts_with(RESERVED_METADATA_PREFIX_LOWER) + || lower_k == VERSION_PURGE_STATUS_KEY.to_lowercase() + { + metadata.insert(k.to_owned(), String::from_utf8(v.to_owned()).unwrap_or_default()); + } + } + + let replication_state_internal = get_internal_replication_state(&metadata); + + let mut deleted = false; + + if let Some(v) = replication_state_internal.as_ref() { + if !v.composite_version_purge_status().is_empty() { + deleted = true; + } + + let st = v.composite_replication_status(); + if !st.is_empty() { + metadata.insert(AMZ_BUCKET_REPLICATION_STATUS.to_string(), st.to_string()); + } + } + + let checksum = self + .meta_sys + .get(format!("{RESERVED_METADATA_PREFIX_LOWER}crc").as_str()) + .map(|v| Bytes::from(v.clone())); + + let erasure = ErasureInfo { + algorithm: self.erasure_algorithm.to_string(), + data_blocks: self.erasure_m, + parity_blocks: self.erasure_n, + block_size: self.erasure_block_size, + index: self.erasure_index, + distribution: self.erasure_dist.iter().map(|&v| v as usize).collect(), + ..Default::default() + }; + + let transition_status = self + .meta_sys + .get(format!("{RESERVED_METADATA_PREFIX_LOWER}{TRANSITION_STATUS}").as_str()) + .map(|v| String::from_utf8_lossy(v).to_string()) + .unwrap_or_default(); + let transitioned_objname = self + .meta_sys + .get(format!("{RESERVED_METADATA_PREFIX_LOWER}{TRANSITIONED_OBJECTNAME}").as_str()) + .map(|v| String::from_utf8_lossy(v).to_string()) + .unwrap_or_default(); + let transition_version_id = self + .meta_sys + .get(format!("{RESERVED_METADATA_PREFIX_LOWER}{TRANSITIONED_VERSION_ID}").as_str()) + .map(|v| Uuid::from_slice(v.as_slice()).unwrap_or_default()); + let transition_tier = self + .meta_sys + .get(format!("{RESERVED_METADATA_PREFIX_LOWER}{TRANSITION_TIER}").as_str()) + .map(|v| String::from_utf8_lossy(v).to_string()) + .unwrap_or_default(); + + FileInfo { + version_id, + erasure, + data_dir: self.data_dir, + mod_time: self.mod_time, + size: self.size, + name: path.to_string(), + volume: volume.to_string(), + parts, + metadata, + replication_state_internal, + deleted, + checksum, + transition_status, + transitioned_objname, + transition_version_id, + transition_tier, + ..Default::default() + } + } + + pub fn set_transition(&mut self, fi: &FileInfo) { + self.meta_sys.insert( + format!("{RESERVED_METADATA_PREFIX_LOWER}{TRANSITION_STATUS}"), + fi.transition_status.as_bytes().to_vec(), + ); + self.meta_sys.insert( + format!("{RESERVED_METADATA_PREFIX_LOWER}{TRANSITIONED_OBJECTNAME}"), + fi.transitioned_objname.as_bytes().to_vec(), + ); + if let Some(transition_version_id) = fi.transition_version_id.as_ref() { + self.meta_sys.insert( + format!("{RESERVED_METADATA_PREFIX_LOWER}{TRANSITIONED_VERSION_ID}"), + transition_version_id.as_bytes().to_vec(), + ); + } + self.meta_sys.insert( + format!("{RESERVED_METADATA_PREFIX_LOWER}{TRANSITION_TIER}"), + fi.transition_tier.as_bytes().to_vec(), + ); + } + + pub fn remove_restore_hdrs(&mut self) { + self.meta_user.remove(X_AMZ_RESTORE.as_str()); + self.meta_user.remove(AMZ_RESTORE_EXPIRY_DAYS); + self.meta_user.remove(AMZ_RESTORE_REQUEST_DATE); + } + + pub fn uses_data_dir(&self) -> bool { + if let Some(status) = self + .meta_sys + .get(&format!("{RESERVED_METADATA_PREFIX_LOWER}{TRANSITION_STATUS}")) + && *status == TRANSITION_COMPLETE.as_bytes().to_vec() + { + return false; + } + + is_restored_object_on_disk(&self.meta_user) + } + + pub fn inlinedata(&self) -> bool { + self.meta_sys + .contains_key(format!("{RESERVED_METADATA_PREFIX_LOWER}inline-data").as_str()) + } + + pub fn reset_inline_data(&mut self) { + self.meta_sys + .remove(format!("{RESERVED_METADATA_PREFIX_LOWER}inline-data").as_str()); + } + + /// Remove restore headers + pub fn remove_restore_headers(&mut self) { + // Remove any restore-related metadata + self.meta_sys.retain(|k, _| !k.starts_with("X-Amz-Restore")); + } + + /// Get object signature + pub fn get_signature(&self) -> [u8; 4] { + let mut hasher = xxhash_rust::xxh64::Xxh64::new(XXHASH_SEED); + hasher.update(self.version_id.unwrap_or_default().as_bytes()); + if let Some(mod_time) = self.mod_time { + hasher.update(&mod_time.unix_timestamp_nanos().to_le_bytes()); + } + hasher.update(&self.size.to_le_bytes()); + let hash = hasher.finish(); + let bytes = hash.to_le_bytes(); + [bytes[0], bytes[1], bytes[2], bytes[3]] + } + + pub fn init_free_version(&self, fi: &FileInfo) -> (FileMetaVersion, bool) { + if fi.skip_tier_free_version() { + return (FileMetaVersion::default(), false); + } + if let Some(status) = self + .meta_sys + .get(&format!("{RESERVED_METADATA_PREFIX_LOWER}{TRANSITION_STATUS}")) + && *status == TRANSITION_COMPLETE.as_bytes().to_vec() + { + let vid = Uuid::parse_str(&fi.tier_free_version_id()); + if let Err(err) = vid { + panic!("Invalid Tier Object delete marker versionId {} {}", fi.tier_free_version_id(), err); + } + let vid = vid.unwrap(); + let mut free_entry = FileMetaVersion { + version_type: VersionType::Delete, + write_version: 0, + ..Default::default() + }; + free_entry.delete_marker = Some(MetaDeleteMarker { + version_id: Some(vid), + mod_time: self.mod_time, + meta_sys: HashMap::>::new(), + }); + + let delete_marker = free_entry.delete_marker.as_mut().unwrap(); + + delete_marker + .meta_sys + .insert(format!("{RESERVED_METADATA_PREFIX_LOWER}{FREE_VERSION}"), vec![]); + + let tier_key = format!("{RESERVED_METADATA_PREFIX_LOWER}{TRANSITION_TIER}"); + let tier_obj_key = format!("{RESERVED_METADATA_PREFIX_LOWER}{TRANSITIONED_OBJECTNAME}"); + let tier_obj_vid_key = format!("{RESERVED_METADATA_PREFIX_LOWER}{TRANSITIONED_VERSION_ID}"); + + let aa = [tier_key, tier_obj_key, tier_obj_vid_key]; + for (k, v) in &self.meta_sys { + if aa.contains(k) { + delete_marker.meta_sys.insert(k.clone(), v.clone()); + } + } + return (free_entry, true); + } + (FileMetaVersion::default(), false) + } +} + +impl From for MetaObject { + fn from(value: FileInfo) -> Self { + let part_etags = if !value.parts.is_empty() { + value.parts.iter().map(|v| v.etag.clone()).collect() + } else { + vec![] + }; + + let part_indices = if !value.parts.is_empty() { + value.parts.iter().map(|v| v.index.clone().unwrap_or_default()).collect() + } else { + vec![] + }; + + let mut meta_sys = HashMap::new(); + let mut meta_user = HashMap::new(); + for (k, v) in value.metadata.iter() { + if k.len() > RESERVED_METADATA_PREFIX.len() + && (k.starts_with(RESERVED_METADATA_PREFIX) || k.starts_with(RESERVED_METADATA_PREFIX_LOWER)) + { + if k == headers::X_RUSTFS_HEALING || k == headers::X_RUSTFS_DATA_MOV { + continue; + } + + meta_sys.insert(k.to_owned(), v.as_bytes().to_vec()); + } else { + meta_user.insert(k.to_owned(), v.to_owned()); + } + } + + if !value.transition_status.is_empty() { + meta_sys.insert( + format!("{RESERVED_METADATA_PREFIX_LOWER}{TRANSITION_STATUS}"), + value.transition_status.as_bytes().to_vec(), + ); + } + + if !value.transitioned_objname.is_empty() { + meta_sys.insert( + format!("{RESERVED_METADATA_PREFIX_LOWER}{TRANSITIONED_OBJECTNAME}"), + value.transitioned_objname.as_bytes().to_vec(), + ); + } + + if let Some(vid) = &value.transition_version_id { + meta_sys.insert( + format!("{RESERVED_METADATA_PREFIX_LOWER}{TRANSITIONED_VERSION_ID}"), + vid.as_bytes().to_vec(), + ); + } + + if !value.transition_tier.is_empty() { + meta_sys.insert( + format!("{RESERVED_METADATA_PREFIX_LOWER}{TRANSITION_TIER}"), + value.transition_tier.as_bytes().to_vec(), + ); + } + + if let Some(content_hash) = value.checksum { + meta_sys.insert(format!("{RESERVED_METADATA_PREFIX_LOWER}crc"), content_hash.to_vec()); + } + + Self { + version_id: value.version_id, + data_dir: value.data_dir, + size: value.size, + mod_time: value.mod_time, + erasure_algorithm: ErasureAlgo::ReedSolomon, + erasure_m: value.erasure.data_blocks, + erasure_n: value.erasure.parity_blocks, + erasure_block_size: value.erasure.block_size, + erasure_index: value.erasure.index, + erasure_dist: value.erasure.distribution.iter().map(|x| *x as u8).collect(), + bitrot_checksum_algo: ChecksumAlgo::HighwayHash, + part_numbers: value.parts.iter().map(|v| v.number).collect(), + part_etags, + part_sizes: value.parts.iter().map(|v| v.size).collect(), + part_actual_sizes: value.parts.iter().map(|v| v.actual_size).collect(), + part_indices, + meta_sys, + meta_user, + } + } +} + +fn get_internal_replication_state(metadata: &HashMap) -> Option { + let mut rs = ReplicationState::default(); + let mut has = false; + + for (k, v) in metadata.iter() { + if k == VERSION_PURGE_STATUS_KEY { + rs.version_purge_status_internal = Some(v.clone()); + rs.purge_targets = version_purge_statuses_map(v.as_str()); + has = true; + continue; + } + + if let Some(sub_key) = k.strip_prefix(RESERVED_METADATA_PREFIX_LOWER) { + match sub_key { + "replica-timestamp" => { + has = true; + rs.replica_timestamp = Some(OffsetDateTime::parse(v, &Rfc3339).unwrap_or(OffsetDateTime::UNIX_EPOCH)); + } + "replica-status" => { + has = true; + rs.replica_status = ReplicationStatusType::from(v.as_str()); + } + "replication-timestamp" => { + has = true; + rs.replication_timestamp = Some(OffsetDateTime::parse(v, &Rfc3339).unwrap_or(OffsetDateTime::UNIX_EPOCH)) + } + "replication-status" => { + has = true; + rs.replication_status_internal = Some(v.clone()); + rs.targets = replication_statuses_map(v.as_str()); + } + _ => { + if let Some(arn) = sub_key.strip_prefix("replication-reset-") { + has = true; + rs.reset_statuses_map.insert(arn.to_string(), v.clone()); + } + } + } + } + } + + if has { Some(rs) } else { None } +} + +#[derive(Serialize, Deserialize, Debug, Clone, Default, PartialEq)] +pub struct MetaDeleteMarker { + #[serde(rename = "ID")] + pub version_id: Option, // Version ID for delete marker + #[serde(rename = "MTime")] + pub mod_time: Option, // Object delete marker modified time + #[serde(rename = "MetaSys")] + pub meta_sys: HashMap>, // Delete marker internal metadata +} + +impl MetaDeleteMarker { + pub fn free_version(&self) -> bool { + self.meta_sys + .contains_key(format!("{RESERVED_METADATA_PREFIX_LOWER}{FREE_VERSION}").as_str()) + } + + pub fn into_fileinfo(&self, volume: &str, path: &str, _all_parts: bool) -> FileInfo { + let metadata = self + .meta_sys + .clone() + .into_iter() + .map(|(k, v)| (k, String::from_utf8_lossy(&v).to_string())) + .collect(); + let replication_state_internal = get_internal_replication_state(&metadata); + + let mut fi = FileInfo { + version_id: self.version_id.filter(|&vid| !vid.is_nil()), + name: path.to_string(), + volume: volume.to_string(), + deleted: true, + mod_time: self.mod_time, + metadata, + replication_state_internal, + ..Default::default() + }; + + if self.free_version() { + fi.set_tier_free_version(); + fi.transition_tier = self + .meta_sys + .get(format!("{RESERVED_METADATA_PREFIX_LOWER}{TRANSITION_TIER}").as_str()) + .map(|v| String::from_utf8_lossy(v).to_string()) + .unwrap_or_default(); + + fi.transitioned_objname = self + .meta_sys + .get(format!("{RESERVED_METADATA_PREFIX_LOWER}{TRANSITIONED_OBJECTNAME}").as_str()) + .map(|v| String::from_utf8_lossy(v).to_string()) + .unwrap_or_default(); + + fi.transition_version_id = self + .meta_sys + .get(format!("{RESERVED_METADATA_PREFIX_LOWER}{TRANSITIONED_VERSION_ID}").as_str()) + .map(|v| Uuid::from_slice(v.as_slice()).unwrap_or_default()); + } + + fi + } + + pub fn unmarshal_msg(&mut self, buf: &[u8]) -> Result { + let ret: Self = rmp_serde::from_slice(buf)?; + + *self = ret; + + Ok(buf.len() as u64) + + // let mut cur = Cursor::new(buf); + + // let mut fields_len = rmp::decode::read_map_len(&mut cur)?; + + // while fields_len > 0 { + // fields_len -= 1; + + // let str_len = rmp::decode::read_str_len(&mut cur)?; + + // // !!! Vec::with_capacity(str_len) fails, vec! works normally + // let mut field_buff = vec![0u8; str_len as usize]; + + // cur.read_exact(&mut field_buff)?; + + // let field = String::from_utf8(field_buff)?; + + // match field.as_str() { + // "ID" => { + // rmp::decode::read_bin_len(&mut cur)?; + // let mut buf = [0u8; 16]; + // cur.read_exact(&mut buf)?; + // self.version_id = { + // let id = Uuid::from_bytes(buf); + // if id.is_nil() { None } else { Some(id) } + // }; + // } + + // "MTime" => { + // let unix: i64 = rmp::decode::read_int(&mut cur)?; + // let time = OffsetDateTime::from_unix_timestamp(unix)?; + // if time == OffsetDateTime::UNIX_EPOCH { + // self.mod_time = None; + // } else { + // self.mod_time = Some(time); + // } + // } + // "MetaSys" => { + // let l = rmp::decode::read_map_len(&mut cur)?; + // let mut map = HashMap::new(); + // for _ in 0..l { + // let str_len = rmp::decode::read_str_len(&mut cur)?; + // let mut field_buff = vec![0u8; str_len as usize]; + // cur.read_exact(&mut field_buff)?; + // let key = String::from_utf8(field_buff)?; + + // let blen = rmp::decode::read_bin_len(&mut cur)?; + // let mut val = vec![0u8; blen as usize]; + // cur.read_exact(&mut val)?; + + // map.insert(key, val); + // } + + // self.meta_sys = Some(map); + // } + // name => return Err(Error::other(format!("not support field name {name}"))), + // } + // } + + // Ok(cur.position()) + } + + pub fn marshal_msg(&self) -> Result> { + let buf = rmp_serde::to_vec(self)?; + Ok(buf) + + // let mut len: u32 = 3; + // let mut mask: u8 = 0; + + // if self.meta_sys.is_none() { + // len -= 1; + // mask |= 0x4; + // } + + // let mut wr = Vec::new(); + + // // Field count + // rmp::encode::write_map_len(&mut wr, len)?; + + // // string "ID" + // rmp::encode::write_str(&mut wr, "ID")?; + // rmp::encode::write_bin(&mut wr, self.version_id.unwrap_or_default().as_bytes())?; + + // // string "MTime" + // rmp::encode::write_str(&mut wr, "MTime")?; + // rmp::encode::write_uint( + // &mut wr, + // self.mod_time + // .unwrap_or(OffsetDateTime::UNIX_EPOCH) + // .unix_timestamp() + // .try_into() + // .unwrap(), + // )?; + + // if (mask & 0x4) == 0 { + // let metas = self.meta_sys.as_ref().unwrap(); + // rmp::encode::write_map_len(&mut wr, metas.len() as u32)?; + // for (k, v) in metas { + // rmp::encode::write_str(&mut wr, k.as_str())?; + // rmp::encode::write_bin(&mut wr, v)?; + // } + // } + + // Ok(wr) + } + + /// Get delete marker signature + pub fn get_signature(&self) -> [u8; 4] { + let mut hasher = xxhash_rust::xxh64::Xxh64::new(XXHASH_SEED); + hasher.update(self.version_id.unwrap_or_default().as_bytes()); + if let Some(mod_time) = self.mod_time { + hasher.update(&mod_time.unix_timestamp_nanos().to_le_bytes()); + } + let hash = hasher.finish(); + let bytes = hash.to_le_bytes(); + [bytes[0], bytes[1], bytes[2], bytes[3]] + } +} + +impl From for MetaDeleteMarker { + fn from(value: FileInfo) -> Self { + Self { + version_id: value.version_id, + mod_time: value.mod_time, + meta_sys: HashMap::new(), + } + } +} + +#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Default, Clone, PartialOrd, Ord, Hash)] +pub enum VersionType { + #[default] + Invalid = 0, + Object = 1, + Delete = 2, + Legacy = 3, +} + +impl VersionType { + pub fn valid(&self) -> bool { + matches!(*self, VersionType::Object | VersionType::Delete | VersionType::Legacy) + } + + pub fn to_u8(&self) -> u8 { + match self { + VersionType::Invalid => 0, + VersionType::Object => 1, + VersionType::Delete => 2, + VersionType::Legacy => 3, + } + } + + pub fn from_u8(n: u8) -> Self { + match n { + 1 => VersionType::Object, + 2 => VersionType::Delete, + 3 => VersionType::Legacy, + _ => VersionType::Invalid, + } + } +} + +#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Default, Clone)] +pub enum ChecksumAlgo { + #[default] + Invalid = 0, + HighwayHash = 1, +} + +impl ChecksumAlgo { + pub fn valid(&self) -> bool { + *self > ChecksumAlgo::Invalid + } + pub fn to_u8(&self) -> u8 { + match self { + ChecksumAlgo::Invalid => 0, + ChecksumAlgo::HighwayHash => 1, + } + } + pub fn from_u8(u: u8) -> Self { + match u { + 1 => ChecksumAlgo::HighwayHash, + _ => ChecksumAlgo::Invalid, + } + } +} + +#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Default, Clone)] +pub enum Flags { + #[default] + FreeVersion = 1 << 0, + UsesDataDir = 1 << 1, + InlineData = 1 << 2, +} + +// mergeXLV2Versions +pub fn merge_file_meta_versions( + mut quorum: usize, + mut strict: bool, + requested_versions: usize, + versions: &[Vec], +) -> Vec { + if quorum == 0 { + quorum = 1; + } + + if versions.len() < quorum || versions.is_empty() { + return Vec::new(); + } + + if versions.len() == 1 { + return versions[0].clone(); + } + + if quorum == 1 { + strict = true; + } + + let mut versions = versions.to_owned(); + + let mut n_versions = 0; + + let mut merged = Vec::new(); + loop { + let mut tops = Vec::new(); + let mut top_sig = FileMetaVersionHeader::default(); + let mut consistent = true; + for vers in versions.iter() { + if vers.is_empty() { + consistent = false; + continue; + } + if tops.is_empty() { + consistent = true; + top_sig = vers[0].header.clone(); + } else { + consistent = consistent && vers[0].header == top_sig; + } + tops.push(vers[0].clone()); + } + + // check if done... + if tops.len() < quorum { + break; + } + + let mut latest = FileMetaShallowVersion::default(); + if consistent { + merged.push(tops[0].clone()); + if !tops[0].header.free_version() { + n_versions += 1; + } + } else { + let mut latest_count = 0; + for (i, ver) in tops.iter().enumerate() { + if ver.header == latest.header { + latest_count += 1; + continue; + } + + if i == 0 || ver.header.sorts_before(&latest.header) { + if i == 0 || latest_count == 0 { + latest_count = 1; + } else if !strict && ver.header.matches_not_strict(&latest.header) { + latest_count += 1; + } else { + latest_count = 1; + } + latest = ver.clone(); + continue; + } + + // Mismatch, but older. + if latest_count > 0 && !strict && ver.header.matches_not_strict(&latest.header) { + latest_count += 1; + continue; + } + + if latest_count > 0 && ver.header.version_id == latest.header.version_id { + let mut x: HashMap = HashMap::new(); + for a in tops.iter() { + if a.header.version_id != ver.header.version_id { + continue; + } + let mut a_clone = a.clone(); + if !strict { + a_clone.header.signature = [0; 4]; + } + *x.entry(a_clone.header).or_insert(0) += 1; + } + latest_count = 0; + for (k, v) in x.iter() { + if *v < latest_count { + continue; + } + if *v == latest_count && latest.header.sorts_before(k) { + continue; + } + tops.iter().for_each(|a| { + let mut hdr = a.header.clone(); + if !strict { + hdr.signature = [0; 4]; + } + if hdr == *k { + latest = a.clone(); + } + }); + + latest_count = *v; + } + break; + } + } + if latest_count >= quorum { + if !latest.header.free_version() { + n_versions += 1; + } + merged.push(latest.clone()); + } + } + + // Remove from all streams up until latest modtime or if selected. + versions.iter_mut().for_each(|vers| { + // // Keep top entry (and remaining)... + let mut bre = false; + vers.retain(|ver| { + if bre { + return true; + } + if let Ordering::Greater = ver.header.mod_time.cmp(&latest.header.mod_time) { + bre = true; + return false; + } + if ver.header == latest.header { + bre = true; + return false; + } + if let Ordering::Equal = latest.header.version_id.cmp(&ver.header.version_id) { + bre = true; + return false; + } + for merged_v in merged.iter() { + if let Ordering::Equal = ver.header.version_id.cmp(&merged_v.header.version_id) { + bre = true; + return false; + } + } + true + }); + }); + if requested_versions > 0 && requested_versions == n_versions { + merged.append(&mut versions[0]); + break; + } + } + + // Sanity check. Enable if duplicates show up. + // todo + merged +} + +pub fn file_info_from_raw( + ri: RawFileInfo, + bucket: &str, + object: &str, + read_data: bool, + include_free_versions: bool, +) -> Result { + get_file_info( + &ri.buf, + bucket, + object, + "", + FileInfoOpts { + data: read_data, + include_free_versions, + }, + ) +} + +pub struct FileInfoOpts { + pub data: bool, + pub include_free_versions: bool, +} + +pub fn get_file_info(buf: &[u8], volume: &str, path: &str, version_id: &str, opts: FileInfoOpts) -> Result { + let vid = { + if version_id.is_empty() { + None + } else { + Some(Uuid::parse_str(version_id)?) + } + }; + + let meta = FileMeta::load(buf)?; + if meta.versions.is_empty() { + return Ok(FileInfo { + volume: volume.to_owned(), + name: path.to_owned(), + version_id: vid, + is_latest: true, + deleted: true, + mod_time: Some(OffsetDateTime::from_unix_timestamp(1)?), + ..Default::default() + }); + } + + let fi = meta.into_fileinfo(volume, path, version_id, opts.data, opts.include_free_versions, true)?; + Ok(fi) +} + +async fn read_more( + reader: &mut R, + buf: &mut Vec, + total_size: usize, + read_size: usize, + has_full: bool, +) -> Result<()> { + use tokio::io::AsyncReadExt; + let has = buf.len(); + + if has >= read_size { + return Ok(()); + } + + if has_full || read_size > total_size { + return Err(Error::other(std::io::Error::new(std::io::ErrorKind::UnexpectedEof, "Unexpected EOF"))); + } + + let extra = read_size - has; + if buf.capacity() >= read_size { + // Extend the buffer if we have enough space. + buf.resize(read_size, 0); + } else { + buf.extend(vec![0u8; extra]); + } + + reader.read_exact(&mut buf[has..]).await?; + Ok(()) +} + +pub async fn read_xl_meta_no_data(reader: &mut R, size: usize) -> Result> { + use tokio::io::AsyncReadExt; + + let mut initial = size; + let mut has_full = true; + + if initial > META_DATA_READ_DEFAULT { + initial = META_DATA_READ_DEFAULT; + has_full = false; + } + + let mut buf = vec![0u8; initial]; + reader.read_exact(&mut buf).await?; + + let (tmp_buf, major, minor) = FileMeta::check_xl2_v1(&buf)?; + + match major { + 1 => match minor { + 0 => { + read_more(reader, &mut buf, size, size, has_full).await?; + Ok(buf) + } + 1..=3 => { + let (sz, tmp_buf) = FileMeta::read_bytes_header(tmp_buf)?; + let mut want = sz as usize + (buf.len() - tmp_buf.len()); + + if minor < 2 { + read_more(reader, &mut buf, size, want, has_full).await?; + buf.truncate(want); + return Ok(buf); + } + + let want_max = usize::min(want + MSGP_UINT32_SIZE, size); + read_more(reader, &mut buf, size, want_max, has_full).await?; + + if buf.len() < want { + return Err(Error::FileCorrupt); + } + + let tmp = &buf[want..]; + let crc_size = 5; + let other_size = tmp.len() - crc_size; + + want += tmp.len() - other_size; + + buf.truncate(want); + Ok(buf) + } + _ => Err(Error::other(std::io::Error::new( + std::io::ErrorKind::InvalidData, + "Unknown minor metadata version", + ))), + }, + _ => Err(Error::other(std::io::Error::new( + std::io::ErrorKind::InvalidData, + "Unknown major metadata version", + ))), + } +}