diff --git a/crates/ahm/src/scanner/local_scan/mod.rs b/crates/ahm/src/scanner/local_scan/mod.rs index 24a5a260..16b3cc55 100644 --- a/crates/ahm/src/scanner/local_scan/mod.rs +++ b/crates/ahm/src/scanner/local_scan/mod.rs @@ -306,7 +306,7 @@ fn compute_object_usage(bucket: &str, object: &str, file_meta: &FileMeta) -> Res versions_count = versions_count.saturating_add(1); if latest_file_info.is_none() - && let Ok(info) = file_meta.into_fileinfo(bucket, object, "", false, false) + && let Ok(info) = file_meta.into_fileinfo(bucket, object, "", false, false, false) { latest_file_info = Some(info); } diff --git a/crates/ecstore/src/disk/local.rs b/crates/ecstore/src/disk/local.rs index 7f2c5c48..09991fc5 100644 --- a/crates/ecstore/src/disk/local.rs +++ b/crates/ecstore/src/disk/local.rs @@ -21,6 +21,7 @@ use super::{ }; use super::{endpoint::Endpoint, error::DiskError, format::FormatV3}; +use crate::config::storageclass::DEFAULT_INLINE_BLOCK; use crate::data_usage::local_snapshot::ensure_data_usage_layout; use crate::disk::error::FileAccessDeniedWithContext; use crate::disk::error_conv::{to_access_error, to_file_error, to_unformatted_disk_error, to_volume_error}; @@ -489,9 +490,17 @@ impl LocalDisk { let file_dir = self.get_bucket_path(volume)?; let (data, _) = self.read_raw(volume, file_dir, file_path, opts.read_data).await?; - get_file_info(&data, volume, path, version_id, FileInfoOpts { data: opts.read_data }) - .await - .map_err(|_e| DiskError::Unexpected) + get_file_info( + &data, + volume, + path, + version_id, + FileInfoOpts { + data: opts.read_data, + include_free_versions: false, + }, + ) + .map_err(|_e| DiskError::Unexpected) } // Batch metadata reading for multiple objects @@ -2240,20 +2249,93 @@ impl DiskAPI for LocalDisk { #[tracing::instrument(level = "debug", skip(self))] async fn read_version( &self, - _org_volume: &str, + org_volume: &str, volume: &str, path: &str, version_id: &str, opts: &ReadOptions, ) -> Result { + if !org_volume.is_empty() { + let org_volume_path = self.get_bucket_path(org_volume)?; + if !skip_access_checks(org_volume) { + access(&org_volume_path) + .await + .map_err(|e| to_access_error(e, DiskError::VolumeAccessDenied))?; + } + } + let file_path = self.get_object_path(volume, path)?; - let file_dir = self.get_bucket_path(volume)?; + let volume_dir = self.get_bucket_path(volume)?; + + check_path_length(file_path.to_string_lossy().as_ref())?; let read_data = opts.read_data; - let (data, _) = self.read_raw(volume, file_dir, file_path, read_data).await?; + let (data, _) = self + .read_raw(volume, volume_dir.clone(), file_path, read_data) + .await + .map_err(|e| { + if e == DiskError::FileNotFound && !version_id.is_empty() { + DiskError::FileVersionNotFound + } else { + e + } + })?; - let fi = get_file_info(&data, volume, path, version_id, FileInfoOpts { data: read_data }).await?; + let mut fi = get_file_info( + &data, + volume, + path, + version_id, + FileInfoOpts { + data: read_data, + include_free_versions: opts.incl_free_versions, + }, + )?; + + if opts.read_data { + if fi.data.as_ref().is_some_and(|d| !d.is_empty()) || fi.size == 0 { + if fi.inline_data() { + return Ok(fi); + } + + if fi.size == 0 || fi.version_id.is_none_or(|v| v.is_nil()) { + fi.set_inline_data(); + return Ok(fi); + }; + if let Some(part) = fi.parts.first() { + let part_path = format!("part.{}", part.number); + let part_path = path_join_buf(&[ + path, + fi.data_dir.map_or("".to_string(), |dir| dir.to_string()).as_str(), + part_path.as_str(), + ]); + let part_path = self.get_object_path(volume, part_path.as_str())?; + if lstat(&part_path).await.is_err() { + fi.set_inline_data(); + return Ok(fi); + } + } + + fi.data = None; + } + + let inline = fi.transition_status.is_empty() && fi.data_dir.is_some() && fi.parts.len() == 1; + if inline && fi.shard_file_size(fi.parts[0].actual_size) < DEFAULT_INLINE_BLOCK as i64 { + let part_path = path_join_buf(&[ + path, + fi.data_dir.map_or("".to_string(), |dir| dir.to_string()).as_str(), + format!("part.{}", fi.parts[0].number).as_str(), + ]); + let part_path = self.get_object_path(volume, part_path.as_str())?; + + let data = self.read_all_data(volume, volume_dir, part_path.clone()).await.map_err(|e| { + warn!("read_version read_all_data {:?} failed: {e}", part_path); + e + })?; + fi.data = Some(Bytes::from(data)); + } + } Ok(fi) } diff --git a/crates/ecstore/src/set_disk.rs b/crates/ecstore/src/set_disk.rs index 782c3690..ccf2bcd9 100644 --- a/crates/ecstore/src/set_disk.rs +++ b/crates/ecstore/src/set_disk.rs @@ -1488,17 +1488,7 @@ impl SetDisks { if let Some(disk) = disk && disk.is_online().await { - if version_id.is_empty() { - match disk.read_xl(&bucket, &object, read_data).await { - Ok(info) => { - let fi = file_info_from_raw(info, &bucket, &object, read_data).await?; - Ok(fi) - } - Err(err) => Err(err), - } - } else { - disk.read_version(&org_bucket, &bucket, &object, &version_id, &opts).await - } + disk.read_version(&org_bucket, &bucket, &object, &version_id, &opts).await } else { Err(DiskError::DiskNotFound) } @@ -1626,7 +1616,7 @@ impl SetDisks { bucket: &str, object: &str, read_data: bool, - _incl_free_vers: bool, + incl_free_vers: bool, ) -> (Vec, Vec>) { let mut metadata_array = vec![None; fileinfos.len()]; let mut meta_file_infos = vec![FileInfo::default(); fileinfos.len()]; @@ -1676,7 +1666,7 @@ impl SetDisks { ..Default::default() }; - let finfo = match meta.into_fileinfo(bucket, object, "", true, true) { + let finfo = match meta.into_fileinfo(bucket, object, "", true, incl_free_vers, true) { Ok(res) => res, Err(err) => { for item in errs.iter_mut() { @@ -1703,7 +1693,7 @@ impl SetDisks { for (idx, meta_op) in metadata_array.iter().enumerate() { if let Some(meta) = meta_op { - match meta.into_fileinfo(bucket, object, vid.to_string().as_str(), read_data, true) { + match meta.into_fileinfo(bucket, object, vid.to_string().as_str(), read_data, incl_free_vers, true) { Ok(res) => meta_file_infos[idx] = res, Err(err) => errs[idx] = Some(err.into()), } diff --git a/crates/ecstore/src/store_api.rs b/crates/ecstore/src/store_api.rs index 58d62f87..2071e78a 100644 --- a/crates/ecstore/src/store_api.rs +++ b/crates/ecstore/src/store_api.rs @@ -28,14 +28,15 @@ use http::{HeaderMap, HeaderValue}; use rustfs_common::heal_channel::HealOpts; use rustfs_filemeta::{ FileInfo, MetaCacheEntriesSorted, ObjectPartInfo, REPLICATION_RESET, REPLICATION_STATUS, ReplicateDecision, ReplicationState, - ReplicationStatusType, VersionPurgeStatusType, replication_statuses_map, version_purge_statuses_map, + ReplicationStatusType, RestoreStatusOps as _, VersionPurgeStatusType, parse_restore_obj_status, replication_statuses_map, + version_purge_statuses_map, }; use rustfs_madmin::heal_commands::HealResultItem; use rustfs_rio::Checksum; use rustfs_rio::{DecompressReader, HashReader, LimitReader, WarpReader}; use rustfs_utils::CompressionAlgorithm; -use rustfs_utils::http::AMZ_STORAGE_CLASS; use rustfs_utils::http::headers::{AMZ_OBJECT_TAGGING, RESERVED_METADATA_PREFIX_LOWER}; +use rustfs_utils::http::{AMZ_BUCKET_REPLICATION_STATUS, AMZ_RESTORE, AMZ_STORAGE_CLASS}; use rustfs_utils::path::decode_dir_object; use serde::{Deserialize, Serialize}; use std::collections::HashMap; @@ -756,7 +757,24 @@ impl ObjectInfo { .ok() }); - // TODO:ReplicationState + let replication_status_internal = fi + .replication_state_internal + .as_ref() + .and_then(|v| v.replication_status_internal.clone()); + let version_purge_status_internal = fi + .replication_state_internal + .as_ref() + .and_then(|v| v.version_purge_status_internal.clone()); + + let mut replication_status = fi.replication_status(); + if replication_status.is_empty() + && let Some(status) = fi.metadata.get(AMZ_BUCKET_REPLICATION_STATUS).cloned() + && status == ReplicationStatusType::Replica.as_str() + { + replication_status = ReplicationStatusType::Replica; + } + + let version_purge_status = fi.version_purge_status(); let transitioned_object = TransitionedObject { name: fi.transitioned_objname.clone(), @@ -777,10 +795,24 @@ impl ObjectInfo { }; // Extract storage class from metadata, default to STANDARD if not found - let storage_class = metadata - .get(AMZ_STORAGE_CLASS) - .cloned() - .or_else(|| Some(storageclass::STANDARD.to_string())); + let storage_class = if !fi.transition_tier.is_empty() { + Some(fi.transition_tier.clone()) + } else { + fi.metadata + .get(AMZ_STORAGE_CLASS) + .cloned() + .or_else(|| Some(storageclass::STANDARD.to_string())) + }; + + let mut restore_ongoing = false; + let mut restore_expires = None; + if let Some(restore_status) = fi.metadata.get(AMZ_RESTORE).cloned() { + // + if let Ok(restore_status) = parse_restore_obj_status(&restore_status) { + restore_ongoing = restore_status.on_going(); + restore_expires = restore_status.expiry(); + } + } // Convert parts from rustfs_filemeta::ObjectPartInfo to store_api::ObjectPartInfo let parts = fi @@ -798,6 +830,8 @@ impl ObjectInfo { }) .collect(); + // TODO: part checksums + ObjectInfo { bucket: bucket.to_string(), name, @@ -822,6 +856,12 @@ impl ObjectInfo { transitioned_object, checksum: fi.checksum.clone(), storage_class, + restore_ongoing, + restore_expires, + replication_status_internal, + replication_status, + version_purge_status_internal, + version_purge_status, ..Default::default() } } diff --git a/crates/filemeta/src/fileinfo.rs b/crates/filemeta/src/fileinfo.rs index d56420ba..2b5755a2 100644 --- a/crates/filemeta/src/fileinfo.rs +++ b/crates/filemeta/src/fileinfo.rs @@ -505,6 +505,10 @@ impl FileInfo { ReplicationStatusType::Empty } } + + pub fn shard_file_size(&self, total_length: i64) -> i64 { + self.erasure.shard_file_size(total_length) + } } #[derive(Debug, Default, Clone, Serialize, Deserialize)] @@ -590,7 +594,7 @@ impl RestoreStatusOps for RestoreStatus { } } -fn parse_restore_obj_status(restore_hdr: &str) -> Result { +pub fn parse_restore_obj_status(restore_hdr: &str) -> Result { let tokens: Vec<&str> = restore_hdr.splitn(2, ",").collect(); let progress_tokens: Vec<&str> = tokens[0].splitn(2, "=").collect(); if progress_tokens.len() != 2 { diff --git a/crates/filemeta/src/filemeta.rs b/crates/filemeta/src/filemeta.rs index 38d0b677..1939d16c 100644 --- a/crates/filemeta/src/filemeta.rs +++ b/crates/filemeta/src/filemeta.rs @@ -14,7 +14,8 @@ use crate::{ ErasureAlgo, ErasureInfo, Error, FileInfo, FileInfoVersions, InlineData, ObjectPartInfo, RawFileInfo, ReplicationState, - ReplicationStatusType, Result, VersionPurgeStatusType, replication_statuses_map, version_purge_statuses_map, + ReplicationStatusType, Result, TIER_FV_ID, TIER_FV_MARKER, VersionPurgeStatusType, replication_statuses_map, + version_purge_statuses_map, }; use byteorder::ByteOrder; use bytes::Bytes; @@ -909,6 +910,7 @@ impl FileMeta { path: &str, version_id: &str, read_data: bool, + include_free_versions: bool, all_parts: bool, ) -> Result { let vid = { @@ -921,11 +923,35 @@ impl FileMeta { let mut is_latest = true; let mut succ_mod_time = None; + let mut non_free_versions = self.versions.len(); + + let mut found = false; + let mut found_free_version = None; + let mut found_fi = None; for ver in self.versions.iter() { let header = &ver.header; // TODO: freeVersion + if header.free_version() { + non_free_versions -= 1; + if include_free_versions && found_free_version.is_none() { + let mut found_free_fi = FileMetaVersion::default(); + if found_free_fi.unmarshal_msg(&ver.meta).is_ok() && found_free_fi.version_type != VersionType::Invalid { + let mut free_fi = found_free_fi.into_fileinfo(volume, path, all_parts); + free_fi.is_latest = true; + found_free_version = Some(free_fi); + } + } + + if header.version_id != Some(vid) { + continue; + } + } + + if found { + continue; + } if !version_id.is_empty() && header.version_id != Some(vid) { is_latest = false; @@ -933,6 +959,8 @@ impl FileMeta { continue; } + found = true; + let mut fi = ver.into_fileinfo(volume, path, all_parts)?; fi.is_latest = is_latest; @@ -947,7 +975,25 @@ impl FileMeta { .map(bytes::Bytes::from); } - fi.num_versions = self.versions.len(); + found_fi = Some(fi); + } + + if !found { + if version_id.is_empty() { + if include_free_versions + && non_free_versions == 0 + && let Some(free_version) = found_free_version + { + return Ok(free_version); + } + return Err(Error::FileNotFound); + } else { + return Err(Error::FileVersionNotFound); + } + } + + if let Some(mut fi) = found_fi { + fi.num_versions = non_free_versions; return Ok(fi); } @@ -1767,14 +1813,27 @@ impl MetaObject { metadata.insert(k.to_owned(), v.to_owned()); } + let tier_fvidkey = format!("{RESERVED_METADATA_PREFIX_LOWER}{TIER_FV_ID}").to_lowercase(); + let tier_fvmarker_key = format!("{RESERVED_METADATA_PREFIX_LOWER}{TIER_FV_MARKER}").to_lowercase(); + for (k, v) in &self.meta_sys { - if k == AMZ_STORAGE_CLASS && v == b"STANDARD" { + let lower_k = k.to_lowercase(); + + if lower_k == tier_fvidkey || lower_k == tier_fvmarker_key { + continue; + } + + if lower_k == VERSION_PURGE_STATUS_KEY.to_lowercase() { + continue; + } + + if lower_k == AMZ_STORAGE_CLASS.to_lowercase() && v == b"STANDARD" { continue; } if k.starts_with(RESERVED_METADATA_PREFIX) || k.starts_with(RESERVED_METADATA_PREFIX_LOWER) - || k == VERSION_PURGE_STATUS_KEY + || lower_k == VERSION_PURGE_STATUS_KEY.to_lowercase() { metadata.insert(k.to_owned(), String::from_utf8(v.to_owned()).unwrap_or_default()); } @@ -2511,15 +2570,31 @@ pub fn merge_file_meta_versions( merged } -pub async fn file_info_from_raw(ri: RawFileInfo, bucket: &str, object: &str, read_data: bool) -> Result { - get_file_info(&ri.buf, bucket, object, "", FileInfoOpts { data: read_data }).await +pub fn file_info_from_raw( + ri: RawFileInfo, + bucket: &str, + object: &str, + read_data: bool, + include_free_versions: bool, +) -> Result { + get_file_info( + &ri.buf, + bucket, + object, + "", + FileInfoOpts { + data: read_data, + include_free_versions, + }, + ) } pub struct FileInfoOpts { pub data: bool, + pub include_free_versions: bool, } -pub async fn get_file_info(buf: &[u8], volume: &str, path: &str, version_id: &str, opts: FileInfoOpts) -> Result { +pub fn get_file_info(buf: &[u8], volume: &str, path: &str, version_id: &str, opts: FileInfoOpts) -> Result { let vid = { if version_id.is_empty() { None @@ -2541,7 +2616,7 @@ pub async fn get_file_info(buf: &[u8], volume: &str, path: &str, version_id: &st }); } - let fi = meta.into_fileinfo(volume, path, version_id, opts.data, true)?; + let fi = meta.into_fileinfo(volume, path, version_id, opts.data, opts.include_free_versions, true)?; Ok(fi) } diff --git a/crates/filemeta/src/metacache.rs b/crates/filemeta/src/metacache.rs index 4cc2c9b1..bfb20f80 100644 --- a/crates/filemeta/src/metacache.rs +++ b/crates/filemeta/src/metacache.rs @@ -12,7 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. -use crate::{Error, FileInfo, FileInfoVersions, FileMeta, FileMetaShallowVersion, Result, VersionType, merge_file_meta_versions}; +use crate::{ + Error, FileInfo, FileInfoOpts, FileInfoVersions, FileMeta, FileMetaShallowVersion, Result, VersionType, get_file_info, + merge_file_meta_versions, +}; use rmp::Marker; use serde::{Deserialize, Serialize}; use std::cmp::Ordering; @@ -141,8 +144,7 @@ impl MetaCacheEntry { }); } - if self.cached.is_some() { - let fm = self.cached.as_ref().unwrap(); + if let Some(fm) = &self.cached { if fm.versions.is_empty() { return Ok(FileInfo { volume: bucket.to_owned(), @@ -154,14 +156,20 @@ impl MetaCacheEntry { }); } - let fi = fm.into_fileinfo(bucket, self.name.as_str(), "", false, false)?; + let fi = fm.into_fileinfo(bucket, self.name.as_str(), "", false, false, true)?; return Ok(fi); } - let mut fm = FileMeta::new(); - fm.unmarshal_msg(&self.metadata)?; - let fi = fm.into_fileinfo(bucket, self.name.as_str(), "", false, false)?; - Ok(fi) + get_file_info( + &self.metadata, + bucket, + self.name.as_str(), + "", + FileInfoOpts { + data: false, + include_free_versions: false, + }, + ) } pub fn file_info_versions(&self, bucket: &str) -> Result { diff --git a/crates/utils/src/http/headers.rs b/crates/utils/src/http/headers.rs index 8e05dfcd..cea4bff7 100644 --- a/crates/utils/src/http/headers.rs +++ b/crates/utils/src/http/headers.rs @@ -51,6 +51,7 @@ pub const AMZ_TAG_COUNT: &str = "x-amz-tagging-count"; pub const AMZ_TAG_DIRECTIVE: &str = "X-Amz-Tagging-Directive"; // S3 transition restore +pub const AMZ_RESTORE: &str = "x-amz-restore"; pub const AMZ_RESTORE_EXPIRY_DAYS: &str = "X-Amz-Restore-Expiry-Days"; pub const AMZ_RESTORE_REQUEST_DATE: &str = "X-Amz-Restore-Request-Date";