refactor(filemeta): split filemeta into focused submodules (#1946)

This commit is contained in:
安正超
2026-02-25 08:38:32 +08:00
committed by GitHub
parent 7f132a290c
commit b48f273c7d
5 changed files with 2077 additions and 2004 deletions

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,342 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use super::*;
impl FileMeta {
/// Reports whether `buf` passes the header check done by `check_xl2_v1`.
///
/// Returns `true` when `check_xl2_v1` succeeds and `false` for any error.
pub fn is_xl2_v1_format(buf: &[u8]) -> bool {
    Self::check_xl2_v1(buf).is_ok()
}
/// Builds a `FileMeta` by decoding `buf` with `unmarshal_msg`.
///
/// # Errors
/// Propagates whatever error `unmarshal_msg` reports for `buf`.
pub fn load(buf: &[u8]) -> Result<FileMeta> {
    let mut decoded = FileMeta::default();
    // The byte count returned by `unmarshal_msg` is not needed here.
    decoded.unmarshal_msg(buf).map(|_| decoded)
}
pub fn check_xl2_v1(buf: &[u8]) -> Result<(&[u8], u16, u16)> {
if buf.len() < 8 {
return Err(Error::other("xl file header not exists"));
}
if buf[0..4] != XL_FILE_HEADER {
return Err(Error::other("xl file header err"));
}
let major = byteorder::LittleEndian::read_u16(&buf[4..6]);
let minor = byteorder::LittleEndian::read_u16(&buf[6..8]);
if major > XL_FILE_VERSION_MAJOR {
return Err(Error::other("xl file version err"));
}
Ok((&buf[8..], major, minor))
}
/// Splits an xl.meta buffer into its metadata payload and trailing inline data.
///
/// Returns `(meta, inline_data)`. Two empty slices are returned (without an
/// error) when the file version is not major 1 with minor >= 3, or when the
/// declared metadata length is larger than the bytes that follow the prefix.
///
/// # Errors
/// Fails when the file header is rejected by `check_xl2_v1`, when fewer than
/// 5 bytes are available for the length prefix or for the CRC field, when
/// either field cannot be decoded, or when the stored CRC does not match the
/// low 32 bits of the xxh64 hash of the metadata payload.
pub fn is_indexed_meta(buf: &[u8]) -> Result<(&[u8], &[u8])> {
    // The length prefix and the CRC are each read from a fixed 5-byte field.
    const FIELD_LEN: usize = 5;

    let (body, major, minor) = Self::check_xl2_v1(buf)?;
    if major != 1 || minor < 3 {
        return Ok((&[], &[]));
    }

    if body.len() < FIELD_LEN {
        return Err(Error::other("insufficient data for meta length"));
    }
    let (mut len_field, after_len) = body.split_at(FIELD_LEN);
    let meta_len = rmp::decode::read_bin_len(&mut len_field)? as usize;

    // `after_len` = metadata payload + CRC field + inline data.
    if after_len.len() < meta_len {
        return Ok((&[], &[]));
    }
    let (meta, after_meta) = after_len.split_at(meta_len);

    if after_meta.len() < FIELD_LEN {
        return Err(Error::other("insufficient data for CRC"));
    }
    let (mut crc_field, inline_data) = after_meta.split_at(FIELD_LEN);

    // Compare the stored checksum with the one computed over the payload.
    let stored_crc = rmp::decode::read_u32(&mut crc_field)?;
    let computed_crc = xxh64::xxh64(meta, XXHASH_SEED) as u32;
    if stored_crc != computed_crc {
        return Err(Error::other("xl file crc check failed"));
    }

    Ok((meta, inline_data))
}
/// Decodes the bin-length prefix stored in the first 5 bytes of `buf`.
///
/// Returns the decoded length together with the bytes that follow the
/// 5-byte prefix.
///
/// # Errors
/// Fails when `buf` holds fewer than 5 bytes or when the prefix cannot be
/// decoded as a bin length.
pub fn read_bytes_header(buf: &[u8]) -> Result<(u32, &[u8])> {
    if buf.len() < 5 {
        return Err(Error::other("insufficient data for bytes header"));
    }

    let (mut prefix, rest) = buf.split_at(5);
    let declared_len = rmp::decode::read_bin_len(&mut prefix)?;

    Ok((declared_len, rest))
}
pub fn unmarshal_msg(&mut self, buf: &[u8]) -> Result<u64> {
let i = buf.len() as u64;
// check version, buf = buf[8..]
let (buf, _, _) = Self::check_xl2_v1(buf).map_err(|e| {
error!("failed to check XL2 v1 format: {}", e);
e
})?;
if buf.len() < 5 {
error!(
"insufficient data for metadata length prefix: expected at least 5 bytes, got {}",
buf.len()
);
return Err(Error::other("insufficient data for metadata length prefix"));
}
let (mut size_buf, buf) = buf.split_at(5);
// Get meta data, buf = crc + data
let bin_len = rmp::decode::read_bin_len(&mut size_buf).map_err(|e| {
error!("failed to read binary length for metadata: {}", e);
Error::other(format!("failed to read binary length for metadata: {e}"))
})?;
if buf.len() < bin_len as usize {
error!("insufficient data for metadata: expected {} bytes, got {} bytes", bin_len, buf.len());
return Err(Error::other("insufficient data for metadata"));
}
let (meta, buf) = buf.split_at(bin_len as usize);
if buf.len() < 5 {
error!("insufficient data for CRC: expected 5 bytes, got {} bytes", buf.len());
return Err(Error::other("insufficient data for CRC"));
}
let (mut crc_buf, buf) = buf.split_at(5);
// crc check
let crc = rmp::decode::read_u32(&mut crc_buf).map_err(|e| {
error!("failed to read CRC value: {}", e);
Error::other(format!("failed to read CRC value: {e}"))
})?;
let meta_crc = xxh64::xxh64(meta, XXHASH_SEED) as u32;
if crc != meta_crc {
error!("xl file crc check failed: expected CRC {:#x}, got {:#x}", meta_crc, crc);
return Err(Error::other("xl file crc check failed"));
}
if !buf.is_empty() {
self.data.update(buf);
self.data.validate().map_err(|e| {
error!("data validation failed: {}", e);
e
})?;
}
// Parse meta
if !meta.is_empty() {
let (versions_len, _, meta_ver, meta) = Self::decode_xl_headers(meta).map_err(|e| {
error!("failed to decode XL headers: {}", e);
e
})?;
// let (_, meta) = meta.split_at(read_size as usize);
self.meta_ver = meta_ver;
self.versions = Vec::with_capacity(versions_len);
let mut cur: Cursor<&[u8]> = Cursor::new(meta);
for _ in 0..versions_len {
let bin_len = rmp::decode::read_bin_len(&mut cur).map_err(|e| {
error!("failed to read binary length for version header: {}", e);
Error::other(format!("failed to read binary length for version header: {e}"))
})? as usize;
let mut header_buf = vec![0u8; bin_len];
cur.read_exact(&mut header_buf)?;
let mut ver = FileMetaShallowVersion::default();
ver.header.unmarshal_msg(&header_buf).map_err(|e| {
error!("failed to unmarshal version header: {}", e);
e
})?;
let bin_len = rmp::decode::read_bin_len(&mut cur).map_err(|e| {
error!("failed to read binary length for version metadata: {}", e);
Error::other(format!("failed to read binary length for version metadata: {e}"))
})? as usize;
let mut ver_meta_buf = vec![0u8; bin_len];
cur.read_exact(&mut ver_meta_buf)?;
ver.meta.extend_from_slice(&ver_meta_buf);
self.versions.push(ver);
}
}
Ok(i)
}
/// Parses the three leading integers of the metadata payload.
///
/// Returns `(versions_count, header_version, meta_version, rest)`, where
/// `rest` is the part of `buf` that follows the three decoded integers.
///
/// # Errors
/// Fails when any of the integers cannot be decoded, when the header version
/// is greater than `XL_HEADER_VERSION`, or when the meta version is greater
/// than `XL_META_VERSION`.
fn decode_xl_headers(buf: &[u8]) -> Result<(usize, u8, u8, &[u8])> {
    let mut reader = Cursor::new(buf);

    let header_ver: u8 = rmp::decode::read_int(&mut reader)?;
    if header_ver > XL_HEADER_VERSION {
        return Err(Error::other("xl header version invalid"));
    }

    let meta_ver: u8 = rmp::decode::read_int(&mut reader)?;
    if meta_ver > XL_META_VERSION {
        return Err(Error::other("xl meta version invalid"));
    }

    let versions_len: usize = rmp::decode::read_int(&mut reader)?;

    // Everything the cursor has not consumed yet belongs to the versions.
    let consumed = reader.position() as usize;
    Ok((versions_len, header_ver, meta_ver, &buf[consumed..]))
}
fn decode_versions<F: FnMut(usize, &[u8], &[u8]) -> Result<()>>(buf: &[u8], versions: usize, mut fnc: F) -> Result<()> {
let mut cur: Cursor<&[u8]> = Cursor::new(buf);
for i in 0..versions {
let bin_len = rmp::decode::read_bin_len(&mut cur)? as usize;
let start = cur.position() as usize;
let end = start + bin_len;
let header_buf = &buf[start..end];
cur.set_position(end as u64);
let bin_len = rmp::decode::read_bin_len(&mut cur)? as usize;
let start = cur.position() as usize;
let end = start + bin_len;
let ver_meta_buf = &buf[start..end];
cur.set_position(end as u64);
if let Err(err) = fnc(i, header_buf, ver_meta_buf) {
if err == Error::DoneForNow {
return Ok(());
}
return Err(err);
}
}
Ok(())
}
/// Reports whether the first version encoded in `buf` is a delete marker.
///
/// `buf` is parsed with `decode_xl_headers`; only the first version header is
/// inspected. Returns `false` when the headers cannot be decoded, when there
/// are no versions, or when the first version header cannot be unmarshalled.
pub fn is_latest_delete_marker(buf: &[u8]) -> bool {
    let Ok((versions, _hdr_v, _meta_v, meta)) = Self::decode_xl_headers(buf) else {
        return false;
    };
    if versions == 0 {
        return false;
    }

    let mut first_is_delete = false;
    // The callback always asks to stop after the first entry, and any decode
    // error simply leaves the answer at `false`, so the result is ignored.
    let _ = Self::decode_versions(meta, versions, |_: usize, hdr: &[u8], _: &[u8]| {
        let mut header = FileMetaVersionHeader::default();
        if header.unmarshal_msg(hdr).is_ok() {
            first_is_delete = header.version_type == VersionType::Delete;
        }
        Err(Error::DoneForNow)
    });

    first_is_delete
}
pub fn marshal_msg(&self) -> Result<Vec<u8>> {
let mut wr = Vec::new();
// header
wr.write_all(XL_FILE_HEADER.as_slice())?;
let mut major = [0u8; 2];
byteorder::LittleEndian::write_u16(&mut major, XL_FILE_VERSION_MAJOR);
wr.write_all(major.as_slice())?;
let mut minor = [0u8; 2];
byteorder::LittleEndian::write_u16(&mut minor, XL_FILE_VERSION_MINOR);
wr.write_all(minor.as_slice())?;
// size bin32 reserved for write_bin_len
wr.write_all(&[0xc6, 0, 0, 0, 0])?;
let offset = wr.len();
// xl header
rmp::encode::write_uint8(&mut wr, XL_HEADER_VERSION)?;
rmp::encode::write_uint8(&mut wr, XL_META_VERSION)?;
// versions
rmp::encode::write_sint(&mut wr, self.versions.len() as i64)?;
for ver in self.versions.iter() {
let hmsg = ver.header.marshal_msg()?;
rmp::encode::write_bin(&mut wr, &hmsg)?;
rmp::encode::write_bin(&mut wr, &ver.meta)?;
}
// Update bin length
let data_len = wr.len() - offset;
byteorder::BigEndian::write_u32(&mut wr[offset - 4..offset], data_len as u32);
let crc = xxh64::xxh64(&wr[offset..], XXHASH_SEED) as u32;
let mut crc_buf = [0u8; 5];
crc_buf[0] = 0xce; // u32
byteorder::BigEndian::write_u32(&mut crc_buf[1..], crc);
wr.write_all(&crc_buf)?;
wr.write_all(self.data.as_slice())?;
Ok(wr)
}
// pub fn unmarshal(buf: &[u8]) -> Result<Self> {
// let mut s = Self::default();
// s.unmarshal_msg(buf)?;
// Ok(s)
// // let t: FileMeta = rmp_serde::from_slice(buf)?;
// // Ok(t)
// }
// pub fn marshal_msg(&self) -> Result<Vec<u8>> {
// let mut buf = Vec::new();
// self.serialize(&mut Serializer::new(&mut buf))?;
// Ok(buf)
// }
}

View File

@@ -0,0 +1,62 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use super::*;
impl FileMeta {
/// Counts object versions, other than `vid`, whose data dir equals `data_dir`.
///
/// Only versions of type `VersionType::Object` whose header reports
/// `uses_data_dir()` are considered. A version whose data dir cannot be
/// decoded is treated as having the default value, so it is still compared
/// against `data_dir`. A missing `vid` is replaced by the default `Uuid`.
pub fn shard_data_dir_count(&self, vid: &Option<Uuid>, data_dir: &Option<Uuid>) -> usize {
    let excluded_id = Some(vid.unwrap_or_default());
    let mut matches = 0;

    for version in &self.versions {
        let header = &version.header;
        if header.version_type != VersionType::Object || header.version_id == excluded_id || !header.uses_data_dir() {
            continue;
        }

        let dir = FileMetaVersion::decode_data_dir_from_meta(&version.meta).unwrap_or_default();
        if dir == *data_dir {
            matches += 1;
        }
    }

    matches
}
/// Collects the data dir of every `VersionType::Object` version, in order.
///
/// # Errors
/// Stops at, and returns, the first failure to decode a version's metadata
/// into a `FileMetaVersion`.
pub fn get_data_dirs(&self) -> Result<Vec<Option<Uuid>>> {
    self.versions
        .iter()
        .filter(|version| version.header.version_type == VersionType::Object)
        .map(|version| -> Result<Option<Uuid>> {
            let decoded = FileMetaVersion::try_from(version.meta.as_slice())?;
            Ok(decoded.get_data_dir())
        })
        .collect()
}
/// Counts other object versions that use the same data dir as `data_dir`.
///
/// Returns 0 straight away when the inline data has entries and one of them
/// is keyed by `version_id`. Otherwise counts versions of type
/// `VersionType::Object`, other than `version_id`, whose header reports
/// `uses_data_dir()` and whose decoded data dir is `Some` and equal to
/// `data_dir`. Versions whose data dir fails to decode are skipped. A missing
/// `version_id` is replaced by the default `Uuid`.
pub fn shared_data_dir_count(&self, version_id: Option<Uuid>, data_dir: Option<Uuid>) -> usize {
    let version_id = version_id.unwrap_or_default();

    let has_inline_entries = self.data.entries().unwrap_or_default() > 0;
    if has_inline_entries && self.data.find(version_id.to_string().as_str()).unwrap_or_default().is_some() {
        return 0;
    }

    let mut shared = 0;
    for version in &self.versions {
        let header = &version.header;
        let is_candidate =
            header.version_type == VersionType::Object && header.version_id != Some(version_id) && header.uses_data_dir();
        if !is_candidate {
            continue;
        }

        match FileMetaVersion::decode_data_dir_from_meta(&version.meta) {
            Ok(dir) if dir.is_some() && dir == data_dir => shared += 1,
            _ => {}
        }
    }

    shared
}
}

View File

@@ -0,0 +1,181 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use super::*;
impl FileMeta {
/// Reports whether this metadata uses the current meta version.
///
/// The only condition checked is that `meta_ver` equals `XL_META_VERSION`;
/// version types are not inspected.
pub fn is_compatible_with_meta(&self) -> bool {
    self.meta_ver == XL_META_VERSION
}
/// Validate metadata integrity
pub fn validate_integrity(&self) -> Result<()> {
// Check if versions are sorted by modification time
if !self.is_sorted_by_mod_time() {
return Err(Error::other("versions not sorted by modification time"));
}
// Validate inline data if present
self.data.validate()?;
Ok(())
}
/// Reports whether versions are ordered by modification time, newest first.
///
/// Each adjacent pair is compared: a pair is out of order when both times are
/// present and the earlier entry is older than the later one, or when the
/// earlier entry has no time but the later one does. Every other combination
/// is accepted. Zero or one version is always considered sorted.
pub(crate) fn is_sorted_by_mod_time(&self) -> bool {
    self.versions.windows(2).all(|pair| {
        match (pair[0].header.mod_time, pair[1].header.mod_time) {
            (Some(prev), Some(curr)) => prev >= curr,
            (None, Some(_)) => false,
            (Some(_), None) | (None, None) => true,
        }
    })
}
/// Tallies the versions by header type and counts free versions.
///
/// `Invalid` and `Legacy` versions are both counted as `invalid_versions`.
pub fn get_version_stats(&self) -> VersionStats {
    let mut stats = VersionStats {
        total_versions: self.versions.len(),
        ..Default::default()
    };

    for header in self.versions.iter().map(|version| &version.header) {
        // Pick the counter that matches this version's type, then bump it.
        let counter = match header.version_type {
            VersionType::Object => &mut stats.object_versions,
            VersionType::Delete => &mut stats.delete_markers,
            VersionType::Invalid | VersionType::Legacy => &mut stats.invalid_versions,
        };
        *counter += 1;

        if header.free_version() {
            stats.free_versions += 1;
        }
    }

    stats
}
}
/// Per-type version counters, as filled in by `FileMeta::get_version_stats`.
#[derive(Debug, Default, Clone)]
pub struct VersionStats {
    /// Total number of versions held by the `FileMeta`.
    pub total_versions: usize,
    /// Versions whose header type is `VersionType::Object`.
    pub object_versions: usize,
    /// Versions whose header type is `VersionType::Delete`.
    pub delete_markers: usize,
    /// Versions whose header type is `VersionType::Invalid` or `VersionType::Legacy`.
    pub invalid_versions: usize,
    /// Versions whose header reports `free_version()`.
    pub free_versions: usize,
}
impl FileMetaVersionHeader {
    /// Runs basic sanity checks on this version header.
    ///
    /// The header is valid when all of the following hold:
    /// - `version_type.valid()` is true;
    /// - `mod_time`, if present, is no more than 24 hours after the current
    ///   UTC time;
    /// - if `has_ec()` is true, `ec_n` and `ec_m` are both non-zero and
    ///   `ec_m` is not smaller than `ec_n`.
    ///
    /// The checks run in that order and stop at the first failure.
    pub fn is_valid(&self) -> bool {
        // Evaluated lazily so the clock is only read when it is needed.
        let mod_time_ok = || match self.mod_time {
            Some(mod_time) => mod_time <= OffsetDateTime::now_utc() + time::Duration::hours(24),
            None => true,
        };
        let ec_ok = || !self.has_ec() || (self.ec_n != 0 && self.ec_m != 0 && self.ec_m >= self.ec_n);

        self.version_type.valid() && mod_time_ok() && ec_ok()
    }
}
/// Enhanced version statistics with more detailed information, as filled in
/// by `FileMeta::get_detailed_version_stats`.
#[derive(Debug, Default, Clone)]
pub struct DetailedVersionStats {
    /// Total number of versions held by the `FileMeta`.
    pub total_versions: usize,
    /// Versions whose header type is `VersionType::Object`.
    pub object_versions: usize,
    /// Versions whose header type is `VersionType::Delete`.
    pub delete_markers: usize,
    /// Versions whose header type is `VersionType::Invalid`.
    pub invalid_versions: usize,
    /// Versions whose header type is `VersionType::Legacy`.
    pub legacy_versions: usize,
    /// Versions whose header reports `free_version()`.
    pub free_versions: usize,
    /// Object versions whose decoded object reports `uses_data_dir()`.
    pub versions_with_data_dir: usize,
    /// Object versions whose decoded object reports `inlinedata()`.
    pub versions_with_inline_data: usize,
    /// Sum of `size` over all object versions that decode successfully.
    pub total_size: i64,
    /// Greatest header `mod_time` seen across the versions, if any.
    pub latest_mod_time: Option<OffsetDateTime>,
}
impl FileMeta {
    /// Gathers per-type counts plus size, storage and mod-time details.
    ///
    /// Object versions are decoded from their metadata to read the size and
    /// the data-dir / inline-data flags; an object version that fails to
    /// decode, or that carries no object payload, still counts towards
    /// `object_versions` but contributes nothing else. `latest_mod_time` ends
    /// up as the greatest header `mod_time` encountered.
    pub fn get_detailed_version_stats(&self) -> DetailedVersionStats {
        let mut stats = DetailedVersionStats {
            total_versions: self.versions.len(),
            ..Default::default()
        };

        for version in &self.versions {
            let header = &version.header;

            match header.version_type {
                VersionType::Object => {
                    stats.object_versions += 1;

                    let decoded = FileMetaVersion::try_from(version.meta.as_slice()).ok();
                    if let Some(obj) = decoded.as_ref().and_then(|ver| ver.object.as_ref()) {
                        stats.total_size += obj.size;
                        if obj.uses_data_dir() {
                            stats.versions_with_data_dir += 1;
                        }
                        if obj.inlinedata() {
                            stats.versions_with_inline_data += 1;
                        }
                    }
                }
                VersionType::Delete => stats.delete_markers += 1,
                VersionType::Legacy => stats.legacy_versions += 1,
                VersionType::Invalid => stats.invalid_versions += 1,
            }

            if header.free_version() {
                stats.free_versions += 1;
            }

            // Take this version's time when nothing is recorded yet, or when
            // it is strictly newer than what has been recorded so far.
            let replaces_latest = match (stats.latest_mod_time, header.mod_time) {
                (None, _) => true,
                (Some(latest), Some(candidate)) => candidate > latest,
                (Some(_), None) => false,
            };
            if replaces_latest {
                stats.latest_mod_time = header.mod_time;
            }
        }

        stats
    }
}

File diff suppressed because it is too large Load Diff