Merge branch 'main' into dada/fix-entry

weisd
2025-05-09 14:33:30 +08:00
committed by GitHub
76 changed files with 1384 additions and 550 deletions

View File

@@ -1,7 +0,0 @@
[target.x86_64-unknown-linux-gnu]
rustflags = [
"-C", "link-arg=-fuse-ld=bfd"
]
[target.x86_64-unknown-linux-musl]
linker = "x86_64-linux-musl-gcc"

View File

@@ -9,7 +9,7 @@ (e.g., Jaeger, Prometheus), removing the need to run and maintain multiple agents/collectors
2. Run the following command to start the services:
```bash
docker compose up -d -f docker-compose.yml
docker compose -f docker-compose.yml up -d
```
### Accessing the Monitoring Dashboards
@@ -34,7 +34,7 @@ docker compose up -d -f docker-compose.yml
| service_name | Service name | rustfs |
| service_version | Service version | 1.0.0 |
| environment | Runtime environment | production |
| meter_interval | Metrics export interval (seconds) | 30 |
| meter_interval | Metrics export interval (seconds) | 30 |
| sample_ratio | Sampling ratio | 1.0 |
| use_stdout | Whether to log to stdout | true/false |
| logger_level | Log level | info |

View File

@@ -0,0 +1,34 @@
[observability]
endpoint = "http://otel-collector:4317" # Default is "http://localhost:4317" if not specified
use_stdout = false # Whether to write logs to stdout: true to enable, false to disable
sample_ratio = 2.0
meter_interval = 30
service_name = "rustfs"
service_version = "0.1.0"
environments = "production"
logger_level = "debug"
local_logging_enabled = true
[sinks]
[sinks.kafka] # Kafka sink is disabled by default
enabled = false
bootstrap_servers = "localhost:9092"
topic = "logs"
batch_size = 100 # Default is 100 if not specified
batch_timeout_ms = 1000 # Default is 1000ms if not specified
[sinks.webhook]
enabled = false
endpoint = "http://localhost:8080/webhook"
auth_token = ""
batch_size = 100 # Default is 3 if not specified
batch_timeout_ms = 1000 # Default is 100ms if not specified
[sinks.file]
enabled = true
path = "/root/data/logs/app.log"
batch_size = 10
batch_timeout_ms = 1000 # Flush timeout in milliseconds
[logger]
queue_capacity = 10
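For reference, a file like this can be deserialized with the `config` and `serde` crates that this PR adds to the workspace. A minimal sketch, assuming an illustrative `Settings` struct rather than the actual `rustfs-config` types:

```rust
use serde::Deserialize;

// Illustrative subset of the [observability] table above; unknown keys are ignored.
#[derive(Debug, Deserialize)]
struct Observability {
    endpoint: String,
    use_stdout: bool,
    sample_ratio: f64,
    meter_interval: u64,
    service_name: String,
}

#[derive(Debug, Deserialize)]
struct Settings {
    observability: Observability,
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // config 0.15: read obs.toml (extension inferred) and map it onto Settings.
    let settings: Settings = config::Config::builder()
        .add_source(config::File::with_name("config/obs"))
        .build()?
        .try_deserialize()?;
    println!("{:?}", settings.observability);
    Ok(())
}
```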

View File

@@ -1,5 +1,5 @@
[observability]
endpoint = "http://otel-collector:4317" # Default is "http://localhost:4317" if not specified
endpoint = "http://localhost:4317" # Default is "http://localhost:4317" if not specified
use_stdout = false # Whether to write logs to stdout: true to enable, false to disable
sample_ratio = 2.0
meter_interval = 30

View File

@@ -1,6 +1,6 @@
services:
otel-collector:
image: otel/opentelemetry-collector-contrib:0.120.0
image: ghcr.io/open-telemetry/opentelemetry-collector-releases/opentelemetry-collector-contrib:0.124.0
environment:
- TZ=Asia/Shanghai
volumes:
@@ -16,7 +16,7 @@ services:
networks:
- otel-network
jaeger:
image: jaegertracing/jaeger:2.4.0
image: jaegertracing/jaeger:2.5.0
environment:
- TZ=Asia/Shanghai
ports:
@@ -26,7 +26,7 @@ services:
networks:
- otel-network
prometheus:
image: prom/prometheus:v3.2.1
image: prom/prometheus:v3.3.0
environment:
- TZ=Asia/Shanghai
volumes:
@@ -36,7 +36,7 @@ services:
networks:
- otel-network
loki:
image: grafana/loki:3.4.2
image: grafana/loki:3.5.0
environment:
- TZ=Asia/Shanghai
volumes:
@@ -47,7 +47,7 @@ services:
networks:
- otel-network
grafana:
image: grafana/grafana:11.6.0
image: grafana/grafana:11.6.1
ports:
- "3000:3000" # Web UI
environment:

Cargo.lock generated
View File

@@ -185,12 +185,14 @@ dependencies = [
"async-trait",
"bytes",
"chrono",
"common",
"datafusion",
"ecstore",
"futures",
"futures-core",
"http",
"object_store",
"pin-project-lite",
"s3s",
"snafu",
"tokio",
@@ -711,9 +713,9 @@ dependencies = [
[[package]]
name = "axum"
version = "0.8.3"
version = "0.8.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "de45108900e1f9b9242f7f2e254aa3e2c029c921c258fe9e6b4217eeebd54288"
checksum = "021e862c184ae977658b36c4500f7feac3221ca5da43e3f25bd04ab6c79a29b5"
dependencies = [
"axum-core",
"bytes",
@@ -799,7 +801,7 @@ dependencies = [
"hyper",
"hyper-util",
"pin-project-lite",
"rustls 0.23.26",
"rustls 0.23.27",
"rustls-pemfile",
"rustls-pki-types",
"tokio",
@@ -1204,9 +1206,9 @@ dependencies = [
[[package]]
name = "chrono"
version = "0.4.40"
version = "0.4.41"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1a7964611d71df112cb1730f2ee67324fcf4d0fc6606acbbe9bfe06df124637c"
checksum = "c469d952047f47f91b68d1cba3f10d63c11d73e4636f24f08daf0278abf01c4d"
dependencies = [
"android-tzdata",
"iana-time-zone",
@@ -1430,7 +1432,7 @@ dependencies = [
"lazy_static",
"scopeguard",
"tokio",
"tonic 0.13.0",
"tonic 0.13.1",
"tracing-error",
]
@@ -3061,7 +3063,7 @@ dependencies = [
"serde",
"serde_json",
"tokio",
"tonic 0.13.0",
"tonic 0.13.1",
"tower 0.5.2",
"url",
]
@@ -3091,7 +3093,7 @@ dependencies = [
"madmin",
"md-5",
"netif",
"nix",
"nix 0.30.1",
"num",
"num_cpus",
"path-absolutize",
@@ -3105,6 +3107,7 @@ dependencies = [
"reqwest",
"rmp",
"rmp-serde",
"rustfs-config",
"s3s",
"s3s-policy",
"serde",
@@ -3119,7 +3122,7 @@ dependencies = [
"tokio",
"tokio-stream",
"tokio-util",
"tonic 0.13.0",
"tonic 0.13.1",
"tower 0.5.2",
"tracing",
"tracing-error",
@@ -4129,7 +4132,7 @@ dependencies = [
"http",
"hyper",
"hyper-util",
"rustls 0.23.26",
"rustls 0.23.27",
"rustls-pki-types",
"tokio",
"tokio-rustls 0.26.2",
@@ -4780,14 +4783,14 @@ dependencies = [
[[package]]
name = "libsystemd"
version = "0.7.1"
version = "0.7.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b85fe9dc49de659d05829fdf72b5770c0a5952d1055c34a39f6d4e932bce175d"
checksum = "19c97a761fc86953c5b885422b22c891dbf5bcb9dcc99d0110d6ce4c052759f0"
dependencies = [
"hmac 0.12.1",
"libc",
"log",
"nix",
"nix 0.29.0",
"nom 8.0.0",
"once_cell",
"serde",
@@ -4847,13 +4850,13 @@ checksum = "23fb14cb19457329c82206317a5663005a4d404783dc74f4252769b0d5f42856"
[[package]]
name = "local-ip-address"
version = "0.6.3"
version = "0.6.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3669cf5561f8d27e8fc84cc15e58350e70f557d4d65f70e3154e54cd2f8e1782"
checksum = "656b3b27f8893f7bbf9485148ff9a65f019e3f33bd5cdc87c83cab16b3fd9ec8"
dependencies = [
"libc",
"neli",
"thiserror 1.0.69",
"thiserror 2.0.12",
"windows-sys 0.59.0",
]
@@ -4870,7 +4873,7 @@ dependencies = [
"serde",
"serde_json",
"tokio",
"tonic 0.13.0",
"tonic 0.13.1",
"tracing",
"tracing-error",
"url",
@@ -5225,6 +5228,18 @@ dependencies = [
"memoffset",
]
[[package]]
name = "nix"
version = "0.30.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "74523f3a35e05aba87a1d978330aef40f67b0304ac79c1c00b294c9830543db6"
dependencies = [
"bitflags 2.9.0",
"cfg-if",
"cfg_aliases",
"libc",
]
[[package]]
name = "nodrop"
version = "0.1.14"
@@ -6587,7 +6602,7 @@ dependencies = [
"prost-build",
"protobuf",
"tokio",
"tonic 0.13.0",
"tonic 0.13.1",
"tonic-build",
"tower 0.5.2",
]
@@ -6641,7 +6656,7 @@ dependencies = [
"quinn-proto",
"quinn-udp",
"rustc-hash 2.1.1",
"rustls 0.23.26",
"rustls 0.23.27",
"socket2",
"thiserror 2.0.12",
"tokio",
@@ -6660,7 +6675,7 @@ dependencies = [
"rand 0.9.1",
"ring",
"rustc-hash 2.1.1",
"rustls 0.23.26",
"rustls 0.23.27",
"rustls-pki-types",
"slab",
"thiserror 2.0.12",
@@ -6986,7 +7001,7 @@ dependencies = [
"percent-encoding",
"pin-project-lite",
"quinn",
"rustls 0.23.26",
"rustls 0.23.27",
"rustls-pemfile",
"rustls-pki-types",
"serde",
@@ -7143,9 +7158,9 @@ dependencies = [
[[package]]
name = "rust-embed"
version = "8.7.0"
version = "8.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e5fbc0ee50fcb99af7cebb442e5df7b5b45e9460ffa3f8f549cd26b862bec49d"
checksum = "60e425e204264b144d4c929d126d0de524b40a961686414bab5040f7465c71be"
dependencies = [
"rust-embed-impl",
"rust-embed-utils",
@@ -7244,7 +7259,6 @@ dependencies = [
"iam",
"lazy_static",
"libsystemd",
"local-ip-address",
"lock",
"madmin",
"matchit",
@@ -7259,23 +7273,24 @@ dependencies = [
"query",
"rmp-serde",
"rust-embed",
"rustfs-config",
"rustfs-event-notifier",
"rustfs-obs",
"rustls 0.23.26",
"rustls-pemfile",
"rustls-pki-types",
"rustfs-utils",
"rustls 0.23.27",
"s3s",
"serde",
"serde_json",
"serde_urlencoded",
"shadow-rs",
"socket2",
"tikv-jemallocator",
"time",
"tokio",
"tokio-rustls 0.26.2",
"tokio-stream",
"tokio-util",
"tonic 0.13.0",
"tonic 0.13.1",
"tonic-build",
"tower 0.5.2",
"tower-http",
@@ -7284,6 +7299,16 @@ dependencies = [
"uuid",
]
[[package]]
name = "rustfs-config"
version = "0.0.1"
dependencies = [
"config",
"const-str",
"serde",
"serde_json",
]
[[package]]
name = "rustfs-event-notifier"
version = "0.0.1"
@@ -7358,6 +7383,18 @@ dependencies = [
"tracing-subscriber",
]
[[package]]
name = "rustfs-utils"
version = "0.0.1"
dependencies = [
"local-ip-address",
"rustfs-config",
"rustls 0.23.27",
"rustls-pemfile",
"rustls-pki-types",
"tracing",
]
[[package]]
name = "rustix"
version = "0.38.44"
@@ -7400,16 +7437,16 @@ dependencies = [
[[package]]
name = "rustls"
version = "0.23.26"
version = "0.23.27"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "df51b5869f3a441595eac5e8ff14d486ff285f7b8c0df8770e49c3b56351f0f0"
checksum = "730944ca083c1c233a75c09f199e973ca499344a2b7ba9e755c457e86fb4a321"
dependencies = [
"aws-lc-rs",
"log",
"once_cell",
"ring",
"rustls-pki-types",
"rustls-webpki 0.103.1",
"rustls-webpki 0.103.2",
"subtle",
"zeroize",
]
@@ -7458,9 +7495,9 @@ dependencies = [
[[package]]
name = "rustls-webpki"
version = "0.103.1"
version = "0.103.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fef8b8769aaccf73098557a87cd1816b4f9c7c16811c9c77142aa695c16f2c03"
checksum = "7149975849f1abb3832b246010ef62ccc80d3a76169517ada7188252b9cfb437"
dependencies = [
"aws-lc-rs",
"ring",
@@ -8596,9 +8633,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20"
[[package]]
name = "tokio"
version = "1.44.2"
version = "1.45.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e6b88822cbe49de4185e3a4cbf8321dd487cf5fe0c5c65695fef6346371e9c48"
checksum = "2513ca694ef9ede0fb23fe71a4ee4107cb102b9dc1930f6d0fd77aae068ae165"
dependencies = [
"backtrace",
"bytes",
@@ -8641,7 +8678,7 @@ version = "0.26.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8e727b36a1a0e8b74c376ac2211e40c2c8af09fb4013c60d910495810f008e9b"
dependencies = [
"rustls 0.23.26",
"rustls 0.23.27",
"tokio",
]
@@ -8658,9 +8695,9 @@ dependencies = [
[[package]]
name = "tokio-util"
version = "0.7.14"
version = "0.7.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6b9590b93e6fcc1739458317cccd391ad3955e2bde8913edf6f95f9e65a8f034"
checksum = "66a539a9ad6d5d281510d5bd368c973d636c02dbf8a67300bfb6b950696ad7df"
dependencies = [
"bytes",
"futures-core",
@@ -8755,9 +8792,9 @@ dependencies = [
[[package]]
name = "tonic"
version = "0.13.0"
version = "0.13.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "85839f0b32fd242bb3209262371d07feda6d780d16ee9d2bc88581b89da1549b"
checksum = "7e581ba15a835f4d9ea06c55ab1bd4dce26fc53752c69a04aac00703bfb49ba9"
dependencies = [
"async-trait",
"axum",
@@ -8785,9 +8822,9 @@ dependencies = [
[[package]]
name = "tonic-build"
version = "0.13.0"
version = "0.13.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d85f0383fadd15609306383a90e85eaed44169f931a5d2be1b42c76ceff1825e"
checksum = "eac6f67be712d12f0b41328db3137e0d0757645d8904b4cb7d51cd9c2279e847"
dependencies = [
"prettyplease",
"proc-macro2",
@@ -10252,7 +10289,7 @@ dependencies = [
"futures-sink",
"futures-util",
"hex",
"nix",
"nix 0.29.0",
"ordered-stream",
"rand 0.8.5",
"serde",
@@ -10283,7 +10320,7 @@ dependencies = [
"futures-core",
"futures-lite",
"hex",
"nix",
"nix 0.29.0",
"ordered-stream",
"serde",
"serde_repr",

View File

@@ -1,21 +1,23 @@
[workspace]
members = [
"madmin", # Management dashboard and admin API interface
"rustfs", # Core file system implementation
"ecstore", # Erasure coding storage implementation
"e2e_test", # End-to-end test suite
"appauth", # Application authentication and authorization
"cli/rustfs-gui", # Graphical user interface client
"common/common", # Shared utilities and data structures
"common/lock", # Distributed locking implementation
"common/protos", # Protocol buffer definitions
"common/workers", # Worker thread pools and task scheduling
"iam", # Identity and Access Management
"crypto", # Cryptography and security features
"cli/rustfs-gui", # Graphical user interface client
"crates/obs", # Observability utilities
"crates/config", # Configuration management
"crates/event-notifier", # Event notification system
"crates/obs", # Observability utilities
"crates/utils", # Utility functions and helpers
"crypto", # Cryptography and security features
"ecstore", # Erasure coding storage implementation
"e2e_test", # End-to-end test suite
"iam", # Identity and Access Management
"madmin", # Management dashboard and admin API interface
"rustfs", # Core file system implementation
"s3select/api", # S3 Select API interface
"s3select/query", # S3 Select query engine
"appauth", # Application authentication and authorization
]
resolver = "2"
@@ -45,22 +47,26 @@ policy = { path = "./policy", version = "0.0.1" }
protos = { path = "./common/protos", version = "0.0.1" }
query = { path = "./s3select/query", version = "0.0.1" }
rustfs = { path = "./rustfs", version = "0.0.1" }
rustfs-config = { path = "./crates/config", version = "0.0.1" }
rustfs-obs = { path = "crates/obs", version = "0.0.1" }
rustfs-event-notifier = { path = "crates/event-notifier", version = "0.0.1" }
rustfs-utils = { path = "crates/utils", version = "0.0.1" }
workers = { path = "./common/workers", version = "0.0.1" }
atoi = "2.0.0"
async-recursion = "1.1.1"
async-trait = "0.1.88"
atomic_enum = "0.3.0"
axum = "0.8.3"
axum = "0.8.4"
axum-extra = "0.10.1"
axum-server = { version = "0.7.2", features = ["tls-rustls"] }
backon = "1.5.0"
blake2 = "0.10.6"
bytes = "1.10.1"
bytesize = "2.0.1"
chrono = { version = "0.4.40", features = ["serde"] }
chrono = { version = "0.4.41", features = ["serde"] }
clap = { version = "4.5.37", features = ["derive", "env"] }
config = "0.15.11"
const-str = { version = "0.6.2", features = ["std", "proc"] }
datafusion = "46.0.1"
derive_builder = "0.20.2"
dioxus = { version = "0.6.3", features = ["router"] }
@@ -69,7 +75,9 @@ flatbuffers = "25.2.10"
futures = "0.3.31"
futures-core = "0.3.31"
futures-util = "0.3.31"
glob = "0.3.2"
hex = "0.4.3"
highway = { version = "1.3.0" }
hyper = "1.6.0"
hyper-util = { version = "0.1.11", features = [
"tokio",
@@ -86,13 +94,15 @@ keyring = { version = "3.6.2", features = [
"sync-secret-service",
] }
lazy_static = "1.5.0"
libsystemd = { version = "0.7.1" }
local-ip-address = "0.6.3"
libsystemd = { version = "0.7.2" }
local-ip-address = "0.6.5"
matchit = "0.8.4"
md-5 = "0.10.6"
mime = "0.3.17"
mime_guess = "2.0.5"
netif = "0.1.6"
nix = { version = "0.30.1", features = ["fs"] }
num_cpus = { version = "1.16.0" }
nvml-wrapper = "0.10.0"
object_store = "0.11.2"
opentelemetry = { version = "0.29.1" }
@@ -113,6 +123,8 @@ prost-types = "0.13.5"
protobuf = "3.7"
rand = "0.8.5"
rdkafka = { version = "0.37.0", features = ["tokio"] }
reed-solomon-erasure = { version = "6.0.0", features = ["simd-accel"] }
regex = { version = "1.11.1" }
reqwest = { version = "0.12.15", default-features = false, features = [
"rustls-tls",
"charset",
@@ -129,8 +141,8 @@ rfd = { version = "0.15.3", default-features = false, features = [
rmp = "0.8.14"
rmp-serde = "1.3.0"
rumqttc = { version = "0.24" }
rust-embed = "8.7.0"
rustls = { version = "0.23.26" }
rust-embed = { version = "8.7.1" }
rustls = { version = "0.23.27" }
rustls-pki-types = "1.11.0"
rustls-pemfile = "2.2.0"
s3s = { git = "https://github.com/Nugine/s3s.git", rev = "4733cdfb27b2713e832967232cbff413bb768c10" }
@@ -143,6 +155,7 @@ serde_with = "3.12.0"
sha2 = "0.10.8"
smallvec = { version = "1.15.0", features = ["serde"] }
snafu = "0.8.5"
socket2 = "0.5.9"
strum = { version = "0.27.1", features = ["derive"] }
sysinfo = "0.34.2"
tempfile = "3.19.1"
@@ -155,12 +168,12 @@ time = { version = "0.3.41", features = [
"macros",
"serde",
] }
tokio = { version = "1.44.2", features = ["fs", "rt-multi-thread"] }
tonic = { version = "0.13.0", features = ["gzip"] }
tonic-build = "0.13.0"
tokio = { version = "1.45.0", features = ["fs", "rt-multi-thread"] }
tonic = { version = "0.13.1", features = ["gzip"] }
tonic-build = { version = "0.13.1" }
tokio-rustls = { version = "0.26.2", default-features = false }
tokio-stream = "0.1.17"
tokio-util = { version = "0.7.14", features = ["io", "compat"] }
tokio-stream = { version = "0.1.17" }
tokio-util = { version = "0.7.15", features = ["io", "compat"] }
tower = { version = "0.5.2", features = ["timeout"] }
tower-http = { version = "0.6.2", features = ["cors"] }
tracing = "0.1.41"
@@ -176,6 +189,7 @@ uuid = { version = "1.16.0", features = [
"fast-rng",
"macro-diagnostics",
] }
winapi = { version = "0.3.9" }
[profile.wasm-dev]

View File

@@ -102,7 +102,7 @@ export RUSTFS__LOGGER__QUEUE_CAPACITY=10
2. Start the observability stack:
```bash
docker compose up -d -f docker-compose.yml
docker compose -f docker-compose.yml up -d
```
#### Access Monitoring Dashboards

View File

@@ -102,7 +102,7 @@ export RUSTFS__LOGGER__QUEUE_CAPACITY=10
2. Start the observability stack:
```bash
docker compose up -d -f docker-compose.yml
docker compose -f docker-compose.yml up -d
```
#### Access the Monitoring Dashboards

View File

@@ -2,6 +2,9 @@ pub mod error;
pub mod globals;
pub mod last_minute;
// The default delimiter is ',' (ASCII 44)
pub static DEFAULT_DELIMITER: u8 = 44;
/// Defers evaluation of a block of code until the end of the scope.
#[macro_export]
macro_rules! defer {
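The macro body is cut off in this view; as a hedged sketch, `defer!`-style macros are commonly built on a guard whose `Drop` impl runs the deferred block when the scope ends. The names below are illustrative, not necessarily this crate's implementation:

```rust
// Illustrative only: a typical defer! built on a Drop guard.
struct DeferGuard<F: FnOnce()>(Option<F>);

impl<F: FnOnce()> Drop for DeferGuard<F> {
    fn drop(&mut self) {
        // Run the deferred closure exactly once when the guard is dropped.
        if let Some(f) = self.0.take() {
            f();
        }
    }
}

macro_rules! defer {
    ($($body:tt)*) => {
        let _guard = DeferGuard(Some(|| { $($body)* }));
    };
}

fn main() {
    defer! { println!("runs last, at scope exit"); }
    println!("runs first");
}
```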

View File

@@ -3,13 +3,13 @@ use tokio::sync::{Mutex, Notify};
use tracing::info;
pub struct Workers {
available: Mutex<usize>, // 可用的工作槽
notify: Notify, // 用于通知等待的任务
limit: usize, // 最大并发工作数
available: Mutex<usize>, // Available working slots
notify: Notify, // Used to notify waiting tasks
limit: usize, // Maximum number of concurrent jobs
}
impl Workers {
// 创建 Workers 对象,允许最多 n 个作业并发执行
// Create a Workers object that allows up to n jobs to execute concurrently
pub fn new(n: usize) -> Result<Arc<Workers>, &'static str> {
if n == 0 {
return Err("n must be > 0");
@@ -22,7 +22,7 @@ impl Workers {
}))
}
// 让一个作业获得执行的机会
// Give a job a chance to be executed
pub async fn take(&self) {
loop {
let mut available = self.available.lock().await;
@@ -37,15 +37,15 @@ impl Workers {
}
}
// 让一个作业释放其机会
// Release a job's slot
pub async fn give(&self) {
let mut available = self.available.lock().await;
info!("worker give, {}", *available);
*available += 1; // 增加可用槽
self.notify.notify_one(); // 通知一个等待的任务
*available += 1; // Increase available slots
self.notify.notify_one(); // Notify a waiting task
}
// 等待所有并发作业完成
// Wait for all concurrent jobs to complete
pub async fn wait(&self) {
loop {
{
@@ -54,7 +54,7 @@ impl Workers {
break;
}
}
// 等待直到所有槽都被释放
// Wait until all slots are freed
self.notify.notified().await;
}
info!("worker wait end");

crates/config/Cargo.toml Normal file
View File

@@ -0,0 +1,17 @@
[package]
name = "rustfs-config"
edition.workspace = true
license.workspace = true
repository.workspace = true
rust-version.workspace = true
version.workspace = true
[dependencies]
config = { workspace = true }
const-str = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
[lints]
workspace = true

View File

@@ -0,0 +1,23 @@
use crate::event::config::EventConfig;
use crate::ObservabilityConfig;
/// RustFs configuration
pub struct RustFsConfig {
pub observability: ObservabilityConfig,
pub event: EventConfig,
}
impl RustFsConfig {
pub fn new() -> Self {
Self {
observability: ObservabilityConfig::new(),
event: EventConfig::new(),
}
}
}
impl Default for RustFsConfig {
fn default() -> Self {
Self::new()
}
}

View File

@@ -0,0 +1,72 @@
use const_str::concat;
/// Application name
/// Default value: RustFs
/// Environment variable: RUSTFS_APP_NAME
pub const APP_NAME: &str = "RustFs";
/// Application version
/// Default value: 1.0.0
/// Environment variable: RUSTFS_VERSION
pub const VERSION: &str = "0.0.1";
/// Default configuration logger level
/// Default value: info
/// Environment variable: RUSTFS_LOG_LEVEL
pub const DEFAULT_LOG_LEVEL: &str = "info";
/// maximum number of connections
/// This is the maximum number of connections that the server will accept.
/// This is used to limit the number of connections to the server.
pub const MAX_CONNECTIONS: usize = 100;
/// timeout for connections
/// This is the timeout for connections to the server.
/// This is used to limit the time that a connection can be open.
pub const DEFAULT_TIMEOUT_MS: u64 = 3000;
/// Default Access Key
/// Default value: rustfsadmin
/// Environment variable: RUSTFS_ACCESS_KEY
/// Command line argument: --access-key
/// Example: RUSTFS_ACCESS_KEY=rustfsadmin
/// Example: --access-key rustfsadmin
pub const DEFAULT_ACCESS_KEY: &str = "rustfsadmin";
/// Default Secret Key
/// Default value: rustfsadmin
/// Environment variable: RUSTFS_SECRET_KEY
/// Command line argument: --secret-key
/// Example: RUSTFS_SECRET_KEY=rustfsadmin
/// Example: --secret-key rustfsadmin
pub const DEFAULT_SECRET_KEY: &str = "rustfsadmin";
/// Default configuration file for observability
/// Default value: config/obs.toml
/// Environment variable: RUSTFS_OBS_CONFIG
/// Command line argument: --obs-config
/// Example: RUSTFS_OBS_CONFIG=config/obs.toml
/// Example: --obs-config config/obs.toml
/// Example: --obs-config /etc/rustfs/obs.toml
pub const DEFAULT_OBS_CONFIG: &str = "config/obs.toml";
/// Default TLS key for rustfs
/// This is the default key for TLS.
pub const RUSTFS_TLS_KEY: &str = "rustfs_key.pem";
/// Default TLS cert for rustfs
/// This is the default cert for TLS.
pub const RUSTFS_TLS_CERT: &str = "rustfs_cert.pem";
/// Default port for rustfs
/// This is the default port for rustfs.
/// This is used to bind the server to a specific port.
pub const DEFAULT_PORT: u16 = 9000;
/// Default address for rustfs
/// This is the default address for rustfs.
pub const DEFAULT_ADDRESS: &str = concat!(":", DEFAULT_PORT);
/// Default port for rustfs console
/// This is the default port for rustfs console.
pub const DEFAULT_CONSOLE_PORT: u16 = 9002;
/// Default address for rustfs console
/// This is the default address for rustfs console.
pub const DEFAULT_CONSOLE_ADDRESS: &str = concat!(":", DEFAULT_CONSOLE_PORT);
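Note that `DEFAULT_ADDRESS` and `DEFAULT_CONSOLE_ADDRESS` rely on `const_str::concat!`, which, unlike `std::concat!`, accepts named integer constants; a quick compile-time sanity check of the resulting values:

```rust
use const_str::concat;

pub const DEFAULT_PORT: u16 = 9000;
pub const DEFAULT_ADDRESS: &str = concat!(":", DEFAULT_PORT);

fn main() {
    // const_str::concat! stringifies the u16 constant at compile time.
    assert_eq!(DEFAULT_ADDRESS, ":9000");
}
```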

View File

@@ -0,0 +1 @@
pub(crate) mod app;

View File

@@ -0,0 +1,23 @@
/// Event configuration module
pub struct EventConfig {
pub event_type: String,
pub event_source: String,
pub event_destination: String,
}
impl EventConfig {
/// Creates a new instance of `EventConfig` with default values.
pub fn new() -> Self {
Self {
event_type: "default".to_string(),
event_source: "default".to_string(),
event_destination: "default".to_string(),
}
}
}
impl Default for EventConfig {
fn default() -> Self {
Self::new()
}
}

View File

@@ -0,0 +1,17 @@
/// Event configuration module
pub struct EventConfig {
pub event_type: String,
pub event_source: String,
pub event_destination: String,
}
impl EventConfig {
/// Creates a new instance of `EventConfig` with default values.
pub fn new() -> Self {
Self {
event_type: "default".to_string(),
event_source: "default".to_string(),
event_destination: "default".to_string(),
}
}
}

View File

@@ -0,0 +1,2 @@
pub(crate) mod config;
pub(crate) mod event;

crates/config/src/lib.rs Normal file
View File

@@ -0,0 +1,9 @@
use crate::observability::config::ObservabilityConfig;
mod config;
mod constants;
mod event;
mod observability;
pub use config::RustFsConfig;
pub use constants::app::*;

View File

@@ -0,0 +1,28 @@
use crate::observability::logger::LoggerConfig;
use crate::observability::otel::OtelConfig;
use crate::observability::sink::SinkConfig;
use serde::Deserialize;
/// Observability configuration
#[derive(Debug, Deserialize, Clone)]
pub struct ObservabilityConfig {
pub otel: OtelConfig,
pub sinks: SinkConfig,
pub logger: Option<LoggerConfig>,
}
impl ObservabilityConfig {
pub fn new() -> Self {
Self {
otel: OtelConfig::new(),
sinks: SinkConfig::new(),
logger: Some(LoggerConfig::new()),
}
}
}
impl Default for ObservabilityConfig {
fn default() -> Self {
Self::new()
}
}

View File

@@ -0,0 +1,25 @@
use serde::Deserialize;
/// File sink configuration
#[derive(Debug, Deserialize, Clone)]
pub struct FileSinkConfig {
pub path: String,
pub max_size: u64,
pub max_backups: u64,
}
impl FileSinkConfig {
pub fn new() -> Self {
Self {
path: "".to_string(),
max_size: 0,
max_backups: 0,
}
}
}
impl Default for FileSinkConfig {
fn default() -> Self {
Self::new()
}
}

View File

@@ -0,0 +1,23 @@
use serde::Deserialize;
/// Kafka sink configuration
#[derive(Debug, Deserialize, Clone)]
pub struct KafkaSinkConfig {
pub brokers: Vec<String>,
pub topic: String,
}
impl KafkaSinkConfig {
pub fn new() -> Self {
Self {
brokers: vec!["localhost:9092".to_string()],
topic: "rustfs".to_string(),
}
}
}
impl Default for KafkaSinkConfig {
fn default() -> Self {
Self::new()
}
}

View File

@@ -0,0 +1,21 @@
use serde::Deserialize;
/// Logger configuration
#[derive(Debug, Deserialize, Clone)]
pub struct LoggerConfig {
pub queue_capacity: Option<usize>,
}
impl LoggerConfig {
pub fn new() -> Self {
Self {
queue_capacity: Some(10000),
}
}
}
impl Default for LoggerConfig {
fn default() -> Self {
Self::new()
}
}

View File

@@ -0,0 +1,8 @@
pub(crate) mod config;
pub(crate) mod file_sink;
pub(crate) mod kafka_sink;
pub(crate) mod logger;
pub(crate) mod observability;
pub(crate) mod otel;
pub(crate) mod sink;
pub(crate) mod webhook_sink;

View File

@@ -0,0 +1,22 @@
use crate::observability::logger::LoggerConfig;
use crate::observability::otel::OtelConfig;
use crate::observability::sink::SinkConfig;
use serde::Deserialize;
/// Observability configuration
#[derive(Debug, Deserialize, Clone)]
pub struct ObservabilityConfig {
pub otel: OtelConfig,
pub sinks: SinkConfig,
pub logger: Option<LoggerConfig>,
}
impl ObservabilityConfig {
pub fn new() -> Self {
Self {
otel: OtelConfig::new(),
sinks: SinkConfig::new(),
logger: Some(LoggerConfig::new()),
}
}
}

View File

@@ -0,0 +1,27 @@
use serde::Deserialize;
/// OpenTelemetry configuration
#[derive(Debug, Deserialize, Clone)]
pub struct OtelConfig {
pub endpoint: String,
pub service_name: String,
pub service_version: String,
pub resource_attributes: Vec<String>,
}
impl OtelConfig {
pub fn new() -> Self {
Self {
endpoint: "http://localhost:4317".to_string(),
service_name: "rustfs".to_string(),
service_version: "0.1.0".to_string(),
resource_attributes: vec![],
}
}
}
impl Default for OtelConfig {
fn default() -> Self {
Self::new()
}
}

View File

@@ -0,0 +1,28 @@
use crate::observability::file_sink::FileSinkConfig;
use crate::observability::kafka_sink::KafkaSinkConfig;
use crate::observability::webhook_sink::WebhookSinkConfig;
use serde::Deserialize;
/// Sink configuration
#[derive(Debug, Deserialize, Clone)]
pub struct SinkConfig {
pub kafka: Option<KafkaSinkConfig>,
pub webhook: Option<WebhookSinkConfig>,
pub file: Option<FileSinkConfig>,
}
impl SinkConfig {
pub fn new() -> Self {
Self {
kafka: None,
webhook: None,
file: Some(FileSinkConfig::new()),
}
}
}
impl Default for SinkConfig {
fn default() -> Self {
Self::new()
}
}

View File

@@ -0,0 +1,25 @@
use serde::Deserialize;
/// Webhook sink configuration
#[derive(Debug, Deserialize, Clone)]
pub struct WebhookSinkConfig {
pub url: String,
pub method: String,
pub headers: Vec<(String, String)>,
}
impl WebhookSinkConfig {
pub fn new() -> Self {
Self {
url: "http://localhost:8080/webhook".to_string(),
method: "POST".to_string(),
headers: vec![],
}
}
}
impl Default for WebhookSinkConfig {
fn default() -> Self {
Self::new()
}
}

View File

@@ -37,7 +37,7 @@ async fn receive_webhook(Json(payload): Json<Value>) -> StatusCode {
println!("current time:{:04}-{:02}-{:02} {:02}:{:02}:{:02}", year, month, day, hour, minute, second);
println!(
"received a webhook request time:{} content:\n {}",
seconds.to_string(),
seconds,
serde_json::to_string_pretty(&payload).unwrap()
);
StatusCode::OK
@@ -66,10 +66,10 @@ fn convert_seconds_to_date(seconds: u64) -> (u32, u32, u32, u32, u32, u32) {
// calculate month
let days_in_month = [31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31];
for m in 0..12 {
if total_seconds >= days_in_month[m] * seconds_per_day {
for m in &days_in_month {
if total_seconds >= m * seconds_per_day {
month += 1;
total_seconds -= days_in_month[m] * seconds_per_day;
total_seconds -= m * seconds_per_day;
} else {
break;
}

View File

@@ -7,11 +7,13 @@ use std::time::{SystemTime, UNIX_EPOCH};
use tokio::sync::mpsc;
use tokio::time::Duration;
use tokio_util::sync::CancellationToken;
use tracing::instrument;
/// Handles incoming events from the producer.
///
/// This function is responsible for receiving events from the producer and sending them to the appropriate adapters.
/// It also handles the shutdown process and saves any pending logs to the event store.
#[instrument(skip_all)]
pub async fn event_bus(
mut rx: mpsc::Receiver<Event>,
adapters: Vec<Arc<dyn ChannelAdapter>>,

View File

@@ -162,7 +162,7 @@ impl NotifierConfig {
}
}
const DEFAULT_CONFIG_FILE: &str = "obs";
const DEFAULT_CONFIG_FILE: &str = "event";
/// Provide temporary directories as default storage paths
fn default_store_path() -> String {

View File

@@ -15,6 +15,18 @@ pub struct Identity {
pub principal_id: String,
}
impl Identity {
/// Create a new Identity instance
pub fn new(principal_id: String) -> Self {
Self { principal_id }
}
/// Set the principal ID
pub fn set_principal_id(&mut self, principal_id: String) {
self.principal_id = principal_id;
}
}
/// A struct representing the bucket information
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct Bucket {
@@ -24,6 +36,32 @@ pub struct Bucket {
pub arn: String,
}
impl Bucket {
/// Create a new Bucket instance
pub fn new(name: String, owner_identity: Identity, arn: String) -> Self {
Self {
name,
owner_identity,
arn,
}
}
/// Set the name of the bucket
pub fn set_name(&mut self, name: String) {
self.name = name;
}
/// Set the ARN of the bucket
pub fn set_arn(&mut self, arn: String) {
self.arn = arn;
}
/// Set the owner identity of the bucket
pub fn set_owner_identity(&mut self, owner_identity: Identity) {
self.owner_identity = owner_identity;
}
}
/// A struct representing the object information
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct Object {
@@ -41,6 +79,64 @@ pub struct Object {
pub sequencer: String,
}
impl Object {
/// Create a new Object instance
pub fn new(
key: String,
size: Option<i64>,
etag: Option<String>,
content_type: Option<String>,
user_metadata: Option<HashMap<String, String>>,
version_id: Option<String>,
sequencer: String,
) -> Self {
Self {
key,
size,
etag,
content_type,
user_metadata,
version_id,
sequencer,
}
}
/// Set the key
pub fn set_key(&mut self, key: String) {
self.key = key;
}
/// Set the size
pub fn set_size(&mut self, size: Option<i64>) {
self.size = size;
}
/// Set the etag
pub fn set_etag(&mut self, etag: Option<String>) {
self.etag = etag;
}
/// Set the content type
pub fn set_content_type(&mut self, content_type: Option<String>) {
self.content_type = content_type;
}
/// Set the user metadata
pub fn set_user_metadata(&mut self, user_metadata: Option<HashMap<String, String>>) {
self.user_metadata = user_metadata;
}
/// Set the version ID
pub fn set_version_id(&mut self, version_id: Option<String>) {
self.version_id = version_id;
}
/// Set the sequencer
pub fn set_sequencer(&mut self, sequencer: String) {
self.sequencer = sequencer;
}
}
/// A struct representing the metadata of the event
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct Metadata {
@@ -52,6 +148,57 @@ pub struct Metadata {
pub object: Object,
}
impl Default for Metadata {
fn default() -> Self {
Self::new()
}
}
impl Metadata {
/// Create a new Metadata instance with default values
pub fn new() -> Self {
Self {
schema_version: "1.0".to_string(),
configuration_id: "default".to_string(),
bucket: Bucket::new(
"default".to_string(),
Identity::new("default".to_string()),
"arn:aws:s3:::default".to_string(),
),
object: Object::new("default".to_string(), None, None, None, None, None, "default".to_string()),
}
}
/// Create a new Metadata instance
pub fn create(schema_version: String, configuration_id: String, bucket: Bucket, object: Object) -> Self {
Self {
schema_version,
configuration_id,
bucket,
object,
}
}
/// Set the schema version
pub fn set_schema_version(&mut self, schema_version: String) {
self.schema_version = schema_version;
}
/// Set the configuration ID
pub fn set_configuration_id(&mut self, configuration_id: String) {
self.configuration_id = configuration_id;
}
/// Set the bucket
pub fn set_bucket(&mut self, bucket: Bucket) {
self.bucket = bucket;
}
/// Set the object
pub fn set_object(&mut self, object: Object) {
self.object = object;
}
}
/// A struct representing the source of the event
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct Source {
@@ -61,6 +208,28 @@ pub struct Source {
pub user_agent: String,
}
impl Source {
/// Create a new Source instance
pub fn new(host: String, port: String, user_agent: String) -> Self {
Self { host, port, user_agent }
}
/// Set the host
pub fn set_host(&mut self, host: String) {
self.host = host;
}
/// Set the port
pub fn set_port(&mut self, port: String) {
self.port = port;
}
/// Set the user agent
pub fn set_user_agent(&mut self, user_agent: String) {
self.user_agent = user_agent;
}
}
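Putting the new constructors and setters together, assembling event metadata might look like this (all values are illustrative):

```rust
// Illustrative values only; the types are those introduced in this diff.
fn example_metadata() -> Metadata {
    let bucket = Bucket::new(
        "photos".to_string(),
        Identity::new("user-123".to_string()),
        "arn:aws:s3:::photos".to_string(),
    );
    let object = Object::new(
        "2025/05/cat.png".to_string(),
        Some(1024),                    // size in bytes
        None,                          // etag
        Some("image/png".to_string()), // content type
        None,                          // user metadata
        None,                          // version id
        "seq-0001".to_string(),
    );
    let mut metadata = Metadata::create("1.0".to_string(), "default".to_string(), bucket, object);
    metadata.set_configuration_id("webhook-config".to_string());
    metadata
}
```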
/// Builder for creating an Event.
///
/// This struct is used to build an Event object with various parameters.

View File

@@ -1,6 +1,7 @@
use crate::{create_adapters, Error, Event, NotifierConfig, NotifierSystem};
use std::sync::{atomic, Arc};
use tokio::sync::{Mutex, OnceCell};
use tracing::instrument;
static GLOBAL_SYSTEM: OnceCell<Arc<Mutex<NotifierSystem>>> = OnceCell::const_new();
static INITIALIZED: atomic::AtomicBool = atomic::AtomicBool::new(false);
@@ -113,6 +114,7 @@ pub fn is_ready() -> bool {
/// - The system is not initialized.
/// - The system is not ready.
/// - Sending the event fails.
#[instrument(fields(event))]
pub async fn send_event(event: Event) -> Result<(), Error> {
if !READY.load(atomic::Ordering::SeqCst) {
return Err(Error::custom("Notification system not ready, please wait for initialization to complete"));
@@ -124,6 +126,7 @@ pub async fn send_event(event: Event) -> Result<(), Error> {
}
/// Shuts down the notification system.
#[instrument]
pub async fn shutdown() -> Result<(), Error> {
if let Some(system) = GLOBAL_SYSTEM.get() {
tracing::info!("Shutting down notification system start");
@@ -189,7 +192,7 @@ mod tests {
let config = NotifierConfig::default();
let _ = initialize(config.clone()).await; // first initialization
let result = initialize(config).await; // second initialization
assert!(!result.is_ok(), "Initialization should succeed");
assert!(result.is_err(), "Re-initialization should fail");
}
@@ -211,7 +214,7 @@ mod tests {
..Default::default()
};
let result = initialize(config).await;
assert!(!result.is_err(), "Initialization with invalid config should fail");
assert!(result.is_ok(), "Initialization with an invalid config should still succeed");
assert!(is_initialized(), "System should be marked as initialized");
assert!(is_ready(), "System should be marked as ready");
}

View File

@@ -2,6 +2,7 @@ use crate::{event_bus, ChannelAdapter, Error, Event, EventStore, NotifierConfig}
use std::sync::Arc;
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;
use tracing::instrument;
/// The `NotificationSystem` struct represents the notification system.
/// It manages the event bus and the adapters.
@@ -18,6 +19,7 @@ pub struct NotifierSystem {
impl NotifierSystem {
/// Creates a new `NotificationSystem` instance.
#[instrument(skip(config))]
pub async fn new(config: NotifierConfig) -> Result<Self, Error> {
let (tx, rx) = mpsc::channel::<Event>(config.channel_capacity);
let store = Arc::new(EventStore::new(&config.store_path).await?);
@@ -44,6 +46,7 @@ impl NotifierSystem {
/// Starts the notification system.
/// It initializes the event bus and the producer.
#[instrument(skip_all)]
pub async fn start(&mut self, adapters: Vec<Arc<dyn ChannelAdapter>>) -> Result<(), Error> {
if self.shutdown.is_cancelled() {
let error = Error::custom("System is shutting down");
@@ -67,6 +70,7 @@ impl NotifierSystem {
/// Sends an event to the notification system.
/// This method is used to send events to the event bus.
#[instrument(skip(self))]
pub async fn send_event(&self, event: Event) -> Result<(), Error> {
self.log(tracing::Level::DEBUG, "send_event", &format!("Sending event: {:?}", event));
if self.shutdown.is_cancelled() {
@@ -85,6 +89,7 @@ impl NotifierSystem {
/// Shuts down the notification system.
/// This method is used to cancel the event bus and producer tasks.
#[instrument(skip(self))]
pub async fn shutdown(&mut self) -> Result<(), Error> {
tracing::info!("Shutting down the notification system");
self.shutdown.cancel();
@@ -112,10 +117,13 @@ impl NotifierSystem {
self.shutdown.is_cancelled()
}
fn handle_error(&self, context: &str, error: &Error) {
#[instrument(skip(self))]
pub fn handle_error(&self, context: &str, error: &Error) {
self.log(tracing::Level::ERROR, context, &format!("{:?}", error));
// TODO Can be extended to record to files or send to monitoring systems
}
#[instrument(skip(self))]
fn log(&self, level: tracing::Level, context: &str, message: &str) {
match level {
tracing::Level::ERROR => tracing::error!("[{}] {}", context, message),

View File

@@ -5,6 +5,7 @@ use std::time::{SystemTime, UNIX_EPOCH};
use tokio::fs::{create_dir_all, File, OpenOptions};
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter};
use tokio::sync::RwLock;
use tracing::instrument;
/// `EventStore` is a struct that manages the storage of event logs.
pub struct EventStore {
@@ -21,6 +22,7 @@ impl EventStore {
})
}
#[instrument(skip(self))]
pub async fn save_logs(&self, logs: &[Log]) -> Result<(), Error> {
let _guard = self.lock.write().await;
let file_path = format!(

View File

@@ -210,7 +210,8 @@ pub fn init_telemetry(config: &OtelConfig) -> OtelGuard {
.with_thread_names(true)
.with_thread_ids(true)
.with_file(true)
.with_line_number(true);
.with_line_number(true)
.with_filter(build_env_filter(logger_level, None));
let filter = build_env_filter(logger_level, None);
let otel_filter = build_env_filter(logger_level, None);
@@ -218,7 +219,7 @@ pub fn init_telemetry(config: &OtelConfig) -> OtelGuard {
let tracer = tracer_provider.tracer(Cow::Borrowed(service_name).to_string());
// Configure registry to avoid repeated calls to filter methods
let _registry = tracing_subscriber::registry()
tracing_subscriber::registry()
.with(filter)
.with(ErrorLayer::default())
.with(if config.local_logging_enabled.unwrap_or(false) {
@@ -230,17 +231,13 @@ pub fn init_telemetry(config: &OtelConfig) -> OtelGuard {
.with(otel_layer)
.with(MetricsLayer::new(meter_provider.clone()))
.init();
info!("Telemetry logging enabled: {:?}", config.local_logging_enabled);
// if config.local_logging_enabled.unwrap_or(false) {
// registry.with(fmt_layer).init();
// } else {
// registry.init();
// }
if !endpoint.is_empty() {
info!(
"OpenTelemetry telemetry initialized with OTLP endpoint: {}, logger_level: {}",
endpoint, logger_level
"OpenTelemetry telemetry initialized with OTLP endpoint: {}, logger_level: {},RUST_LOG env: {}",
endpoint,
logger_level,
std::env::var("RUST_LOG").unwrap_or_else(|_| "未设置".to_string())
);
}
}
@@ -255,7 +252,6 @@ pub fn init_telemetry(config: &OtelConfig) -> OtelGuard {
fn build_env_filter(logger_level: &str, default_level: Option<&str>) -> EnvFilter {
let level = default_level.unwrap_or(logger_level);
let mut filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new(level));
if !matches!(logger_level, "trace" | "debug") {
let directives: SmallVec<[&str; 5]> = smallvec::smallvec!["hyper", "tonic", "h2", "reqwest", "tower"];
for directive in directives {

crates/utils/Cargo.toml Normal file
View File

@@ -0,0 +1,18 @@
[package]
name = "rustfs-utils"
edition.workspace = true
license.workspace = true
repository.workspace = true
rust-version.workspace = true
version.workspace = true
[dependencies]
local-ip-address = { workspace = true }
rustfs-config = { workspace = true }
rustls = { workspace = true }
rustls-pemfile = { workspace = true }
rustls-pki-types = { workspace = true }
tracing = { workspace = true }
[lints]
workspace = true

View File

@@ -1,57 +1,48 @@
use crate::config::{RUSTFS_TLS_CERT, RUSTFS_TLS_KEY};
use rustfs_config::{RUSTFS_TLS_CERT, RUSTFS_TLS_KEY};
use rustls::server::{ClientHello, ResolvesServerCert, ResolvesServerCertUsingSni};
use rustls::sign::CertifiedKey;
use rustls_pemfile::{certs, private_key};
use rustls_pki_types::{CertificateDer, PrivateKeyDer};
use std::collections::HashMap;
use std::fmt::Debug;
use std::io::Error;
use std::net::IpAddr;
use std::path::Path;
use std::sync::Arc;
use std::{fs, io};
use tracing::{debug, warn};
/// Get the local IP address.
/// This function retrieves the local IP address of the machine.
pub(crate) fn get_local_ip() -> Option<std::net::Ipv4Addr> {
match local_ip_address::local_ip() {
Ok(IpAddr::V4(ip)) => Some(ip),
Err(_) => None,
Ok(IpAddr::V6(_)) => todo!(),
}
}
/// Load public certificate from file.
/// This function loads a public certificate from the specified file.
pub(crate) fn load_certs(filename: &str) -> io::Result<Vec<CertificateDer<'static>>> {
pub fn load_certs(filename: &str) -> io::Result<Vec<CertificateDer<'static>>> {
// Open certificate file.
let cert_file = fs::File::open(filename).map_err(|e| error(format!("failed to open {}: {}", filename, e)))?;
let cert_file = fs::File::open(filename).map_err(|e| certs_error(format!("failed to open {}: {}", filename, e)))?;
let mut reader = io::BufReader::new(cert_file);
// Load and return certificate.
let certs = certs(&mut reader)
.collect::<Result<Vec<_>, _>>()
.map_err(|_| error(format!("certificate file {} format error", filename)))?;
.map_err(|_| certs_error(format!("certificate file {} format error", filename)))?;
if certs.is_empty() {
return Err(error(format!("No valid certificate was found in the certificate file {}", filename)));
return Err(certs_error(format!(
"No valid certificate was found in the certificate file {}",
filename
)));
}
Ok(certs)
}
/// Load private key from file.
/// This function loads a private key from the specified file.
pub(crate) fn load_private_key(filename: &str) -> io::Result<PrivateKeyDer<'static>> {
pub fn load_private_key(filename: &str) -> io::Result<PrivateKeyDer<'static>> {
// Open keyfile.
let keyfile = fs::File::open(filename).map_err(|e| error(format!("failed to open {}: {}", filename, e)))?;
let keyfile = fs::File::open(filename).map_err(|e| certs_error(format!("failed to open {}: {}", filename, e)))?;
let mut reader = io::BufReader::new(keyfile);
// Load and return a single private key.
private_key(&mut reader)?.ok_or_else(|| error(format!("no private key found in {}", filename)))
private_key(&mut reader)?.ok_or_else(|| certs_error(format!("no private key found in {}", filename)))
}
/// error function
pub(crate) fn error(err: String) -> Error {
pub fn certs_error(err: String) -> Error {
Error::new(io::ErrorKind::Other, err)
}
@@ -59,14 +50,14 @@ pub(crate) fn error(err: String) -> Error {
/// This function loads all certificate and private key pairs from the specified directory.
/// It looks for files named `rustfs_cert.pem` and `rustfs_key.pem` in each subdirectory.
/// The root directory can also contain a default certificate/private key pair.
pub(crate) fn load_all_certs_from_directory(
pub fn load_all_certs_from_directory(
dir_path: &str,
) -> io::Result<HashMap<String, (Vec<CertificateDer<'static>>, PrivateKeyDer<'static>)>> {
let mut cert_key_pairs = HashMap::new();
let dir = Path::new(dir_path);
if !dir.exists() || !dir.is_dir() {
return Err(error(format!(
return Err(certs_error(format!(
"The certificate directory does not exist or is not a directory: {}",
dir_path
)));
@@ -80,10 +71,10 @@ pub(crate) fn load_all_certs_from_directory(
debug!("find the root directory certificate: {:?}", root_cert_path);
let root_cert_str = root_cert_path
.to_str()
.ok_or_else(|| error(format!("Invalid UTF-8 in root certificate path: {:?}", root_cert_path)))?;
.ok_or_else(|| certs_error(format!("Invalid UTF-8 in root certificate path: {:?}", root_cert_path)))?;
let root_key_str = root_key_path
.to_str()
.ok_or_else(|| error(format!("Invalid UTF-8 in root key path: {:?}", root_key_path)))?;
.ok_or_else(|| certs_error(format!("Invalid UTF-8 in root key path: {:?}", root_key_path)))?;
match load_cert_key_pair(root_cert_str, root_key_str) {
Ok((certs, key)) => {
// The root directory certificate is used as the default certificate and is stored using special keys.
@@ -104,7 +95,7 @@ pub(crate) fn load_all_certs_from_directory(
let domain_name = path
.file_name()
.and_then(|name| name.to_str())
.ok_or_else(|| error(format!("invalid domain name directory:{:?}", path)))?;
.ok_or_else(|| certs_error(format!("invalid domain name directory:{:?}", path)))?;
// find certificate and private key files
let cert_path = path.join(RUSTFS_TLS_CERT); // e.g., rustfs_cert.pem
@@ -125,7 +116,10 @@ pub(crate) fn load_all_certs_from_directory(
}
if cert_key_pairs.is_empty() {
return Err(error(format!("No valid certificate/private key pair found in directory {}", dir_path)));
return Err(certs_error(format!(
"No valid certificate/private key pair found in directory {}",
dir_path
)));
}
Ok(cert_key_pairs)
@@ -171,7 +165,7 @@ pub fn create_multi_cert_resolver(
for (domain, (certs, key)) in cert_key_pairs {
// create a signature
let signing_key = rustls::crypto::aws_lc_rs::sign::any_supported_type(&key)
.map_err(|_| error(format!("unsupported private key types:{}", domain)))?;
.map_err(|_| certs_error(format!("unsupported private key types:{}", domain)))?;
// create a CertifiedKey
let certified_key = CertifiedKey::new(certs, signing_key);
@@ -181,7 +175,7 @@ pub fn create_multi_cert_resolver(
// add certificate to resolver
resolver
.add(&domain, certified_key)
.map_err(|e| error(format!("failed to add a domain name certificate:{},err: {:?}", domain, e)))?;
.map_err(|e| certs_error(format!("failed to add a domain name certificate:{},err: {:?}", domain, e)))?;
}
}
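Since `load_certs` and `load_private_key` are now `pub` (and re-exported from `rustfs-utils`), callers can feed them straight into rustls; a minimal single-certificate sketch with illustrative paths:

```rust
use rustfs_utils::{load_certs, load_private_key};

fn build_tls_config() -> std::io::Result<rustls::ServerConfig> {
    // Illustrative paths; rustfs looks for rustfs_cert.pem / rustfs_key.pem by default.
    let certs = load_certs("certs/rustfs_cert.pem")?;
    let key = load_private_key("certs/rustfs_key.pem")?;
    rustls::ServerConfig::builder()
        .with_no_client_auth()
        .with_single_cert(certs, key)
        .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))
}
```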

crates/utils/src/ip.rs Normal file
View File

@@ -0,0 +1,43 @@
use std::net::{IpAddr, Ipv4Addr};
/// Get the IP address of the machine
///
/// Priority is given to trying to get the IPv4 address, and if it fails, try to get the IPv6 address.
/// If both fail to retrieve, None is returned.
///
/// # Returns
///
/// * `Some(IpAddr)` - Native IP address (IPv4 or IPv6)
/// * `None` - Unable to obtain any native IP address
pub fn get_local_ip() -> Option<IpAddr> {
local_ip_address::local_ip()
.ok()
.or_else(|| local_ip_address::local_ipv6().ok())
}
/// Get the IP address of the machine as a string
///
/// If the IP address cannot be obtained, returns "127.0.0.1" as the default value.
///
/// # Returns
///
/// * `String` - Native IP address (IPv4 or IPv6) as a string, or the default value
pub fn get_local_ip_with_default() -> String {
get_local_ip()
.unwrap_or_else(|| IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1))) // Provide a safe default value
.to_string()
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_get_local_ip() {
match get_local_ip() {
Some(ip) => println!("the ip address of this machine:{}", ip),
None => println!("Unable to obtain the IP address of the machine"),
}
assert!(get_local_ip().is_some());
}
}

crates/utils/src/lib.rs Normal file
View File

@@ -0,0 +1,11 @@
mod certs;
mod ip;
mod net;
pub use certs::certs_error;
pub use certs::create_multi_cert_resolver;
pub use certs::load_all_certs_from_directory;
pub use certs::load_certs;
pub use certs::load_private_key;
pub use ip::get_local_ip;
pub use ip::get_local_ip_with_default;

crates/utils/src/net.rs Normal file
View File

@@ -0,0 +1 @@

View File

@@ -1,6 +1,6 @@
services:
otel-collector:
image: otel/opentelemetry-collector-contrib:0.120.0
image: ghcr.io/open-telemetry/opentelemetry-collector-releases/opentelemetry-collector-contrib:0.124.0
environment:
- TZ=Asia/Shanghai
volumes:
@@ -16,7 +16,7 @@ services:
networks:
- rustfs-network
jaeger:
image: jaegertracing/jaeger:2.4.0
image: jaegertracing/jaeger:2.5.0
environment:
- TZ=Asia/Shanghai
ports:
@@ -26,7 +26,7 @@ services:
networks:
- rustfs-network
prometheus:
image: prom/prometheus:v3.2.1
image: prom/prometheus:v3.3.0
environment:
- TZ=Asia/Shanghai
volumes:
@@ -36,7 +36,7 @@ services:
networks:
- rustfs-network
loki:
image: grafana/loki:3.4.2
image: grafana/loki:3.5.0
environment:
- TZ=Asia/Shanghai
volumes:
@@ -47,7 +47,7 @@ services:
networks:
- rustfs-network
grafana:
image: grafana/grafana:11.6.0
image: grafana/grafana:11.6.1
ports:
- "3000:3000" # Web UI
environment:
@@ -63,10 +63,10 @@ services:
container_name: node1
environment:
- RUSTFS_VOLUMES=http://node{1...4}:9000/root/data/target/volume/test{1...4}
- RUSTFS_ADDRESS=0.0.0.0:9000
- RUSTFS_ADDRESS=:9000
- RUSTFS_CONSOLE_ENABLE=true
- RUSTFS_CONSOLE_ADDRESS=0.0.0.0:9002
- RUSTFS_OBS_CONFIG=/etc/observability/config/obs.toml
- RUSTFS_CONSOLE_ADDRESS=:9002
- RUSTFS_OBS_CONFIG=/etc/observability/config/obs-multi.toml
platform: linux/amd64
ports:
- "9001:9000" # 映射宿主机的 9001 端口到容器的 9000 端口
@@ -84,10 +84,10 @@ services:
container_name: node2
environment:
- RUSTFS_VOLUMES=/root/data/target/volume/test{1...4}
- RUSTFS_ADDRESS=0.0.0.0:9000
- RUSTFS_ADDRESS=:9000
- RUSTFS_CONSOLE_ENABLE=true
- RUSTFS_CONSOLE_ADDRESS=0.0.0.0:9002
- RUSTFS_OBS_CONFIG=/etc/observability/config/obs.toml
- RUSTFS_CONSOLE_ADDRESS=:9002
- RUSTFS_OBS_CONFIG=/etc/observability/config/obs-multi.toml
platform: linux/amd64
ports:
- "9002:9000" # 映射宿主机的 9002 端口到容器的 9000 端口
@@ -105,10 +105,10 @@ services:
container_name: node3
environment:
- RUSTFS_VOLUMES=http://node{1...4}:9000/root/data/target/volume/test{1...4}
- RUSTFS_ADDRESS=0.0.0.0:9000
- RUSTFS_ADDRESS=:9000
- RUSTFS_CONSOLE_ENABLE=true
- RUSTFS_CONSOLE_ADDRESS=0.0.0.0:9002
- RUSTFS_OBS_CONFIG=/etc/observability/config/obs.toml
- RUSTFS_CONSOLE_ADDRESS=:9002
- RUSTFS_OBS_CONFIG=/etc/observability/config/obs-multi.toml
platform: linux/amd64
ports:
- "9003:9000" # 映射宿主机的 9003 端口到容器的 9000 端口
@@ -126,10 +126,10 @@ services:
container_name: node4
environment:
- RUSTFS_VOLUMES=http://node{1...4}:9000/root/data/target/volume/test{1...4}
- RUSTFS_ADDRESS=0.0.0.0:9000
- RUSTFS_ADDRESS=:9000
- RUSTFS_CONSOLE_ENABLE=true
- RUSTFS_CONSOLE_ADDRESS=0.0.0.0:9002
- RUSTFS_OBS_CONFIG=/etc/observability/config/obs.toml
- RUSTFS_CONSOLE_ADDRESS=:9002
- RUSTFS_OBS_CONFIG=/etc/observability/config/obs-multi.toml
platform: linux/amd64
ports:
- "9004:9000" # 映射宿主机的 9004 端口到容器的 9000 端口

View File

@@ -11,14 +11,15 @@ rust-version.workspace = true
workspace = true
[dependencies]
rustfs-config = { workspace = true }
async-trait.workspace = true
backon.workspace = true
blake2 = "0.10.6"
blake2 = { workspace = true }
bytes.workspace = true
common.workspace = true
policy.workspace = true
chrono.workspace = true
glob = "0.3.2"
glob = { workspace = true }
thiserror.workspace = true
flatbuffers.workspace = true
futures.workspace = true
@@ -30,16 +31,16 @@ serde_json.workspace = true
tracing-error.workspace = true
s3s.workspace = true
http.workspace = true
highway = "1.3.0"
highway = { workspace = true }
url.workspace = true
uuid = { workspace = true, features = ["v4", "fast-rng", "serde"] }
reed-solomon-erasure = { version = "6.0.0", features = ["simd-accel"] }
reed-solomon-erasure = { workspace = true }
transform-stream = "0.3.1"
lazy_static.workspace = true
lock.workspace = true
regex = "1.11.1"
netif = "0.1.6"
nix = { version = "0.29.0", features = ["fs"] }
regex = { workspace = true }
netif = { workspace = true }
nix = { workspace = true }
path-absolutize = "3.1.1"
protos.workspace = true
rmp.workspace = true
@@ -53,13 +54,13 @@ hex-simd = "0.8.0"
path-clean = "1.0.1"
tempfile.workspace = true
tokio = { workspace = true, features = ["io-util", "sync", "signal"] }
tokio-stream = "0.1.17"
tokio-stream = { workspace = true }
tonic.workspace = true
tower.workspace = true
byteorder = "1.5.0"
xxhash-rust = { version = "0.8.15", features = ["xxh64"] }
num = "0.4.3"
num_cpus = "1.16"
num_cpus = { workspace = true }
s3s-policy.workspace = true
rand.workspace = true
pin-project-lite.workspace = true
@@ -68,20 +69,18 @@ madmin.workspace = true
workers.workspace = true
reqwest = { workspace = true }
urlencoding = "2.1.3"
smallvec = "1.15.0"
smallvec = { workspace = true }
shadow-rs.workspace = true
[target.'cfg(not(windows))'.dependencies]
nix = { version = "0.29.0", features = ["fs"] }
nix = { workspace = true }
[target.'cfg(windows)'.dependencies]
winapi = "0.3.9"
winapi = { workspace = true }
[dev-dependencies]
tokio = { workspace = true, features = ["rt-multi-thread", "macros"] }
[build-dependencies]
shadow-rs.workspace = true
shadow-rs = { workspace = true, features = ["build", "metadata"] }

View File

@@ -534,6 +534,7 @@ impl Writer for BitrotFileWriter {
self
}
#[tracing::instrument(level = "info", skip_all)]
async fn write(&mut self, buf: Bytes) -> Result<()> {
if buf.is_empty() {
return Ok(());

View File

@@ -38,7 +38,9 @@ use crate::set_disk::{
CHECK_PART_VOLUME_NOT_FOUND,
};
use crate::store_api::{BitrotAlgorithm, StorageAPI};
use crate::utils::fs::{access, lstat, remove, remove_all, rename, O_APPEND, O_CREATE, O_RDONLY, O_WRONLY};
use crate::utils::fs::{
access, lstat, remove, remove_all, remove_all_std, remove_std, rename, O_APPEND, O_CREATE, O_RDONLY, O_WRONLY,
};
use crate::utils::os::get_info;
use crate::utils::path::{
self, clean, decode_dir_object, encode_dir_object, has_suffix, path_join, path_join_buf, GLOBAL_DIR_SUFFIX,
@@ -259,7 +261,7 @@ impl LocalDisk {
#[tracing::instrument(level = "debug", skip(self))]
async fn check_format_json(&self) -> Result<Metadata> {
let md = fs::metadata(&self.format_path).await.map_err(|e| match e.kind() {
let md = std::fs::metadata(&self.format_path).map_err(|e| match e.kind() {
ErrorKind::NotFound => DiskError::DiskNotFound,
ErrorKind::PermissionDenied => DiskError::FileAccessDenied,
_ => {
@@ -315,9 +317,9 @@ impl LocalDisk {
#[allow(unused_variables)]
pub async fn move_to_trash(&self, delete_path: &PathBuf, recursive: bool, immediate_purge: bool) -> Result<()> {
if recursive {
remove_all(delete_path).await?;
remove_all_std(delete_path)?;
} else {
remove(delete_path).await?;
remove_std(delete_path)?;
}
return Ok(());
@@ -365,7 +367,7 @@ impl LocalDisk {
Ok(())
}
// #[tracing::instrument(skip(self))]
#[tracing::instrument(level = "debug", skip(self))]
pub async fn delete_file(
&self,
base_path: &PathBuf,
@@ -688,6 +690,7 @@ impl LocalDisk {
}
// write_all_private with check_path_length
#[tracing::instrument(level = "debug", skip_all)]
pub async fn write_all_private(
&self,
volume: &str,
@@ -1213,7 +1216,7 @@ impl DiskAPI for LocalDisk {
Ok(data)
}
#[tracing::instrument(skip(self))]
#[tracing::instrument(level = "debug", skip_all)]
async fn write_all(&self, volume: &str, path: &str, data: Vec<u8>) -> Result<()> {
self.write_all_public(volume, path, data).await
}
@@ -1721,7 +1724,7 @@ impl DiskAPI for LocalDisk {
Ok(())
}
#[tracing::instrument(skip(self))]
#[tracing::instrument(level = "debug", skip(self))]
async fn rename_data(
&self,
src_volume: &str,
@@ -1732,7 +1735,7 @@ impl DiskAPI for LocalDisk {
) -> Result<RenameDataResp> {
let src_volume_dir = self.get_bucket_path(src_volume)?;
if !skip_access_checks(src_volume) {
if let Err(e) = utils::fs::access(&src_volume_dir).await {
if let Err(e) = utils::fs::access_std(&src_volume_dir) {
info!("access checks failed, src_volume_dir: {:?}, err: {}", src_volume_dir, e.to_string());
return Err(convert_access_error(e, DiskError::VolumeAccessDenied));
}
@@ -1740,7 +1743,7 @@ impl DiskAPI for LocalDisk {
let dst_volume_dir = self.get_bucket_path(dst_volume)?;
if !skip_access_checks(dst_volume) {
if let Err(e) = utils::fs::access(&dst_volume_dir).await {
if let Err(e) = utils::fs::access_std(&dst_volume_dir) {
info!("access checks failed, dst_volume_dir: {:?}, err: {}", dst_volume_dir, e.to_string());
return Err(convert_access_error(e, DiskError::VolumeAccessDenied));
}
@@ -1913,7 +1916,7 @@ impl DiskAPI for LocalDisk {
if let Some(src_file_path_parent) = src_file_path.parent() {
if src_volume != super::RUSTFS_META_MULTIPART_BUCKET {
let _ = utils::fs::remove(src_file_path_parent).await;
let _ = utils::fs::remove_std(src_file_path_parent);
} else {
let _ = self
.delete_file(&dst_volume_dir, &src_file_path_parent.to_path_buf(), true, false)

View File

@@ -108,6 +108,7 @@ pub async fn read_dir(path: impl AsRef<Path>, count: i32) -> Result<Vec<String>>
Ok(volumes)
}
#[tracing::instrument(level = "debug", skip_all)]
pub async fn rename_all(
src_file_path: impl AsRef<Path>,
dst_file_path: impl AsRef<Path>,
@@ -136,7 +137,7 @@ pub async fn reliable_rename(
base_dir: impl AsRef<Path>,
) -> io::Result<()> {
if let Some(parent) = dst_file_path.as_ref().parent() {
if !file_exists(parent).await {
if !file_exists(parent) {
info!("reliable_rename reliable_mkdir_all parent: {:?}", parent);
reliable_mkdir_all(parent, base_dir.as_ref()).await?;
}
@@ -144,7 +145,7 @@ pub async fn reliable_rename(
let mut i = 0;
loop {
if let Err(e) = utils::fs::rename(src_file_path.as_ref(), dst_file_path.as_ref()).await {
if let Err(e) = utils::fs::rename_std(src_file_path.as_ref(), dst_file_path.as_ref()) {
if os_is_not_exist(&e) && i == 0 {
i += 1;
continue;
@@ -221,6 +222,6 @@ pub async fn os_mkdir_all(dir_path: impl AsRef<Path>, base_dir: impl AsRef<Path>
Ok(())
}
pub async fn file_exists(path: impl AsRef<Path>) -> bool {
fs::metadata(path.as_ref()).await.map(|_| true).unwrap_or(false)
pub fn file_exists(path: impl AsRef<Path>) -> bool {
std::fs::metadata(path.as_ref()).map(|_| true).unwrap_or(false)
}

View File

@@ -1,5 +1,6 @@
use crate::bitrot::{BitrotReader, BitrotWriter};
use crate::error::clone_err;
use crate::io::Etag;
use crate::quorum::{object_op_ignored_errs, reduce_write_quorum_errs};
use bytes::{Bytes, BytesMut};
use common::error::{Error, Result};
@@ -8,8 +9,10 @@ use reed_solomon_erasure::galois_8::ReedSolomon;
use smallvec::SmallVec;
use std::any::Any;
use std::io::ErrorKind;
use std::sync::Arc;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::sync::mpsc;
use tracing::warn;
use tracing::{error, info};
// use tracing::debug;
@@ -26,7 +29,7 @@ pub struct Erasure {
encoder: Option<ReedSolomon>,
pub block_size: usize,
_id: Uuid,
buf: Vec<u8>,
_buf: Vec<u8>,
}
impl Erasure {
@@ -46,61 +49,65 @@ impl Erasure {
block_size,
encoder,
_id: Uuid::new_v4(),
buf: vec![0u8; block_size],
_buf: vec![0u8; block_size],
}
}
#[tracing::instrument(level = "debug", skip(self, reader, writers))]
#[tracing::instrument(level = "info", skip(self, reader, writers))]
pub async fn encode<S>(
&mut self,
reader: &mut S,
self: Arc<Self>,
mut reader: S,
writers: &mut [Option<BitrotWriter>],
// block_size: usize,
total_size: usize,
write_quorum: usize,
) -> Result<usize>
) -> Result<(usize, String)>
where
S: AsyncRead + Unpin + Send + 'static,
S: AsyncRead + Etag + Unpin + Send + 'static,
{
// pin_mut!(body);
// let mut reader = tokio_util::io::StreamReader::new(
// body.map(|f| f.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e.to_string()))),
// );
let (tx, mut rx) = mpsc::channel(5);
let task = tokio::spawn(async move {
let mut buf = vec![0u8; self.block_size];
let mut total: usize = 0;
loop {
if total_size > 0 {
let new_len = {
let remain = total_size - total;
if remain > self.block_size {
self.block_size
} else {
remain
}
};
let mut total: usize = 0;
let mut blocks = <SmallVec<[Bytes; 16]>>::new();
loop {
if total_size > 0 {
let new_len = {
let remain = total_size - total;
if remain > self.block_size {
self.block_size
} else {
remain
if new_len == 0 && total > 0 {
break;
}
};
if new_len == 0 && total > 0 {
buf.resize(new_len, 0u8);
match reader.read_exact(&mut buf).await {
Ok(res) => res,
Err(e) => {
if let ErrorKind::UnexpectedEof = e.kind() {
break;
} else {
return Err(Error::new(e));
}
}
};
total += buf.len();
}
let blocks = Arc::new(Box::pin(self.clone().encode_data(&buf)?));
let _ = tx.send(blocks).await;
if total_size == 0 {
break;
}
self.buf.resize(new_len, 0u8);
match reader.read_exact(&mut self.buf).await {
Ok(res) => res,
Err(e) => {
if let ErrorKind::UnexpectedEof = e.kind() {
break;
} else {
return Err(Error::new(e));
}
}
};
total += self.buf.len();
}
let etag = reader.etag().await;
Ok((total, etag))
});
self.encode_data(&self.buf, &mut blocks)?;
while let Some(blocks) = rx.recv().await {
let write_futures = writers.iter_mut().enumerate().map(|(i, w_op)| {
let i_inner = i;
let blocks_inner = blocks.clone();
@@ -125,84 +132,8 @@ impl Erasure {
warn!("Erasure encode errs {:?}", &errs);
return Err(err);
}
if total_size == 0 {
break;
}
}
Ok(total)
// // let stream = ChunkedStream::new(body, self.block_size);
// let stream = ChunkedStream::new(body, total_size, self.block_size, false);
// let mut total: usize = 0;
// // let mut idx = 0;
// pin_mut!(stream);
// // warn!("encode start...");
// loop {
// match stream.next().await {
// Some(result) => match result {
// Ok(data) => {
// total += data.len();
// // EOF
// if data.is_empty() {
// break;
// }
// // idx += 1;
// // warn!("encode {} get data {:?}", data.len(), data.to_vec());
// let blocks = self.encode_data(data.as_ref())?;
// // warn!(
// // "encode shard size: {}/{} from block_size {}, total_size {} ",
// // blocks[0].len(),
// // blocks.len(),
// // data.len(),
// // total_size
// // );
// let mut errs = Vec::new();
// for (i, w_op) in writers.iter_mut().enumerate() {
// if let Some(w) = w_op {
// match w.write(blocks[i].as_ref()).await {
// Ok(_) => errs.push(None),
// Err(e) => errs.push(Some(e)),
// }
// } else {
// errs.push(Some(Error::new(DiskError::DiskNotFound)));
// }
// }
// let none_count = errs.iter().filter(|&x| x.is_none()).count();
// if none_count >= write_quorum {
// continue;
// }
// if let Some(err) = reduce_write_quorum_errs(&errs, object_op_ignored_errs().as_ref(), write_quorum) {
// warn!("Erasure encode errs {:?}", &errs);
// return Err(err);
// }
// }
// Err(e) => {
// warn!("poll result err {:?}", &e);
// return Err(Error::msg(e.to_string()));
// }
// },
// None => {
// // warn!("poll empty result");
// break;
// }
// }
// }
// let _ = close_bitrot_writers(writers).await?;
// Ok(total)
task.await?
}
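The rewritten `encode` above is a producer/consumer pipeline: a spawned task reads block-sized chunks, erasure-codes each chunk into shards, and pushes the shard batches down an `mpsc` channel, while the caller drains the channel and fans each batch out to the bitrot writers. A stripped-down sketch of that pattern, with buffer reuse, quorum checks, and the etag plumbing omitted (`encode_block` is an illustrative stand-in for `encode_data`):
```rust
use tokio::sync::mpsc;

// Minimal producer/consumer sketch of the pipeline above; not the real
// implementation. encode_block stands in for Erasure::encode_data.
async fn pipeline(blocks: Vec<Vec<u8>>) -> usize {
    let (tx, mut rx) = mpsc::channel::<Vec<Vec<u8>>>(5); // same bound as the diff
    let producer = tokio::spawn(async move {
        let mut total = 0usize;
        for block in blocks {
            total += block.len();
            let shards = encode_block(&block); // CPU-bound Reed-Solomon step
            if tx.send(shards).await.is_err() {
                break; // consumer hung up
            }
        }
        total // dropping tx closes the channel and ends the consumer loop
    });
    while let Some(_shards) = rx.recv().await {
        // the real code writes each shard to its BitrotWriter here and
        // checks the write quorum per batch
    }
    producer.await.unwrap_or(0)
}

fn encode_block(block: &[u8]) -> Vec<Vec<u8>> {
    // placeholder: the real code produces data + parity shards
    vec![block.to_vec()]
}
```
This shape lets the reader keep filling the channel while earlier shard batches are still being written, instead of alternating read and write phases on a single task.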
pub async fn decode<W>(
@@ -356,8 +287,8 @@ impl Erasure {
self.data_shards + self.parity_shards
}
#[tracing::instrument(level = "debug", skip_all, fields(data_len=data.len()))]
pub fn encode_data(&self, data: &[u8], shards: &mut SmallVec<[Bytes; 16]>) -> Result<()> {
#[tracing::instrument(level = "info", skip_all, fields(data_len=data.len()))]
pub fn encode_data(self: Arc<Self>, data: &[u8]) -> Result<Vec<Bytes>> {
let (shard_size, total_size) = self.need_size(data.len());
// Allocate a new buffer sized to hold all of the required shard data
@@ -379,14 +310,13 @@ impl Erasure {
// Zero-copy sharding: every shard is a slice referencing data_buffer
let mut data_buffer = data_buffer.freeze();
shards.clear();
shards.reserve(self.total_shard_count());
let mut shards = Vec::with_capacity(self.total_shard_count());
for _ in 0..self.total_shard_count() {
let shard = data_buffer.split_to(shard_size);
shards.push(shard);
}
Ok(())
Ok(shards)
}
pub fn decode_data(&self, shards: &mut [Option<Vec<u8>>]) -> Result<()> {
@@ -617,7 +547,6 @@ impl ShardReader {
#[cfg(test)]
mod test {
use super::*;
#[test]
@@ -626,8 +555,7 @@ mod test {
let parity_shards = 2;
let data: &[u8] = &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11];
let ec = Erasure::new(data_shards, parity_shards, 1);
let mut shards = SmallVec::new();
ec.encode_data(data, &mut shards).unwrap();
let shards = Arc::new(ec).encode_data(data).unwrap();
println!("shards:{:?}", shards);
let mut s: Vec<_> = shards
@@ -643,6 +571,7 @@ mod test {
println!("sss:{:?}", &s);
let ec = Erasure::new(data_shards, parity_shards, 1);
ec.decode_data(&mut s).unwrap();
// ec.encoder.reconstruct(&mut s).unwrap();

View File

@@ -58,10 +58,12 @@ impl FileMeta {
}
// isXL2V1Format
#[tracing::instrument(level = "debug", skip_all)]
pub fn is_xl2_v1_format(buf: &[u8]) -> bool {
!matches!(Self::check_xl2_v1(buf), Err(_e))
}
#[tracing::instrument(level = "debug", skip_all)]
pub fn load(buf: &[u8]) -> Result<FileMeta> {
let mut xl = FileMeta::default();
xl.unmarshal_msg(buf)?;
@@ -245,7 +247,7 @@ impl FileMeta {
}
}
#[tracing::instrument]
#[tracing::instrument(level = "debug", skip_all)]
pub fn marshal_msg(&self) -> Result<Vec<u8>> {
let mut wr = Vec::new();
@@ -363,6 +365,7 @@ impl FileMeta {
}
// shard_data_dir_count: count the data_dirs under the given vid
#[tracing::instrument(level = "debug", skip_all)]
pub fn shard_data_dir_count(&self, vid: &Option<Uuid>, data_dir: &Option<Uuid>) -> usize {
self.versions
.iter()
@@ -434,6 +437,7 @@ impl FileMeta {
}
// Add a version
#[tracing::instrument(level = "debug", skip_all)]
pub fn add_version(&mut self, fi: FileInfo) -> Result<()> {
let vid = fi.version_id;

View File

@@ -20,8 +20,6 @@ pub const DISK_MIN_INODES: u64 = 1000;
pub const DISK_FILL_FRACTION: f64 = 0.99;
pub const DISK_RESERVE_FRACTION: f64 = 0.15;
pub const DEFAULT_PORT: u16 = 9000;
lazy_static! {
static ref GLOBAL_RUSTFS_PORT: OnceLock<u16> = OnceLock::new();
pub static ref GLOBAL_OBJECT_API: OnceLock<Arc<ECStore>> = OnceLock::new();
@@ -41,31 +39,37 @@ lazy_static! {
pub static ref GLOBAL_BOOT_TIME: OnceCell<SystemTime> = OnceCell::new();
}
/// Get the global rustfs port
pub fn global_rustfs_port() -> u16 {
if let Some(p) = GLOBAL_RUSTFS_PORT.get() {
*p
} else {
DEFAULT_PORT
rustfs_config::DEFAULT_PORT
}
}
/// Set the global rustfs port
pub fn set_global_rustfs_port(value: u16) {
GLOBAL_RUSTFS_PORT.set(value).expect("set_global_rustfs_port fail");
}
/// Set the global deployment id
pub fn set_global_deployment_id(id: Uuid) {
globalDeploymentIDPtr.set(id).unwrap();
}
/// Get the global deployment id
pub fn get_global_deployment_id() -> Option<String> {
globalDeploymentIDPtr.get().map(|v| v.to_string())
}
/// Set the global endpoints
pub fn set_global_endpoints(eps: Vec<PoolEndpoints>) {
GLOBAL_Endpoints
.set(EndpointServerPools::from(eps))
.expect("GLOBAL_Endpoints set failed")
}
/// Get the global endpoints
pub fn get_global_endpoints() -> EndpointServerPools {
if let Some(eps) = GLOBAL_Endpoints.get() {
eps.clone()

View File

@@ -17,7 +17,7 @@ use crate::{
global::GLOBAL_IsDistErasure,
heal::heal_commands::{HealStartSuccess, HEAL_UNKNOWN_SCAN},
new_object_layer_fn,
utils::path::has_profix,
utils::path::has_prefix,
};
use crate::{
heal::heal_commands::{HEAL_ITEM_BUCKET, HEAL_ITEM_OBJECT},
@@ -786,7 +786,7 @@ impl AllHealState {
let _ = self.mu.write().await;
for (k, v) in self.heal_seq_map.read().await.iter() {
if (has_profix(k, path_s) || has_profix(path_s, k)) && !v.has_ended().await {
if (has_prefix(k, path_s) || has_prefix(path_s, k)) && !v.has_ended().await {
return Err(Error::from_string(format!(
"The provided heal sequence path overlaps with an existing heal path: {}",
k

View File

@@ -1,8 +1,12 @@
use async_trait::async_trait;
use bytes::Bytes;
use futures::TryStreamExt;
use md5::Digest;
use md5::Md5;
use pin_project_lite::pin_project;
use std::io;
use std::pin::Pin;
use std::task::ready;
use std::task::Context;
use std::task::Poll;
use tokio::io::AsyncRead;
@@ -125,10 +129,17 @@ impl AsyncRead for HttpFileReader {
}
}
pub struct EtagReader<R> {
inner: R,
bytes_tx: mpsc::Sender<Bytes>,
md5_rx: oneshot::Receiver<String>,
#[async_trait]
pub trait Etag {
async fn etag(self) -> String;
}
pin_project! {
pub struct EtagReader<R> {
inner: R,
bytes_tx: mpsc::Sender<Bytes>,
md5_rx: oneshot::Receiver<String>,
}
}
impl<R> EtagReader<R> {
@@ -148,8 +159,11 @@ impl<R> EtagReader<R> {
EtagReader { inner, bytes_tx, md5_rx }
}
}
pub async fn etag(self) -> String {
#[async_trait]
impl<R: Send> Etag for EtagReader<R> {
async fn etag(self) -> String {
drop(self.inner);
drop(self.bytes_tx);
self.md5_rx.await.unwrap()
@@ -157,21 +171,28 @@ impl<R> EtagReader<R> {
}
impl<R: AsyncRead + Unpin> AsyncRead for EtagReader<R> {
#[tracing::instrument(level = "debug", skip_all)]
fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>) -> Poll<tokio::io::Result<()>> {
let poll = Pin::new(&mut self.inner).poll_read(cx, buf);
if let Poll::Ready(Ok(())) = &poll {
if buf.remaining() == 0 {
#[tracing::instrument(level = "info", skip_all)]
fn poll_read(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>) -> Poll<tokio::io::Result<()>> {
let me = self.project();
loop {
let rem = buf.remaining();
if rem != 0 {
ready!(Pin::new(&mut *me.inner).poll_read(cx, buf))?;
if buf.remaining() == rem {
return Err(io::Error::new(io::ErrorKind::UnexpectedEof, "early eof")).into();
}
} else {
let bytes = buf.filled();
let bytes = Bytes::copy_from_slice(bytes);
let tx = self.bytes_tx.clone();
let tx = me.bytes_tx.clone();
tokio::spawn(async move {
if let Err(e) = tx.send(bytes).await {
warn!("EtagReader send error: {:?}", e);
}
});
return Poll::Ready(Ok(()));
}
}
poll
}
}
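The reworked reader keeps the same underlying mechanism: `poll_read` forwards every filled chunk over an `mpsc` channel to a background hashing task, and `etag()` drops the sender side, which closes the channel, then awaits the final digest on a `oneshot`. A minimal sketch of that background half, using the `md-5` and `tokio` crates already imported here (`spawn_md5_task` is an illustrative name, not the actual constructor):
```rust
use md5::{Digest, Md5};
use tokio::sync::{mpsc, oneshot};

// Sketch of the hashing task behind EtagReader::new; names are illustrative.
fn spawn_md5_task() -> (mpsc::Sender<bytes::Bytes>, oneshot::Receiver<String>) {
    let (bytes_tx, mut bytes_rx) = mpsc::channel::<bytes::Bytes>(16);
    let (md5_tx, md5_rx) = oneshot::channel::<String>();
    tokio::spawn(async move {
        let mut hasher = Md5::new();
        // fold in chunks as poll_read forwards them
        while let Some(chunk) = bytes_rx.recv().await {
            hasher.update(&chunk);
        }
        // all senders dropped: the stream is fully consumed, emit the hex digest
        let hex: String = hasher.finalize().iter().map(|b| format!("{b:02x}")).collect();
        let _ = md5_tx.send(hex);
    });
    (bytes_tx, md5_rx)
}
```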

View File

@@ -147,6 +147,7 @@ pub fn reduce_read_quorum_errs(
// Validate the number of errors against the write quorum
// Returns the index of the most frequent error, or a QuorumError
#[tracing::instrument(level = "info", skip_all)]
pub fn reduce_write_quorum_errs(
errs: &[Option<Error>],
ignored_errs: &[Box<dyn CheckErrorFn>],

View File

@@ -284,10 +284,20 @@ impl SetDisks {
// let mut ress = Vec::with_capacity(disks.len());
let mut errs = Vec::with_capacity(disks.len());
for (i, disk) in disks.iter().enumerate() {
let mut file_info = file_infos[i].clone();
let src_bucket = Arc::new(src_bucket.to_string());
let src_object = Arc::new(src_object.to_string());
let dst_bucket = Arc::new(dst_bucket.to_string());
let dst_object = Arc::new(dst_object.to_string());
futures.push(async move {
for (i, (disk, file_info)) in disks.iter().zip(file_infos.iter()).enumerate() {
let mut file_info = file_info.clone();
let disk = disk.clone();
let src_bucket = src_bucket.clone();
let src_object = src_object.clone();
let dst_object = dst_object.clone();
let dst_bucket = dst_bucket.clone();
futures.push(tokio::spawn(async move {
if file_info.erasure.index == 0 {
file_info.erasure.index = i + 1;
}
@@ -297,12 +307,12 @@ impl SetDisks {
}
if let Some(disk) = disk {
disk.rename_data(src_bucket, src_object, file_info, dst_bucket, dst_object)
disk.rename_data(&src_bucket, &src_object, file_info, &dst_bucket, &dst_object)
.await
} else {
Err(Error::new(DiskError::DiskNotFound))
}
})
}));
}
let mut disk_versions = vec![None; disks.len()];
@@ -311,15 +321,13 @@ impl SetDisks {
let results = join_all(futures).await;
for (idx, result) in results.iter().enumerate() {
match result {
match result.as_ref().map_err(|_| Error::new(DiskError::Unexpected))? {
Ok(res) => {
data_dirs[idx] = res.old_data_dir;
disk_versions[idx].clone_from(&res.sign);
// ress.push(Some(res));
errs.push(None);
}
Err(e) => {
// ress.push(None);
errs.push(Some(clone_err(e)));
}
}
@@ -336,11 +344,14 @@ impl SetDisks {
if let Some(disk) = disks[i].as_ref() {
let fi = file_infos[i].clone();
let old_data_dir = data_dirs[i];
futures.push(async move {
let disk = disk.clone();
let src_bucket = src_bucket.clone();
let src_object = src_object.clone();
futures.push(tokio::spawn(async move {
let _ = disk
.delete_version(
src_bucket,
src_object,
&src_bucket,
&src_object,
fi,
false,
DeleteOptions {
@@ -354,7 +365,7 @@ impl SetDisks {
debug!("rename_data delete_version err {:?}", e);
e
});
});
}));
}
}
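The recurring move in these hunks is cloning `Arc`-wrapped arguments before handing them to `tokio::spawn`, so each spawned task owns its data rather than borrowing from the enclosing scope (which `spawn`'s `'static` bound forbids). In isolation the pattern looks like this, with `do_rename` as a hypothetical stand-in for `disk.rename_data`:
```rust
use std::sync::Arc;

// Illustrative stand-in for the real disk call.
async fn do_rename(src_bucket: &str, src_object: &str) -> Result<(), String> {
    let _ = (src_bucket, src_object);
    Ok(())
}

async fn fan_out(objects: Vec<String>) {
    let src_bucket = Arc::new("bucket".to_string());
    let mut handles = Vec::new();
    for object in objects {
        let src_bucket = src_bucket.clone(); // refcount bump, no string copy
        handles.push(tokio::spawn(async move {
            do_rename(&src_bucket, &object).await
        }));
    }
    for h in handles {
        // a JoinError (task panic or cancellation) maps to
        // DiskError::Unexpected in the code above
        let _ = h.await;
    }
}
```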
@@ -416,41 +427,41 @@ impl SetDisks {
data_dir: &str,
write_quorum: usize,
) -> Result<()> {
let file_path = format!("{}/{}", object, data_dir);
let mut futures = Vec::with_capacity(disks.len());
let mut errs = Vec::with_capacity(disks.len());
for disk in disks.iter() {
let file_path = Arc::new(format!("{}/{}", object, data_dir));
let bucket = Arc::new(bucket.to_string());
let futures = disks.iter().map(|disk| {
let file_path = file_path.clone();
futures.push(async move {
let bucket = bucket.clone();
let disk = disk.clone();
tokio::spawn(async move {
if let Some(disk) = disk {
disk.delete(
bucket,
&file_path,
DeleteOptions {
recursive: true,
..Default::default()
},
)
.await
match disk
.delete(
&bucket,
&file_path,
DeleteOptions {
recursive: true,
..Default::default()
},
)
.await
{
Ok(_) => None,
Err(e) => Some(e),
}
} else {
Err(Error::new(DiskError::DiskNotFound))
Some(Error::new(DiskError::DiskNotFound))
}
});
}
let results = join_all(futures).await;
for result in results {
match result {
Ok(_) => {
errs.push(None);
}
Err(e) => {
errs.push(Some(e));
}
}
}
})
});
let errs: Vec<Option<Error>> = join_all(futures)
.await
.into_iter()
.map(|e| match e {
Ok(e) => e,
Err(_) => Some(Error::new(DiskError::Unexpected)),
})
.collect();
if let Some(err) = reduce_write_quorum_errs(&errs, object_op_ignored_errs().as_ref(), write_quorum) {
return Err(err);
@@ -3759,7 +3770,7 @@ impl ObjectIO for SetDisks {
let tmp_object = format!("{}/{}/part.1", tmp_dir, fi.data_dir.unwrap());
let mut erasure = Erasure::new(fi.erasure.data_blocks, fi.erasure.parity_blocks, fi.erasure.block_size);
let erasure = Erasure::new(fi.erasure.data_blocks, fi.erasure.parity_blocks, fi.erasure.block_size);
let is_inline_buffer = {
if let Some(sc) = GLOBAL_StorageClass.get() {
@@ -3798,19 +3809,18 @@ impl ObjectIO for SetDisks {
}
let stream = replace(&mut data.stream, Box::new(empty()));
let mut etag_stream = EtagReader::new(stream);
let etag_stream = EtagReader::new(stream);
// TODO: etag from header
let w_size = erasure
.encode(&mut etag_stream, &mut writers, data.content_length, write_quorum)
let (w_size, etag) = Arc::new(erasure)
.encode(etag_stream, &mut writers, data.content_length, write_quorum)
.await?; // TODO: on error, remove the temporary directory
if let Err(err) = close_bitrot_writers(&mut writers).await {
error!("close_bitrot_writers err {:?}", err);
}
let etag = etag_stream.etag().await;
//TODO: userDefined
user_defined.insert("etag".to_owned(), etag.clone());
@@ -4408,21 +4418,19 @@ impl StorageAPI for SetDisks {
}
}
let mut erasure = Erasure::new(fi.erasure.data_blocks, fi.erasure.parity_blocks, fi.erasure.block_size);
let erasure = Erasure::new(fi.erasure.data_blocks, fi.erasure.parity_blocks, fi.erasure.block_size);
let stream = replace(&mut data.stream, Box::new(empty()));
let mut etag_stream = EtagReader::new(stream);
let etag_stream = EtagReader::new(stream);
let w_size = erasure
.encode(&mut etag_stream, &mut writers, data.content_length, write_quorum)
let (w_size, mut etag) = Arc::new(erasure)
.encode(etag_stream, &mut writers, data.content_length, write_quorum)
.await?;
if let Err(err) = close_bitrot_writers(&mut writers).await {
error!("close_bitrot_writers err {:?}", err);
}
let mut etag = etag_stream.etag().await;
if let Some(ref tag) = opts.preserve_etag {
etag = tag.clone();
}

View File

@@ -2560,6 +2560,7 @@ fn check_abort_multipart_args(bucket: &str, object: &str, upload_id: &str) -> Re
check_multipart_object_args(bucket, object, upload_id)
}
#[tracing::instrument(level = "debug")]
fn check_put_object_args(bucket: &str, object: &str) -> Result<()> {
if !is_meta_bucketname(bucket) && check_valid_bucket_name_strict(bucket).is_err() {
return Err(Error::new(StorageError::BucketNameInvalid(bucket.to_string())));

View File

@@ -106,6 +106,11 @@ pub async fn access(path: impl AsRef<Path>) -> io::Result<()> {
Ok(())
}
pub fn access_std(path: impl AsRef<Path>) -> io::Result<()> {
std::fs::metadata(path)?;
Ok(())
}
pub async fn lstat(path: impl AsRef<Path>) -> io::Result<Metadata> {
fs::metadata(path).await
}
@@ -114,6 +119,7 @@ pub async fn make_dir_all(path: impl AsRef<Path>) -> io::Result<()> {
fs::create_dir_all(path.as_ref()).await
}
#[tracing::instrument(level = "debug", skip_all)]
pub async fn remove(path: impl AsRef<Path>) -> io::Result<()> {
let meta = fs::metadata(path.as_ref()).await?;
if meta.is_dir() {
@@ -132,6 +138,25 @@ pub async fn remove_all(path: impl AsRef<Path>) -> io::Result<()> {
}
}
#[tracing::instrument(level = "debug", skip_all)]
pub fn remove_std(path: impl AsRef<Path>) -> io::Result<()> {
let meta = std::fs::metadata(path.as_ref())?;
if meta.is_dir() {
std::fs::remove_dir(path.as_ref())
} else {
std::fs::remove_file(path.as_ref())
}
}
pub fn remove_all_std(path: impl AsRef<Path>) -> io::Result<()> {
let meta = std::fs::metadata(path.as_ref())?;
if meta.is_dir() {
std::fs::remove_dir_all(path.as_ref())
} else {
std::fs::remove_file(path.as_ref())
}
}
pub async fn mkdir(path: impl AsRef<Path>) -> io::Result<()> {
fs::create_dir(path.as_ref()).await
}
@@ -140,6 +165,11 @@ pub async fn rename(from: impl AsRef<Path>, to: impl AsRef<Path>) -> io::Result<
fs::rename(from, to).await
}
pub fn rename_std(from: impl AsRef<Path>, to: impl AsRef<Path>) -> io::Result<()> {
std::fs::rename(from, to)
}
#[tracing::instrument(level = "debug", skip_all)]
pub async fn read_file(path: impl AsRef<Path>) -> io::Result<Vec<u8>> {
fs::read(path.as_ref()).await
}

View File

@@ -52,7 +52,7 @@ pub fn strings_has_prefix_fold(s: &str, prefix: &str) -> bool {
s.len() >= prefix.len() && (s[..prefix.len()] == *prefix || s[..prefix.len()].eq_ignore_ascii_case(prefix))
}
pub fn has_profix(s: &str, prefix: &str) -> bool {
pub fn has_prefix(s: &str, prefix: &str) -> bool {
if cfg!(target_os = "windows") {
return strings_has_prefix_fold(s, prefix);
}

View File

@@ -30,7 +30,7 @@ clap.workspace = true
crypto = { workspace = true }
datafusion = { workspace = true }
common.workspace = true
const-str = { version = "0.6.1", features = ["std", "proc"] }
const-str = { workspace = true }
ecstore.workspace = true
policy.workspace = true
flatbuffers.workspace = true
@@ -42,7 +42,6 @@ http.workspace = true
http-body.workspace = true
iam = { workspace = true }
lock.workspace = true
local-ip-address = { workspace = true }
matchit = { workspace = true }
mime.workspace = true
mime_guess = { workspace = true }
@@ -51,17 +50,18 @@ pin-project-lite.workspace = true
protos.workspace = true
query = { workspace = true }
rmp-serde.workspace = true
rustfs-config = { workspace = true }
rustfs-event-notifier = { workspace = true }
rustfs-obs = { workspace = true }
rustfs-utils = { workspace = true }
rustls.workspace = true
rustls-pemfile.workspace = true
rustls-pki-types.workspace = true
rust-embed = { workspace = true, features = ["interpolate-folder-path"] }
s3s.workspace = true
serde.workspace = true
serde_json.workspace = true
serde_urlencoded = { workspace = true }
shadow-rs = { workspace = true, features = ["build", "metadata"] }
socket2 = { workspace = true }
tracing.workspace = true
time = { workspace = true, features = ["parsing", "formatting", "serde"] }
tokio-util.workspace = true

View File

@@ -1,30 +1,36 @@
rustfs/
├── Cargo.toml
├── src/
│ ├── main.rs # 主入口
│ ├── admin/
│ │ └── mod.rs # 管理接口
│ ├── auth/
│ │ └── mod.rs # 认证模块
│ ├── config/
│ │ ├── mod.rs # 配置模块
│ │ └── options.rs # 命令行参数
│ ├── console/
│ │ ├── mod.rs # 控制台模块
│ │ └── server.rs # 控制台服务器
│ ├── grpc/
│ │ └── mod.rs # gRPC 服务
│ ├── license/
│ │ └── mod.rs # 许可证管理
│ ├── logging/
│ │ └── mod.rs # 日志管理
│ ├── server/
│ │ ├── mod.rs # 服务器实现
│ │ ├── connection.rs # 连接处理
│ │ ├── service.rs # 服务实现
│ │ └── state.rs # 状态管理
│ ├── storage/
│ │ ├── mod.rs # 存储模块
│ │ └── fs.rs # 文件系统实现
│ └── utils/
│ └── mod.rs # 工具函数
# RustFS
RustFS is a simple file system written in Rust. It is designed to be a learning project for those who want to understand
how file systems work and how to implement them in Rust.
## Features
- Simple file system structure
- Basic file operations (create, read, write, delete)
- Directory support
- File metadata (size, creation time, etc.)
- Basic error handling
- Unit tests for core functionality
- Documentation for public API
- Example usage
- License information
- Contributing guidelines
- Changelog
- Code of conduct
- Acknowledgements
- Contact information
- Links to additional resources
## Getting Started
To get started with RustFS, clone the repository and build the project:
```bash
git clone git@github.com:rustfs/s3-rustfs.git
cd rustfs
cargo build
```
## Usage
To use RustFS, you can create a new file system instance and perform basic file operations. Here is an example:
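A plausible sketch of such an example (the `FileSystem` API here is hypothetical, inferred from the feature list above rather than taken from the crate):
```rust
// Hypothetical API sketch; the crate's real interface may differ.
use rustfs::FileSystem; // assumed export

fn main() -> std::io::Result<()> {
    // create a file system instance backed by a local directory
    let fs = FileSystem::new("/tmp/rustfs-demo")?;
    fs.create_dir("docs")?;                // directory support
    fs.write("docs/hello.txt", b"hello")?; // create + write
    let data = fs.read("docs/hello.txt")?; // read
    assert_eq!(data, b"hello");
    fs.delete("docs/hello.txt")?;          // delete
    Ok(())
}
```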

View File

@@ -1,40 +1,8 @@
use clap::Parser;
use const_str::concat;
use ecstore::global::DEFAULT_PORT;
use std::string::ToString;
shadow_rs::shadow!(build);
/// Default Access Key
/// Default value: rustfsadmin
/// Environment variable: RUSTFS_ACCESS_KEY
/// Command line argument: --access-key
/// Example: RUSTFS_ACCESS_KEY=rustfsadmin
/// Example: --access-key rustfsadmin
pub const DEFAULT_ACCESS_KEY: &str = "rustfsadmin";
/// Default Secret Key
/// Default value: rustfsadmin
/// Environment variable: RUSTFS_SECRET_KEY
/// Command line argument: --secret-key
/// Example: RUSTFS_SECRET_KEY=rustfsadmin
/// Example: --secret-key rustfsadmin
pub const DEFAULT_SECRET_KEY: &str = "rustfsadmin";
/// Default configuration file for observability
/// Default value: config/obs.toml
/// Environment variable: RUSTFS_OBS_CONFIG
/// Command line argument: --obs-config
/// Example: RUSTFS_OBS_CONFIG=config/obs.toml
/// Example: --obs-config config/obs.toml
/// Example: --obs-config /etc/rustfs/obs.toml
pub const DEFAULT_OBS_CONFIG: &str = "config/obs.toml";
/// Default TLS key for rustfs
/// This is the default key for TLS.
pub(crate) const RUSTFS_TLS_KEY: &str = "rustfs_key.pem";
/// Default TLS cert for rustfs
/// This is the default cert for TLS.
pub(crate) const RUSTFS_TLS_CERT: &str = "rustfs_cert.pem";
#[allow(clippy::const_is_empty)]
const SHORT_VERSION: &str = {
if !build::TAG.is_empty() {
@@ -67,7 +35,7 @@ pub struct Opt {
pub volumes: Vec<String>,
/// bind to a specific ADDRESS:PORT, ADDRESS can be an IP or hostname
#[arg(long, default_value_t = format!("0.0.0.0:{}", DEFAULT_PORT), env = "RUSTFS_ADDRESS")]
#[arg(long, default_value_t = rustfs_config::DEFAULT_ADDRESS.to_string(), env = "RUSTFS_ADDRESS")]
pub address: String,
/// Domain name used for virtual-hosted-style requests.
@@ -75,17 +43,19 @@ pub struct Opt {
pub server_domains: Vec<String>,
/// Access key used for authentication.
#[arg(long, default_value_t = DEFAULT_ACCESS_KEY.to_string(), env = "RUSTFS_ACCESS_KEY")]
#[arg(long, default_value_t = rustfs_config::DEFAULT_ACCESS_KEY.to_string(), env = "RUSTFS_ACCESS_KEY")]
pub access_key: String,
/// Secret key used for authentication.
#[arg(long, default_value_t = DEFAULT_SECRET_KEY.to_string(), env = "RUSTFS_SECRET_KEY")]
#[arg(long, default_value_t = rustfs_config::DEFAULT_SECRET_KEY.to_string(), env = "RUSTFS_SECRET_KEY")]
pub secret_key: String,
/// Enable console server
#[arg(long, default_value_t = false, env = "RUSTFS_CONSOLE_ENABLE")]
pub console_enable: bool,
#[arg(long, default_value_t = format!("127.0.0.1:{}", 9002), env = "RUSTFS_CONSOLE_ADDRESS")]
/// Console server bind address
#[arg(long, default_value_t = rustfs_config::DEFAULT_CONSOLE_ADDRESS.to_string(), env = "RUSTFS_CONSOLE_ADDRESS")]
pub console_address: String,
/// rustfs endpoint for console
@@ -94,7 +64,7 @@ pub struct Opt {
/// Observability configuration file
/// Default value: config/obs.toml
#[arg(long, default_value_t = DEFAULT_OBS_CONFIG.to_string(), env = "RUSTFS_OBS_CONFIG")]
#[arg(long, default_value_t = rustfs_config::DEFAULT_OBS_CONFIG.to_string(), env = "RUSTFS_OBS_CONFIG")]
pub obs_config: String,
/// tls path for rustfs api and console.
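The `rustfs_config::*` defaults referenced above replace the per-crate constants this hunk deletes. A sketch of what that crate presumably exports; the names come from the diff, and the values are assumptions carried over from the removed definitions:
```rust
// Assumed contents of the rustfs_config crate, reconstructed from the
// defaults removed in this diff; the real crate may differ.
pub const DEFAULT_PORT: u16 = 9000;
pub const DEFAULT_ADDRESS: &str = "0.0.0.0:9000";
pub const DEFAULT_CONSOLE_ADDRESS: &str = "127.0.0.1:9002";
pub const DEFAULT_ACCESS_KEY: &str = "rustfsadmin";
pub const DEFAULT_SECRET_KEY: &str = "rustfsadmin";
pub const DEFAULT_OBS_CONFIG: &str = "config/obs.toml";
pub const RUSTFS_TLS_KEY: &str = "rustfs_key.pem";
pub const RUSTFS_TLS_CERT: &str = "rustfs_cert.pem";
```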

View File

@@ -1,4 +1,3 @@
use crate::config::{RUSTFS_TLS_CERT, RUSTFS_TLS_KEY};
use crate::license::get_license;
use axum::{
body::Body,
@@ -8,6 +7,7 @@ use axum::{
Router,
};
use axum_extra::extract::Host;
use rustfs_config::{RUSTFS_TLS_CERT, RUSTFS_TLS_KEY};
use std::io;
use axum::response::Redirect;
@@ -17,7 +17,7 @@ use mime_guess::from_path;
use rust_embed::RustEmbed;
use serde::Serialize;
use shadow_rs::shadow;
use std::net::{Ipv4Addr, SocketAddr};
use std::net::{IpAddr, SocketAddr};
use std::sync::OnceLock;
use std::time::Duration;
use tokio::signal;
@@ -33,7 +33,8 @@ const RUSTFS_ADMIN_PREFIX: &str = "/rustfs/admin/v3";
#[folder = "$CARGO_MANIFEST_DIR/static"]
struct StaticFiles;
async fn static_handler(uri: axum::http::Uri) -> impl IntoResponse {
/// Static file handler
async fn static_handler(uri: Uri) -> impl IntoResponse {
let mut path = uri.path().trim_start_matches('/');
if path.is_empty() {
path = "index.html"
@@ -72,7 +73,7 @@ pub(crate) struct Config {
}
impl Config {
fn new(local_ip: Ipv4Addr, port: u16, version: &str, date: &str) -> Self {
fn new(local_ip: IpAddr, port: u16, version: &str, date: &str) -> Self {
Config {
port,
api: Api {
@@ -143,7 +144,7 @@ struct License {
pub(crate) static CONSOLE_CONFIG: OnceLock<Config> = OnceLock::new();
#[allow(clippy::const_is_empty)]
pub(crate) fn init_console_cfg(local_ip: Ipv4Addr, port: u16) {
pub(crate) fn init_console_cfg(local_ip: IpAddr, port: u16) {
CONSOLE_CONFIG.get_or_init(|| {
let ver = {
if !build::TAG.is_empty() {
@@ -193,7 +194,7 @@ fn _is_private_ip(ip: std::net::IpAddr) -> bool {
async fn config_handler(uri: Uri, Host(host): Host) -> impl IntoResponse {
let scheme = uri.scheme().map(|s| s.as_str()).unwrap_or("http");
// 从 uri 中获取 host如果没有则使用 Host extractor 的值
// Get the host from the uri and use the value of the host extractor if it doesn't have one
let host = uri.host().unwrap_or(host.as_str());
let host = if host.contains(':') {
@@ -203,7 +204,7 @@ async fn config_handler(uri: Uri, Host(host): Host) -> impl IntoResponse {
host
};
// 将当前配置复制一份
// Make a copy of the current configuration
let mut cfg = CONSOLE_CONFIG.get().unwrap().clone();
let url = format!("{}://{}:{}", scheme, host, cfg.port);
@@ -219,14 +220,14 @@ async fn config_handler(uri: Uri, Host(host): Host) -> impl IntoResponse {
pub async fn start_static_file_server(
addrs: &str,
local_ip: Ipv4Addr,
local_ip: IpAddr,
access_key: &str,
secret_key: &str,
tls_path: Option<String>,
) {
// 配置 CORS
// Configure CORS
let cors = CorsLayer::new()
.allow_origin(Any) // 生产环境建议指定具体域名
.allow_origin(Any) // In the production environment, we recommend that you specify a specific domain name
.allow_methods([http::Method::GET, http::Method::POST])
.allow_headers([header::CONTENT_TYPE]);
// Create a route
@@ -298,7 +299,7 @@ async fn start_server(server_addr: SocketAddr, tls_path: Option<String>, app: Ro
}
#[allow(dead_code)]
/// HTTP 到 HTTPS 的 308 重定向
/// 308 redirect for HTTP to HTTPS
fn redirect_to_https(https_port: u16) -> Router {
Router::new().route(
"/*path",

21
rustfs/src/event.rs Normal file
View File

@@ -0,0 +1,21 @@
use rustfs_event_notifier::NotifierConfig;
use tracing::{error, info, instrument};
#[instrument]
pub(crate) async fn init_event_notifier(notifier_config: Option<String>) {
// Initialize event notifier
if notifier_config.is_some() {
info!("event_config is not empty");
tokio::spawn(async move {
let config = NotifierConfig::event_load_config(notifier_config);
let result = rustfs_event_notifier::initialize(config).await;
if let Err(e) = result {
error!("Failed to initialize event notifier: {}", e);
} else {
info!("Event notifier initialized successfully");
}
});
} else {
info!("event_config is empty");
}
}

View File

@@ -10,6 +10,7 @@ lazy_static::lazy_static! {
static ref LICENSE: OnceLock<Token> = OnceLock::new();
}
/// Initialize the license
pub fn init_license(license: Option<String>) {
if license.is_none() {
error!("License is None");
@@ -23,10 +24,13 @@ pub fn init_license(license: Option<String>) {
});
}
/// Get the license
pub fn get_license() -> Option<Token> {
LICENSE.get().cloned()
}
/// Check the license
/// This function checks if the license is valid.
#[allow(unreachable_code)]
pub fn license_check() -> Result<()> {
return Ok(());

View File

@@ -2,18 +2,18 @@ mod admin;
mod auth;
mod config;
mod console;
mod event;
mod grpc;
pub mod license;
mod logging;
mod server;
mod service;
mod storage;
mod utils;
use crate::auth::IAMAuth;
use crate::console::{init_console_cfg, CONSOLE_CONFIG};
// Ensure the correct path for parse_license is imported
use crate::server::{wait_for_shutdown, ServiceState, ServiceStateManager, ShutdownSignal, SHUTDOWN_TIMEOUT};
use crate::utils::error;
use bytes::Bytes;
use chrono::Datelike;
use clap::Parser;
@@ -21,7 +21,6 @@ use common::{
error::{Error, Result},
globals::set_global_addr,
};
use config::{DEFAULT_ACCESS_KEY, DEFAULT_SECRET_KEY, RUSTFS_TLS_CERT, RUSTFS_TLS_KEY};
use ecstore::bucket::metadata_sys::init_bucket_metadata_sys;
use ecstore::config as ecconfig;
use ecstore::config::GLOBAL_ConfigSys;
@@ -48,11 +47,12 @@ use hyper_util::{
use iam::init_iam_sys;
use license::init_license;
use protos::proto_gen::node_service::node_service_server::NodeServiceServer;
use rustfs_event_notifier::NotifierConfig;
use rustfs_config::{DEFAULT_ACCESS_KEY, DEFAULT_SECRET_KEY, RUSTFS_TLS_CERT, RUSTFS_TLS_KEY};
use rustfs_obs::{init_obs, init_process_observer, load_config, set_global_guard};
use rustls::ServerConfig;
use s3s::{host::MultiDomain, service::S3ServiceBuilder};
use service::hybrid;
use socket2::SockRef;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
@@ -62,8 +62,10 @@ use tokio_rustls::TlsAcceptor;
use tonic::{metadata::MetadataValue, Request, Status};
use tower_http::cors::CorsLayer;
use tower_http::trace::TraceLayer;
use tracing::Span;
use tracing::{debug, error, info, info_span, warn};
use tracing::{debug, error, info, warn};
use tracing::{instrument, Span};
const MI_B: usize = 1024 * 1024;
#[cfg(all(target_os = "linux", target_env = "gnu"))]
#[global_allocator]
@@ -77,12 +79,12 @@ fn check_auth(req: Request<()>) -> Result<Request<()>, Status> {
_ => Err(Status::unauthenticated("No valid auth token")),
}
}
#[instrument]
fn print_server_info() {
let cfg = CONSOLE_CONFIG.get().unwrap();
let current_year = chrono::Utc::now().year();
// 使用自定义宏打印服务器信息
// Use custom macros to print server information
info!("RustFS Object Storage Server");
info!("Copyright: 2024-{} RustFS, Inc", current_year);
info!("License: {}", cfg.license());
@@ -95,8 +97,7 @@ async fn main() -> Result<()> {
// Parse the obtained parameters
let opt = config::Opt::parse();
// config::init_config(opt.clone());
// Initialize the configuration
init_license(opt.license.clone());
// Load the configuration file
@@ -108,50 +109,34 @@ async fn main() -> Result<()> {
// Store in global storage
set_global_guard(guard)?;
// Initialize event notifier
let notifier_config = opt.clone().event_config;
if notifier_config.is_some() {
info!("event_config is not empty");
tokio::spawn(async move {
let config = NotifierConfig::event_load_config(notifier_config);
let result = rustfs_event_notifier::initialize(config).await;
if let Err(e) = result {
error!("Failed to initialize event notifier: {}", e);
} else {
info!("Event notifier initialized successfully");
}
});
} else {
info!("event_config is empty");
}
// Run parameters
run(opt).await
}
// #[tokio::main]
#[instrument(skip(opt))]
async fn run(opt: config::Opt) -> Result<()> {
let span = info_span!("trace-main-run");
let _enter = span.enter();
debug!("opt: {:?}", &opt);
// Initialize event notifier
event::init_event_notifier(opt.event_config).await;
let server_addr = net::parse_and_resolve_address(opt.address.as_str())?;
let server_port = server_addr.port();
let server_address = server_addr.to_string();
debug!("server_address {}", &server_address);
//设置 AK SK
// Set up AK and SK
iam::init_global_action_cred(Some(opt.access_key.clone()), Some(opt.secret_key.clone()))?;
set_global_rustfs_port(server_port);
//监听地址,端口从参数中获取
// The listening address and port are obtained from the parameters
let listener = TcpListener::bind(server_address.clone()).await?;
//获取监听地址
// Obtain the listener address
let local_addr: SocketAddr = listener.local_addr()?;
let local_ip = utils::get_local_ip().ok_or(local_addr.ip()).unwrap();
// let local_ip = utils::get_local_ip().ok_or(local_addr.ip()).unwrap();
let local_ip = rustfs_utils::get_local_ip().ok_or(local_addr.ip()).unwrap();
// Used for RPC
let (endpoint_pools, setup_type) = EndpointServerPools::from_volumes(server_address.clone().as_str(), opt.volumes.clone())
@@ -175,7 +160,7 @@ async fn run(opt: config::Opt) -> Result<()> {
// Detailed endpoint information (showing all API endpoints)
let api_endpoints = format!("http://{}:{}", local_ip, server_port);
let localhost_endpoint = format!("http://127.0.0.1:{}", server_port);
info!("API: {} {}", api_endpoints, localhost_endpoint);
info!(" API: {} {}", api_endpoints, localhost_endpoint);
info!(" RootUser: {}", opt.access_key.clone());
info!(" RootPass: {}", opt.secret_key.clone());
if DEFAULT_ACCESS_KEY.eq(&opt.access_key) && DEFAULT_SECRET_KEY.eq(&opt.secret_key) {
@@ -198,13 +183,13 @@ async fn run(opt: config::Opt) -> Result<()> {
set_global_endpoints(endpoint_pools.as_ref().clone());
update_erasure_type(setup_type).await;
// 初始化本地磁盘
// Initialize the local disk
init_local_disks(endpoint_pools.clone())
.await
.map_err(|err| Error::from_string(err.to_string()))?;
// Setup S3 service
// 本项目使用 s3s 库来实现 s3 服务
// This project uses the S3S library to implement S3 services
let s3_service = {
let store = storage::ecfs::FS::new();
// let mut b = S3ServiceBuilder::new(storage::ecfs::FS::new(server_address.clone(), endpoint_pools).await?);
@@ -212,7 +197,7 @@ async fn run(opt: config::Opt) -> Result<()> {
let access_key = opt.access_key.clone();
let secret_key = opt.secret_key.clone();
//显示 info 信息
// Display the info message
debug!("authentication is enabled {}, {}", &access_key, &secret_key);
b.set_auth(IAMAuth::new(access_key, secret_key));
@@ -263,7 +248,7 @@ async fn run(opt: config::Opt) -> Result<()> {
debug!("Found TLS directory, checking for certificates");
// 1. Try to load all certificates directly (including root and subdirectories)
match utils::load_all_certs_from_directory(&tls_path) {
match rustfs_utils::load_all_certs_from_directory(&tls_path) {
Ok(cert_key_pairs) if !cert_key_pairs.is_empty() => {
debug!("Found {} certificates, starting with HTTPS", cert_key_pairs.len());
let _ = rustls::crypto::aws_lc_rs::default_provider().install_default();
@@ -271,7 +256,7 @@ async fn run(opt: config::Opt) -> Result<()> {
// create a multi certificate configuration
let mut server_config = ServerConfig::builder()
.with_no_client_auth()
.with_cert_resolver(Arc::new(utils::create_multi_cert_resolver(cert_key_pairs)?));
.with_cert_resolver(Arc::new(rustfs_utils::create_multi_cert_resolver(cert_key_pairs)?));
server_config.alpn_protocols = vec![b"h2".to_vec(), b"http/1.1".to_vec(), b"http/1.0".to_vec()];
Some(TlsAcceptor::from(Arc::new(server_config)))
@@ -286,12 +271,14 @@ async fn run(opt: config::Opt) -> Result<()> {
if has_single_cert {
debug!("Found legacy single TLS certificate, starting with HTTPS");
let _ = rustls::crypto::aws_lc_rs::default_provider().install_default();
let certs = utils::load_certs(cert_path.as_str()).map_err(|e| error(e.to_string()))?;
let key = utils::load_private_key(key_path.as_str()).map_err(|e| error(e.to_string()))?;
let certs =
rustfs_utils::load_certs(cert_path.as_str()).map_err(|e| rustfs_utils::certs_error(e.to_string()))?;
let key = rustfs_utils::load_private_key(key_path.as_str())
.map_err(|e| rustfs_utils::certs_error(e.to_string()))?;
let mut server_config = ServerConfig::builder()
.with_no_client_auth()
.with_single_cert(certs, key)
.map_err(|e| error(e.to_string()))?;
.map_err(|e| rustfs_utils::certs_error(e.to_string()))?;
server_config.alpn_protocols = vec![b"h2".to_vec(), b"http/1.1".to_vec(), b"http/1.0".to_vec()];
Some(TlsAcceptor::from(Arc::new(server_config)))
} else {
@@ -339,7 +326,7 @@ async fn run(opt: config::Opt) -> Result<()> {
.layer(
TraceLayer::new_for_http()
.make_span_with(|request: &HttpRequest<_>| {
let span = tracing::debug_span!("http-request",
let span = tracing::info_span!("http-request",
status_code = tracing::field::Empty,
method = %request.method(),
uri = %request.uri(),
@@ -368,13 +355,14 @@ async fn run(opt: config::Opt) -> Result<()> {
debug!("http response generated in {:?}", latency)
})
.on_body_chunk(|chunk: &Bytes, latency: Duration, _span: &Span| {
info!(histogram.request.body.len = chunk.len(), "histogram request body lenght",);
info!(histogram.request.body.len = chunk.len(), "histogram request body length",);
debug!("http body sending {} bytes in {:?}", chunk.len(), latency)
})
.on_eos(|_trailers: Option<&HeaderMap>, stream_duration: Duration, _span: &Span| {
debug!("http stream closed after {:?}", stream_duration)
})
.on_failure(|_error, latency: Duration, _span: &Span| {
info!(counter.rustfs_api_requests_failure_total = 1_u64, "handle request api failure total");
debug!("http request failure error: {:?} in {:?}", _error, latency)
}),
)
@@ -425,15 +413,22 @@ async fn run(opt: config::Opt) -> Result<()> {
}
};
if let Err(err) = socket.set_nodelay(true) {
let socket_ref = SockRef::from(&socket);
if let Err(err) = socket_ref.set_nodelay(true) {
warn!(?err, "Failed to set TCP_NODELAY");
}
if let Err(err) = socket_ref.set_recv_buffer_size(4 * MI_B) {
warn!(?err, "Failed to set set_recv_buffer_size");
}
if let Err(err) = socket_ref.set_send_buffer_size(4 * MI_B) {
warn!(?err, "Failed to set set_send_buffer_size");
}
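Switching to `socket2::SockRef` exposes socket options that tokio's `TcpStream` does not surface directly, such as the kernel send/receive buffer sizes. The same tuning in isolation, as a hedged sketch:
```rust
use socket2::SockRef;
use tokio::net::TcpStream;

// Apply the options set above; failures are logged, not fatal, in the real code.
fn tune(socket: &TcpStream) {
    let sock_ref = SockRef::from(socket);
    let _ = sock_ref.set_nodelay(true);
    let _ = sock_ref.set_recv_buffer_size(4 * 1024 * 1024); // 4 MiB
    let _ = sock_ref.set_send_buffer_size(4 * 1024 * 1024); // 4 MiB
}
```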
if has_tls_certs {
debug!("TLS certificates found, starting with SIGINT");
let tls_socket = match tls_acceptor
.as_ref()
.ok_or_else(|| error("TLS not configured".to_string()))
.ok_or_else(|| rustfs_utils::certs_error("TLS not configured".to_string()))
.unwrap()
.accept(socket)
.await

View File

@@ -129,7 +129,7 @@ impl Default for ServiceStateManager {
}
}
// 使用示例
// Example of use
#[cfg(test)]
mod tests {
use super::*;
@@ -138,18 +138,18 @@ mod tests {
fn test_service_state_manager() {
let manager = ServiceStateManager::new();
// 初始状态应该是 Starting
// The initial state should be Starting
assert_eq!(manager.current_state(), ServiceState::Starting);
// 更新状态到 Ready
// Update the status to Ready
manager.update(ServiceState::Ready);
assert_eq!(manager.current_state(), ServiceState::Ready);
// 更新状态到 Stopping
// Update the status to Stopping
manager.update(ServiceState::Stopping);
assert_eq!(manager.current_state(), ServiceState::Stopping);
// 更新状态到 Stopped
// Update the status to Stopped
manager.update(ServiceState::Stopped);
assert_eq!(manager.current_state(), ServiceState::Stopped);
}

View File

@@ -20,6 +20,7 @@ pub(crate) struct ReqInfo {
pub version_id: Option<String>,
}
/// Authorizes the request based on the action and credentials.
pub async fn authorize_request<T>(req: &mut S3Request<T>, action: Action) -> S3Result<()> {
let req_info = req.extensions.get_mut::<ReqInfo>().expect("ReqInfo not found");

View File

@@ -62,6 +62,7 @@ use s3s::S3;
use s3s::{S3Request, S3Response};
use std::fmt::Debug;
use std::str::FromStr;
use std::sync::Arc;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use tokio_util::io::ReaderStream;
@@ -141,6 +142,7 @@ impl S3 for FS {
Ok(S3Response::new(output))
}
/// Copy an object from one location to another
#[tracing::instrument(level = "debug", skip(self, req))]
async fn copy_object(&self, req: S3Request<CopyObjectInput>) -> S3Result<S3Response<CopyObjectOutput>> {
let CopyObjectInput {
@@ -227,6 +229,7 @@ impl S3 for FS {
Ok(S3Response::new(output))
}
/// Delete a bucket
#[tracing::instrument(level = "debug", skip(self, req))]
async fn delete_bucket(&self, req: S3Request<DeleteBucketInput>) -> S3Result<S3Response<DeleteBucketOutput>> {
let input = req.input;
@@ -249,6 +252,7 @@ impl S3 for FS {
Ok(S3Response::new(DeleteBucketOutput {}))
}
/// Delete an object
#[tracing::instrument(level = "debug", skip(self, req))]
async fn delete_object(&self, req: S3Request<DeleteObjectInput>) -> S3Result<S3Response<DeleteObjectOutput>> {
let DeleteObjectInput {
@@ -308,6 +312,7 @@ impl S3 for FS {
Ok(S3Response::new(output))
}
/// Delete multiple objects
#[tracing::instrument(level = "debug", skip(self, req))]
async fn delete_objects(&self, req: S3Request<DeleteObjectsInput>) -> S3Result<S3Response<DeleteObjectsOutput>> {
// info!("delete_objects args {:?}", req.input);
@@ -367,6 +372,7 @@ impl S3 for FS {
Ok(S3Response::new(output))
}
/// Get bucket location
#[tracing::instrument(level = "debug", skip(self, req))]
async fn get_bucket_location(&self, req: S3Request<GetBucketLocationInput>) -> S3Result<S3Response<GetBucketLocationOutput>> {
// mc get 1
@@ -385,6 +391,7 @@ impl S3 for FS {
Ok(S3Response::new(output))
}
/// Get bucket notification
#[tracing::instrument(
level = "debug",
skip(self, req),
@@ -1890,14 +1897,14 @@ impl S3 for FS {
) -> S3Result<S3Response<SelectObjectContentOutput>> {
info!("handle select_object_content");
let input = req.input;
let input = Arc::new(req.input);
info!("{:?}", input);
let db = make_rustfsms(input.clone(), false).await.map_err(|e| {
error!("make db failed, {}", e.to_string());
s3_error!(InternalError, "{}", e.to_string())
})?;
let query = Query::new(Context { input: input.clone() }, input.request.expression);
let query = Query::new(Context { input: input.clone() }, input.request.expression.clone());
let result = db
.execute(&query)
.await

View File

@@ -0,0 +1,17 @@
use rustfs_event_notifier::{Event, Metadata};
/// Create a new metadata object
#[allow(dead_code)]
pub(crate) fn create_metadata() -> Metadata {
// Create a new metadata object
let mut metadata = Metadata::new();
metadata.set_configuration_id("test-config".to_string());
// Return the created metadata object
metadata
}
/// Send an event to the notifier
#[allow(dead_code)]
pub(crate) async fn send_event(event: Event) -> Result<(), Box<dyn std::error::Error>> {
rustfs_event_notifier::send_event(event).await.map_err(|e| e.into())
}

View File

@@ -1,4 +1,5 @@
pub mod access;
pub mod ecfs;
pub mod error;
mod event_notifier;
pub mod options;

View File

@@ -8,6 +8,7 @@ use lazy_static::lazy_static;
use std::collections::HashMap;
use uuid::Uuid;
/// Creates options for deleting an object in a bucket.
pub async fn del_opts(
bucket: &str,
object: &str,
@@ -56,6 +57,7 @@ pub async fn del_opts(
Ok(opts)
}
/// Creates options for getting an object from a bucket.
pub async fn get_opts(
bucket: &str,
object: &str,
@@ -105,6 +107,7 @@ pub async fn get_opts(
Ok(opts)
}
/// Creates options for putting an object in a bucket.
pub async fn put_opts(
bucket: &str,
object: &str,
@@ -151,6 +154,7 @@ pub async fn put_opts(
Ok(opts)
}
/// Creates options for copying an object in a bucket.
pub async fn copy_dst_opts(
bucket: &str,
object: &str,
@@ -172,6 +176,7 @@ pub fn put_opts_from_headers(
get_default_opts(headers, metadata, false)
}
/// Creates default options for getting an object from a bucket.
pub fn get_default_opts(
_headers: &HeaderMap<HeaderValue>,
metadata: Option<HashMap<String, String>>,
@@ -183,6 +188,7 @@ pub fn get_default_opts(
})
}
/// Extracts metadata from headers and returns it as a HashMap.
pub fn extract_metadata(headers: &HeaderMap<HeaderValue>) -> HashMap<String, String> {
let mut metadata = HashMap::new();
@@ -191,6 +197,7 @@ pub fn extract_metadata(headers: &HeaderMap<HeaderValue>) -> HashMap<String, Str
metadata
}
/// Extracts metadata from MIME headers into the provided HashMap.
pub fn extract_metadata_from_mime(headers: &HeaderMap<HeaderValue>, metadata: &mut HashMap<String, String>) {
for (k, v) in headers.iter() {
if let Some(key) = k.as_str().strip_prefix("x-amz-meta-") {
@@ -219,7 +226,9 @@ pub fn extract_metadata_from_mime(headers: &HeaderMap<HeaderValue>, metadata: &m
metadata.insert("content-type".to_owned(), "binary/octet-stream".to_owned());
}
}
lazy_static! {
/// List of supported headers.
static ref SUPPORTED_HEADERS: Vec<&'static str> = vec![
"content-type",
"cache-control",

View File

@@ -7,12 +7,14 @@ edition.workspace = true
async-trait.workspace = true
bytes.workspace = true
chrono.workspace = true
common.workspace = true
datafusion = { workspace = true }
ecstore.workspace = true
futures = { workspace = true }
futures-core = { workspace = true }
http.workspace = true
object_store = { workspace = true }
pin-project-lite.workspace = true
s3s.workspace = true
snafu = { workspace = true, features = ["backtrace"] }
tokio.workspace = true

View File

@@ -1,6 +1,7 @@
use async_trait::async_trait;
use bytes::Bytes;
use chrono::Utc;
use common::DEFAULT_DELIMITER;
use ecstore::io::READ_BUFFER_SIZE;
use ecstore::new_object_layer_fn;
use ecstore::store::ECStore;
@@ -24,29 +25,54 @@ use object_store::PutOptions;
use object_store::PutPayload;
use object_store::PutResult;
use object_store::{Error as o_Error, Result};
use pin_project_lite::pin_project;
use s3s::dto::SelectObjectContentInput;
use s3s::s3_error;
use s3s::S3Result;
use std::ops::Range;
use std::pin::Pin;
use std::sync::Arc;
use std::task::ready;
use std::task::Poll;
use tokio::io::AsyncRead;
use tokio_util::io::ReaderStream;
use tracing::info;
use transform_stream::AsyncTryStream;
#[derive(Debug)]
pub struct EcObjectStore {
input: SelectObjectContentInput,
input: Arc<SelectObjectContentInput>,
need_convert: bool,
delimiter: String,
store: Arc<ECStore>,
}
impl EcObjectStore {
pub fn new(input: SelectObjectContentInput) -> S3Result<Self> {
pub fn new(input: Arc<SelectObjectContentInput>) -> S3Result<Self> {
let Some(store) = new_object_layer_fn() else {
return Err(s3_error!(InternalError, "ec store not inited"));
};
Ok(Self { input, store })
let (need_convert, delimiter) = if let Some(csv) = input.request.input_serialization.csv.as_ref() {
if let Some(delimiter) = csv.field_delimiter.as_ref() {
if delimiter.len() > 1 {
(true, delimiter.to_owned())
} else {
(false, String::new())
}
} else {
(false, String::new())
}
} else {
(false, String::new())
};
Ok(Self {
input,
need_convert,
delimiter,
store,
})
}
}
@@ -79,16 +105,6 @@ impl ObjectStore for EcObjectStore {
source: "can not get object info".into(),
})?;
// let stream = stream::unfold(reader.stream, |mut blob| async move {
// match blob.next().await {
// Some(Ok(chunk)) => {
// let bytes = chunk;
// Some((Ok(bytes), blob))
// }
// _ => None,
// }
// })
// .boxed();
let meta = ObjectMeta {
location: location.clone(),
last_modified: Utc::now(),
@@ -98,10 +114,21 @@ impl ObjectStore for EcObjectStore {
};
let attributes = Attributes::default();
Ok(GetResult {
payload: object_store::GetResultPayload::Stream(
let payload = if self.need_convert {
object_store::GetResultPayload::Stream(
bytes_stream(
ReaderStream::with_capacity(ConvertStream::new(reader.stream, self.delimiter.clone()), READ_BUFFER_SIZE),
reader.object_info.size,
)
.boxed(),
)
} else {
object_store::GetResultPayload::Stream(
bytes_stream(ReaderStream::with_capacity(reader.stream, READ_BUFFER_SIZE), reader.object_info.size).boxed(),
),
)
};
Ok(GetResult {
payload,
meta,
range: 0..reader.object_info.size,
attributes,
@@ -154,6 +181,54 @@ impl ObjectStore for EcObjectStore {
}
}
pin_project! {
struct ConvertStream<R> {
inner: R,
delimiter: Vec<u8>,
}
}
impl<R> ConvertStream<R> {
fn new(inner: R, delimiter: String) -> Self {
ConvertStream {
inner,
delimiter: delimiter.as_bytes().to_vec(),
}
}
}
impl<R: AsyncRead + Unpin> AsyncRead for ConvertStream<R> {
#[tracing::instrument(level = "debug", skip_all)]
fn poll_read(
self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &mut tokio::io::ReadBuf<'_>,
) -> std::task::Poll<std::io::Result<()>> {
let me = self.project();
ready!(Pin::new(&mut *me.inner).poll_read(cx, buf))?;
let bytes = buf.filled();
let replaced = replace_symbol(me.delimiter, bytes);
buf.clear();
buf.put_slice(&replaced);
Poll::Ready(Ok(()))
}
}
fn replace_symbol(delimiter: &[u8], slice: &[u8]) -> Vec<u8> {
let mut result = Vec::with_capacity(slice.len());
let mut i = 0;
while i < slice.len() {
if slice[i..].starts_with(delimiter) {
result.push(DEFAULT_DELIMITER);
i += delimiter.len();
} else {
result.push(slice[i]);
i += 1;
}
}
result
}
pub fn bytes_stream<S>(stream: S, content_length: usize) -> impl Stream<Item = Result<Bytes>> + Send + 'static
where
S: Stream<Item = Result<Bytes, std::io::Error>> + Send + 'static,
@@ -175,3 +250,21 @@ where
Ok(())
})
}
#[cfg(test)]
mod test {
use super::replace_symbol;
#[test]
fn test_replace() {
let ss = String::from("dandan&&is&&best");
let slice = ss.as_bytes();
let delimiter = b"&&";
println!("len: {}", "".len());
let result = replace_symbol(delimiter, slice);
match String::from_utf8(result) {
Ok(s) => println!("slice: {}", s),
Err(e) => eprintln!("Error converting to string: {}", e),
}
}
}

View File

@@ -1,3 +1,5 @@
use std::sync::Arc;
use s3s::dto::SelectObjectContentInput;
pub mod analyzer;
@@ -16,7 +18,7 @@ pub mod session;
#[derive(Clone)]
pub struct Context {
// maybe we need transfer some info?
pub input: SelectObjectContentInput,
pub input: Arc<SelectObjectContentInput>,
}
#[derive(Clone)]

View File

@@ -66,7 +66,7 @@ impl SessionCtxFactory {
9,Ivy,24,Marketing,4800
10,Jack,38,Finance,7500";
let data_bytes = data.as_bytes();
// let data = r#""year"╦"gender"╦"ethnicity"╦"firstname"╦"count"╦"rank"
// let data = r#""year"╦"gender"╦"ethnicity"╦"firstname"╦"count"╦"rank"
// "2011"╦"FEMALE"╦"ASIAN AND PACIFIC ISLANDER"╦"SOPHIA"╦"119"╦"1"
// "2011"╦"FEMALE"╦"ASIAN AND PACIFIC ISLANDER"╦"CHLOE"╦"106"╦"2"
// "2011"╦"FEMALE"╦"ASIAN AND PACIFIC ISLANDER"╦"EMILY"╦"93"╦"3"

View File

@@ -49,7 +49,7 @@ lazy_static! {
#[derive(Clone)]
pub struct SimpleQueryDispatcher {
input: SelectObjectContentInput,
input: Arc<SelectObjectContentInput>,
// client for default tenant
_default_table_provider: TableHandleProviderRef,
session_factory: Arc<SessionCtxFactory>,
@@ -164,7 +164,9 @@ impl SimpleQueryDispatcher {
.map(|e| e.as_bytes().first().copied().unwrap_or_default()),
);
if let Some(delimiter) = csv.field_delimiter.as_ref() {
file_format = file_format.with_delimiter(delimiter.as_bytes().first().copied().unwrap_or_default());
if delimiter.len() == 1 {
file_format = file_format.with_delimiter(delimiter.as_bytes()[0]);
}
}
// TODO waiting for processing @junxiang Mu
// if csv.file_header_info.is_some() {}
@@ -272,7 +274,7 @@ impl Stream for TrackedRecordBatchStream {
#[derive(Default, Clone)]
pub struct SimpleQueryDispatcherBuilder {
input: Option<SelectObjectContentInput>,
input: Option<Arc<SelectObjectContentInput>>,
default_table_provider: Option<TableHandleProviderRef>,
session_factory: Option<Arc<SessionCtxFactory>>,
parser: Option<Arc<dyn Parser + Send + Sync>>,
@@ -283,7 +285,7 @@ pub struct SimpleQueryDispatcherBuilder {
}
impl SimpleQueryDispatcherBuilder {
pub fn with_input(mut self, input: SelectObjectContentInput) -> Self {
pub fn with_input(mut self, input: Arc<SelectObjectContentInput>) -> Self {
self.input = Some(input);
self
}

View File

@@ -63,7 +63,7 @@ where
}
}
pub async fn make_rustfsms(input: SelectObjectContentInput, is_test: bool) -> QueryResult<impl DatabaseManagerSystem> {
pub async fn make_rustfsms(input: Arc<SelectObjectContentInput>, is_test: bool) -> QueryResult<impl DatabaseManagerSystem> {
// init Function Manager, we can define some UDF if need
let func_manager = SimpleFunctionMetadataManager::default();
// TODO session config need load global system config
@@ -95,6 +95,8 @@ pub async fn make_rustfsms(input: SelectObjectContentInput, is_test: bool) -> Qu
#[cfg(test)]
mod tests {
use std::sync::Arc;
use api::{
query::{Context, Query},
server::dbms::DatabaseManagerSystem,
@@ -111,7 +113,7 @@ mod tests {
#[ignore]
async fn test_simple_sql() {
let sql = "select * from S3Object";
let input = SelectObjectContentInput {
let input = Arc::new(SelectObjectContentInput {
bucket: "dandan".to_string(),
expected_bucket_owner: None,
key: "test.csv".to_string(),
@@ -135,7 +137,7 @@ mod tests {
request_progress: None,
scan_range: None,
},
};
});
let db = make_rustfsms(input.clone(), true).await.unwrap();
let query = Query::new(Context { input }, sql.to_string());
@@ -167,8 +169,8 @@ mod tests {
#[tokio::test]
#[ignore]
async fn test_func_sql() {
let sql = "SELECT s._1 FROM S3Object s";
let input = SelectObjectContentInput {
let sql = "SELECT * FROM S3Object s";
let input = Arc::new(SelectObjectContentInput {
bucket: "dandan".to_string(),
expected_bucket_owner: None,
key: "test.csv".to_string(),
@@ -194,7 +196,7 @@ mod tests {
request_progress: None,
scan_range: None,
},
};
});
let db = make_rustfsms(input.clone(), true).await.unwrap();
let query = Query::new(Context { input }, sql.to_string());

View File

@@ -33,7 +33,7 @@ export RUSTFS_CONSOLE_ENABLE=true
export RUSTFS_CONSOLE_ADDRESS=":9002"
# export RUSTFS_SERVER_DOMAINS="localhost:9000"
# HTTPS certificate directory
# export RUSTFS_TLS_PATH="./deploy/certs"
# export RUSTFS_TLS_PATH="./deploy/certs"
# Replace with the real path of your configuration file; obs.example.toml is for reference only. Use either `RUSTFS_OBS_CONFIG` or the variables below, not both.
export RUSTFS_OBS_CONFIG="./deploy/config/obs.example.toml"