Compare commits

..

3 Commits

Author SHA1 Message Date
houseme
3f095e75cb improve code for logger and fix typo (#272) 2025-07-21 15:20:36 +08:00
houseme
f7d30da9e0 fix typo (#267)
* fix typo

* cargo fmt
2025-07-20 00:11:15 +08:00
Chrislearn Young
823d4b6f79 Add typos github actions and fix typos (#265)
* Add typo github actions and fix typos

* cargo fmt
2025-07-19 22:08:50 +08:00
39 changed files with 308 additions and 235 deletions

View File

@@ -83,6 +83,16 @@ jobs:
# Never skip release events and tag pushes
do_not_skip: '["workflow_dispatch", "schedule", "merge_group", "release", "push"]'
typos:
name: Typos
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: dtolnay/rust-toolchain@stable
- name: Typos check with custom config file
uses: crate-ci/typos@master
test-and-lint:
name: Test and Lint
needs: skip-check

41
_typos.toml Normal file
View File

@@ -0,0 +1,41 @@
[default]
# # Ignore specific spell checking patterns
# extend-ignore-identifiers-re = [
# # Ignore common patterns in base64 encoding and hash values
# "[A-Za-z0-9+/]{8,}={0,2}", # base64 encoding
# "[A-Fa-f0-9]{8,}", # hexadecimal hash
# "[A-Za-z0-9_-]{20,}", # long random strings
# ]
# # Ignore specific regex patterns in content
# extend-ignore-re = [
# # Ignore hash values and encoded strings (base64 patterns)
# "(?i)[A-Za-z0-9+/]{8,}={0,2}",
# # Ignore long strings in quotes (usually hash or base64)
# '"[A-Za-z0-9+/=_-]{8,}"',
# # Ignore IV values and similar cryptographic strings
# '"[A-Za-z0-9+/=]{12,}"',
# # Ignore cryptographic signatures and keys (including partial strings)
# "[A-Za-z0-9+/]{6,}[A-Za-z0-9+/=]*",
# # Ignore base64-like strings in comments (common in examples)
# "//.*[A-Za-z0-9+/]{8,}[A-Za-z0-9+/=]*",
# ]
extend-ignore-re = [
# Ignore long strings in quotes (usually hash or base64)
'"[A-Za-z0-9+/=_-]{32,}"',
# Ignore IV values and similar cryptographic strings
'"[A-Za-z0-9+/=]{12,}"',
# Ignore cryptographic signatures and keys (including partial strings)
"[A-Za-z0-9+/]{16,}[A-Za-z0-9+/=]*",
]
[default.extend-words]
bui = "bui"
typ = "typ"
clen = "clen"
datas = "datas"
bre = "bre"
abd = "abd"
[files]
extend-exclude = []

View File

@@ -33,11 +33,11 @@ pub fn decrypt_data(password: &[u8], data: &[u8]) -> Result<Vec<u8>, crate::Erro
match id {
ID::Argon2idChaCHa20Poly1305 => {
let key = id.get_key(password, salt)?;
decryp(ChaCha20Poly1305::new_from_slice(&key)?, nonce, data)
decrypt(ChaCha20Poly1305::new_from_slice(&key)?, nonce, data)
}
_ => {
let key = id.get_key(password, salt)?;
decryp(Aes256Gcm::new_from_slice(&key)?, nonce, data)
decrypt(Aes256Gcm::new_from_slice(&key)?, nonce, data)
}
}
}
@@ -135,7 +135,7 @@ pub fn decrypt_data(password: &[u8], data: &[u8]) -> Result<Vec<u8>, crate::Erro
#[cfg(any(test, feature = "crypto"))]
#[inline]
fn decryp<T: aes_gcm::aead::Aead>(stream: T, nonce: &[u8], data: &[u8]) -> Result<Vec<u8>, crate::Error> {
fn decrypt<T: aes_gcm::aead::Aead>(stream: T, nonce: &[u8], data: &[u8]) -> Result<Vec<u8>, crate::Error> {
use crate::error::Error;
stream
.decrypt(aes_gcm::Nonce::from_slice(nonce), data)

View File

@@ -253,7 +253,7 @@ pub async fn get_server_info(get_pools: bool) -> InfoMessage {
warn!("load_data_usage_from_backend end {:?}", after3 - after2);
let backen_info = store.clone().backend_info().await;
let backend_info = store.clone().backend_info().await;
let after4 = OffsetDateTime::now_utc();
@@ -272,10 +272,10 @@ pub async fn get_server_info(get_pools: bool) -> InfoMessage {
backend_type: rustfs_madmin::BackendType::ErasureType,
online_disks: online_disks.sum(),
offline_disks: offline_disks.sum(),
standard_sc_parity: backen_info.standard_sc_parity,
rr_sc_parity: backen_info.rr_sc_parity,
total_sets: backen_info.total_sets,
drives_per_set: backen_info.drives_per_set,
standard_sc_parity: backend_info.standard_sc_parity,
rr_sc_parity: backend_info.rr_sc_parity,
total_sets: backend_info.total_sets,
drives_per_set: backend_info.drives_per_set,
};
if get_pools {
pools = get_pools_info(&all_disks).await.unwrap_or_default();

View File

@@ -31,7 +31,7 @@ pub struct ListPathRawOptions {
pub fallback_disks: Vec<Option<DiskStore>>,
pub bucket: String,
pub path: String,
pub recursice: bool,
pub recursive: bool,
pub filter_prefix: Option<String>,
pub forward_to: Option<String>,
pub min_disks: usize,
@@ -52,7 +52,7 @@ impl Clone for ListPathRawOptions {
fallback_disks: self.fallback_disks.clone(),
bucket: self.bucket.clone(),
path: self.path.clone(),
recursice: self.recursice,
recursive: self.recursive,
filter_prefix: self.filter_prefix.clone(),
forward_to: self.forward_to.clone(),
min_disks: self.min_disks,
@@ -85,7 +85,7 @@ pub async fn list_path_raw(mut rx: B_Receiver<bool>, opts: ListPathRawOptions) -
let wakl_opts = WalkDirOptions {
bucket: opts_clone.bucket.clone(),
base_dir: opts_clone.path.clone(),
recursive: opts_clone.recursice,
recursive: opts_clone.recursive,
report_notfound: opts_clone.report_not_found,
filter_prefix: opts_clone.filter_prefix.clone(),
forward_to: opts_clone.forward_to.clone(),
@@ -133,7 +133,7 @@ pub async fn list_path_raw(mut rx: B_Receiver<bool>, opts: ListPathRawOptions) -
WalkDirOptions {
bucket: opts_clone.bucket.clone(),
base_dir: opts_clone.path.clone(),
recursive: opts_clone.recursice,
recursive: opts_clone.recursive,
report_notfound: opts_clone.report_not_found,
filter_prefix: opts_clone.filter_prefix.clone(),
forward_to: opts_clone.forward_to.clone(),

View File

@@ -41,7 +41,7 @@
// pin_mut!(body);
// // 上一次没用完的数据
// let mut prev_bytes = Bytes::new();
// let mut readed_size = 0;
// let mut read_size = 0;
// loop {
// let data: Vec<Bytes> = {
@@ -51,9 +51,9 @@
// Some(Err(e)) => return Err(e),
// Some(Ok((data, remaining_bytes))) => {
// // debug!(
// // "content_length:{},readed_size:{}, read_data data:{}, remaining_bytes: {} ",
// // "content_length:{},read_size:{}, read_data data:{}, remaining_bytes: {} ",
// // content_length,
// // readed_size,
// // read_size,
// // data.len(),
// // remaining_bytes.len()
// // );
@@ -65,15 +65,15 @@
// };
// for bytes in data {
// readed_size += bytes.len();
// // debug!("readed_size {}, content_length {}", readed_size, content_length,);
// read_size += bytes.len();
// // debug!("read_size {}, content_length {}", read_size, content_length,);
// y.yield_ok(bytes).await;
// }
// if readed_size + prev_bytes.len() >= content_length {
// if read_size + prev_bytes.len() >= content_length {
// // debug!(
// // "读完了 readed_size:{} + prev_bytes.len({}) == content_length {}",
// // readed_size,
// // "读完了 read_size:{} + prev_bytes.len({}) == content_length {}",
// // read_size,
// // prev_bytes.len(),
// // content_length,
// // );

View File

@@ -135,7 +135,7 @@ impl Default for PutObjectOptions {
#[allow(dead_code)]
impl PutObjectOptions {
fn set_matche_tag(&mut self, etag: &str) {
fn set_match_tag(&mut self, etag: &str) {
if etag == "*" {
self.custom_header
.insert("If-Match", HeaderValue::from_str("*").expect("err"));
@@ -145,7 +145,7 @@ impl PutObjectOptions {
}
}
fn set_matche_tag_except(&mut self, etag: &str) {
fn set_match_tag_except(&mut self, etag: &str) {
if etag == "*" {
self.custom_header
.insert("If-None-Match", HeaderValue::from_str("*").expect("err"));
@@ -181,7 +181,7 @@ impl PutObjectOptions {
header.insert(
"Expires",
HeaderValue::from_str(&self.expires.format(ISO8601_DATEFORMAT).unwrap()).expect("err"),
); //rustfs invalid heade
); //rustfs invalid header
}
if self.mode.as_str() != "" {

View File

@@ -2422,7 +2422,7 @@ impl ReplicateObjectInfo {
// let mut arns = Vec::new();
// let mut tgts_map = std::collections::HashSet::new();
// for rule in cfg.rules {
// if rule.status.as_str() == "Disabe" {
// if rule.status.as_str() == "Disable" {
// continue;
// }

View File

@@ -95,7 +95,7 @@ impl ArnTarget {
Self {
client: TargetClient {
bucket,
storage_class: "STANDRD".to_string(),
storage_class: "STANDARD".to_string(),
disable_proxy: false,
health_check_duration: Duration::from_secs(100),
endpoint,
@@ -361,7 +361,7 @@ impl BucketTargetSys {
// // Mocked implementation for obtaining a remote client
// let tcli = TargetClient {
// bucket: _tgt.target_bucket.clone(),
// storage_class: "STANDRD".to_string(),
// storage_class: "STANDARD".to_string(),
// disable_proxy: false,
// health_check_duration: Duration::from_secs(100),
// endpoint: _tgt.endpoint.clone(),
@@ -379,7 +379,7 @@ impl BucketTargetSys {
// // Mocked implementation for obtaining a remote client
// let tcli = TargetClient {
// bucket: _tgt.target_bucket.clone(),
// storage_class: "STANDRD".to_string(),
// storage_class: "STANDARD".to_string(),
// disable_proxy: false,
// health_check_duration: Duration::from_secs(100),
// endpoint: _tgt.endpoint.clone(),
@@ -403,7 +403,7 @@ impl BucketTargetSys {
match store.get_bucket_info(_bucket, &store_api::BucketOptions::default()).await {
Ok(info) => {
println!("Bucket Info: {info:?}");
info.versionning
info.versioning
}
Err(err) => {
eprintln!("Error: {err:?}");
@@ -431,7 +431,7 @@ impl BucketTargetSys {
// {
// Ok(info) => {
// println!("Bucket Info: {:?}", info);
// info.versionning
// info.versioning
// }
// Err(err) => {
// eprintln!("Error: {:?}", err);
@@ -475,8 +475,7 @@ impl BucketTargetSys {
{
Ok(info) => {
println!("Bucket Info: {info:?}");
if !info.versionning {
println!("2222222222 {}", info.versionning);
if !info.versioning {
return Err(SetTargetError::TargetNotVersioned(tgt.target_bucket.to_string()));
}
}

View File

@@ -563,7 +563,7 @@ impl LocalDisk {
}
async fn read_metadata(&self, file_path: impl AsRef<Path>) -> Result<Vec<u8>> {
// TODO: suport timeout
// TODO: support timeout
let (data, _) = self.read_metadata_with_dmtime(file_path.as_ref()).await?;
Ok(data)
}
@@ -595,7 +595,7 @@ impl LocalDisk {
}
async fn read_all_data(&self, volume: &str, volume_dir: impl AsRef<Path>, file_path: impl AsRef<Path>) -> Result<Vec<u8>> {
// TODO: timeout suport
// TODO: timeout support
let (data, _) = self.read_all_data_with_dmtime(volume, volume_dir, file_path).await?;
Ok(data)
}
@@ -750,7 +750,7 @@ impl LocalDisk {
let mut f = {
if sync {
// TODO: suport sync
// TODO: support sync
self.open_file(file_path, flags, skip_parent).await?
} else {
self.open_file(file_path, flags, skip_parent).await?
@@ -2336,7 +2336,7 @@ impl DiskAPI for LocalDisk {
};
done_sz(buf.len() as u64);
res.insert("metasize".to_string(), buf.len().to_string());
item.transform_meda_dir();
item.transform_meta_dir();
let meta_cache = MetaCacheEntry {
name: item.object_path().to_string_lossy().to_string(),
metadata: buf,

View File

@@ -308,7 +308,7 @@ impl Erasure {
// ec encode, 结果会写进 data_buffer
let data_slices: SmallVec<[&mut [u8]; 16]> = data_buffer.chunks_exact_mut(shard_size).collect();
// partiy 数量大于 0 才 ec
// parity 数量大于 0 才 ec
if self.parity_shards > 0 {
self.encoder.as_ref().unwrap().encode(data_slices).map_err(Error::other)?;
}

View File

@@ -563,12 +563,12 @@ impl CurrentScannerCycle {
}
}
// SystemTime 转换为时间戳
// Convert `SystemTime` to timestamp
fn system_time_to_timestamp(time: &DateTime<Utc>) -> i64 {
time.timestamp_micros()
}
// 将时间戳转换为 SystemTime
// Convert timestamp to `SystemTime`
fn timestamp_to_system_time(timestamp: i64) -> DateTime<Utc> {
DateTime::from_timestamp_micros(timestamp).unwrap_or_default()
}
@@ -593,7 +593,7 @@ pub struct ScannerItem {
}
impl ScannerItem {
pub fn transform_meda_dir(&mut self) {
pub fn transform_meta_dir(&mut self) {
let split = self.prefix.split(SLASH_SEPARATOR).map(PathBuf::from).collect::<Vec<_>>();
if split.len() > 1 {
self.prefix = path_join(&split[0..split.len() - 1]).to_string_lossy().to_string();
@@ -1101,7 +1101,7 @@ impl FolderScanner {
// successfully read means we have a valid object.
found_objects = true;
// Remove filename i.e is the meta file to construct object name
item.transform_meda_dir();
item.transform_meta_dir();
// Object already accounted for, remove from heal map,
// simply because getSize() function already heals the
// object.
@@ -1262,7 +1262,7 @@ impl FolderScanner {
disks: self.disks.clone(),
bucket: bucket.clone(),
path: prefix.clone(),
recursice: true,
recursive: true,
report_not_found: true,
min_disks: self.disks_quorum,
agreed: Some(Box::new(move |entry: MetaCacheEntry| {

View File

@@ -1355,7 +1355,7 @@ impl SetDisks {
disks: disks.iter().cloned().map(Some).collect(),
bucket: bucket_info.name.clone(),
path: bucket_info.prefix.clone(),
recursice: true,
recursive: true,
min_disks: listing_quorum,
agreed: Some(Box::new(move |entry: MetaCacheEntry| Box::pin(cb1(entry)))),
partial: Some(Box::new(move |entries: MetaCacheEntries, _: &[Option<DiskError>]| {

View File

@@ -1172,7 +1172,7 @@ impl SetDisks {
ListPathRawOptions {
disks: disks.iter().cloned().map(Some).collect(),
bucket: bucket.clone(),
recursice: true,
recursive: true,
min_disks: listing_quorum,
agreed: Some(Box::new(move |entry: MetaCacheEntry| {
info!("list_objects_to_rebalance: agreed: {:?}", &entry.name);

View File

@@ -449,7 +449,7 @@ impl PeerS3Client for LocalPeerS3Client {
op.as_ref().map(|v| BucketInfo {
name: v.name.clone(),
created: v.created,
versionning: versioned,
versioning: versioned,
..Default::default()
})
})

View File

@@ -1233,7 +1233,7 @@ impl SetDisks {
return Err(DiskError::ErasureReadQuorum);
}
let mut meta_hashs = vec![None; metas.len()];
let mut meta_hashes = vec![None; metas.len()];
let mut hasher = Sha256::new();
for (i, meta) in metas.iter().enumerate() {
@@ -1265,7 +1265,7 @@ impl SetDisks {
hasher.flush()?;
meta_hashs[i] = Some(hex(hasher.clone().finalize().as_slice()));
meta_hashes[i] = Some(hex(hasher.clone().finalize().as_slice()));
hasher.reset();
}
@@ -1273,7 +1273,7 @@ impl SetDisks {
let mut count_map = HashMap::new();
for hash in meta_hashs.iter().flatten() {
for hash in meta_hashes.iter().flatten() {
*count_map.entry(hash).or_insert(0) += 1;
}
@@ -1297,7 +1297,7 @@ impl SetDisks {
let mut valid_obj_map = HashMap::new();
for (i, op_hash) in meta_hashs.iter().enumerate() {
for (i, op_hash) in meta_hashes.iter().enumerate() {
if let Some(hash) = op_hash {
if let Some(max_hash) = max_val {
if hash == max_hash {
@@ -1749,8 +1749,8 @@ impl SetDisks {
// for res in results {
// match res {
// Ok(entrys) => {
// ress.push(Some(entrys));
// Ok(entries) => {
// ress.push(Some(entries));
// errs.push(None);
// }
// Err(e) => {
@@ -2108,17 +2108,17 @@ impl SetDisks {
let erasure = erasure_coding::Erasure::new(fi.erasure.data_blocks, fi.erasure.parity_blocks, fi.erasure.block_size);
let mut total_readed = 0;
let mut total_read = 0;
for i in part_index..=last_part_index {
if total_readed == length {
if total_read == length {
break;
}
let part_number = fi.parts[i].number;
let part_size = fi.parts[i].size;
let mut part_length = part_size - part_offset;
if part_length > (length - total_readed) {
part_length = length - total_readed
if part_length > (length - total_read) {
part_length = length - total_read
}
let till_offset = erasure.shard_file_offset(part_offset, part_length, part_size);
@@ -2203,7 +2203,7 @@ impl SetDisks {
// debug!("ec decode {} written size {}", part_number, n);
total_readed += part_length;
total_read += part_length;
part_offset = 0;
}
@@ -2306,7 +2306,7 @@ impl SetDisks {
Some(filter_prefix.to_string())
}
},
recursice: true,
recursive: true,
forward_to: None,
min_disks: 1,
report_not_found: false,
@@ -2481,12 +2481,12 @@ impl SetDisks {
};
match Self::pick_valid_fileinfo(&parts_metadata, mod_time, etag, read_quorum as usize) {
Ok(lastest_meta) => {
Ok(latest_meta) => {
let (available_disks, data_errs_by_disk, data_errs_by_part) = disks_with_all_parts(
&online_disks,
&mut parts_metadata,
&errs,
&lastest_meta,
&latest_meta,
bucket,
object,
opts.scan_mode,
@@ -2494,22 +2494,22 @@ impl SetDisks {
.await?;
// info!(
// "disks_with_all_parts: got available_disks: {:?}, data_errs_by_disk: {:?}, data_errs_by_part: {:?}, lastest_meta: {:?}",
// available_disks, data_errs_by_disk, data_errs_by_part, lastest_meta
// "disks_with_all_parts: got available_disks: {:?}, data_errs_by_disk: {:?}, data_errs_by_part: {:?}, latest_meta: {:?}",
// available_disks, data_errs_by_disk, data_errs_by_part, latest_meta
// );
let erasure = if !lastest_meta.deleted && !lastest_meta.is_remote() {
let erasure = if !latest_meta.deleted && !latest_meta.is_remote() {
// Initialize erasure coding
erasure_coding::Erasure::new(
lastest_meta.erasure.data_blocks,
lastest_meta.erasure.parity_blocks,
lastest_meta.erasure.block_size,
latest_meta.erasure.data_blocks,
latest_meta.erasure.parity_blocks,
latest_meta.erasure.block_size,
)
} else {
erasure_coding::Erasure::default()
};
result.object_size =
ObjectInfo::from_file_info(&lastest_meta, bucket, object, true).get_actual_size()? as usize;
ObjectInfo::from_file_info(&latest_meta, bucket, object, true).get_actual_size()? as usize;
// Loop to find number of disks with valid data, per-drive
// data state and a list of outdated disks on which data needs
// to be healed.
@@ -2517,15 +2517,15 @@ impl SetDisks {
let mut disks_to_heal_count = 0;
// info!(
// "errs: {:?}, data_errs_by_disk: {:?}, lastest_meta: {:?}",
// errs, data_errs_by_disk, lastest_meta
// "errs: {:?}, data_errs_by_disk: {:?}, latest_meta: {:?}",
// errs, data_errs_by_disk, latest_meta
// );
for index in 0..available_disks.len() {
let (yes, reason) = should_heal_object_on_disk(
&errs[index],
&data_errs_by_disk[&index],
&parts_metadata[index],
&lastest_meta,
&latest_meta,
);
if yes {
outdate_disks[index] = disks[index].clone();
@@ -2583,10 +2583,10 @@ impl SetDisks {
return Ok((result, None));
}
if !lastest_meta.deleted && disks_to_heal_count > lastest_meta.erasure.parity_blocks {
if !latest_meta.deleted && disks_to_heal_count > latest_meta.erasure.parity_blocks {
error!(
"file({} : {}) part corrupt too much, can not to fix, disks_to_heal_count: {}, parity_blocks: {}",
bucket, object, disks_to_heal_count, lastest_meta.erasure.parity_blocks
bucket, object, disks_to_heal_count, latest_meta.erasure.parity_blocks
);
// Allow for dangling deletes, on versions that have DataDir missing etc.
@@ -2633,39 +2633,37 @@ impl SetDisks {
};
}
if !lastest_meta.deleted && lastest_meta.erasure.distribution.len() != available_disks.len() {
if !latest_meta.deleted && latest_meta.erasure.distribution.len() != available_disks.len() {
let err_str = format!(
"unexpected file distribution ({:?}) from available disks ({:?}), looks like backend disks have been manually modified refusing to heal {}/{}({})",
lastest_meta.erasure.distribution, available_disks, bucket, object, version_id
latest_meta.erasure.distribution, available_disks, bucket, object, version_id
);
warn!(err_str);
let err = DiskError::other(err_str);
return Ok((
self.default_heal_result(lastest_meta, &errs, bucket, object, version_id)
.await,
self.default_heal_result(latest_meta, &errs, bucket, object, version_id).await,
Some(err),
));
}
let latest_disks = Self::shuffle_disks(&available_disks, &lastest_meta.erasure.distribution);
if !lastest_meta.deleted && lastest_meta.erasure.distribution.len() != outdate_disks.len() {
let latest_disks = Self::shuffle_disks(&available_disks, &latest_meta.erasure.distribution);
if !latest_meta.deleted && latest_meta.erasure.distribution.len() != outdate_disks.len() {
let err_str = format!(
"unexpected file distribution ({:?}) from outdated disks ({:?}), looks like backend disks have been manually modified refusing to heal {}/{}({})",
lastest_meta.erasure.distribution, outdate_disks, bucket, object, version_id
latest_meta.erasure.distribution, outdate_disks, bucket, object, version_id
);
warn!(err_str);
let err = DiskError::other(err_str);
return Ok((
self.default_heal_result(lastest_meta, &errs, bucket, object, version_id)
.await,
self.default_heal_result(latest_meta, &errs, bucket, object, version_id).await,
Some(err),
));
}
if !lastest_meta.deleted && lastest_meta.erasure.distribution.len() != parts_metadata.len() {
if !latest_meta.deleted && latest_meta.erasure.distribution.len() != parts_metadata.len() {
let err_str = format!(
"unexpected file distribution ({:?}) from metadata entries ({:?}), looks like backend disks have been manually modified refusing to heal {}/{}({})",
lastest_meta.erasure.distribution,
latest_meta.erasure.distribution,
parts_metadata.len(),
bucket,
object,
@@ -2674,15 +2672,13 @@ impl SetDisks {
warn!(err_str);
let err = DiskError::other(err_str);
return Ok((
self.default_heal_result(lastest_meta, &errs, bucket, object, version_id)
.await,
self.default_heal_result(latest_meta, &errs, bucket, object, version_id).await,
Some(err),
));
}
let out_dated_disks = Self::shuffle_disks(&outdate_disks, &lastest_meta.erasure.distribution);
let mut parts_metadata =
Self::shuffle_parts_metadata(&parts_metadata, &lastest_meta.erasure.distribution);
let out_dated_disks = Self::shuffle_disks(&outdate_disks, &latest_meta.erasure.distribution);
let mut parts_metadata = Self::shuffle_parts_metadata(&parts_metadata, &latest_meta.erasure.distribution);
let mut copy_parts_metadata = vec![None; parts_metadata.len()];
for (index, disk) in latest_disks.iter().enumerate() {
if disk.is_some() {
@@ -2703,18 +2699,18 @@ impl SetDisks {
if disk.is_some() {
// Make sure to write the FileInfo information
// that is expected to be in quorum.
parts_metadata[index] = clean_file_info(&lastest_meta);
parts_metadata[index] = clean_file_info(&latest_meta);
}
}
// We write at temporary location and then rename to final location.
let tmp_id = Uuid::new_v4().to_string();
let src_data_dir = lastest_meta.data_dir.unwrap().to_string();
let dst_data_dir = lastest_meta.data_dir.unwrap();
let src_data_dir = latest_meta.data_dir.unwrap().to_string();
let dst_data_dir = latest_meta.data_dir.unwrap();
if !lastest_meta.deleted && !lastest_meta.is_remote() {
let erasure_info = lastest_meta.erasure;
for part in lastest_meta.parts.iter() {
if !latest_meta.deleted && !latest_meta.is_remote() {
let erasure_info = latest_meta.erasure;
for part in latest_meta.parts.iter() {
let till_offset = erasure.shard_file_offset(0, part.size, part.size);
let checksum_algo = erasure_info.get_checksum_info(part.number).algorithm;
let mut readers = Vec::with_capacity(latest_disks.len());
@@ -2759,7 +2755,7 @@ impl SetDisks {
let is_inline_buffer = {
if let Some(sc) = GLOBAL_StorageClass.get() {
sc.should_inline(erasure.shard_file_size(lastest_meta.size), false)
sc.should_inline(erasure.shard_file_size(latest_meta.size), false)
} else {
false
}
@@ -3840,7 +3836,7 @@ impl SetDisks {
disks,
fallback_disks,
bucket: bucket.clone(),
recursice: true,
recursive: true,
forward_to,
min_disks: 1,
report_not_found: false,
@@ -4317,7 +4313,7 @@ impl ObjectIO for SetDisks {
fi.is_latest = true;
// TODO: version suport
// TODO: version support
Ok(ObjectInfo::from_file_info(&fi, bucket, object, opts.versioned || opts.version_suspended))
}
}
@@ -6142,7 +6138,7 @@ async fn disks_with_all_parts(
online_disks: &[Option<DiskStore>],
parts_metadata: &mut [FileInfo],
errs: &[Option<DiskError>],
lastest_meta: &FileInfo,
latest_meta: &FileInfo,
bucket: &str,
object: &str,
scan_mode: HealScanMode,
@@ -6150,10 +6146,10 @@ async fn disks_with_all_parts(
let mut available_disks = vec![None; online_disks.len()];
let mut data_errs_by_disk: HashMap<usize, Vec<usize>> = HashMap::new();
for i in 0..online_disks.len() {
data_errs_by_disk.insert(i, vec![1; lastest_meta.parts.len()]);
data_errs_by_disk.insert(i, vec![1; latest_meta.parts.len()]);
}
let mut data_errs_by_part: HashMap<usize, Vec<usize>> = HashMap::new();
for i in 0..lastest_meta.parts.len() {
for i in 0..latest_meta.parts.len() {
data_errs_by_part.insert(i, vec![1; online_disks.len()]);
}
@@ -6191,7 +6187,7 @@ async fn disks_with_all_parts(
continue;
}
let meta = &parts_metadata[index];
if !meta.mod_time.eq(&lastest_meta.mod_time) || !meta.data_dir.eq(&lastest_meta.data_dir) {
if !meta.mod_time.eq(&latest_meta.mod_time) || !meta.data_dir.eq(&latest_meta.data_dir) {
warn!("mod_time is not Eq, file corrupt, index: {index}");
meta_errs[index] = Some(DiskError::FileCorrupt);
parts_metadata[index] = FileInfo::default();
@@ -6217,7 +6213,7 @@ async fn disks_with_all_parts(
meta_errs.iter().enumerate().for_each(|(index, err)| {
if err.is_some() {
let part_err = conv_part_err_to_int(err);
for p in 0..lastest_meta.parts.len() {
for p in 0..latest_meta.parts.len() {
data_errs_by_part.entry(p).or_insert(vec![0; meta_errs.len()])[index] = part_err;
}
}
@@ -6269,7 +6265,7 @@ async fn disks_with_all_parts(
let mut verify_resp = CheckPartsResp::default();
let mut verify_err = None;
meta.data_dir = lastest_meta.data_dir;
meta.data_dir = latest_meta.data_dir;
if scan_mode == HEAL_DEEP_SCAN {
// disk has a valid xl.meta but may not have all the
// parts. This is considered an outdated disk, since
@@ -6293,7 +6289,7 @@ async fn disks_with_all_parts(
}
}
for p in 0..lastest_meta.parts.len() {
for p in 0..latest_meta.parts.len() {
if let Some(vec) = data_errs_by_part.get_mut(&p) {
if index < vec.len() {
if verify_err.is_some() {
@@ -6331,7 +6327,7 @@ pub fn should_heal_object_on_disk(
err: &Option<DiskError>,
parts_errs: &[usize],
meta: &FileInfo,
lastest_meta: &FileInfo,
latest_meta: &FileInfo,
) -> (bool, Option<DiskError>) {
if let Some(err) = err {
if err == &DiskError::FileNotFound || err == &DiskError::FileVersionNotFound || err == &DiskError::FileCorrupt {
@@ -6339,12 +6335,12 @@ pub fn should_heal_object_on_disk(
}
}
if lastest_meta.volume != meta.volume
|| lastest_meta.name != meta.name
|| lastest_meta.version_id != meta.version_id
|| lastest_meta.deleted != meta.deleted
if latest_meta.volume != meta.volume
|| latest_meta.name != meta.name
|| latest_meta.version_id != meta.version_id
|| latest_meta.deleted != meta.deleted
{
info!("lastest_meta not Eq meta, lastest_meta: {:?}, meta: {:?}", lastest_meta, meta);
info!("latest_meta not Eq meta, latest_meta: {:?}, meta: {:?}", latest_meta, meta);
return (true, Some(DiskError::OutdatedXLMeta));
}
if !meta.deleted && !meta.is_remote() {

View File

@@ -65,7 +65,7 @@ pub struct Sets {
pub pool_idx: usize,
pub endpoints: PoolEndpoints,
pub format: FormatV3,
pub partiy_count: usize,
pub parity_count: usize,
pub set_count: usize,
pub set_drive_count: usize,
pub default_parity_count: usize,
@@ -82,13 +82,13 @@ impl Drop for Sets {
}
impl Sets {
#[tracing::instrument(level = "debug", skip(disks, endpoints, fm, pool_idx, partiy_count))]
#[tracing::instrument(level = "debug", skip(disks, endpoints, fm, pool_idx, parity_count))]
pub async fn new(
disks: Vec<Option<DiskStore>>,
endpoints: &PoolEndpoints,
fm: &FormatV3,
pool_idx: usize,
partiy_count: usize,
parity_count: usize,
) -> Result<Arc<Self>> {
let set_count = fm.erasure.sets.len();
let set_drive_count = fm.erasure.sets[0].len();
@@ -173,7 +173,7 @@ impl Sets {
Arc::new(RwLock::new(NsLockMap::new(is_dist_erasure().await))),
Arc::new(RwLock::new(set_drive)),
set_drive_count,
partiy_count,
parity_count,
i,
pool_idx,
set_endpoints,
@@ -194,10 +194,10 @@ impl Sets {
pool_idx,
endpoints: endpoints.clone(),
format: fm.clone(),
partiy_count,
parity_count,
set_count,
set_drive_count,
default_parity_count: partiy_count,
default_parity_count: parity_count,
distribution_algo: fm.erasure.distribution_algo.clone(),
exit_signal: Some(tx),
});

View File

@@ -152,7 +152,7 @@ impl ECStore {
common_parity_drives = parity_drives;
}
// validate_parity(partiy_count, pool_eps.drives_per_set)?;
// validate_parity(parity_count, pool_eps.drives_per_set)?;
let (disks, errs) = store_init::init_disks(
&pool_eps.endpoints,
@@ -302,13 +302,13 @@ impl ECStore {
}
let pools = meta.return_resumable_pools();
let mut pool_indeces = Vec::with_capacity(pools.len());
let mut pool_indices = Vec::with_capacity(pools.len());
let endpoints = get_global_endpoints();
for p in pools.iter() {
if let Some(idx) = endpoints.get_pool_idx(&p.cmd_line) {
pool_indeces.push(idx);
pool_indices.push(idx);
} else {
return Err(Error::other(format!(
"unexpected state present for decommission status pool({}) not found",
@@ -317,8 +317,8 @@ impl ECStore {
}
}
if !pool_indeces.is_empty() {
let idx = pool_indeces[0];
if !pool_indices.is_empty() {
let idx = pool_indices[0];
if endpoints.as_ref()[idx].endpoints.as_ref()[0].is_local {
let (_tx, rx) = broadcast::channel(1);
@@ -328,9 +328,9 @@ impl ECStore {
// wait 3 minutes for cluster init
tokio::time::sleep(Duration::from_secs(60 * 3)).await;
if let Err(err) = store.decommission(rx.resubscribe(), pool_indeces.clone()).await {
if let Err(err) = store.decommission(rx.resubscribe(), pool_indices.clone()).await {
if err == StorageError::DecommissionAlreadyRunning {
for i in pool_indeces.iter() {
for i in pool_indices.iter() {
store.do_decommission_in_routine(rx.resubscribe(), *i).await;
}
return;
@@ -417,9 +417,9 @@ impl ECStore {
// // TODO handle errs
// continue;
// }
// let entrys = disks_res.as_ref().unwrap();
// let entries = disks_res.as_ref().unwrap();
// for entry in entrys {
// for entry in entries {
// // warn!("lst_merged entry---- {}", &entry.name);
// if !opts.prefix.is_empty() && !entry.name.starts_with(&opts.prefix) {
@@ -1415,7 +1415,7 @@ impl StorageAPI for ECStore {
if let Ok(sys) = metadata_sys::get(bucket).await {
info.created = Some(sys.created);
info.versionning = sys.versioning();
info.versioning = sys.versioning();
info.object_locking = sys.object_locking();
}

View File

@@ -276,7 +276,10 @@ impl HTTPRangeSpec {
return Ok(range_length);
}
Err(Error::other("range value invaild"))
Err(Error::other(format!(
"range value invalid: start={}, end={}, expected start <= end and end >= -1",
self.start, self.end
)))
}
}
@@ -336,7 +339,7 @@ pub struct BucketInfo {
pub name: String,
pub created: Option<OffsetDateTime>,
pub deleted: Option<OffsetDateTime>,
pub versionning: bool,
pub versioning: bool,
pub object_locking: bool,
}

View File

@@ -222,7 +222,7 @@ fn check_format_erasure_value(format: &FormatV3) -> Result<()> {
Ok(())
}
// load_format_erasure_all 读取所有 foramt.json
// load_format_erasure_all 读取所有 format.json
pub async fn load_format_erasure_all(disks: &[Option<DiskStore>], heal: bool) -> (Vec<Option<FormatV3>>, Vec<Option<DiskError>>) {
let mut futures = Vec::with_capacity(disks.len());
let mut datas = Vec::with_capacity(disks.len());

View File

@@ -776,7 +776,7 @@ impl ECStore {
fallback_disks: fallback_disks.iter().cloned().map(Some).collect(),
bucket: bucket.to_owned(),
path,
recursice: true,
recursive: true,
filter_prefix: Some(filter_prefix),
forward_to: opts.marker.clone(),
min_disks: listing_quorum,
@@ -851,8 +851,8 @@ impl ECStore {
}
};
if let Some(fiter) = opts.filter {
if fiter(&fi) {
if let Some(filter) = opts.filter {
if filter(&fi) {
let item = ObjectInfoOrErr {
item: Some(ObjectInfo::from_file_info(&fi, &bucket, &fi.name, {
if let Some(v) = &vcf { v.versioned(&fi.name) } else { false }
@@ -899,8 +899,8 @@ impl ECStore {
}
for fi in fvs.versions.iter() {
if let Some(fiter) = opts.filter {
if fiter(fi) {
if let Some(filter) = opts.filter {
if filter(fi) {
let item = ObjectInfoOrErr {
item: Some(ObjectInfo::from_file_info(fi, &bucket, &fi.name, {
if let Some(v) = &vcf { v.versioned(&fi.name) } else { false }
@@ -972,7 +972,7 @@ async fn gather_results(
let mut sender = Some(results_tx);
let mut recv = recv;
let mut entrys = Vec::new();
let mut entries = Vec::new();
while let Some(mut entry) = recv.recv().await {
if returned {
continue;
@@ -1009,11 +1009,11 @@ async fn gather_results(
// TODO: Lifecycle
if opts.limit > 0 && entrys.len() >= opts.limit as usize {
if opts.limit > 0 && entries.len() >= opts.limit as usize {
if let Some(tx) = sender {
tx.send(MetaCacheEntriesSortedResult {
entries: Some(MetaCacheEntriesSorted {
o: MetaCacheEntries(entrys.clone()),
o: MetaCacheEntries(entries.clone()),
..Default::default()
}),
err: None,
@@ -1027,15 +1027,15 @@ async fn gather_results(
continue;
}
entrys.push(Some(entry));
// entrys.push(entry);
entries.push(Some(entry));
// entries.push(entry);
}
// finish not full, return eof
if let Some(tx) = sender {
tx.send(MetaCacheEntriesSortedResult {
entries: Some(MetaCacheEntriesSorted {
o: MetaCacheEntries(entrys.clone()),
o: MetaCacheEntries(entries.clone()),
..Default::default()
}),
err: Some(Error::Unexpected.into()),
@@ -1125,10 +1125,10 @@ async fn merge_entry_channels(
if path::clean(&best_entry.name) == path::clean(&other_entry.name) {
let dir_matches = best_entry.is_dir() && other_entry.is_dir();
let suffix_matche =
let suffix_matches =
best_entry.name.ends_with(SLASH_SEPARATOR) == other_entry.name.ends_with(SLASH_SEPARATOR);
if dir_matches && suffix_matche {
if dir_matches && suffix_matches {
to_merge.push(other_idx);
continue;
}
@@ -1286,7 +1286,7 @@ impl SetDisks {
fallback_disks: fallback_disks.iter().cloned().map(Some).collect(),
bucket: opts.bucket,
path: opts.base_dir,
recursice: opts.recursive,
recursive: opts.recursive,
filter_prefix: opts.filter_prefix,
forward_to: opts.marker,
min_disks: listing_quorum,

View File

@@ -215,7 +215,7 @@ pub struct FileInfo {
impl FileInfo {
pub fn new(object: &str, data_blocks: usize, parity_blocks: usize) -> Self {
let indexs = {
let indices = {
let cardinality = data_blocks + parity_blocks;
let mut nums = vec![0; cardinality];
let key_crc = crc32fast::hash(object.as_bytes());
@@ -233,7 +233,7 @@ impl FileInfo {
data_blocks,
parity_blocks,
block_size: BLOCK_SIZE_V2,
distribution: indexs,
distribution: indices,
..Default::default()
},
..Default::default()

View File

@@ -702,7 +702,7 @@ impl FileMeta {
})
}
pub fn lastest_mod_time(&self) -> Option<OffsetDateTime> {
pub fn latest_mod_time(&self) -> Option<OffsetDateTime> {
if self.versions.is_empty() {
return None;
}
@@ -1762,7 +1762,7 @@ impl MetaDeleteMarker {
// self.meta_sys = Some(map);
// }
// name => return Err(Error::other(format!("not suport field name {name}"))),
// name => return Err(Error::other(format!("not support field name {name}"))),
// }
// }
@@ -1962,32 +1962,32 @@ pub fn merge_file_meta_versions(
n_versions += 1;
}
} else {
let mut lastest_count = 0;
let mut latest_count = 0;
for (i, ver) in tops.iter().enumerate() {
if ver.header == latest.header {
lastest_count += 1;
latest_count += 1;
continue;
}
if i == 0 || ver.header.sorts_before(&latest.header) {
if i == 0 || lastest_count == 0 {
lastest_count = 1;
if i == 0 || latest_count == 0 {
latest_count = 1;
} else if !strict && ver.header.matches_not_strict(&latest.header) {
lastest_count += 1;
latest_count += 1;
} else {
lastest_count = 1;
latest_count = 1;
}
latest = ver.clone();
continue;
}
// Mismatch, but older.
if lastest_count > 0 && !strict && ver.header.matches_not_strict(&latest.header) {
lastest_count += 1;
if latest_count > 0 && !strict && ver.header.matches_not_strict(&latest.header) {
latest_count += 1;
continue;
}
if lastest_count > 0 && ver.header.version_id == latest.header.version_id {
if latest_count > 0 && ver.header.version_id == latest.header.version_id {
let mut x: HashMap<FileMetaVersionHeader, usize> = HashMap::new();
for a in tops.iter() {
if a.header.version_id != ver.header.version_id {
@@ -1999,12 +1999,12 @@ pub fn merge_file_meta_versions(
}
*x.entry(a_clone.header).or_insert(1) += 1;
}
lastest_count = 0;
latest_count = 0;
for (k, v) in x.iter() {
if *v < lastest_count {
if *v < latest_count {
continue;
}
if *v == lastest_count && latest.header.sorts_before(k) {
if *v == latest_count && latest.header.sorts_before(k) {
continue;
}
tops.iter().for_each(|a| {
@@ -2017,12 +2017,12 @@ pub fn merge_file_meta_versions(
}
});
lastest_count = *v;
latest_count = *v;
}
break;
}
}
if lastest_count >= quorum {
if latest_count >= quorum {
if !latest.header.free_version() {
n_versions += 1;
}

View File

@@ -221,7 +221,7 @@ impl MetaCacheEntry {
};
if self_vers.versions.len() != other_vers.versions.len() {
match self_vers.lastest_mod_time().cmp(&other_vers.lastest_mod_time()) {
match self_vers.latest_mod_time().cmp(&other_vers.latest_mod_time()) {
Ordering::Greater => return (Some(self.clone()), false),
Ordering::Less => return (Some(other.clone()), false),
_ => {}

View File

@@ -90,7 +90,7 @@ where
T: Store,
{
pub(crate) async fn new(api: T) -> Arc<Self> {
let (sender, reciver) = mpsc::channel::<i64>(100);
let (sender, receiver) = mpsc::channel::<i64>(100);
let sys = Arc::new(Self {
api,
@@ -101,11 +101,11 @@ where
last_timestamp: AtomicI64::new(0),
});
sys.clone().init(reciver).await.unwrap();
sys.clone().init(receiver).await.unwrap();
sys
}
async fn init(self: Arc<Self>, reciver: Receiver<i64>) -> Result<()> {
async fn init(self: Arc<Self>, receiver: Receiver<i64>) -> Result<()> {
self.clone().save_iam_formatter().await?;
self.clone().load().await?;
@@ -118,7 +118,7 @@ where
let s = Arc::clone(&self);
async move {
let ticker = tokio::time::interval(Duration::from_secs(120));
tokio::pin!(ticker, reciver);
tokio::pin!(ticker, receiver);
loop {
select! {
_ = ticker.tick() => {
@@ -127,13 +127,13 @@ where
error!("iam load err {:?}", err);
}
},
i = reciver.recv() => {
info!("iam load reciver");
i = receiver.recv() => {
info!("iam load receiver");
match i {
Some(t) => {
let last = s.last_timestamp.load(Ordering::Relaxed);
if last <= t {
info!("iam load reciver load");
info!("iam load receiver load");
if let Err(err) =s.clone().load().await{
error!("iam load err {:?}", err);
}
@@ -814,7 +814,7 @@ where
let mp = MappedPolicy::new(policy);
let (_, combined_policy_stmt) = filter_policies(&self.cache, &mp.policies, "temp");
if combined_policy_stmt.is_empty() {
return Err(Error::other(format!("need poliy not found {}", IamError::NoSuchPolicy)));
return Err(Error::other(format!("Required policy not found: {}", IamError::NoSuchPolicy)));
}
self.api
@@ -987,7 +987,7 @@ where
_ => auth::ACCOUNT_OFF,
}
};
let user_entiry = UserIdentity::from(Credentials {
let user_entry = UserIdentity::from(Credentials {
access_key: access_key.to_string(),
secret_key: args.secret_key.to_string(),
status: status.to_owned(),
@@ -995,10 +995,10 @@ where
});
self.api
.save_user_identity(access_key, UserType::Reg, user_entiry.clone(), None)
.save_user_identity(access_key, UserType::Reg, user_entry.clone(), None)
.await?;
self.update_user_with_claims(access_key, user_entiry)?;
self.update_user_with_claims(access_key, user_entry)?;
Ok(OffsetDateTime::now_utc())
}
@@ -1104,7 +1104,7 @@ where
}
};
let user_entiry = UserIdentity::from(Credentials {
let user_entry = UserIdentity::from(Credentials {
access_key: access_key.to_string(),
secret_key: u.credentials.secret_key.clone(),
status: status.to_owned(),
@@ -1112,10 +1112,10 @@ where
});
self.api
.save_user_identity(access_key, UserType::Reg, user_entiry.clone(), None)
.save_user_identity(access_key, UserType::Reg, user_entry.clone(), None)
.await?;
self.update_user_with_claims(access_key, user_entiry)?;
self.update_user_with_claims(access_key, user_entry)?;
Ok(OffsetDateTime::now_utc())
}

View File

@@ -62,8 +62,12 @@ pub trait Store: Clone + Send + Sync + 'static {
is_group: bool,
m: &mut HashMap<String, MappedPolicy>,
) -> Result<()>;
async fn load_mapped_policys(&self, user_type: UserType, is_group: bool, m: &mut HashMap<String, MappedPolicy>)
-> Result<()>;
async fn load_mapped_policies(
&self,
user_type: UserType,
is_group: bool,
m: &mut HashMap<String, MappedPolicy>,
) -> Result<()>;
async fn load_all(&self, cache: &Cache) -> Result<()>;
}

View File

@@ -656,7 +656,7 @@ impl Store for ObjectStore {
Ok(())
}
async fn load_mapped_policys(
async fn load_mapped_policies(
&self,
user_type: UserType,
is_group: bool,

View File

@@ -124,13 +124,13 @@ impl<T: Store> IamSys<T> {
})
}
pub async fn load_mapped_policys(
pub async fn load_mapped_policies(
&self,
user_type: UserType,
is_group: bool,
m: &mut HashMap<String, MappedPolicy>,
) -> Result<()> {
self.store.api.load_mapped_policys(user_type, is_group, m).await
self.store.api.load_mapped_policies(user_type, is_group, m).await
}
pub async fn list_polices(&self, bucket_name: &str) -> Result<HashMap<String, Policy>> {

View File

@@ -22,7 +22,7 @@ pub struct LRWMutex {
id: RwLock<String>,
source: RwLock<String>,
is_write: RwLock<bool>,
refrence: RwLock<usize>,
reference: RwLock<usize>,
}
impl LRWMutex {
@@ -66,13 +66,13 @@ impl LRWMutex {
let mut locked = false;
if is_write {
if *self.refrence.read().await == 0 && !*self.is_write.read().await {
*self.refrence.write().await = 1;
if *self.reference.read().await == 0 && !*self.is_write.read().await {
*self.reference.write().await = 1;
*self.is_write.write().await = true;
locked = true;
}
} else if !*self.is_write.read().await {
*self.refrence.write().await += 1;
*self.reference.write().await += 1;
locked = true;
}
@@ -115,13 +115,13 @@ impl LRWMutex {
async fn unlock(&self, is_write: bool) -> bool {
let mut unlocked = false;
if is_write {
if *self.is_write.read().await && *self.refrence.read().await == 1 {
*self.refrence.write().await = 0;
if *self.is_write.read().await && *self.reference.read().await == 1 {
*self.reference.write().await = 0;
*self.is_write.write().await = false;
unlocked = true;
}
} else if !*self.is_write.read().await && *self.refrence.read().await > 0 {
*self.refrence.write().await -= 1;
} else if !*self.is_write.read().await && *self.reference.read().await > 0 {
*self.reference.write().await -= 1;
unlocked = true;
}
@@ -129,7 +129,7 @@ impl LRWMutex {
}
pub async fn force_un_lock(&self) {
*self.refrence.write().await = 0;
*self.reference.write().await = 0;
*self.is_write.write().await = false;
}
}

View File

@@ -36,6 +36,7 @@ use rustfs_config::{
use rustfs_utils::get_local_ip_with_default;
use smallvec::SmallVec;
use std::borrow::Cow;
use std::fs;
use std::io::IsTerminal;
use tracing::info;
use tracing_error::ErrorLayer;
@@ -295,6 +296,21 @@ pub(crate) fn init_telemetry(config: &OtelConfig) -> OtelGuard {
let log_directory = config.log_directory.as_deref().unwrap_or(DEFAULT_LOG_DIR);
let log_filename = config.log_filename.as_deref().unwrap_or(service_name);
if let Err(e) = fs::create_dir_all(log_directory) {
eprintln!("Failed to create log directory {log_directory}: {e}");
}
#[cfg(unix)]
{
// Linux/macOS Setting Permissions
// Set the log directory permissions to 755 (rwxr-xr-x)
use std::fs::Permissions;
use std::os::unix::fs::PermissionsExt;
match fs::set_permissions(log_directory, Permissions::from_mode(0o755)) {
Ok(_) => eprintln!("Log directory permissions set to 755: {log_directory}"),
Err(e) => eprintln!("Failed to set log directory permissions {log_directory}: {e}"),
}
}
// Build log cutting conditions
let rotation_criterion = match (config.log_rotation_time.as_deref(), config.log_rotation_size_mb) {
// Cut by time and size at the same time
@@ -354,13 +370,15 @@ pub(crate) fn init_telemetry(config: &OtelConfig) -> OtelGuard {
FileSpec::default()
.directory(log_directory)
.basename(log_filename)
.suffix("log"),
.suppress_timestamp(),
)
.rotate(rotation_criterion, Naming::TimestampsDirect, Cleanup::KeepLogFiles(keep_files.into()))
.format_for_files(format_for_file) // Add a custom formatting function for file output
.duplicate_to_stdout(level_filter) // Use dynamic levels
.format_for_stdout(format_with_color) // Add a custom formatting function for terminal output
.write_mode(WriteMode::Async)
.write_mode(WriteMode::BufferAndFlush)
.append() // Avoid clearing existing logs at startup
.print_message() // Startup information output to console
.start();
if let Ok(logger) = flexi_logger_result {
@@ -420,7 +438,7 @@ fn format_with_color(w: &mut dyn std::io::Write, now: &mut DeferredNow, record:
writeln!(
w,
"[{}] {} [{}] [{}:{}] [{}:{}] {}",
now.now().format("%Y-%m-%d %H:%M:%S%.6f"),
now.now().format(flexi_logger::TS_DASHES_BLANK_COLONS_DOT_BLANK),
level_style.paint(level.to_string()),
Color::Magenta.paint(record.target()),
Color::Blue.paint(record.file().unwrap_or("unknown")),
@@ -444,7 +462,7 @@ fn format_for_file(w: &mut dyn std::io::Write, now: &mut DeferredNow, record: &R
writeln!(
w,
"[{}] {} [{}] [{}:{}] [{}:{}] {}",
now.now().format("%Y-%m-%d %H:%M:%S%.6f"),
now.now().format(flexi_logger::TS_DASHES_BLANK_COLONS_DOT_BLANK),
level,
record.target(),
record.file().unwrap_or("unknown"),

View File

@@ -107,7 +107,7 @@ mod tests {
#[test_case("jwt:dwebsite/aaa")]
#[test_case("sfvc:DuratdionSeconds")]
#[test_case("svc:DursationSeconds/aaa")]
fn test_deserialize_falied(key: &str) {
fn test_deserialize_failed(key: &str) {
let val = serde_json::from_str::<Key>(key);
assert!(val.is_err());
}

View File

@@ -30,9 +30,9 @@ use super::{
pub struct ResourceSet(pub HashSet<Resource>);
impl ResourceSet {
pub fn is_match(&self, resource: &str, conditons: &HashMap<String, Vec<String>>) -> bool {
pub fn is_match(&self, resource: &str, conditions: &HashMap<String, Vec<String>>) -> bool {
for re in self.0.iter() {
if re.is_match(resource, conditons) {
if re.is_match(resource, conditions) {
return true;
}
}
@@ -85,14 +85,14 @@ pub enum Resource {
impl Resource {
pub const S3_PREFIX: &'static str = "arn:aws:s3:::";
pub fn is_match(&self, resource: &str, conditons: &HashMap<String, Vec<String>>) -> bool {
pub fn is_match(&self, resource: &str, conditions: &HashMap<String, Vec<String>>) -> bool {
let mut pattern = match self {
Resource::S3(s) => s.to_owned(),
Resource::Kms(s) => s.to_owned(),
};
if !conditons.is_empty() {
if !conditions.is_empty() {
for key in KeyName::COMMON_KEYS {
if let Some(rvalue) = conditons.get(key.name()) {
if let Some(rvalue) = conditions.get(key.name()) {
if matches!(rvalue.first().map(|c| !c.is_empty()), Some(true)) {
pattern = pattern.replace(&key.var_name(), &rvalue[0]);
}

View File

@@ -74,7 +74,7 @@ use tracing::{error, info, warn};
pub mod bucket_meta;
pub mod event;
pub mod group;
pub mod policys;
pub mod policies;
pub mod pools;
pub mod rebalance;
pub mod service_account;
@@ -798,9 +798,9 @@ pub struct GetReplicationMetricsHandler {}
impl Operation for GetReplicationMetricsHandler {
async fn call(&self, _req: S3Request<Body>, _params: Params<'_, '_>) -> S3Result<S3Response<(StatusCode, Body)>> {
error!("GetReplicationMetricsHandler");
let querys = extract_query_params(&_req.uri);
if let Some(bucket) = querys.get("bucket") {
error!("get bucket:{} metris", bucket);
let queries = extract_query_params(&_req.uri);
if let Some(bucket) = queries.get("bucket") {
error!("get bucket:{} metrics", bucket);
}
//return Err(s3_error!(InvalidArgument, "Invalid bucket name"));
//Ok(S3Response::with_headers((StatusCode::OK, Body::from()), header))
@@ -815,7 +815,7 @@ impl Operation for SetRemoteTargetHandler {
//return Ok(S3Response::new((StatusCode::OK, Body::from("OK".to_string()))));
// println!("handle MetricsHandler, params: {:?}", _req.input);
info!("SetRemoteTargetHandler params: {:?}", _req.credentials);
let querys = extract_query_params(&_req.uri);
let queries = extract_query_params(&_req.uri);
let Some(_cred) = _req.credentials else {
error!("credentials null");
return Err(s3_error!(InvalidRequest, "get cred failed"));
@@ -825,7 +825,7 @@ impl Operation for SetRemoteTargetHandler {
//println!("body: {}", std::str::from_utf8(&body.clone()).unwrap());
//println!("bucket is:{}", bucket.clone());
if let Some(bucket) = querys.get("bucket") {
if let Some(bucket) = queries.get("bucket") {
if bucket.is_empty() {
info!("have bucket: {}", bucket);
return Ok(S3Response::new((StatusCode::OK, Body::from("fuck".to_string()))));
@@ -842,7 +842,7 @@ impl Operation for SetRemoteTargetHandler {
{
Ok(info) => {
info!("Bucket Info: {:?}", info);
if !info.versionning {
if !info.versioning {
return Ok(S3Response::new((StatusCode::FORBIDDEN, Body::from("bucket need versioned".to_string()))));
}
}
@@ -932,13 +932,13 @@ impl Operation for ListRemoteTargetHandler {
async fn call(&self, _req: S3Request<Body>, _params: Params<'_, '_>) -> S3Result<S3Response<(StatusCode, Body)>> {
warn!("list GetRemoteTargetHandler, params: {:?}", _req.credentials);
let querys = extract_query_params(&_req.uri);
let queries = extract_query_params(&_req.uri);
let Some(_cred) = _req.credentials else {
error!("credentials null");
return Err(s3_error!(InvalidRequest, "get cred failed"));
};
if let Some(bucket) = querys.get("bucket") {
if let Some(bucket) = queries.get("bucket") {
if bucket.is_empty() {
error!("bucket parameter is empty");
return Ok(S3Response::new((
@@ -957,7 +957,7 @@ impl Operation for ListRemoteTargetHandler {
{
Ok(info) => {
info!("Bucket Info: {:?}", info);
if !info.versionning {
if !info.versioning {
return Ok(S3Response::new((
StatusCode::FORBIDDEN,
Body::from("Bucket needs versioning".to_string()),
@@ -1009,8 +1009,8 @@ pub struct RemoveRemoteTargetHandler {}
impl Operation for RemoveRemoteTargetHandler {
async fn call(&self, _req: S3Request<Body>, _params: Params<'_, '_>) -> S3Result<S3Response<(StatusCode, Body)>> {
debug!("remove remote target called");
let querys = extract_query_params(&_req.uri);
let Some(bucket) = querys.get("bucket") else {
let queries = extract_query_params(&_req.uri);
let Some(bucket) = queries.get("bucket") else {
return Ok(S3Response::new((
StatusCode::BAD_REQUEST,
Body::from("Bucket parameter is required".to_string()),
@@ -1019,7 +1019,7 @@ impl Operation for RemoveRemoteTargetHandler {
let mut need_delete = true;
if let Some(arnstr) = querys.get("arn") {
if let Some(arnstr) = queries.get("arn") {
let _arn = bucket_targets::ARN::parse(arnstr);
match get_replication_config(bucket).await {

View File

@@ -135,7 +135,7 @@ impl Operation for AddServiceAccount {
let is_svc_acc = target_user == req_user || target_user == req_parent_user;
let mut taget_groups = None;
let mut target_groups = None;
let mut opts = NewServiceAccountOpts {
access_key: create_req.access_key,
secret_key: create_req.secret_key,
@@ -154,7 +154,7 @@ impl Operation for AddServiceAccount {
target_user = req_parent_user;
}
taget_groups = req_groups;
target_groups = req_groups;
if let Some(claims) = cred.claims {
if opts.claims.is_none() {
@@ -172,7 +172,7 @@ impl Operation for AddServiceAccount {
}
let (new_cred, _) = iam_store
.new_service_account(&target_user, taget_groups, opts)
.new_service_account(&target_user, target_groups, opts)
.await
.map_err(|e| {
debug!("create service account failed, e: {:?}", e);

View File

@@ -545,7 +545,7 @@ impl Operation for ExportIam {
USER_POLICY_MAPPINGS_FILE => {
let mut user_policy_mappings: HashMap<String, MappedPolicy> = HashMap::new();
iam_store
.load_mapped_policys(UserType::Reg, false, &mut user_policy_mappings)
.load_mapped_policies(UserType::Reg, false, &mut user_policy_mappings)
.await
.map_err(|e| S3Error::with_message(S3ErrorCode::InternalError, e.to_string()))?;
@@ -561,7 +561,7 @@ impl Operation for ExportIam {
GROUP_POLICY_MAPPINGS_FILE => {
let mut group_policy_mappings = HashMap::new();
iam_store
.load_mapped_policys(UserType::Reg, true, &mut group_policy_mappings)
.load_mapped_policies(UserType::Reg, true, &mut group_policy_mappings)
.await
.map_err(|e| S3Error::with_message(S3ErrorCode::InternalError, e.to_string()))?;
@@ -577,7 +577,7 @@ impl Operation for ExportIam {
STS_USER_POLICY_MAPPINGS_FILE => {
let mut sts_user_policy_mappings: HashMap<String, MappedPolicy> = HashMap::new();
iam_store
.load_mapped_policys(UserType::Sts, false, &mut sts_user_policy_mappings)
.load_mapped_policies(UserType::Sts, false, &mut sts_user_policy_mappings)
.await
.map_err(|e| S3Error::with_message(S3ErrorCode::InternalError, e.to_string()))?;
let json_str = serde_json::to_vec(&sts_user_policy_mappings)

View File

@@ -20,7 +20,7 @@ pub mod utils;
// use ecstore::global::{is_dist_erasure, is_erasure};
use handlers::{
bucket_meta, group, policys, pools, rebalance,
bucket_meta, group, policies, pools, rebalance,
service_account::{AddServiceAccount, DeleteServiceAccount, InfoServiceAccount, ListServiceAccount, UpdateServiceAccount},
sts, tier, user,
};
@@ -333,35 +333,35 @@ fn register_user_route(r: &mut S3Router<AdminOperation>) -> std::io::Result<()>
r.insert(
Method::GET,
format!("{}{}", ADMIN_PREFIX, "/v3/list-canned-policies").as_str(),
AdminOperation(&policys::ListCannedPolicies {}),
AdminOperation(&policies::ListCannedPolicies {}),
)?;
// info-canned-policy?name=xxx
r.insert(
Method::GET,
format!("{}{}", ADMIN_PREFIX, "/v3/info-canned-policy").as_str(),
AdminOperation(&policys::InfoCannedPolicy {}),
AdminOperation(&policies::InfoCannedPolicy {}),
)?;
// add-canned-policy?name=xxx
r.insert(
Method::PUT,
format!("{}{}", ADMIN_PREFIX, "/v3/add-canned-policy").as_str(),
AdminOperation(&policys::AddCannedPolicy {}),
AdminOperation(&policies::AddCannedPolicy {}),
)?;
// remove-canned-policy?name=xxx
r.insert(
Method::DELETE,
format!("{}{}", ADMIN_PREFIX, "/v3/remove-canned-policy").as_str(),
AdminOperation(&policys::RemoveCannedPolicy {}),
AdminOperation(&policies::RemoveCannedPolicy {}),
)?;
// set-user-or-group-policy?policyName=xxx&userOrGroup=xxx&isGroup=xxx
r.insert(
Method::PUT,
format!("{}{}", ADMIN_PREFIX, "/v3/set-user-or-group-policy").as_str(),
AdminOperation(&policys::SetPolicyForUserOrGroup {}),
AdminOperation(&policies::SetPolicyForUserOrGroup {}),
)?;
Ok(())

View File

@@ -740,7 +740,7 @@ impl S3 for FS {
if let Some(part_num) = part_number {
if part_num == 0 {
return Err(s3_error!(InvalidArgument, "part_numer invalid"));
return Err(s3_error!(InvalidArgument, "Invalid part number: part number must be greater than 0"));
}
}
@@ -882,7 +882,7 @@ impl S3 for FS {
if let Some(part_num) = part_number {
if part_num == 0 {
return Err(s3_error!(InvalidArgument, "part_numer invalid"));
return Err(s3_error!(InvalidArgument, "part_number invalid"));
}
}
@@ -1941,7 +1941,7 @@ impl S3 for FS {
let conditions = get_condition_values(&req.headers, &auth::Credentials::default());
let read_olny = PolicySys::is_allowed(&BucketPolicyArgs {
let read_only = PolicySys::is_allowed(&BucketPolicyArgs {
bucket: &bucket,
action: Action::S3Action(S3Action::ListBucketAction),
is_owner: false,
@@ -1952,7 +1952,7 @@ impl S3 for FS {
})
.await;
let write_olny = PolicySys::is_allowed(&BucketPolicyArgs {
let write_only = PolicySys::is_allowed(&BucketPolicyArgs {
bucket: &bucket,
action: Action::S3Action(S3Action::PutObjectAction),
is_owner: false,
@@ -1963,7 +1963,7 @@ impl S3 for FS {
})
.await;
let is_public = read_olny && write_olny;
let is_public = read_only && write_only;
let output = GetBucketPolicyStatusOutput {
policy_status: Some(PolicyStatus {
@@ -1996,9 +1996,9 @@ impl S3 for FS {
}
};
let policys = try_!(serde_json::to_string(&cfg));
let policies = try_!(serde_json::to_string(&cfg));
Ok(S3Response::new(GetBucketPolicyOutput { policy: Some(policys) }))
Ok(S3Response::new(GetBucketPolicyOutput { policy: Some(policies) }))
}
async fn put_bucket_policy(&self, req: S3Request<PutBucketPolicyInput>) -> S3Result<S3Response<PutBucketPolicyOutput>> {
@@ -2692,7 +2692,7 @@ impl S3 for FS {
for batch in results {
csv_writer
.write(&batch)
.map_err(|e| s3_error!(InternalError, "cann't encode output to csv. e: {}", e.to_string()))?;
.map_err(|e| s3_error!(InternalError, "can't encode output to csv. e: {}", e.to_string()))?;
}
} else if input.request.output_serialization.json.is_some() {
let mut json_writer = JsonWriterBuilder::new()
@@ -2701,13 +2701,16 @@ impl S3 for FS {
for batch in results {
json_writer
.write(&batch)
.map_err(|e| s3_error!(InternalError, "cann't encode output to json. e: {}", e.to_string()))?;
.map_err(|e| s3_error!(InternalError, "can't encode output to json. e: {}", e.to_string()))?;
}
json_writer
.finish()
.map_err(|e| s3_error!(InternalError, "writer output into json error, e: {}", e.to_string()))?;
} else {
return Err(s3_error!(InvalidArgument, "unknow output format"));
return Err(s3_error!(
InvalidArgument,
"Unsupported output format. Supported formats are CSV and JSON"
));
}
let (tx, rx) = mpsc::channel::<S3Result<SelectObjectContentEvent>>(2);

View File

@@ -64,8 +64,7 @@ export RUSTFS_OBS_LOG_DIRECTORY="$current_dir/deploy/logs" # Log directory
export RUSTFS_OBS_LOG_ROTATION_TIME="minute" # Log rotation time unit, can be "second", "minute", "hour", "day"
export RUSTFS_OBS_LOG_ROTATION_SIZE_MB=1 # Log rotation size in MB
#
export RUSTFS_SINKS_FILE_PATH="$current_dir/deploy/logs/rustfs.log"
export RUSTFS_SINKS_FILE_PATH="$current_dir/deploy/logs"
export RUSTFS_SINKS_FILE_BUFFER_SIZE=12
export RUSTFS_SINKS_FILE_FLUSH_INTERVAL_MS=1000
export RUSTFS_SINKS_FILE_FLUSH_THRESHOLD=100