Feature/rustfs config (#396)

* init rustfs config

* improve code for rustfs-config crate

* add

* improve code for comment

* fix: modify rustfs-config crate name

* add default fn

* improve error logger

* fix: modify docker config yaml

* improve code for config

* feat: restrict kafka feature to Linux only

- Add target-specific feature configuration in Cargo.toml for obs and event-notifier crates
- Implement conditional compilation for kafka feature only on Linux systems
- Add appropriate error handling for non-Linux platforms
- Ensure backward compatibility with existing code

* refactor(ci): optimize build workflow for better efficiency

- Integrate GUI build steps into main build-rustfs job
- Add conditional GUI build execution based on tag releases
- Simplify workflow by removing redundant build-rustfs-gui job
- Copy binary directly to embedded-rustfs directory without downloading artifacts
- Update merge job dependency to only rely on build-rustfs
- Improve cross-platform compatibility for Windows binary naming (.exe)
- Streamline artifact uploading and OSS publishing process
- Maintain consistent conditional logic for release operations

* refactor(ci): optimize build workflow for better efficiency

- Integrate GUI build steps into main build-rustfs job
- Add conditional GUI build execution based on tag releases
- Simplify workflow by removing redundant build-rustfs-gui job
- Copy binary directly to embedded-rustfs directory without downloading artifacts
- Update merge job dependency to only rely on build-rustfs
- Improve cross-platform compatibility for Windows binary naming (.exe)
- Streamline artifact uploading and OSS publishing process
- Maintain consistent conditional logic for release operations

* fix(ci): add repo-token to setup-protoc action for authentication

- Add GITHUB_TOKEN parameter to arduino/setup-protoc@v3 action
- Ensure proper authentication for Protoc installation in CI workflow
- Maintain consistent setup across different CI environments

* modify config

* improve readme.md

* remove env config relation

* add allow(dead_code)
This commit is contained in:
houseme
2025-05-12 01:17:31 +08:00
committed by GitHub
parent ab54ff49eb
commit 4ac4b35c5e
64 changed files with 1283 additions and 1008 deletions

View File

@@ -9,26 +9,26 @@ environments = "production"
logger_level = "debug"
local_logging_enabled = true
[sinks]
[sinks.kafka] # Kafka sink is disabled by default
enabled = false
bootstrap_servers = "localhost:9092"
topic = "logs"
batch_size = 100 # Default is 100 if not specified
batch_timeout_ms = 1000 # Default is 1000ms if not specified
#[[sinks]]
#type = "Kafka"
#brokers = "localhost:9092"
#topic = "logs"
#batch_size = 100 # Default is 100 if not specified
#batch_timeout_ms = 1000 # Default is 1000ms if not specified
#
#[[sinks]]
#type = "Webhook"
#endpoint = "http://localhost:8080/webhook"
#auth_token = ""
#batch_size = 100 # Default is 3 if not specified
#batch_timeout_ms = 1000 # Default is 100ms if not specified
[sinks.webhook]
enabled = false
endpoint = "http://localhost:8080/webhook"
auth_token = ""
batch_size = 100 # Default is 3 if not specified
batch_timeout_ms = 1000 # Default is 100ms if not specified
[sinks.file]
enabled = true
path = "/root/data/logs/app.log"
batch_size = 10
batch_timeout_ms = 1000 # Default is 8192 bytes if not specified
[[sinks]]
type = "File"
path = "/root/data/logs/rustfs.log"
buffer_size = 100 # Default is 8192 bytes if not specified
flush_interval_ms = 1000
flush_threshold = 100
[logger]
queue_capacity = 10

View File

@@ -9,26 +9,26 @@ environments = "production"
logger_level = "debug"
local_logging_enabled = true
[sinks]
[sinks.kafka] # Kafka sink is disabled by default
enabled = false
bootstrap_servers = "localhost:9092"
topic = "logs"
batch_size = 100 # Default is 100 if not specified
batch_timeout_ms = 1000 # Default is 1000ms if not specified
#[[sinks]]
#type = "Kafka"
#brokers = "localhost:9092"
#topic = "logs"
#batch_size = 100 # Default is 100 if not specified
#batch_timeout_ms = 1000 # Default is 1000ms if not specified
#
#[[sinks]]
#type = "Webhook"
#endpoint = "http://localhost:8080/webhook"
#auth_token = ""
#batch_size = 100 # Default is 3 if not specified
#batch_timeout_ms = 1000 # Default is 100ms if not specified
[sinks.webhook]
enabled = false
endpoint = "http://localhost:8080/webhook"
auth_token = ""
batch_size = 100 # Default is 3 if not specified
batch_timeout_ms = 1000 # Default is 100ms if not specified
[sinks.file]
enabled = true
path = "/root/data/logs/app.log"
batch_size = 10
batch_timeout_ms = 1000 # Default is 8192 bytes if not specified
[[sinks]]
type = "File"
path = "/root/data/logs/rustfs.log"
buffer_size = 100 # Default is 8192 bytes if not specified
flush_interval_ms = 1000
flush_threshold = 100
[logger]
queue_capacity = 10

3
.gitignore vendored
View File

@@ -12,4 +12,5 @@ cli/rustfs-gui/embedded-rustfs/rustfs
deploy/config/obs.toml
*.log
deploy/certs/*
*jsonl
*jsonl
.env

26
Cargo.lock generated
View File

@@ -1746,7 +1746,7 @@ dependencies = [
"pbkdf2",
"rand 0.8.5",
"serde_json",
"sha2 0.10.8",
"sha2 0.10.9",
"test-case",
"thiserror 2.0.12",
"time",
@@ -2141,7 +2141,7 @@ dependencies = [
"md-5",
"rand 0.8.5",
"regex",
"sha2 0.10.8",
"sha2 0.10.9",
"unicode-segmentation",
"uuid",
]
@@ -3020,6 +3020,12 @@ dependencies = [
"const-random",
]
[[package]]
name = "dotenvy"
version = "0.15.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1aaf95b3e5c8f23aa320147307562d361db0ae0d51242340f558153b4eb2439b"
[[package]]
name = "dpi"
version = "0.1.1"
@@ -4807,7 +4813,7 @@ dependencies = [
"nom 8.0.0",
"once_cell",
"serde",
"sha2 0.10.8",
"sha2 0.10.9",
"thiserror 2.0.12",
"uuid",
]
@@ -6120,7 +6126,7 @@ checksum = "7f9f832470494906d1fca5329f8ab5791cc60beb230c74815dff541cbd2b5ca0"
dependencies = [
"once_cell",
"pest",
"sha2 0.10.8",
"sha2 0.10.9",
]
[[package]]
@@ -7209,7 +7215,7 @@ version = "8.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "08d55b95147fe01265d06b3955db798bdaed52e60e2211c41137701b3aba8e21"
dependencies = [
"sha2 0.10.8",
"sha2 0.10.9",
"walkdir",
]
@@ -7340,6 +7346,7 @@ dependencies = [
"async-trait",
"axum",
"config",
"dotenvy",
"http",
"rdkafka",
"reqwest",
@@ -7371,7 +7378,7 @@ dependencies = [
"rust-embed",
"serde",
"serde_json",
"sha2 0.10.8",
"sha2 0.10.9",
"tokio",
"tracing-appender",
"tracing-subscriber",
@@ -7394,6 +7401,7 @@ dependencies = [
"opentelemetry_sdk",
"rdkafka",
"reqwest",
"rustfs-config",
"serde",
"serde_json",
"smallvec",
@@ -7919,9 +7927,9 @@ dependencies = [
[[package]]
name = "sha2"
version = "0.10.8"
version = "0.10.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "793db75ad2bcafc3ffa7c68b215fee268f537982cd901d132f89c6343f3a3dc8"
checksum = "a7507d819769d01a365ab707794a4084392c824f54a7a6a7862f8c3d0892b283"
dependencies = [
"cfg-if",
"cpufeatures",
@@ -10218,7 +10226,7 @@ dependencies = [
"once_cell",
"percent-encoding",
"raw-window-handle 0.6.2",
"sha2 0.10.8",
"sha2 0.10.9",
"soup3",
"tao-macros",
"thiserror 1.0.69",

View File

@@ -157,7 +157,7 @@ serde = { version = "1.0.219", features = ["derive"] }
serde_json = "1.0.140"
serde_urlencoded = "0.7.1"
serde_with = "3.12.0"
sha2 = "0.10.8"
sha2 = "0.10.9"
smallvec = { version = "1.15.0", features = ["serde"] }
snafu = "0.8.5"
socket2 = "0.5.9"

View File

@@ -59,30 +59,12 @@ export RUSTFS_ADDRESS="0.0.0.0:9000"
export RUSTFS_CONSOLE_ENABLE=true
export RUSTFS_CONSOLE_ADDRESS="0.0.0.0:9001"
# Observability config (option 1: config file)
# Observability config
export RUSTFS_OBS_CONFIG="./deploy/config/obs.toml"
# Observability config (option 2: environment variables)
export RUSTFS__OBSERVABILITY__ENDPOINT=http://localhost:4317
export RUSTFS__OBSERVABILITY__USE_STDOUT=true
export RUSTFS__OBSERVABILITY__SAMPLE_RATIO=2.0
export RUSTFS__OBSERVABILITY__METER_INTERVAL=30
export RUSTFS__OBSERVABILITY__SERVICE_NAME=rustfs
export RUSTFS__OBSERVABILITY__SERVICE_VERSION=0.1.0
export RUSTFS__OBSERVABILITY__ENVIRONMENT=develop
export RUSTFS__OBSERVABILITY__LOGGER_LEVEL=info
export RUSTFS__OBSERVABILITY__LOCAL_LOGGING_ENABLED=true
# Event message configuration
#export RUSTFS_EVENT_CONFIG="./deploy/config/event.toml"
# Logging sinks
export RUSTFS__SINKS__FILE__ENABLED=true
export RUSTFS__SINKS__FILE__PATH="./deploy/logs/rustfs.log"
export RUSTFS__SINKS__WEBHOOK__ENABLED=false
export RUSTFS__SINKS__WEBHOOK__ENDPOINT=""
export RUSTFS__SINKS__WEBHOOK__AUTH_TOKEN=""
export RUSTFS__SINKS__KAFKA__ENABLED=false
export RUSTFS__SINKS__KAFKA__BOOTSTRAP_SERVERS=""
export RUSTFS__SINKS__KAFKA__TOPIC=""
export RUSTFS__LOGGER__QUEUE_CAPACITY=10
```
#### Start the service

View File

@@ -59,30 +59,11 @@ export RUSTFS_ADDRESS="0.0.0.0:9000"
export RUSTFS_CONSOLE_ENABLE=true
export RUSTFS_CONSOLE_ADDRESS="0.0.0.0:9001"
# 可观测性配置(方式一:配置文件)
# 可观测性配置
export RUSTFS_OBS_CONFIG="./deploy/config/obs.toml"
# 可观测性配置(方式二:环境变量)
export RUSTFS__OBSERVABILITY__ENDPOINT=http://localhost:4317
export RUSTFS__OBSERVABILITY__USE_STDOUT=true
export RUSTFS__OBSERVABILITY__SAMPLE_RATIO=2.0
export RUSTFS__OBSERVABILITY__METER_INTERVAL=30
export RUSTFS__OBSERVABILITY__SERVICE_NAME=rustfs
export RUSTFS__OBSERVABILITY__SERVICE_VERSION=0.1.0
export RUSTFS__OBSERVABILITY__ENVIRONMENT=develop
export RUSTFS__OBSERVABILITY__LOGGER_LEVEL=info
export RUSTFS__OBSERVABILITY__LOCAL_LOGGING_ENABLED=true
# 日志接收器
export RUSTFS__SINKS__FILE__ENABLED=true
export RUSTFS__SINKS__FILE__PATH="./deploy/logs/rustfs.log"
export RUSTFS__SINKS__WEBHOOK__ENABLED=false
export RUSTFS__SINKS__WEBHOOK__ENDPOINT=""
export RUSTFS__SINKS__WEBHOOK__AUTH_TOKEN=""
export RUSTFS__SINKS__KAFKA__ENABLED=false
export RUSTFS__SINKS__KAFKA__BOOTSTRAP_SERVERS=""
export RUSTFS__SINKS__KAFKA__TOPIC=""
export RUSTFS__LOGGER__QUEUE_CAPACITY=10
# 事件消息配置
#export RUSTFS_EVENT_CONFIG="./deploy/config/event.toml"
```
#### 启动服务

View File

@@ -1,17 +1,17 @@
use crate::event::config::EventConfig;
use crate::event::config::NotifierConfig;
use crate::ObservabilityConfig;
/// RustFs configuration
pub struct RustFsConfig {
pub observability: ObservabilityConfig,
pub event: EventConfig,
pub event: NotifierConfig,
}
impl RustFsConfig {
pub fn new() -> Self {
Self {
observability: ObservabilityConfig::new(),
event: EventConfig::new(),
event: NotifierConfig::new(),
}
}
}

View File

@@ -14,6 +14,25 @@ pub const VERSION: &str = "0.0.1";
/// Environment variable: RUSTFS_LOG_LEVEL
pub const DEFAULT_LOG_LEVEL: &str = "info";
/// Default configuration use stdout
/// Default value: true
pub(crate) const USE_STDOUT: bool = true;
/// Default configuration sample ratio
/// Default value: 1.0
pub(crate) const SAMPLE_RATIO: f64 = 1.0;
/// Default configuration meter interval
/// Default value: 30
pub(crate) const METER_INTERVAL: u64 = 30;
/// Default configuration service version
/// Default value: 0.0.1
pub(crate) const SERVICE_VERSION: &str = "0.0.1";
/// Default configuration environment
/// Default value: production
pub(crate) const ENVIRONMENT: &str = "production";
/// maximum number of connections
/// This is the maximum number of connections that the server will accept.
/// This is used to limit the number of connections to the server.

View File

@@ -0,0 +1,27 @@
use crate::event::kafka::KafkaAdapter;
use crate::event::mqtt::MqttAdapter;
use crate::event::webhook::WebhookAdapter;
use serde::{Deserialize, Serialize};
/// Configuration for the notification system.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum AdapterConfig {
Webhook(WebhookAdapter),
Kafka(KafkaAdapter),
Mqtt(MqttAdapter),
}
impl AdapterConfig {
/// create a new configuration with default values
pub fn new() -> Self {
Self::Webhook(WebhookAdapter::new())
}
}
impl Default for AdapterConfig {
/// create a new configuration with default values
fn default() -> Self {
Self::new()
}
}

View File

@@ -1,23 +1,43 @@
/// Event configuration module
pub struct EventConfig {
pub event_type: String,
pub event_source: String,
pub event_destination: String,
use crate::event::adapters::AdapterConfig;
use serde::{Deserialize, Serialize};
use std::env;
#[allow(dead_code)]
const DEFAULT_CONFIG_FILE: &str = "event";
/// Configuration for the notification system.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NotifierConfig {
#[serde(default = "default_store_path")]
pub store_path: String,
#[serde(default = "default_channel_capacity")]
pub channel_capacity: usize,
pub adapters: Vec<AdapterConfig>,
}
impl EventConfig {
/// Creates a new instance of `EventConfig` with default values.
pub fn new() -> Self {
Self {
event_type: "default".to_string(),
event_source: "default".to_string(),
event_destination: "default".to_string(),
}
}
}
impl Default for EventConfig {
impl Default for NotifierConfig {
fn default() -> Self {
Self::new()
}
}
impl NotifierConfig {
/// create a new configuration with default values
pub fn new() -> Self {
Self {
store_path: default_store_path(),
channel_capacity: default_channel_capacity(),
adapters: vec![AdapterConfig::new()],
}
}
}
/// Provide temporary directories as default storage paths
fn default_store_path() -> String {
env::temp_dir().join("event-notification").to_string_lossy().to_string()
}
/// Provides the recommended default channel capacity for high concurrency systems
fn default_channel_capacity() -> usize {
10000 // Reasonable default values for high concurrency systems
}

View File

@@ -1,17 +0,0 @@
/// Event configuration module
pub struct EventConfig {
pub event_type: String,
pub event_source: String,
pub event_destination: String,
}
impl EventConfig {
/// Creates a new instance of `EventConfig` with default values.
pub fn new() -> Self {
Self {
event_type: "default".to_string(),
event_source: "default".to_string(),
event_destination: "default".to_string(),
}
}
}

View File

@@ -0,0 +1,29 @@
use serde::{Deserialize, Serialize};
/// Configuration for the Kafka adapter.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KafkaAdapter {
pub brokers: String,
pub topic: String,
pub max_retries: u32,
pub timeout: u64,
}
impl KafkaAdapter {
/// create a new configuration with default values
pub fn new() -> Self {
Self {
brokers: "localhost:9092".to_string(),
topic: "kafka_topic".to_string(),
max_retries: 3,
timeout: 1000,
}
}
}
impl Default for KafkaAdapter {
/// create a new configuration with default values
fn default() -> Self {
Self::new()
}
}

View File

@@ -1,2 +1,5 @@
pub(crate) mod adapters;
pub(crate) mod config;
pub(crate) mod event;
pub(crate) mod kafka;
pub(crate) mod mqtt;
pub(crate) mod webhook;

View File

@@ -0,0 +1,31 @@
use serde::{Deserialize, Serialize};
/// Configuration for the MQTT adapter.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MqttAdapter {
pub broker: String,
pub port: u16,
pub client_id: String,
pub topic: String,
pub max_retries: u32,
}
impl MqttAdapter {
/// create a new configuration with default values
pub fn new() -> Self {
Self {
broker: "localhost".to_string(),
port: 1883,
client_id: "mqtt_client".to_string(),
topic: "mqtt_topic".to_string(),
max_retries: 3,
}
}
}
impl Default for MqttAdapter {
/// create a new configuration with default values
fn default() -> Self {
Self::new()
}
}

View File

@@ -0,0 +1,51 @@
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
/// Configuration for the notification system.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WebhookAdapter {
pub endpoint: String,
pub auth_token: Option<String>,
pub custom_headers: Option<HashMap<String, String>>,
pub max_retries: u32,
pub timeout: u64,
}
impl WebhookAdapter {
/// verify that the configuration is valid
pub fn validate(&self) -> Result<(), String> {
// verify that endpoint cannot be empty
if self.endpoint.trim().is_empty() {
return Err("Webhook endpoint cannot be empty".to_string());
}
// verification timeout must be reasonable
if self.timeout == 0 {
return Err("Webhook timeout must be greater than 0".to_string());
}
// Verify that the maximum number of retry is reasonable
if self.max_retries > 10 {
return Err("Maximum retry count cannot exceed 10".to_string());
}
Ok(())
}
/// Get the default configuration
pub fn new() -> Self {
Self {
endpoint: "".to_string(),
auth_token: None,
custom_headers: Some(HashMap::new()),
max_retries: 3,
timeout: 1000,
}
}
}
impl Default for WebhookAdapter {
fn default() -> Self {
Self::new()
}
}

View File

@@ -7,3 +7,5 @@ mod observability;
pub use config::RustFsConfig;
pub use constants::app::*;
pub use event::config::NotifierConfig;

View File

@@ -1,13 +1,13 @@
use crate::observability::logger::LoggerConfig;
use crate::observability::otel::OtelConfig;
use crate::observability::sink::SinkConfig;
use serde::Deserialize;
use serde::{Deserialize, Serialize};
/// Observability configuration
#[derive(Debug, Deserialize, Clone)]
#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct ObservabilityConfig {
pub otel: OtelConfig,
pub sinks: SinkConfig,
pub sinks: Vec<SinkConfig>,
pub logger: Option<LoggerConfig>,
}
@@ -15,7 +15,7 @@ impl ObservabilityConfig {
pub fn new() -> Self {
Self {
otel: OtelConfig::new(),
sinks: SinkConfig::new(),
sinks: vec![SinkConfig::new()],
logger: Some(LoggerConfig::new()),
}
}

View File

@@ -0,0 +1,59 @@
use serde::{Deserialize, Serialize};
use std::env;
/// File sink configuration
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FileSink {
pub path: String,
#[serde(default = "default_buffer_size")]
pub buffer_size: Option<usize>,
#[serde(default = "default_flush_interval_ms")]
pub flush_interval_ms: Option<u64>,
#[serde(default = "default_flush_threshold")]
pub flush_threshold: Option<usize>,
}
impl FileSink {
pub fn new() -> Self {
Self {
path: env::var("RUSTFS_SINKS_FILE_PATH")
.ok()
.filter(|s| !s.trim().is_empty())
.unwrap_or_else(default_path),
buffer_size: default_buffer_size(),
flush_interval_ms: default_flush_interval_ms(),
flush_threshold: default_flush_threshold(),
}
}
}
impl Default for FileSink {
fn default() -> Self {
Self::new()
}
}
fn default_buffer_size() -> Option<usize> {
Some(8192)
}
fn default_flush_interval_ms() -> Option<u64> {
Some(1000)
}
fn default_flush_threshold() -> Option<usize> {
Some(100)
}
fn default_path() -> String {
let temp_dir = env::temp_dir().join("rustfs");
if let Err(e) = std::fs::create_dir_all(&temp_dir) {
eprintln!("Failed to create log directory: {}", e);
return "rustfs/rustfs.log".to_string();
}
temp_dir
.join("rustfs.log")
.to_str()
.unwrap_or("rustfs/rustfs.log")
.to_string()
}

View File

@@ -1,25 +0,0 @@
use serde::Deserialize;
/// File sink configuration
#[derive(Debug, Deserialize, Clone)]
pub struct FileSinkConfig {
pub path: String,
pub max_size: u64,
pub max_backups: u64,
}
impl FileSinkConfig {
pub fn new() -> Self {
Self {
path: "".to_string(),
max_size: 0,
max_backups: 0,
}
}
}
impl Default for FileSinkConfig {
fn default() -> Self {
Self::new()
}
}

View File

@@ -0,0 +1,36 @@
use serde::{Deserialize, Serialize};
/// Kafka sink configuration
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KafkaSink {
pub brokers: String,
pub topic: String,
#[serde(default = "default_batch_size")]
pub batch_size: Option<usize>,
#[serde(default = "default_batch_timeout_ms")]
pub batch_timeout_ms: Option<u64>,
}
impl KafkaSink {
pub fn new() -> Self {
Self {
brokers: "localhost:9092".to_string(),
topic: "rustfs".to_string(),
batch_size: default_batch_size(),
batch_timeout_ms: default_batch_timeout_ms(),
}
}
}
impl Default for KafkaSink {
fn default() -> Self {
Self::new()
}
}
fn default_batch_size() -> Option<usize> {
Some(100)
}
fn default_batch_timeout_ms() -> Option<u64> {
Some(1000)
}

View File

@@ -1,23 +0,0 @@
use serde::Deserialize;
/// Kafka sink configuration
#[derive(Debug, Deserialize, Clone)]
pub struct KafkaSinkConfig {
pub brokers: Vec<String>,
pub topic: String,
}
impl KafkaSinkConfig {
pub fn new() -> Self {
Self {
brokers: vec!["localhost:9092".to_string()],
topic: "rustfs".to_string(),
}
}
}
impl Default for KafkaSinkConfig {
fn default() -> Self {
Self::new()
}
}

View File

@@ -1,7 +1,7 @@
use serde::Deserialize;
use serde::{Deserialize, Serialize};
/// Logger configuration
#[derive(Debug, Deserialize, Clone)]
#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct LoggerConfig {
pub queue_capacity: Option<usize>,
}

View File

@@ -1,8 +1,7 @@
pub(crate) mod config;
pub(crate) mod file_sink;
pub(crate) mod kafka_sink;
pub(crate) mod file;
pub(crate) mod kafka;
pub(crate) mod logger;
pub(crate) mod observability;
pub(crate) mod otel;
pub(crate) mod sink;
pub(crate) mod webhook_sink;
pub(crate) mod webhook;

View File

@@ -1,22 +0,0 @@
use crate::observability::logger::LoggerConfig;
use crate::observability::otel::OtelConfig;
use crate::observability::sink::SinkConfig;
use serde::Deserialize;
/// Observability configuration
#[derive(Debug, Deserialize, Clone)]
pub struct ObservabilityConfig {
pub otel: OtelConfig,
pub sinks: SinkConfig,
pub logger: Option<LoggerConfig>,
}
impl ObservabilityConfig {
pub fn new() -> Self {
Self {
otel: OtelConfig::new(),
sinks: SinkConfig::new(),
logger: Some(LoggerConfig::new()),
}
}
}

View File

@@ -1,22 +1,25 @@
use serde::Deserialize;
use crate::constants::app::{ENVIRONMENT, METER_INTERVAL, SAMPLE_RATIO, SERVICE_VERSION, USE_STDOUT};
use crate::{APP_NAME, DEFAULT_LOG_LEVEL};
use serde::{Deserialize, Serialize};
use std::env;
/// OpenTelemetry configuration
#[derive(Debug, Deserialize, Clone)]
#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct OtelConfig {
pub endpoint: String,
pub service_name: String,
pub service_version: String,
pub resource_attributes: Vec<String>,
pub endpoint: String, // Endpoint for metric collection
pub use_stdout: Option<bool>, // Output to stdout
pub sample_ratio: Option<f64>, // Trace sampling ratio
pub meter_interval: Option<u64>, // Metric collection interval
pub service_name: Option<String>, // Service name
pub service_version: Option<String>, // Service version
pub environment: Option<String>, // Environment
pub logger_level: Option<String>, // Logger level
pub local_logging_enabled: Option<bool>, // Local logging enabled
}
impl OtelConfig {
pub fn new() -> Self {
Self {
endpoint: "http://localhost:4317".to_string(),
service_name: "rustfs".to_string(),
service_version: "0.1.0".to_string(),
resource_attributes: vec![],
}
extract_otel_config_from_env()
}
}
@@ -25,3 +28,42 @@ impl Default for OtelConfig {
Self::new()
}
}
// Helper function: Extract observable configuration from environment variables
fn extract_otel_config_from_env() -> OtelConfig {
OtelConfig {
endpoint: env::var("RUSTFS_OBSERVABILITY_ENDPOINT").unwrap_or_else(|_| "".to_string()),
use_stdout: env::var("RUSTFS_OBSERVABILITY_USE_STDOUT")
.ok()
.and_then(|v| v.parse().ok())
.or(Some(USE_STDOUT)),
sample_ratio: env::var("RUSTFS_OBSERVABILITY_SAMPLE_RATIO")
.ok()
.and_then(|v| v.parse().ok())
.or(Some(SAMPLE_RATIO)),
meter_interval: env::var("RUSTFS_OBSERVABILITY_METER_INTERVAL")
.ok()
.and_then(|v| v.parse().ok())
.or(Some(METER_INTERVAL)),
service_name: env::var("RUSTFS_OBSERVABILITY_SERVICE_NAME")
.ok()
.and_then(|v| v.parse().ok())
.or(Some(APP_NAME.to_string())),
service_version: env::var("RUSTFS_OBSERVABILITY_SERVICE_VERSION")
.ok()
.and_then(|v| v.parse().ok())
.or(Some(SERVICE_VERSION.to_string())),
environment: env::var("RUSTFS_OBSERVABILITY_ENVIRONMENT")
.ok()
.and_then(|v| v.parse().ok())
.or(Some(ENVIRONMENT.to_string())),
logger_level: env::var("RUSTFS_OBSERVABILITY_LOGGER_LEVEL")
.ok()
.and_then(|v| v.parse().ok())
.or(Some(DEFAULT_LOG_LEVEL.to_string())),
local_logging_enabled: env::var("RUSTFS_OBSERVABILITY_LOCAL_LOGGING_ENABLED")
.ok()
.and_then(|v| v.parse().ok())
.or(Some(false)),
}
}

View File

@@ -1,23 +1,20 @@
use crate::observability::file_sink::FileSinkConfig;
use crate::observability::kafka_sink::KafkaSinkConfig;
use crate::observability::webhook_sink::WebhookSinkConfig;
use serde::Deserialize;
use crate::observability::file::FileSink;
use crate::observability::kafka::KafkaSink;
use crate::observability::webhook::WebhookSink;
use serde::{Deserialize, Serialize};
/// Sink configuration
#[derive(Debug, Deserialize, Clone)]
pub struct SinkConfig {
pub kafka: Option<KafkaSinkConfig>,
pub webhook: Option<WebhookSinkConfig>,
pub file: Option<FileSinkConfig>,
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum SinkConfig {
Kafka(KafkaSink),
Webhook(WebhookSink),
File(FileSink),
}
impl SinkConfig {
pub fn new() -> Self {
Self {
kafka: None,
webhook: None,
file: Some(FileSinkConfig::new()),
}
Self::File(FileSink::new())
}
}

View File

@@ -0,0 +1,39 @@
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
/// Webhook sink configuration
#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct WebhookSink {
pub endpoint: String,
pub auth_token: String,
pub headers: Option<HashMap<String, String>>,
#[serde(default = "default_max_retries")]
pub max_retries: Option<usize>,
#[serde(default = "default_retry_delay_ms")]
pub retry_delay_ms: Option<u64>,
}
impl WebhookSink {
pub fn new() -> Self {
Self {
endpoint: "".to_string(),
auth_token: "".to_string(),
headers: Some(HashMap::new()),
max_retries: default_max_retries(),
retry_delay_ms: default_retry_delay_ms(),
}
}
}
impl Default for WebhookSink {
fn default() -> Self {
Self::new()
}
}
fn default_max_retries() -> Option<usize> {
Some(3)
}
fn default_retry_delay_ms() -> Option<u64> {
Some(100)
}

View File

@@ -1,25 +0,0 @@
use serde::Deserialize;
/// Webhook sink configuration
#[derive(Debug, Deserialize, Clone)]
pub struct WebhookSinkConfig {
pub url: String,
pub method: String,
pub headers: Vec<(String, String)>,
}
impl WebhookSinkConfig {
pub fn new() -> Self {
Self {
url: "http://localhost:8080/webhook".to_string(),
method: "POST".to_string(),
headers: vec![],
}
}
}
impl Default for WebhookSinkConfig {
fn default() -> Self {
Self::new()
}
}

View File

@@ -9,13 +9,12 @@ version.workspace = true
[features]
default = ["webhook"]
webhook = ["dep:reqwest"]
kafka = ["rdkafka"]
mqtt = ["rumqttc"]
kafka = ["dep:rdkafka"]
[dependencies]
async-trait = { workspace = true }
config = { workspace = true }
rdkafka = { workspace = true, features = ["tokio"], optional = true }
reqwest = { workspace = true, optional = true }
rumqttc = { workspace = true, optional = true }
serde = { workspace = true }
@@ -29,12 +28,16 @@ tokio = { workspace = true, features = ["sync", "net", "macros", "signal", "rt-m
tokio-util = { workspace = true }
uuid = { workspace = true, features = ["v4", "serde"] }
# Only enable kafka features and related dependencies on Linux
[target.'cfg(target_os = "linux")'.dependencies]
rdkafka = { workspace = true, features = ["tokio"], optional = true }
[dev-dependencies]
tokio = { workspace = true, features = ["test-util"] }
tracing-subscriber = { workspace = true }
http = { workspace = true }
axum = { workspace = true }
dotenvy = "0.15.7"
[lints]
workspace = true

View File

@@ -1,28 +0,0 @@
# ===== 全局配置 =====
NOTIFIER__STORE_PATH=/var/log/event-notification
NOTIFIER__CHANNEL_CAPACITY=5000
# ===== 适配器配置(数组格式) =====
# Webhook 适配器(索引 0
NOTIFIER__ADAPTERS_0__type=Webhook
NOTIFIER__ADAPTERS_0__endpoint=http://127.0.0.1:3020/webhook
NOTIFIER__ADAPTERS_0__auth_token=your-auth-token
NOTIFIER__ADAPTERS_0__max_retries=3
NOTIFIER__ADAPTERS_0__timeout=50
NOTIFIER__ADAPTERS_0__custom_headers__x_custom_server=value
NOTIFIER__ADAPTERS_0__custom_headers__x_custom_client=value
# Kafka 适配器(索引 1
NOTIFIER__ADAPTERS_1__type=Kafka
NOTIFIER__ADAPTERS_1__brokers=localhost:9092
NOTIFIER__ADAPTERS_1__topic=notifications
NOTIFIER__ADAPTERS_1__max_retries=3
NOTIFIER__ADAPTERS_1__timeout=60
# MQTT 适配器(索引 2
NOTIFIER__ADAPTERS_2__type=Mqtt
NOTIFIER__ADAPTERS_2__broker=mqtt.example.com
NOTIFIER__ADAPTERS_2__port=1883
NOTIFIER__ADAPTERS_2__client_id=event-notifier
NOTIFIER__ADAPTERS_2__topic=events
NOTIFIER__ADAPTERS_2__max_retries=3

View File

@@ -1,28 +1,28 @@
# ===== global configuration =====
NOTIFIER__STORE_PATH=/var/log/event-notification
NOTIFIER__CHANNEL_CAPACITY=5000
# ===== adapter configuration array format =====
# webhook adapter index 0
NOTIFIER__ADAPTERS_0__type=Webhook
NOTIFIER__ADAPTERS_0__endpoint=http://127.0.0.1:3020/webhook
NOTIFIER__ADAPTERS_0__auth_token=your-auth-token
NOTIFIER__ADAPTERS_0__max_retries=3
NOTIFIER__ADAPTERS_0__timeout=50
NOTIFIER__ADAPTERS_0__custom_headers__x_custom_server=server-value
NOTIFIER__ADAPTERS_0__custom_headers__x_custom_client=client-value
# kafka adapter index 1
NOTIFIER__ADAPTERS_1__type=Kafka
NOTIFIER__ADAPTERS_1__brokers=localhost:9092
NOTIFIER__ADAPTERS_1__topic=notifications
NOTIFIER__ADAPTERS_1__max_retries=3
NOTIFIER__ADAPTERS_1__timeout=60
# mqtt adapter index 2
NOTIFIER__ADAPTERS_2__type=Mqtt
NOTIFIER__ADAPTERS_2__broker=mqtt.example.com
NOTIFIER__ADAPTERS_2__port=1883
NOTIFIER__ADAPTERS_2__client_id=event-notifier
NOTIFIER__ADAPTERS_2__topic=events
NOTIFIER__ADAPTERS_2__max_retries=3
## ===== global configuration =====
#NOTIFIER__STORE_PATH=/var/log/event-notification
#NOTIFIER__CHANNEL_CAPACITY=5000
#
## ===== adapter configuration array format =====
## webhook adapter index 0
#NOTIFIER__ADAPTERS_0__type=Webhook
#NOTIFIER__ADAPTERS_0__endpoint=http://127.0.0.1:3020/webhook
#NOTIFIER__ADAPTERS_0__auth_token=your-auth-token
#NOTIFIER__ADAPTERS_0__max_retries=3
#NOTIFIER__ADAPTERS_0__timeout=50
#NOTIFIER__ADAPTERS_0__custom_headers__x_custom_server=server-value
#NOTIFIER__ADAPTERS_0__custom_headers__x_custom_client=client-value
#
## kafka adapter index 1
#NOTIFIER__ADAPTERS_1__type=Kafka
#NOTIFIER__ADAPTERS_1__brokers=localhost:9092
#NOTIFIER__ADAPTERS_1__topic=notifications
#NOTIFIER__ADAPTERS_1__max_retries=3
#NOTIFIER__ADAPTERS_1__timeout=60
#
## mqtt adapter index 2
#NOTIFIER__ADAPTERS_2__type=Mqtt
#NOTIFIER__ADAPTERS_2__broker=mqtt.example.com
#NOTIFIER__ADAPTERS_2__port=1883
#NOTIFIER__ADAPTERS_2__client_id=event-notifier
#NOTIFIER__ADAPTERS_2__topic=events
#NOTIFIER__ADAPTERS_2__max_retries=3

View File

@@ -0,0 +1,28 @@
## ===== 全局配置 =====
#NOTIFIER__STORE_PATH=/var/log/event-notification
#NOTIFIER__CHANNEL_CAPACITY=5000
#
## ===== 适配器配置(数组格式) =====
## Webhook 适配器(索引 0
#NOTIFIER__ADAPTERS_0__type=Webhook
#NOTIFIER__ADAPTERS_0__endpoint=http://127.0.0.1:3020/webhook
#NOTIFIER__ADAPTERS_0__auth_token=your-auth-token
#NOTIFIER__ADAPTERS_0__max_retries=3
#NOTIFIER__ADAPTERS_0__timeout=50
#NOTIFIER__ADAPTERS_0__custom_headers__x_custom_server=value
#NOTIFIER__ADAPTERS_0__custom_headers__x_custom_client=value
#
## Kafka 适配器(索引 1
#NOTIFIER__ADAPTERS_1__type=Kafka
#NOTIFIER__ADAPTERS_1__brokers=localhost:9092
#NOTIFIER__ADAPTERS_1__topic=notifications
#NOTIFIER__ADAPTERS_1__max_retries=3
#NOTIFIER__ADAPTERS_1__timeout=60
#
## MQTT 适配器(索引 2
#NOTIFIER__ADAPTERS_2__type=Mqtt
#NOTIFIER__ADAPTERS_2__broker=mqtt.example.com
#NOTIFIER__ADAPTERS_2__port=1883
#NOTIFIER__ADAPTERS_2__client_id=event-notifier
#NOTIFIER__ADAPTERS_2__topic=events
#NOTIFIER__ADAPTERS_2__max_retries=3

View File

@@ -33,7 +33,9 @@ async fn main() -> Result<(), Box<dyn error::Error>> {
// loading configuration from environment variables
let _config = NotifierConfig::event_load_config(Some("./crates/event-notifier/examples/event.toml".to_string()));
tracing::info!("event_load_config config: {:?} \n", _config);
dotenvy::dotenv()?;
let _config = NotifierConfig::event_load_config(None);
tracing::info!("event_load_config config: {:?} \n", _config);
let system = Arc::new(tokio::sync::Mutex::new(NotifierSystem::new(config.clone()).await?));
let adapters = create_adapters(&config.adapters)?;

View File

@@ -4,7 +4,7 @@ use crate::Event;
use async_trait::async_trait;
use std::sync::Arc;
#[cfg(feature = "kafka")]
#[cfg(all(feature = "kafka", target_os = "linux"))]
pub(crate) mod kafka;
#[cfg(feature = "mqtt")]
pub(crate) mod mqtt;
@@ -31,7 +31,7 @@ pub fn create_adapters(configs: &[AdapterConfig]) -> Result<Vec<Arc<dyn ChannelA
webhook_config.validate().map_err(Error::ConfigError)?;
adapters.push(Arc::new(webhook::WebhookAdapter::new(webhook_config.clone())));
}
#[cfg(feature = "kafka")]
#[cfg(all(feature = "kafka", target_os = "linux"))]
AdapterConfig::Kafka(kafka_config) => {
adapters.push(Arc::new(kafka::KafkaAdapter::new(kafka_config)?));
}
@@ -43,7 +43,7 @@ pub fn create_adapters(configs: &[AdapterConfig]) -> Result<Vec<Arc<dyn ChannelA
}
#[cfg(not(feature = "webhook"))]
AdapterConfig::Webhook(_) => return Err(Error::FeatureDisabled("webhook")),
#[cfg(not(feature = "kafka"))]
#[cfg(any(not(feature = "kafka"), not(target_os = "linux")))]
AdapterConfig::Kafka(_) => return Err(Error::FeatureDisabled("kafka")),
#[cfg(not(feature = "mqtt"))]
AdapterConfig::Mqtt(_) => return Err(Error::FeatureDisabled("mqtt")),

View File

@@ -1,4 +1,4 @@
use config::{Config, Environment, File, FileFormat};
use config::{Config, File, FileFormat};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::env;
@@ -138,15 +138,6 @@ impl NotifierConfig {
let app_config = Config::builder()
.add_source(File::with_name(config_dir.as_str()).format(FileFormat::Toml).required(false))
.add_source(File::with_name(config_dir.as_str()).format(FileFormat::Yaml).required(false))
.add_source(
Environment::default()
.prefix("NOTIFIER")
.prefix_separator("__")
.separator("__")
.list_separator("_")
.with_list_parse_key("adapters")
.try_parsing(true),
)
.build()
.unwrap_or_default();
match app_config.try_deserialize::<NotifierConfig>() {

View File

@@ -15,7 +15,7 @@ pub enum Error {
Serde(#[from] serde_json::Error),
#[error("HTTP error: {0}")]
Http(#[from] reqwest::Error),
#[cfg(feature = "kafka")]
#[cfg(all(feature = "kafka", target_os = "linux"))]
#[error("Kafka error: {0}")]
Kafka(#[from] rdkafka::error::KafkaError),
#[cfg(feature = "mqtt")]

View File

@@ -8,7 +8,7 @@ mod notifier;
mod store;
pub use adapter::create_adapters;
#[cfg(feature = "kafka")]
#[cfg(all(feature = "kafka", target_os = "linux"))]
pub use adapter::kafka::KafkaAdapter;
#[cfg(feature = "mqtt")]
pub use adapter::mqtt::MqttAdapter;
@@ -16,7 +16,7 @@ pub use adapter::mqtt::MqttAdapter;
pub use adapter::webhook::WebhookAdapter;
pub use adapter::ChannelAdapter;
pub use bus::event_bus;
#[cfg(feature = "kafka")]
#[cfg(all(feature = "kafka", target_os = "linux"))]
pub use config::KafkaConfig;
#[cfg(feature = "mqtt")]
pub use config::MqttConfig;

View File

@@ -13,11 +13,11 @@ workspace = true
default = ["file"]
file = []
gpu = ["dep:nvml-wrapper"]
kafka = ["dep:rdkafka"]
webhook = ["dep:reqwest"]
full = ["file", "gpu", "kafka", "webhook"]
kafka = ["dep:rdkafka"]
[dependencies]
rustfs-config = { workspace = true }
async-trait = { workspace = true }
chrono = { workspace = true }
config = { workspace = true }
@@ -37,12 +37,14 @@ tracing-error = { workspace = true }
tracing-opentelemetry = { workspace = true }
tracing-subscriber = { workspace = true, features = ["registry", "std", "fmt", "env-filter", "tracing-log", "time", "local-time", "json"] }
tokio = { workspace = true, features = ["sync", "fs", "rt-multi-thread", "rt", "time", "macros"] }
rdkafka = { workspace = true, features = ["tokio"], optional = true }
reqwest = { workspace = true, optional = true, default-features = false }
serde_json = { workspace = true }
sysinfo = { workspace = true }
thiserror = { workspace = true }
# Only enable kafka features and related dependencies on Linux
[target.'cfg(target_os = "linux")'.dependencies]
rdkafka = { workspace = true, features = ["tokio"], optional = true }
[dev-dependencies]

View File

@@ -7,27 +7,28 @@ service_name = "rustfs_obs"
service_version = "0.1.0"
environments = "develop"
logger_level = "debug"
local_logging_enabled = true
[sinks]
[sinks.kafka]
enabled = false
bootstrap_servers = "localhost:9092"
topic = "logs"
batch_size = 100 # Default is 100 if not specified
batch_timeout_ms = 1000 # Default is 1000ms if not specified
#[[sinks]]
#type = "Kafka"
#bootstrap_servers = "localhost:9092"
#topic = "logs"
#batch_size = 100 # Default is 100 if not specified
#batch_timeout_ms = 100 # Default is 1000ms if not specified
#
#[[sinks]]
#type = "Webhook"
#endpoint = "http://localhost:8080/webhook"
#auth_token = ""
#batch_size = 100 # Default is 3 if not specified
#batch_timeout_ms = 100 # Default is 100ms if not specified
[sinks.webhook]
enabled = false
endpoint = "http://localhost:8080/webhook"
auth_token = ""
batch_size = 100 # Default is 3 if not specified
batch_timeout_ms = 1000 # Default is 100ms if not specified
[sinks.file]
enabled = true
path = "deploy/logs/app.log"
batch_size = 100
batch_timeout_ms = 1000 # Default is 8192 bytes if not specified
[[sinks]]
type = "File"
path = "deploy/logs/rustfs.log"
buffer_size = 102 # Default is 8192 bytes if not specified
flush_interval_ms = 1000
flush_threshold = 100
[logger]
queue_capacity = 10000

View File

@@ -1,6 +1,6 @@
use crate::global::{ENVIRONMENT, LOGGER_LEVEL, METER_INTERVAL, SAMPLE_RATIO, SERVICE_NAME, SERVICE_VERSION, USE_STDOUT};
use config::{Config, Environment, File, FileFormat};
use serde::Deserialize;
use config::{Config, File, FileFormat};
use serde::{Deserialize, Serialize};
use std::env;
/// OpenTelemetry Configuration
@@ -11,7 +11,7 @@ use std::env;
/// Add use_stdout for output to stdout
/// Add logger level for log level
/// Add local_logging_enabled for local logging enabled
#[derive(Debug, Deserialize, Clone)]
#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct OtelConfig {
pub endpoint: String, // Endpoint for metric collection
pub use_stdout: Option<bool>, // Output to stdout
@@ -24,7 +24,7 @@ pub struct OtelConfig {
pub local_logging_enabled: Option<bool>, // Local logging enabled
}
// Helper function: Extract observable configuration from environment variables
/// Helper function: Extract observable configuration from environment variables
fn extract_otel_config_from_env() -> OtelConfig {
OtelConfig {
endpoint: env::var("RUSTFS_OBSERVABILITY_ENDPOINT").unwrap_or_else(|_| "".to_string()),
@@ -63,36 +63,89 @@ fn extract_otel_config_from_env() -> OtelConfig {
}
}
impl Default for OtelConfig {
fn default() -> Self {
impl OtelConfig {
/// Create a new instance of OtelConfig with default values
///
/// # Returns
/// A new instance of OtelConfig
pub fn new() -> Self {
extract_otel_config_from_env()
}
}
impl Default for OtelConfig {
fn default() -> Self {
Self::new()
}
}
/// Kafka Sink Configuration - Add batch parameters
#[derive(Debug, Deserialize, Clone, Default)]
#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct KafkaSinkConfig {
pub enabled: bool,
pub bootstrap_servers: String,
pub brokers: String,
pub topic: String,
pub batch_size: Option<usize>, // Batch size, default 100
pub batch_timeout_ms: Option<u64>, // Batch timeout time, default 1000ms
}
impl KafkaSinkConfig {
pub fn new() -> Self {
Self {
brokers: env::var("RUSTFS__SINKS_0_KAFKA_BROKERS")
.ok()
.filter(|s| !s.trim().is_empty())
.unwrap_or_else(|| "localhost:9092".to_string()),
topic: env::var("RUSTFS__SINKS_0_KAFKA_TOPIC")
.ok()
.filter(|s| !s.trim().is_empty())
.unwrap_or_else(|| "default_topic".to_string()),
batch_size: Some(100),
batch_timeout_ms: Some(1000),
}
}
}
impl Default for KafkaSinkConfig {
fn default() -> Self {
Self::new()
}
}
/// Webhook Sink Configuration - Add Retry Parameters
#[derive(Debug, Deserialize, Clone, Default)]
#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct WebhookSinkConfig {
pub enabled: bool,
pub endpoint: String,
pub auth_token: String,
pub max_retries: Option<usize>, // Maximum number of retry times, default 3
pub retry_delay_ms: Option<u64>, // Retry the delay cardinality, default 100ms
}
impl WebhookSinkConfig {
pub fn new() -> Self {
Self {
endpoint: env::var("RUSTFS__SINKS_0_WEBHOOK_ENDPOINT")
.ok()
.filter(|s| !s.trim().is_empty())
.unwrap_or_else(|| "http://localhost:8080".to_string()),
auth_token: env::var("RUSTFS__SINKS_0_WEBHOOK_AUTH_TOKEN")
.ok()
.filter(|s| !s.trim().is_empty())
.unwrap_or_else(|| "default_token".to_string()),
max_retries: Some(3),
retry_delay_ms: Some(100),
}
}
}
impl Default for WebhookSinkConfig {
fn default() -> Self {
Self::new()
}
}
/// File Sink Configuration - Add buffering parameters
#[derive(Debug, Deserialize, Clone)]
#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct FileSinkConfig {
pub enabled: bool,
pub path: String,
pub buffer_size: Option<usize>, // Write buffer size, default 8192
pub flush_interval_ms: Option<u64>, // Refresh interval time, default 1000ms
@@ -114,13 +167,9 @@ impl FileSinkConfig {
.unwrap_or("rustfs/rustfs.log")
.to_string()
}
}
impl Default for FileSinkConfig {
fn default() -> Self {
FileSinkConfig {
enabled: true,
path: env::var("RUSTFS_SINKS_FILE_PATH")
pub fn new() -> Self {
Self {
path: env::var("RUSTFS__SINKS_0_FILE_PATH")
.ok()
.filter(|s| !s.trim().is_empty())
.unwrap_or_else(Self::get_default_log_path),
@@ -131,38 +180,53 @@ impl Default for FileSinkConfig {
}
}
impl Default for FileSinkConfig {
fn default() -> Self {
Self::new()
}
}
/// Sink configuration collection
#[derive(Debug, Deserialize, Clone)]
pub struct SinkConfig {
pub kafka: Option<KafkaSinkConfig>,
pub webhook: Option<WebhookSinkConfig>,
pub file: Option<FileSinkConfig>,
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum SinkConfig {
File(FileSinkConfig),
Kafka(KafkaSinkConfig),
Webhook(WebhookSinkConfig),
}
impl SinkConfig {
pub fn new() -> Self {
Self::File(FileSinkConfig::new())
}
}
impl Default for SinkConfig {
fn default() -> Self {
SinkConfig {
kafka: None,
webhook: None,
file: Some(FileSinkConfig::default()),
}
Self::new()
}
}
///Logger Configuration
#[derive(Debug, Deserialize, Clone)]
#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct LoggerConfig {
pub queue_capacity: Option<usize>,
}
impl Default for LoggerConfig {
fn default() -> Self {
LoggerConfig {
impl LoggerConfig {
pub fn new() -> Self {
Self {
queue_capacity: Some(10000),
}
}
}
impl Default for LoggerConfig {
fn default() -> Self {
Self::new()
}
}
/// Overall application configuration
/// Add observability, sinks, and logger configuration
///
@@ -180,7 +244,7 @@ impl Default for LoggerConfig {
#[derive(Debug, Deserialize, Clone)]
pub struct AppConfig {
pub observability: OtelConfig,
pub sinks: SinkConfig,
pub sinks: Vec<SinkConfig>,
pub logger: Option<LoggerConfig>,
}
@@ -192,7 +256,7 @@ impl AppConfig {
pub fn new() -> Self {
Self {
observability: OtelConfig::default(),
sinks: SinkConfig::default(),
sinks: vec![SinkConfig::default()],
logger: Some(LoggerConfig::default()),
}
}
@@ -258,14 +322,6 @@ pub fn load_config(config_dir: Option<String>) -> AppConfig {
let app_config = Config::builder()
.add_source(File::with_name(config_dir.as_str()).format(FileFormat::Toml).required(false))
.add_source(File::with_name(config_dir.as_str()).format(FileFormat::Yaml).required(false))
.add_source(
Environment::default()
.prefix("RUSTFS")
.prefix_separator("__")
.separator("__")
.with_list_parse_key("volumes")
.try_parsing(true),
)
.build()
.unwrap_or_default();

View File

@@ -32,7 +32,7 @@ mod config;
mod entry;
mod global;
mod logger;
mod sink;
mod sinks;
mod system;
mod telemetry;
mod utils;
@@ -40,12 +40,6 @@ mod worker;
use crate::logger::InitLogStatus;
pub use config::load_config;
#[cfg(feature = "file")]
pub use config::FileSinkConfig;
#[cfg(feature = "kafka")]
pub use config::KafkaSinkConfig;
#[cfg(feature = "webhook")]
pub use config::WebhookSinkConfig;
pub use config::{AppConfig, LoggerConfig, OtelConfig, SinkConfig};
pub use entry::args::Args;
pub use entry::audit::{ApiDetails, AuditLogEntry};
@@ -79,7 +73,7 @@ use tracing::{error, info};
/// ```
pub async fn init_obs(config: AppConfig) -> (Arc<Mutex<Logger>>, telemetry::OtelGuard) {
let guard = init_telemetry(&config.observability);
let sinks = sink::create_sinks(&config).await;
let sinks = sinks::create_sinks(&config).await;
let logger = init_global_logger(&config, sinks).await;
let obs_config = config.observability.clone();
tokio::spawn(async move {

View File

@@ -1,5 +1,5 @@
use crate::global::{ENVIRONMENT, SERVICE_NAME, SERVICE_VERSION};
use crate::sink::Sink;
use crate::sinks::Sink;
use crate::{AppConfig, AuditLogEntry, BaseLogEntry, ConsoleLogEntry, GlobalError, OtelConfig, ServerLogEntry, UnifiedLogEntry};
use std::sync::Arc;
use std::time::SystemTime;

View File

@@ -1,497 +0,0 @@
use crate::{AppConfig, LogRecord, UnifiedLogEntry};
use async_trait::async_trait;
use std::sync::Arc;
use tokio::fs::OpenOptions;
use tokio::io;
use tokio::io::AsyncWriteExt;
/// Sink Trait definition, asynchronously write logs
#[async_trait]
pub trait Sink: Send + Sync {
async fn write(&self, entry: &UnifiedLogEntry);
}
#[cfg(feature = "kafka")]
/// Kafka Sink Implementation
pub struct KafkaSink {
producer: rdkafka::producer::FutureProducer,
topic: String,
batch_size: usize,
batch_timeout_ms: u64,
entries: Arc<tokio::sync::Mutex<Vec<UnifiedLogEntry>>>,
last_flush: Arc<std::sync::atomic::AtomicU64>,
}
#[cfg(feature = "kafka")]
impl KafkaSink {
/// Create a new KafkaSink instance
pub fn new(producer: rdkafka::producer::FutureProducer, topic: String, batch_size: usize, batch_timeout_ms: u64) -> Self {
// Create Arc-wrapped values first
let entries = Arc::new(tokio::sync::Mutex::new(Vec::with_capacity(batch_size)));
let last_flush = Arc::new(std::sync::atomic::AtomicU64::new(
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis() as u64,
));
let sink = KafkaSink {
producer: producer.clone(),
topic: topic.clone(),
batch_size,
batch_timeout_ms,
entries: entries.clone(),
last_flush: last_flush.clone(),
};
// Start background flusher
tokio::spawn(Self::periodic_flush(producer, topic, entries, last_flush, batch_timeout_ms));
sink
}
/// Add a getter method to read the batch_timeout_ms field
#[allow(dead_code)]
pub fn batch_timeout(&self) -> u64 {
self.batch_timeout_ms
}
/// Add a method to dynamically adjust the timeout if needed
#[allow(dead_code)]
pub fn set_batch_timeout(&mut self, new_timeout_ms: u64) {
self.batch_timeout_ms = new_timeout_ms;
}
async fn periodic_flush(
producer: rdkafka::producer::FutureProducer,
topic: String,
entries: Arc<tokio::sync::Mutex<Vec<UnifiedLogEntry>>>,
last_flush: Arc<std::sync::atomic::AtomicU64>,
timeout_ms: u64,
) {
loop {
tokio::time::sleep(tokio::time::Duration::from_millis(timeout_ms / 2)).await;
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis() as u64;
let last = last_flush.load(std::sync::atomic::Ordering::Relaxed);
if now - last >= timeout_ms {
let mut batch = entries.lock().await;
if !batch.is_empty() {
Self::send_batch(&producer, &topic, batch.drain(..).collect()).await;
last_flush.store(now, std::sync::atomic::Ordering::Relaxed);
}
}
}
}
async fn send_batch(producer: &rdkafka::producer::FutureProducer, topic: &str, entries: Vec<UnifiedLogEntry>) {
for entry in entries {
let payload = match serde_json::to_string(&entry) {
Ok(p) => p,
Err(e) => {
eprintln!("Failed to serialize log entry: {}", e);
continue;
}
};
let span_id = entry.get_timestamp().to_rfc3339();
let _ = producer
.send(
rdkafka::producer::FutureRecord::to(topic).payload(&payload).key(&span_id),
std::time::Duration::from_secs(5),
)
.await;
}
}
}
#[cfg(feature = "kafka")]
#[async_trait]
impl Sink for KafkaSink {
async fn write(&self, entry: &UnifiedLogEntry) {
let mut batch = self.entries.lock().await;
batch.push(entry.clone());
let should_flush_by_size = batch.len() >= self.batch_size;
let should_flush_by_time = {
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis() as u64;
let last = self.last_flush.load(std::sync::atomic::Ordering::Relaxed);
now - last >= self.batch_timeout_ms
};
if should_flush_by_size || should_flush_by_time {
// Existing flush logic
let entries_to_send: Vec<UnifiedLogEntry> = batch.drain(..).collect();
let producer = self.producer.clone();
let topic = self.topic.clone();
self.last_flush.store(
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis() as u64,
std::sync::atomic::Ordering::Relaxed,
);
tokio::spawn(async move {
KafkaSink::send_batch(&producer, &topic, entries_to_send).await;
});
}
}
}
#[cfg(feature = "kafka")]
impl Drop for KafkaSink {
fn drop(&mut self) {
// Perform any necessary cleanup here
// For example, you might want to flush any remaining entries
let producer = self.producer.clone();
let topic = self.topic.clone();
let entries = self.entries.clone();
let last_flush = self.last_flush.clone();
tokio::spawn(async move {
let mut batch = entries.lock().await;
if !batch.is_empty() {
KafkaSink::send_batch(&producer, &topic, batch.drain(..).collect()).await;
last_flush.store(
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis() as u64,
std::sync::atomic::Ordering::Relaxed,
);
}
});
eprintln!("Dropping KafkaSink with topic: {}", self.topic);
}
}
#[cfg(feature = "webhook")]
/// Webhook Sink Implementation
pub struct WebhookSink {
endpoint: String,
auth_token: String,
client: reqwest::Client,
max_retries: usize,
retry_delay_ms: u64,
}
#[cfg(feature = "webhook")]
impl WebhookSink {
pub fn new(endpoint: String, auth_token: String, max_retries: usize, retry_delay_ms: u64) -> Self {
WebhookSink {
endpoint,
auth_token,
client: reqwest::Client::builder()
.timeout(std::time::Duration::from_secs(10))
.build()
.unwrap_or_else(|_| reqwest::Client::new()),
max_retries,
retry_delay_ms,
}
}
}
#[cfg(feature = "webhook")]
#[async_trait]
impl Sink for WebhookSink {
async fn write(&self, entry: &UnifiedLogEntry) {
let mut retries = 0;
let url = self.endpoint.clone();
let entry_clone = entry.clone();
let auth_value = reqwest::header::HeaderValue::from_str(format!("Bearer {}", self.auth_token.clone()).as_str()).unwrap();
while retries < self.max_retries {
match self
.client
.post(&url)
.header(reqwest::header::AUTHORIZATION, auth_value.clone())
.json(&entry_clone)
.send()
.await
{
Ok(response) if response.status().is_success() => {
return;
}
_ => {
retries += 1;
if retries < self.max_retries {
tokio::time::sleep(tokio::time::Duration::from_millis(
self.retry_delay_ms * (1 << retries), // Exponential backoff
))
.await;
}
}
}
}
eprintln!("Failed to send log to webhook after {} retries", self.max_retries);
}
}
#[cfg(feature = "webhook")]
impl Drop for WebhookSink {
fn drop(&mut self) {
// Perform any necessary cleanup here
// For example, you might want to log that the sink is being dropped
eprintln!("Dropping WebhookSink with URL: {}", self.endpoint);
}
}
#[cfg(feature = "file")]
/// File Sink Implementation
pub struct FileSink {
path: String,
buffer_size: usize,
writer: Arc<tokio::sync::Mutex<io::BufWriter<tokio::fs::File>>>,
entry_count: std::sync::atomic::AtomicUsize,
last_flush: std::sync::atomic::AtomicU64,
flush_interval_ms: u64, // Time between flushes
flush_threshold: usize, // Number of entries before flush
}
#[cfg(feature = "file")]
impl FileSink {
/// Create a new FileSink instance
pub async fn new(
path: String,
buffer_size: usize,
flush_interval_ms: u64,
flush_threshold: usize,
) -> Result<Self, io::Error> {
// check if the file exists
let file_exists = tokio::fs::metadata(&path).await.is_ok();
// if the file not exists, create it
if !file_exists {
tokio::fs::create_dir_all(std::path::Path::new(&path).parent().unwrap()).await?;
tracing::debug!("File does not exist, creating it. Path: {:?}", path)
}
let file = if file_exists {
// If the file exists, open it in append mode
tracing::debug!("FileSink: File exists, opening in append mode.");
OpenOptions::new().append(true).create(true).open(&path).await?
} else {
// If the file does not exist, create it
tracing::debug!("FileSink: File does not exist, creating a new file.");
// Create the file and write a header or initial content if needed
OpenOptions::new().create(true).truncate(true).write(true).open(&path).await?
};
let writer = io::BufWriter::with_capacity(buffer_size, file);
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis() as u64;
Ok(FileSink {
path,
buffer_size,
writer: Arc::new(tokio::sync::Mutex::new(writer)),
entry_count: std::sync::atomic::AtomicUsize::new(0),
last_flush: std::sync::atomic::AtomicU64::new(now),
flush_interval_ms,
flush_threshold,
})
}
#[allow(dead_code)]
async fn initialize_writer(&mut self) -> io::Result<()> {
let file = tokio::fs::File::create(&self.path).await?;
// Use buffer_size to create a buffer writer with a specified capacity
let buf_writer = io::BufWriter::with_capacity(self.buffer_size, file);
// Replace the original writer with the new Mutex
self.writer = Arc::new(tokio::sync::Mutex::new(buf_writer));
Ok(())
}
// Get the current buffer size
#[allow(dead_code)]
pub fn buffer_size(&self) -> usize {
self.buffer_size
}
// How to dynamically adjust the buffer size
#[allow(dead_code)]
pub async fn set_buffer_size(&mut self, new_size: usize) -> io::Result<()> {
if self.buffer_size != new_size {
self.buffer_size = new_size;
// Reinitialize the writer directly, without checking is_some()
self.initialize_writer().await?;
}
Ok(())
}
// Check if flushing is needed based on count or time
fn should_flush(&self) -> bool {
// Check entry count threshold
if self.entry_count.load(std::sync::atomic::Ordering::Relaxed) >= self.flush_threshold {
return true;
}
// Check time threshold
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis() as u64;
let last = self.last_flush.load(std::sync::atomic::Ordering::Relaxed);
now - last >= self.flush_interval_ms
}
}
#[cfg(feature = "file")]
#[async_trait]
impl Sink for FileSink {
async fn write(&self, entry: &UnifiedLogEntry) {
let line = format!("{:?}\n", entry);
let mut writer = self.writer.lock().await;
if let Err(e) = writer.write_all(line.as_bytes()).await {
eprintln!(
"Failed to write log to file {}: {},entry timestamp:{:?}",
self.path,
e,
entry.get_timestamp()
);
return;
}
// Only flush periodically to improve performance
// Logic to determine when to flush could be added here
// Increment the entry count
self.entry_count.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
// Check if we should flush
if self.should_flush() {
if let Err(e) = writer.flush().await {
eprintln!("Failed to flush log file {}: {}", self.path, e);
return;
}
// Reset counters
self.entry_count.store(0, std::sync::atomic::Ordering::Relaxed);
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis() as u64;
self.last_flush.store(now, std::sync::atomic::Ordering::Relaxed);
}
}
}
#[cfg(feature = "file")]
impl Drop for FileSink {
fn drop(&mut self) {
let writer = self.writer.clone();
let path = self.path.clone();
tokio::task::spawn_blocking(move || {
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async {
let mut writer = writer.lock().await;
if let Err(e) = writer.flush().await {
eprintln!("Failed to flush log file {}: {}", path, e);
}
});
});
}
}
/// Create a list of Sink instances
pub async fn create_sinks(config: &AppConfig) -> Vec<Arc<dyn Sink>> {
let mut sinks: Vec<Arc<dyn Sink>> = Vec::new();
#[cfg(feature = "kafka")]
{
match &config.sinks.kafka {
Some(sink_kafka) => {
if sink_kafka.enabled {
match rdkafka::config::ClientConfig::new()
.set("bootstrap.servers", &sink_kafka.bootstrap_servers)
.set("message.timeout.ms", "5000")
.create()
{
Ok(producer) => {
sinks.push(Arc::new(KafkaSink::new(
producer,
sink_kafka.topic.clone(),
sink_kafka.batch_size.unwrap_or(100),
sink_kafka.batch_timeout_ms.unwrap_or(1000),
)));
}
Err(e) => {
tracing::error!("Failed to create Kafka producer: {}", e);
}
}
} else {
tracing::info!("Kafka sink is disabled in the configuration");
}
}
_ => {
tracing::info!("Kafka sink is not configured or disabled");
}
}
}
#[cfg(feature = "webhook")]
{
match &config.sinks.webhook {
Some(sink_webhook) => {
if sink_webhook.enabled {
sinks.push(Arc::new(WebhookSink::new(
sink_webhook.endpoint.clone(),
sink_webhook.auth_token.clone(),
sink_webhook.max_retries.unwrap_or(3),
sink_webhook.retry_delay_ms.unwrap_or(100),
)));
} else {
tracing::info!("Webhook sink is disabled in the configuration");
}
}
_ => {
tracing::info!("Webhook sink is not configured or disabled");
}
}
}
#[cfg(feature = "file")]
{
// let config = config.clone();
match &config.sinks.file {
Some(sink_file) => {
tracing::info!("File sink is enabled in the configuration");
let path = if sink_file.enabled {
sink_file.path.clone()
} else {
"rustfs.log".to_string()
};
tracing::debug!("FileSink: Using path: {}", path);
sinks.push(Arc::new(
FileSink::new(
path.clone(),
sink_file.buffer_size.unwrap_or(8192),
sink_file.flush_interval_ms.unwrap_or(1000),
sink_file.flush_threshold.unwrap_or(100),
)
.await
.unwrap(),
));
}
_ => {
tracing::info!("File sink is not configured or disabled");
}
}
}
sinks
}

View File

@@ -0,0 +1,164 @@
use crate::sinks::Sink;
use crate::{LogRecord, UnifiedLogEntry};
use async_trait::async_trait;
use std::sync::Arc;
use tokio::fs::OpenOptions;
use tokio::io;
use tokio::io::AsyncWriteExt;
/// File Sink Implementation
pub struct FileSink {
path: String,
buffer_size: usize,
writer: Arc<tokio::sync::Mutex<io::BufWriter<tokio::fs::File>>>,
entry_count: std::sync::atomic::AtomicUsize,
last_flush: std::sync::atomic::AtomicU64,
flush_interval_ms: u64, // Time between flushes
flush_threshold: usize, // Number of entries before flush
}
impl FileSink {
/// Create a new FileSink instance
pub async fn new(
path: String,
buffer_size: usize,
flush_interval_ms: u64,
flush_threshold: usize,
) -> Result<Self, io::Error> {
// check if the file exists
let file_exists = tokio::fs::metadata(&path).await.is_ok();
// if the file not exists, create it
if !file_exists {
tokio::fs::create_dir_all(std::path::Path::new(&path).parent().unwrap()).await?;
tracing::debug!("File does not exist, creating it. Path: {:?}", path)
}
let file = if file_exists {
// If the file exists, open it in append mode
tracing::debug!("FileSink: File exists, opening in append mode.");
OpenOptions::new().append(true).create(true).open(&path).await?
} else {
// If the file does not exist, create it
tracing::debug!("FileSink: File does not exist, creating a new file.");
// Create the file and write a header or initial content if needed
OpenOptions::new().create(true).truncate(true).write(true).open(&path).await?
};
let writer = io::BufWriter::with_capacity(buffer_size, file);
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis() as u64;
Ok(FileSink {
path,
buffer_size,
writer: Arc::new(tokio::sync::Mutex::new(writer)),
entry_count: std::sync::atomic::AtomicUsize::new(0),
last_flush: std::sync::atomic::AtomicU64::new(now),
flush_interval_ms,
flush_threshold,
})
}
#[allow(dead_code)]
async fn initialize_writer(&mut self) -> io::Result<()> {
let file = tokio::fs::File::create(&self.path).await?;
// Use buffer_size to create a buffer writer with a specified capacity
let buf_writer = io::BufWriter::with_capacity(self.buffer_size, file);
// Replace the original writer with the new Mutex
self.writer = Arc::new(tokio::sync::Mutex::new(buf_writer));
Ok(())
}
// Get the current buffer size
#[allow(dead_code)]
pub fn buffer_size(&self) -> usize {
self.buffer_size
}
// How to dynamically adjust the buffer size
#[allow(dead_code)]
pub async fn set_buffer_size(&mut self, new_size: usize) -> io::Result<()> {
if self.buffer_size != new_size {
self.buffer_size = new_size;
// Reinitialize the writer directly, without checking is_some()
self.initialize_writer().await?;
}
Ok(())
}
// Check if flushing is needed based on count or time
fn should_flush(&self) -> bool {
// Check entry count threshold
if self.entry_count.load(std::sync::atomic::Ordering::Relaxed) >= self.flush_threshold {
return true;
}
// Check time threshold
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis() as u64;
let last = self.last_flush.load(std::sync::atomic::Ordering::Relaxed);
now - last >= self.flush_interval_ms
}
}
#[async_trait]
impl Sink for FileSink {
async fn write(&self, entry: &UnifiedLogEntry) {
let line = format!("{:?}\n", entry);
let mut writer = self.writer.lock().await;
if let Err(e) = writer.write_all(line.as_bytes()).await {
eprintln!(
"Failed to write log to file {}: {},entry timestamp:{:?}",
self.path,
e,
entry.get_timestamp()
);
return;
}
// Only flush periodically to improve performance
// Logic to determine when to flush could be added here
// Increment the entry count
self.entry_count.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
// Check if we should flush
if self.should_flush() {
if let Err(e) = writer.flush().await {
eprintln!("Failed to flush log file {}: {}", self.path, e);
return;
}
// Reset counters
self.entry_count.store(0, std::sync::atomic::Ordering::Relaxed);
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis() as u64;
self.last_flush.store(now, std::sync::atomic::Ordering::Relaxed);
}
}
}
impl Drop for FileSink {
fn drop(&mut self) {
let writer = self.writer.clone();
let path = self.path.clone();
tokio::task::spawn_blocking(move || {
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async {
let mut writer = writer.lock().await;
if let Err(e) = writer.flush().await {
eprintln!("Failed to flush log file {}: {}", path, e);
}
});
});
}
}

View File

@@ -0,0 +1,165 @@
use crate::sinks::Sink;
use crate::{LogRecord, UnifiedLogEntry};
use async_trait::async_trait;
use std::sync::Arc;
/// Kafka Sink Implementation
pub struct KafkaSink {
producer: rdkafka::producer::FutureProducer,
topic: String,
batch_size: usize,
batch_timeout_ms: u64,
entries: Arc<tokio::sync::Mutex<Vec<UnifiedLogEntry>>>,
last_flush: Arc<std::sync::atomic::AtomicU64>,
}
impl KafkaSink {
/// Create a new KafkaSink instance
pub fn new(producer: rdkafka::producer::FutureProducer, topic: String, batch_size: usize, batch_timeout_ms: u64) -> Self {
// Create Arc-wrapped values first
let entries = Arc::new(tokio::sync::Mutex::new(Vec::with_capacity(batch_size)));
let last_flush = Arc::new(std::sync::atomic::AtomicU64::new(
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis() as u64,
));
let sink = KafkaSink {
producer: producer.clone(),
topic: topic.clone(),
batch_size,
batch_timeout_ms,
entries: entries.clone(),
last_flush: last_flush.clone(),
};
// Start background flusher
tokio::spawn(Self::periodic_flush(producer, topic, entries, last_flush, batch_timeout_ms));
sink
}
/// Add a getter method to read the batch_timeout_ms field
#[allow(dead_code)]
pub fn batch_timeout(&self) -> u64 {
self.batch_timeout_ms
}
/// Add a method to dynamically adjust the timeout if needed
#[allow(dead_code)]
pub fn set_batch_timeout(&mut self, new_timeout_ms: u64) {
self.batch_timeout_ms = new_timeout_ms;
}
async fn periodic_flush(
producer: rdkafka::producer::FutureProducer,
topic: String,
entries: Arc<tokio::sync::Mutex<Vec<UnifiedLogEntry>>>,
last_flush: Arc<std::sync::atomic::AtomicU64>,
timeout_ms: u64,
) {
loop {
tokio::time::sleep(tokio::time::Duration::from_millis(timeout_ms / 2)).await;
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis() as u64;
let last = last_flush.load(std::sync::atomic::Ordering::Relaxed);
if now - last >= timeout_ms {
let mut batch = entries.lock().await;
if !batch.is_empty() {
Self::send_batch(&producer, &topic, batch.drain(..).collect()).await;
last_flush.store(now, std::sync::atomic::Ordering::Relaxed);
}
}
}
}
async fn send_batch(producer: &rdkafka::producer::FutureProducer, topic: &str, entries: Vec<UnifiedLogEntry>) {
for entry in entries {
let payload = match serde_json::to_string(&entry) {
Ok(p) => p,
Err(e) => {
eprintln!("Failed to serialize log entry: {}", e);
continue;
}
};
let span_id = entry.get_timestamp().to_rfc3339();
let _ = producer
.send(
rdkafka::producer::FutureRecord::to(topic).payload(&payload).key(&span_id),
std::time::Duration::from_secs(5),
)
.await;
}
}
}
#[async_trait]
impl Sink for KafkaSink {
async fn write(&self, entry: &UnifiedLogEntry) {
let mut batch = self.entries.lock().await;
batch.push(entry.clone());
let should_flush_by_size = batch.len() >= self.batch_size;
let should_flush_by_time = {
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis() as u64;
let last = self.last_flush.load(std::sync::atomic::Ordering::Relaxed);
now - last >= self.batch_timeout_ms
};
if should_flush_by_size || should_flush_by_time {
// Existing flush logic
let entries_to_send: Vec<UnifiedLogEntry> = batch.drain(..).collect();
let producer = self.producer.clone();
let topic = self.topic.clone();
self.last_flush.store(
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis() as u64,
std::sync::atomic::Ordering::Relaxed,
);
tokio::spawn(async move {
KafkaSink::send_batch(&producer, &topic, entries_to_send).await;
});
}
}
}
impl Drop for KafkaSink {
fn drop(&mut self) {
// Perform any necessary cleanup here
// For example, you might want to flush any remaining entries
let producer = self.producer.clone();
let topic = self.topic.clone();
let entries = self.entries.clone();
let last_flush = self.last_flush.clone();
tokio::spawn(async move {
let mut batch = entries.lock().await;
if !batch.is_empty() {
KafkaSink::send_batch(&producer, &topic, batch.drain(..).collect()).await;
last_flush.store(
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis() as u64,
std::sync::atomic::Ordering::Relaxed,
);
}
});
eprintln!("Dropping KafkaSink with topic: {}", self.topic);
}
}

View File

@@ -0,0 +1,92 @@
use crate::{AppConfig, SinkConfig, UnifiedLogEntry};
use async_trait::async_trait;
use std::sync::Arc;
#[cfg(feature = "file")]
mod file;
#[cfg(all(feature = "kafka", target_os = "linux"))]
mod kafka;
#[cfg(feature = "webhook")]
mod webhook;
/// Sink Trait definition, asynchronously write logs
#[async_trait]
pub trait Sink: Send + Sync {
async fn write(&self, entry: &UnifiedLogEntry);
}
/// Create a list of Sink instances
pub async fn create_sinks(config: &AppConfig) -> Vec<Arc<dyn Sink>> {
let mut sinks: Vec<Arc<dyn Sink>> = Vec::new();
for sink_config in &config.sinks {
match sink_config {
#[cfg(all(feature = "kafka", target_os = "linux"))]
SinkConfig::Kafka(kafka_config) => {
match rdkafka::config::ClientConfig::new()
.set("bootstrap.servers", &kafka_config.brokers)
.set("message.timeout.ms", "5000")
.create()
{
Ok(producer) => {
sinks.push(Arc::new(kafka::KafkaSink::new(
producer,
kafka_config.topic.clone(),
kafka_config.batch_size.unwrap_or(100),
kafka_config.batch_timeout_ms.unwrap_or(1000),
)));
tracing::info!("Kafka sink created for topic: {}", kafka_config.topic);
}
Err(e) => {
tracing::error!("Failed to create Kafka producer: {}", e);
}
}
}
#[cfg(feature = "webhook")]
SinkConfig::Webhook(webhook_config) => {
sinks.push(Arc::new(webhook::WebhookSink::new(
webhook_config.endpoint.clone(),
webhook_config.auth_token.clone(),
webhook_config.max_retries.unwrap_or(3),
webhook_config.retry_delay_ms.unwrap_or(100),
)));
tracing::info!("Webhook sink created for endpoint: {}", webhook_config.endpoint);
}
#[cfg(feature = "file")]
SinkConfig::File(file_config) => {
tracing::debug!("FileSink: Using path: {}", file_config.path);
match file::FileSink::new(
file_config.path.clone(),
file_config.buffer_size.unwrap_or(8192),
file_config.flush_interval_ms.unwrap_or(1000),
file_config.flush_threshold.unwrap_or(100),
)
.await
{
Ok(sink) => {
sinks.push(Arc::new(sink));
tracing::info!("File sink created for path: {}", file_config.path);
}
Err(e) => {
tracing::error!("Failed to create File sink: {}", e);
}
}
}
#[cfg(any(not(feature = "kafka"), not(target_os = "linux")))]
SinkConfig::Kafka(_) => {
tracing::warn!("Kafka sink is configured but the 'kafka' feature is not enabled");
}
#[cfg(not(feature = "webhook"))]
SinkConfig::Webhook(_) => {
tracing::warn!("Webhook sink is configured but the 'webhook' feature is not enabled");
}
#[cfg(not(feature = "file"))]
SinkConfig::File(_) => {
tracing::warn!("File sink is configured but the 'file' feature is not enabled");
}
}
}
sinks
}

View File

@@ -0,0 +1,70 @@
use crate::sinks::Sink;
use crate::UnifiedLogEntry;
use async_trait::async_trait;
/// Webhook Sink Implementation
pub struct WebhookSink {
endpoint: String,
auth_token: String,
client: reqwest::Client,
max_retries: usize,
retry_delay_ms: u64,
}
impl WebhookSink {
pub fn new(endpoint: String, auth_token: String, max_retries: usize, retry_delay_ms: u64) -> Self {
WebhookSink {
endpoint,
auth_token,
client: reqwest::Client::builder()
.timeout(std::time::Duration::from_secs(10))
.build()
.unwrap_or_else(|_| reqwest::Client::new()),
max_retries,
retry_delay_ms,
}
}
}
#[async_trait]
impl Sink for WebhookSink {
async fn write(&self, entry: &UnifiedLogEntry) {
let mut retries = 0;
let url = self.endpoint.clone();
let entry_clone = entry.clone();
let auth_value = reqwest::header::HeaderValue::from_str(format!("Bearer {}", self.auth_token.clone()).as_str()).unwrap();
while retries < self.max_retries {
match self
.client
.post(&url)
.header(reqwest::header::AUTHORIZATION, auth_value.clone())
.json(&entry_clone)
.send()
.await
{
Ok(response) if response.status().is_success() => {
return;
}
_ => {
retries += 1;
if retries < self.max_retries {
tokio::time::sleep(tokio::time::Duration::from_millis(
self.retry_delay_ms * (1 << retries), // Exponential backoff
))
.await;
}
}
}
}
eprintln!("Failed to send log to webhook after {} retries", self.max_retries);
}
}
impl Drop for WebhookSink {
fn drop(&mut self) {
// Perform any necessary cleanup here
// For example, you might want to log that the sink is being dropped
eprintln!("Dropping WebhookSink with URL: {}", self.endpoint);
}
}

View File

@@ -1,4 +1,4 @@
use crate::{sink::Sink, UnifiedLogEntry};
use crate::{sinks::Sink, UnifiedLogEntry};
use std::sync::Arc;
use tokio::sync::mpsc::Receiver;

View File

@@ -20,7 +20,7 @@ pub fn load_certs(filename: &str) -> io::Result<Vec<CertificateDer<'static>>> {
// Load and return certificate.
let certs = certs(&mut reader)
.collect::<Result<Vec<_>, _>>()
.map_err(|_| certs_error(format!("certificate file {} format error", filename)))?;
.map_err(|e| certs_error(format!("certificate file {} format error:{:?}", filename, e)))?;
if certs.is_empty() {
return Err(certs_error(format!(
"No valid certificate was found in the certificate file {}",
@@ -165,7 +165,7 @@ pub fn create_multi_cert_resolver(
for (domain, (certs, key)) in cert_key_pairs {
// create a signature
let signing_key = rustls::crypto::aws_lc_rs::sign::any_supported_type(&key)
.map_err(|_| certs_error(format!("unsupported private key types:{}", domain)))?;
.map_err(|e| certs_error(format!("unsupported private key types:{}, err:{:?}", domain, e)))?;
// create a CertifiedKey
let certified_key = CertifiedKey::new(certs, signing_key);

View File

@@ -17,7 +17,7 @@ chacha20poly1305 = { version = "0.10.1", optional = true }
jsonwebtoken = { workspace = true }
pbkdf2 = { version = "0.12.2", optional = true }
rand = { workspace = true, optional = true }
sha2 = { version = "0.10.8", optional = true }
sha2 = { workspace = true, optional = true }
thiserror.workspace = true
serde_json.workspace = true

View File

@@ -1,27 +1,28 @@
OBSERVABILITY__ENDPOINT=http://localhost:4317
OBSERVABILITY__USE_STDOUT=true
OBSERVABILITY__SAMPLE_RATIO=2.0
OBSERVABILITY__METER_INTERVAL=30
OBSERVABILITY__SERVICE_NAME=rustfs
OBSERVABILITY__SERVICE_VERSION=0.1.0
OBSERVABILITY__ENVIRONMENT=develop
OBSERVABILITY__LOGGER_LEVEL=debug
SINKS__KAFKA__ENABLED=false
SINKS__KAFKA__BOOTSTRAP_SERVERS=localhost:9092
SINKS__KAFKA__TOPIC=logs
SINKS__KAFKA__BATCH_SIZE=100
SINKS__KAFKA__BATCH_TIMEOUT_MS=1000
SINKS__WEBHOOK__ENABLED=false
SINKS__WEBHOOK__ENDPOINT=http://localhost:8080/webhook
SINKS__WEBHOOK__AUTH_TOKEN=
SINKS__WEBHOOK__BATCH_SIZE=100
SINKS__WEBHOOK__BATCH_TIMEOUT_MS=1000
SINKS__FILE__ENABLED=true
SINKS__FILE__PATH=./deploy/logs/rustfs.log
SINKS__FILE__BATCH_SIZE=10
SINKS__FILE__BATCH_TIMEOUT_MS=1000
LOGGER__QUEUE_CAPACITY=10
#RUSTFS__OBSERVABILITY__ENDPOINT=http://localhost:4317
#RUSTFS__OBSERVABILITY__USE_STDOUT=true
#RUSTFS__OBSERVABILITY__SAMPLE_RATIO=2.0
#RUSTFS__OBSERVABILITY__METER_INTERVAL=30
#RUSTFS__OBSERVABILITY__SERVICE_NAME=rustfs
#RUSTFS__OBSERVABILITY__SERVICE_VERSION=0.1.0
#RUSTFS__OBSERVABILITY__ENVIRONMENT=develop
#RUSTFS__OBSERVABILITY__LOGGER_LEVEL=debug
#
#RUSTFS__SINKS_0__type=Kakfa
#RUSTFS__SINKS_0__brokers=localhost:9092
#RUSTFS__SINKS_0__topic=logs
#RUSTFS__SINKS_0__batch_size=100
#RUSTFS__SINKS_0__batch_timeout_ms=1000
#
#RUSTFS__SINKS_1__type=Webhook
#RUSTFS__SINKS_1__endpoint=http://localhost:8080/webhook
#RUSTFS__SINKS_1__auth_token=you-auth-token
#RUSTFS__SINKS_1__batch_size=100
#RUSTFS__SINKS_1__batch_timeout_ms=1000
#
#RUSTFS__SINKS_2__type=File
#RUSTFS__SINKS_2__path=./deploy/logs/rustfs.log
#RUSTFS__SINKS_2__buffer_size=10
#RUSTFS__SINKS_2__flush_interval_ms=1000
#RUSTFS__SINKS_2__flush_threshold=100
#
#RUSTFS__LOGGER__QUEUE_CAPACITY=10

View File

@@ -9,26 +9,26 @@ environments = "develop" # 运行环境,如开发环境 (develop)
logger_level = "debug" # 日志级别,可选 debug/info/warn/error 等
local_logging_enabled = true # 是否启用本地 stdout 日志记录true 表示启用false 表示禁用
[sinks]
[sinks.kafka] # Kafka 接收器配置
enabled = false # 是否启用 Kafka 接收器,默认禁用
bootstrap_servers = "localhost:9092" # Kafka 服务器地址
topic = "logs" # Kafka 主题名称
batch_size = 100 # 批处理大小,每次发送的消息数量
batch_timeout_ms = 1000 # 批处理超时时间,单位为毫秒
#[[sinks]] # Kafka 接收器配置
#type = "Kafka" # 是否启用 Kafka 接收器,默认禁用
#brokers = "localhost:9092" # Kafka 服务器地址
#topic = "logs" # Kafka 主题名称
#batch_size = 100 # 批处理大小,每次发送的消息数量
#batch_timeout_ms = 1000 # 批处理超时时间,单位为毫秒
[sinks.webhook] # Webhook 接收器配置
enabled = false # 是否启用 Webhook 接收器
endpoint = "http://localhost:8080/webhook" # Webhook 接收地址
auth_token = "" # 认证令牌
batch_size = 100 # 批处理大小
batch_timeout_ms = 1000 # 批处理超时时间,单位为毫秒
#[[sinks]] # Webhook 接收器配置
#type = "Webhook" # 是否启用 Webhook 接收器
#endpoint = "http://localhost:8080/webhook" # Webhook 接收地址
#auth_token = "" # 认证令牌
#batch_size = 100 # 批处理大小
#batch_timeout_ms = 1000 # 批处理超时时间,单位为毫秒
[sinks.file] # 文件接收器配置
enabled = true # 是否启用文件接收器
[[sinks]] # 文件接收器配置
type = "File" # 是否启用文件接收器
path = "./deploy/logs/rustfs.log" # 日志文件路径
batch_size = 10 # 批处理大小
batch_timeout_ms = 1000 # 批处理超时时间,单位为毫秒
buffer_size = 10 # 缓冲区大小,表示每次写入的字节数
flush_interval_ms = 100 # 批处理超时时间,单位为毫秒
flush_threshold = 100 # 刷新阈值,表示在达到该数量时刷新日志
[logger] # 日志器配置
queue_capacity = 10 # 日志队列容量,表示可以缓存的日志条数

View File

@@ -4,31 +4,31 @@ use_stdout = false # Output with stdout, true output, false no output
sample_ratio = 2.0
meter_interval = 30
service_name = "rustfs"
service_version = "0.1.0"
service_version = "0.0.1"
environment = "develop"
logger_level = "error"
logger_level = "info"
local_logging_enabled = true
[sinks]
[sinks.kafka] # Kafka sink is disabled by default
enabled = false
bootstrap_servers = "localhost:9092"
topic = "logs"
batch_size = 100 # Default is 100 if not specified
batch_timeout_ms = 1000 # Default is 1000ms if not specified
#[[sinks]]
#type = "Kafka"
#brokers = "localhost:9092"
#topic = "logs"
#batch_size = 100 # Default is 100 if not specified
#batch_timeout_ms = 100 # Default is 1000ms if not specified
#
#[[sinks]]
#type = "Webhook"
#endpoint = "http://localhost:8080/webhook"
#auth_token = ""
#batch_size = 100 # Default is 3 if not specified
#batch_timeout_ms = 100 # Default is 100ms if not specified
[sinks.webhook]
enabled = false
endpoint = "http://localhost:8080/webhook"
auth_token = ""
batch_size = 100 # Default is 3 if not specified
batch_timeout_ms = 1000 # Default is 100ms if not specified
[sinks.file]
enabled = true
path = "./deploy/logs/rustfs.log"
batch_size = 100
batch_timeout_ms = 1000 # Default is 8192 bytes if not specified
[[sinks]]
type = "File"
path = "deploy/logs/rustfs.log"
buffer_size = 101 # Default is 8192 bytes if not specified
flush_interval_ms = 1000
flush_threshold = 100
[logger]
queue_capacity = 10000

View File

@@ -23,4 +23,6 @@ RUSTFS_LICENSE="license content"
# 可观测性配置文件路径deploy/config/obs.example.toml
RUSTFS_OBS_CONFIG=/etc/default/obs.toml
# TLS 证书目录路径deploy/certs
RUSTFS_TLS_PATH=/etc/default/tls
RUSTFS_TLS_PATH=/etc/default/tls
# 事件通知配置文件路径deploy/config/event.example.toml
RUSTFS_EVENT_CONFIG=/etc/default/event.toml

View File

@@ -23,4 +23,6 @@ RUSTFS_LICENSE="license content"
# Observability configuration file path: deploy/config/obs.example.toml
RUSTFS_OBS_CONFIG=/etc/default/obs.toml
# TLS certificates directory path: deploy/certs
RUSTFS_TLS_PATH=/etc/default/tls
RUSTFS_TLS_PATH=/etc/default/tls
# event notification configuration file path: deploy/config/event.example.toml
RUSTFS_EVENT_CONFIG=/etc/default/event.toml

View File

@@ -83,7 +83,7 @@ services:
dockerfile: Dockerfile.obs
container_name: node2
environment:
- RUSTFS_VOLUMES=/root/data/target/volume/test{1...4}
- RUSTFS_VOLUMES=http://node{1...4}:9000/root/data/target/volume/test{1...4}
- RUSTFS_ADDRESS=:9000
- RUSTFS_CONSOLE_ENABLE=true
- RUSTFS_CONSOLE_ADDRESS=:9002

View File

@@ -204,7 +204,7 @@ async fn apply_dynamic_config_for_sub_sys<S: StorageAPI>(cfg: &mut Config, api:
}
}
Err(err) => {
error!("init storageclass err:{:?}", &err);
error!("init storage class err:{:?}", &err);
break;
}
}

View File

@@ -141,7 +141,7 @@ impl Config {
}
pub fn merge(&self) -> Config {
// TODO: merge defauls
// TODO: merge default
self.clone()
}
}
@@ -158,6 +158,6 @@ pub fn register_default_kvs(kvs: HashMap<String, KVS>) {
pub fn init() {
let mut kvs = HashMap::new();
kvs.insert(STORAGE_CLASS_SUB_SYS.to_owned(), storageclass::DefaultKVS.clone());
// TODO: other defauls
// TODO: other default
register_default_kvs(kvs)
}

View File

@@ -8,8 +8,8 @@ use lazy_static::lazy_static;
use serde::{Deserialize, Serialize};
use tracing::warn;
// default_partiy_count 默认配置,根据磁盘总数分配校验磁盘数量
pub fn default_partiy_count(drive: usize) -> usize {
// default_parity_count 默认配置,根据磁盘总数分配校验磁盘数量
pub fn default_parity_count(drive: usize) -> usize {
match drive {
1 => 0,
2 | 3 => 1,
@@ -158,7 +158,7 @@ pub fn lookup_config(kvs: &KVS, set_drive_count: usize) -> Result<Config> {
parse_storage_class(&ssc_str)?
} else {
StorageClass {
parity: default_partiy_count(set_drive_count),
parity: default_parity_count(set_drive_count),
}
}
};

View File

@@ -48,6 +48,7 @@ use tokio::{select, spawn};
use tokio_stream::wrappers::ReceiverStream;
use tracing::{error, info, warn};
pub mod event;
pub mod group;
pub mod policys;
pub mod pools;

View File

@@ -0,0 +1 @@

View File

@@ -25,7 +25,7 @@ pub fn make_admin_route() -> Result<impl S3Route> {
r.insert(Method::POST, "/", AdminOperation(&sts::AssumeRoleHandle {}))?;
regist_rpc_route(&mut r)?;
regist_user_route(&mut r)?;
register_user_route(&mut r)?;
r.insert(
Method::POST,
@@ -124,7 +124,7 @@ pub fn make_admin_route() -> Result<impl S3Route> {
Ok(r)
}
fn regist_user_route(r: &mut S3Router<AdminOperation>) -> Result<()> {
fn register_user_route(r: &mut S3Router<AdminOperation>) -> Result<()> {
// 1
r.insert(
Method::GET,

View File

@@ -39,24 +39,35 @@ export RUSTFS_CONSOLE_ADDRESS=":9002"
export RUSTFS_OBS_CONFIG="./deploy/config/obs.example.toml"
# 如下变量需要必须参数都有值才可以,以及会覆盖配置文件中的值
export RUSTFS__OBSERVABILITY__ENDPOINT=http://localhost:4317
export RUSTFS__OBSERVABILITY__USE_STDOUT=false
export RUSTFS__OBSERVABILITY__SAMPLE_RATIO=2.0
export RUSTFS__OBSERVABILITY__METER_INTERVAL=30
export RUSTFS__OBSERVABILITY__SERVICE_NAME=rustfs
export RUSTFS__OBSERVABILITY__SERVICE_VERSION=0.1.0
export RUSTFS__OBSERVABILITY__ENVIRONMENT=develop
export RUSTFS__OBSERVABILITY__LOGGER_LEVEL=debug
export RUSTFS__OBSERVABILITY__LOCAL_LOGGING_ENABLED=true
export RUSTFS__SINKS__FILE__ENABLED=true
export RUSTFS__SINKS__FILE__PATH="./deploy/logs/rustfs.log"
export RUSTFS__SINKS__WEBHOOK__ENABLED=false
export RUSTFS__SINKS__WEBHOOK__ENDPOINT=""
export RUSTFS__SINKS__WEBHOOK__AUTH_TOKEN=""
export RUSTFS__SINKS__KAFKA__ENABLED=false
export RUSTFS__SINKS__KAFKA__BOOTSTRAP_SERVERS=""
export RUSTFS__SINKS__KAFKA__TOPIC=""
export RUSTFS__LOGGER__QUEUE_CAPACITY=10
#export RUSTFS__OBSERVABILITY__ENDPOINT=http://localhost:4317
#export RUSTFS__OBSERVABILITY__USE_STDOUT=false
#export RUSTFS__OBSERVABILITY__SAMPLE_RATIO=2.0
#export RUSTFS__OBSERVABILITY__METER_INTERVAL=31
#export RUSTFS__OBSERVABILITY__SERVICE_NAME=rustfs
#export RUSTFS__OBSERVABILITY__SERVICE_VERSION=0.1.0
#export RUSTFS__OBSERVABILITY__ENVIRONMENT=develop
#export RUSTFS__OBSERVABILITY__LOGGER_LEVEL=debug
#export RUSTFS__OBSERVABILITY__LOCAL_LOGGING_ENABLED=true
#
#export RUSTFS__SINKS_0__type=File
#export RUSTFS__SINKS_0__path=./deploy/logs/rustfs.log
#export RUSTFS__SINKS_0__buffer_size=12
#export RUSTFS__SINKS_0__flush_interval_ms=1000
#export RUSTFS__SINKS_0__flush_threshold=100
#
#export RUSTFS__SINKS_1__type=Kakfa
#export RUSTFS__SINKS_1__brokers=localhost:9092
#export RUSTFS__SINKS_1__topic=logs
#export RUSTFS__SINKS_1__batch_size=100
#export RUSTFS__SINKS_1__batch_timeout_ms=1000
#
#export RUSTFS__SINKS_2__type=Webhook
#export RUSTFS__SINKS_2__endpoint=http://localhost:8080/webhook
#export RUSTFS__SINKS_2__auth_token=you-auth-token
#export RUSTFS__SINKS_2__batch_size=100
#export RUSTFS__SINKS_2__batch_timeout_ms=1000
#
#export RUSTFS__LOGGER__QUEUE_CAPACITY=10
export OTEL_INSTRUMENTATION_NAME="rustfs"
export OTEL_INSTRUMENTATION_VERSION="0.1.1"
@@ -64,13 +75,13 @@ export OTEL_INSTRUMENTATION_SCHEMA_URL="https://opentelemetry.io/schemas/1.31.0"
export OTEL_INSTRUMENTATION_ATTRIBUTES="env=production"
# 事件消息配置
export RUSTFS_EVENT_CONFIG="./deploy/config/event.example.toml"
#export RUSTFS_EVENT_CONFIG="./deploy/config/event.example.toml"
if [ -n "$1" ]; then
export RUSTFS_VOLUMES="$1"
fi
# 启动 webhook 服务器
cargo run --example webhook -p rustfs-event-notifier &
#cargo run --example webhook -p rustfs-event-notifier &
# 启动主服务
cargo run --bin rustfs