move disk::utils to crates::utils

This commit is contained in:
weisd
2025-06-11 11:50:14 +08:00
parent 2eeb9dbcbc
commit e8a59d7c07
57 changed files with 289 additions and 1158 deletions

20
Cargo.lock generated
View File

@@ -4918,6 +4918,7 @@ dependencies = [
"policy",
"rand 0.9.1",
"regex",
"rustfs-utils",
"serde",
"serde_json",
"strum",
@@ -8445,6 +8446,7 @@ dependencies = [
"rustfs-utils",
"snap",
"tokio",
"tokio-test",
"tokio-util",
"zstd",
]
@@ -8480,19 +8482,24 @@ dependencies = [
name = "rustfs-utils"
version = "0.0.1"
dependencies = [
"base64-simd",
"blake3",
"crc32fast",
"hex-simd",
"highway",
"lazy_static",
"local-ip-address",
"md-5",
"netif",
"nix 0.30.1",
"regex",
"rustfs-config",
"rustls 0.23.27",
"rustls-pemfile 2.2.0",
"rustls-pki-types",
"serde",
"sha2 0.10.9",
"siphasher 1.0.1",
"tempfile",
"tokio",
"tracing",
@@ -10035,6 +10042,19 @@ dependencies = [
"xattr",
]
[[package]]
name = "tokio-test"
version = "0.4.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2468baabc3311435b55dd935f702f42cd1b8abb7e754fb7dfb16bd36aa88f9f7"
dependencies = [
"async-stream",
"bytes",
"futures-core",
"tokio",
"tokio-stream",
]
[[package]]
name = "tokio-util"
version = "0.7.15"

View File

@@ -148,6 +148,7 @@ opentelemetry-semantic-conventions = { version = "0.30.0", features = [
parking_lot = "0.12.4"
path-absolutize = "3.1.1"
path-clean = "1.0.1"
blake3 = { version = "1.8.2" }
pbkdf2 = "0.12.2"
percent-encoding = "2.3.1"
pin-project-lite = "0.2.16"

View File

@@ -102,8 +102,8 @@ pub fn get_logger() -> &'static Arc<tokio::sync::Mutex<Logger>> {
/// ```rust
/// use rustfs_obs::{ init_obs, set_global_guard};
///
/// fn init() -> Result<(), Box<dyn std::error::Error>> {
/// let guard = init_obs(None);
/// async fn init() -> Result<(), Box<dyn std::error::Error>> {
/// let (_, guard) = init_obs(None).await;
/// set_global_guard(guard)?;
/// Ok(())
/// }

View File

@@ -34,3 +34,4 @@ rustfs-utils = {workspace = true, features= ["io","hash"]}
[dev-dependencies]
criterion = { version = "0.5.1", features = ["async", "async_tokio", "tokio"] }
tokio-test = "0.4"

View File

@@ -16,8 +16,17 @@ The `EtagResolvable` trait provides a clean way to handle recursive unwrapping:
## Usage Examples
```rust
use rustfs_rio::{CompressReader, EtagReader, resolve_etag_generic};
use rustfs_rio::compress::CompressionAlgorithm;
use tokio::io::BufReader;
use std::io::Cursor;
// Direct usage with trait-based approach
let mut reader = CompressReader::new(EtagReader::new(some_async_read, Some("test_etag".to_string())));
let data = b"test data";
let reader = BufReader::new(Cursor::new(&data[..]));
let reader = Box::new(reader);
let etag_reader = EtagReader::new(reader, Some("test_etag".to_string()));
let mut reader = CompressReader::new(etag_reader, CompressionAlgorithm::Gzip);
let etag = resolve_etag_generic(&mut reader);
```
*/

View File

@@ -28,33 +28,35 @@
//! # tokio_test::block_on(async {
//! let data = b"hello world";
//! let reader = BufReader::new(Cursor::new(&data[..]));
//! let reader = Box::new(reader);
//! let size = data.len() as i64;
//! let actual_size = size;
//! let etag = None;
//! let diskable_md5 = false;
//!
//! // Method 1: Simple creation (recommended for most cases)
//! let hash_reader = HashReader::new(reader, size, actual_size, etag, diskable_md5);
//! let hash_reader = HashReader::new(reader, size, actual_size, etag.clone(), diskable_md5).unwrap();
//!
//! // Method 2: With manual wrapping to recreate original logic
//! let reader2 = BufReader::new(Cursor::new(&data[..]));
//! let wrapped_reader = if size > 0 {
//! let reader2 = Box::new(reader2);
//! let wrapped_reader: Box<dyn rustfs_rio::Reader> = if size > 0 {
//! if !diskable_md5 {
//! // Wrap with both HardLimitReader and EtagReader
//! let hard_limit = HardLimitReader::new(reader2, size);
//! EtagReader::new(hard_limit, etag.clone())
//! Box::new(EtagReader::new(Box::new(hard_limit), etag.clone()))
//! } else {
//! // Only wrap with HardLimitReader
//! HardLimitReader::new(reader2, size)
//! Box::new(HardLimitReader::new(reader2, size))
//! }
//! } else if !diskable_md5 {
//! // Only wrap with EtagReader
//! EtagReader::new(reader2, etag.clone())
//! Box::new(EtagReader::new(reader2, etag.clone()))
//! } else {
//! // No wrapping needed
//! reader2
//! };
//! let hash_reader2 = HashReader::new(wrapped_reader, size, actual_size, etag, diskable_md5);
//! let hash_reader2 = HashReader::new(wrapped_reader, size, actual_size, etag, diskable_md5).unwrap();
//! # });
//! ```
//!
@@ -70,14 +72,14 @@
//! # tokio_test::block_on(async {
//! let data = b"test";
//! let reader = BufReader::new(Cursor::new(&data[..]));
//! let hash_reader = HashReader::new(reader, 4, 4, None, false);
//! let hash_reader = HashReader::new(Box::new(reader), 4, 4, None, false).unwrap();
//!
//! // Check if a type is a HashReader
//! assert!(hash_reader.is_hash_reader());
//!
//! // Use new for compatibility (though it's simpler to use new() directly)
//! let reader2 = BufReader::new(Cursor::new(&data[..]));
//! let result = HashReader::new(reader2, 4, 4, None, false);
//! let result = HashReader::new(Box::new(reader2), 4, 4, None, false);
//! assert!(result.is_ok());
//! # });
//! ```

View File

@@ -7,19 +7,24 @@ rust-version.workspace = true
version.workspace = true
[dependencies]
blake3 = { version = "1.8.2", optional = true }
base64-simd= { workspace = true , optional = true}
blake3 = { workspace = true, optional = true }
crc32fast.workspace = true
hex-simd= { workspace = true , optional = true}
highway = { workspace = true, optional = true }
lazy_static= { workspace = true , optional = true}
local-ip-address = { workspace = true, optional = true }
md-5 = { workspace = true, optional = true }
netif= { workspace = true , optional = true}
nix = { workspace = true, optional = true }
regex= { workspace = true, optional = true }
rustfs-config = { workspace = true }
rustls = { workspace = true, optional = true }
rustls-pemfile = { workspace = true, optional = true }
rustls-pki-types = { workspace = true, optional = true }
serde = { workspace = true, optional = true }
sha2 = { workspace = true, optional = true }
siphasher = { workspace = true, optional = true }
tempfile = { workspace = true, optional = true }
tokio = { workspace = true, optional = true, features = ["io-util", "macros"] }
tracing = { workspace = true }
@@ -42,7 +47,9 @@ tls = ["dep:rustls", "dep:rustls-pemfile", "dep:rustls-pki-types"] # tls charac
net = ["ip","dep:url", "dep:netif", "dep:lazy_static"] # empty network features
io = ["dep:tokio"]
path = []
hash = ["dep:highway", "dep:md-5", "dep:sha2", "dep:blake3", "dep:serde"]
string = ["dep:regex","dep:lazy_static"]
crypto = ["dep:base64-simd","dep:hex-simd"]
hash = ["dep:highway", "dep:md-5", "dep:sha2", "dep:blake3", "dep:serde", "dep:siphasher"]
os = ["dep:nix", "dep:tempfile", "winapi"] # operating system utilities
integration = [] # integration test features
full = ["ip", "tls", "net", "io","hash", "os", "integration","path"] # all features
full = ["ip", "tls", "net", "io","hash", "os", "integration","path","crypto", "string"] # all features

View File

@@ -58,6 +58,28 @@ impl HashAlgorithm {
}
}
use crc32fast::Hasher;
use siphasher::sip::SipHasher;
pub fn sip_hash(key: &str, cardinality: usize, id: &[u8; 16]) -> usize {
// 你的密钥,必须是 16 字节
// 计算字符串的 SipHash 值
let result = SipHasher::new_with_key(id).hash(key.as_bytes());
result as usize % cardinality
}
pub fn crc_hash(key: &str, cardinality: usize) -> usize {
let mut hasher = Hasher::new(); // 创建一个新的哈希器
hasher.update(key.as_bytes()); // 更新哈希状态,添加数据
let checksum = hasher.finalize();
checksum as usize % cardinality
}
#[cfg(test)]
mod tests {
use super::*;

View File

@@ -1,17 +1,17 @@
#[cfg(feature = "tls")]
mod certs;
pub mod certs;
#[cfg(feature = "ip")]
mod ip;
pub mod ip;
#[cfg(feature = "net")]
mod net;
pub mod net;
#[cfg(feature = "net")]
pub use net::*;
#[cfg(feature = "io")]
mod io;
pub mod io;
#[cfg(feature = "hash")]
mod hash;
pub mod hash;
#[cfg(feature = "os")]
pub mod os;
@@ -19,6 +19,12 @@ pub mod os;
#[cfg(feature = "path")]
pub mod path;
#[cfg(feature = "string")]
pub mod string;
#[cfg(feature = "crypto")]
pub mod crypto;
#[cfg(feature = "tls")]
pub use certs::*;
#[cfg(feature = "hash")]
@@ -27,3 +33,6 @@ pub use hash::*;
pub use io::*;
#[cfg(feature = "ip")]
pub use ip::*;
#[cfg(feature = "crypto")]
pub use crypto::*;

View File

@@ -98,7 +98,7 @@ pub fn get_available_port() -> u16 {
}
/// returns IPs of local interface
pub(crate) fn must_get_local_ips() -> std::io::Result<Vec<IpAddr>> {
pub fn must_get_local_ips() -> std::io::Result<Vec<IpAddr>> {
match netif::up() {
Ok(up) => Ok(up.map(|x| x.address().to_owned()).collect()),
Err(err) => Err(std::io::Error::other(format!("Unable to get IP addresses of this host: {}", err))),

View File

@@ -2,6 +2,82 @@ use lazy_static::*;
use regex::Regex;
use std::io::{Error, Result};
pub fn parse_bool(str: &str) -> Result<bool> {
match str {
"1" | "t" | "T" | "true" | "TRUE" | "True" | "on" | "ON" | "On" | "enabled" => Ok(true),
"0" | "f" | "F" | "false" | "FALSE" | "False" | "off" | "OFF" | "Off" | "disabled" => Ok(false),
_ => Err(Error::other(format!("ParseBool: parsing {}", str))),
}
}
pub fn match_simple(pattern: &str, name: &str) -> bool {
if pattern.is_empty() {
return name == pattern;
}
if pattern == "*" {
return true;
}
// Do an extended wildcard '*' and '?' match.
deep_match_rune(name.as_bytes(), pattern.as_bytes(), true)
}
pub fn match_pattern(pattern: &str, name: &str) -> bool {
if pattern.is_empty() {
return name == pattern;
}
if pattern == "*" {
return true;
}
// Do an extended wildcard '*' and '?' match.
deep_match_rune(name.as_bytes(), pattern.as_bytes(), false)
}
fn deep_match_rune(str_: &[u8], pattern: &[u8], simple: bool) -> bool {
let (mut str_, mut pattern) = (str_, pattern);
while !pattern.is_empty() {
match pattern[0] as char {
'*' => {
return if pattern.len() == 1 {
true
} else {
deep_match_rune(str_, &pattern[1..], simple)
|| (!str_.is_empty() && deep_match_rune(&str_[1..], pattern, simple))
};
}
'?' => {
if str_.is_empty() {
return simple;
}
}
_ => {
if str_.is_empty() || str_[0] != pattern[0] {
return false;
}
}
}
str_ = &str_[1..];
pattern = &pattern[1..];
}
str_.is_empty() && pattern.is_empty()
}
pub fn match_as_pattern_prefix(pattern: &str, text: &str) -> bool {
let mut i = 0;
while i < text.len() && i < pattern.len() {
match pattern.as_bytes()[i] as char {
'*' => return true,
'?' => i += 1,
_ => {
if pattern.as_bytes()[i] != text.as_bytes()[i] {
return false;
}
}
}
i += 1;
}
text.len() <= pattern.len()
}
lazy_static! {
static ref ELLIPSES_RE: Regex = Regex::new(r"(.*)(\{[0-9a-z]*\.\.\.[0-9a-z]*\})(.*)").unwrap();
}
@@ -15,9 +91,9 @@ const ELLIPSES: &str = "...";
/// associated prefix and suffixes.
#[derive(Debug, Default, PartialEq, Eq)]
pub struct Pattern {
pub(crate) prefix: String,
pub(crate) suffix: String,
pub(crate) seq: Vec<String>,
pub prefix: String,
pub suffix: String,
pub seq: Vec<String>,
}
impl Pattern {

View File

@@ -17,13 +17,13 @@ use time::OffsetDateTime;
use tracing::error;
use crate::bucket::target::BucketTarget;
use crate::bucket::utils::deserialize;
use crate::config::com::{read_config, save_config};
use crate::error::{Error, Result};
use crate::new_object_layer_fn;
use crate::disk::BUCKET_META_PREFIX;
use crate::store::ECStore;
use crate::utils::xml::deserialize;
pub const BUCKET_METADATA_FILE: &str = ".metadata.bin";
pub const BUCKET_METADATA_FORMAT: u16 = 1;

View File

@@ -6,13 +6,12 @@ use std::{collections::HashMap, sync::Arc};
use crate::StorageAPI;
use crate::bucket::error::BucketMetadataError;
use crate::bucket::metadata::{BUCKET_LIFECYCLE_CONFIG, load_bucket_metadata_parse};
use crate::bucket::utils::is_meta_bucketname;
use crate::bucket::utils::{deserialize, is_meta_bucketname};
use crate::cmd::bucket_targets;
use crate::error::{Error, Result, is_err_bucket_not_found};
use crate::global::{GLOBAL_Endpoints, is_dist_erasure, is_erasure, new_object_layer_fn};
use crate::heal::heal_commands::HealOpts;
use crate::store::ECStore;
use crate::utils::xml::deserialize;
use futures::future::join_all;
use policy::policy::BucketPolicy;
use s3s::dto::{

View File

@@ -1,5 +1,6 @@
use crate::disk::RUSTFS_META_BUCKET;
use crate::error::{Error, Result};
use s3s::xml;
pub fn is_meta_bucketname(name: &str) -> bool {
name.starts_with(RUSTFS_META_BUCKET)
@@ -70,3 +71,31 @@ pub fn check_valid_object_name(object_name: &str) -> Result<()> {
}
check_valid_object_name_prefix(object_name)
}
pub fn deserialize<T>(input: &[u8]) -> xml::DeResult<T>
where
T: for<'xml> xml::Deserialize<'xml>,
{
let mut d = xml::Deserializer::new(input);
let ans = T::deserialize(&mut d)?;
d.expect_eof()?;
Ok(ans)
}
pub fn serialize_content<T: xml::SerializeContent>(val: &T) -> xml::SerResult<String> {
let mut buf = Vec::with_capacity(256);
{
let mut ser = xml::Serializer::new(&mut buf);
val.serialize_content(&mut ser)?;
}
Ok(String::from_utf8(buf).unwrap())
}
pub fn serialize<T: xml::Serialize>(val: &T) -> xml::SerResult<Vec<u8>> {
let mut buf = Vec::with_capacity(256);
{
let mut ser = xml::Serializer::new(&mut buf);
val.serialize(&mut ser)?;
}
Ok(buf)
}

View File

@@ -1,6 +1,6 @@
use s3s::dto::{BucketVersioningStatus, VersioningConfiguration};
use crate::utils::wildcard;
use rustfs_utils::string::match_simple;
pub trait VersioningApi {
fn enabled(&self) -> bool;
@@ -33,7 +33,7 @@ impl VersioningApi for VersioningConfiguration {
for p in excluded_prefixes.iter() {
if let Some(ref sprefix) = p.prefix {
let pattern = format!("{}*", sprefix);
if wildcard::match_simple(&pattern, prefix) {
if match_simple(&pattern, prefix) {
return false;
}
}
@@ -63,7 +63,7 @@ impl VersioningApi for VersioningConfiguration {
for p in excluded_prefixes.iter() {
if let Some(ref sprefix) = p.prefix {
let pattern = format!("{}*", sprefix);
if wildcard::match_simple(&pattern, prefix) {
if match_simple(&pattern, prefix) {
return true;
}
}

View File

@@ -2,9 +2,9 @@ use super::{Config, GLOBAL_StorageClass, storageclass};
use crate::disk::RUSTFS_META_BUCKET;
use crate::error::{Error, Result};
use crate::store_api::{ObjectInfo, ObjectOptions, PutObjReader, StorageAPI};
use crate::utils::path::SLASH_SEPARATOR;
use http::HeaderMap;
use lazy_static::lazy_static;
use rustfs_utils::path::SLASH_SEPARATOR;
use std::collections::HashSet;
use std::sync::Arc;
use tracing::{error, warn};

View File

@@ -1,5 +1,5 @@
use crate::error::{Error, Result};
use crate::utils::bool_flag::parse_bool;
use rustfs_utils::string::parse_bool;
use std::time::Duration;
#[derive(Debug, Default)]

View File

@@ -1,7 +1,6 @@
use super::error::{Error, Result};
use crate::utils::net;
use path_absolutize::Absolutize;
use rustfs_utils::is_local_host;
use rustfs_utils::{is_local_host, is_socket_addr};
use std::{fmt::Display, path::Path};
use url::{ParseError, Url};
@@ -186,7 +185,7 @@ fn url_parse_from_file_path(value: &str) -> Result<Url> {
// localhost, example.com, any FQDN cannot be disambiguated from a regular file path such as
// /mnt/export1. So we go ahead and start the rustfs server in FS modes in these cases.
let addr: Vec<&str> = value.splitn(2, '/').collect();
if net::is_socket_addr(addr[0]) {
if is_socket_addr(addr[0]) {
return Err(Error::other("invalid URL endpoint format: missing scheme http or https"));
}

View File

@@ -32,8 +32,7 @@ use crate::heal::heal_commands::{HealScanMode, HealingTracker};
use crate::heal::heal_ops::HEALING_TRACKER_FILENAME;
use crate::new_object_layer_fn;
use crate::store_api::{ObjectInfo, StorageAPI};
// use crate::utils::os::get_info;
use crate::utils::path::{
use rustfs_utils::path::{
GLOBAL_DIR_SUFFIX, GLOBAL_DIR_SUFFIX_WITH_SLASH, SLASH_SEPARATOR, clean, decode_dir_object, encode_dir_object, has_suffix,
path_join, path_join_buf,
};

View File

@@ -1,4 +1,4 @@
use crate::utils::ellipses::*;
use rustfs_utils::string::{ArgPattern, find_ellipses_patterns, has_ellipses};
use serde::Deserialize;
use std::collections::HashSet;
use std::env;
@@ -443,6 +443,8 @@ fn get_total_sizes(arg_patterns: &[ArgPattern]) -> Vec<usize> {
#[cfg(test)]
mod test {
use rustfs_utils::string::Pattern;
use super::*;
impl PartialEq for EndpointSet {

View File

@@ -1,10 +1,11 @@
use rustfs_utils::{XHost, check_local_server_addr, get_host_ip, is_local_host};
use tracing::{instrument, warn};
use crate::{
disk::endpoint::{Endpoint, EndpointType},
disks_layout::DisksLayout,
global::global_rustfs_port,
utils::net::{self, XHost},
// utils::net::{self, XHost},
};
use std::io::{Error, Result};
use std::{
@@ -159,7 +160,7 @@ impl PoolEndpointList {
return Err(Error::other("invalid number of endpoints"));
}
let server_addr = net::check_local_server_addr(server_addr)?;
let server_addr = check_local_server_addr(server_addr)?;
// For single arg, return single drive EC setup.
if disks_layout.is_single_drive_layout() {
@@ -227,7 +228,7 @@ impl PoolEndpointList {
let host = ep.url.host().unwrap();
let host_ip_set = host_ip_cache.entry(host.clone()).or_insert({
net::get_host_ip(host.clone()).map_err(|e| Error::other(format!("host '{}' cannot resolve: {}", host, e)))?
get_host_ip(host.clone()).map_err(|e| Error::other(format!("host '{}' cannot resolve: {}", host, e)))?
});
let path = ep.get_file_path();
@@ -331,7 +332,7 @@ impl PoolEndpointList {
ep.is_local = true;
}
Some(host) => {
ep.is_local = net::is_local_host(host, ep.url.port().unwrap_or_default(), local_port)?;
ep.is_local = is_local_host(host, ep.url.port().unwrap_or_default(), local_port)?;
}
}
}
@@ -370,7 +371,7 @@ impl PoolEndpointList {
resolved_set.insert((i, j));
continue;
}
Some(host) => match net::is_local_host(host, ep.url.port().unwrap_or_default(), local_port) {
Some(host) => match is_local_host(host, ep.url.port().unwrap_or_default(), local_port) {
Ok(is_local) => {
if !found_local {
found_local = is_local;
@@ -605,6 +606,8 @@ impl EndpointServerPools {
#[cfg(test)]
mod test {
use rustfs_utils::must_get_local_ips;
use super::*;
use std::path::Path;
@@ -736,7 +739,7 @@ mod test {
// Filter ipList by IPs those do not start with '127.'.
let non_loop_back_i_ps =
net::must_get_local_ips().map_or(vec![], |v| v.into_iter().filter(|ip| ip.is_ipv4() && ip.is_loopback()).collect());
must_get_local_ips().map_or(vec![], |v| v.into_iter().filter(|ip| ip.is_ipv4() && ip.is_loopback()).collect());
if non_loop_back_i_ps.is_empty() {
panic!("No non-loop back IP address found for this host");
}

View File

@@ -1,5 +1,6 @@
use futures::future::join_all;
use madmin::heal_commands::HealResultItem;
use rustfs_utils::path::{SLASH_SEPARATOR, path_join};
use std::{cmp::Ordering, env, path::PathBuf, sync::Arc, time::Duration};
use tokio::{
spawn,
@@ -34,7 +35,6 @@ use crate::{
new_object_layer_fn,
store::get_disk_via_endpoint,
store_api::{BucketInfo, BucketOptions, StorageAPI},
utils::path::{SLASH_SEPARATOR, path_join},
};
pub static DEFAULT_MONITOR_NEW_DISK_INTERVAL: Duration = Duration::from_secs(10);

View File

@@ -43,7 +43,6 @@ use crate::{
new_object_layer_fn,
peer::is_reserved_or_invalid_bucket,
store::ECStore,
utils::path::{SLASH_SEPARATOR, path_join, path_to_bucket_object, path_to_bucket_object_with_base_path},
};
use crate::{disk::DiskAPI, store_api::ObjectInfo};
use crate::{
@@ -56,6 +55,7 @@ use lazy_static::lazy_static;
use rand::Rng;
use rmp_serde::{Deserializer, Serializer};
use rustfs_filemeta::{FileInfo, MetaCacheEntries, MetaCacheEntry, MetadataResolutionParams};
use rustfs_utils::path::{SLASH_SEPARATOR, path_join, path_to_bucket_object, path_to_bucket_object_with_base_path};
use s3s::dto::{BucketLifecycleConfiguration, ExpirationStatus, LifecycleRule, ReplicationConfiguration, ReplicationRuleStatus};
use serde::{Deserialize, Serialize};
use tokio::{

View File

@@ -6,9 +6,9 @@ use crate::{
error::to_object_err,
new_object_layer_fn,
store::ECStore,
utils::path::SLASH_SEPARATOR,
};
use lazy_static::lazy_static;
use rustfs_utils::path::SLASH_SEPARATOR;
use serde::{Deserialize, Serialize};
use std::{collections::HashMap, sync::Arc, time::SystemTime};
use tokio::sync::mpsc::Receiver;

View File

@@ -6,12 +6,11 @@ use std::{
use crate::{
config::storageclass::{RRS, STANDARD},
disk::{BUCKET_META_PREFIX, DeleteOptions, DiskAPI, DiskStore, RUSTFS_META_BUCKET, error::DiskError},
disk::{BUCKET_META_PREFIX, DeleteOptions, DiskAPI, DiskStore, RUSTFS_META_BUCKET, error::DiskError, fs::read_file},
global::GLOBAL_BackgroundHealState,
heal::heal_ops::HEALING_TRACKER_FILENAME,
new_object_layer_fn,
store_api::{BucketInfo, StorageAPI},
utils::fs::read_file,
};
use crate::{disk, error::Result};
use chrono::{DateTime, Utc};

View File

@@ -5,6 +5,7 @@ use super::{
heal_commands::{HEAL_ITEM_BUCKET_METADATA, HealOpts, HealScanMode, HealStopSuccess, HealingTracker},
};
use crate::error::{Error, Result};
use crate::heal::heal_commands::{HEAL_ITEM_BUCKET, HEAL_ITEM_OBJECT};
use crate::store_api::StorageAPI;
use crate::{
config::com::CONFIG_PREFIX,
@@ -18,17 +19,14 @@ use crate::{
global::GLOBAL_IsDistErasure,
heal::heal_commands::{HEAL_UNKNOWN_SCAN, HealStartSuccess},
new_object_layer_fn,
utils::path::has_prefix,
};
use crate::{
heal::heal_commands::{HEAL_ITEM_BUCKET, HEAL_ITEM_OBJECT},
utils::path::path_join,
};
use chrono::Utc;
use futures::join;
use lazy_static::lazy_static;
use madmin::heal_commands::{HealDriveInfo, HealItemType, HealResultItem};
use rustfs_filemeta::MetaCacheEntry;
use rustfs_utils::path::has_prefix;
use rustfs_utils::path::path_join;
use serde::{Deserialize, Serialize};
use std::{
collections::HashMap,

View File

@@ -1,10 +1,10 @@
use crate::disk::{BUCKET_META_PREFIX, RUSTFS_META_BUCKET};
use crate::heal::background_heal_ops::{heal_bucket, heal_object};
use crate::heal::heal_commands::{HEAL_DEEP_SCAN, HEAL_NORMAL_SCAN};
use crate::utils::path::SLASH_SEPARATOR;
use chrono::{DateTime, Utc};
use lazy_static::lazy_static;
use regex::Regex;
use rustfs_utils::path::SLASH_SEPARATOR;
use std::ops::Sub;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;

View File

@@ -10,18 +10,13 @@ pub mod disks_layout;
pub mod endpoints;
pub mod erasure_coding;
pub mod error;
// mod file_meta;
// pub mod file_meta_inline;
pub mod global;
pub mod heal;
// pub mod io;
// pub mod metacache;
pub mod metrics_realtime;
pub mod notification_sys;
pub mod peer;
pub mod peer_rest_client;
pub mod pools;
// mod quorum;
pub mod rebalance;
pub mod set_disk;
mod sets;
@@ -30,7 +25,6 @@ pub mod store_api;
mod store_init;
pub mod store_list_objects;
mod store_utils;
pub mod utils;
pub mod xhttp;
pub use global::new_object_layer_fn;

View File

@@ -8,7 +8,6 @@ use crate::heal::heal_commands::{
};
use crate::heal::heal_ops::RUSTFS_RESERVED_BUCKET;
use crate::store::all_local_disk;
use crate::utils::wildcard::is_rustfs_meta_bucket_name;
use crate::{
disk::{self, VolumeInfo},
endpoints::{EndpointServerPools, Node},
@@ -750,7 +749,7 @@ pub async fn heal_bucket_local(bucket: &str, opts: &HealOpts) -> Result<HealResu
});
}
if opts.remove && !is_rustfs_meta_bucket_name(bucket) && !is_all_buckets_not_found(&errs) {
if opts.remove && !bucket.starts_with(disk::RUSTFS_META_BUCKET) && !is_all_buckets_not_found(&errs) {
let mut futures = Vec::new();
for disk in disks.iter() {
let disk = disk.clone();

View File

@@ -4,7 +4,6 @@ use crate::{
global::is_dist_erasure,
heal::heal_commands::BgHealState,
metrics_realtime::{CollectMetricsOpts, MetricType},
utils::net::XHost,
};
use madmin::{
ServerProperties,
@@ -25,6 +24,7 @@ use protos::{
},
};
use rmp_serde::{Deserializer, Serializer};
use rustfs_utils::XHost;
use serde::{Deserialize, Serialize as _};
use std::{collections::HashMap, io::Cursor, time::SystemTime};
use tonic::Request;

View File

@@ -16,7 +16,6 @@ use crate::set_disk::SetDisks;
use crate::store_api::{
BucketOptions, CompletePart, GetObjectReader, MakeBucketOptions, ObjectIO, ObjectOptions, PutObjReader, StorageAPI,
};
use crate::utils::path::{SLASH_SEPARATOR, encode_dir_object, path_join};
use crate::{sets::Sets, store::ECStore};
use ::workers::workers::Workers;
use byteorder::{ByteOrder, LittleEndian, WriteBytesExt};
@@ -26,6 +25,7 @@ use http::HeaderMap;
use rmp_serde::{Deserializer, Serializer};
use rustfs_filemeta::{MetaCacheEntries, MetaCacheEntry, MetadataResolutionParams};
use rustfs_rio::HashReader;
use rustfs_utils::path::{SLASH_SEPARATOR, encode_dir_object, path_join};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::fmt::Display;

View File

@@ -13,11 +13,11 @@ use crate::pools::ListCallback;
use crate::set_disk::SetDisks;
use crate::store::ECStore;
use crate::store_api::{CompletePart, GetObjectReader, ObjectIO, ObjectOptions, PutObjReader};
use crate::utils::path::encode_dir_object;
use common::defer;
use http::HeaderMap;
use rustfs_filemeta::{FileInfo, MetaCacheEntries, MetaCacheEntry, MetadataResolutionParams};
use rustfs_rio::HashReader;
use rustfs_utils::path::encode_dir_object;
use serde::{Deserialize, Serialize};
use tokio::io::AsyncReadExt;
use tokio::sync::broadcast::{self, Receiver as B_Receiver};

View File

@@ -9,6 +9,7 @@ use crate::erasure_coding::bitrot_verify;
use crate::error::{Error, Result};
use crate::global::GLOBAL_MRFState;
use crate::heal::data_usage_cache::DataUsageCache;
use crate::heal::heal_ops::{HealEntryFn, HealSequence};
use crate::store_api::ObjectToDelete;
use crate::{
cache_value::metacache_set::{ListPathRawOptions, list_path_raw},
@@ -38,10 +39,7 @@ use crate::{
ObjectOptions, PartInfo, PutObjReader, StorageAPI,
},
store_init::load_format_erasure,
utils::{
crypto::{base64_decode, base64_encode, hex},
path::{SLASH_SEPARATOR, encode_dir_object, has_suffix},
},
// utils::crypto::{base64_decode, base64_encode, hex},
xhttp,
};
use crate::{disk::STORAGE_FORMAT_FILE, heal::mrf::PartialOperation};
@@ -49,10 +47,6 @@ use crate::{
heal::data_scanner::{HEAL_DELETE_DANGLING, globalHealConfig},
store_api::ListObjectVersionsInfo,
};
use crate::{
heal::heal_ops::{HealEntryFn, HealSequence},
utils::path::path_join_buf,
};
use bytesize::ByteSize;
use chrono::Utc;
use futures::future::join_all;
@@ -67,7 +61,11 @@ use rustfs_filemeta::{
RawFileInfo, file_info_from_raw, merge_file_meta_versions,
};
use rustfs_rio::{EtagResolvable, HashReader};
use rustfs_utils::HashAlgorithm;
use rustfs_utils::{
HashAlgorithm,
crypto::{base64_decode, base64_encode, hex},
path::{SLASH_SEPARATOR, encode_dir_object, has_suffix, path_join_buf},
};
use sha2::{Digest, Sha256};
use std::hash::Hash;
use std::mem;

View File

@@ -23,13 +23,13 @@ use crate::{
ObjectIO, ObjectInfo, ObjectOptions, ObjectToDelete, PartInfo, PutObjReader, StorageAPI,
},
store_init::{check_format_erasure_values, get_format_erasure_in_quorum, load_format_erasure_all, save_format_file},
utils::{hash, path::path_join_buf},
};
use common::globals::GLOBAL_Local_Node_Name;
use futures::future::join_all;
use http::HeaderMap;
use lock::{LockApi, namespace_lock::NsLockMap, new_lock_api};
use madmin::heal_commands::{HealDriveInfo, HealResultItem};
use rustfs_utils::{crc_hash, path::path_join_buf, sip_hash};
use tokio::sync::RwLock;
use uuid::Uuid;
@@ -232,11 +232,9 @@ impl Sets {
fn get_hashed_set_index(&self, input: &str) -> usize {
match self.distribution_algo {
DistributionAlgoVersion::V1 => hash::crc_hash(input, self.disk_set.len()),
DistributionAlgoVersion::V1 => crc_hash(input, self.disk_set.len()),
DistributionAlgoVersion::V2 | DistributionAlgoVersion::V3 => {
hash::sip_hash(input, self.disk_set.len(), self.id.as_bytes())
}
DistributionAlgoVersion::V2 | DistributionAlgoVersion::V3 => sip_hash(input, self.disk_set.len(), self.id.as_bytes()),
}
}

View File

@@ -25,9 +25,6 @@ use crate::pools::PoolMeta;
use crate::rebalance::RebalanceMeta;
use crate::store_api::{ListMultipartsInfo, ListObjectVersionsInfo, MultipartInfo, ObjectIO};
use crate::store_init::{check_disk_fatal_errs, ec_drives_no_config};
use crate::utils::crypto::base64_decode;
use crate::utils::path::{SLASH_SEPARATOR, decode_dir_object, encode_dir_object, path_join_buf};
use crate::utils::xml;
use crate::{
bucket::metadata::BucketMetadata,
disk::{BUCKET_META_PREFIX, DiskOption, DiskStore, RUSTFS_META_BUCKET, new_disk},
@@ -41,6 +38,8 @@ use crate::{
},
store_init,
};
use rustfs_utils::crypto::base64_decode;
use rustfs_utils::path::{SLASH_SEPARATOR, decode_dir_object, encode_dir_object, path_join_buf};
use crate::error::{Error, Result};
use common::globals::{GLOBAL_Local_Node_Name, GLOBAL_Rustfs_Host, GLOBAL_Rustfs_Port};
@@ -1347,12 +1346,12 @@ impl StorageAPI for ECStore {
meta.set_created(opts.created_at);
if opts.lock_enabled {
meta.object_lock_config_xml = xml::serialize::<ObjectLockConfiguration>(&enableObjcetLockConfig)?;
meta.versioning_config_xml = xml::serialize::<VersioningConfiguration>(&enableVersioningConfig)?;
meta.object_lock_config_xml = crate::bucket::utils::serialize::<ObjectLockConfiguration>(&enableObjcetLockConfig)?;
meta.versioning_config_xml = crate::bucket::utils::serialize::<VersioningConfiguration>(&enableVersioningConfig)?;
}
if opts.versioning_enabled {
meta.versioning_config_xml = xml::serialize::<VersioningConfiguration>(&enableVersioningConfig)?;
meta.versioning_config_xml = crate::bucket::utils::serialize::<VersioningConfiguration>(&enableVersioningConfig)?;
}
meta.save().await?;

View File

@@ -4,11 +4,12 @@ use crate::cmd::bucket_replication::{ReplicationStatusType, VersionPurgeStatusTy
use crate::error::{Error, Result};
use crate::heal::heal_ops::HealSequence;
use crate::store_utils::clean_metadata;
use crate::{disk::DiskStore, heal::heal_commands::HealOpts, utils::path::decode_dir_object, xhttp};
use crate::{disk::DiskStore, heal::heal_commands::HealOpts, xhttp};
use http::{HeaderMap, HeaderValue};
use madmin::heal_commands::HealResultItem;
use rustfs_filemeta::{FileInfo, MetaCacheEntriesSorted, ObjectPartInfo};
use rustfs_rio::{HashReader, Reader};
use rustfs_utils::path::decode_dir_object;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::fmt::Debug;

View File

@@ -11,7 +11,6 @@ use crate::peer::is_reserved_or_invalid_bucket;
use crate::set_disk::SetDisks;
use crate::store::check_list_objs_args;
use crate::store_api::{ListObjectVersionsInfo, ListObjectsInfo, ObjectInfo, ObjectOptions};
use crate::utils::path::{self, SLASH_SEPARATOR, base_dir_from_prefix};
use crate::{store::ECStore, store_api::ListObjectsV2Info};
use futures::future::join_all;
use rand::seq::SliceRandom;
@@ -19,6 +18,7 @@ use rustfs_filemeta::{
FileInfo, MetaCacheEntries, MetaCacheEntriesSorted, MetaCacheEntriesSortedResult, MetaCacheEntry, MetadataResolutionParams,
merge_file_meta_versions,
};
use rustfs_utils::path::{self, SLASH_SEPARATOR, base_dir_from_prefix};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::broadcast::{self, Receiver as B_Receiver};

View File

@@ -1,9 +0,0 @@
use std::io::{Error, Result};
pub fn parse_bool(str: &str) -> Result<bool> {
match str {
"1" | "t" | "T" | "true" | "TRUE" | "True" | "on" | "ON" | "On" | "enabled" => Ok(true),
"0" | "f" | "F" | "false" | "FALSE" | "False" | "off" | "OFF" | "Off" | "disabled" => Ok(false),
_ => Err(Error::other(format!("ParseBool: parsing {}", str))),
}
}

View File

@@ -1,179 +0,0 @@
use std::{fs::Metadata, path::Path};
use tokio::{
fs::{self, File},
io,
};
#[cfg(not(windows))]
pub fn same_file(f1: &Metadata, f2: &Metadata) -> bool {
use std::os::unix::fs::MetadataExt;
if f1.dev() != f2.dev() {
return false;
}
if f1.ino() != f2.ino() {
return false;
}
if f1.size() != f2.size() {
return false;
}
if f1.permissions() != f2.permissions() {
return false;
}
if f1.mtime() != f2.mtime() {
return false;
}
true
}
#[cfg(windows)]
pub fn same_file(f1: &Metadata, f2: &Metadata) -> bool {
if f1.permissions() != f2.permissions() {
return false;
}
if f1.file_type() != f2.file_type() {
return false;
}
if f1.len() != f2.len() {
return false;
}
true
}
type FileMode = usize;
pub const O_RDONLY: FileMode = 0x00000;
pub const O_WRONLY: FileMode = 0x00001;
pub const O_RDWR: FileMode = 0x00002;
pub const O_CREATE: FileMode = 0x00040;
// pub const O_EXCL: FileMode = 0x00080;
// pub const O_NOCTTY: FileMode = 0x00100;
pub const O_TRUNC: FileMode = 0x00200;
// pub const O_NONBLOCK: FileMode = 0x00800;
pub const O_APPEND: FileMode = 0x00400;
// pub const O_SYNC: FileMode = 0x01000;
// pub const O_ASYNC: FileMode = 0x02000;
// pub const O_CLOEXEC: FileMode = 0x80000;
// read: bool,
// write: bool,
// append: bool,
// truncate: bool,
// create: bool,
// create_new: bool,
pub async fn open_file(path: impl AsRef<Path>, mode: FileMode) -> io::Result<File> {
let mut opts = fs::OpenOptions::new();
match mode & (O_RDONLY | O_WRONLY | O_RDWR) {
O_RDONLY => {
opts.read(true);
}
O_WRONLY => {
opts.write(true);
}
O_RDWR => {
opts.read(true);
opts.write(true);
}
_ => (),
};
if mode & O_CREATE != 0 {
opts.create(true);
}
if mode & O_APPEND != 0 {
opts.append(true);
}
if mode & O_TRUNC != 0 {
opts.truncate(true);
}
opts.open(path.as_ref()).await
}
pub async fn access(path: impl AsRef<Path>) -> io::Result<()> {
fs::metadata(path).await?;
Ok(())
}
pub fn access_std(path: impl AsRef<Path>) -> io::Result<()> {
std::fs::metadata(path)?;
Ok(())
}
pub async fn lstat(path: impl AsRef<Path>) -> io::Result<Metadata> {
fs::metadata(path).await
}
pub fn lstat_std(path: impl AsRef<Path>) -> io::Result<Metadata> {
std::fs::metadata(path)
}
pub async fn make_dir_all(path: impl AsRef<Path>) -> io::Result<()> {
fs::create_dir_all(path.as_ref()).await
}
#[tracing::instrument(level = "debug", skip_all)]
pub async fn remove(path: impl AsRef<Path>) -> io::Result<()> {
let meta = fs::metadata(path.as_ref()).await?;
if meta.is_dir() {
fs::remove_dir(path.as_ref()).await
} else {
fs::remove_file(path.as_ref()).await
}
}
pub async fn remove_all(path: impl AsRef<Path>) -> io::Result<()> {
let meta = fs::metadata(path.as_ref()).await?;
if meta.is_dir() {
fs::remove_dir_all(path.as_ref()).await
} else {
fs::remove_file(path.as_ref()).await
}
}
#[tracing::instrument(level = "debug", skip_all)]
pub fn remove_std(path: impl AsRef<Path>) -> io::Result<()> {
let meta = std::fs::metadata(path.as_ref())?;
if meta.is_dir() {
std::fs::remove_dir(path.as_ref())
} else {
std::fs::remove_file(path.as_ref())
}
}
pub fn remove_all_std(path: impl AsRef<Path>) -> io::Result<()> {
let meta = std::fs::metadata(path.as_ref())?;
if meta.is_dir() {
std::fs::remove_dir_all(path.as_ref())
} else {
std::fs::remove_file(path.as_ref())
}
}
pub async fn mkdir(path: impl AsRef<Path>) -> io::Result<()> {
fs::create_dir(path.as_ref()).await
}
pub async fn rename(from: impl AsRef<Path>, to: impl AsRef<Path>) -> io::Result<()> {
fs::rename(from, to).await
}
pub fn rename_std(from: impl AsRef<Path>, to: impl AsRef<Path>) -> io::Result<()> {
std::fs::rename(from, to)
}
#[tracing::instrument(level = "debug", skip_all)]
pub async fn read_file(path: impl AsRef<Path>) -> io::Result<Vec<u8>> {
fs::read(path.as_ref()).await
}

View File

@@ -1,21 +0,0 @@
use crc32fast::Hasher;
use siphasher::sip::SipHasher;
pub fn sip_hash(key: &str, cardinality: usize, id: &[u8; 16]) -> usize {
// 你的密钥,必须是 16 字节
// 计算字符串的 SipHash 值
let result = SipHasher::new_with_key(id).hash(key.as_bytes());
result as usize % cardinality
}
pub fn crc_hash(key: &str, cardinality: usize) -> usize {
let mut hasher = Hasher::new(); // 创建一个新的哈希器
hasher.update(key.as_bytes()); // 更新哈希状态,添加数据
let checksum = hasher.finalize();
checksum as usize % cardinality
}

View File

@@ -1,116 +0,0 @@
pub mod bool_flag;
pub mod crypto;
pub mod ellipses;
pub mod fs;
pub mod hash;
pub mod net;
// pub mod os;
pub mod path;
pub mod wildcard;
pub mod xml;
// use crate::bucket::error::BucketMetadataError;
// use crate::disk::error::DiskError;
// use crate::error::StorageError;
// use protos::proto_gen::node_service::Error as Proto_Error;
// const ERROR_MODULE_MASK: u32 = 0xFF00;
// pub const ERROR_TYPE_MASK: u32 = 0x00FF;
// const DISK_ERROR_MASK: u32 = 0x0100;
// const STORAGE_ERROR_MASK: u32 = 0x0200;
// const BUCKET_METADATA_ERROR_MASK: u32 = 0x0300;
// const CONFIG_ERROR_MASK: u32 = 0x04000;
// const QUORUM_ERROR_MASK: u32 = 0x0500;
// const ERASURE_ERROR_MASK: u32 = 0x0600;
// // error to u8
// pub fn error_to_u32(err: &Error) -> u32 {
// if let Some(e) = err.downcast_ref::<DiskError>() {
// DISK_ERROR_MASK | e.to_u32()
// } else if let Some(e) = err.downcast_ref::<StorageError>() {
// STORAGE_ERROR_MASK | e.to_u32()
// } else if let Some(e) = err.downcast_ref::<BucketMetadataError>() {
// BUCKET_METADATA_ERROR_MASK | e.to_u32()
// } else if let Some(e) = err.downcast_ref::<ConfigError>() {
// CONFIG_ERROR_MASK | e.to_u32()
// } else if let Some(e) = err.downcast_ref::<QuorumError>() {
// QUORUM_ERROR_MASK | e.to_u32()
// } else if let Some(e) = err.downcast_ref::<ErasureError>() {
// ERASURE_ERROR_MASK | e.to_u32()
// } else {
// 0
// }
// }
// pub fn u32_to_error(e: u32) -> Option<Error> {
// match e & ERROR_MODULE_MASK {
// DISK_ERROR_MASK => DiskError::from_u32(e & ERROR_TYPE_MASK).map(|e| Error::new(e)),
// STORAGE_ERROR_MASK => StorageError::from_u32(e & ERROR_TYPE_MASK).map(|e| Error::new(e)),
// BUCKET_METADATA_ERROR_MASK => BucketMetadataError::from_u32(e & ERROR_TYPE_MASK).map(|e| Error::new(e)),
// CONFIG_ERROR_MASK => ConfigError::from_u32(e & ERROR_TYPE_MASK).map(|e| Error::new(e)),
// QUORUM_ERROR_MASK => QuorumError::from_u32(e & ERROR_TYPE_MASK).map(|e| Error::new(e)),
// ERASURE_ERROR_MASK => ErasureError::from_u32(e & ERROR_TYPE_MASK).map(|e| Error::new(e)),
// _ => None,
// }
// }
// pub fn err_to_proto_err(err: &Error, msg: &str) -> Proto_Error {
// let num = error_to_u32(err);
// Proto_Error {
// code: num,
// error_info: msg.to_string(),
// }
// }
// pub fn proto_err_to_err(err: &Proto_Error) -> Error {
// if let Some(e) = u32_to_error(err.code) {
// e
// } else {
// Error::from_string(err.error_info.clone())
// }
// }
// #[test]
// fn test_u32_to_error() {
// let error = Error::new(DiskError::FileCorrupt);
// let num = error_to_u32(&error);
// let new_error = u32_to_error(num);
// assert!(new_error.is_some());
// assert_eq!(new_error.unwrap().downcast_ref::<DiskError>(), Some(&DiskError::FileCorrupt));
// let error = Error::new(StorageError::BucketNotEmpty(Default::default()));
// let num = error_to_u32(&error);
// let new_error = u32_to_error(num);
// assert!(new_error.is_some());
// assert_eq!(
// new_error.unwrap().downcast_ref::<StorageError>(),
// Some(&StorageError::BucketNotEmpty(Default::default()))
// );
// let error = Error::new(BucketMetadataError::BucketObjectLockConfigNotFound);
// let num = error_to_u32(&error);
// let new_error = u32_to_error(num);
// assert!(new_error.is_some());
// assert_eq!(
// new_error.unwrap().downcast_ref::<BucketMetadataError>(),
// Some(&BucketMetadataError::BucketObjectLockConfigNotFound)
// );
// let error = Error::new(ConfigError::NotFound);
// let num = error_to_u32(&error);
// let new_error = u32_to_error(num);
// assert!(new_error.is_some());
// assert_eq!(new_error.unwrap().downcast_ref::<ConfigError>(), Some(&ConfigError::NotFound));
// let error = Error::new(QuorumError::Read);
// let num = error_to_u32(&error);
// let new_error = u32_to_error(num);
// assert!(new_error.is_some());
// assert_eq!(new_error.unwrap().downcast_ref::<QuorumError>(), Some(&QuorumError::Read));
// let error = Error::new(ErasureError::ErasureReadQuorum);
// let num = error_to_u32(&error);
// let new_error = u32_to_error(num);
// assert!(new_error.is_some());
// assert_eq!(new_error.unwrap().downcast_ref::<ErasureError>(), Some(&ErasureError::ErasureReadQuorum));
// }

View File

@@ -1,223 +0,0 @@
use lazy_static::lazy_static;
use std::io::{Error, Result};
use std::{
collections::HashSet,
fmt::Display,
net::{IpAddr, Ipv6Addr, SocketAddr, TcpListener, ToSocketAddrs},
};
use url::Host;
lazy_static! {
static ref LOCAL_IPS: Vec<IpAddr> = must_get_local_ips().unwrap();
}
/// helper for validating if the provided arg is an ip address.
pub fn is_socket_addr(addr: &str) -> bool {
// TODO IPv6 zone information?
addr.parse::<SocketAddr>().is_ok() || addr.parse::<IpAddr>().is_ok()
}
/// checks if server_addr is valid and local host.
pub fn check_local_server_addr(server_addr: &str) -> Result<SocketAddr> {
let addr: Vec<SocketAddr> = match server_addr.to_socket_addrs() {
Ok(addr) => addr.collect(),
Err(err) => return Err(err),
};
// 0.0.0.0 is a wildcard address and refers to local network
// addresses. I.e, 0.0.0.0:9000 like ":9000" refers to port
// 9000 on localhost.
for a in addr {
if a.ip().is_unspecified() {
return Ok(a);
}
let host = match a {
SocketAddr::V4(a) => Host::<&str>::Ipv4(*a.ip()),
SocketAddr::V6(a) => Host::Ipv6(*a.ip()),
};
if is_local_host(host, 0, 0)? {
return Ok(a);
}
}
Err(Error::other("host in server address should be this server"))
}
/// checks if the given parameter correspond to one of
/// the local IP of the current machine
pub fn is_local_host(host: Host<&str>, port: u16, local_port: u16) -> Result<bool> {
let local_set: HashSet<IpAddr> = LOCAL_IPS.iter().copied().collect();
let is_local_host = match host {
Host::Domain(domain) => {
let ips = (domain, 0).to_socket_addrs().map(|v| v.map(|v| v.ip()).collect::<Vec<_>>())?;
ips.iter().any(|ip| local_set.contains(ip))
}
Host::Ipv4(ip) => local_set.contains(&IpAddr::V4(ip)),
Host::Ipv6(ip) => local_set.contains(&IpAddr::V6(ip)),
};
if port > 0 {
return Ok(is_local_host && port == local_port);
}
Ok(is_local_host)
}
/// returns IP address of given host.
pub fn get_host_ip(host: Host<&str>) -> Result<HashSet<IpAddr>> {
match host {
Host::Domain(domain) => match (domain, 0)
.to_socket_addrs()
.map(|v| v.map(|v| v.ip()).collect::<HashSet<_>>())
{
Ok(ips) => Ok(ips),
Err(err) => Err(err),
},
Host::Ipv4(ip) => {
let mut set = HashSet::with_capacity(1);
set.insert(IpAddr::V4(ip));
Ok(set)
}
Host::Ipv6(ip) => {
let mut set = HashSet::with_capacity(1);
set.insert(IpAddr::V6(ip));
Ok(set)
}
}
}
pub fn get_available_port() -> u16 {
TcpListener::bind("0.0.0.0:0").unwrap().local_addr().unwrap().port()
}
/// returns IPs of local interface
pub(crate) fn must_get_local_ips() -> Result<Vec<IpAddr>> {
match netif::up() {
Ok(up) => Ok(up.map(|x| x.address().to_owned()).collect()),
Err(err) => Err(Error::other(format!("Unable to get IP addresses of this host: {}", err))),
}
}
#[derive(Debug, Clone)]
pub struct XHost {
pub name: String,
pub port: u16,
pub is_port_set: bool,
}
impl Display for XHost {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
if !self.is_port_set {
write!(f, "{}", self.name)
} else if self.name.contains(':') {
write!(f, "[{}]:{}", self.name, self.port)
} else {
write!(f, "{}:{}", self.name, self.port)
}
}
}
impl TryFrom<String> for XHost {
type Error = std::io::Error;
fn try_from(value: String) -> std::result::Result<Self, Self::Error> {
if let Some(addr) = value.to_socket_addrs()?.next() {
Ok(Self {
name: addr.ip().to_string(),
port: addr.port(),
is_port_set: addr.port() > 0,
})
} else {
Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "value invalid"))
}
}
}
/// parses the address string, process the ":port" format for double-stack binding,
/// and resolve the host name or IP address. If the port is 0, an available port is assigned.
pub fn parse_and_resolve_address(addr_str: &str) -> Result<SocketAddr> {
let resolved_addr: SocketAddr = if let Some(port) = addr_str.strip_prefix(":") {
// Process the ":port" format for double stack binding
let port_str = port;
let port: u16 = port_str
.parse()
.map_err(|e| Error::other(format!("Invalid port format: {}, err:{:?}", addr_str, e)))?;
let final_port = if port == 0 {
get_available_port() // assume get_available_port is available here
} else {
port
};
// Using IPv6 without address specified [::], it should handle both IPv4 and IPv6
SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), final_port)
} else {
// Use existing logic to handle regular address formats
let mut addr = check_local_server_addr(addr_str)?; // assume check_local_server_addr is available here
if addr.port() == 0 {
addr.set_port(get_available_port());
}
addr
};
Ok(resolved_addr)
}
#[cfg(test)]
mod test {
use std::net::Ipv4Addr;
use super::*;
#[test]
fn test_is_socket_addr() {
let test_cases = [
("localhost", false),
("localhost:9000", false),
("example.com", false),
("http://192.168.1.0", false),
("http://192.168.1.0:9000", false),
("192.168.1.0", true),
("[2001:db8::1]:9000", true),
];
for (addr, expected) in test_cases {
let ret = is_socket_addr(addr);
assert_eq!(expected, ret, "addr: {}, expected: {}, got: {}", addr, expected, ret);
}
}
#[test]
fn test_check_local_server_addr() {
let test_cases = [
// (":54321", Ok(())),
("localhost:54321", Ok(())),
("0.0.0.0:9000", Ok(())),
// (":0", Ok(())),
("localhost", Err(Error::other("invalid socket address"))),
("", Err(Error::other("invalid socket address"))),
("example.org:54321", Err(Error::other("host in server address should be this server"))),
(":-10", Err(Error::other("invalid port value"))),
];
for test_case in test_cases {
let ret = check_local_server_addr(test_case.0);
if test_case.1.is_ok() && ret.is_err() {
panic!("{}: error: expected = <nil>, got = {:?}", test_case.0, ret);
}
if test_case.1.is_err() && ret.is_ok() {
panic!("{}: error: expected = {:?}, got = <nil>", test_case.0, test_case.1);
}
}
}
#[test]
fn test_must_get_local_ips() {
let local_ips = must_get_local_ips().unwrap();
let local_set: HashSet<IpAddr> = local_ips.into_iter().collect();
assert!(local_set.contains(&IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1))));
}
}

View File

@@ -1,308 +0,0 @@
use std::path::Path;
use std::path::PathBuf;
pub const GLOBAL_DIR_SUFFIX: &str = "__XLDIR__";
pub const SLASH_SEPARATOR: &str = "/";
pub const GLOBAL_DIR_SUFFIX_WITH_SLASH: &str = "__XLDIR__/";
pub fn has_suffix(s: &str, suffix: &str) -> bool {
if cfg!(target_os = "windows") {
s.to_lowercase().ends_with(&suffix.to_lowercase())
} else {
s.ends_with(suffix)
}
}
pub fn encode_dir_object(object: &str) -> String {
if has_suffix(object, SLASH_SEPARATOR) {
format!("{}{}", object.trim_end_matches(SLASH_SEPARATOR), GLOBAL_DIR_SUFFIX)
} else {
object.to_string()
}
}
pub fn is_dir_object(object: &str) -> bool {
let obj = encode_dir_object(object);
obj.ends_with(GLOBAL_DIR_SUFFIX)
}
#[allow(dead_code)]
pub fn decode_dir_object(object: &str) -> String {
if has_suffix(object, GLOBAL_DIR_SUFFIX) {
format!("{}{}", object.trim_end_matches(GLOBAL_DIR_SUFFIX), SLASH_SEPARATOR)
} else {
object.to_string()
}
}
pub fn retain_slash(s: &str) -> String {
if s.is_empty() {
return s.to_string();
}
if s.ends_with(SLASH_SEPARATOR) {
s.to_string()
} else {
format!("{}{}", s, SLASH_SEPARATOR)
}
}
pub fn strings_has_prefix_fold(s: &str, prefix: &str) -> bool {
s.len() >= prefix.len() && (s[..prefix.len()] == *prefix || s[..prefix.len()].eq_ignore_ascii_case(prefix))
}
pub fn has_prefix(s: &str, prefix: &str) -> bool {
if cfg!(target_os = "windows") {
return strings_has_prefix_fold(s, prefix);
}
s.starts_with(prefix)
}
pub fn path_join(elem: &[PathBuf]) -> PathBuf {
let mut joined_path = PathBuf::new();
for path in elem {
joined_path.push(path);
}
joined_path
}
pub fn path_join_buf(elements: &[&str]) -> String {
let trailing_slash = !elements.is_empty() && elements.last().unwrap().ends_with(SLASH_SEPARATOR);
let mut dst = String::new();
let mut added = 0;
for e in elements {
if added > 0 || !e.is_empty() {
if added > 0 {
dst.push_str(SLASH_SEPARATOR);
}
dst.push_str(e);
added += e.len();
}
}
let result = dst.to_string();
let cpath = Path::new(&result).components().collect::<PathBuf>();
let clean_path = cpath.to_string_lossy();
if trailing_slash {
return format!("{}{}", clean_path, SLASH_SEPARATOR);
}
clean_path.to_string()
}
pub fn path_to_bucket_object_with_base_path(bash_path: &str, path: &str) -> (String, String) {
let path = path.trim_start_matches(bash_path).trim_start_matches(SLASH_SEPARATOR);
if let Some(m) = path.find(SLASH_SEPARATOR) {
return (path[..m].to_string(), path[m + SLASH_SEPARATOR.len()..].to_string());
}
(path.to_string(), "".to_string())
}
pub fn path_to_bucket_object(s: &str) -> (String, String) {
path_to_bucket_object_with_base_path("", s)
}
pub fn base_dir_from_prefix(prefix: &str) -> String {
let mut base_dir = dir(prefix).to_owned();
if base_dir == "." || base_dir == "./" || base_dir == "/" {
base_dir = "".to_owned();
}
if !prefix.contains('/') {
base_dir = "".to_owned();
}
if !base_dir.is_empty() && !base_dir.ends_with(SLASH_SEPARATOR) {
base_dir.push_str(SLASH_SEPARATOR);
}
base_dir
}
pub struct LazyBuf {
s: String,
buf: Option<Vec<u8>>,
w: usize,
}
impl LazyBuf {
pub fn new(s: String) -> Self {
LazyBuf { s, buf: None, w: 0 }
}
pub fn index(&self, i: usize) -> u8 {
if let Some(ref buf) = self.buf {
buf[i]
} else {
self.s.as_bytes()[i]
}
}
pub fn append(&mut self, c: u8) {
if self.buf.is_none() {
if self.w < self.s.len() && self.s.as_bytes()[self.w] == c {
self.w += 1;
return;
}
let mut new_buf = vec![0; self.s.len()];
new_buf[..self.w].copy_from_slice(&self.s.as_bytes()[..self.w]);
self.buf = Some(new_buf);
}
if let Some(ref mut buf) = self.buf {
buf[self.w] = c;
self.w += 1;
}
}
pub fn string(&self) -> String {
if let Some(ref buf) = self.buf {
String::from_utf8(buf[..self.w].to_vec()).unwrap()
} else {
self.s[..self.w].to_string()
}
}
}
pub fn clean(path: &str) -> String {
if path.is_empty() {
return ".".to_string();
}
let rooted = path.starts_with('/');
let n = path.len();
let mut out = LazyBuf::new(path.to_string());
let mut r = 0;
let mut dotdot = 0;
if rooted {
out.append(b'/');
r = 1;
dotdot = 1;
}
while r < n {
match path.as_bytes()[r] {
b'/' => {
// Empty path element
r += 1;
}
b'.' if r + 1 == n || path.as_bytes()[r + 1] == b'/' => {
// . element
r += 1;
}
b'.' if path.as_bytes()[r + 1] == b'.' && (r + 2 == n || path.as_bytes()[r + 2] == b'/') => {
// .. element: remove to last /
r += 2;
if out.w > dotdot {
// Can backtrack
out.w -= 1;
while out.w > dotdot && out.index(out.w) != b'/' {
out.w -= 1;
}
} else if !rooted {
// Cannot backtrack but not rooted, so append .. element.
if out.w > 0 {
out.append(b'/');
}
out.append(b'.');
out.append(b'.');
dotdot = out.w;
}
}
_ => {
// Real path element.
// Add slash if needed
if (rooted && out.w != 1) || (!rooted && out.w != 0) {
out.append(b'/');
}
// Copy element
while r < n && path.as_bytes()[r] != b'/' {
out.append(path.as_bytes()[r]);
r += 1;
}
}
}
}
// Turn empty string into "."
if out.w == 0 {
return ".".to_string();
}
out.string()
}
pub fn split(path: &str) -> (&str, &str) {
// Find the last occurrence of the '/' character
if let Some(i) = path.rfind('/') {
// Return the directory (up to and including the last '/') and the file name
return (&path[..i + 1], &path[i + 1..]);
}
// If no '/' is found, return an empty string for the directory and the whole path as the file name
(path, "")
}
pub fn dir(path: &str) -> String {
let (a, _) = split(path);
clean(a)
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_base_dir_from_prefix() {
let a = "da/";
println!("---- in {}", a);
let a = base_dir_from_prefix(a);
println!("---- out {}", a);
}
#[test]
fn test_clean() {
assert_eq!(clean(""), ".");
assert_eq!(clean("abc"), "abc");
assert_eq!(clean("abc/def"), "abc/def");
assert_eq!(clean("a/b/c"), "a/b/c");
assert_eq!(clean("."), ".");
assert_eq!(clean(".."), "..");
assert_eq!(clean("../.."), "../..");
assert_eq!(clean("../../abc"), "../../abc");
assert_eq!(clean("/abc"), "/abc");
assert_eq!(clean("/"), "/");
assert_eq!(clean("abc/"), "abc");
assert_eq!(clean("abc/def/"), "abc/def");
assert_eq!(clean("a/b/c/"), "a/b/c");
assert_eq!(clean("./"), ".");
assert_eq!(clean("../"), "..");
assert_eq!(clean("../../"), "../..");
assert_eq!(clean("/abc/"), "/abc");
assert_eq!(clean("abc//def//ghi"), "abc/def/ghi");
assert_eq!(clean("//abc"), "/abc");
assert_eq!(clean("///abc"), "/abc");
assert_eq!(clean("//abc//"), "/abc");
assert_eq!(clean("abc//"), "abc");
assert_eq!(clean("abc/./def"), "abc/def");
assert_eq!(clean("/./abc/def"), "/abc/def");
assert_eq!(clean("abc/."), "abc");
assert_eq!(clean("abc/./../def"), "def");
assert_eq!(clean("abc//./../def"), "def");
assert_eq!(clean("abc/../../././../def"), "../../def");
assert_eq!(clean("abc/def/ghi/../jkl"), "abc/def/jkl");
assert_eq!(clean("abc/def/../ghi/../jkl"), "abc/jkl");
assert_eq!(clean("abc/def/.."), "abc");
assert_eq!(clean("abc/def/../.."), ".");
assert_eq!(clean("/abc/def/../.."), "/");
assert_eq!(clean("abc/def/../../.."), "..");
assert_eq!(clean("/abc/def/../../.."), "/");
assert_eq!(clean("abc/def/../../../ghi/jkl/../../../mno"), "../../mno");
}
}

View File

@@ -1,80 +0,0 @@
use nix::sys::{
stat::{major, minor, stat},
statfs::{statfs, FsType},
};
use crate::{
disk::Info,
error::{Error, Result},
};
use lazy_static::lazy_static;
use std::collections::HashMap;
lazy_static! {
static ref FS_TYPE_TO_STRING_MAP: HashMap<&'static str, &'static str> = {
let mut m = HashMap::new();
m.insert("1021994", "TMPFS");
m.insert("137d", "EXT");
m.insert("4244", "HFS");
m.insert("4d44", "MSDOS");
m.insert("52654973", "REISERFS");
m.insert("5346544e", "NTFS");
m.insert("58465342", "XFS");
m.insert("61756673", "AUFS");
m.insert("6969", "NFS");
m.insert("ef51", "EXT2OLD");
m.insert("ef53", "EXT4");
m.insert("f15f", "ecryptfs");
m.insert("794c7630", "overlayfs");
m.insert("2fc12fc1", "zfs");
m.insert("ff534d42", "cifs");
m.insert("53464846", "wslfs");
m
};
}
fn get_fs_type(ftype: FsType) -> String {
let binding = format!("{:?}", ftype);
let fs_type_hex = binding.as_str();
match FS_TYPE_TO_STRING_MAP.get(fs_type_hex) {
Some(fs_type_string) => fs_type_string.to_string(),
None => "UNKNOWN".to_string(),
}
}
pub fn get_info(path: &str) -> Result<Info> {
let statfs = statfs(path)?;
let reserved_blocks = statfs.blocks_free() - statfs.blocks_available();
let mut info = Info {
total: statfs.block_size() as u64 * (statfs.blocks() - reserved_blocks),
free: statfs.blocks() as u64 * statfs.blocks_available(),
files: statfs.files(),
ffree: statfs.files_free(),
fstype: get_fs_type(statfs.filesystem_type()),
..Default::default()
};
let stat = stat(path)?;
let dev_id = stat.st_dev as u64;
info.major = major(dev_id);
info.minor = minor(dev_id);
if info.free > info.total {
return Err(Error::from_string(format!(
"detected free space {} > total drive space {}, fs corruption at {}. please run 'fsck'",
info.free, info.total, path
)));
}
info.used = info.total - info.free;
Ok(info)
}
pub fn same_disk(disk1: &str, disk2: &str) -> Result<bool> {
let stat1 = stat(disk1)?;
let stat2 = stat(disk2)?;
Ok(stat1.st_dev == stat2.st_dev)
}

View File

@@ -1,73 +0,0 @@
use crate::disk::RUSTFS_META_BUCKET;
pub fn match_simple(pattern: &str, name: &str) -> bool {
if pattern.is_empty() {
return name == pattern;
}
if pattern == "*" {
return true;
}
// Do an extended wildcard '*' and '?' match.
deep_match_rune(name.as_bytes(), pattern.as_bytes(), true)
}
pub fn match_pattern(pattern: &str, name: &str) -> bool {
if pattern.is_empty() {
return name == pattern;
}
if pattern == "*" {
return true;
}
// Do an extended wildcard '*' and '?' match.
deep_match_rune(name.as_bytes(), pattern.as_bytes(), false)
}
fn deep_match_rune(str_: &[u8], pattern: &[u8], simple: bool) -> bool {
let (mut str_, mut pattern) = (str_, pattern);
while !pattern.is_empty() {
match pattern[0] as char {
'*' => {
return if pattern.len() == 1 {
true
} else {
deep_match_rune(str_, &pattern[1..], simple)
|| (!str_.is_empty() && deep_match_rune(&str_[1..], pattern, simple))
};
}
'?' => {
if str_.is_empty() {
return simple;
}
}
_ => {
if str_.is_empty() || str_[0] != pattern[0] {
return false;
}
}
}
str_ = &str_[1..];
pattern = &pattern[1..];
}
str_.is_empty() && pattern.is_empty()
}
pub fn match_as_pattern_prefix(pattern: &str, text: &str) -> bool {
let mut i = 0;
while i < text.len() && i < pattern.len() {
match pattern.as_bytes()[i] as char {
'*' => return true,
'?' => i += 1,
_ => {
if pattern.as_bytes()[i] != text.as_bytes()[i] {
return false;
}
}
}
i += 1;
}
text.len() <= pattern.len()
}
pub fn is_rustfs_meta_bucket_name(bucket: &str) -> bool {
bucket.starts_with(RUSTFS_META_BUCKET)
}

View File

@@ -1,29 +0,0 @@
use s3s::xml;
pub fn deserialize<T>(input: &[u8]) -> xml::DeResult<T>
where
T: for<'xml> xml::Deserialize<'xml>,
{
let mut d = xml::Deserializer::new(input);
let ans = T::deserialize(&mut d)?;
d.expect_eof()?;
Ok(ans)
}
pub fn serialize_content<T: xml::SerializeContent>(val: &T) -> xml::SerResult<String> {
let mut buf = Vec::with_capacity(256);
{
let mut ser = xml::Serializer::new(&mut buf);
val.serialize_content(&mut ser)?;
}
Ok(String::from_utf8(buf).unwrap())
}
pub fn serialize<T: xml::Serialize>(val: &T) -> xml::SerResult<Vec<u8>> {
let mut buf = Vec::with_capacity(256);
{
let mut ser = xml::Serializer::new(&mut buf);
val.serialize(&mut ser)?;
}
Ok(buf)
}

View File

@@ -31,6 +31,7 @@ tracing.workspace = true
madmin.workspace = true
lazy_static.workspace = true
regex = { workspace = true }
rustfs-utils= { workspace = true, features = ["path"] }
[dev-dependencies]
test-case.workspace = true

View File

@@ -9,7 +9,7 @@ use crate::{
UpdateServiceAccountOpts,
},
};
use ecstore::utils::{crypto::base64_encode, path::path_join_buf};
// use ecstore::utils::crypto::base64_encode;
use madmin::{AccountStatus, AddOrUpdateUserReq, GroupDesc};
use policy::{
arn::ARN,
@@ -19,6 +19,8 @@ use policy::{
EMBEDDED_POLICY_TYPE, INHERITED_POLICY_TYPE, Policy, PolicyDoc, default::DEFAULT_POLICIES, iam_policy_claim_name_sa,
},
};
use rustfs_utils::crypto::base64_encode;
use rustfs_utils::path::path_join_buf;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::{

View File

@@ -14,11 +14,11 @@ use ecstore::{
store::ECStore,
store_api::{ObjectInfo, ObjectOptions},
store_list_objects::{ObjectInfoOrErr, WalkOptions},
utils::path::{SLASH_SEPARATOR, path_join_buf},
};
use futures::future::join_all;
use lazy_static::lazy_static;
use policy::{auth::UserIdentity, policy::PolicyDoc};
use rustfs_utils::path::{SLASH_SEPARATOR, path_join_buf};
use serde::{Serialize, de::DeserializeOwned};
use std::{collections::HashMap, sync::Arc};
use tokio::sync::broadcast::{self, Receiver as B_Receiver};
@@ -206,7 +206,7 @@ impl ObjectStore {
let mut futures = Vec::with_capacity(names.len());
for name in names {
let policy_name = ecstore::utils::path::dir(name);
let policy_name = rustfs_utils::path::dir(name);
futures.push(async move {
match self.load_policy(&policy_name).await {
Ok(p) => Ok(p),
@@ -238,7 +238,7 @@ impl ObjectStore {
let mut futures = Vec::with_capacity(names.len());
for name in names {
let user_name = ecstore::utils::path::dir(name);
let user_name = rustfs_utils::path::dir(name);
futures.push(async move {
match self.load_user_identity(&user_name, user_type).await {
Ok(res) => Ok(res),
@@ -464,7 +464,7 @@ impl Store for ObjectStore {
}
if let Some(item) = v.item {
let name = ecstore::utils::path::dir(&item);
let name = rustfs_utils::path::dir(&item);
self.load_user(&name, user_type, m).await?;
}
}
@@ -526,7 +526,7 @@ impl Store for ObjectStore {
}
if let Some(item) = v.item {
let name = ecstore::utils::path::dir(&item);
let name = rustfs_utils::path::dir(&item);
self.load_group(&name, m).await?;
}
}
@@ -590,7 +590,7 @@ impl Store for ObjectStore {
}
if let Some(item) = v.item {
let name = ecstore::utils::path::dir(&item);
let name = rustfs_utils::path::dir(&item);
self.load_policy_doc(&name, m).await?;
}
}
@@ -690,7 +690,7 @@ impl Store for ObjectStore {
continue;
}
let policy_name = ecstore::utils::path::dir(&policies_list[idx]);
let policy_name = rustfs_utils::path::dir(&policies_list[idx]);
info!("load policy: {}", policy_name);
@@ -706,7 +706,7 @@ impl Store for ObjectStore {
continue;
}
let policy_name = ecstore::utils::path::dir(&policies_list[idx]);
let policy_name = rustfs_utils::path::dir(&policies_list[idx]);
info!("load policy: {}", policy_name);
policy_docs_cache.insert(policy_name, p);
}
@@ -734,7 +734,7 @@ impl Store for ObjectStore {
continue;
}
let name = ecstore::utils::path::dir(&item_name_list[idx]);
let name = rustfs_utils::path::dir(&item_name_list[idx]);
info!("load reg user: {}", name);
user_items_cache.insert(name, p);
}
@@ -748,7 +748,7 @@ impl Store for ObjectStore {
continue;
}
let name = ecstore::utils::path::dir(&item_name_list[idx]);
let name = rustfs_utils::path::dir(&item_name_list[idx]);
info!("load reg user: {}", name);
user_items_cache.insert(name, p);
}
@@ -764,7 +764,7 @@ impl Store for ObjectStore {
let mut items_cache = CacheEntity::default();
for item in item_name_list.iter() {
let name = ecstore::utils::path::dir(item);
let name = rustfs_utils::path::dir(item);
info!("load group: {}", name);
if let Err(err) = self.load_group(&name, &mut items_cache).await {
return Err(Error::other(format!("load group failed: {}", err)));
@@ -843,7 +843,7 @@ impl Store for ObjectStore {
let mut items_cache = HashMap::default();
for item in item_name_list.iter() {
let name = ecstore::utils::path::dir(item);
let name = rustfs_utils::path::dir(item);
info!("load svc user: {}", name);
if let Err(err) = self.load_user(&name, UserType::Svc, &mut items_cache).await {
if !is_err_no_such_user(&err) {
@@ -880,7 +880,7 @@ impl Store for ObjectStore {
for item in item_name_list.iter() {
info!("load sts user path: {}", item);
let name = ecstore::utils::path::dir(item);
let name = rustfs_utils::path::dir(item);
info!("load sts user: {}", name);
if let Err(err) = self.load_user(&name, UserType::Sts, &mut sts_items_cache).await {
info!("load sts user failed: {}", err);

View File

@@ -9,8 +9,8 @@ use crate::manager::get_default_policyes;
use crate::store::MappedPolicy;
use crate::store::Store;
use crate::store::UserType;
use ecstore::utils::crypto::base64_decode;
use ecstore::utils::crypto::base64_encode;
// use ecstore::utils::crypto::base64_decode;
// use ecstore::utils::crypto::base64_encode;
use madmin::AddOrUpdateUserReq;
use madmin::GroupDesc;
use policy::arn::ARN;
@@ -28,6 +28,7 @@ use policy::policy::INHERITED_POLICY_TYPE;
use policy::policy::Policy;
use policy::policy::PolicyDoc;
use policy::policy::iam_policy_claim_name_sa;
use rustfs_utils::crypto::{base64_decode, base64_encode};
use serde_json::Value;
use serde_json::json;
use std::collections::HashMap;

View File

@@ -21,12 +21,12 @@ use ecstore::pools::{get_total_usable_capacity, get_total_usable_capacity_free};
use ecstore::store::is_valid_object_prefix;
use ecstore::store_api::BucketOptions;
use ecstore::store_api::StorageAPI;
use ecstore::utils::path::path_join;
use futures::{Stream, StreamExt};
use http::{HeaderMap, Uri};
use hyper::StatusCode;
use iam::get_global_action_cred;
use iam::store::MappedPolicy;
use rustfs_utils::path::path_join;
// use lazy_static::lazy_static;
use madmin::metrics::RealtimeMetrics;
use madmin::utils::parse_duration;

View File

@@ -4,11 +4,12 @@ use crate::{
admin::router::Operation,
auth::{check_key_valid, get_session_token},
};
use ecstore::utils::{crypto::base64_encode, xml};
use ecstore::bucket::utils::serialize;
use http::StatusCode;
use iam::{manager::get_token_signing_key, sys::SESSION_POLICY_NAME};
use matchit::Params;
use policy::{auth::get_new_credentials_with_metadata, policy::Policy};
use rustfs_utils::crypto::base64_encode;
use s3s::{
Body, S3Error, S3ErrorCode, S3Request, S3Response, S3Result,
dto::{AssumeRoleOutput, Credentials, Timestamp},
@@ -138,7 +139,7 @@ impl Operation for AssumeRoleHandle {
};
// getAssumeRoleCredentials
let output = xml::serialize::<AssumeRoleOutput>(&resp).unwrap();
let output = serialize::<AssumeRoleOutput>(&resp).unwrap();
Ok(S3Response::new((StatusCode::OK, Body::from(output))))
}

View File

@@ -8,6 +8,7 @@ use axum::{
};
use axum_extra::extract::Host;
use rustfs_config::{RUSTFS_TLS_CERT, RUSTFS_TLS_KEY};
use rustfs_utils::net::parse_and_resolve_address;
use std::io;
use axum::response::Redirect;
@@ -239,8 +240,7 @@ pub async fn start_static_file_server(
.layer(tower_http::compression::CompressionLayer::new().gzip(true).deflate(true))
.layer(TraceLayer::new_for_http());
use ecstore::utils::net;
let server_addr = net::parse_and_resolve_address(addrs).expect("Failed to parse socket address");
let server_addr = parse_and_resolve_address(addrs).expect("Failed to parse socket address");
let server_port = server_addr.port();
let server_address = server_addr.to_string();

View File

@@ -29,7 +29,6 @@ use ecstore::config as ecconfig;
use ecstore::config::GLOBAL_ConfigSys;
use ecstore::heal::background_heal_ops::init_auto_heal;
use ecstore::store_api::BucketOptions;
use ecstore::utils::net;
use ecstore::{
endpoints::EndpointServerPools,
heal::data_scanner::init_data_scanner,
@@ -51,6 +50,7 @@ use license::init_license;
use protos::proto_gen::node_service::node_service_server::NodeServiceServer;
use rustfs_config::{DEFAULT_ACCESS_KEY, DEFAULT_SECRET_KEY, RUSTFS_TLS_CERT, RUSTFS_TLS_KEY};
use rustfs_obs::{SystemObserver, init_obs, set_global_guard};
use rustfs_utils::net::parse_and_resolve_address;
use rustls::ServerConfig;
use s3s::{host::MultiDomain, service::S3ServiceBuilder};
use service::hybrid;
@@ -122,7 +122,7 @@ async fn run(opt: config::Opt) -> Result<()> {
// Initialize event notifier
event::init_event_notifier(opt.event_config).await;
let server_addr = net::parse_and_resolve_address(opt.address.as_str()).map_err(Error::other)?;
let server_addr = parse_and_resolve_address(opt.address.as_str()).map_err(Error::other)?;
let server_port = server_addr.port();
let server_address = server_addr.to_string();

View File

@@ -48,11 +48,10 @@ use ecstore::store_api::ObjectToDelete;
use ecstore::store_api::PutObjReader;
use ecstore::store_api::StorageAPI;
// use ecstore::store_api::RESERVED_METADATA_PREFIX;
use ecstore::bucket::utils::serialize;
use ecstore::cmd::bucket_replication::ReplicationStatusType;
use ecstore::cmd::bucket_replication::ReplicationType;
use ecstore::store_api::RESERVED_METADATA_PREFIX_LOWER;
use ecstore::utils::path::path_join_buf;
use ecstore::utils::xml;
use ecstore::xhttp;
use futures::pin_mut;
use futures::{Stream, StreamExt};
@@ -66,6 +65,7 @@ use policy::policy::action::Action;
use policy::policy::action::S3Action;
use query::instance::make_rustfsms;
use rustfs_rio::HashReader;
use rustfs_utils::path::path_join_buf;
use rustfs_zip::CompressionFormat;
use s3s::S3;
use s3s::S3Error;
@@ -1274,7 +1274,7 @@ impl S3 for FS {
.await
.map_err(ApiError::from)?;
let data = try_!(xml::serialize(&tagging));
let data = try_!(serialize(&tagging));
metadata_sys::update(&bucket, BUCKET_TAGGING_CONFIG, data)
.await
@@ -1405,7 +1405,7 @@ impl S3 for FS {
// check bucket object lock enable
// check replication suspended
let data = try_!(xml::serialize(&versioning_configuration));
let data = try_!(serialize(&versioning_configuration));
metadata_sys::update(&bucket, BUCKET_VERSIONING_CONFIG, data)
.await
@@ -1596,7 +1596,7 @@ impl S3 for FS {
let Some(input_cfg) = lifecycle_configuration else { return Err(s3_error!(InvalidArgument)) };
let data = try_!(xml::serialize(&input_cfg));
let data = try_!(serialize(&input_cfg));
metadata_sys::update(&bucket, BUCKET_LIFECYCLE_CONFIG, data)
.await
.map_err(ApiError::from)?;
@@ -1681,7 +1681,7 @@ impl S3 for FS {
// TODO: check kms
let data = try_!(xml::serialize(&server_side_encryption_configuration));
let data = try_!(serialize(&server_side_encryption_configuration));
metadata_sys::update(&bucket, BUCKET_SSECONFIG, data)
.await
.map_err(ApiError::from)?;
@@ -1753,7 +1753,7 @@ impl S3 for FS {
.await
.map_err(ApiError::from)?;
let data = try_!(xml::serialize(&input_cfg));
let data = try_!(serialize(&input_cfg));
metadata_sys::update(&bucket, OBJECT_LOCK_CONFIG, data)
.await
@@ -1829,7 +1829,7 @@ impl S3 for FS {
.map_err(ApiError::from)?;
// TODO: check enable, versioning enable
let data = try_!(xml::serialize(&replication_configuration));
let data = try_!(serialize(&replication_configuration));
metadata_sys::update(&bucket, BUCKET_REPLICATION_CONFIG, data)
.await
@@ -1924,7 +1924,7 @@ impl S3 for FS {
.await
.map_err(ApiError::from)?;
let data = try_!(xml::serialize(&notification_configuration));
let data = try_!(serialize(&notification_configuration));
metadata_sys::update(&bucket, BUCKET_NOTIFICATION_CONFIG, data)
.await

View File

@@ -2,9 +2,9 @@ use ecstore::bucket::versioning_sys::BucketVersioningSys;
use ecstore::error::Result;
use ecstore::error::StorageError;
use ecstore::store_api::ObjectOptions;
use ecstore::utils::path::is_dir_object;
use http::{HeaderMap, HeaderValue};
use lazy_static::lazy_static;
use rustfs_utils::path::is_dir_object;
use std::collections::HashMap;
use uuid::Uuid;