improve code for FileSink

This commit is contained in:
houseme
2025-03-30 21:28:29 +08:00
parent d516eec200
commit c87e50b002
4 changed files with 22 additions and 31 deletions

3
.gitignore vendored
View File

@@ -9,4 +9,5 @@
rustfs/static/*
vendor
cli/rustfs-gui/embedded-rustfs/rustfs
config/obs.toml
config/obs.toml
*.log

View File

@@ -8,7 +8,7 @@ pub(crate) const SERVICE_NAME: &str = "RustFS";
pub(crate) const SAMPLE_RATIO: f64 = 1.0;
pub(crate) const METER_INTERVAL: u64 = 60;
pub(crate) const SERVICE_VERSION: &str = "0.1.0";
pub(crate) const ENVIRONMENT: &str = "development";
pub(crate) const ENVIRONMENT: &str = "production";
pub(crate) const LOGGER_LEVEL: &str = "info";
/// Global guard for OpenTelemetry tracing

View File

@@ -72,7 +72,7 @@ pub use worker::start_worker;
/// ```
pub async fn init_obs(config: AppConfig) -> (Arc<Mutex<Logger>>, telemetry::OtelGuard) {
let guard = init_telemetry(&config.observability);
let sinks = sink::create_sinks(&config);
let sinks = sink::create_sinks(&config).await;
let logger = init_global_logger(&config, sinks).await;
(logger, guard)
}

View File

@@ -262,7 +262,7 @@ pub struct FileSink {
#[cfg(feature = "file")]
impl FileSink {
#[allow(dead_code)]
/// Create a new FileSink instance
pub async fn new(
path: String,
buffer_size: usize,
@@ -271,6 +271,11 @@ impl FileSink {
) -> Result<Self, io::Error> {
// check if the file exists
let file_exists = tokio::fs::metadata(&path).await.is_ok();
// if the file not exists, create it
if !file_exists {
tokio::fs::create_dir_all(std::path::Path::new(&path).parent().unwrap()).await?;
debug!("the file not exists,create if. path: {:?}", path)
}
let file = if file_exists {
// If the file exists, open it in append mode
debug!("FileSink: File exists, opening in append mode.");
@@ -281,7 +286,6 @@ impl FileSink {
// Create the file and write a header or initial content if needed
OpenOptions::new().create(true).write(true).open(&path).await?
};
// let file = OpenOptions::new().append(true).create(true).open(&path).await?;
let writer = io::BufWriter::with_capacity(buffer_size, file);
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
@@ -406,7 +410,7 @@ impl Drop for FileSink {
}
/// Create a list of Sink instances
pub fn create_sinks(config: &AppConfig) -> Vec<Arc<dyn Sink>> {
pub async fn create_sinks(config: &AppConfig) -> Vec<Arc<dyn Sink>> {
let mut sinks: Vec<Arc<dyn Sink>> = Vec::new();
#[cfg(feature = "kafka")]
@@ -445,31 +449,17 @@ pub fn create_sinks(config: &AppConfig) -> Vec<Arc<dyn Sink>> {
} else {
"default.log".to_string()
};
// Use synchronous file operations
let file_result = std::fs::OpenOptions::new().append(true).create(true).open(&path);
match file_result {
Ok(file) => {
let buffer_size = config.sinks.file.buffer_size.unwrap_or(8192);
let writer = io::BufWriter::with_capacity(buffer_size, tokio::fs::File::from_std(file));
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis() as u64;
sinks.push(Arc::new(FileSink {
path: path.clone(),
buffer_size,
writer: Arc::new(tokio::sync::Mutex::new(writer)),
entry_count: std::sync::atomic::AtomicUsize::new(0),
last_flush: std::sync::atomic::AtomicU64::new(now),
flush_interval_ms: config.sinks.file.flush_interval_ms.unwrap_or(1000),
flush_threshold: config.sinks.file.flush_threshold.unwrap_or(100),
}));
}
Err(e) => eprintln!("Failed to create file sink: {}", e),
}
debug!("FileSink: Using path: {}", path);
sinks.push(Arc::new(
FileSink::new(
path.clone(),
config.sinks.file.buffer_size.unwrap_or(8192),
config.sinks.file.flush_interval_ms.unwrap_or(1000),
config.sinks.file.flush_threshold.unwrap_or(100),
)
.await
.unwrap(),
));
}
sinks