mirror of
https://github.com/rustfs/rustfs.git
synced 2026-01-16 17:20:33 +00:00
* improve code for notify * improve code for logger and fix typo (#272) * Add GNU to build.yml (#275) * fix unzip error * fix url change error fix url change error * Simplify user experience and integrate console and endpoint Simplify user experience and integrate console and endpoint * Add gnu to build.yml * upgrade version * feat: add `cargo clippy --fix --allow-dirty` to pre-commit command (#282) Resolves #277 - Add --fix flag to automatically fix clippy warnings - Add --allow-dirty flag to run on dirty Git trees - Improves code quality in pre-commit workflow * fix: the issue where preview fails when the path length exceeds 255 characters (#280) * fix * fix: improve Windows build support and CI/CD workflow (#283) - Fix Windows zip command issue by using PowerShell Compress-Archive - Add Windows support for OSS upload with ossutil - Replace Chinese comments with English in build.yml - Fix bash syntax error in package_zip function - Improve code formatting and consistency - Update various configuration files for better cross-platform support Resolves Windows build failures in GitHub Actions. * fix: update link in README.md leading to a 404 error (#285) * add rustfs.spec for rustfs (#103) add support on loongarch64 * improve cargo.lock * build(deps): bump the dependencies group with 5 updates (#289) Bumps the dependencies group with 5 updates: | Package | From | To | | --- | --- | --- | | [hyper-util](https://github.com/hyperium/hyper-util) | `0.1.15` | `0.1.16` | | [rand](https://github.com/rust-random/rand) | `0.9.1` | `0.9.2` | | [serde_json](https://github.com/serde-rs/json) | `1.0.140` | `1.0.141` | | [strum](https://github.com/Peternator7/strum) | `0.27.1` | `0.27.2` | | [sysinfo](https://github.com/GuillaumeGomez/sysinfo) | `0.36.0` | `0.36.1` | Updates `hyper-util` from 0.1.15 to 0.1.16 - [Release notes](https://github.com/hyperium/hyper-util/releases) - [Changelog](https://github.com/hyperium/hyper-util/blob/master/CHANGELOG.md) - [Commits](https://github.com/hyperium/hyper-util/compare/v0.1.15...v0.1.16) Updates `rand` from 0.9.1 to 0.9.2 - [Release notes](https://github.com/rust-random/rand/releases) - [Changelog](https://github.com/rust-random/rand/blob/master/CHANGELOG.md) - [Commits](https://github.com/rust-random/rand/compare/rand_core-0.9.1...rand_core-0.9.2) Updates `serde_json` from 1.0.140 to 1.0.141 - [Release notes](https://github.com/serde-rs/json/releases) - [Commits](https://github.com/serde-rs/json/compare/v1.0.140...v1.0.141) Updates `strum` from 0.27.1 to 0.27.2 - [Release notes](https://github.com/Peternator7/strum/releases) - [Changelog](https://github.com/Peternator7/strum/blob/master/CHANGELOG.md) - [Commits](https://github.com/Peternator7/strum/compare/v0.27.1...v0.27.2) Updates `sysinfo` from 0.36.0 to 0.36.1 - [Changelog](https://github.com/GuillaumeGomez/sysinfo/blob/master/CHANGELOG.md) - [Commits](https://github.com/GuillaumeGomez/sysinfo/compare/v0.36.0...v0.36.1) --- updated-dependencies: - dependency-name: hyper-util dependency-version: 0.1.16 dependency-type: direct:production update-type: version-update:semver-patch dependency-group: dependencies - dependency-name: rand dependency-version: 0.9.2 dependency-type: direct:production update-type: version-update:semver-patch dependency-group: dependencies - dependency-name: serde_json dependency-version: 1.0.141 dependency-type: direct:production update-type: version-update:semver-patch dependency-group: dependencies - dependency-name: strum dependency-version: 0.27.2 dependency-type: direct:production update-type: version-update:semver-patch dependency-group: dependencies - dependency-name: sysinfo dependency-version: 0.36.1 dependency-type: direct:production update-type: version-update:semver-patch dependency-group: dependencies ... Signed-off-by: dependabot[bot] <support@github.com> Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> * improve code for logger * improve * upgrade * refactor: 优化构建工作流,统一 latest 文件处理和简化制品上传 (#293) * Refactor: DatabaseManagerSystem as global Signed-off-by: junxiang Mu <1948535941@qq.com> * fix: fmt Signed-off-by: junxiang Mu <1948535941@qq.com> * Test: add e2e_test for s3select Signed-off-by: junxiang Mu <1948535941@qq.com> * Test: add test script for e2e Signed-off-by: junxiang Mu <1948535941@qq.com> * improve code for registry and intergation * improve code for registry `create_targets_from_config` * fix * Feature up/ilm (#305) * fix * fix * fix * fix delete-marker expiration. add api_restore. * fix * time retry object upload * lock file * make fmt * fix * restore object * fix * fix * serde-rs-xml -> quick-xml * fix * checksum * fix * fix * fix * fix * fix * fix * fix * transfer lang to english * upgrade clap version from 4.5.41 to 4.5.42 * refactor: replace `lazy_static` with `LazyLock` * add router * fix: modify comment * improve code * fix typos * fix * fix: modify name and fmt * improve code for registry * fix test --------- Signed-off-by: dependabot[bot] <support@github.com> Signed-off-by: junxiang Mu <1948535941@qq.com> Co-authored-by: loverustfs <155562731+loverustfs@users.noreply.github.com> Co-authored-by: 安正超 <anzhengchao@gmail.com> Co-authored-by: shiro.lee <69624924+shiroleeee@users.noreply.github.com> Co-authored-by: Marco Orlandin <mipnamic@mipnamic.net> Co-authored-by: zhangwenlong <zhangwenlong@loongson.cn> Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Co-authored-by: junxiang Mu <1948535941@qq.com> Co-authored-by: likewu <likewu@126.com>
179 lines
6.1 KiB
Rust
179 lines
6.1 KiB
Rust
// Copyright 2024 RustFS Team
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
use crate::sinks::Sink;
|
|
use crate::{LogRecord, UnifiedLogEntry};
|
|
use async_trait::async_trait;
|
|
use std::sync::Arc;
|
|
use tokio::fs::OpenOptions;
|
|
use tokio::io;
|
|
use tokio::io::AsyncWriteExt;
|
|
|
|
/// File Sink Implementation
|
|
pub struct FileSink {
|
|
path: String,
|
|
buffer_size: usize,
|
|
writer: Arc<tokio::sync::Mutex<io::BufWriter<tokio::fs::File>>>,
|
|
entry_count: std::sync::atomic::AtomicUsize,
|
|
last_flush: std::sync::atomic::AtomicU64,
|
|
flush_interval_ms: u64, // Time between flushes
|
|
flush_threshold: usize, // Number of entries before flush
|
|
}
|
|
|
|
impl FileSink {
|
|
/// Create a new FileSink instance
|
|
pub async fn new(
|
|
path: String,
|
|
buffer_size: usize,
|
|
flush_interval_ms: u64,
|
|
flush_threshold: usize,
|
|
) -> Result<Self, io::Error> {
|
|
// check if the file exists
|
|
let file_exists = tokio::fs::metadata(&path).await.is_ok();
|
|
// if the file not exists, create it
|
|
if !file_exists {
|
|
tokio::fs::create_dir_all(std::path::Path::new(&path).parent().unwrap()).await?;
|
|
tracing::debug!("File does not exist, creating it. Path: {:?}", path)
|
|
}
|
|
let file = if file_exists {
|
|
// If the file exists, open it in append mode
|
|
tracing::debug!("FileSink: File exists, opening in append mode. Path: {:?}", path);
|
|
OpenOptions::new().append(true).create(true).open(&path).await?
|
|
} else {
|
|
// If the file does not exist, create it
|
|
tracing::debug!("FileSink: File does not exist, creating a new file.");
|
|
// Create the file and write a header or initial content if needed
|
|
OpenOptions::new().create(true).truncate(true).write(true).open(&path).await?
|
|
};
|
|
let writer = io::BufWriter::with_capacity(buffer_size, file);
|
|
let now = std::time::SystemTime::now()
|
|
.duration_since(std::time::UNIX_EPOCH)
|
|
.unwrap()
|
|
.as_millis() as u64;
|
|
Ok(FileSink {
|
|
path,
|
|
buffer_size,
|
|
writer: Arc::new(tokio::sync::Mutex::new(writer)),
|
|
entry_count: std::sync::atomic::AtomicUsize::new(0),
|
|
last_flush: std::sync::atomic::AtomicU64::new(now),
|
|
flush_interval_ms,
|
|
flush_threshold,
|
|
})
|
|
}
|
|
|
|
#[allow(dead_code)]
|
|
async fn initialize_writer(&mut self) -> io::Result<()> {
|
|
let file = tokio::fs::File::create(&self.path).await?;
|
|
|
|
// Use buffer_size to create a buffer writer with a specified capacity
|
|
let buf_writer = io::BufWriter::with_capacity(self.buffer_size, file);
|
|
|
|
// Replace the original writer with the new Mutex
|
|
self.writer = Arc::new(tokio::sync::Mutex::new(buf_writer));
|
|
Ok(())
|
|
}
|
|
|
|
// Get the current buffer size
|
|
#[allow(dead_code)]
|
|
pub fn buffer_size(&self) -> usize {
|
|
self.buffer_size
|
|
}
|
|
|
|
// How to dynamically adjust the buffer size
|
|
#[allow(dead_code)]
|
|
pub async fn set_buffer_size(&mut self, new_size: usize) -> io::Result<()> {
|
|
if self.buffer_size != new_size {
|
|
self.buffer_size = new_size;
|
|
// Reinitialize the writer directly, without checking is_some()
|
|
self.initialize_writer().await?;
|
|
}
|
|
Ok(())
|
|
}
|
|
|
|
// Check if flushing is needed based on count or time
|
|
fn should_flush(&self) -> bool {
|
|
// Check entry count threshold
|
|
if self.entry_count.load(std::sync::atomic::Ordering::Relaxed) >= self.flush_threshold {
|
|
return true;
|
|
}
|
|
|
|
// Check time threshold
|
|
let now = std::time::SystemTime::now()
|
|
.duration_since(std::time::UNIX_EPOCH)
|
|
.unwrap()
|
|
.as_millis() as u64;
|
|
|
|
let last = self.last_flush.load(std::sync::atomic::Ordering::Relaxed);
|
|
now - last >= self.flush_interval_ms
|
|
}
|
|
}
|
|
|
|
#[async_trait]
|
|
impl Sink for FileSink {
|
|
async fn write(&self, entry: &UnifiedLogEntry) {
|
|
let line = format!("{entry:?}\n");
|
|
let mut writer = self.writer.lock().await;
|
|
|
|
if let Err(e) = writer.write_all(line.as_bytes()).await {
|
|
eprintln!(
|
|
"Failed to write log to file {}: {},entry timestamp:{:?}",
|
|
self.path,
|
|
e,
|
|
entry.get_timestamp()
|
|
);
|
|
return;
|
|
}
|
|
|
|
// Only flush periodically to improve performance
|
|
// Logic to determine when to flush could be added here
|
|
// Increment the entry count
|
|
self.entry_count.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
|
|
|
|
// Check if we should flush
|
|
if self.should_flush() {
|
|
if let Err(e) = writer.flush().await {
|
|
eprintln!("Failed to flush log file {}: {}", self.path, e);
|
|
return;
|
|
}
|
|
|
|
// Reset counters
|
|
self.entry_count.store(0, std::sync::atomic::Ordering::Relaxed);
|
|
|
|
let now = std::time::SystemTime::now()
|
|
.duration_since(std::time::UNIX_EPOCH)
|
|
.unwrap()
|
|
.as_millis() as u64;
|
|
|
|
self.last_flush.store(now, std::sync::atomic::Ordering::Relaxed);
|
|
}
|
|
}
|
|
}
|
|
|
|
impl Drop for FileSink {
|
|
fn drop(&mut self) {
|
|
let writer = self.writer.clone();
|
|
let path = self.path.clone();
|
|
|
|
tokio::task::spawn_blocking(move || {
|
|
let rt = tokio::runtime::Runtime::new().unwrap();
|
|
rt.block_on(async {
|
|
let mut writer = writer.lock().await;
|
|
if let Err(e) = writer.flush().await {
|
|
eprintln!("Failed to flush log file {path}: {e}");
|
|
}
|
|
});
|
|
});
|
|
}
|
|
}
|