Mirror of https://github.com/rustfs/rustfs.git (synced 2026-01-16 17:20:33 +00:00)

Commit: merge main
@@ -306,7 +306,7 @@ fn compute_object_usage(bucket: &str, object: &str, file_meta: &FileMeta) -> Res
     versions_count = versions_count.saturating_add(1);

     if latest_file_info.is_none()
-        && let Ok(info) = file_meta.into_fileinfo(bucket, object, "", false, false)
+        && let Ok(info) = file_meta.into_fileinfo(bucket, object, "", false, false, false)
     {
         latest_file_info = Some(info);
     }
@@ -96,22 +96,22 @@ impl DiskHealthTracker {

     /// Check if disk is faulty
     pub fn is_faulty(&self) -> bool {
-        self.status.load(Ordering::Relaxed) == DISK_HEALTH_FAULTY
+        self.status.load(Ordering::Acquire) == DISK_HEALTH_FAULTY
     }

     /// Set disk as faulty
     pub fn set_faulty(&self) {
-        self.status.store(DISK_HEALTH_FAULTY, Ordering::Relaxed);
+        self.status.store(DISK_HEALTH_FAULTY, Ordering::Release);
     }

     /// Set disk as OK
     pub fn set_ok(&self) {
-        self.status.store(DISK_HEALTH_OK, Ordering::Relaxed);
+        self.status.store(DISK_HEALTH_OK, Ordering::Release);
     }

     pub fn swap_ok_to_faulty(&self) -> bool {
         self.status
-            .compare_exchange(DISK_HEALTH_OK, DISK_HEALTH_FAULTY, Ordering::Relaxed, Ordering::Relaxed)
+            .compare_exchange(DISK_HEALTH_OK, DISK_HEALTH_FAULTY, Ordering::AcqRel, Ordering::Relaxed)
             .is_ok()
     }

@@ -132,7 +132,7 @@ impl DiskHealthTracker {

     /// Get last success timestamp
     pub fn last_success(&self) -> i64 {
-        self.last_success.load(Ordering::Relaxed)
+        self.last_success.load(Ordering::Acquire)
     }
 }
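The Relaxed-to-Acquire/Release switch turns the health flag into a synchronization point: writes made before set_faulty()/set_ok() become visible to any thread that observes the new status. A minimal, self-contained sketch of the same pattern; the constants and struct below are stand-ins, not the crate's actual DiskHealthTracker:

```rust
use std::sync::atomic::{AtomicU32, Ordering};

// Stand-in values; the real constants live in the disk module.
const DISK_HEALTH_OK: u32 = 0;
const DISK_HEALTH_FAULTY: u32 = 1;

struct HealthFlag {
    status: AtomicU32,
}

impl HealthFlag {
    fn new() -> Self {
        Self { status: AtomicU32::new(DISK_HEALTH_OK) }
    }

    // Release store: everything written before this call is visible to a
    // thread that later observes FAULTY through an Acquire load.
    fn set_faulty(&self) {
        self.status.store(DISK_HEALTH_FAULTY, Ordering::Release);
    }

    // Acquire load pairs with the Release store above.
    fn is_faulty(&self) -> bool {
        self.status.load(Ordering::Acquire) == DISK_HEALTH_FAULTY
    }

    // Read-modify-write: AcqRel on success, Relaxed is enough on failure.
    fn swap_ok_to_faulty(&self) -> bool {
        self.status
            .compare_exchange(DISK_HEALTH_OK, DISK_HEALTH_FAULTY, Ordering::AcqRel, Ordering::Relaxed)
            .is_ok()
    }
}

fn main() {
    let flag = HealthFlag::new();
    assert!(flag.swap_ok_to_faulty());
    assert!(flag.is_faulty());
}
```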
@@ -21,6 +21,7 @@ use super::{
 };
 use super::{endpoint::Endpoint, error::DiskError, format::FormatV3};

+use crate::config::storageclass::DEFAULT_INLINE_BLOCK;
 use crate::data_usage::local_snapshot::ensure_data_usage_layout;
 use crate::disk::error::FileAccessDeniedWithContext;
 use crate::disk::error_conv::{to_access_error, to_file_error, to_unformatted_disk_error, to_volume_error};

@@ -489,9 +490,17 @@ impl LocalDisk {
         let file_dir = self.get_bucket_path(volume)?;
         let (data, _) = self.read_raw(volume, file_dir, file_path, opts.read_data).await?;

-        get_file_info(&data, volume, path, version_id, FileInfoOpts { data: opts.read_data })
-            .await
-            .map_err(|_e| DiskError::Unexpected)
+        get_file_info(
+            &data,
+            volume,
+            path,
+            version_id,
+            FileInfoOpts {
+                data: opts.read_data,
+                include_free_versions: false,
+            },
+        )
+        .map_err(|_e| DiskError::Unexpected)
     }

     // Batch metadata reading for multiple objects
@@ -2236,20 +2245,93 @@ impl DiskAPI for LocalDisk {
     #[tracing::instrument(level = "debug", skip(self))]
     async fn read_version(
         &self,
-        _org_volume: &str,
+        org_volume: &str,
         volume: &str,
         path: &str,
         version_id: &str,
         opts: &ReadOptions,
     ) -> Result<FileInfo> {
+        if !org_volume.is_empty() {
+            let org_volume_path = self.get_bucket_path(org_volume)?;
+            if !skip_access_checks(org_volume) {
+                access(&org_volume_path)
+                    .await
+                    .map_err(|e| to_access_error(e, DiskError::VolumeAccessDenied))?;
+            }
+        }
+
         let file_path = self.get_object_path(volume, path)?;
-        let file_dir = self.get_bucket_path(volume)?;
+        let volume_dir = self.get_bucket_path(volume)?;

         check_path_length(file_path.to_string_lossy().as_ref())?;

         let read_data = opts.read_data;

-        let (data, _) = self.read_raw(volume, file_dir, file_path, read_data).await?;
+        let (data, _) = self
+            .read_raw(volume, volume_dir.clone(), file_path, read_data)
+            .await
+            .map_err(|e| {
+                if e == DiskError::FileNotFound && !version_id.is_empty() {
+                    DiskError::FileVersionNotFound
+                } else {
+                    e
+                }
+            })?;

-        let fi = get_file_info(&data, volume, path, version_id, FileInfoOpts { data: read_data }).await?;
+        let mut fi = get_file_info(
+            &data,
+            volume,
+            path,
+            version_id,
+            FileInfoOpts {
+                data: read_data,
+                include_free_versions: opts.incl_free_versions,
+            },
+        )?;
+
+        if opts.read_data {
+            if fi.data.as_ref().is_some_and(|d| !d.is_empty()) || fi.size == 0 {
+                if fi.inline_data() {
+                    return Ok(fi);
+                }
+
+                if fi.size == 0 || fi.version_id.is_none_or(|v| v.is_nil()) {
+                    fi.set_inline_data();
+                    return Ok(fi);
+                };
+
+                if let Some(part) = fi.parts.first() {
+                    let part_path = format!("part.{}", part.number);
+                    let part_path = path_join_buf(&[
+                        path,
+                        fi.data_dir.map_or("".to_string(), |dir| dir.to_string()).as_str(),
+                        part_path.as_str(),
+                    ]);
+                    let part_path = self.get_object_path(volume, part_path.as_str())?;
+                    if lstat(&part_path).await.is_err() {
+                        fi.set_inline_data();
+                        return Ok(fi);
+                    }
+                }
+
+                fi.data = None;
+            }
+
+            let inline = fi.transition_status.is_empty() && fi.data_dir.is_some() && fi.parts.len() == 1;
+            if inline && fi.shard_file_size(fi.parts[0].actual_size) < DEFAULT_INLINE_BLOCK as i64 {
+                let part_path = path_join_buf(&[
+                    path,
+                    fi.data_dir.map_or("".to_string(), |dir| dir.to_string()).as_str(),
+                    format!("part.{}", fi.parts[0].number).as_str(),
+                ]);
+                let part_path = self.get_object_path(volume, part_path.as_str())?;
+
+                let data = self.read_all_data(volume, volume_dir, part_path.clone()).await.map_err(|e| {
+                    warn!("read_version read_all_data {:?} failed: {e}", part_path);
+                    e
+                })?;
+                fi.data = Some(Bytes::from(data));
+            }
+        }

         Ok(fi)
     }
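The enlarged read_version keeps small, single-part, non-transitioned objects inline: when the shard size is under DEFAULT_INLINE_BLOCK it re-reads the part file and attaches it, otherwise fi.data is cleared. A rough sketch of that size gate only, with a placeholder threshold and a trivial shard_file_size; the real values come from the storage-class config and the erasure layout:

```rust
// Placeholder threshold; the crate reads this from its storage-class config.
const DEFAULT_INLINE_BLOCK: usize = 128 * 1024;

struct Part {
    actual_size: i64,
}

struct FileInfoSketch {
    transition_status: String,
    has_data_dir: bool,
    parts: Vec<Part>,
}

impl FileInfoSketch {
    // Placeholder: the real implementation derives the shard size from the
    // erasure-coding parameters instead of returning the raw size.
    fn shard_file_size(&self, total: i64) -> i64 {
        total
    }

    // Mirror of the gate in read_version: single-part, not transitioned,
    // has a data dir, and the (sharded) part is below the inline threshold.
    fn should_inline(&self) -> bool {
        self.transition_status.is_empty()
            && self.has_data_dir
            && self.parts.len() == 1
            && self.shard_file_size(self.parts[0].actual_size) < DEFAULT_INLINE_BLOCK as i64
    }
}

fn main() {
    let fi = FileInfoSketch {
        transition_status: String::new(),
        has_data_dir: true,
        parts: vec![Part { actual_size: 4096 }],
    };
    assert!(fi.should_inline());
}
```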
@@ -1615,7 +1615,7 @@ impl SetDisks {
         bucket: &str,
         object: &str,
         read_data: bool,
-        _incl_free_vers: bool,
+        incl_free_vers: bool,
     ) -> (Vec<FileInfo>, Vec<Option<DiskError>>) {
         let mut metadata_array = vec![None; fileinfos.len()];
         let mut meta_file_infos = vec![FileInfo::default(); fileinfos.len()];

@@ -1665,7 +1665,7 @@ impl SetDisks {
                 ..Default::default()
             };

-            let finfo = match meta.into_fileinfo(bucket, object, "", true, true) {
+            let finfo = match meta.into_fileinfo(bucket, object, "", true, incl_free_vers, true) {
                 Ok(res) => res,
                 Err(err) => {
                     for item in errs.iter_mut() {

@@ -1692,7 +1692,7 @@ impl SetDisks {

         for (idx, meta_op) in metadata_array.iter().enumerate() {
             if let Some(meta) = meta_op {
-                match meta.into_fileinfo(bucket, object, vid.to_string().as_str(), read_data, true) {
+                match meta.into_fileinfo(bucket, object, vid.to_string().as_str(), read_data, incl_free_vers, true) {
                     Ok(res) => meta_file_infos[idx] = res,
                     Err(err) => errs[idx] = Some(err.into()),
                 }
@@ -28,15 +28,16 @@ use http::{HeaderMap, HeaderValue};
 use rustfs_common::heal_channel::HealOpts;
 use rustfs_filemeta::{
     FileInfo, MetaCacheEntriesSorted, ObjectPartInfo, REPLICATION_RESET, REPLICATION_STATUS, ReplicateDecision, ReplicationState,
-    ReplicationStatusType, VersionPurgeStatusType, replication_statuses_map, version_purge_statuses_map,
+    ReplicationStatusType, RestoreStatusOps as _, VersionPurgeStatusType, parse_restore_obj_status, replication_statuses_map,
+    version_purge_statuses_map,
 };
 use rustfs_lock::FastLockGuard;
 use rustfs_madmin::heal_commands::HealResultItem;
 use rustfs_rio::Checksum;
 use rustfs_rio::{DecompressReader, HashReader, LimitReader, WarpReader};
 use rustfs_utils::CompressionAlgorithm;
-use rustfs_utils::http::AMZ_STORAGE_CLASS;
 use rustfs_utils::http::headers::{AMZ_OBJECT_TAGGING, RESERVED_METADATA_PREFIX_LOWER};
+use rustfs_utils::http::{AMZ_BUCKET_REPLICATION_STATUS, AMZ_RESTORE, AMZ_STORAGE_CLASS};
 use rustfs_utils::path::decode_dir_object;
 use serde::{Deserialize, Serialize};
 use std::collections::HashMap;
@@ -757,7 +758,24 @@ impl ObjectInfo {
                 .ok()
         });

-        // TODO:ReplicationState
+        let replication_status_internal = fi
+            .replication_state_internal
+            .as_ref()
+            .and_then(|v| v.replication_status_internal.clone());
+        let version_purge_status_internal = fi
+            .replication_state_internal
+            .as_ref()
+            .and_then(|v| v.version_purge_status_internal.clone());
+
+        let mut replication_status = fi.replication_status();
+        if replication_status.is_empty()
+            && let Some(status) = fi.metadata.get(AMZ_BUCKET_REPLICATION_STATUS).cloned()
+            && status == ReplicationStatusType::Replica.as_str()
+        {
+            replication_status = ReplicationStatusType::Replica;
+        }
+
+        let version_purge_status = fi.version_purge_status();

         let transitioned_object = TransitionedObject {
             name: fi.transitioned_objname.clone(),

@@ -778,10 +796,24 @@ impl ObjectInfo {
         };

         // Extract storage class from metadata, default to STANDARD if not found
-        let storage_class = metadata
-            .get(AMZ_STORAGE_CLASS)
-            .cloned()
-            .or_else(|| Some(storageclass::STANDARD.to_string()));
+        let storage_class = if !fi.transition_tier.is_empty() {
+            Some(fi.transition_tier.clone())
+        } else {
+            fi.metadata
+                .get(AMZ_STORAGE_CLASS)
+                .cloned()
+                .or_else(|| Some(storageclass::STANDARD.to_string()))
+        };
+
+        let mut restore_ongoing = false;
+        let mut restore_expires = None;
+        if let Some(restore_status) = fi.metadata.get(AMZ_RESTORE).cloned() {
+            //
+            if let Ok(restore_status) = parse_restore_obj_status(&restore_status) {
+                restore_ongoing = restore_status.on_going();
+                restore_expires = restore_status.expiry();
+            }
+        }

         // Convert parts from rustfs_filemeta::ObjectPartInfo to store_api::ObjectPartInfo
         let parts = fi

@@ -799,6 +831,8 @@ impl ObjectInfo {
         })
         .collect();

+        // TODO: part checksums
+
         ObjectInfo {
             bucket: bucket.to_string(),
             name,

@@ -823,6 +857,12 @@ impl ObjectInfo {
             transitioned_object,
             checksum: fi.checksum.clone(),
             storage_class,
+            restore_ongoing,
+            restore_expires,
+            replication_status_internal,
+            replication_status,
+            version_purge_status_internal,
+            version_purge_status,
             ..Default::default()
         }
     }
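With this change the reported storage class prefers the transition tier of a transitioned object and only then falls back to the x-amz-storage-class metadata or STANDARD. A small sketch of that precedence; the constants and the helper below are illustrative stand-ins, not the crate's items:

```rust
use std::collections::HashMap;

// Stand-in constants mirroring the header name and the default class.
const AMZ_STORAGE_CLASS: &str = "x-amz-storage-class";
const STANDARD: &str = "STANDARD";

// Transitioned objects report the tier they were moved to; everything else
// reports its stored class, defaulting to STANDARD when none was recorded.
fn effective_storage_class(transition_tier: &str, metadata: &HashMap<String, String>) -> String {
    if !transition_tier.is_empty() {
        transition_tier.to_string()
    } else {
        metadata
            .get(AMZ_STORAGE_CLASS)
            .cloned()
            .unwrap_or_else(|| STANDARD.to_string())
    }
}

fn main() {
    let meta: HashMap<String, String> = HashMap::new();
    assert_eq!(effective_storage_class("", &meta), "STANDARD");
    assert_eq!(effective_storage_class("COLD-TIER", &meta), "COLD-TIER");
}
```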
@@ -505,6 +505,10 @@ impl FileInfo {
             ReplicationStatusType::Empty
         }
     }
+
+    pub fn shard_file_size(&self, total_length: i64) -> i64 {
+        self.erasure.shard_file_size(total_length)
+    }
 }

 #[derive(Debug, Default, Clone, Serialize, Deserialize)]

@@ -590,7 +594,7 @@ impl RestoreStatusOps for RestoreStatus {
     }
 }

-fn parse_restore_obj_status(restore_hdr: &str) -> Result<RestoreStatus> {
+pub fn parse_restore_obj_status(restore_hdr: &str) -> Result<RestoreStatus> {
     let tokens: Vec<&str> = restore_hdr.splitn(2, ",").collect();
     let progress_tokens: Vec<&str> = tokens[0].splitn(2, "=").collect();
     if progress_tokens.len() != 2 {
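parse_restore_obj_status becomes public so ObjectInfo can interpret the x-amz-restore header, which S3 formats as `ongoing-request="true"` or `ongoing-request="false", expiry-date="Fri, 21 Dec 2012 00:00:00 GMT"`. A stand-alone sketch of the same splitn-based tokenizing, without the crate's RestoreStatus type:

```rust
// Returns (ongoing, optional expiry string); None if the header is malformed.
fn parse_restore_header(hdr: &str) -> Option<(bool, Option<String>)> {
    // Split only at the first comma: the expiry date itself contains commas.
    let tokens: Vec<&str> = hdr.splitn(2, ',').collect();
    let progress: Vec<&str> = tokens[0].splitn(2, '=').collect();
    if progress.len() != 2 || progress[0].trim() != "ongoing-request" {
        return None;
    }
    let ongoing = progress[1].trim().trim_matches('"') == "true";
    // The expiry-date part is only present once the restore has completed.
    let expiry = tokens.get(1).and_then(|rest| {
        let kv: Vec<&str> = rest.splitn(2, '=').collect();
        (kv.len() == 2 && kv[0].trim() == "expiry-date")
            .then(|| kv[1].trim().trim_matches('"').to_string())
    });
    Some((ongoing, expiry))
}

fn main() {
    let (ongoing, expiry) =
        parse_restore_header(r#"ongoing-request="false", expiry-date="Fri, 21 Dec 2012 00:00:00 GMT""#).unwrap();
    assert!(!ongoing);
    assert!(expiry.is_some());
}
```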
@@ -14,7 +14,8 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.

-use crate::{Error, FileInfo, FileInfoVersions, FileMeta, FileMetaShallowVersion, Result, VersionType, merge_file_meta_versions};
+use crate::{
+    ErasureAlgo, ErasureInfo, Error, FileInfo, FileInfoVersions, InlineData, ObjectPartInfo, RawFileInfo, ReplicationState,
+    ReplicationStatusType, Result, TIER_FV_ID, TIER_FV_MARKER, VersionPurgeStatusType, replication_statuses_map,
+    version_purge_statuses_map,
+};
 use byteorder::ByteOrder;
 use bytes::Bytes;

@@ -909,6 +910,7 @@ impl FileMeta {
         path: &str,
         version_id: &str,
         read_data: bool,
+        include_free_versions: bool,
         all_parts: bool,
     ) -> Result<FileInfo> {
         let vid = {

@@ -921,11 +923,35 @@

         let mut is_latest = true;
         let mut succ_mod_time = None;
+        let mut non_free_versions = self.versions.len();
+
+        let mut found = false;
+        let mut found_free_version = None;
+        let mut found_fi = None;

         for ver in self.versions.iter() {
             let header = &ver.header;

             // TODO: freeVersion
             if header.free_version() {
+                non_free_versions -= 1;
+                if include_free_versions && found_free_version.is_none() {
+                    let mut found_free_fi = FileMetaVersion::default();
+                    if found_free_fi.unmarshal_msg(&ver.meta).is_ok() && found_free_fi.version_type != VersionType::Invalid {
+                        let mut free_fi = found_free_fi.into_fileinfo(volume, path, all_parts);
+                        free_fi.is_latest = true;
+                        found_free_version = Some(free_fi);
+                    }
+                }
+
+                if header.version_id != Some(vid) {
+                    continue;
+                }
             }

+            if found {
+                continue;
+            }
+
             if !version_id.is_empty() && header.version_id != Some(vid) {
                 is_latest = false;

@@ -933,6 +959,8 @@
                 continue;
             }

+            found = true;
+
             let mut fi = ver.into_fileinfo(volume, path, all_parts)?;
             fi.is_latest = is_latest;

@@ -947,7 +975,25 @@
                     .map(bytes::Bytes::from);
             }

-            fi.num_versions = self.versions.len();
+            found_fi = Some(fi);
         }

+        if !found {
+            if version_id.is_empty() {
+                if include_free_versions
+                    && non_free_versions == 0
+                    && let Some(free_version) = found_free_version
+                {
+                    return Ok(free_version);
+                }
+                return Err(Error::FileNotFound);
+            } else {
+                return Err(Error::FileVersionNotFound);
+            }
+        }
+
+        if let Some(mut fi) = found_fi {
+            fi.num_versions = non_free_versions;
+
+            return Ok(fi);
+        }
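into_fileinfo now tracks how many non-free versions exist and, when include_free_versions is set and only free versions remain, answers with the decoded free version instead of FileNotFound. A compact sketch of that selection over a simplified version list; the types below are stand-ins for the crate's shallow version records:

```rust
#[derive(Clone, Debug, PartialEq)]
struct Version {
    free: bool,
    id: &'static str,
}

// Pick the version to report: the first non-free version wins; if every
// version is a free version and the caller asked for them, fall back to
// the first free version instead of reporting "not found".
fn select_version(versions: &[Version], include_free_versions: bool) -> Option<Version> {
    let mut free_fallback = None;
    let mut non_free = versions.len();

    for v in versions {
        if v.free {
            non_free -= 1;
            if include_free_versions && free_fallback.is_none() {
                free_fallback = Some(v.clone());
            }
            continue;
        }
        return Some(v.clone());
    }

    if include_free_versions && non_free == 0 {
        return free_fallback;
    }
    None
}

fn main() {
    let only_free = vec![Version { free: true, id: "fv-1" }];
    assert_eq!(select_version(&only_free, true).map(|v| v.id), Some("fv-1"));
    assert!(select_version(&only_free, false).is_none());
}
```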
@@ -1802,14 +1848,27 @@ impl MetaObject {
             metadata.insert(k.to_owned(), v.to_owned());
         }

+        let tier_fvidkey = format!("{RESERVED_METADATA_PREFIX_LOWER}{TIER_FV_ID}").to_lowercase();
+        let tier_fvmarker_key = format!("{RESERVED_METADATA_PREFIX_LOWER}{TIER_FV_MARKER}").to_lowercase();
+
         for (k, v) in &self.meta_sys {
-            if k == AMZ_STORAGE_CLASS && v == b"STANDARD" {
+            let lower_k = k.to_lowercase();
+
+            if lower_k == tier_fvidkey || lower_k == tier_fvmarker_key {
+                continue;
+            }
+
+            if lower_k == VERSION_PURGE_STATUS_KEY.to_lowercase() {
+                continue;
+            }
+
+            if lower_k == AMZ_STORAGE_CLASS.to_lowercase() && v == b"STANDARD" {
                 continue;
             }

             if k.starts_with(RESERVED_METADATA_PREFIX)
                 || k.starts_with(RESERVED_METADATA_PREFIX_LOWER)
-                || k == VERSION_PURGE_STATUS_KEY
+                || lower_k == VERSION_PURGE_STATUS_KEY.to_lowercase()
             {
                 metadata.insert(k.to_owned(), String::from_utf8(v.to_owned()).unwrap_or_default());
             }
@@ -2546,15 +2605,31 @@ pub fn merge_file_meta_versions(
     merged
 }

-pub async fn file_info_from_raw(ri: RawFileInfo, bucket: &str, object: &str, read_data: bool) -> Result<FileInfo> {
-    get_file_info(&ri.buf, bucket, object, "", FileInfoOpts { data: read_data }).await
+pub fn file_info_from_raw(
+    ri: RawFileInfo,
+    bucket: &str,
+    object: &str,
+    read_data: bool,
+    include_free_versions: bool,
+) -> Result<FileInfo> {
+    get_file_info(
+        &ri.buf,
+        bucket,
+        object,
+        "",
+        FileInfoOpts {
+            data: read_data,
+            include_free_versions,
+        },
+    )
 }

 pub struct FileInfoOpts {
     pub data: bool,
+    pub include_free_versions: bool,
 }

-pub async fn get_file_info(buf: &[u8], volume: &str, path: &str, version_id: &str, opts: FileInfoOpts) -> Result<FileInfo> {
+pub fn get_file_info(buf: &[u8], volume: &str, path: &str, version_id: &str, opts: FileInfoOpts) -> Result<FileInfo> {
     let vid = {
         if version_id.is_empty() {
             None

@@ -2576,7 +2651,7 @@ pub async fn get_file_info(buf: &[u8], volume: &str, path: &str, version_id: &st
         });
     }

-    let fi = meta.into_fileinfo(volume, path, version_id, opts.data, true)?;
+    let fi = meta.into_fileinfo(volume, path, version_id, opts.data, opts.include_free_versions, true)?;
     Ok(fi)
 }
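Since get_file_info and file_info_from_raw are now synchronous, call sites drop `.await` and pass the new include_free_versions flag through FileInfoOpts. A hedged sketch of the resulting call shape, using local stand-ins rather than the crate's actual types:

```rust
// Local stand-ins mirroring the reworked signature; the real items live in rustfs_filemeta.
struct FileInfoOpts {
    data: bool,
    include_free_versions: bool,
}

#[derive(Debug)]
struct FileInfo {
    volume: String,
    name: String,
}

// Synchronous now: pure metadata decoding with no I/O, so no .await at call sites.
fn get_file_info_sketch(
    _buf: &[u8],
    volume: &str,
    path: &str,
    _version_id: &str,
    _opts: FileInfoOpts,
) -> Result<FileInfo, String> {
    Ok(FileInfo { volume: volume.to_string(), name: path.to_string() })
}

fn main() {
    let fi = get_file_info_sketch(
        b"",
        "bucket",
        "object",
        "",
        FileInfoOpts { data: false, include_free_versions: false },
    )
    .expect("decode");
    println!("{fi:?}");
}
```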
@@ -12,7 +12,10 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.

-use crate::{Error, FileInfo, FileInfoVersions, FileMeta, FileMetaShallowVersion, Result, VersionType, merge_file_meta_versions};
+use crate::{
+    Error, FileInfo, FileInfoOpts, FileInfoVersions, FileMeta, FileMetaShallowVersion, Result, VersionType, get_file_info,
+    merge_file_meta_versions,
+};
 use rmp::Marker;
 use serde::{Deserialize, Serialize};
 use std::cmp::Ordering;

@@ -141,8 +144,7 @@ impl MetaCacheEntry {
             });
         }

-        if self.cached.is_some() {
-            let fm = self.cached.as_ref().unwrap();
+        if let Some(fm) = &self.cached {
             if fm.versions.is_empty() {
                 return Ok(FileInfo {
                     volume: bucket.to_owned(),

@@ -154,14 +156,20 @@
                 });
             }

-            let fi = fm.into_fileinfo(bucket, self.name.as_str(), "", false, false)?;
+            let fi = fm.into_fileinfo(bucket, self.name.as_str(), "", false, false, true)?;
             return Ok(fi);
         }

-        let mut fm = FileMeta::new();
-        fm.unmarshal_msg(&self.metadata)?;
-        let fi = fm.into_fileinfo(bucket, self.name.as_str(), "", false, false)?;
-        Ok(fi)
+        get_file_info(
+            &self.metadata,
+            bucket,
+            self.name.as_str(),
+            "",
+            FileInfoOpts {
+                data: false,
+                include_free_versions: false,
+            },
+        )
     }

     pub fn file_info_versions(&self, bucket: &str) -> Result<FileInfoVersions> {
@@ -63,16 +63,23 @@ pub struct Policy {

 impl Policy {
     pub async fn is_allowed(&self, args: &Args<'_>) -> bool {
         // First, check all Deny statements - if any Deny matches, deny the request
         for statement in self.statements.iter().filter(|s| matches!(s.effect, Effect::Deny)) {
             if !statement.is_allowed(args).await {
                 return false;
             }
         }

-        if args.deny_only || args.is_owner {
+        // Owner has all permissions
+        if args.is_owner {
             return true;
         }

+        if args.deny_only {
+            return false;
+        }
+
         // Check Allow statements
         for statement in self.statements.iter().filter(|s| matches!(s.effect, Effect::Allow)) {
             if statement.is_allowed(args).await {
                 return true;
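The reordered is_allowed makes the precedence explicit: matching Deny statements reject first, then is_owner grants, then deny_only rejects before any Allow statement is consulted. A synchronous, self-contained sketch of that ordering; statement matching is reduced to a single action string purely for illustration:

```rust
#[derive(PartialEq)]
enum Effect {
    Allow,
    Deny,
}

struct Statement {
    effect: Effect,
    // In this sketch a statement matches exactly one action; the real
    // statements also match on resources and conditions.
    action: &'static str,
}

struct Request<'a> {
    action: &'a str,
    is_owner: bool,
    deny_only: bool,
}

fn is_allowed(statements: &[Statement], req: &Request) -> bool {
    // 1. Any matching Deny statement wins immediately.
    if statements.iter().any(|s| s.effect == Effect::Deny && s.action == req.action) {
        return false;
    }
    // 2. The owner keeps all remaining permissions.
    if req.is_owner {
        return true;
    }
    // 3. deny_only means "only check for denials": never grant via Allow.
    if req.deny_only {
        return false;
    }
    // 4. Otherwise an explicit Allow is required.
    statements.iter().any(|s| s.effect == Effect::Allow && s.action == req.action)
}

fn main() {
    let stmts = [Statement { effect: Effect::Allow, action: "s3:GetObject" }];
    assert!(is_allowed(&stmts, &Request { action: "s3:GetObject", is_owner: false, deny_only: false }));
    assert!(!is_allowed(&stmts, &Request { action: "s3:GetObject", is_owner: false, deny_only: true }));
    assert!(is_allowed(&stmts, &Request { action: "s3:PutObject", is_owner: true, deny_only: true }));
}
```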
@@ -594,6 +601,102 @@ mod test {
         Ok(())
     }

+    #[tokio::test]
+    async fn test_deny_only_security_fix() -> Result<()> {
+        let data = r#"
+        {
+            "Version": "2012-10-17",
+            "Statement": [
+                {
+                    "Effect": "Allow",
+                    "Action": ["s3:GetObject"],
+                    "Resource": ["arn:aws:s3:::bucket1/*"]
+                }
+            ]
+        }
+        "#;
+
+        let policy = Policy::parse_config(data.as_bytes())?;
+        let conditions = HashMap::new();
+        let claims = HashMap::new();
+
+        // Test with deny_only=true but no matching Allow statement
+        let args_deny_only = Args {
+            account: "testuser",
+            groups: &None,
+            action: Action::S3Action(crate::policy::action::S3Action::PutObjectAction),
+            bucket: "bucket2",
+            conditions: &conditions,
+            is_owner: false,
+            object: "test.txt",
+            claims: &claims,
+            deny_only: true, // Should NOT automatically allow
+        };
+
+        // Should return false because deny_only=true, regardless of whether there's a matching Allow statement
+        assert!(
+            !policy.is_allowed(&args_deny_only).await,
+            "deny_only should return false when deny_only=true, regardless of Allow statements"
+        );
+
+        // Test with deny_only=true and matching Allow statement
+        let args_deny_only_allowed = Args {
+            account: "testuser",
+            groups: &None,
+            action: Action::S3Action(crate::policy::action::S3Action::GetObjectAction),
+            bucket: "bucket1",
+            conditions: &conditions,
+            is_owner: false,
+            object: "test.txt",
+            claims: &claims,
+            deny_only: true,
+        };
+
+        // Should return false because deny_only=true prevents checking Allow statements (unless is_owner=true)
+        assert!(
+            !policy.is_allowed(&args_deny_only_allowed).await,
+            "deny_only should return false even with matching Allow statement"
+        );
+
+        // Test with deny_only=false (normal case)
+        let args_normal = Args {
+            account: "testuser",
+            groups: &None,
+            action: Action::S3Action(crate::policy::action::S3Action::GetObjectAction),
+            bucket: "bucket1",
+            conditions: &conditions,
+            is_owner: false,
+            object: "test.txt",
+            claims: &claims,
+            deny_only: false,
+        };
+
+        // Should return true because there's an Allow statement
+        assert!(
+            policy.is_allowed(&args_normal).await,
+            "normal policy evaluation should allow with matching Allow statement"
+        );
+
+        let args_owner_deny_only = Args {
+            account: "testuser",
+            groups: &None,
+            action: Action::S3Action(crate::policy::action::S3Action::PutObjectAction),
+            bucket: "bucket2",
+            conditions: &conditions,
+            is_owner: true, // Owner has all permissions
+            object: "test.txt",
+            claims: &claims,
+            deny_only: true, // Even with deny_only=true, owner should be allowed
+        };
+
+        assert!(
+            policy.is_allowed(&args_owner_deny_only).await,
+            "owner should retain all permissions even when deny_only=true"
+        );
+
+        Ok(())
+    }
+
     #[tokio::test]
     async fn test_aws_username_policy_variable() -> Result<()> {
         let data = r#"
@@ -51,6 +51,7 @@ pub const AMZ_TAG_COUNT: &str = "x-amz-tagging-count";
 pub const AMZ_TAG_DIRECTIVE: &str = "X-Amz-Tagging-Directive";

 // S3 transition restore
+pub const AMZ_RESTORE: &str = "x-amz-restore";
 pub const AMZ_RESTORE_EXPIRY_DAYS: &str = "X-Amz-Restore-Expiry-Days";
 pub const AMZ_RESTORE_REQUEST_DATE: &str = "X-Amz-Restore-Request-Date";
@@ -112,8 +112,6 @@ impl Operation for AddServiceAccount {
             return Err(s3_error!(InvalidRequest, "iam not init"));
         };

-        let deny_only = constant_time_eq(&cred.access_key, &target_user) || constant_time_eq(&cred.parent_user, &target_user);
-
         if !iam_store
             .is_allowed(&Args {
                 account: &cred.access_key,

@@ -130,7 +128,7 @@ impl Operation for AddServiceAccount {
                 is_owner: owner,
                 object: "",
                 claims: cred.claims.as_ref().unwrap_or(&HashMap::new()),
-                deny_only,
+                deny_only: false, // Always require explicit Allow permission
             })
             .await
         {
@@ -118,12 +118,14 @@ impl Operation for AddUser {
             return Err(s3_error!(InvalidArgument, "access key is not utf8"));
         }

-        let deny_only = ak == cred.access_key;
+        // Security fix: Always require explicit Allow permission for CreateUser
+        // Do not use deny_only to bypass permission checks, even when creating for self
+        // This ensures consistent security semantics and prevents privilege escalation
         validate_admin_request(
             &req.headers,
             &cred,
             owner,
-            deny_only,
+            false, // Always require explicit Allow permission
             vec![Action::AdminAction(AdminAction::CreateUserAdminAction)],
             req.extensions.get::<Option<RemoteAddr>>().and_then(|opt| opt.map(|a| a.0)),
         )

@@ -375,12 +377,14 @@ impl Operation for GetUserInfo {
         let (cred, owner) =
             check_key_valid(get_session_token(&req.uri, &req.headers).unwrap_or_default(), &input_cred.access_key).await?;

-        let deny_only = ak == cred.access_key;
+        // Security fix: Always require explicit Allow permission for GetUser
+        // Users should have explicit GetUser permission to view account information
+        // This ensures consistent security semantics across all admin operations
         validate_admin_request(
             &req.headers,
             &cred,
             owner,
-            deny_only,
+            false, // Always require explicit Allow permission
             vec![Action::AdminAction(AdminAction::GetUserAdminAction)],
             req.extensions.get::<Option<RemoteAddr>>().and_then(|opt| opt.map(|a| a.0)),
         )
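Across AddServiceAccount, AddUser, and GetUserInfo the handlers stop deriving deny_only from "target user equals caller" and hard-wire it to false, so even self-targeted requests need an explicit Allow on the admin action. A sketch of the before/after flag derivation; plain equality stands in for the handlers' constant_time_eq, and the struct is illustrative:

```rust
struct Credentials {
    access_key: String,
    parent_user: String,
}

// Before: a request targeting the caller's own account skipped the Allow
// check by setting deny_only (only Deny statements were consulted).
fn deny_only_before(cred: &Credentials, target_user: &str) -> bool {
    cred.access_key == target_user || cred.parent_user == target_user
}

// After: the flag is fixed to false, so every caller needs an explicit
// Allow on the admin action, including for their own account.
fn deny_only_after(_cred: &Credentials, _target_user: &str) -> bool {
    false
}

fn main() {
    let cred = Credentials { access_key: "alice".into(), parent_user: "".into() };
    assert!(deny_only_before(&cred, "alice"));
    assert!(!deny_only_after(&cred, "alice"));
}
```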
@@ -651,7 +655,7 @@ impl Operation for ImportIam {
             &cred,
             owner,
             false,
-            vec![Action::AdminAction(AdminAction::ExportIAMAction)],
+            vec![Action::AdminAction(AdminAction::ImportIAMAction)],
             req.extensions.get::<Option<RemoteAddr>>().and_then(|opt| opt.map(|a| a.0)),
         )
         .await?;
@@ -743,10 +743,26 @@ fn check_auth(req: Request<()>) -> std::result::Result<Request<()>, Status> {
     Ok(req)
 }

+/// Determines the listen backlog size.
+///
+/// It tries to read the system's maximum connection queue length (`somaxconn`).
+/// If reading fails, it falls back to a default value (e.g., 1024).
+/// This makes the backlog size adaptive to the system configuration.
+#[cfg(target_os = "linux")]
+fn get_listen_backlog() -> i32 {
+    const DEFAULT_BACKLOG: i32 = 1024;
+
+    // For Linux, read from /proc/sys/net/core/somaxconn
+    match std::fs::read_to_string("/proc/sys/net/core/somaxconn") {
+        Ok(s) => s.trim().parse().unwrap_or(DEFAULT_BACKLOG),
+        Err(_) => DEFAULT_BACKLOG,
+    }
+}
+
 // For macOS and BSD variants use the syscall way of getting the connection queue length.
 #[cfg(any(target_os = "macos", target_os = "freebsd", target_os = "netbsd", target_os = "openbsd"))]
 #[allow(unsafe_code)]
-fn get_conn_queue_len() -> i32 {
+fn get_listen_backlog() -> i32 {
     const DEFAULT_BACKLOG: i32 = 1024;

     #[cfg(target_os = "openbsd")]

@@ -773,37 +789,15 @@ fn get_conn_queue_len() -> i32 {
     buf[0]
 }

-/// Determines the listen backlog size.
-///
-/// It tries to read the system's maximum connection queue length (`somaxconn`).
-/// If reading fails, it falls back to a default value (e.g., 1024).
-/// This makes the backlog size adaptive to the system configuration.
+// Fallback for Windows and other operating systems
+#[cfg(not(any(
+    target_os = "linux",
+    target_os = "macos",
+    target_os = "freebsd",
+    target_os = "netbsd",
+    target_os = "openbsd"
+)))]
 fn get_listen_backlog() -> i32 {
-    #[cfg(target_os = "linux")]
-    {
-        const DEFAULT_BACKLOG: i32 = 1024;
-
-        // For Linux, read from /proc/sys/net/core/somaxconn
-        match std::fs::read_to_string("/proc/sys/net/core/somaxconn") {
-            Ok(s) => s.trim().parse().unwrap_or(DEFAULT_BACKLOG),
-            Err(_) => DEFAULT_BACKLOG,
-        }
-    }
-
-    #[cfg(any(target_os = "macos", target_os = "freebsd", target_os = "netbsd", target_os = "openbsd"))]
-    {
-        get_conn_queue_len()
-    }
-
-    #[cfg(not(any(
-        target_os = "linux",
-        target_os = "macos",
-        target_os = "freebsd",
-        target_os = "netbsd",
-        target_os = "openbsd"
-    )))]
-    {
-        // Fallback for Windows and other operating systems
-        DEFAULT_BACKLOG
-    }
+    const DEFAULT_BACKLOG: i32 = 1024;
+    DEFAULT_BACKLOG
 }
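Each platform now gets its own get_listen_backlog definition instead of one generic function that re-dispatched by cfg inside its body. On Linux the value comes from /proc/sys/net/core/somaxconn with a fixed fallback; a minimal, Linux-only sketch of that read:

```rust
const DEFAULT_BACKLOG: i32 = 1024;

// Read the kernel's accept-queue limit; fall back to a fixed default when the
// file is missing (non-Linux, or /proc masked in a container) or unparsable.
fn get_listen_backlog() -> i32 {
    std::fs::read_to_string("/proc/sys/net/core/somaxconn")
        .ok()
        .and_then(|s| s.trim().parse().ok())
        .unwrap_or(DEFAULT_BACKLOG)
}

fn main() {
    println!("listen backlog: {}", get_listen_backlog());
}
```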
@@ -17,7 +17,7 @@
 # - Metadata: User-defined metadata
 # - Conditional GET: If-Match, If-None-Match, If-Modified-Since
 #
-# Total: 109 tests
+# Total: 118 tests

 test_basic_key_count
 test_bucket_create_naming_bad_short_one

@@ -63,6 +63,15 @@ test_bucket_list_prefix_none
 test_bucket_list_prefix_not_exist
 test_bucket_list_prefix_unreadable
 test_bucket_list_special_prefix
+test_bucket_list_delimiter_alt
+test_bucket_list_delimiter_dot
+test_bucket_list_delimiter_empty
+test_bucket_list_delimiter_none
+test_bucket_list_delimiter_not_exist
+test_bucket_list_delimiter_percentage
+test_bucket_list_delimiter_prefix_ends_with_delimiter
+test_bucket_list_delimiter_unreadable
+test_bucket_list_delimiter_whitespace
 test_bucket_listv2_continuationtoken
 test_bucket_listv2_continuationtoken_empty
 test_bucket_listv2_fetchowner_defaultempty

@@ -105,16 +105,8 @@ test_versioning_obj_plain_null_version_removal
 test_versioning_obj_suspend_versions

-# Teardown issues (list_object_versions on non-versioned buckets)
-test_bucket_list_delimiter_alt
+# These tests pass but have cleanup issues with list_object_versions
 test_bucket_list_delimiter_basic
-test_bucket_list_delimiter_dot
-test_bucket_list_delimiter_empty
-test_bucket_list_delimiter_none
-test_bucket_list_delimiter_not_exist
-test_bucket_list_delimiter_percentage
-test_bucket_list_delimiter_prefix_ends_with_delimiter
-test_bucket_list_delimiter_unreadable
-test_bucket_list_delimiter_whitespace
 test_bucket_list_encoding_basic
 test_bucket_listv2_delimiter_alt
 test_bucket_listv2_delimiter_basic