Merge pull request #313 from rustfs/feature/Systemd.service

feat: improve systemd integration and logging
This commit is contained in:
houseme
2025-04-11 18:40:23 +08:00
committed by GitHub
27 changed files with 617 additions and 282 deletions

4
.gitignore vendored
View File

@@ -9,6 +9,6 @@
rustfs/static/*
vendor
cli/rustfs-gui/embedded-rustfs/rustfs
config/obs.toml
deploy/config/obs.toml
*.log
config/certs/*
deploy/certs/*

40
Cargo.lock generated
View File

@@ -669,6 +669,17 @@ version = "1.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0"
[[package]]
name = "atomic_enum"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "99e1aca718ea7b89985790c94aad72d77533063fe00bc497bb79a7c2dae6a661"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.100",
]
[[package]]
name = "autocfg"
version = "1.4.0"
@@ -7158,6 +7169,7 @@ dependencies = [
"appauth",
"async-trait",
"atoi",
"atomic_enum",
"axum",
"axum-extra",
"axum-server",
@@ -7167,7 +7179,6 @@ dependencies = [
"common",
"const-str",
"crypto",
"csv",
"datafusion",
"ecstore",
"flatbuffers",
@@ -7178,7 +7189,6 @@ dependencies = [
"hyper",
"hyper-util",
"iam",
"jsonwebtoken",
"lazy_static",
"libsystemd",
"local-ip-address",
@@ -7188,13 +7198,9 @@ dependencies = [
"mime",
"mime_guess",
"netif",
"once_cell",
"pin-project-lite",
"policy",
"prost",
"prost-build",
"prost-types",
"protobuf 3.7.2",
"protos",
"query",
"rmp-serde",
@@ -7215,15 +7221,10 @@ dependencies = [
"tokio-util",
"tonic 0.13.0",
"tonic-build",
"tonic-reflection",
"tower 0.5.2",
"tower-http",
"tracing",
"tracing-core",
"tracing-error",
"tracing-subscriber",
"transform-stream",
"url",
"uuid",
]
@@ -7304,9 +7305,9 @@ dependencies = [
[[package]]
name = "rustls"
version = "0.23.25"
version = "0.23.26"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "822ee9188ac4ec04a2f0531e55d035fb2de73f18b41a63c70c2712503b6fb13c"
checksum = "df51b5869f3a441595eac5e8ff14d486ff285f7b8c0df8770e49c3b56351f0f0"
dependencies = [
"aws-lc-rs",
"log",
@@ -8582,19 +8583,6 @@ dependencies = [
"syn 2.0.100",
]
[[package]]
name = "tonic-reflection"
version = "0.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "88fa815be858816dad226a49439ee90b7bcf81ab55bee72fdb217f1e6778c3ca"
dependencies = [
"prost",
"prost-types",
"tokio",
"tokio-stream",
"tonic 0.13.0",
]
[[package]]
name = "tower"
version = "0.4.13"

View File

@@ -36,6 +36,7 @@ madmin = { path = "./madmin" }
atoi = "2.0.0"
async-recursion = "1.0.5"
async-trait = "0.1.87"
atomic_enum = "0.3.0"
axum = "0.8.3"
axum-extra = "0.10.1"
axum-server = { version = "0.7.2", features = ["tls-rustls"] }
@@ -74,7 +75,6 @@ matchit = "0.8.4"
md-5 = "0.10.6"
mime = "0.3.17"
netif = "0.1.6"
once_cell = "1.21.1"
opentelemetry = { version = "0.29.1" }
opentelemetry-appender-tracing = { version = "0.29.1", features = ["experimental_use_tracing_span_context", "experimental_metadata_attributes"] }
opentelemetry_sdk = { version = "0.29" }
@@ -98,7 +98,7 @@ rmp = "0.8.14"
rmp-serde = "1.3.0"
rustfs-obs = { path = "crates/obs", version = "0.0.1" }
rust-embed = "8.6.0"
rustls = { version = "0.23" }
rustls = { version = "0.23.26" }
rustls-pki-types = "1.11.0"
rustls-pemfile = "2.2.0"
s3s = { git = "https://github.com/Nugine/s3s.git", rev = "3ad13ace7af703c3c8afc99cf19f4c18c82603a3" }
@@ -122,7 +122,6 @@ time = { version = "0.3.41", features = [
tokio = { version = "1.44.2", features = ["fs", "rt-multi-thread"] }
tonic = { version = "0.13.0", features = ["gzip"] }
tonic-build = "0.13.0"
tonic-reflection = "0.13.0"
tokio-rustls = { version = "0.26", default-features = false }
tokio-stream = "0.1.17"
tokio-util = { version = "0.7.13", features = ["io", "compat"] }

View File

@@ -1,5 +1,5 @@
use crate::global::{ENVIRONMENT, LOGGER_LEVEL, METER_INTERVAL, SAMPLE_RATIO, SERVICE_NAME, SERVICE_VERSION};
use config::{Config, File, FileFormat};
use crate::global::{ENVIRONMENT, LOGGER_LEVEL, METER_INTERVAL, SAMPLE_RATIO, SERVICE_NAME, SERVICE_VERSION, USE_STDOUT};
use config::{Config, Environment, File, FileFormat};
use serde::Deserialize;
use std::env;
@@ -22,18 +22,44 @@ pub struct OtelConfig {
pub logger_level: Option<String>,
}
// Helper function: extract the observability configuration from environment variables
fn extract_otel_config_from_env() -> OtelConfig {
OtelConfig {
endpoint: env::var("RUSTFS_OBSERVABILITY_ENDPOINT").unwrap_or_else(|_| "".to_string()),
use_stdout: env::var("RUSTFS_OBSERVABILITY_USE_STDOUT")
.ok()
.and_then(|v| v.parse().ok())
.or(Some(USE_STDOUT)),
sample_ratio: env::var("RUSTFS_OBSERVABILITY_SAMPLE_RATIO")
.ok()
.and_then(|v| v.parse().ok())
.or(Some(SAMPLE_RATIO)),
meter_interval: env::var("RUSTFS_OBSERVABILITY_METER_INTERVAL")
.ok()
.and_then(|v| v.parse().ok())
.or(Some(METER_INTERVAL)),
service_name: env::var("RUSTFS_OBSERVABILITY_SERVICE_NAME")
.ok()
.and_then(|v| v.parse().ok())
.or(Some(SERVICE_NAME.to_string())),
service_version: env::var("RUSTFS_OBSERVABILITY_SERVICE_VERSION")
.ok()
.and_then(|v| v.parse().ok())
.or(Some(SERVICE_VERSION.to_string())),
environment: env::var("RUSTFS_OBSERVABILITY_ENVIRONMENT")
.ok()
.and_then(|v| v.parse().ok())
.or(Some(ENVIRONMENT.to_string())),
logger_level: env::var("RUSTFS_OBSERVABILITY_LOGGER_LEVEL")
.ok()
.and_then(|v| v.parse().ok())
.or(Some(LOGGER_LEVEL.to_string())),
}
}
impl Default for OtelConfig {
fn default() -> Self {
OtelConfig {
endpoint: "".to_string(),
use_stdout: Some(true),
sample_ratio: Some(SAMPLE_RATIO),
meter_interval: Some(METER_INTERVAL),
service_name: Some(SERVICE_NAME.to_string()),
service_version: Some(SERVICE_VERSION.to_string()),
environment: Some(ENVIRONMENT.to_string()),
logger_level: Some(LOGGER_LEVEL.to_string()),
}
extract_otel_config_from_env()
}
}
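
The new `extract_otel_config_from_env` helper applies the same pattern to every field: read the `RUSTFS_OBSERVABILITY_*` variable, try to parse it, and fall back to the compile-time default imported from `crate::global`. A minimal, self-contained sketch of that fallback behavior (the constant here is an illustrative stand-in, not the real value from `crate::global`):

```rust
use std::env;

// Stand-in for the crate::global default; the value is illustrative only.
const SAMPLE_RATIO: f64 = 1.0;

fn sample_ratio_from_env() -> Option<f64> {
    env::var("RUSTFS_OBSERVABILITY_SAMPLE_RATIO")
        .ok()
        .and_then(|v| v.parse().ok())
        .or(Some(SAMPLE_RATIO))
}

fn main() {
    // Unset or unparsable -> the compile-time default wins.
    assert_eq!(sample_ratio_from_env(), Some(1.0));
    // With RUSTFS_OBSERVABILITY_SAMPLE_RATIO=0.5 exported, Some(0.5) is returned instead.
}
```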
@@ -69,14 +95,18 @@ pub struct FileSinkConfig {
impl FileSinkConfig {
pub fn get_default_log_path() -> String {
let temp_dir = env::temp_dir().join("rustfs").join("logs");
let temp_dir = env::temp_dir().join("rustfs");
if let Err(e) = std::fs::create_dir_all(&temp_dir) {
eprintln!("Failed to create log directory: {}", e);
return "logs/app.log".to_string();
return "rustfs/rustfs.log".to_string();
}
temp_dir.join("app.log").to_str().unwrap_or("logs/app.log").to_string()
temp_dir
.join("rustfs.log")
.to_str()
.unwrap_or("rustfs/rustfs.log")
.to_string()
}
}
@@ -84,7 +114,10 @@ impl Default for FileSinkConfig {
fn default() -> Self {
FileSinkConfig {
enabled: true,
path: Self::get_default_log_path(),
path: env::var("RUSTFS_SINKS_FILE_PATH")
.ok()
.filter(|s| !s.trim().is_empty())
.unwrap_or_else(Self::get_default_log_path),
buffer_size: Some(8192),
flush_interval_ms: Some(1000),
flush_threshold: Some(100),
@@ -93,11 +126,21 @@ impl Default for FileSinkConfig {
}
/// Sink configuration collection
#[derive(Debug, Deserialize, Clone, Default)]
#[derive(Debug, Deserialize, Clone)]
pub struct SinkConfig {
pub kafka: KafkaSinkConfig,
pub webhook: WebhookSinkConfig,
pub file: FileSinkConfig,
pub kafka: Option<KafkaSinkConfig>,
pub webhook: Option<WebhookSinkConfig>,
pub file: Option<FileSinkConfig>,
}
impl Default for SinkConfig {
fn default() -> Self {
SinkConfig {
kafka: None,
webhook: None,
file: Some(FileSinkConfig::default()),
}
}
}
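
With the sink fields now `Option<_>`, a config file that omits a sink table deserializes to `None` instead of failing, which is what lets the reworked `create_sinks` further below log "not configured" and move on. A rough sketch of that behavior, assuming the `toml` and `serde` crates; the field types are simplified stand-ins for the real sink config structs:

```rust
use serde::Deserialize;

#[derive(Debug, Deserialize)]
struct FileSinkConfig {
    enabled: bool,
    path: String,
}

#[derive(Debug, Deserialize)]
struct SinkConfig {
    kafka: Option<toml::Value>,   // stand-in for KafkaSinkConfig
    webhook: Option<toml::Value>, // stand-in for WebhookSinkConfig
    file: Option<FileSinkConfig>,
}

fn main() {
    // [kafka] and [webhook] are omitted entirely -> both fields deserialize to None.
    let cfg: SinkConfig = toml::from_str(
        r#"
        [file]
        enabled = true
        path = "./deploy/logs/app.log"
        "#,
    )
    .unwrap();
    assert!(cfg.kafka.is_none());
    assert!(cfg.webhook.is_none());
    assert!(cfg.file.is_some());
}
```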
/// Logger Configuration
@@ -109,7 +152,7 @@ pub struct LoggerConfig {
impl Default for LoggerConfig {
fn default() -> Self {
LoggerConfig {
queue_capacity: Some(1000),
queue_capacity: Some(10000),
}
}
}
@@ -128,11 +171,28 @@ impl Default for LoggerConfig {
///
/// let config = load_config(None);
/// ```
#[derive(Debug, Deserialize, Clone, Default)]
#[derive(Debug, Deserialize, Clone)]
pub struct AppConfig {
pub observability: OtelConfig,
pub sinks: SinkConfig,
pub logger: LoggerConfig,
pub logger: Option<LoggerConfig>,
}
// Implement Default for AppConfig
impl AppConfig {
pub fn new() -> Self {
Self {
observability: OtelConfig::default(),
sinks: SinkConfig::default(),
logger: Some(LoggerConfig::default()),
}
}
}
impl Default for AppConfig {
fn default() -> Self {
Self::new()
}
}
const DEFAULT_CONFIG_FILE: &str = "obs";
@@ -184,12 +244,28 @@ pub fn load_config(config_dir: Option<String>) -> AppConfig {
// Log using proper logging instead of println when possible
println!("Using config file base: {}", config_dir);
let config = Config::builder()
let app_config = Config::builder()
.add_source(File::with_name(config_dir.as_str()).format(FileFormat::Toml).required(false))
.add_source(File::with_name(config_dir.as_str()).format(FileFormat::Yaml).required(false))
.add_source(config::Environment::with_prefix(""))
.add_source(
Environment::default()
.prefix("RUSTFS")
.prefix_separator("__")
.separator("__")
.with_list_parse_key("volumes")
.try_parsing(true),
)
.build()
.unwrap_or_default();
config.try_deserialize().unwrap_or_default()
match app_config.try_deserialize::<AppConfig>() {
Ok(app_config) => {
println!("Parsed AppConfig: {:?}", app_config);
app_config
}
Err(e) => {
println!("Failed to deserialize config: {}", e);
AppConfig::default()
}
}
}
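
For reference, a small sketch (not part of the diff) of how the `RUSTFS`-prefixed environment source above resolves double-underscore keys into nested fields; the struct here is a pared-down stand-in for `AppConfig`:

```rust
use config::{Config, Environment};
use serde::Deserialize;

#[derive(Debug, Deserialize)]
struct Observability {
    endpoint: String,
}

#[derive(Debug, Deserialize)]
struct MiniConfig {
    observability: Observability,
}

fn main() {
    // RUSTFS__OBSERVABILITY__ENDPOINT maps to the nested `observability.endpoint` field.
    std::env::set_var("RUSTFS__OBSERVABILITY__ENDPOINT", "http://localhost:4317");

    let cfg: MiniConfig = Config::builder()
        .add_source(
            Environment::default()
                .prefix("RUSTFS")
                .prefix_separator("__")
                .separator("__")
                .try_parsing(true),
        )
        .build()
        .expect("build config")
        .try_deserialize()
        .expect("deserialize config");

    assert_eq!(cfg.observability.endpoint, "http://localhost:4317");
}
```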

View File

@@ -21,7 +21,7 @@ impl Logger {
/// Returns Logger and corresponding Receiver
pub fn new(config: &AppConfig) -> (Self, Receiver<UnifiedLogEntry>) {
// Get queue capacity from configuration, or use default values 10000
let queue_capacity = config.logger.queue_capacity.unwrap_or(10000);
let queue_capacity = config.logger.as_ref().and_then(|l| l.queue_capacity).unwrap_or(10000);
let (sender, receiver) = mpsc::channel(queue_capacity);
(Logger { sender, queue_capacity }, receiver)
}

View File

@@ -4,7 +4,6 @@ use std::sync::Arc;
use tokio::fs::OpenOptions;
use tokio::io;
use tokio::io::AsyncWriteExt;
use tracing::debug;
/// Sink Trait definition, asynchronously write logs
#[async_trait]
@@ -274,15 +273,15 @@ impl FileSink {
// if the file not exists, create it
if !file_exists {
tokio::fs::create_dir_all(std::path::Path::new(&path).parent().unwrap()).await?;
debug!("the file not exists,create if. path: {:?}", path)
tracing::debug!("File does not exist, creating it. Path: {:?}", path)
}
let file = if file_exists {
// If the file exists, open it in append mode
debug!("FileSink: File exists, opening in append mode.");
tracing::debug!("FileSink: File exists, opening in append mode.");
OpenOptions::new().append(true).create(true).open(&path).await?
} else {
// If the file does not exist, create it
debug!("FileSink: File does not exist, creating a new file.");
tracing::debug!("FileSink: File does not exist, creating a new file.");
// Create the file and write a header or initial content if needed
OpenOptions::new().create(true).truncate(true).write(true).open(&path).await?
};
@@ -414,52 +413,84 @@ pub async fn create_sinks(config: &AppConfig) -> Vec<Arc<dyn Sink>> {
let mut sinks: Vec<Arc<dyn Sink>> = Vec::new();
#[cfg(feature = "kafka")]
if config.sinks.kafka.enabled {
match rdkafka::config::ClientConfig::new()
.set("bootstrap.servers", &config.sinks.kafka.bootstrap_servers)
.set("message.timeout.ms", "5000")
.create()
{
Ok(producer) => {
sinks.push(Arc::new(KafkaSink::new(
producer,
config.sinks.kafka.topic.clone(),
config.sinks.kafka.batch_size.unwrap_or(100),
config.sinks.kafka.batch_timeout_ms.unwrap_or(1000),
)));
{
match &config.sinks.kafka {
Some(sink_kafka) => {
if sink_kafka.enabled {
match rdkafka::config::ClientConfig::new()
.set("bootstrap.servers", &sink_kafka.bootstrap_servers)
.set("message.timeout.ms", "5000")
.create()
{
Ok(producer) => {
sinks.push(Arc::new(KafkaSink::new(
producer,
sink_kafka.topic.clone(),
sink_kafka.batch_size.unwrap_or(100),
sink_kafka.batch_timeout_ms.unwrap_or(1000),
)));
}
Err(e) => {
tracing::error!("Failed to create Kafka producer: {}", e);
}
}
} else {
tracing::info!("Kafka sink is disabled in the configuration");
}
}
_ => {
tracing::info!("Kafka sink is not configured or disabled");
}
Err(e) => eprintln!("Failed to create Kafka producer: {}", e),
}
}
#[cfg(feature = "webhook")]
if config.sinks.webhook.enabled {
sinks.push(Arc::new(WebhookSink::new(
config.sinks.webhook.endpoint.clone(),
config.sinks.webhook.auth_token.clone(),
config.sinks.webhook.max_retries.unwrap_or(3),
config.sinks.webhook.retry_delay_ms.unwrap_or(100),
)));
{
match &config.sinks.webhook {
Some(sink_webhook) => {
if sink_webhook.enabled {
sinks.push(Arc::new(WebhookSink::new(
sink_webhook.endpoint.clone(),
sink_webhook.auth_token.clone(),
sink_webhook.max_retries.unwrap_or(3),
sink_webhook.retry_delay_ms.unwrap_or(100),
)));
} else {
tracing::info!("Webhook sink is disabled in the configuration");
}
}
_ => {
tracing::info!("Webhook sink is not configured or disabled");
}
}
}
#[cfg(feature = "file")]
{
let path = if config.sinks.file.enabled {
config.sinks.file.path.clone()
} else {
"default.log".to_string()
};
debug!("FileSink: Using path: {}", path);
sinks.push(Arc::new(
FileSink::new(
path.clone(),
config.sinks.file.buffer_size.unwrap_or(8192),
config.sinks.file.flush_interval_ms.unwrap_or(1000),
config.sinks.file.flush_threshold.unwrap_or(100),
)
.await
.unwrap(),
));
// let config = config.clone();
match &config.sinks.file {
Some(sink_file) => {
tracing::info!("File sink is enabled in the configuration");
let path = if sink_file.enabled {
sink_file.path.clone()
} else {
"rustfs.log".to_string()
};
tracing::debug!("FileSink: Using path: {}", path);
sinks.push(Arc::new(
FileSink::new(
path.clone(),
sink_file.buffer_size.unwrap_or(8192),
sink_file.flush_interval_ms.unwrap_or(1000),
sink_file.flush_threshold.unwrap_or(100),
)
.await
.unwrap(),
));
}
_ => {
tracing::info!("File sink is not configured or disabled");
}
}
}
sinks

View File

@@ -293,7 +293,10 @@ pub fn init_telemetry(config: &OtelConfig) -> OtelGuard {
registry.with(ErrorLayer::default()).with(fmt_layer).init();
if !config.endpoint.is_empty() {
info!("OpenTelemetry telemetry initialized with OTLP endpoint: {}", config.endpoint);
info!(
"OpenTelemetry telemetry initialized with OTLP endpoint: {}, logger_level: {}",
config.endpoint, logger_level
);
}
OtelGuard {

35
deploy/README.md Normal file
View File

@@ -0,0 +1,35 @@
# RustFS Deploy
This directory contains the deployment scripts and configurations for the project.
The deployment process is divided into two main parts: the RustFS binary and the RustFS console. The RustFS binary is
responsible for the core functionality of the system, while the RustFS console provides a web-based interface for
managing and monitoring the system.
# Directory Structure
```text
|--data // data directory
| |--vol1 // volume 1 not created
| |--vol2 // volume 2 not created
| |--vol3 // volume 3 not created
| |--vol4 // volume 4 not created
| |--README.md // data directory readme
|--logs // log directory
| |--rustfs.log // RustFS log
| |--README.md // logs directory readme
|--build
| |--rustfs.run.md // deployment script for RustFS
| |--rustfs.run-zh.md // deployment script for RustFS in Chinese
| |--rustfs.service // systemd service file
| |--rustfs-zh.service.md // systemd service file in Chinese
|--certs
| |--README.md // certs readme
| |--rustfs_tls_cert.pem // API cert.pem
| |--rustfs_tls_key.pem // API key.pem
| |--rustfs_console_tls_cert.pem // console cert.pem
| |--rustfs_console_tls_key.pem // console key.pem
|--config
| |--obs.example.yaml // example config
| |--rustfs.env // env config
| |--rustfs-zh.env // env config in Chinese
```

View File

@@ -24,20 +24,15 @@ User=rustfs
Group=rustfs
# Run the service as the rustfs group; used together with User.
# environment variable configuration
# Define environment variables to pass to the service program.
Environment=RUST_LOG=info
# Set the log level to info, controlling the service's log output (the service must support this variable).
Environment=RUSTFS_ACCESS_KEY=minioadmin
# Set the access key to minioadmin, used for RustFS authentication.
Environment=RUSTFS_SECRET_KEY=minioadmin
# Set the secret key to minioadmin, used together with the access key.
# working directory
WorkingDirectory=/opt/rustfs
# Set the service working directory to /opt/rustfs, which affects how relative paths are resolved.
# main program
# Define environment variables to pass to the service program.
Environment=RUSTFS_ACCESS_KEY=minioadmin
# Set the access key to minioadmin, used for RustFS authentication.
Environment=RUSTFS_SECRET_KEY=minioadmin
# Set the secret key to minioadmin, used together with the access key.
ExecStart=/usr/local/bin/rustfs \
--address 0.0.0.0:9000 \
--volumes /data/rustfs/vol1,/data/rustfs/vol2 \
@@ -51,6 +46,10 @@ ExecStart=/usr/local/bin/rustfs \
# --console-enable: enable the console feature.
# --console-address 0.0.0.0:9002: the console listens on port 9002 on all interfaces.
# Define environment variables to pass to the service program (recommended, and more concise)
EnvironmentFile=-/etc/default/rustfs
ExecStart=/usr/local/bin/rustfs $RUSTFS_VOLUMES $RUSTFS_OPTS
# resource constraints
LimitNOFILE=1048576
# Set the file descriptor limit to 1048576 to support highly concurrent connections.

View File

@@ -13,15 +13,12 @@ NotifyAccess=main
User=rustfs
Group=rustfs
# environment variable configuration
Environment=RUST_LOG=info
Environment=RUSTFS_ACCESS_KEY=minioadmin
Environment=RUSTFS_SECRET_KEY=minioadmin
# working directory
WorkingDirectory=/opt/rustfs
# main program
# environment variable configuration and main program (Option 1: Directly specify arguments)
Environment=RUSTFS_ACCESS_KEY=minioadmin
Environment=RUSTFS_SECRET_KEY=minioadmin
ExecStart=/usr/local/bin/rustfs \
--address 0.0.0.0:9000 \
--volumes /data/rustfs/vol1,/data/rustfs/vol2 \
@@ -29,6 +26,10 @@ ExecStart=/usr/local/bin/rustfs \
--console-enable \
--console-address 0.0.0.0:9002
# environment variable configuration (Option 2: Use environment variables)
EnvironmentFile=-/etc/default/rustfs
ExecStart=/usr/local/bin/rustfs $RUSTFS_VOLUMES $RUSTFS_OPTS
# resource constraints
LimitNOFILE=1048576
LimitNPROC=32768

View File

@@ -0,0 +1,27 @@
OBSERVABILITY__ENDPOINT=http://localhost:4317
OBSERVABILITY__USE_STDOUT=true
OBSERVABILITY__SAMPLE_RATIO=2.0
OBSERVABILITY__METER_INTERVAL=30
OBSERVABILITY__SERVICE_NAME=rustfs
OBSERVABILITY__SERVICE_VERSION=0.1.0
OBSERVABILITY__ENVIRONMENT=develop
OBSERVABILITY__LOGGER_LEVEL=debug
SINKS__KAFKA__ENABLED=false
SINKS__KAFKA__BOOTSTRAP_SERVERS=localhost:9092
SINKS__KAFKA__TOPIC=logs
SINKS__KAFKA__BATCH_SIZE=100
SINKS__KAFKA__BATCH_TIMEOUT_MS=1000
SINKS__WEBHOOK__ENABLED=false
SINKS__WEBHOOK__ENDPOINT=http://localhost:8080/webhook
SINKS__WEBHOOK__AUTH_TOKEN=
SINKS__WEBHOOK__BATCH_SIZE=100
SINKS__WEBHOOK__BATCH_TIMEOUT_MS=1000
SINKS__FILE__ENABLED=true
SINKS__FILE__PATH=./deploy/logs/app.log
SINKS__FILE__BATCH_SIZE=10
SINKS__FILE__BATCH_TIMEOUT_MS=1000
LOGGER__QUEUE_CAPACITY=10

View File

@@ -0,0 +1,33 @@
[observability]
endpoint = "http://localhost:4317" # 可观测性数据上报的终端地址,默认为"http://localhost:4317"
use_stdout = true # 是否将日志输出到标准输出
sample_ratio = 2.0 # 采样率,表示每 2 条数据采样 1 条
meter_interval = 30 # 指标收集间隔,单位为秒
service_name = "rustfs" # 服务名称,用于标识当前服务
service_version = "0.1.0" # 服务版本号
environments = "develop" # 运行环境,如开发环境 (develop)
logger_level = "debug" # 日志级别,可选 debug/info/warn/error 等
[sinks]
[sinks.kafka] # Kafka 接收器配置
enabled = false # 是否启用 Kafka 接收器,默认禁用
bootstrap_servers = "localhost:9092" # Kafka 服务器地址
topic = "logs" # Kafka 主题名称
batch_size = 100 # 批处理大小,每次发送的消息数量
batch_timeout_ms = 1000 # 批处理超时时间,单位为毫秒
[sinks.webhook] # Webhook 接收器配置
enabled = false # 是否启用 Webhook 接收器
endpoint = "http://localhost:8080/webhook" # Webhook 接收地址
auth_token = "" # 认证令牌
batch_size = 100 # 批处理大小
batch_timeout_ms = 1000 # 批处理超时时间,单位为毫秒
[sinks.file] # 文件接收器配置
enabled = true # 是否启用文件接收器
path = "/Users/qun/Documents/rust/rustfs/s3-rustfs/logs/app.log" # 日志文件路径
batch_size = 10 # 批处理大小
batch_timeout_ms = 1000 # 批处理超时时间,单位为毫秒
[logger] # 日志器配置
queue_capacity = 10 # 日志队列容量,表示可以缓存的日志条数

View File

@@ -25,7 +25,7 @@ batch_timeout_ms = 1000 # Default is 100ms if not specified
[sinks.file]
enabled = true
path = "logs/app.log"
path = "./deploy/logs/app.log"
batch_size = 100
batch_timeout_ms = 1000 # Default is 8192 bytes if not specified

View File

@@ -0,0 +1,26 @@
# RustFS administrator username
RUSTFS_ROOT_USER=rustfsadmin
# RustFS administrator password
RUSTFS_ROOT_PASSWORD=rustfsadmin
# Data volume configuration example path: deploy/data/rustfs.env
# RustFS data volume storage paths, supports multiple volumes from vol1 to vol4
RUSTFS_VOLUMES="./deploy/deploy/vol{1...4}"
# RustFS service startup parameters, specifying listen address and port
RUSTFS_OPTS="--address 0.0.0.0:9000"
# RustFS service listen address and port
RUSTFS_ADDRESS="0.0.0.0:9000"
# Enable RustFS console functionality
RUSTFS_CONSOLE_ENABLE=true
# RustFS console listen address and port
RUSTFS_CONSOLE_ADDRESS="0.0.0.0:9002"
# RustFS service endpoint for client access
RUSTFS_SERVER_ENDPOINT="http://127.0.0.1:9000"
# RustFS service domain configuration
RUSTFS_SERVER_DOMAINS=127.0.0.1:9002
# RustFS license content
RUSTFS_LICENSE="license content"
# Observability configuration file path: deploy/config/obs.example.toml
RUSTFS_OBS_CONFIG=/etc/default/obs.toml
# TLS certificates directory path: deploy/certs
RUSTFS_TLS_PATH=/etc/default/tls

26
deploy/config/rustfs.env Normal file
View File

@@ -0,0 +1,26 @@
# RustFS administrator username
RUSTFS_ROOT_USER=rustfsadmin
# RustFS administrator password
RUSTFS_ROOT_PASSWORD=rustfsadmin
# Data volume configuration example path: deploy/data/rustfs.env
# RustFS data volume storage paths, supports multiple volumes from vol1 to vol4
RUSTFS_VOLUMES="./deploy/deploy/vol{1...4}"
# RustFS service startup parameters, specifying listen address and port
RUSTFS_OPTS="--address 0.0.0.0:9000"
# RustFS service listen address and port
RUSTFS_ADDRESS="0.0.0.0:9000"
# Enable RustFS console functionality
RUSTFS_CONSOLE_ENABLE=true
# RustFS console listen address and port
RUSTFS_CONSOLE_ADDRESS="0.0.0.0:9002"
# RustFS service endpoint for client access
RUSTFS_SERVER_ENDPOINT="http://127.0.0.1:9000"
# RustFS service domain configuration
RUSTFS_SERVER_DOMAINS=127.0.0.1:9002
# RustFS license content
RUSTFS_LICENSE="license content"
# Observability configuration file path: deploy/config/obs.example.toml
RUSTFS_OBS_CONFIG=/etc/default/obs.toml
# TLS certificates directory path: deploy/certs
RUSTFS_TLS_PATH=/etc/default/tls

1
deploy/data/README.md Normal file
View File

@@ -0,0 +1 @@
# Data Volumes

1
deploy/logs/README.md Normal file
View File

@@ -0,0 +1 @@
# RustFS Logs

View File

@@ -19,6 +19,7 @@ madmin.workspace = true
api = { path = "../s3select/api" }
appauth = { version = "0.0.1", path = "../appauth" }
atoi = { workspace = true }
atomic_enum = { workspace = true }
axum.workspace = true
axum-extra = { workspace = true }
axum-server = { workspace = true }
@@ -26,7 +27,6 @@ async-trait.workspace = true
bytes.workspace = true
chrono = { workspace = true }
clap.workspace = true
csv = "1.3.1"
crypto = { path = "../crypto" }
datafusion = { workspace = true }
common.workspace = true
@@ -36,26 +36,18 @@ policy.workspace = true
flatbuffers.workspace = true
futures.workspace = true
futures-util.workspace = true
#h2 = "0.4.7"
hyper.workspace = true
hyper-util.workspace = true
http.workspace = true
http-body.workspace = true
iam = { path = "../iam" }
jsonwebtoken = "9.3.0"
lock.workspace = true
local-ip-address = { workspace = true }
matchit = { workspace = true }
mime.workspace = true
mime_guess = "2.0.5"
netif.workspace = true
once_cell.workspace = true
pin-project-lite.workspace = true
prost.workspace = true
prost-types.workspace = true
protos.workspace = true
protobuf.workspace = true
query = { path = "../s3select/query" }
rmp-serde.workspace = true
rustfs-obs = { workspace = true }
@@ -81,18 +73,13 @@ tokio-rustls.workspace = true
lazy_static.workspace = true
tokio-stream.workspace = true
tonic = { workspace = true }
tonic-reflection.workspace = true
tower.workspace = true
tracing-core = { workspace = true }
tracing-error.workspace = true
tracing-subscriber.workspace = true
transform-stream.workspace = true
tower-http.workspace = true
url.workspace = true
uuid = "1.15.1"
[target.'cfg(target_os = "linux")'.dependencies]
libsystemd = "0.7"
libsystemd.workspace = true
[build-dependencies]
prost-build.workspace = true
@@ -105,7 +92,6 @@ futures-util.workspace = true
ecstore = { path = "../ecstore" }
s3s.workspace = true
clap = { workspace = true }
tracing-subscriber = { workspace = true, features = ["env-filter", "time"] }
hyper-util = { workspace = true, features = [
"tokio",
"server-auto",

30
rustfs/README.md Normal file
View File

@@ -0,0 +1,30 @@
rustfs/
├── Cargo.toml
├── src/
│   ├── main.rs              # main entry point
│   ├── admin/
│   │   └── mod.rs           # admin interface
│   ├── auth/
│   │   └── mod.rs           # authentication module
│   ├── config/
│   │   ├── mod.rs           # configuration module
│   │   └── options.rs       # command-line options
│   ├── console/
│   │   ├── mod.rs           # console module
│   │   └── server.rs        # console server
│   ├── grpc/
│   │   └── mod.rs           # gRPC service
│   ├── license/
│   │   └── mod.rs           # license management
│   ├── logging/
│   │   └── mod.rs           # logging management
│   ├── server/
│   │   ├── mod.rs           # server implementation
│   │   ├── connection.rs    # connection handling
│   │   ├── service.rs       # service implementation
│   │   └── state.rs         # state management
│   ├── storage/
│   │   ├── mod.rs           # storage module
│   │   └── fs.rs            # filesystem implementation
│   └── utils/
│       └── mod.rs           # utility functions

View File

@@ -5,14 +5,15 @@ mod console;
mod grpc;
pub mod license;
mod logging;
mod server;
mod service;
mod storage;
mod utils;
use crate::auth::IAMAuth;
use crate::console::{init_console_cfg, CONSOLE_CONFIG};
use crate::utils::error;
// Ensure the correct path for parse_license is imported
use crate::server::{wait_for_shutdown, ServiceState, ServiceStateManager, ShutdownSignal, SHUTDOWN_TIMEOUT};
use crate::utils::error;
use chrono::Datelike;
use clap::Parser;
use common::{
@@ -31,6 +32,7 @@ use ecstore::{
};
use ecstore::{global::set_global_rustfs_port, notification_sys::new_global_notification_sys};
use grpc::make_server;
use hyper_util::server::graceful::GracefulShutdown;
use hyper_util::{
rt::{TokioExecutor, TokioIo},
server::conn::auto::Builder as ConnBuilder,
@@ -43,60 +45,14 @@ use rustfs_obs::{init_obs, load_config, set_global_guard, InitLogStatus};
use rustls::ServerConfig;
use s3s::{host::MultiDomain, service::S3ServiceBuilder};
use service::hybrid;
use std::net::SocketAddr;
use std::sync::Arc;
use std::{io::IsTerminal, net::SocketAddr};
use tokio::net::TcpListener;
use tokio::signal::unix::{signal, SignalKind};
use tokio_rustls::TlsAcceptor;
use tonic::{metadata::MetadataValue, Request, Status};
use tower_http::cors::CorsLayer;
use tracing::{debug, error, info, info_span, warn};
use tracing_error::ErrorLayer;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
#[cfg(target_os = "linux")]
fn notify_systemd(state: &str) {
use libsystemd::daemon::{notify, NotifyState};
let notify_state = match state {
"ready" => NotifyState::Ready,
"stopping" => NotifyState::Stopping,
_ => {
warn!("Unsupported state passed to notify_systemd: {}", state);
return;
}
};
if let Err(e) = notify(false, &[notify_state]) {
error!("Failed to notify systemd: {}", e);
} else {
debug!("Successfully notified systemd: {}", state);
}
info!("Systemd notifications are enabled on linux (state: {})", state);
}
#[cfg(not(target_os = "linux"))]
fn notify_systemd(state: &str) {
info!("Systemd notifications are not available on this platform not linux (state: {})", state);
}
#[allow(dead_code)]
fn setup_tracing() {
use tracing_subscriber::EnvFilter;
let env_filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info"));
let enable_color = std::io::stdout().is_terminal();
let subscriber = tracing_subscriber::fmt::fmt()
.pretty()
.with_env_filter(env_filter)
.with_ansi(enable_color)
.with_file(true)
.with_line_number(true)
.finish()
.with(ErrorLayer::default());
subscriber.try_init().expect("failed to set global default subscriber");
}
fn check_auth(req: Request<()>) -> Result<Request<()>, Status> {
let token: MetadataValue<_> = "rustfs rpc".parse().unwrap();
@@ -286,8 +242,14 @@ async fn run(opt: config::Opt) -> Result<()> {
None
};
// Create an oneshot channel to wait for the service to start
let (tx, rx) = tokio::sync::oneshot::channel();
let state_manager = ServiceStateManager::new();
let worker_state_manager = state_manager.clone();
// Update the service state to Starting
state_manager.update(ServiceState::Starting);
// Create shutdown channel
let (shutdown_tx, mut shutdown_rx) = tokio::sync::broadcast::channel(1);
let shutdown_tx_clone = shutdown_tx.clone();
tokio::spawn(async move {
// Improved error handling
@@ -316,11 +278,11 @@ async fn run(opt: config::Opt) -> Result<()> {
let http_server = ConnBuilder::new(TokioExecutor::new());
let mut ctrl_c = std::pin::pin!(tokio::signal::ctrl_c());
let graceful = hyper_util::server::graceful::GracefulShutdown::new();
let graceful = GracefulShutdown::new();
debug!("graceful initiated");
// Send a message to the main thread to indicate that the server has started
let _ = tx.send(());
// The service is ready
worker_state_manager.update(ServiceState::Ready);
loop {
debug!("waiting for SIGINT or SIGTERM has_tls_certs: {}", has_tls_certs);
@@ -336,17 +298,23 @@ async fn run(opt: config::Opt) -> Result<()> {
}
}
_ = ctrl_c.as_mut() => {
drop(listener);
eprintln!("Ctrl-C received, starting shutdown");
info!("Ctrl-C received in worker thread");
let _ = shutdown_tx_clone.send(());
break;
}
_ = sigint_inner.recv() => {
info!("SIGINT received in worker thread");
let _ = shutdown_tx_clone.send(());
break;
}
_ = sigterm_inner.recv() => {
info!("SIGTERM received in worker thread");
let _ = shutdown_tx_clone.send(());
break;
}
_ = shutdown_rx.recv() => {
info!("Shutdown signal received in worker thread");
break;
}
};
@@ -390,7 +358,7 @@ async fn run(opt: config::Opt) -> Result<()> {
debug!("Http handshake success");
}
}
worker_state_manager.update(ServiceState::Stopping);
tokio::select! {
() = graceful.shutdown() => {
debug!("Gracefully shutdown!");
@@ -399,6 +367,7 @@ async fn run(opt: config::Opt) -> Result<()> {
debug!("Waited 10 seconds for graceful shutdown, aborting...");
}
}
worker_state_manager.update(ServiceState::Stopped);
});
// init store
@@ -448,97 +417,24 @@ async fn run(opt: config::Opt) -> Result<()> {
});
}
// Sleep for 1 second
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
// Wait for the HTTP service to finish starting
if rx.await.is_ok() {
notify_systemd("ready");
} else {
info!("Failed to start the server");
}
// 主线程中监听信号
let mut sigterm = signal(SignalKind::terminate())?;
let mut sigint = signal(SignalKind::interrupt())?;
tokio::select! {
_ = tokio::signal::ctrl_c() => {
eprintln!("Ctrl-C received, starting shutdown");
notify_systemd("stopping");
}
_ = sigint.recv() => {
info!("SIGINT received, starting shutdown");
notify_systemd("stopping");
}
_ = sigterm.recv() => {
info!("SIGTERM received, starting shutdown");
notify_systemd("stopping");
// Sleep for SHUTDOWN_TIMEOUT (1 second)
tokio::time::sleep(SHUTDOWN_TIMEOUT).await;
// listen to the shutdown signal
match wait_for_shutdown().await {
ShutdownSignal::CtrlC | ShutdownSignal::Sigint | ShutdownSignal::Sigterm => {
info!("Shutdown signal received in main thread");
// update the status to stopping first
state_manager.update(ServiceState::Stopping);
info!("Server is stopping...");
let _ = shutdown_tx.send(());
// Wait for the worker thread to complete the cleaning work
tokio::time::sleep(SHUTDOWN_TIMEOUT).await;
// the last updated status is stopped
state_manager.update(ServiceState::Stopped);
info!("Server stopped current ");
}
}
info!("server is stopped");
info!("server is stopped state: {:?}", state_manager.current_state());
Ok(())
}
// #[allow(dead_code)]
// #[derive(Debug)]
// enum ShutdownSignal {
// CtrlC,
// Sigterm,
// Sigint,
// }
// #[allow(dead_code)]
// async fn wait_for_shutdown() -> ShutdownSignal {
// let mut sigterm = signal(SignalKind::terminate()).unwrap();
// let mut sigint = signal(SignalKind::interrupt()).unwrap();
//
// tokio::select! {
// _ = tokio::signal::ctrl_c() => {
// info!("Received Ctrl-C signal");
// ShutdownSignal::CtrlC
// }
// _ = sigint.recv() => {
// info!("Received SIGINT signal");
// ShutdownSignal::Sigint
// }
// _ = sigterm.recv() => {
// info!("Received SIGTERM signal");
// ShutdownSignal::Sigterm
// }
// }
// }
// #[allow(dead_code)]
// #[derive(Debug)]
// enum ServiceState {
// Starting,
// Ready,
// Stopping,
// Stopped,
// }
// #[allow(dead_code)]
// fn notify_service_state(state: ServiceState) {
// match state {
// ServiceState::Starting => {
// info!("Service is starting...");
// #[cfg(target_os = "linux")]
// if let Err(e) = libsystemd::daemon::notify(false, &[libsystemd::daemon::NotifyState::Status("Starting...")]) {
// error!("Failed to notify systemd of starting state: {}", e);
// }
// }
// ServiceState::Ready => {
// info!("Service is ready");
// notify_systemd("ready");
// }
// ServiceState::Stopping => {
// info!("Service is stopping...");
// notify_systemd("stopping");
// }
// ServiceState::Stopped => {
// info!("Service has stopped");
// #[cfg(target_os = "linux")]
// if let Err(e) = libsystemd::daemon::notify(false, &[libsystemd::daemon::NotifyState::Status("Stopped")]) {
// error!("Failed to notify systemd of stopped state: {}", e);
// }
// }
// }
// }

6
rustfs/src/server/mod.rs Normal file
View File

@@ -0,0 +1,6 @@
mod service_state;
pub(crate) use service_state::wait_for_shutdown;
pub(crate) use service_state::ServiceState;
pub(crate) use service_state::ServiceStateManager;
pub(crate) use service_state::ShutdownSignal;
pub(crate) use service_state::SHUTDOWN_TIMEOUT;

View File

@@ -0,0 +1,152 @@
use atomic_enum::atomic_enum;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Duration;
use tokio::signal::unix::{signal, SignalKind};
use tracing::info;
// a configurable shutdown timeout
pub(crate) const SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(1);
#[cfg(target_os = "linux")]
fn notify_systemd(state: &str) {
use libsystemd::daemon::{notify, NotifyState};
use tracing::{debug, error};
let notify_state = match state {
"ready" => NotifyState::Ready,
"stopping" => NotifyState::Stopping,
_ => {
info!("Unsupported state passed to notify_systemd: {}", state);
return;
}
};
if let Err(e) = notify(false, &[notify_state]) {
error!("Failed to notify systemd: {}", e);
} else {
debug!("Successfully notified systemd: {}", state);
}
info!("Systemd notifications are enabled on linux (state: {})", state);
}
#[cfg(not(target_os = "linux"))]
fn notify_systemd(state: &str) {
info!("Systemd notifications are not available on this platform not linux (state: {})", state);
}
#[derive(Debug)]
pub enum ShutdownSignal {
CtrlC,
Sigterm,
Sigint,
}
#[atomic_enum]
#[derive(PartialEq)]
pub(crate) enum ServiceState {
Starting,
Ready,
Stopping,
Stopped,
}
pub(crate) async fn wait_for_shutdown() -> ShutdownSignal {
let mut sigterm = signal(SignalKind::terminate()).expect("failed to create SIGTERM signal handler");
let mut sigint = signal(SignalKind::interrupt()).expect("failed to create SIGINT signal handler");
tokio::select! {
_ = tokio::signal::ctrl_c() => {
info!("Received Ctrl-C signal");
ShutdownSignal::CtrlC
}
_ = sigint.recv() => {
info!("Received SIGINT signal");
ShutdownSignal::Sigint
}
_ = sigterm.recv() => {
info!("Received SIGTERM signal");
ShutdownSignal::Sigterm
}
}
}
#[derive(Clone)]
pub(crate) struct ServiceStateManager {
state: Arc<AtomicServiceState>,
}
impl ServiceStateManager {
pub fn new() -> Self {
Self {
state: Arc::new(AtomicServiceState::new(ServiceState::Starting)),
}
}
pub fn update(&self, new_state: ServiceState) {
self.state.store(new_state, Ordering::SeqCst);
self.notify_systemd(&new_state);
}
pub fn current_state(&self) -> ServiceState {
self.state.load(Ordering::SeqCst)
}
fn notify_systemd(&self, state: &ServiceState) {
match state {
ServiceState::Starting => {
info!("Service is starting...");
#[cfg(target_os = "linux")]
if let Err(e) = libsystemd::daemon::notify(false, &[libsystemd::daemon::NotifyState::Status("Starting...")]) {
tracing::error!("Failed to notify systemd of starting state: {}", e);
}
}
ServiceState::Ready => {
info!("Service is ready");
notify_systemd("ready");
}
ServiceState::Stopping => {
info!("Service is stopping...");
notify_systemd("stopping");
}
ServiceState::Stopped => {
info!("Service has stopped");
#[cfg(target_os = "linux")]
if let Err(e) = libsystemd::daemon::notify(false, &[libsystemd::daemon::NotifyState::Status("Stopped")]) {
tracing::error!("Failed to notify systemd of stopped state: {}", e);
}
}
}
}
}
impl Default for ServiceStateManager {
fn default() -> Self {
Self::new()
}
}
// Usage example
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_service_state_manager() {
let manager = ServiceStateManager::new();
// The initial state should be Starting
assert_eq!(manager.current_state(), ServiceState::Starting);
// Update the state to Ready
manager.update(ServiceState::Ready);
assert_eq!(manager.current_state(), ServiceState::Ready);
// Update the state to Stopping
manager.update(ServiceState::Stopping);
assert_eq!(manager.current_state(), ServiceState::Stopping);
// Update the state to Stopped
manager.update(ServiceState::Stopped);
assert_eq!(manager.current_state(), ServiceState::Stopped);
}
}

View File

@@ -33,8 +33,27 @@ export RUSTFS_CONSOLE_ENABLE=true
export RUSTFS_CONSOLE_ADDRESS="0.0.0.0:9002"
# export RUSTFS_SERVER_DOMAINS="localhost:9000"
# Change this to the real path of the configuration file; obs.example.toml is for reference only
export RUSTFS_OBS_CONFIG="./config/obs.example.toml"
# Change this to the real path of the configuration file; obs.example.toml is for reference only. Use either `RUSTFS_OBS_CONFIG` or the variables below, not both
export RUSTFS_OBS_CONFIG="./deploy/config/obs.example.toml"
# The following variables only take effect when all required values are set, and they override the values from the configuration file
export RUSTFS__OBSERVABILITY__ENDPOINT=http://localhost:43178
export RUSTFS__OBSERVABILITY__USE_STDOUT=true
export RUSTFS__OBSERVABILITY__SAMPLE_RATIO=2.0
export RUSTFS__OBSERVABILITY__METER_INTERVAL=30
export RUSTFS__OBSERVABILITY__SERVICE_NAME=rustfs
export RUSTFS__OBSERVABILITY__SERVICE_VERSION=0.1.0
export RUSTFS__OBSERVABILITY__ENVIRONMENT=develop
export RUSTFS__OBSERVABILITY__LOGGER_LEVEL=info
export RUSTFS__SINKS__FILE__ENABLED=true
export RUSTFS__SINKS__FILE__PATH="./deploy/logs/app.log"
export RUSTFS__SINKS__WEBHOOK__ENABLED=false
export RUSTFS__SINKS__WEBHOOK__ENDPOINT=""
export RUSTFS__SINKS__WEBHOOK__AUTH_TOKEN=""
export RUSTFS__SINKS__KAFKA__ENABLED=false
export RUSTFS__SINKS__KAFKA__BOOTSTRAP_SERVERS=""
export RUSTFS__SINKS__KAFKA__TOPIC=""
export RUSTFS__LOGGER__QUEUE_CAPACITY=10
if [ -n "$1" ]; then
export RUSTFS_VOLUMES="$1"