refactor: standardize constant management and fix typos (#387)

* init rustfs config

* init rustfs-utils crate

* improve code for rustfs-config crate

* add

* improve code for comment

* init rustfs config

* improve code for rustfs-config crate

* add

* improve code for comment

* Unified management of configurations and constants

* fix: modify rustfs-config crate name

* add default fn

* improve code for rustfs config

* refactor: standardize constant management and fix typos

- Create centralized constants module for global static constants
- Replace runtime format! expressions with compile-time constants
- Fix DEFAULT_PORT reference issues in configuration arguments
- Use const-str crate for compile-time string concatenation
- Update tokio dependency from 1.42.2 to 1.45.0
- Ensure consistent naming convention for configuration constants

* fix

* Update common/workers/src/workers.rs

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

---------

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
This commit is contained in:
houseme
2025-05-07 17:23:22 +08:00
committed by GitHub
parent de275b0510
commit 29ddf4dbc8
38 changed files with 637 additions and 205 deletions

32
Cargo.lock generated
View File

@@ -3107,6 +3107,7 @@ dependencies = [
"reqwest",
"rmp",
"rmp-serde",
"rustfs-config",
"s3s",
"s3s-policy",
"serde",
@@ -7258,7 +7259,6 @@ dependencies = [
"iam",
"lazy_static",
"libsystemd",
"local-ip-address",
"lock",
"madmin",
"matchit",
@@ -7273,11 +7273,11 @@ dependencies = [
"query",
"rmp-serde",
"rust-embed",
"rustfs-config",
"rustfs-event-notifier",
"rustfs-obs",
"rustfs-utils",
"rustls 0.23.27",
"rustls-pemfile",
"rustls-pki-types",
"s3s",
"serde",
"serde_json",
@@ -7299,6 +7299,16 @@ dependencies = [
"uuid",
]
[[package]]
name = "rustfs-config"
version = "0.0.1"
dependencies = [
"config",
"const-str",
"serde",
"serde_json",
]
[[package]]
name = "rustfs-event-notifier"
version = "0.0.1"
@@ -7373,6 +7383,18 @@ dependencies = [
"tracing-subscriber",
]
[[package]]
name = "rustfs-utils"
version = "0.0.1"
dependencies = [
"local-ip-address",
"rustfs-config",
"rustls 0.23.27",
"rustls-pemfile",
"rustls-pki-types",
"tracing",
]
[[package]]
name = "rustix"
version = "0.38.44"
@@ -8611,9 +8633,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20"
[[package]]
name = "tokio"
version = "1.44.2"
version = "1.45.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e6b88822cbe49de4185e3a4cbf8321dd487cf5fe0c5c65695fef6346371e9c48"
checksum = "2513ca694ef9ede0fb23fe71a4ee4107cb102b9dc1930f6d0fd77aae068ae165"
dependencies = [
"backtrace",
"bytes",

View File

@@ -1,21 +1,23 @@
[workspace]
members = [
"madmin", # Management dashboard and admin API interface
"rustfs", # Core file system implementation
"ecstore", # Erasure coding storage implementation
"e2e_test", # End-to-end test suite
"appauth", # Application authentication and authorization
"cli/rustfs-gui", # Graphical user interface client
"common/common", # Shared utilities and data structures
"common/lock", # Distributed locking implementation
"common/protos", # Protocol buffer definitions
"common/workers", # Worker thread pools and task scheduling
"iam", # Identity and Access Management
"crypto", # Cryptography and security features
"cli/rustfs-gui", # Graphical user interface client
"crates/obs", # Observability utilities
"crates/config", # Configuration management
"crates/event-notifier", # Event notification system
"crates/obs", # Observability utilities
"crates/utils", # Utility functions and helpers
"crypto", # Cryptography and security features
"ecstore", # Erasure coding storage implementation
"e2e_test", # End-to-end test suite
"iam", # Identity and Access Management
"madmin", # Management dashboard and admin API interface
"rustfs", # Core file system implementation
"s3select/api", # S3 Select API interface
"s3select/query", # S3 Select query engine
"appauth", # Application authentication and authorization
]
resolver = "2"
@@ -45,8 +47,10 @@ policy = { path = "./policy", version = "0.0.1" }
protos = { path = "./common/protos", version = "0.0.1" }
query = { path = "./s3select/query", version = "0.0.1" }
rustfs = { path = "./rustfs", version = "0.0.1" }
rustfs-config = { path = "./crates/config", version = "0.0.1" }
rustfs-obs = { path = "crates/obs", version = "0.0.1" }
rustfs-event-notifier = { path = "crates/event-notifier", version = "0.0.1" }
rustfs-utils = { path = "crates/utils", version = "0.0.1" }
workers = { path = "./common/workers", version = "0.0.1" }
atoi = "2.0.0"
async-recursion = "1.1.1"
@@ -62,6 +66,7 @@ bytesize = "2.0.1"
chrono = { version = "0.4.41", features = ["serde"] }
clap = { version = "4.5.37", features = ["derive", "env"] }
config = "0.15.11"
const-str = { version = "0.6.2", features = ["std", "proc"] }
datafusion = "46.0.1"
derive_builder = "0.20.2"
dioxus = { version = "0.6.3", features = ["router"] }
@@ -150,6 +155,7 @@ serde_with = "3.12.0"
sha2 = "0.10.8"
smallvec = { version = "1.15.0", features = ["serde"] }
snafu = "0.8.5"
socket2 = "0.5.9"
strum = { version = "0.27.1", features = ["derive"] }
sysinfo = "0.34.2"
tempfile = "3.19.1"
@@ -162,7 +168,7 @@ time = { version = "0.3.41", features = [
"macros",
"serde",
] }
tokio = { version = "1.44.2", features = ["fs", "rt-multi-thread"] }
tokio = { version = "1.45.0", features = ["fs", "rt-multi-thread"] }
tonic = { version = "0.13.1", features = ["gzip"] }
tonic-build = { version = "0.13.1" }
tokio-rustls = { version = "0.26.2", default-features = false }

View File

@@ -3,13 +3,13 @@ use tokio::sync::{Mutex, Notify};
use tracing::info;
pub struct Workers {
available: Mutex<usize>, // 可用的工作槽
notify: Notify, // 用于通知等待的任务
limit: usize, // 最大并发工作数
available: Mutex<usize>, // Available working slots
notify: Notify, // Used to notify waiting tasks
limit: usize, // Maximum number of concurrent jobs
}
impl Workers {
// 创建 Workers 对象,允许最多 n 个作业并发执行
// Create a Workers object that allows up to n jobs to execute concurrently
pub fn new(n: usize) -> Result<Arc<Workers>, &'static str> {
if n == 0 {
return Err("n must be > 0");
@@ -22,7 +22,7 @@ impl Workers {
}))
}
// 让一个作业获得执行的机会
// Give a job a chance to be executed
pub async fn take(&self) {
loop {
let mut available = self.available.lock().await;
@@ -37,15 +37,15 @@ impl Workers {
}
}
// 让一个作业释放其机会
// Release a job's slot
pub async fn give(&self) {
let mut available = self.available.lock().await;
info!("worker give, {}", *available);
*available += 1; // 增加可用槽
self.notify.notify_one(); // 通知一个等待的任务
*available += 1; // Increase available slots
self.notify.notify_one(); // Notify a waiting task
}
// 等待所有并发作业完成
// Wait for all concurrent jobs to complete
pub async fn wait(&self) {
loop {
{
@@ -54,7 +54,7 @@ impl Workers {
break;
}
}
// 等待直到所有槽都被释放
// Wait until all slots are freed
self.notify.notified().await;
}
info!("worker wait end");

17
crates/config/Cargo.toml Normal file
View File

@@ -0,0 +1,17 @@
[package]
name = "rustfs-config"
edition.workspace = true
license.workspace = true
repository.workspace = true
rust-version.workspace = true
version.workspace = true
[dependencies]
config = { workspace = true }
const-str = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
[lints]
workspace = true

View File

@@ -0,0 +1,23 @@
use crate::event::config::EventConfig;
use crate::ObservabilityConfig;
/// RustFs configuration
pub struct RustFsConfig {
pub observability: ObservabilityConfig,
pub event: EventConfig,
}
impl RustFsConfig {
pub fn new() -> Self {
Self {
observability: ObservabilityConfig::new(),
event: EventConfig::new(),
}
}
}
impl Default for RustFsConfig {
fn default() -> Self {
Self::new()
}
}

View File

@@ -0,0 +1,72 @@
use const_str::concat;
/// Application name
/// Default value: RustFs
/// Environment variable: RUSTFS_APP_NAME
pub const APP_NAME: &str = "RustFs";
/// Application version
/// Default value: 1.0.0
/// Environment variable: RUSTFS_VERSION
pub const VERSION: &str = "0.0.1";
/// Default configuration logger level
/// Default value: info
/// Environment variable: RUSTFS_LOG_LEVEL
pub const DEFAULT_LOG_LEVEL: &str = "info";
/// maximum number of connections
/// This is the maximum number of connections that the server will accept.
/// This is used to limit the number of connections to the server.
pub const MAX_CONNECTIONS: usize = 100;
/// timeout for connections
/// This is the timeout for connections to the server.
/// This is used to limit the time that a connection can be open.
pub const DEFAULT_TIMEOUT_MS: u64 = 3000;
/// Default Access Key
/// Default value: rustfsadmin
/// Environment variable: RUSTFS_ACCESS_KEY
/// Command line argument: --access-key
/// Example: RUSTFS_ACCESS_KEY=rustfsadmin
/// Example: --access-key rustfsadmin
pub const DEFAULT_ACCESS_KEY: &str = "rustfsadmin";
/// Default Secret Key
/// Default value: rustfsadmin
/// Environment variable: RUSTFS_SECRET_KEY
/// Command line argument: --secret-key
/// Example: RUSTFS_SECRET_KEY=rustfsadmin
/// Example: --secret-key rustfsadmin
pub const DEFAULT_SECRET_KEY: &str = "rustfsadmin";
/// Default configuration file for observability
/// Default value: config/obs.toml
/// Environment variable: RUSTFS_OBS_CONFIG
/// Command line argument: --obs-config
/// Example: RUSTFS_OBS_CONFIG=config/obs.toml
/// Example: --obs-config config/obs.toml
/// Example: --obs-config /etc/rustfs/obs.toml
pub const DEFAULT_OBS_CONFIG: &str = "config/obs.toml";
/// Default TLS key for rustfs
/// This is the default key for TLS.
pub const RUSTFS_TLS_KEY: &str = "rustfs_key.pem";
/// Default TLS cert for rustfs
/// This is the default cert for TLS.
pub const RUSTFS_TLS_CERT: &str = "rustfs_cert.pem";
/// Default port for rustfs
/// This is the default port for rustfs.
/// This is used to bind the server to a specific port.
pub const DEFAULT_PORT: u16 = 9000;
/// Default address for rustfs
/// This is the default address for rustfs.
pub const DEFAULT_ADDRESS: &str = concat!(":", DEFAULT_PORT);
/// Default port for rustfs console
/// This is the default port for rustfs console.
pub const DEFAULT_CONSOLE_PORT: u16 = 9002;
/// Default address for rustfs console
/// This is the default address for rustfs console.
pub const DEFAULT_CONSOLE_ADDRESS: &str = concat!(":", DEFAULT_CONSOLE_PORT);

View File

@@ -0,0 +1 @@
pub(crate) mod app;

View File

@@ -0,0 +1,23 @@
/// Event configuration module
pub struct EventConfig {
pub event_type: String,
pub event_source: String,
pub event_destination: String,
}
impl EventConfig {
/// Creates a new instance of `EventConfig` with default values.
pub fn new() -> Self {
Self {
event_type: "default".to_string(),
event_source: "default".to_string(),
event_destination: "default".to_string(),
}
}
}
impl Default for EventConfig {
fn default() -> Self {
Self::new()
}
}

View File

@@ -0,0 +1,17 @@
/// Event configuration module
pub struct EventConfig {
pub event_type: String,
pub event_source: String,
pub event_destination: String,
}
impl EventConfig {
/// Creates a new instance of `EventConfig` with default values.
pub fn new() -> Self {
Self {
event_type: "default".to_string(),
event_source: "default".to_string(),
event_destination: "default".to_string(),
}
}
}

View File

@@ -0,0 +1,2 @@
pub(crate) mod config;
pub(crate) mod event;

9
crates/config/src/lib.rs Normal file
View File

@@ -0,0 +1,9 @@
use crate::observability::config::ObservabilityConfig;
mod config;
mod constants;
mod event;
mod observability;
pub use config::RustFsConfig;
pub use constants::app::*;

View File

@@ -0,0 +1,28 @@
use crate::observability::logger::LoggerConfig;
use crate::observability::otel::OtelConfig;
use crate::observability::sink::SinkConfig;
use serde::Deserialize;
/// Observability configuration
#[derive(Debug, Deserialize, Clone)]
pub struct ObservabilityConfig {
pub otel: OtelConfig,
pub sinks: SinkConfig,
pub logger: Option<LoggerConfig>,
}
impl ObservabilityConfig {
pub fn new() -> Self {
Self {
otel: OtelConfig::new(),
sinks: SinkConfig::new(),
logger: Some(LoggerConfig::new()),
}
}
}
impl Default for ObservabilityConfig {
fn default() -> Self {
Self::new()
}
}

View File

@@ -0,0 +1,25 @@
use serde::Deserialize;
/// File sink configuration
#[derive(Debug, Deserialize, Clone)]
pub struct FileSinkConfig {
pub path: String,
pub max_size: u64,
pub max_backups: u64,
}
impl FileSinkConfig {
pub fn new() -> Self {
Self {
path: "".to_string(),
max_size: 0,
max_backups: 0,
}
}
}
impl Default for FileSinkConfig {
fn default() -> Self {
Self::new()
}
}

View File

@@ -0,0 +1,23 @@
use serde::Deserialize;
/// Kafka sink configuration
#[derive(Debug, Deserialize, Clone)]
pub struct KafkaSinkConfig {
pub brokers: Vec<String>,
pub topic: String,
}
impl KafkaSinkConfig {
pub fn new() -> Self {
Self {
brokers: vec!["localhost:9092".to_string()],
topic: "rustfs".to_string(),
}
}
}
impl Default for KafkaSinkConfig {
fn default() -> Self {
Self::new()
}
}

View File

@@ -0,0 +1,21 @@
use serde::Deserialize;
/// Logger configuration
#[derive(Debug, Deserialize, Clone)]
pub struct LoggerConfig {
pub queue_capacity: Option<usize>,
}
impl LoggerConfig {
pub fn new() -> Self {
Self {
queue_capacity: Some(10000),
}
}
}
impl Default for LoggerConfig {
fn default() -> Self {
Self::new()
}
}

View File

@@ -0,0 +1,8 @@
pub(crate) mod config;
pub(crate) mod file_sink;
pub(crate) mod kafka_sink;
pub(crate) mod logger;
pub(crate) mod observability;
pub(crate) mod otel;
pub(crate) mod sink;
pub(crate) mod webhook_sink;

View File

@@ -0,0 +1,22 @@
use crate::observability::logger::LoggerConfig;
use crate::observability::otel::OtelConfig;
use crate::observability::sink::SinkConfig;
use serde::Deserialize;
/// Observability configuration
#[derive(Debug, Deserialize, Clone)]
pub struct ObservabilityConfig {
pub otel: OtelConfig,
pub sinks: SinkConfig,
pub logger: Option<LoggerConfig>,
}
impl ObservabilityConfig {
pub fn new() -> Self {
Self {
otel: OtelConfig::new(),
sinks: SinkConfig::new(),
logger: Some(LoggerConfig::new()),
}
}
}

View File

@@ -0,0 +1,27 @@
use serde::Deserialize;
/// OpenTelemetry configuration
#[derive(Debug, Deserialize, Clone)]
pub struct OtelConfig {
pub endpoint: String,
pub service_name: String,
pub service_version: String,
pub resource_attributes: Vec<String>,
}
impl OtelConfig {
pub fn new() -> Self {
Self {
endpoint: "http://localhost:4317".to_string(),
service_name: "rustfs".to_string(),
service_version: "0.1.0".to_string(),
resource_attributes: vec![],
}
}
}
impl Default for OtelConfig {
fn default() -> Self {
Self::new()
}
}

View File

@@ -0,0 +1,28 @@
use crate::observability::file_sink::FileSinkConfig;
use crate::observability::kafka_sink::KafkaSinkConfig;
use crate::observability::webhook_sink::WebhookSinkConfig;
use serde::Deserialize;
/// Sink configuration
#[derive(Debug, Deserialize, Clone)]
pub struct SinkConfig {
pub kafka: Option<KafkaSinkConfig>,
pub webhook: Option<WebhookSinkConfig>,
pub file: Option<FileSinkConfig>,
}
impl SinkConfig {
pub fn new() -> Self {
Self {
kafka: None,
webhook: None,
file: Some(FileSinkConfig::new()),
}
}
}
impl Default for SinkConfig {
fn default() -> Self {
Self::new()
}
}

View File

@@ -0,0 +1,25 @@
use serde::Deserialize;
/// Webhook sink configuration
#[derive(Debug, Deserialize, Clone)]
pub struct WebhookSinkConfig {
pub url: String,
pub method: String,
pub headers: Vec<(String, String)>,
}
impl WebhookSinkConfig {
pub fn new() -> Self {
Self {
url: "http://localhost:8080/webhook".to_string(),
method: "POST".to_string(),
headers: vec![],
}
}
}
impl Default for WebhookSinkConfig {
fn default() -> Self {
Self::new()
}
}

View File

@@ -162,7 +162,7 @@ impl NotifierConfig {
}
}
const DEFAULT_CONFIG_FILE: &str = "obs";
const DEFAULT_CONFIG_FILE: &str = "event";
/// Provide temporary directories as default storage paths
fn default_store_path() -> String {

View File

@@ -154,16 +154,6 @@ impl Default for Metadata {
}
}
impl Metadata {
/// Create a new Metadata instance
pub fn create(schema_version: String, configuration_id: String, bucket: Bucket, object: Object) -> Self {
Self {
schema_version,
configuration_id,
bucket,
object,
}
}
/// Create a new Metadata instance with default values
pub fn new() -> Self {
Self {
@@ -178,6 +168,16 @@ impl Metadata {
}
}
/// Create a new Metadata instance
pub fn create(schema_version: String, configuration_id: String, bucket: Bucket, object: Object) -> Self {
Self {
schema_version,
configuration_id,
bucket,
object,
}
}
/// Set the schema version
pub fn set_schema_version(&mut self, schema_version: String) {
self.schema_version = schema_version;
@@ -470,17 +470,7 @@ pub struct Log {
pub records: Vec<Event>,
}
#[derive(
Debug,
Clone,
Copy,
PartialEq,
Eq,
SerializeDisplay,
DeserializeFromStr,
Display,
EnumString
)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, SerializeDisplay, DeserializeFromStr, Display, EnumString)]
#[strum(serialize_all = "SCREAMING_SNAKE_CASE")]
pub enum Name {
ObjectAccessedGet,

18
crates/utils/Cargo.toml Normal file
View File

@@ -0,0 +1,18 @@
[package]
name = "rustfs-utils"
edition.workspace = true
license.workspace = true
repository.workspace = true
rust-version.workspace = true
version.workspace = true
[dependencies]
local-ip-address = { workspace = true }
rustfs-config = { workspace = true }
rustls = { workspace = true }
rustls-pemfile = { workspace = true }
rustls-pki-types = { workspace = true }
tracing = { workspace = true }
[lints]
workspace = true

View File

@@ -1,4 +1,4 @@
use crate::config::{RUSTFS_TLS_CERT, RUSTFS_TLS_KEY};
use rustfs_config::{RUSTFS_TLS_CERT, RUSTFS_TLS_KEY};
use rustls::server::{ClientHello, ResolvesServerCert, ResolvesServerCertUsingSni};
use rustls::sign::CertifiedKey;
use rustls_pemfile::{certs, private_key};
@@ -12,34 +12,37 @@ use tracing::{debug, warn};
/// Load public certificate from file.
/// This function loads a public certificate from the specified file.
pub(crate) fn load_certs(filename: &str) -> io::Result<Vec<CertificateDer<'static>>> {
pub fn load_certs(filename: &str) -> io::Result<Vec<CertificateDer<'static>>> {
// Open certificate file.
let cert_file = fs::File::open(filename).map_err(|e| error(format!("failed to open {}: {}", filename, e)))?;
let cert_file = fs::File::open(filename).map_err(|e| certs_error(format!("failed to open {}: {}", filename, e)))?;
let mut reader = io::BufReader::new(cert_file);
// Load and return certificate.
let certs = certs(&mut reader)
.collect::<Result<Vec<_>, _>>()
.map_err(|_| error(format!("certificate file {} format error", filename)))?;
.map_err(|_| certs_error(format!("certificate file {} format error", filename)))?;
if certs.is_empty() {
return Err(error(format!("No valid certificate was found in the certificate file {}", filename)));
return Err(certs_error(format!(
"No valid certificate was found in the certificate file {}",
filename
)));
}
Ok(certs)
}
/// Load private key from file.
/// This function loads a private key from the specified file.
pub(crate) fn load_private_key(filename: &str) -> io::Result<PrivateKeyDer<'static>> {
pub fn load_private_key(filename: &str) -> io::Result<PrivateKeyDer<'static>> {
// Open keyfile.
let keyfile = fs::File::open(filename).map_err(|e| error(format!("failed to open {}: {}", filename, e)))?;
let keyfile = fs::File::open(filename).map_err(|e| certs_error(format!("failed to open {}: {}", filename, e)))?;
let mut reader = io::BufReader::new(keyfile);
// Load and return a single private key.
private_key(&mut reader)?.ok_or_else(|| error(format!("no private key found in {}", filename)))
private_key(&mut reader)?.ok_or_else(|| certs_error(format!("no private key found in {}", filename)))
}
/// error function
pub(crate) fn error(err: String) -> Error {
pub fn certs_error(err: String) -> Error {
Error::new(io::ErrorKind::Other, err)
}
@@ -47,14 +50,14 @@ pub(crate) fn error(err: String) -> Error {
/// This function loads all certificate and private key pairs from the specified directory.
/// It looks for files named `rustfs_cert.pem` and `rustfs_key.pem` in each subdirectory.
/// The root directory can also contain a default certificate/private key pair.
pub(crate) fn load_all_certs_from_directory(
pub fn load_all_certs_from_directory(
dir_path: &str,
) -> io::Result<HashMap<String, (Vec<CertificateDer<'static>>, PrivateKeyDer<'static>)>> {
let mut cert_key_pairs = HashMap::new();
let dir = Path::new(dir_path);
if !dir.exists() || !dir.is_dir() {
return Err(error(format!(
return Err(certs_error(format!(
"The certificate directory does not exist or is not a directory: {}",
dir_path
)));
@@ -68,10 +71,10 @@ pub(crate) fn load_all_certs_from_directory(
debug!("find the root directory certificate: {:?}", root_cert_path);
let root_cert_str = root_cert_path
.to_str()
.ok_or_else(|| error(format!("Invalid UTF-8 in root certificate path: {:?}", root_cert_path)))?;
.ok_or_else(|| certs_error(format!("Invalid UTF-8 in root certificate path: {:?}", root_cert_path)))?;
let root_key_str = root_key_path
.to_str()
.ok_or_else(|| error(format!("Invalid UTF-8 in root key path: {:?}", root_key_path)))?;
.ok_or_else(|| certs_error(format!("Invalid UTF-8 in root key path: {:?}", root_key_path)))?;
match load_cert_key_pair(root_cert_str, root_key_str) {
Ok((certs, key)) => {
// The root directory certificate is used as the default certificate and is stored using special keys.
@@ -92,7 +95,7 @@ pub(crate) fn load_all_certs_from_directory(
let domain_name = path
.file_name()
.and_then(|name| name.to_str())
.ok_or_else(|| error(format!("invalid domain name directory:{:?}", path)))?;
.ok_or_else(|| certs_error(format!("invalid domain name directory:{:?}", path)))?;
// find certificate and private key files
let cert_path = path.join(RUSTFS_TLS_CERT); // e.g., rustfs_cert.pem
@@ -113,7 +116,10 @@ pub(crate) fn load_all_certs_from_directory(
}
if cert_key_pairs.is_empty() {
return Err(error(format!("No valid certificate/private key pair found in directory {}", dir_path)));
return Err(certs_error(format!(
"No valid certificate/private key pair found in directory {}",
dir_path
)));
}
Ok(cert_key_pairs)
@@ -159,7 +165,7 @@ pub fn create_multi_cert_resolver(
for (domain, (certs, key)) in cert_key_pairs {
// create a signature
let signing_key = rustls::crypto::aws_lc_rs::sign::any_supported_type(&key)
.map_err(|_| error(format!("unsupported private key types:{}", domain)))?;
.map_err(|_| certs_error(format!("unsupported private key types:{}", domain)))?;
// create a CertifiedKey
let certified_key = CertifiedKey::new(certs, signing_key);
@@ -169,7 +175,7 @@ pub fn create_multi_cert_resolver(
// add certificate to resolver
resolver
.add(&domain, certified_key)
.map_err(|e| error(format!("failed to add a domain name certificate:{},err: {:?}", domain, e)))?;
.map_err(|e| certs_error(format!("failed to add a domain name certificate:{},err: {:?}", domain, e)))?;
}
}

43
crates/utils/src/ip.rs Normal file
View File

@@ -0,0 +1,43 @@
use std::net::{IpAddr, Ipv4Addr};
/// Get the IP address of the machine
///
/// Priority is given to trying to get the IPv4 address, and if it fails, try to get the IPv6 address.
/// If both fail to retrieve, None is returned.
///
/// # Returns
///
/// * `Some(IpAddr)` - Native IP address (IPv4 or IPv6)
/// * `None` - Unable to obtain any native IP address
pub fn get_local_ip() -> Option<IpAddr> {
local_ip_address::local_ip()
.ok()
.or_else(|| local_ip_address::local_ipv6().ok())
}
/// Get the IP address of the machine as a string
///
/// If the IP address cannot be obtained, returns "127.0.0.1" as the default value.
///
/// # Returns
///
/// * `String` - Native IP address (IPv4 or IPv6) as a string, or the default value
pub fn get_local_ip_with_default() -> String {
get_local_ip()
.unwrap_or_else(|| IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1))) // Provide a safe default value
.to_string()
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_get_local_ip() {
match get_local_ip() {
Some(ip) => println!("the ip address of this machine:{}", ip),
None => println!("Unable to obtain the IP address of the machine"),
}
assert!(get_local_ip().is_some());
}
}

11
crates/utils/src/lib.rs Normal file
View File

@@ -0,0 +1,11 @@
mod certs;
mod ip;
mod net;
pub use certs::certs_error;
pub use certs::create_multi_cert_resolver;
pub use certs::load_all_certs_from_directory;
pub use certs::load_certs;
pub use certs::load_private_key;
pub use ip::get_local_ip;
pub use ip::get_local_ip_with_default;

0
crates/utils/src/net.rs Normal file
View File

View File

@@ -11,6 +11,7 @@ rust-version.workspace = true
workspace = true
[dependencies]
rustfs-config = { workspace = true }
async-trait.workspace = true
backon.workspace = true
blake2 = { workspace = true }

View File

@@ -20,8 +20,6 @@ pub const DISK_MIN_INODES: u64 = 1000;
pub const DISK_FILL_FRACTION: f64 = 0.99;
pub const DISK_RESERVE_FRACTION: f64 = 0.15;
pub const DEFAULT_PORT: u16 = 9000;
lazy_static! {
static ref GLOBAL_RUSTFS_PORT: OnceLock<u16> = OnceLock::new();
pub static ref GLOBAL_OBJECT_API: OnceLock<Arc<ECStore>> = OnceLock::new();
@@ -41,31 +39,37 @@ lazy_static! {
pub static ref GLOBAL_BOOT_TIME: OnceCell<SystemTime> = OnceCell::new();
}
/// Get the global rustfs port
pub fn global_rustfs_port() -> u16 {
if let Some(p) = GLOBAL_RUSTFS_PORT.get() {
*p
} else {
DEFAULT_PORT
rustfs_config::DEFAULT_PORT
}
}
/// Set the global rustfs port
pub fn set_global_rustfs_port(value: u16) {
GLOBAL_RUSTFS_PORT.set(value).expect("set_global_rustfs_port fail");
}
/// Get the global rustfs port
pub fn set_global_deployment_id(id: Uuid) {
globalDeploymentIDPtr.set(id).unwrap();
}
/// Get the global deployment id
pub fn get_global_deployment_id() -> Option<String> {
globalDeploymentIDPtr.get().map(|v| v.to_string())
}
/// Get the global deployment id
pub fn set_global_endpoints(eps: Vec<PoolEndpoints>) {
GLOBAL_Endpoints
.set(EndpointServerPools::from(eps))
.expect("GLOBAL_Endpoints set failed")
}
/// Get the global endpoints
pub fn get_global_endpoints() -> EndpointServerPools {
if let Some(eps) = GLOBAL_Endpoints.get() {
eps.clone()

View File

@@ -17,7 +17,7 @@ use crate::{
global::GLOBAL_IsDistErasure,
heal::heal_commands::{HealStartSuccess, HEAL_UNKNOWN_SCAN},
new_object_layer_fn,
utils::path::has_profix,
utils::path::has_prefix,
};
use crate::{
heal::heal_commands::{HEAL_ITEM_BUCKET, HEAL_ITEM_OBJECT},
@@ -786,7 +786,7 @@ impl AllHealState {
let _ = self.mu.write().await;
for (k, v) in self.heal_seq_map.read().await.iter() {
if (has_profix(k, path_s) || has_profix(path_s, k)) && !v.has_ended().await {
if (has_prefix(k, path_s) || has_prefix(path_s, k)) && !v.has_ended().await {
return Err(Error::from_string(format!(
"The provided heal sequence path overlaps with an existing heal path: {}",
k

View File

@@ -52,7 +52,7 @@ pub fn strings_has_prefix_fold(s: &str, prefix: &str) -> bool {
s.len() >= prefix.len() && (s[..prefix.len()] == *prefix || s[..prefix.len()].eq_ignore_ascii_case(prefix))
}
pub fn has_profix(s: &str, prefix: &str) -> bool {
pub fn has_prefix(s: &str, prefix: &str) -> bool {
if cfg!(target_os = "windows") {
return strings_has_prefix_fold(s, prefix);
}

View File

@@ -30,7 +30,7 @@ clap.workspace = true
crypto = { workspace = true }
datafusion = { workspace = true }
common.workspace = true
const-str = { version = "0.6.1", features = ["std", "proc"] }
const-str = { workspace = true }
ecstore.workspace = true
policy.workspace = true
flatbuffers.workspace = true
@@ -42,7 +42,6 @@ http.workspace = true
http-body.workspace = true
iam = { workspace = true }
lock.workspace = true
local-ip-address = { workspace = true }
matchit = { workspace = true }
mime.workspace = true
mime_guess = { workspace = true }
@@ -51,18 +50,18 @@ pin-project-lite.workspace = true
protos.workspace = true
query = { workspace = true }
rmp-serde.workspace = true
rustfs-config = { workspace = true }
rustfs-event-notifier = { workspace = true }
rustfs-obs = { workspace = true }
rustfs-utils = { workspace = true }
rustls.workspace = true
rustls-pemfile.workspace = true
rustls-pki-types.workspace = true
rust-embed = { workspace = true, features = ["interpolate-folder-path"] }
s3s.workspace = true
serde.workspace = true
serde_json.workspace = true
serde_urlencoded = { workspace = true }
shadow-rs = { workspace = true, features = ["build", "metadata"] }
socket2 = "0.5.9"
socket2 = { workspace = true }
tracing.workspace = true
time = { workspace = true, features = ["parsing", "formatting", "serde"] }
tokio-util.workspace = true

View File

@@ -1,30 +1,36 @@
rustfs/
├── Cargo.toml
├── src/
│ ├── main.rs # 主入口
│ ├── admin/
│ │ └── mod.rs # 管理接口
│ ├── auth/
│ │ └── mod.rs # 认证模块
│ ├── config/
│ │ ├── mod.rs # 配置模块
│ │ └── options.rs # 命令行参数
│ ├── console/
│ │ ├── mod.rs # 控制台模块
│ │ └── server.rs # 控制台服务器
│ ├── grpc/
│ │ └── mod.rs # gRPC 服务
│ ├── license/
│ │ └── mod.rs # 许可证管理
│ ├── logging/
│ │ └── mod.rs # 日志管理
│ ├── server/
│ │ ├── mod.rs # 服务器实现
│ │ ├── connection.rs # 连接处理
│ │ ├── service.rs # 服务实现
│ │ └── state.rs # 状态管理
│ ├── storage/
│ │ ├── mod.rs # 存储模块
│ │ └── fs.rs # 文件系统实现
│ └── utils/
│ └── mod.rs # 工具函数
# RustFS
RustFS is a simple file system written in Rust. It is designed to be a learning project for those who want to understand
how file systems work and how to implement them in Rust.
## Features
- Simple file system structure
- Basic file operations (create, read, write, delete)
- Directory support
- File metadata (size, creation time, etc.)
- Basic error handling
- Unit tests for core functionality
- Documentation for public API
- Example usage
- License information
- Contributing guidelines
- Changelog
- Code of conduct
- Acknowledgements
- Contact information
- Links to additional resources
## Getting Started
To get started with RustFS, clone the repository and build the project:
```bash
git clone git@github.com:rustfs/s3-rustfs.git
cd rustfs
cargo build
```
## Usage
To use RustFS, you can create a new file system instance and perform basic file operations. Here is an example:

View File

@@ -1,40 +1,8 @@
use clap::Parser;
use const_str::concat;
use ecstore::global::DEFAULT_PORT;
use std::string::ToString;
shadow_rs::shadow!(build);
/// Default Access Key
/// Default value: rustfsadmin
/// Environment variable: RUSTFS_ACCESS_KEY
/// Command line argument: --access-key
/// Example: RUSTFS_ACCESS_KEY=rustfsadmin
/// Example: --access-key rustfsadmin
pub const DEFAULT_ACCESS_KEY: &str = "rustfsadmin";
/// Default Secret Key
/// Default value: rustfsadmin
/// Environment variable: RUSTFS_SECRET_KEY
/// Command line argument: --secret-key
/// Example: RUSTFS_SECRET_KEY=rustfsadmin
/// Example: --secret-key rustfsadmin
pub const DEFAULT_SECRET_KEY: &str = "rustfsadmin";
/// Default configuration file for observability
/// Default value: config/obs.toml
/// Environment variable: RUSTFS_OBS_CONFIG
/// Command line argument: --obs-config
/// Example: RUSTFS_OBS_CONFIG=config/obs.toml
/// Example: --obs-config config/obs.toml
/// Example: --obs-config /etc/rustfs/obs.toml
pub const DEFAULT_OBS_CONFIG: &str = "config/obs.toml";
/// Default TLS key for rustfs
/// This is the default key for TLS.
pub(crate) const RUSTFS_TLS_KEY: &str = "rustfs_key.pem";
/// Default TLS cert for rustfs
/// This is the default cert for TLS.
pub(crate) const RUSTFS_TLS_CERT: &str = "rustfs_cert.pem";
#[allow(clippy::const_is_empty)]
const SHORT_VERSION: &str = {
if !build::TAG.is_empty() {
@@ -67,7 +35,7 @@ pub struct Opt {
pub volumes: Vec<String>,
/// bind to a specific ADDRESS:PORT, ADDRESS can be an IP or hostname
#[arg(long, default_value_t = format!("0.0.0.0:{}", DEFAULT_PORT), env = "RUSTFS_ADDRESS")]
#[arg(long, default_value_t = rustfs_config::DEFAULT_ADDRESS.to_string(), env = "RUSTFS_ADDRESS")]
pub address: String,
/// Domain name used for virtual-hosted-style requests.
@@ -75,17 +43,19 @@ pub struct Opt {
pub server_domains: Vec<String>,
/// Access key used for authentication.
#[arg(long, default_value_t = DEFAULT_ACCESS_KEY.to_string(), env = "RUSTFS_ACCESS_KEY")]
#[arg(long, default_value_t = rustfs_config::DEFAULT_ACCESS_KEY.to_string(), env = "RUSTFS_ACCESS_KEY")]
pub access_key: String,
/// Secret key used for authentication.
#[arg(long, default_value_t = DEFAULT_SECRET_KEY.to_string(), env = "RUSTFS_SECRET_KEY")]
#[arg(long, default_value_t = rustfs_config::DEFAULT_SECRET_KEY.to_string(), env = "RUSTFS_SECRET_KEY")]
pub secret_key: String,
/// Enable console server
#[arg(long, default_value_t = false, env = "RUSTFS_CONSOLE_ENABLE")]
pub console_enable: bool,
#[arg(long, default_value_t = format!("127.0.0.1:{}", 9002), env = "RUSTFS_CONSOLE_ADDRESS")]
/// Console server bind address
#[arg(long, default_value_t = rustfs_config::DEFAULT_CONSOLE_ADDRESS.to_string(), env = "RUSTFS_CONSOLE_ADDRESS")]
pub console_address: String,
/// rustfs endpoint for console
@@ -94,7 +64,7 @@ pub struct Opt {
/// Observability configuration file
/// Default value: config/obs.toml
#[arg(long, default_value_t = DEFAULT_OBS_CONFIG.to_string(), env = "RUSTFS_OBS_CONFIG")]
#[arg(long, default_value_t = rustfs_config::DEFAULT_OBS_CONFIG.to_string(), env = "RUSTFS_OBS_CONFIG")]
pub obs_config: String,
/// tls path for rustfs api and console.

View File

@@ -1,4 +1,3 @@
use crate::config::{RUSTFS_TLS_CERT, RUSTFS_TLS_KEY};
use crate::license::get_license;
use axum::{
body::Body,
@@ -8,6 +7,7 @@ use axum::{
Router,
};
use axum_extra::extract::Host;
use rustfs_config::{RUSTFS_TLS_CERT, RUSTFS_TLS_KEY};
use std::io;
use axum::response::Redirect;
@@ -17,7 +17,7 @@ use mime_guess::from_path;
use rust_embed::RustEmbed;
use serde::Serialize;
use shadow_rs::shadow;
use std::net::{Ipv4Addr, SocketAddr};
use std::net::{IpAddr, SocketAddr};
use std::sync::OnceLock;
use std::time::Duration;
use tokio::signal;
@@ -73,7 +73,7 @@ pub(crate) struct Config {
}
impl Config {
fn new(local_ip: Ipv4Addr, port: u16, version: &str, date: &str) -> Self {
fn new(local_ip: IpAddr, port: u16, version: &str, date: &str) -> Self {
Config {
port,
api: Api {
@@ -144,7 +144,7 @@ struct License {
pub(crate) static CONSOLE_CONFIG: OnceLock<Config> = OnceLock::new();
#[allow(clippy::const_is_empty)]
pub(crate) fn init_console_cfg(local_ip: Ipv4Addr, port: u16) {
pub(crate) fn init_console_cfg(local_ip: IpAddr, port: u16) {
CONSOLE_CONFIG.get_or_init(|| {
let ver = {
if !build::TAG.is_empty() {
@@ -220,7 +220,7 @@ async fn config_handler(uri: Uri, Host(host): Host) -> impl IntoResponse {
pub async fn start_static_file_server(
addrs: &str,
local_ip: Ipv4Addr,
local_ip: IpAddr,
access_key: &str,
secret_key: &str,
tls_path: Option<String>,

21
rustfs/src/event.rs Normal file
View File

@@ -0,0 +1,21 @@
use rustfs_event_notifier::NotifierConfig;
use tracing::{error, info, instrument};
#[instrument]
pub(crate) async fn init_event_notifier(notifier_config: Option<String>) {
// Initialize event notifier
if notifier_config.is_some() {
info!("event_config is not empty");
tokio::spawn(async move {
let config = NotifierConfig::event_load_config(notifier_config);
let result = rustfs_event_notifier::initialize(config).await;
if let Err(e) = result {
error!("Failed to initialize event notifier: {}", e);
} else {
info!("Event notifier initialized successfully");
}
});
} else {
info!("event_config is empty");
}
}

View File

@@ -2,18 +2,18 @@ mod admin;
mod auth;
mod config;
mod console;
mod event;
mod grpc;
pub mod license;
mod logging;
mod server;
mod service;
mod storage;
mod utils;
use crate::auth::IAMAuth;
use crate::console::{init_console_cfg, CONSOLE_CONFIG};
// Ensure the correct path for parse_license is imported
use crate::server::{wait_for_shutdown, ServiceState, ServiceStateManager, ShutdownSignal, SHUTDOWN_TIMEOUT};
use crate::utils::error;
use bytes::Bytes;
use chrono::Datelike;
use clap::Parser;
@@ -21,7 +21,6 @@ use common::{
error::{Error, Result},
globals::set_global_addr,
};
use config::{DEFAULT_ACCESS_KEY, DEFAULT_SECRET_KEY, RUSTFS_TLS_CERT, RUSTFS_TLS_KEY};
use ecstore::bucket::metadata_sys::init_bucket_metadata_sys;
use ecstore::config as ecconfig;
use ecstore::config::GLOBAL_ConfigSys;
@@ -48,7 +47,7 @@ use hyper_util::{
use iam::init_iam_sys;
use license::init_license;
use protos::proto_gen::node_service::node_service_server::NodeServiceServer;
use rustfs_event_notifier::NotifierConfig;
use rustfs_config::{DEFAULT_ACCESS_KEY, DEFAULT_SECRET_KEY, RUSTFS_TLS_CERT, RUSTFS_TLS_KEY};
use rustfs_obs::{init_obs, init_process_observer, load_config, set_global_guard};
use rustls::ServerConfig;
use s3s::{host::MultiDomain, service::S3ServiceBuilder};
@@ -85,7 +84,7 @@ fn print_server_info() {
let cfg = CONSOLE_CONFIG.get().unwrap();
let current_year = chrono::Utc::now().year();
// 使用自定义宏打印服务器信息
// Use custom macros to print server information
info!("RustFS Object Storage Server");
info!("Copyright: 2024-{} RustFS, Inc", current_year);
info!("License: {}", cfg.license());
@@ -114,32 +113,12 @@ async fn main() -> Result<()> {
run(opt).await
}
#[instrument]
async fn init_event_notifier(notifier_config: Option<String>) {
// Initialize event notifier
if notifier_config.is_some() {
info!("event_config is not empty");
tokio::spawn(async move {
let config = NotifierConfig::event_load_config(notifier_config);
let result = rustfs_event_notifier::initialize(config).await;
if let Err(e) = result {
error!("Failed to initialize event notifier: {}", e);
} else {
info!("Event notifier initialized successfully");
}
});
} else {
info!("event_config is empty");
}
}
#[instrument(skip(opt))]
async fn run(opt: config::Opt) -> Result<()> {
debug!("opt: {:?}", &opt);
// Initialize event notifier
let notifier_config = opt.event_config;
init_event_notifier(notifier_config).await;
event::init_event_notifier(opt.event_config).await;
let server_addr = net::parse_and_resolve_address(opt.address.as_str())?;
let server_port = server_addr.port();
@@ -147,16 +126,17 @@ async fn run(opt: config::Opt) -> Result<()> {
debug!("server_address {}", &server_address);
//设置 AK SK
// Set up AK and SK
iam::init_global_action_cred(Some(opt.access_key.clone()), Some(opt.secret_key.clone()))?;
set_global_rustfs_port(server_port);
//监听地址,端口从参数中获取
// The listening address and port are obtained from the parameters
let listener = TcpListener::bind(server_address.clone()).await?;
//获取监听地址
// Obtain the listener address
let local_addr: SocketAddr = listener.local_addr()?;
let local_ip = utils::get_local_ip().ok_or(local_addr.ip()).unwrap();
// let local_ip = utils::get_local_ip().ok_or(local_addr.ip()).unwrap();
let local_ip = rustfs_utils::get_local_ip().ok_or(local_addr.ip()).unwrap();
// 用于 rpc
let (endpoint_pools, setup_type) = EndpointServerPools::from_volumes(server_address.clone().as_str(), opt.volumes.clone())
@@ -203,13 +183,13 @@ async fn run(opt: config::Opt) -> Result<()> {
set_global_endpoints(endpoint_pools.as_ref().clone());
update_erasure_type(setup_type).await;
// 初始化本地磁盘
// Initialize the local disk
init_local_disks(endpoint_pools.clone())
.await
.map_err(|err| Error::from_string(err.to_string()))?;
// Setup S3 service
// 本项目使用 s3s 库来实现 s3 服务
// This project uses the S3S library to implement S3 services
let s3_service = {
let store = storage::ecfs::FS::new();
// let mut b = S3ServiceBuilder::new(storage::ecfs::FS::new(server_address.clone(), endpoint_pools).await?);
@@ -217,7 +197,7 @@ async fn run(opt: config::Opt) -> Result<()> {
let access_key = opt.access_key.clone();
let secret_key = opt.secret_key.clone();
//显示 info 信息
// Displays info information
debug!("authentication is enabled {}, {}", &access_key, &secret_key);
b.set_auth(IAMAuth::new(access_key, secret_key));
@@ -268,7 +248,7 @@ async fn run(opt: config::Opt) -> Result<()> {
debug!("Found TLS directory, checking for certificates");
// 1. Try to load all certificates directly (including root and subdirectories)
match utils::load_all_certs_from_directory(&tls_path) {
match rustfs_utils::load_all_certs_from_directory(&tls_path) {
Ok(cert_key_pairs) if !cert_key_pairs.is_empty() => {
debug!("Found {} certificates, starting with HTTPS", cert_key_pairs.len());
let _ = rustls::crypto::aws_lc_rs::default_provider().install_default();
@@ -276,7 +256,7 @@ async fn run(opt: config::Opt) -> Result<()> {
// create a multi certificate configuration
let mut server_config = ServerConfig::builder()
.with_no_client_auth()
.with_cert_resolver(Arc::new(utils::create_multi_cert_resolver(cert_key_pairs)?));
.with_cert_resolver(Arc::new(rustfs_utils::create_multi_cert_resolver(cert_key_pairs)?));
server_config.alpn_protocols = vec![b"h2".to_vec(), b"http/1.1".to_vec(), b"http/1.0".to_vec()];
Some(TlsAcceptor::from(Arc::new(server_config)))
@@ -291,12 +271,14 @@ async fn run(opt: config::Opt) -> Result<()> {
if has_single_cert {
debug!("Found legacy single TLS certificate, starting with HTTPS");
let _ = rustls::crypto::aws_lc_rs::default_provider().install_default();
let certs = utils::load_certs(cert_path.as_str()).map_err(|e| error(e.to_string()))?;
let key = utils::load_private_key(key_path.as_str()).map_err(|e| error(e.to_string()))?;
let certs =
rustfs_utils::load_certs(cert_path.as_str()).map_err(|e| rustfs_utils::certs_error(e.to_string()))?;
let key = rustfs_utils::load_private_key(key_path.as_str())
.map_err(|e| rustfs_utils::certs_error(e.to_string()))?;
let mut server_config = ServerConfig::builder()
.with_no_client_auth()
.with_single_cert(certs, key)
.map_err(|e| error(e.to_string()))?;
.map_err(|e| rustfs_utils::certs_error(e.to_string()))?;
server_config.alpn_protocols = vec![b"h2".to_vec(), b"http/1.1".to_vec(), b"http/1.0".to_vec()];
Some(TlsAcceptor::from(Arc::new(server_config)))
} else {
@@ -446,7 +428,7 @@ async fn run(opt: config::Opt) -> Result<()> {
debug!("TLS certificates found, starting with SIGINT");
let tls_socket = match tls_acceptor
.as_ref()
.ok_or_else(|| error("TLS not configured".to_string()))
.ok_or_else(|| rustfs_utils::certs_error("TLS not configured".to_string()))
.unwrap()
.accept(socket)
.await

View File

@@ -1,18 +0,0 @@
mod certs;
use std::net::IpAddr;
pub(crate) use certs::create_multi_cert_resolver;
pub(crate) use certs::error;
pub(crate) use certs::load_all_certs_from_directory;
pub(crate) use certs::load_certs;
pub(crate) use certs::load_private_key;
/// Get the local IP address.
/// This function retrieves the local IP address of the machine.
pub(crate) fn get_local_ip() -> Option<std::net::Ipv4Addr> {
match local_ip_address::local_ip() {
Ok(IpAddr::V4(ip)) => Some(ip),
Err(_) => None,
Ok(IpAddr::V6(_)) => todo!(),
}
}