diff --git a/.cargo/config.toml b/.cargo/config.toml deleted file mode 100644 index a1c92ecf..00000000 --- a/.cargo/config.toml +++ /dev/null @@ -1,7 +0,0 @@ -[target.x86_64-unknown-linux-gnu] -rustflags = [ - "-C", "link-arg=-fuse-ld=bfd" -] - -[target.x86_64-unknown-linux-musl] -linker = "x86_64-linux-musl-gcc" \ No newline at end of file diff --git a/.docker/observability/README_ZH.md b/.docker/observability/README_ZH.md index 7ba5342b..53b6db0b 100644 --- a/.docker/observability/README_ZH.md +++ b/.docker/observability/README_ZH.md @@ -9,7 +9,7 @@ Jaeger、Prometheus 等)而需要运行和维护多个代理/收集器的必 2. 执行以下命令启动服务: ```bash -docker compose up -d -f docker-compose.yml +docker compose -f docker-compose.yml up -d ``` ### 访问监控面板 @@ -34,7 +34,7 @@ docker compose up -d -f docker-compose.yml | service_name | 服务名称 | rustfs | | service_version | 服务版本 | 1.0.0 | | environment | 运行环境 | production | -| meter_interval | 指标导出间隔 (秒) | 30 | +| meter_interval | 指标导出间隔 (秒) | 30 | | sample_ratio | 采样率 | 1.0 | | use_stdout | 是否输出到控制台 | true/false | | logger_level | 日志级别 | info | diff --git a/.docker/observability/config/obs-multi.toml b/.docker/observability/config/obs-multi.toml new file mode 100644 index 00000000..e4ea037b --- /dev/null +++ b/.docker/observability/config/obs-multi.toml @@ -0,0 +1,34 @@ +[observability] +endpoint = "http://otel-collector:4317" # Default is "http://localhost:4317" if not specified +use_stdout = false # Output with stdout, true output, false no output +sample_ratio = 2.0 +meter_interval = 30 +service_name = "rustfs" +service_version = "0.1.0" +environments = "production" +logger_level = "debug" +local_logging_enabled = true + +[sinks] +[sinks.kafka] # Kafka sink is disabled by default +enabled = false +bootstrap_servers = "localhost:9092" +topic = "logs" +batch_size = 100 # Default is 100 if not specified +batch_timeout_ms = 1000 # Default is 1000ms if not specified + +[sinks.webhook] +enabled = false +endpoint = "http://localhost:8080/webhook" +auth_token = "" +batch_size = 100 # Default is 3 if not specified +batch_timeout_ms = 1000 # Default is 100ms if not specified + +[sinks.file] +enabled = true +path = "/root/data/logs/app.log" +batch_size = 10 +batch_timeout_ms = 1000 # Default is 8192 bytes if not specified + +[logger] +queue_capacity = 10 \ No newline at end of file diff --git a/.docker/observability/config/obs.toml b/.docker/observability/config/obs.toml index e4ea037b..f77c25d8 100644 --- a/.docker/observability/config/obs.toml +++ b/.docker/observability/config/obs.toml @@ -1,5 +1,5 @@ [observability] -endpoint = "http://otel-collector:4317" # Default is "http://localhost:4317" if not specified +endpoint = "http://localhost:4317" # Default is "http://localhost:4317" if not specified use_stdout = false # Output with stdout, true output, false no output sample_ratio = 2.0 meter_interval = 30 diff --git a/.docker/observability/docker-compose.yml b/.docker/observability/docker-compose.yml index f6714bb2..55e4f84c 100644 --- a/.docker/observability/docker-compose.yml +++ b/.docker/observability/docker-compose.yml @@ -1,6 +1,6 @@ services: otel-collector: - image: otel/opentelemetry-collector-contrib:0.120.0 + image: ghcr.io/open-telemetry/opentelemetry-collector-releases/opentelemetry-collector-contrib:0.124.0 environment: - TZ=Asia/Shanghai volumes: @@ -16,7 +16,7 @@ services: networks: - otel-network jaeger: - image: jaegertracing/jaeger:2.4.0 + image: jaegertracing/jaeger:2.5.0 environment: - TZ=Asia/Shanghai ports: @@ -26,7 +26,7 @@ services: networks: - otel-network prometheus: - image: prom/prometheus:v3.2.1 + image: prom/prometheus:v3.3.0 environment: - TZ=Asia/Shanghai volumes: @@ -36,7 +36,7 @@ services: networks: - otel-network loki: - image: grafana/loki:3.4.2 + image: grafana/loki:3.5.0 environment: - TZ=Asia/Shanghai volumes: @@ -47,7 +47,7 @@ services: networks: - otel-network grafana: - image: grafana/grafana:11.6.0 + image: grafana/grafana:11.6.1 ports: - "3000:3000" # Web UI environment: diff --git a/Cargo.lock b/Cargo.lock index 81d6b4c3..f3407172 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -185,12 +185,14 @@ dependencies = [ "async-trait", "bytes", "chrono", + "common", "datafusion", "ecstore", "futures", "futures-core", "http", "object_store", + "pin-project-lite", "s3s", "snafu", "tokio", @@ -711,9 +713,9 @@ dependencies = [ [[package]] name = "axum" -version = "0.8.3" +version = "0.8.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "de45108900e1f9b9242f7f2e254aa3e2c029c921c258fe9e6b4217eeebd54288" +checksum = "021e862c184ae977658b36c4500f7feac3221ca5da43e3f25bd04ab6c79a29b5" dependencies = [ "axum-core", "bytes", @@ -799,7 +801,7 @@ dependencies = [ "hyper", "hyper-util", "pin-project-lite", - "rustls 0.23.26", + "rustls 0.23.27", "rustls-pemfile", "rustls-pki-types", "tokio", @@ -1204,9 +1206,9 @@ dependencies = [ [[package]] name = "chrono" -version = "0.4.40" +version = "0.4.41" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1a7964611d71df112cb1730f2ee67324fcf4d0fc6606acbbe9bfe06df124637c" +checksum = "c469d952047f47f91b68d1cba3f10d63c11d73e4636f24f08daf0278abf01c4d" dependencies = [ "android-tzdata", "iana-time-zone", @@ -1430,7 +1432,7 @@ dependencies = [ "lazy_static", "scopeguard", "tokio", - "tonic 0.13.0", + "tonic 0.13.1", "tracing-error", ] @@ -3061,7 +3063,7 @@ dependencies = [ "serde", "serde_json", "tokio", - "tonic 0.13.0", + "tonic 0.13.1", "tower 0.5.2", "url", ] @@ -3091,7 +3093,7 @@ dependencies = [ "madmin", "md-5", "netif", - "nix", + "nix 0.30.1", "num", "num_cpus", "path-absolutize", @@ -3105,6 +3107,7 @@ dependencies = [ "reqwest", "rmp", "rmp-serde", + "rustfs-config", "s3s", "s3s-policy", "serde", @@ -3119,7 +3122,7 @@ dependencies = [ "tokio", "tokio-stream", "tokio-util", - "tonic 0.13.0", + "tonic 0.13.1", "tower 0.5.2", "tracing", "tracing-error", @@ -4129,7 +4132,7 @@ dependencies = [ "http", "hyper", "hyper-util", - "rustls 0.23.26", + "rustls 0.23.27", "rustls-pki-types", "tokio", "tokio-rustls 0.26.2", @@ -4780,14 +4783,14 @@ dependencies = [ [[package]] name = "libsystemd" -version = "0.7.1" +version = "0.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b85fe9dc49de659d05829fdf72b5770c0a5952d1055c34a39f6d4e932bce175d" +checksum = "19c97a761fc86953c5b885422b22c891dbf5bcb9dcc99d0110d6ce4c052759f0" dependencies = [ "hmac 0.12.1", "libc", "log", - "nix", + "nix 0.29.0", "nom 8.0.0", "once_cell", "serde", @@ -4847,13 +4850,13 @@ checksum = "23fb14cb19457329c82206317a5663005a4d404783dc74f4252769b0d5f42856" [[package]] name = "local-ip-address" -version = "0.6.3" +version = "0.6.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3669cf5561f8d27e8fc84cc15e58350e70f557d4d65f70e3154e54cd2f8e1782" +checksum = "656b3b27f8893f7bbf9485148ff9a65f019e3f33bd5cdc87c83cab16b3fd9ec8" dependencies = [ "libc", "neli", - "thiserror 1.0.69", + "thiserror 2.0.12", "windows-sys 0.59.0", ] @@ -4870,7 +4873,7 @@ dependencies = [ "serde", "serde_json", "tokio", - "tonic 0.13.0", + "tonic 0.13.1", "tracing", "tracing-error", "url", @@ -5225,6 +5228,18 @@ dependencies = [ "memoffset", ] +[[package]] +name = "nix" +version = "0.30.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "74523f3a35e05aba87a1d978330aef40f67b0304ac79c1c00b294c9830543db6" +dependencies = [ + "bitflags 2.9.0", + "cfg-if", + "cfg_aliases", + "libc", +] + [[package]] name = "nodrop" version = "0.1.14" @@ -6587,7 +6602,7 @@ dependencies = [ "prost-build", "protobuf", "tokio", - "tonic 0.13.0", + "tonic 0.13.1", "tonic-build", "tower 0.5.2", ] @@ -6641,7 +6656,7 @@ dependencies = [ "quinn-proto", "quinn-udp", "rustc-hash 2.1.1", - "rustls 0.23.26", + "rustls 0.23.27", "socket2", "thiserror 2.0.12", "tokio", @@ -6660,7 +6675,7 @@ dependencies = [ "rand 0.9.1", "ring", "rustc-hash 2.1.1", - "rustls 0.23.26", + "rustls 0.23.27", "rustls-pki-types", "slab", "thiserror 2.0.12", @@ -6986,7 +7001,7 @@ dependencies = [ "percent-encoding", "pin-project-lite", "quinn", - "rustls 0.23.26", + "rustls 0.23.27", "rustls-pemfile", "rustls-pki-types", "serde", @@ -7143,9 +7158,9 @@ dependencies = [ [[package]] name = "rust-embed" -version = "8.7.0" +version = "8.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e5fbc0ee50fcb99af7cebb442e5df7b5b45e9460ffa3f8f549cd26b862bec49d" +checksum = "60e425e204264b144d4c929d126d0de524b40a961686414bab5040f7465c71be" dependencies = [ "rust-embed-impl", "rust-embed-utils", @@ -7244,7 +7259,6 @@ dependencies = [ "iam", "lazy_static", "libsystemd", - "local-ip-address", "lock", "madmin", "matchit", @@ -7259,23 +7273,24 @@ dependencies = [ "query", "rmp-serde", "rust-embed", + "rustfs-config", "rustfs-event-notifier", "rustfs-obs", - "rustls 0.23.26", - "rustls-pemfile", - "rustls-pki-types", + "rustfs-utils", + "rustls 0.23.27", "s3s", "serde", "serde_json", "serde_urlencoded", "shadow-rs", + "socket2", "tikv-jemallocator", "time", "tokio", "tokio-rustls 0.26.2", "tokio-stream", "tokio-util", - "tonic 0.13.0", + "tonic 0.13.1", "tonic-build", "tower 0.5.2", "tower-http", @@ -7284,6 +7299,16 @@ dependencies = [ "uuid", ] +[[package]] +name = "rustfs-config" +version = "0.0.1" +dependencies = [ + "config", + "const-str", + "serde", + "serde_json", +] + [[package]] name = "rustfs-event-notifier" version = "0.0.1" @@ -7358,6 +7383,18 @@ dependencies = [ "tracing-subscriber", ] +[[package]] +name = "rustfs-utils" +version = "0.0.1" +dependencies = [ + "local-ip-address", + "rustfs-config", + "rustls 0.23.27", + "rustls-pemfile", + "rustls-pki-types", + "tracing", +] + [[package]] name = "rustix" version = "0.38.44" @@ -7400,16 +7437,16 @@ dependencies = [ [[package]] name = "rustls" -version = "0.23.26" +version = "0.23.27" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "df51b5869f3a441595eac5e8ff14d486ff285f7b8c0df8770e49c3b56351f0f0" +checksum = "730944ca083c1c233a75c09f199e973ca499344a2b7ba9e755c457e86fb4a321" dependencies = [ "aws-lc-rs", "log", "once_cell", "ring", "rustls-pki-types", - "rustls-webpki 0.103.1", + "rustls-webpki 0.103.2", "subtle", "zeroize", ] @@ -7458,9 +7495,9 @@ dependencies = [ [[package]] name = "rustls-webpki" -version = "0.103.1" +version = "0.103.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fef8b8769aaccf73098557a87cd1816b4f9c7c16811c9c77142aa695c16f2c03" +checksum = "7149975849f1abb3832b246010ef62ccc80d3a76169517ada7188252b9cfb437" dependencies = [ "aws-lc-rs", "ring", @@ -8596,9 +8633,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.44.2" +version = "1.45.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e6b88822cbe49de4185e3a4cbf8321dd487cf5fe0c5c65695fef6346371e9c48" +checksum = "2513ca694ef9ede0fb23fe71a4ee4107cb102b9dc1930f6d0fd77aae068ae165" dependencies = [ "backtrace", "bytes", @@ -8641,7 +8678,7 @@ version = "0.26.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8e727b36a1a0e8b74c376ac2211e40c2c8af09fb4013c60d910495810f008e9b" dependencies = [ - "rustls 0.23.26", + "rustls 0.23.27", "tokio", ] @@ -8658,9 +8695,9 @@ dependencies = [ [[package]] name = "tokio-util" -version = "0.7.14" +version = "0.7.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6b9590b93e6fcc1739458317cccd391ad3955e2bde8913edf6f95f9e65a8f034" +checksum = "66a539a9ad6d5d281510d5bd368c973d636c02dbf8a67300bfb6b950696ad7df" dependencies = [ "bytes", "futures-core", @@ -8755,9 +8792,9 @@ dependencies = [ [[package]] name = "tonic" -version = "0.13.0" +version = "0.13.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "85839f0b32fd242bb3209262371d07feda6d780d16ee9d2bc88581b89da1549b" +checksum = "7e581ba15a835f4d9ea06c55ab1bd4dce26fc53752c69a04aac00703bfb49ba9" dependencies = [ "async-trait", "axum", @@ -8785,9 +8822,9 @@ dependencies = [ [[package]] name = "tonic-build" -version = "0.13.0" +version = "0.13.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d85f0383fadd15609306383a90e85eaed44169f931a5d2be1b42c76ceff1825e" +checksum = "eac6f67be712d12f0b41328db3137e0d0757645d8904b4cb7d51cd9c2279e847" dependencies = [ "prettyplease", "proc-macro2", @@ -10252,7 +10289,7 @@ dependencies = [ "futures-sink", "futures-util", "hex", - "nix", + "nix 0.29.0", "ordered-stream", "rand 0.8.5", "serde", @@ -10283,7 +10320,7 @@ dependencies = [ "futures-core", "futures-lite", "hex", - "nix", + "nix 0.29.0", "ordered-stream", "serde", "serde_repr", diff --git a/Cargo.toml b/Cargo.toml index 684dfb53..8f486f5c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,21 +1,23 @@ [workspace] members = [ - "madmin", # Management dashboard and admin API interface - "rustfs", # Core file system implementation - "ecstore", # Erasure coding storage implementation - "e2e_test", # End-to-end test suite + "appauth", # Application authentication and authorization + "cli/rustfs-gui", # Graphical user interface client "common/common", # Shared utilities and data structures "common/lock", # Distributed locking implementation "common/protos", # Protocol buffer definitions "common/workers", # Worker thread pools and task scheduling - "iam", # Identity and Access Management - "crypto", # Cryptography and security features - "cli/rustfs-gui", # Graphical user interface client - "crates/obs", # Observability utilities + "crates/config", # Configuration management "crates/event-notifier", # Event notification system + "crates/obs", # Observability utilities + "crates/utils", # Utility functions and helpers + "crypto", # Cryptography and security features + "ecstore", # Erasure coding storage implementation + "e2e_test", # End-to-end test suite + "iam", # Identity and Access Management + "madmin", # Management dashboard and admin API interface + "rustfs", # Core file system implementation "s3select/api", # S3 Select API interface "s3select/query", # S3 Select query engine - "appauth", # Application authentication and authorization ] resolver = "2" @@ -45,22 +47,26 @@ policy = { path = "./policy", version = "0.0.1" } protos = { path = "./common/protos", version = "0.0.1" } query = { path = "./s3select/query", version = "0.0.1" } rustfs = { path = "./rustfs", version = "0.0.1" } +rustfs-config = { path = "./crates/config", version = "0.0.1" } rustfs-obs = { path = "crates/obs", version = "0.0.1" } rustfs-event-notifier = { path = "crates/event-notifier", version = "0.0.1" } +rustfs-utils = { path = "crates/utils", version = "0.0.1" } workers = { path = "./common/workers", version = "0.0.1" } atoi = "2.0.0" async-recursion = "1.1.1" async-trait = "0.1.88" atomic_enum = "0.3.0" -axum = "0.8.3" +axum = "0.8.4" axum-extra = "0.10.1" axum-server = { version = "0.7.2", features = ["tls-rustls"] } backon = "1.5.0" +blake2 = "0.10.6" bytes = "1.10.1" bytesize = "2.0.1" -chrono = { version = "0.4.40", features = ["serde"] } +chrono = { version = "0.4.41", features = ["serde"] } clap = { version = "4.5.37", features = ["derive", "env"] } config = "0.15.11" +const-str = { version = "0.6.2", features = ["std", "proc"] } datafusion = "46.0.1" derive_builder = "0.20.2" dioxus = { version = "0.6.3", features = ["router"] } @@ -69,7 +75,9 @@ flatbuffers = "25.2.10" futures = "0.3.31" futures-core = "0.3.31" futures-util = "0.3.31" +glob = "0.3.2" hex = "0.4.3" +highway = { version = "1.3.0" } hyper = "1.6.0" hyper-util = { version = "0.1.11", features = [ "tokio", @@ -86,13 +94,15 @@ keyring = { version = "3.6.2", features = [ "sync-secret-service", ] } lazy_static = "1.5.0" -libsystemd = { version = "0.7.1" } -local-ip-address = "0.6.3" +libsystemd = { version = "0.7.2" } +local-ip-address = "0.6.5" matchit = "0.8.4" md-5 = "0.10.6" mime = "0.3.17" mime_guess = "2.0.5" netif = "0.1.6" +nix = { version = "0.30.1", features = ["fs"] } +num_cpus = { version = "1.16.0" } nvml-wrapper = "0.10.0" object_store = "0.11.2" opentelemetry = { version = "0.29.1" } @@ -113,6 +123,8 @@ prost-types = "0.13.5" protobuf = "3.7" rand = "0.8.5" rdkafka = { version = "0.37.0", features = ["tokio"] } +reed-solomon-erasure = { version = "6.0.0", features = ["simd-accel"] } +regex = { version = "1.11.1" } reqwest = { version = "0.12.15", default-features = false, features = [ "rustls-tls", "charset", @@ -129,8 +141,8 @@ rfd = { version = "0.15.3", default-features = false, features = [ rmp = "0.8.14" rmp-serde = "1.3.0" rumqttc = { version = "0.24" } -rust-embed = "8.7.0" -rustls = { version = "0.23.26" } +rust-embed = { version = "8.7.1" } +rustls = { version = "0.23.27" } rustls-pki-types = "1.11.0" rustls-pemfile = "2.2.0" s3s = { git = "https://github.com/Nugine/s3s.git", rev = "4733cdfb27b2713e832967232cbff413bb768c10" } @@ -143,6 +155,7 @@ serde_with = "3.12.0" sha2 = "0.10.8" smallvec = { version = "1.15.0", features = ["serde"] } snafu = "0.8.5" +socket2 = "0.5.9" strum = { version = "0.27.1", features = ["derive"] } sysinfo = "0.34.2" tempfile = "3.19.1" @@ -155,12 +168,12 @@ time = { version = "0.3.41", features = [ "macros", "serde", ] } -tokio = { version = "1.44.2", features = ["fs", "rt-multi-thread"] } -tonic = { version = "0.13.0", features = ["gzip"] } -tonic-build = "0.13.0" +tokio = { version = "1.45.0", features = ["fs", "rt-multi-thread"] } +tonic = { version = "0.13.1", features = ["gzip"] } +tonic-build = { version = "0.13.1" } tokio-rustls = { version = "0.26.2", default-features = false } -tokio-stream = "0.1.17" -tokio-util = { version = "0.7.14", features = ["io", "compat"] } +tokio-stream = { version = "0.1.17" } +tokio-util = { version = "0.7.15", features = ["io", "compat"] } tower = { version = "0.5.2", features = ["timeout"] } tower-http = { version = "0.6.2", features = ["cors"] } tracing = "0.1.41" @@ -176,6 +189,7 @@ uuid = { version = "1.16.0", features = [ "fast-rng", "macro-diagnostics", ] } +winapi = { version = "0.3.9" } [profile.wasm-dev] diff --git a/README.md b/README.md index 79d9a342..e4818d4d 100644 --- a/README.md +++ b/README.md @@ -102,7 +102,7 @@ export RUSTFS__LOGGER__QUEUE_CAPACITY=10 2. Start the observability stack: ```bash - docker compose up -d -f docker-compose.yml + docker compose -f docker-compose.yml up -d ``` #### Access Monitoring Dashboards diff --git a/README_ZH.md b/README_ZH.md index c2016fb7..10f05bbe 100644 --- a/README_ZH.md +++ b/README_ZH.md @@ -102,7 +102,7 @@ export RUSTFS__LOGGER__QUEUE_CAPACITY=10 2. 启动可观测性系统: ```bash - docker compose up -d -f docker-compose.yml + docker compose -f docker-compose.yml up -d ``` #### 访问监控面板 diff --git a/common/common/src/lib.rs b/common/common/src/lib.rs index a8998fee..90250c5e 100644 --- a/common/common/src/lib.rs +++ b/common/common/src/lib.rs @@ -2,6 +2,9 @@ pub mod error; pub mod globals; pub mod last_minute; +// is ',' +pub static DEFAULT_DELIMITER: u8 = 44; + /// Defers evaluation of a block of code until the end of the scope. #[macro_export] macro_rules! defer { diff --git a/common/workers/src/workers.rs b/common/workers/src/workers.rs index bd12d928..f6a6a94e 100644 --- a/common/workers/src/workers.rs +++ b/common/workers/src/workers.rs @@ -3,13 +3,13 @@ use tokio::sync::{Mutex, Notify}; use tracing::info; pub struct Workers { - available: Mutex, // 可用的工作槽 - notify: Notify, // 用于通知等待的任务 - limit: usize, // 最大并发工作数 + available: Mutex, // Available working slots + notify: Notify, // Used to notify waiting tasks + limit: usize, // Maximum number of concurrent jobs } impl Workers { - // 创建 Workers 对象,允许最多 n 个作业并发执行 + // Create a Workers object that allows up to n jobs to execute concurrently pub fn new(n: usize) -> Result, &'static str> { if n == 0 { return Err("n must be > 0"); @@ -22,7 +22,7 @@ impl Workers { })) } - // 让一个作业获得执行的机会 + // Give a job a chance to be executed pub async fn take(&self) { loop { let mut available = self.available.lock().await; @@ -37,15 +37,15 @@ impl Workers { } } - // 让一个作业释放其机会 + // Release a job's slot pub async fn give(&self) { let mut available = self.available.lock().await; info!("worker give, {}", *available); - *available += 1; // 增加可用槽 - self.notify.notify_one(); // 通知一个等待的任务 + *available += 1; // Increase available slots + self.notify.notify_one(); // Notify a waiting task } - // 等待所有并发作业完成 + // Wait for all concurrent jobs to complete pub async fn wait(&self) { loop { { @@ -54,7 +54,7 @@ impl Workers { break; } } - // 等待直到所有槽都被释放 + // Wait until all slots are freed self.notify.notified().await; } info!("worker wait end"); diff --git a/crates/config/Cargo.toml b/crates/config/Cargo.toml new file mode 100644 index 00000000..1a81e30d --- /dev/null +++ b/crates/config/Cargo.toml @@ -0,0 +1,17 @@ +[package] +name = "rustfs-config" +edition.workspace = true +license.workspace = true +repository.workspace = true +rust-version.workspace = true +version.workspace = true + +[dependencies] +config = { workspace = true } +const-str = { workspace = true } +serde = { workspace = true } +serde_json = { workspace = true } + + +[lints] +workspace = true diff --git a/crates/config/src/config.rs b/crates/config/src/config.rs new file mode 100644 index 00000000..bebcbfd0 --- /dev/null +++ b/crates/config/src/config.rs @@ -0,0 +1,23 @@ +use crate::event::config::EventConfig; +use crate::ObservabilityConfig; + +/// RustFs configuration +pub struct RustFsConfig { + pub observability: ObservabilityConfig, + pub event: EventConfig, +} + +impl RustFsConfig { + pub fn new() -> Self { + Self { + observability: ObservabilityConfig::new(), + event: EventConfig::new(), + } + } +} + +impl Default for RustFsConfig { + fn default() -> Self { + Self::new() + } +} diff --git a/crates/config/src/constants/app.rs b/crates/config/src/constants/app.rs new file mode 100644 index 00000000..294c7677 --- /dev/null +++ b/crates/config/src/constants/app.rs @@ -0,0 +1,72 @@ +use const_str::concat; + +/// Application name +/// Default value: RustFs +/// Environment variable: RUSTFS_APP_NAME +pub const APP_NAME: &str = "RustFs"; +/// Application version +/// Default value: 1.0.0 +/// Environment variable: RUSTFS_VERSION +pub const VERSION: &str = "0.0.1"; + +/// Default configuration logger level +/// Default value: info +/// Environment variable: RUSTFS_LOG_LEVEL +pub const DEFAULT_LOG_LEVEL: &str = "info"; + +/// maximum number of connections +/// This is the maximum number of connections that the server will accept. +/// This is used to limit the number of connections to the server. +pub const MAX_CONNECTIONS: usize = 100; +/// timeout for connections +/// This is the timeout for connections to the server. +/// This is used to limit the time that a connection can be open. +pub const DEFAULT_TIMEOUT_MS: u64 = 3000; + +/// Default Access Key +/// Default value: rustfsadmin +/// Environment variable: RUSTFS_ACCESS_KEY +/// Command line argument: --access-key +/// Example: RUSTFS_ACCESS_KEY=rustfsadmin +/// Example: --access-key rustfsadmin +pub const DEFAULT_ACCESS_KEY: &str = "rustfsadmin"; +/// Default Secret Key +/// Default value: rustfsadmin +/// Environment variable: RUSTFS_SECRET_KEY +/// Command line argument: --secret-key +/// Example: RUSTFS_SECRET_KEY=rustfsadmin +/// Example: --secret-key rustfsadmin +pub const DEFAULT_SECRET_KEY: &str = "rustfsadmin"; +/// Default configuration file for observability +/// Default value: config/obs.toml +/// Environment variable: RUSTFS_OBS_CONFIG +/// Command line argument: --obs-config +/// Example: RUSTFS_OBS_CONFIG=config/obs.toml +/// Example: --obs-config config/obs.toml +/// Example: --obs-config /etc/rustfs/obs.toml +pub const DEFAULT_OBS_CONFIG: &str = "config/obs.toml"; + +/// Default TLS key for rustfs +/// This is the default key for TLS. +pub const RUSTFS_TLS_KEY: &str = "rustfs_key.pem"; + +/// Default TLS cert for rustfs +/// This is the default cert for TLS. +pub const RUSTFS_TLS_CERT: &str = "rustfs_cert.pem"; + +/// Default port for rustfs +/// This is the default port for rustfs. +/// This is used to bind the server to a specific port. +pub const DEFAULT_PORT: u16 = 9000; + +/// Default address for rustfs +/// This is the default address for rustfs. +pub const DEFAULT_ADDRESS: &str = concat!(":", DEFAULT_PORT); + +/// Default port for rustfs console +/// This is the default port for rustfs console. +pub const DEFAULT_CONSOLE_PORT: u16 = 9002; + +/// Default address for rustfs console +/// This is the default address for rustfs console. +pub const DEFAULT_CONSOLE_ADDRESS: &str = concat!(":", DEFAULT_CONSOLE_PORT); diff --git a/crates/config/src/constants/mod.rs b/crates/config/src/constants/mod.rs new file mode 100644 index 00000000..04023c88 --- /dev/null +++ b/crates/config/src/constants/mod.rs @@ -0,0 +1 @@ +pub(crate) mod app; diff --git a/crates/config/src/event/config.rs b/crates/config/src/event/config.rs new file mode 100644 index 00000000..a8a43068 --- /dev/null +++ b/crates/config/src/event/config.rs @@ -0,0 +1,23 @@ +/// Event configuration module +pub struct EventConfig { + pub event_type: String, + pub event_source: String, + pub event_destination: String, +} + +impl EventConfig { + /// Creates a new instance of `EventConfig` with default values. + pub fn new() -> Self { + Self { + event_type: "default".to_string(), + event_source: "default".to_string(), + event_destination: "default".to_string(), + } + } +} + +impl Default for EventConfig { + fn default() -> Self { + Self::new() + } +} diff --git a/crates/config/src/event/event.rs b/crates/config/src/event/event.rs new file mode 100644 index 00000000..70a10369 --- /dev/null +++ b/crates/config/src/event/event.rs @@ -0,0 +1,17 @@ +/// Event configuration module +pub struct EventConfig { + pub event_type: String, + pub event_source: String, + pub event_destination: String, +} + +impl EventConfig { + /// Creates a new instance of `EventConfig` with default values. + pub fn new() -> Self { + Self { + event_type: "default".to_string(), + event_source: "default".to_string(), + event_destination: "default".to_string(), + } + } +} diff --git a/crates/config/src/event/mod.rs b/crates/config/src/event/mod.rs new file mode 100644 index 00000000..60280924 --- /dev/null +++ b/crates/config/src/event/mod.rs @@ -0,0 +1,2 @@ +pub(crate) mod config; +pub(crate) mod event; diff --git a/crates/config/src/lib.rs b/crates/config/src/lib.rs new file mode 100644 index 00000000..fd7b8bec --- /dev/null +++ b/crates/config/src/lib.rs @@ -0,0 +1,9 @@ +use crate::observability::config::ObservabilityConfig; + +mod config; +mod constants; +mod event; +mod observability; + +pub use config::RustFsConfig; +pub use constants::app::*; diff --git a/crates/config/src/observability/config.rs b/crates/config/src/observability/config.rs new file mode 100644 index 00000000..361f9a6c --- /dev/null +++ b/crates/config/src/observability/config.rs @@ -0,0 +1,28 @@ +use crate::observability::logger::LoggerConfig; +use crate::observability::otel::OtelConfig; +use crate::observability::sink::SinkConfig; +use serde::Deserialize; + +/// Observability configuration +#[derive(Debug, Deserialize, Clone)] +pub struct ObservabilityConfig { + pub otel: OtelConfig, + pub sinks: SinkConfig, + pub logger: Option, +} + +impl ObservabilityConfig { + pub fn new() -> Self { + Self { + otel: OtelConfig::new(), + sinks: SinkConfig::new(), + logger: Some(LoggerConfig::new()), + } + } +} + +impl Default for ObservabilityConfig { + fn default() -> Self { + Self::new() + } +} diff --git a/crates/config/src/observability/file_sink.rs b/crates/config/src/observability/file_sink.rs new file mode 100644 index 00000000..d475376e --- /dev/null +++ b/crates/config/src/observability/file_sink.rs @@ -0,0 +1,25 @@ +use serde::Deserialize; + +/// File sink configuration +#[derive(Debug, Deserialize, Clone)] +pub struct FileSinkConfig { + pub path: String, + pub max_size: u64, + pub max_backups: u64, +} + +impl FileSinkConfig { + pub fn new() -> Self { + Self { + path: "".to_string(), + max_size: 0, + max_backups: 0, + } + } +} + +impl Default for FileSinkConfig { + fn default() -> Self { + Self::new() + } +} diff --git a/crates/config/src/observability/kafka_sink.rs b/crates/config/src/observability/kafka_sink.rs new file mode 100644 index 00000000..f40a979b --- /dev/null +++ b/crates/config/src/observability/kafka_sink.rs @@ -0,0 +1,23 @@ +use serde::Deserialize; + +/// Kafka sink configuration +#[derive(Debug, Deserialize, Clone)] +pub struct KafkaSinkConfig { + pub brokers: Vec, + pub topic: String, +} + +impl KafkaSinkConfig { + pub fn new() -> Self { + Self { + brokers: vec!["localhost:9092".to_string()], + topic: "rustfs".to_string(), + } + } +} + +impl Default for KafkaSinkConfig { + fn default() -> Self { + Self::new() + } +} diff --git a/crates/config/src/observability/logger.rs b/crates/config/src/observability/logger.rs new file mode 100644 index 00000000..f6c70682 --- /dev/null +++ b/crates/config/src/observability/logger.rs @@ -0,0 +1,21 @@ +use serde::Deserialize; + +/// Logger configuration +#[derive(Debug, Deserialize, Clone)] +pub struct LoggerConfig { + pub queue_capacity: Option, +} + +impl LoggerConfig { + pub fn new() -> Self { + Self { + queue_capacity: Some(10000), + } + } +} + +impl Default for LoggerConfig { + fn default() -> Self { + Self::new() + } +} diff --git a/crates/config/src/observability/mod.rs b/crates/config/src/observability/mod.rs new file mode 100644 index 00000000..65d9933b --- /dev/null +++ b/crates/config/src/observability/mod.rs @@ -0,0 +1,8 @@ +pub(crate) mod config; +pub(crate) mod file_sink; +pub(crate) mod kafka_sink; +pub(crate) mod logger; +pub(crate) mod observability; +pub(crate) mod otel; +pub(crate) mod sink; +pub(crate) mod webhook_sink; diff --git a/crates/config/src/observability/observability.rs b/crates/config/src/observability/observability.rs new file mode 100644 index 00000000..17b4e070 --- /dev/null +++ b/crates/config/src/observability/observability.rs @@ -0,0 +1,22 @@ +use crate::observability::logger::LoggerConfig; +use crate::observability::otel::OtelConfig; +use crate::observability::sink::SinkConfig; +use serde::Deserialize; + +/// Observability configuration +#[derive(Debug, Deserialize, Clone)] +pub struct ObservabilityConfig { + pub otel: OtelConfig, + pub sinks: SinkConfig, + pub logger: Option, +} + +impl ObservabilityConfig { + pub fn new() -> Self { + Self { + otel: OtelConfig::new(), + sinks: SinkConfig::new(), + logger: Some(LoggerConfig::new()), + } + } +} diff --git a/crates/config/src/observability/otel.rs b/crates/config/src/observability/otel.rs new file mode 100644 index 00000000..4ac6618b --- /dev/null +++ b/crates/config/src/observability/otel.rs @@ -0,0 +1,27 @@ +use serde::Deserialize; + +/// OpenTelemetry configuration +#[derive(Debug, Deserialize, Clone)] +pub struct OtelConfig { + pub endpoint: String, + pub service_name: String, + pub service_version: String, + pub resource_attributes: Vec, +} + +impl OtelConfig { + pub fn new() -> Self { + Self { + endpoint: "http://localhost:4317".to_string(), + service_name: "rustfs".to_string(), + service_version: "0.1.0".to_string(), + resource_attributes: vec![], + } + } +} + +impl Default for OtelConfig { + fn default() -> Self { + Self::new() + } +} diff --git a/crates/config/src/observability/sink.rs b/crates/config/src/observability/sink.rs new file mode 100644 index 00000000..dcb37fa3 --- /dev/null +++ b/crates/config/src/observability/sink.rs @@ -0,0 +1,28 @@ +use crate::observability::file_sink::FileSinkConfig; +use crate::observability::kafka_sink::KafkaSinkConfig; +use crate::observability::webhook_sink::WebhookSinkConfig; +use serde::Deserialize; + +/// Sink configuration +#[derive(Debug, Deserialize, Clone)] +pub struct SinkConfig { + pub kafka: Option, + pub webhook: Option, + pub file: Option, +} + +impl SinkConfig { + pub fn new() -> Self { + Self { + kafka: None, + webhook: None, + file: Some(FileSinkConfig::new()), + } + } +} + +impl Default for SinkConfig { + fn default() -> Self { + Self::new() + } +} diff --git a/crates/config/src/observability/webhook_sink.rs b/crates/config/src/observability/webhook_sink.rs new file mode 100644 index 00000000..49429203 --- /dev/null +++ b/crates/config/src/observability/webhook_sink.rs @@ -0,0 +1,25 @@ +use serde::Deserialize; + +/// Webhook sink configuration +#[derive(Debug, Deserialize, Clone)] +pub struct WebhookSinkConfig { + pub url: String, + pub method: String, + pub headers: Vec<(String, String)>, +} + +impl WebhookSinkConfig { + pub fn new() -> Self { + Self { + url: "http://localhost:8080/webhook".to_string(), + method: "POST".to_string(), + headers: vec![], + } + } +} + +impl Default for WebhookSinkConfig { + fn default() -> Self { + Self::new() + } +} diff --git a/crates/event-notifier/examples/webhook.rs b/crates/event-notifier/examples/webhook.rs index c754f822..4cdf02c6 100644 --- a/crates/event-notifier/examples/webhook.rs +++ b/crates/event-notifier/examples/webhook.rs @@ -37,7 +37,7 @@ async fn receive_webhook(Json(payload): Json) -> StatusCode { println!("current time:{:04}-{:02}-{:02} {:02}:{:02}:{:02}", year, month, day, hour, minute, second); println!( "received a webhook request time:{} content:\n {}", - seconds.to_string(), + seconds, serde_json::to_string_pretty(&payload).unwrap() ); StatusCode::OK @@ -66,10 +66,10 @@ fn convert_seconds_to_date(seconds: u64) -> (u32, u32, u32, u32, u32, u32) { // calculate month let days_in_month = [31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31]; - for m in 0..12 { - if total_seconds >= days_in_month[m] * seconds_per_day { + for m in &days_in_month { + if total_seconds >= m * seconds_per_day { month += 1; - total_seconds -= days_in_month[m] * seconds_per_day; + total_seconds -= m * seconds_per_day; } else { break; } diff --git a/crates/event-notifier/src/bus.rs b/crates/event-notifier/src/bus.rs index bd4b81c5..5cabfc22 100644 --- a/crates/event-notifier/src/bus.rs +++ b/crates/event-notifier/src/bus.rs @@ -7,11 +7,13 @@ use std::time::{SystemTime, UNIX_EPOCH}; use tokio::sync::mpsc; use tokio::time::Duration; use tokio_util::sync::CancellationToken; +use tracing::instrument; /// Handles incoming events from the producer. /// /// This function is responsible for receiving events from the producer and sending them to the appropriate adapters. /// It also handles the shutdown process and saves any pending logs to the event store. +#[instrument(skip_all)] pub async fn event_bus( mut rx: mpsc::Receiver, adapters: Vec>, diff --git a/crates/event-notifier/src/config.rs b/crates/event-notifier/src/config.rs index ab46d8e8..10429c34 100644 --- a/crates/event-notifier/src/config.rs +++ b/crates/event-notifier/src/config.rs @@ -162,7 +162,7 @@ impl NotifierConfig { } } -const DEFAULT_CONFIG_FILE: &str = "obs"; +const DEFAULT_CONFIG_FILE: &str = "event"; /// Provide temporary directories as default storage paths fn default_store_path() -> String { diff --git a/crates/event-notifier/src/event.rs b/crates/event-notifier/src/event.rs index ce32aecc..1bf100d7 100644 --- a/crates/event-notifier/src/event.rs +++ b/crates/event-notifier/src/event.rs @@ -15,6 +15,18 @@ pub struct Identity { pub principal_id: String, } +impl Identity { + /// Create a new Identity instance + pub fn new(principal_id: String) -> Self { + Self { principal_id } + } + + /// Set the principal ID + pub fn set_principal_id(&mut self, principal_id: String) { + self.principal_id = principal_id; + } +} + /// A struct representing the bucket information #[derive(Serialize, Deserialize, Clone, Debug)] pub struct Bucket { @@ -24,6 +36,32 @@ pub struct Bucket { pub arn: String, } +impl Bucket { + /// Create a new Bucket instance + pub fn new(name: String, owner_identity: Identity, arn: String) -> Self { + Self { + name, + owner_identity, + arn, + } + } + + /// Set the name of the bucket + pub fn set_name(&mut self, name: String) { + self.name = name; + } + + /// Set the ARN of the bucket + pub fn set_arn(&mut self, arn: String) { + self.arn = arn; + } + + /// Set the owner identity of the bucket + pub fn set_owner_identity(&mut self, owner_identity: Identity) { + self.owner_identity = owner_identity; + } +} + /// A struct representing the object information #[derive(Serialize, Deserialize, Clone, Debug)] pub struct Object { @@ -41,6 +79,64 @@ pub struct Object { pub sequencer: String, } +impl Object { + /// Create a new Object instance + pub fn new( + key: String, + size: Option, + etag: Option, + content_type: Option, + user_metadata: Option>, + version_id: Option, + sequencer: String, + ) -> Self { + Self { + key, + size, + etag, + content_type, + user_metadata, + version_id, + sequencer, + } + } + + /// Set the key + pub fn set_key(&mut self, key: String) { + self.key = key; + } + + /// Set the size + pub fn set_size(&mut self, size: Option) { + self.size = size; + } + + /// Set the etag + pub fn set_etag(&mut self, etag: Option) { + self.etag = etag; + } + + /// Set the content type + pub fn set_content_type(&mut self, content_type: Option) { + self.content_type = content_type; + } + + /// Set the user metadata + pub fn set_user_metadata(&mut self, user_metadata: Option>) { + self.user_metadata = user_metadata; + } + + /// Set the version ID + pub fn set_version_id(&mut self, version_id: Option) { + self.version_id = version_id; + } + + /// Set the sequencer + pub fn set_sequencer(&mut self, sequencer: String) { + self.sequencer = sequencer; + } +} + /// A struct representing the metadata of the event #[derive(Serialize, Deserialize, Clone, Debug)] pub struct Metadata { @@ -52,6 +148,57 @@ pub struct Metadata { pub object: Object, } +impl Default for Metadata { + fn default() -> Self { + Self::new() + } +} +impl Metadata { + /// Create a new Metadata instance with default values + pub fn new() -> Self { + Self { + schema_version: "1.0".to_string(), + configuration_id: "default".to_string(), + bucket: Bucket::new( + "default".to_string(), + Identity::new("default".to_string()), + "arn:aws:s3:::default".to_string(), + ), + object: Object::new("default".to_string(), None, None, None, None, None, "default".to_string()), + } + } + + /// Create a new Metadata instance + pub fn create(schema_version: String, configuration_id: String, bucket: Bucket, object: Object) -> Self { + Self { + schema_version, + configuration_id, + bucket, + object, + } + } + + /// Set the schema version + pub fn set_schema_version(&mut self, schema_version: String) { + self.schema_version = schema_version; + } + + /// Set the configuration ID + pub fn set_configuration_id(&mut self, configuration_id: String) { + self.configuration_id = configuration_id; + } + + /// Set the bucket + pub fn set_bucket(&mut self, bucket: Bucket) { + self.bucket = bucket; + } + + /// Set the object + pub fn set_object(&mut self, object: Object) { + self.object = object; + } +} + /// A struct representing the source of the event #[derive(Serialize, Deserialize, Clone, Debug)] pub struct Source { @@ -61,6 +208,28 @@ pub struct Source { pub user_agent: String, } +impl Source { + /// Create a new Source instance + pub fn new(host: String, port: String, user_agent: String) -> Self { + Self { host, port, user_agent } + } + + /// Set the host + pub fn set_host(&mut self, host: String) { + self.host = host; + } + + /// Set the port + pub fn set_port(&mut self, port: String) { + self.port = port; + } + + /// Set the user agent + pub fn set_user_agent(&mut self, user_agent: String) { + self.user_agent = user_agent; + } +} + /// Builder for creating an Event. /// /// This struct is used to build an Event object with various parameters. diff --git a/crates/event-notifier/src/global.rs b/crates/event-notifier/src/global.rs index 25fadeeb..0ffcb8b0 100644 --- a/crates/event-notifier/src/global.rs +++ b/crates/event-notifier/src/global.rs @@ -1,6 +1,7 @@ use crate::{create_adapters, Error, Event, NotifierConfig, NotifierSystem}; use std::sync::{atomic, Arc}; use tokio::sync::{Mutex, OnceCell}; +use tracing::instrument; static GLOBAL_SYSTEM: OnceCell>> = OnceCell::const_new(); static INITIALIZED: atomic::AtomicBool = atomic::AtomicBool::new(false); @@ -113,6 +114,7 @@ pub fn is_ready() -> bool { /// - The system is not initialized. /// - The system is not ready. /// - Sending the event fails. +#[instrument(fields(event))] pub async fn send_event(event: Event) -> Result<(), Error> { if !READY.load(atomic::Ordering::SeqCst) { return Err(Error::custom("Notification system not ready, please wait for initialization to complete")); @@ -124,6 +126,7 @@ pub async fn send_event(event: Event) -> Result<(), Error> { } /// Shuts down the notification system. +#[instrument] pub async fn shutdown() -> Result<(), Error> { if let Some(system) = GLOBAL_SYSTEM.get() { tracing::info!("Shutting down notification system start"); @@ -189,7 +192,7 @@ mod tests { let config = NotifierConfig::default(); let _ = initialize(config.clone()).await; // first initialization let result = initialize(config).await; // second initialization - assert!(!result.is_ok(), "Initialization should succeed"); + assert!(result.is_err(), "Initialization should succeed"); assert!(result.is_err(), "Re-initialization should fail"); } @@ -211,7 +214,7 @@ mod tests { ..Default::default() }; let result = initialize(config).await; - assert!(!result.is_err(), "Initialization with invalid config should fail"); + assert!(result.is_ok(), "Initialization with invalid config should fail"); assert!(is_initialized(), "System should not be marked as initialized after failure"); assert!(is_ready(), "System should not be marked as ready after failure"); } diff --git a/crates/event-notifier/src/notifier.rs b/crates/event-notifier/src/notifier.rs index 0b17dddd..5ab17d37 100644 --- a/crates/event-notifier/src/notifier.rs +++ b/crates/event-notifier/src/notifier.rs @@ -2,6 +2,7 @@ use crate::{event_bus, ChannelAdapter, Error, Event, EventStore, NotifierConfig} use std::sync::Arc; use tokio::sync::mpsc; use tokio_util::sync::CancellationToken; +use tracing::instrument; /// The `NotificationSystem` struct represents the notification system. /// It manages the event bus and the adapters. @@ -18,6 +19,7 @@ pub struct NotifierSystem { impl NotifierSystem { /// Creates a new `NotificationSystem` instance. + #[instrument(skip(config))] pub async fn new(config: NotifierConfig) -> Result { let (tx, rx) = mpsc::channel::(config.channel_capacity); let store = Arc::new(EventStore::new(&config.store_path).await?); @@ -44,6 +46,7 @@ impl NotifierSystem { /// Starts the notification system. /// It initializes the event bus and the producer. + #[instrument(skip_all)] pub async fn start(&mut self, adapters: Vec>) -> Result<(), Error> { if self.shutdown.is_cancelled() { let error = Error::custom("System is shutting down"); @@ -67,6 +70,7 @@ impl NotifierSystem { /// Sends an event to the notification system. /// This method is used to send events to the event bus. + #[instrument(skip(self))] pub async fn send_event(&self, event: Event) -> Result<(), Error> { self.log(tracing::Level::DEBUG, "send_event", &format!("Sending event: {:?}", event)); if self.shutdown.is_cancelled() { @@ -85,6 +89,7 @@ impl NotifierSystem { /// Shuts down the notification system. /// This method is used to cancel the event bus and producer tasks. + #[instrument(skip(self))] pub async fn shutdown(&mut self) -> Result<(), Error> { tracing::info!("Shutting down the notification system"); self.shutdown.cancel(); @@ -112,10 +117,13 @@ impl NotifierSystem { self.shutdown.is_cancelled() } - fn handle_error(&self, context: &str, error: &Error) { + #[instrument(skip(self))] + pub fn handle_error(&self, context: &str, error: &Error) { self.log(tracing::Level::ERROR, context, &format!("{:?}", error)); // TODO Can be extended to record to files or send to monitoring systems } + + #[instrument(skip(self))] fn log(&self, level: tracing::Level, context: &str, message: &str) { match level { tracing::Level::ERROR => tracing::error!("[{}] {}", context, message), diff --git a/crates/event-notifier/src/store.rs b/crates/event-notifier/src/store.rs index eca26674..24911615 100644 --- a/crates/event-notifier/src/store.rs +++ b/crates/event-notifier/src/store.rs @@ -5,6 +5,7 @@ use std::time::{SystemTime, UNIX_EPOCH}; use tokio::fs::{create_dir_all, File, OpenOptions}; use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter}; use tokio::sync::RwLock; +use tracing::instrument; /// `EventStore` is a struct that manages the storage of event logs. pub struct EventStore { @@ -21,6 +22,7 @@ impl EventStore { }) } + #[instrument(skip(self))] pub async fn save_logs(&self, logs: &[Log]) -> Result<(), Error> { let _guard = self.lock.write().await; let file_path = format!( diff --git a/crates/obs/src/telemetry.rs b/crates/obs/src/telemetry.rs index 95bedfc4..5a08321d 100644 --- a/crates/obs/src/telemetry.rs +++ b/crates/obs/src/telemetry.rs @@ -210,7 +210,8 @@ pub fn init_telemetry(config: &OtelConfig) -> OtelGuard { .with_thread_names(true) .with_thread_ids(true) .with_file(true) - .with_line_number(true); + .with_line_number(true) + .with_filter(build_env_filter(logger_level, None)); let filter = build_env_filter(logger_level, None); let otel_filter = build_env_filter(logger_level, None); @@ -218,7 +219,7 @@ pub fn init_telemetry(config: &OtelConfig) -> OtelGuard { let tracer = tracer_provider.tracer(Cow::Borrowed(service_name).to_string()); // Configure registry to avoid repeated calls to filter methods - let _registry = tracing_subscriber::registry() + tracing_subscriber::registry() .with(filter) .with(ErrorLayer::default()) .with(if config.local_logging_enabled.unwrap_or(false) { @@ -230,17 +231,13 @@ pub fn init_telemetry(config: &OtelConfig) -> OtelGuard { .with(otel_layer) .with(MetricsLayer::new(meter_provider.clone())) .init(); - info!("Telemetry logging enabled: {:?}", config.local_logging_enabled); - // if config.local_logging_enabled.unwrap_or(false) { - // registry.with(fmt_layer).init(); - // } else { - // registry.init(); - // } if !endpoint.is_empty() { info!( - "OpenTelemetry telemetry initialized with OTLP endpoint: {}, logger_level: {}", - endpoint, logger_level + "OpenTelemetry telemetry initialized with OTLP endpoint: {}, logger_level: {},RUST_LOG env: {}", + endpoint, + logger_level, + std::env::var("RUST_LOG").unwrap_or_else(|_| "未设置".to_string()) ); } } @@ -255,7 +252,6 @@ pub fn init_telemetry(config: &OtelConfig) -> OtelGuard { fn build_env_filter(logger_level: &str, default_level: Option<&str>) -> EnvFilter { let level = default_level.unwrap_or(logger_level); let mut filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new(level)); - if !matches!(logger_level, "trace" | "debug") { let directives: SmallVec<[&str; 5]> = smallvec::smallvec!["hyper", "tonic", "h2", "reqwest", "tower"]; for directive in directives { diff --git a/crates/utils/Cargo.toml b/crates/utils/Cargo.toml new file mode 100644 index 00000000..13ee21e4 --- /dev/null +++ b/crates/utils/Cargo.toml @@ -0,0 +1,18 @@ +[package] +name = "rustfs-utils" +edition.workspace = true +license.workspace = true +repository.workspace = true +rust-version.workspace = true +version.workspace = true + +[dependencies] +local-ip-address = { workspace = true } +rustfs-config = { workspace = true } +rustls = { workspace = true } +rustls-pemfile = { workspace = true } +rustls-pki-types = { workspace = true } +tracing = { workspace = true } + +[lints] +workspace = true diff --git a/rustfs/src/utils/mod.rs b/crates/utils/src/certs.rs similarity index 76% rename from rustfs/src/utils/mod.rs rename to crates/utils/src/certs.rs index fad2718c..568fc6b6 100644 --- a/rustfs/src/utils/mod.rs +++ b/crates/utils/src/certs.rs @@ -1,57 +1,48 @@ -use crate::config::{RUSTFS_TLS_CERT, RUSTFS_TLS_KEY}; +use rustfs_config::{RUSTFS_TLS_CERT, RUSTFS_TLS_KEY}; use rustls::server::{ClientHello, ResolvesServerCert, ResolvesServerCertUsingSni}; use rustls::sign::CertifiedKey; use rustls_pemfile::{certs, private_key}; use rustls_pki_types::{CertificateDer, PrivateKeyDer}; use std::collections::HashMap; -use std::fmt::Debug; use std::io::Error; -use std::net::IpAddr; use std::path::Path; use std::sync::Arc; use std::{fs, io}; use tracing::{debug, warn}; -/// Get the local IP address. -/// This function retrieves the local IP address of the machine. -pub(crate) fn get_local_ip() -> Option { - match local_ip_address::local_ip() { - Ok(IpAddr::V4(ip)) => Some(ip), - Err(_) => None, - Ok(IpAddr::V6(_)) => todo!(), - } -} - /// Load public certificate from file. /// This function loads a public certificate from the specified file. -pub(crate) fn load_certs(filename: &str) -> io::Result>> { +pub fn load_certs(filename: &str) -> io::Result>> { // Open certificate file. - let cert_file = fs::File::open(filename).map_err(|e| error(format!("failed to open {}: {}", filename, e)))?; + let cert_file = fs::File::open(filename).map_err(|e| certs_error(format!("failed to open {}: {}", filename, e)))?; let mut reader = io::BufReader::new(cert_file); // Load and return certificate. let certs = certs(&mut reader) .collect::, _>>() - .map_err(|_| error(format!("certificate file {} format error", filename)))?; + .map_err(|_| certs_error(format!("certificate file {} format error", filename)))?; if certs.is_empty() { - return Err(error(format!("No valid certificate was found in the certificate file {}", filename))); + return Err(certs_error(format!( + "No valid certificate was found in the certificate file {}", + filename + ))); } Ok(certs) } /// Load private key from file. /// This function loads a private key from the specified file. -pub(crate) fn load_private_key(filename: &str) -> io::Result> { +pub fn load_private_key(filename: &str) -> io::Result> { // Open keyfile. - let keyfile = fs::File::open(filename).map_err(|e| error(format!("failed to open {}: {}", filename, e)))?; + let keyfile = fs::File::open(filename).map_err(|e| certs_error(format!("failed to open {}: {}", filename, e)))?; let mut reader = io::BufReader::new(keyfile); // Load and return a single private key. - private_key(&mut reader)?.ok_or_else(|| error(format!("no private key found in {}", filename))) + private_key(&mut reader)?.ok_or_else(|| certs_error(format!("no private key found in {}", filename))) } /// error function -pub(crate) fn error(err: String) -> Error { +pub fn certs_error(err: String) -> Error { Error::new(io::ErrorKind::Other, err) } @@ -59,14 +50,14 @@ pub(crate) fn error(err: String) -> Error { /// This function loads all certificate and private key pairs from the specified directory. /// It looks for files named `rustfs_cert.pem` and `rustfs_key.pem` in each subdirectory. /// The root directory can also contain a default certificate/private key pair. -pub(crate) fn load_all_certs_from_directory( +pub fn load_all_certs_from_directory( dir_path: &str, ) -> io::Result>, PrivateKeyDer<'static>)>> { let mut cert_key_pairs = HashMap::new(); let dir = Path::new(dir_path); if !dir.exists() || !dir.is_dir() { - return Err(error(format!( + return Err(certs_error(format!( "The certificate directory does not exist or is not a directory: {}", dir_path ))); @@ -80,10 +71,10 @@ pub(crate) fn load_all_certs_from_directory( debug!("find the root directory certificate: {:?}", root_cert_path); let root_cert_str = root_cert_path .to_str() - .ok_or_else(|| error(format!("Invalid UTF-8 in root certificate path: {:?}", root_cert_path)))?; + .ok_or_else(|| certs_error(format!("Invalid UTF-8 in root certificate path: {:?}", root_cert_path)))?; let root_key_str = root_key_path .to_str() - .ok_or_else(|| error(format!("Invalid UTF-8 in root key path: {:?}", root_key_path)))?; + .ok_or_else(|| certs_error(format!("Invalid UTF-8 in root key path: {:?}", root_key_path)))?; match load_cert_key_pair(root_cert_str, root_key_str) { Ok((certs, key)) => { // The root directory certificate is used as the default certificate and is stored using special keys. @@ -104,7 +95,7 @@ pub(crate) fn load_all_certs_from_directory( let domain_name = path .file_name() .and_then(|name| name.to_str()) - .ok_or_else(|| error(format!("invalid domain name directory:{:?}", path)))?; + .ok_or_else(|| certs_error(format!("invalid domain name directory:{:?}", path)))?; // find certificate and private key files let cert_path = path.join(RUSTFS_TLS_CERT); // e.g., rustfs_cert.pem @@ -125,7 +116,10 @@ pub(crate) fn load_all_certs_from_directory( } if cert_key_pairs.is_empty() { - return Err(error(format!("No valid certificate/private key pair found in directory {}", dir_path))); + return Err(certs_error(format!( + "No valid certificate/private key pair found in directory {}", + dir_path + ))); } Ok(cert_key_pairs) @@ -171,7 +165,7 @@ pub fn create_multi_cert_resolver( for (domain, (certs, key)) in cert_key_pairs { // create a signature let signing_key = rustls::crypto::aws_lc_rs::sign::any_supported_type(&key) - .map_err(|_| error(format!("unsupported private key types:{}", domain)))?; + .map_err(|_| certs_error(format!("unsupported private key types:{}", domain)))?; // create a CertifiedKey let certified_key = CertifiedKey::new(certs, signing_key); @@ -181,7 +175,7 @@ pub fn create_multi_cert_resolver( // add certificate to resolver resolver .add(&domain, certified_key) - .map_err(|e| error(format!("failed to add a domain name certificate:{},err: {:?}", domain, e)))?; + .map_err(|e| certs_error(format!("failed to add a domain name certificate:{},err: {:?}", domain, e)))?; } } diff --git a/crates/utils/src/ip.rs b/crates/utils/src/ip.rs new file mode 100644 index 00000000..3b63b12f --- /dev/null +++ b/crates/utils/src/ip.rs @@ -0,0 +1,43 @@ +use std::net::{IpAddr, Ipv4Addr}; + +/// Get the IP address of the machine +/// +/// Priority is given to trying to get the IPv4 address, and if it fails, try to get the IPv6 address. +/// If both fail to retrieve, None is returned. +/// +/// # Returns +/// +/// * `Some(IpAddr)` - Native IP address (IPv4 or IPv6) +/// * `None` - Unable to obtain any native IP address +pub fn get_local_ip() -> Option { + local_ip_address::local_ip() + .ok() + .or_else(|| local_ip_address::local_ipv6().ok()) +} + +/// Get the IP address of the machine as a string +/// +/// If the IP address cannot be obtained, returns "127.0.0.1" as the default value. +/// +/// # Returns +/// +/// * `String` - Native IP address (IPv4 or IPv6) as a string, or the default value +pub fn get_local_ip_with_default() -> String { + get_local_ip() + .unwrap_or_else(|| IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1))) // Provide a safe default value + .to_string() +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_get_local_ip() { + match get_local_ip() { + Some(ip) => println!("the ip address of this machine:{}", ip), + None => println!("Unable to obtain the IP address of the machine"), + } + assert!(get_local_ip().is_some()); + } +} diff --git a/crates/utils/src/lib.rs b/crates/utils/src/lib.rs new file mode 100644 index 00000000..fbb5936b --- /dev/null +++ b/crates/utils/src/lib.rs @@ -0,0 +1,11 @@ +mod certs; +mod ip; +mod net; + +pub use certs::certs_error; +pub use certs::create_multi_cert_resolver; +pub use certs::load_all_certs_from_directory; +pub use certs::load_certs; +pub use certs::load_private_key; +pub use ip::get_local_ip; +pub use ip::get_local_ip_with_default; diff --git a/crates/utils/src/net.rs b/crates/utils/src/net.rs new file mode 100644 index 00000000..8b137891 --- /dev/null +++ b/crates/utils/src/net.rs @@ -0,0 +1 @@ + diff --git a/docker-compose-obs.yaml b/docker-compose-obs.yaml index 505f287b..f6d85b44 100644 --- a/docker-compose-obs.yaml +++ b/docker-compose-obs.yaml @@ -1,6 +1,6 @@ services: otel-collector: - image: otel/opentelemetry-collector-contrib:0.120.0 + image: ghcr.io/open-telemetry/opentelemetry-collector-releases/opentelemetry-collector-contrib:0.124.0 environment: - TZ=Asia/Shanghai volumes: @@ -16,7 +16,7 @@ services: networks: - rustfs-network jaeger: - image: jaegertracing/jaeger:2.4.0 + image: jaegertracing/jaeger:2.5.0 environment: - TZ=Asia/Shanghai ports: @@ -26,7 +26,7 @@ services: networks: - rustfs-network prometheus: - image: prom/prometheus:v3.2.1 + image: prom/prometheus:v3.3.0 environment: - TZ=Asia/Shanghai volumes: @@ -36,7 +36,7 @@ services: networks: - rustfs-network loki: - image: grafana/loki:3.4.2 + image: grafana/loki:3.5.0 environment: - TZ=Asia/Shanghai volumes: @@ -47,7 +47,7 @@ services: networks: - rustfs-network grafana: - image: grafana/grafana:11.6.0 + image: grafana/grafana:11.6.1 ports: - "3000:3000" # Web UI environment: @@ -63,10 +63,10 @@ services: container_name: node1 environment: - RUSTFS_VOLUMES=http://node{1...4}:9000/root/data/target/volume/test{1...4} - - RUSTFS_ADDRESS=0.0.0.0:9000 + - RUSTFS_ADDRESS=:9000 - RUSTFS_CONSOLE_ENABLE=true - - RUSTFS_CONSOLE_ADDRESS=0.0.0.0:9002 - - RUSTFS_OBS_CONFIG=/etc/observability/config/obs.toml + - RUSTFS_CONSOLE_ADDRESS=:9002 + - RUSTFS_OBS_CONFIG=/etc/observability/config/obs-multi.toml platform: linux/amd64 ports: - "9001:9000" # 映射宿主机的 9001 端口到容器的 9000 端口 @@ -84,10 +84,10 @@ services: container_name: node2 environment: - RUSTFS_VOLUMES=/root/data/target/volume/test{1...4} - - RUSTFS_ADDRESS=0.0.0.0:9000 + - RUSTFS_ADDRESS=:9000 - RUSTFS_CONSOLE_ENABLE=true - - RUSTFS_CONSOLE_ADDRESS=0.0.0.0:9002 - - RUSTFS_OBS_CONFIG=/etc/observability/config/obs.toml + - RUSTFS_CONSOLE_ADDRESS=:9002 + - RUSTFS_OBS_CONFIG=/etc/observability/config/obs-multi.toml platform: linux/amd64 ports: - "9002:9000" # 映射宿主机的 9002 端口到容器的 9000 端口 @@ -105,10 +105,10 @@ services: container_name: node3 environment: - RUSTFS_VOLUMES=http://node{1...4}:9000/root/data/target/volume/test{1...4} - - RUSTFS_ADDRESS=0.0.0.0:9000 + - RUSTFS_ADDRESS=:9000 - RUSTFS_CONSOLE_ENABLE=true - - RUSTFS_CONSOLE_ADDRESS=0.0.0.0:9002 - - RUSTFS_OBS_CONFIG=/etc/observability/config/obs.toml + - RUSTFS_CONSOLE_ADDRESS=:9002 + - RUSTFS_OBS_CONFIG=/etc/observability/config/obs-multi.toml platform: linux/amd64 ports: - "9003:9000" # 映射宿主机的 9003 端口到容器的 9000 端口 @@ -126,10 +126,10 @@ services: container_name: node4 environment: - RUSTFS_VOLUMES=http://node{1...4}:9000/root/data/target/volume/test{1...4} - - RUSTFS_ADDRESS=0.0.0.0:9000 + - RUSTFS_ADDRESS=:9000 - RUSTFS_CONSOLE_ENABLE=true - - RUSTFS_CONSOLE_ADDRESS=0.0.0.0:9002 - - RUSTFS_OBS_CONFIG=/etc/observability/config/obs.toml + - RUSTFS_CONSOLE_ADDRESS=:9002 + - RUSTFS_OBS_CONFIG=/etc/observability/config/obs-multi.toml platform: linux/amd64 ports: - "9004:9000" # 映射宿主机的 9004 端口到容器的 9000 端口 diff --git a/ecstore/Cargo.toml b/ecstore/Cargo.toml index 721c902f..0f9431fc 100644 --- a/ecstore/Cargo.toml +++ b/ecstore/Cargo.toml @@ -11,14 +11,15 @@ rust-version.workspace = true workspace = true [dependencies] +rustfs-config = { workspace = true } async-trait.workspace = true backon.workspace = true -blake2 = "0.10.6" +blake2 = { workspace = true } bytes.workspace = true common.workspace = true policy.workspace = true chrono.workspace = true -glob = "0.3.2" +glob = { workspace = true } thiserror.workspace = true flatbuffers.workspace = true futures.workspace = true @@ -30,16 +31,16 @@ serde_json.workspace = true tracing-error.workspace = true s3s.workspace = true http.workspace = true -highway = "1.3.0" +highway = { workspace = true } url.workspace = true uuid = { workspace = true, features = ["v4", "fast-rng", "serde"] } -reed-solomon-erasure = { version = "6.0.0", features = ["simd-accel"] } +reed-solomon-erasure = { workspace = true } transform-stream = "0.3.1" lazy_static.workspace = true lock.workspace = true -regex = "1.11.1" -netif = "0.1.6" -nix = { version = "0.29.0", features = ["fs"] } +regex = { workspace = true } +netif = { workspace = true } +nix = { workspace = true } path-absolutize = "3.1.1" protos.workspace = true rmp.workspace = true @@ -53,13 +54,13 @@ hex-simd = "0.8.0" path-clean = "1.0.1" tempfile.workspace = true tokio = { workspace = true, features = ["io-util", "sync", "signal"] } -tokio-stream = "0.1.17" +tokio-stream = { workspace = true } tonic.workspace = true tower.workspace = true byteorder = "1.5.0" xxhash-rust = { version = "0.8.15", features = ["xxh64"] } num = "0.4.3" -num_cpus = "1.16" +num_cpus = { workspace = true } s3s-policy.workspace = true rand.workspace = true pin-project-lite.workspace = true @@ -68,20 +69,18 @@ madmin.workspace = true workers.workspace = true reqwest = { workspace = true } urlencoding = "2.1.3" -smallvec = "1.15.0" +smallvec = { workspace = true } shadow-rs.workspace = true [target.'cfg(not(windows))'.dependencies] - -nix = { version = "0.29.0", features = ["fs"] } +nix = { workspace = true } [target.'cfg(windows)'.dependencies] - -winapi = "0.3.9" +winapi = { workspace = true } [dev-dependencies] tokio = { workspace = true, features = ["rt-multi-thread", "macros"] } [build-dependencies] -shadow-rs.workspace = true +shadow-rs = { workspace = true, features = ["build", "metadata"] } diff --git a/ecstore/src/bitrot.rs b/ecstore/src/bitrot.rs index b64c84b3..05e55a43 100644 --- a/ecstore/src/bitrot.rs +++ b/ecstore/src/bitrot.rs @@ -534,6 +534,7 @@ impl Writer for BitrotFileWriter { self } + #[tracing::instrument(level = "info", skip_all)] async fn write(&mut self, buf: Bytes) -> Result<()> { if buf.is_empty() { return Ok(()); diff --git a/ecstore/src/disk/local.rs b/ecstore/src/disk/local.rs index 45abdbf5..bd10ccf6 100644 --- a/ecstore/src/disk/local.rs +++ b/ecstore/src/disk/local.rs @@ -38,7 +38,9 @@ use crate::set_disk::{ CHECK_PART_VOLUME_NOT_FOUND, }; use crate::store_api::{BitrotAlgorithm, StorageAPI}; -use crate::utils::fs::{access, lstat, remove, remove_all, rename, O_APPEND, O_CREATE, O_RDONLY, O_WRONLY}; +use crate::utils::fs::{ + access, lstat, remove, remove_all, remove_all_std, remove_std, rename, O_APPEND, O_CREATE, O_RDONLY, O_WRONLY, +}; use crate::utils::os::get_info; use crate::utils::path::{ self, clean, decode_dir_object, encode_dir_object, has_suffix, path_join, path_join_buf, GLOBAL_DIR_SUFFIX, @@ -259,7 +261,7 @@ impl LocalDisk { #[tracing::instrument(level = "debug", skip(self))] async fn check_format_json(&self) -> Result { - let md = fs::metadata(&self.format_path).await.map_err(|e| match e.kind() { + let md = std::fs::metadata(&self.format_path).map_err(|e| match e.kind() { ErrorKind::NotFound => DiskError::DiskNotFound, ErrorKind::PermissionDenied => DiskError::FileAccessDenied, _ => { @@ -315,9 +317,9 @@ impl LocalDisk { #[allow(unused_variables)] pub async fn move_to_trash(&self, delete_path: &PathBuf, recursive: bool, immediate_purge: bool) -> Result<()> { if recursive { - remove_all(delete_path).await?; + remove_all_std(delete_path)?; } else { - remove(delete_path).await?; + remove_std(delete_path)?; } return Ok(()); @@ -365,7 +367,7 @@ impl LocalDisk { Ok(()) } - // #[tracing::instrument(skip(self))] + #[tracing::instrument(level = "debug", skip(self))] pub async fn delete_file( &self, base_path: &PathBuf, @@ -688,6 +690,7 @@ impl LocalDisk { } // write_all_private with check_path_length + #[tracing::instrument(level = "debug", skip_all)] pub async fn write_all_private( &self, volume: &str, @@ -1213,7 +1216,7 @@ impl DiskAPI for LocalDisk { Ok(data) } - #[tracing::instrument(skip(self))] + #[tracing::instrument(level = "debug", skip_all)] async fn write_all(&self, volume: &str, path: &str, data: Vec) -> Result<()> { self.write_all_public(volume, path, data).await } @@ -1721,7 +1724,7 @@ impl DiskAPI for LocalDisk { Ok(()) } - #[tracing::instrument(skip(self))] + #[tracing::instrument(level = "debug", skip(self))] async fn rename_data( &self, src_volume: &str, @@ -1732,7 +1735,7 @@ impl DiskAPI for LocalDisk { ) -> Result { let src_volume_dir = self.get_bucket_path(src_volume)?; if !skip_access_checks(src_volume) { - if let Err(e) = utils::fs::access(&src_volume_dir).await { + if let Err(e) = utils::fs::access_std(&src_volume_dir) { info!("access checks failed, src_volume_dir: {:?}, err: {}", src_volume_dir, e.to_string()); return Err(convert_access_error(e, DiskError::VolumeAccessDenied)); } @@ -1740,7 +1743,7 @@ impl DiskAPI for LocalDisk { let dst_volume_dir = self.get_bucket_path(dst_volume)?; if !skip_access_checks(dst_volume) { - if let Err(e) = utils::fs::access(&dst_volume_dir).await { + if let Err(e) = utils::fs::access_std(&dst_volume_dir) { info!("access checks failed, dst_volume_dir: {:?}, err: {}", dst_volume_dir, e.to_string()); return Err(convert_access_error(e, DiskError::VolumeAccessDenied)); } @@ -1913,7 +1916,7 @@ impl DiskAPI for LocalDisk { if let Some(src_file_path_parent) = src_file_path.parent() { if src_volume != super::RUSTFS_META_MULTIPART_BUCKET { - let _ = utils::fs::remove(src_file_path_parent).await; + let _ = utils::fs::remove_std(src_file_path_parent); } else { let _ = self .delete_file(&dst_volume_dir, &src_file_path_parent.to_path_buf(), true, false) diff --git a/ecstore/src/disk/os.rs b/ecstore/src/disk/os.rs index ae88611a..a6dfb15b 100644 --- a/ecstore/src/disk/os.rs +++ b/ecstore/src/disk/os.rs @@ -108,6 +108,7 @@ pub async fn read_dir(path: impl AsRef, count: i32) -> Result> Ok(volumes) } +#[tracing::instrument(level = "debug", skip_all)] pub async fn rename_all( src_file_path: impl AsRef, dst_file_path: impl AsRef, @@ -136,7 +137,7 @@ pub async fn reliable_rename( base_dir: impl AsRef, ) -> io::Result<()> { if let Some(parent) = dst_file_path.as_ref().parent() { - if !file_exists(parent).await { + if !file_exists(parent) { info!("reliable_rename reliable_mkdir_all parent: {:?}", parent); reliable_mkdir_all(parent, base_dir.as_ref()).await?; } @@ -144,7 +145,7 @@ pub async fn reliable_rename( let mut i = 0; loop { - if let Err(e) = utils::fs::rename(src_file_path.as_ref(), dst_file_path.as_ref()).await { + if let Err(e) = utils::fs::rename_std(src_file_path.as_ref(), dst_file_path.as_ref()) { if os_is_not_exist(&e) && i == 0 { i += 1; continue; @@ -221,6 +222,6 @@ pub async fn os_mkdir_all(dir_path: impl AsRef, base_dir: impl AsRef Ok(()) } -pub async fn file_exists(path: impl AsRef) -> bool { - fs::metadata(path.as_ref()).await.map(|_| true).unwrap_or(false) +pub fn file_exists(path: impl AsRef) -> bool { + std::fs::metadata(path.as_ref()).map(|_| true).unwrap_or(false) } diff --git a/ecstore/src/erasure.rs b/ecstore/src/erasure.rs index 590889e7..702d393e 100644 --- a/ecstore/src/erasure.rs +++ b/ecstore/src/erasure.rs @@ -1,5 +1,6 @@ use crate::bitrot::{BitrotReader, BitrotWriter}; use crate::error::clone_err; +use crate::io::Etag; use crate::quorum::{object_op_ignored_errs, reduce_write_quorum_errs}; use bytes::{Bytes, BytesMut}; use common::error::{Error, Result}; @@ -8,8 +9,10 @@ use reed_solomon_erasure::galois_8::ReedSolomon; use smallvec::SmallVec; use std::any::Any; use std::io::ErrorKind; +use std::sync::Arc; use tokio::io::{AsyncRead, AsyncWrite}; use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio::sync::mpsc; use tracing::warn; use tracing::{error, info}; // use tracing::debug; @@ -26,7 +29,7 @@ pub struct Erasure { encoder: Option, pub block_size: usize, _id: Uuid, - buf: Vec, + _buf: Vec, } impl Erasure { @@ -46,61 +49,65 @@ impl Erasure { block_size, encoder, _id: Uuid::new_v4(), - buf: vec![0u8; block_size], + _buf: vec![0u8; block_size], } } - #[tracing::instrument(level = "debug", skip(self, reader, writers))] + #[tracing::instrument(level = "info", skip(self, reader, writers))] pub async fn encode( - &mut self, - reader: &mut S, + self: Arc, + mut reader: S, writers: &mut [Option], // block_size: usize, total_size: usize, write_quorum: usize, - ) -> Result + ) -> Result<(usize, String)> where - S: AsyncRead + Unpin + Send + 'static, + S: AsyncRead + Etag + Unpin + Send + 'static, { - // pin_mut!(body); - // let mut reader = tokio_util::io::StreamReader::new( - // body.map(|f| f.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e.to_string()))), - // ); + let (tx, mut rx) = mpsc::channel(5); + let task = tokio::spawn(async move { + let mut buf = vec![0u8; self.block_size]; + let mut total: usize = 0; + loop { + if total_size > 0 { + let new_len = { + let remain = total_size - total; + if remain > self.block_size { + self.block_size + } else { + remain + } + }; - let mut total: usize = 0; - let mut blocks = >::new(); - - loop { - if total_size > 0 { - let new_len = { - let remain = total_size - total; - if remain > self.block_size { - self.block_size - } else { - remain + if new_len == 0 && total > 0 { + break; } - }; - if new_len == 0 && total > 0 { + buf.resize(new_len, 0u8); + match reader.read_exact(&mut buf).await { + Ok(res) => res, + Err(e) => { + if let ErrorKind::UnexpectedEof = e.kind() { + break; + } else { + return Err(Error::new(e)); + } + } + }; + total += buf.len(); + } + let blocks = Arc::new(Box::pin(self.clone().encode_data(&buf)?)); + let _ = tx.send(blocks).await; + if total_size == 0 { break; } - - self.buf.resize(new_len, 0u8); - match reader.read_exact(&mut self.buf).await { - Ok(res) => res, - Err(e) => { - if let ErrorKind::UnexpectedEof = e.kind() { - break; - } else { - return Err(Error::new(e)); - } - } - }; - total += self.buf.len(); } + let etag = reader.etag().await; + Ok((total, etag)) + }); - self.encode_data(&self.buf, &mut blocks)?; - + while let Some(blocks) = rx.recv().await { let write_futures = writers.iter_mut().enumerate().map(|(i, w_op)| { let i_inner = i; let blocks_inner = blocks.clone(); @@ -125,84 +132,8 @@ impl Erasure { warn!("Erasure encode errs {:?}", &errs); return Err(err); } - - if total_size == 0 { - break; - } } - - Ok(total) - - // // let stream = ChunkedStream::new(body, self.block_size); - // let stream = ChunkedStream::new(body, total_size, self.block_size, false); - // let mut total: usize = 0; - // // let mut idx = 0; - // pin_mut!(stream); - - // // warn!("encode start..."); - - // loop { - // match stream.next().await { - // Some(result) => match result { - // Ok(data) => { - // total += data.len(); - - // // EOF - // if data.is_empty() { - // break; - // } - - // // idx += 1; - // // warn!("encode {} get data {:?}", data.len(), data.to_vec()); - - // let blocks = self.encode_data(data.as_ref())?; - - // // warn!( - // // "encode shard size: {}/{} from block_size {}, total_size {} ", - // // blocks[0].len(), - // // blocks.len(), - // // data.len(), - // // total_size - // // ); - - // let mut errs = Vec::new(); - - // for (i, w_op) in writers.iter_mut().enumerate() { - // if let Some(w) = w_op { - // match w.write(blocks[i].as_ref()).await { - // Ok(_) => errs.push(None), - // Err(e) => errs.push(Some(e)), - // } - // } else { - // errs.push(Some(Error::new(DiskError::DiskNotFound))); - // } - // } - - // let none_count = errs.iter().filter(|&x| x.is_none()).count(); - // if none_count >= write_quorum { - // continue; - // } - - // if let Some(err) = reduce_write_quorum_errs(&errs, object_op_ignored_errs().as_ref(), write_quorum) { - // warn!("Erasure encode errs {:?}", &errs); - // return Err(err); - // } - // } - // Err(e) => { - // warn!("poll result err {:?}", &e); - // return Err(Error::msg(e.to_string())); - // } - // }, - // None => { - // // warn!("poll empty result"); - // break; - // } - // } - // } - - // let _ = close_bitrot_writers(writers).await?; - - // Ok(total) + task.await? } pub async fn decode( @@ -356,8 +287,8 @@ impl Erasure { self.data_shards + self.parity_shards } - #[tracing::instrument(level = "debug", skip_all, fields(data_len=data.len()))] - pub fn encode_data(&self, data: &[u8], shards: &mut SmallVec<[Bytes; 16]>) -> Result<()> { + #[tracing::instrument(level = "info", skip_all, fields(data_len=data.len()))] + pub fn encode_data(self: Arc, data: &[u8]) -> Result> { let (shard_size, total_size) = self.need_size(data.len()); // 生成一个新的 所需的所有分片数据长度 @@ -379,14 +310,13 @@ impl Erasure { // 零拷贝分片,所有 shard 引用 data_buffer let mut data_buffer = data_buffer.freeze(); - shards.clear(); - shards.reserve(self.total_shard_count()); + let mut shards = Vec::with_capacity(self.total_shard_count()); for _ in 0..self.total_shard_count() { let shard = data_buffer.split_to(shard_size); shards.push(shard); } - Ok(()) + Ok(shards) } pub fn decode_data(&self, shards: &mut [Option>]) -> Result<()> { @@ -617,7 +547,6 @@ impl ShardReader { #[cfg(test)] mod test { - use super::*; #[test] @@ -626,8 +555,7 @@ mod test { let parity_shards = 2; let data: &[u8] = &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]; let ec = Erasure::new(data_shards, parity_shards, 1); - let mut shards = SmallVec::new(); - ec.encode_data(data, &mut shards).unwrap(); + let shards = Arc::new(ec).encode_data(data).unwrap(); println!("shards:{:?}", shards); let mut s: Vec<_> = shards @@ -643,6 +571,7 @@ mod test { println!("sss:{:?}", &s); + let ec = Erasure::new(data_shards, parity_shards, 1); ec.decode_data(&mut s).unwrap(); // ec.encoder.reconstruct(&mut s).unwrap(); diff --git a/ecstore/src/file_meta.rs b/ecstore/src/file_meta.rs index 1234c07f..6bb644ba 100644 --- a/ecstore/src/file_meta.rs +++ b/ecstore/src/file_meta.rs @@ -58,10 +58,12 @@ impl FileMeta { } // isXL2V1Format + #[tracing::instrument(level = "debug", skip_all)] pub fn is_xl2_v1_format(buf: &[u8]) -> bool { !matches!(Self::check_xl2_v1(buf), Err(_e)) } + #[tracing::instrument(level = "debug", skip_all)] pub fn load(buf: &[u8]) -> Result { let mut xl = FileMeta::default(); xl.unmarshal_msg(buf)?; @@ -245,7 +247,7 @@ impl FileMeta { } } - #[tracing::instrument] + #[tracing::instrument(level = "debug", skip_all)] pub fn marshal_msg(&self) -> Result> { let mut wr = Vec::new(); @@ -363,6 +365,7 @@ impl FileMeta { } // shard_data_dir_count 查询 vid下data_dir的数量 + #[tracing::instrument(level = "debug", skip_all)] pub fn shard_data_dir_count(&self, vid: &Option, data_dir: &Option) -> usize { self.versions .iter() @@ -434,6 +437,7 @@ impl FileMeta { } // 添加版本 + #[tracing::instrument(level = "debug", skip_all)] pub fn add_version(&mut self, fi: FileInfo) -> Result<()> { let vid = fi.version_id; diff --git a/ecstore/src/global.rs b/ecstore/src/global.rs index 162aa9d3..c60c6c59 100644 --- a/ecstore/src/global.rs +++ b/ecstore/src/global.rs @@ -20,8 +20,6 @@ pub const DISK_MIN_INODES: u64 = 1000; pub const DISK_FILL_FRACTION: f64 = 0.99; pub const DISK_RESERVE_FRACTION: f64 = 0.15; -pub const DEFAULT_PORT: u16 = 9000; - lazy_static! { static ref GLOBAL_RUSTFS_PORT: OnceLock = OnceLock::new(); pub static ref GLOBAL_OBJECT_API: OnceLock> = OnceLock::new(); @@ -41,31 +39,37 @@ lazy_static! { pub static ref GLOBAL_BOOT_TIME: OnceCell = OnceCell::new(); } +/// Get the global rustfs port pub fn global_rustfs_port() -> u16 { if let Some(p) = GLOBAL_RUSTFS_PORT.get() { *p } else { - DEFAULT_PORT + rustfs_config::DEFAULT_PORT } } +/// Set the global rustfs port pub fn set_global_rustfs_port(value: u16) { GLOBAL_RUSTFS_PORT.set(value).expect("set_global_rustfs_port fail"); } +/// Get the global rustfs port pub fn set_global_deployment_id(id: Uuid) { globalDeploymentIDPtr.set(id).unwrap(); } + +/// Get the global deployment id pub fn get_global_deployment_id() -> Option { globalDeploymentIDPtr.get().map(|v| v.to_string()) } - +/// Get the global deployment id pub fn set_global_endpoints(eps: Vec) { GLOBAL_Endpoints .set(EndpointServerPools::from(eps)) .expect("GLOBAL_Endpoints set failed") } +/// Get the global endpoints pub fn get_global_endpoints() -> EndpointServerPools { if let Some(eps) = GLOBAL_Endpoints.get() { eps.clone() diff --git a/ecstore/src/heal/heal_ops.rs b/ecstore/src/heal/heal_ops.rs index 6f461457..f2e065fc 100644 --- a/ecstore/src/heal/heal_ops.rs +++ b/ecstore/src/heal/heal_ops.rs @@ -17,7 +17,7 @@ use crate::{ global::GLOBAL_IsDistErasure, heal::heal_commands::{HealStartSuccess, HEAL_UNKNOWN_SCAN}, new_object_layer_fn, - utils::path::has_profix, + utils::path::has_prefix, }; use crate::{ heal::heal_commands::{HEAL_ITEM_BUCKET, HEAL_ITEM_OBJECT}, @@ -786,7 +786,7 @@ impl AllHealState { let _ = self.mu.write().await; for (k, v) in self.heal_seq_map.read().await.iter() { - if (has_profix(k, path_s) || has_profix(path_s, k)) && !v.has_ended().await { + if (has_prefix(k, path_s) || has_prefix(path_s, k)) && !v.has_ended().await { return Err(Error::from_string(format!( "The provided heal sequence path overlaps with an existing heal path: {}", k diff --git a/ecstore/src/io.rs b/ecstore/src/io.rs index f2affe8c..2bd02c11 100644 --- a/ecstore/src/io.rs +++ b/ecstore/src/io.rs @@ -1,8 +1,12 @@ +use async_trait::async_trait; use bytes::Bytes; use futures::TryStreamExt; use md5::Digest; use md5::Md5; +use pin_project_lite::pin_project; +use std::io; use std::pin::Pin; +use std::task::ready; use std::task::Context; use std::task::Poll; use tokio::io::AsyncRead; @@ -125,10 +129,17 @@ impl AsyncRead for HttpFileReader { } } -pub struct EtagReader { - inner: R, - bytes_tx: mpsc::Sender, - md5_rx: oneshot::Receiver, +#[async_trait] +pub trait Etag { + async fn etag(self) -> String; +} + +pin_project! { + pub struct EtagReader { + inner: R, + bytes_tx: mpsc::Sender, + md5_rx: oneshot::Receiver, + } } impl EtagReader { @@ -148,8 +159,11 @@ impl EtagReader { EtagReader { inner, bytes_tx, md5_rx } } +} - pub async fn etag(self) -> String { +#[async_trait] +impl Etag for EtagReader { + async fn etag(self) -> String { drop(self.inner); drop(self.bytes_tx); self.md5_rx.await.unwrap() @@ -157,21 +171,28 @@ impl EtagReader { } impl AsyncRead for EtagReader { - #[tracing::instrument(level = "debug", skip_all)] - fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>) -> Poll> { - let poll = Pin::new(&mut self.inner).poll_read(cx, buf); - if let Poll::Ready(Ok(())) = &poll { - if buf.remaining() == 0 { + #[tracing::instrument(level = "info", skip_all)] + fn poll_read(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>) -> Poll> { + let me = self.project(); + + loop { + let rem = buf.remaining(); + if rem != 0 { + ready!(Pin::new(&mut *me.inner).poll_read(cx, buf))?; + if buf.remaining() == rem { + return Err(io::Error::new(io::ErrorKind::UnexpectedEof, "early eof")).into(); + } + } else { let bytes = buf.filled(); let bytes = Bytes::copy_from_slice(bytes); - let tx = self.bytes_tx.clone(); + let tx = me.bytes_tx.clone(); tokio::spawn(async move { if let Err(e) = tx.send(bytes).await { warn!("EtagReader send error: {:?}", e); } }); + return Poll::Ready(Ok(())); } } - poll } } diff --git a/ecstore/src/quorum.rs b/ecstore/src/quorum.rs index d89fa523..d38177ca 100644 --- a/ecstore/src/quorum.rs +++ b/ecstore/src/quorum.rs @@ -147,6 +147,7 @@ pub fn reduce_read_quorum_errs( // 根据写quorum验证错误数量 // 返回最大错误数量的下标,或QuorumError +#[tracing::instrument(level = "info", skip_all)] pub fn reduce_write_quorum_errs( errs: &[Option], ignored_errs: &[Box], diff --git a/ecstore/src/set_disk.rs b/ecstore/src/set_disk.rs index da3046a7..3fb9a0cc 100644 --- a/ecstore/src/set_disk.rs +++ b/ecstore/src/set_disk.rs @@ -284,10 +284,20 @@ impl SetDisks { // let mut ress = Vec::with_capacity(disks.len()); let mut errs = Vec::with_capacity(disks.len()); - for (i, disk) in disks.iter().enumerate() { - let mut file_info = file_infos[i].clone(); + let src_bucket = Arc::new(src_bucket.to_string()); + let src_object = Arc::new(src_object.to_string()); + let dst_bucket = Arc::new(dst_bucket.to_string()); + let dst_object = Arc::new(dst_object.to_string()); - futures.push(async move { + for (i, (disk, file_info)) in disks.iter().zip(file_infos.iter()).enumerate() { + let mut file_info = file_info.clone(); + let disk = disk.clone(); + let src_bucket = src_bucket.clone(); + let src_object = src_object.clone(); + let dst_object = dst_object.clone(); + let dst_bucket = dst_bucket.clone(); + + futures.push(tokio::spawn(async move { if file_info.erasure.index == 0 { file_info.erasure.index = i + 1; } @@ -297,12 +307,12 @@ impl SetDisks { } if let Some(disk) = disk { - disk.rename_data(src_bucket, src_object, file_info, dst_bucket, dst_object) + disk.rename_data(&src_bucket, &src_object, file_info, &dst_bucket, &dst_object) .await } else { Err(Error::new(DiskError::DiskNotFound)) } - }) + })); } let mut disk_versions = vec![None; disks.len()]; @@ -311,15 +321,13 @@ impl SetDisks { let results = join_all(futures).await; for (idx, result) in results.iter().enumerate() { - match result { + match result.as_ref().map_err(|_| Error::new(DiskError::Unexpected))? { Ok(res) => { data_dirs[idx] = res.old_data_dir; disk_versions[idx].clone_from(&res.sign); - // ress.push(Some(res)); errs.push(None); } Err(e) => { - // ress.push(None); errs.push(Some(clone_err(e))); } } @@ -336,11 +344,14 @@ impl SetDisks { if let Some(disk) = disks[i].as_ref() { let fi = file_infos[i].clone(); let old_data_dir = data_dirs[i]; - futures.push(async move { + let disk = disk.clone(); + let src_bucket = src_bucket.clone(); + let src_object = src_object.clone(); + futures.push(tokio::spawn(async move { let _ = disk .delete_version( - src_bucket, - src_object, + &src_bucket, + &src_object, fi, false, DeleteOptions { @@ -354,7 +365,7 @@ impl SetDisks { debug!("rename_data delete_version err {:?}", e); e }); - }); + })); } } @@ -416,41 +427,41 @@ impl SetDisks { data_dir: &str, write_quorum: usize, ) -> Result<()> { - let file_path = format!("{}/{}", object, data_dir); - - let mut futures = Vec::with_capacity(disks.len()); - let mut errs = Vec::with_capacity(disks.len()); - - for disk in disks.iter() { + let file_path = Arc::new(format!("{}/{}", object, data_dir)); + let bucket = Arc::new(bucket.to_string()); + let futures = disks.iter().map(|disk| { let file_path = file_path.clone(); - futures.push(async move { + let bucket = bucket.clone(); + let disk = disk.clone(); + tokio::spawn(async move { if let Some(disk) = disk { - disk.delete( - bucket, - &file_path, - DeleteOptions { - recursive: true, - ..Default::default() - }, - ) - .await + match disk + .delete( + &bucket, + &file_path, + DeleteOptions { + recursive: true, + ..Default::default() + }, + ) + .await + { + Ok(_) => None, + Err(e) => Some(e), + } } else { - Err(Error::new(DiskError::DiskNotFound)) + Some(Error::new(DiskError::DiskNotFound)) } - }); - } - - let results = join_all(futures).await; - for result in results { - match result { - Ok(_) => { - errs.push(None); - } - Err(e) => { - errs.push(Some(e)); - } - } - } + }) + }); + let errs: Vec> = join_all(futures) + .await + .into_iter() + .map(|e| match e { + Ok(e) => e, + Err(_) => Some(Error::new(DiskError::Unexpected)), + }) + .collect(); if let Some(err) = reduce_write_quorum_errs(&errs, object_op_ignored_errs().as_ref(), write_quorum) { return Err(err); @@ -3759,7 +3770,7 @@ impl ObjectIO for SetDisks { let tmp_object = format!("{}/{}/part.1", tmp_dir, fi.data_dir.unwrap()); - let mut erasure = Erasure::new(fi.erasure.data_blocks, fi.erasure.parity_blocks, fi.erasure.block_size); + let erasure = Erasure::new(fi.erasure.data_blocks, fi.erasure.parity_blocks, fi.erasure.block_size); let is_inline_buffer = { if let Some(sc) = GLOBAL_StorageClass.get() { @@ -3798,19 +3809,18 @@ impl ObjectIO for SetDisks { } let stream = replace(&mut data.stream, Box::new(empty())); - let mut etag_stream = EtagReader::new(stream); + let etag_stream = EtagReader::new(stream); // TODO: etag from header - let w_size = erasure - .encode(&mut etag_stream, &mut writers, data.content_length, write_quorum) + let (w_size, etag) = Arc::new(erasure) + .encode(etag_stream, &mut writers, data.content_length, write_quorum) .await?; // TODO: 出错,删除临时目录 if let Err(err) = close_bitrot_writers(&mut writers).await { error!("close_bitrot_writers err {:?}", err); } - let etag = etag_stream.etag().await; //TODO: userDefined user_defined.insert("etag".to_owned(), etag.clone()); @@ -4408,21 +4418,19 @@ impl StorageAPI for SetDisks { } } - let mut erasure = Erasure::new(fi.erasure.data_blocks, fi.erasure.parity_blocks, fi.erasure.block_size); + let erasure = Erasure::new(fi.erasure.data_blocks, fi.erasure.parity_blocks, fi.erasure.block_size); let stream = replace(&mut data.stream, Box::new(empty())); - let mut etag_stream = EtagReader::new(stream); + let etag_stream = EtagReader::new(stream); - let w_size = erasure - .encode(&mut etag_stream, &mut writers, data.content_length, write_quorum) + let (w_size, mut etag) = Arc::new(erasure) + .encode(etag_stream, &mut writers, data.content_length, write_quorum) .await?; if let Err(err) = close_bitrot_writers(&mut writers).await { error!("close_bitrot_writers err {:?}", err); } - let mut etag = etag_stream.etag().await; - if let Some(ref tag) = opts.preserve_etag { etag = tag.clone(); } diff --git a/ecstore/src/store.rs b/ecstore/src/store.rs index 821eb530..eb28236e 100644 --- a/ecstore/src/store.rs +++ b/ecstore/src/store.rs @@ -2560,6 +2560,7 @@ fn check_abort_multipart_args(bucket: &str, object: &str, upload_id: &str) -> Re check_multipart_object_args(bucket, object, upload_id) } +#[tracing::instrument(level = "debug")] fn check_put_object_args(bucket: &str, object: &str) -> Result<()> { if !is_meta_bucketname(bucket) && check_valid_bucket_name_strict(bucket).is_err() { return Err(Error::new(StorageError::BucketNameInvalid(bucket.to_string()))); diff --git a/ecstore/src/utils/fs.rs b/ecstore/src/utils/fs.rs index 463989c0..f60e28f0 100644 --- a/ecstore/src/utils/fs.rs +++ b/ecstore/src/utils/fs.rs @@ -106,6 +106,11 @@ pub async fn access(path: impl AsRef) -> io::Result<()> { Ok(()) } +pub fn access_std(path: impl AsRef) -> io::Result<()> { + std::fs::metadata(path)?; + Ok(()) +} + pub async fn lstat(path: impl AsRef) -> io::Result { fs::metadata(path).await } @@ -114,6 +119,7 @@ pub async fn make_dir_all(path: impl AsRef) -> io::Result<()> { fs::create_dir_all(path.as_ref()).await } +#[tracing::instrument(level = "debug", skip_all)] pub async fn remove(path: impl AsRef) -> io::Result<()> { let meta = fs::metadata(path.as_ref()).await?; if meta.is_dir() { @@ -132,6 +138,25 @@ pub async fn remove_all(path: impl AsRef) -> io::Result<()> { } } +#[tracing::instrument(level = "debug", skip_all)] +pub fn remove_std(path: impl AsRef) -> io::Result<()> { + let meta = std::fs::metadata(path.as_ref())?; + if meta.is_dir() { + std::fs::remove_dir(path.as_ref()) + } else { + std::fs::remove_file(path.as_ref()) + } +} + +pub fn remove_all_std(path: impl AsRef) -> io::Result<()> { + let meta = std::fs::metadata(path.as_ref())?; + if meta.is_dir() { + std::fs::remove_dir_all(path.as_ref()) + } else { + std::fs::remove_file(path.as_ref()) + } +} + pub async fn mkdir(path: impl AsRef) -> io::Result<()> { fs::create_dir(path.as_ref()).await } @@ -140,6 +165,11 @@ pub async fn rename(from: impl AsRef, to: impl AsRef) -> io::Result< fs::rename(from, to).await } +pub fn rename_std(from: impl AsRef, to: impl AsRef) -> io::Result<()> { + std::fs::rename(from, to) +} + +#[tracing::instrument(level = "debug", skip_all)] pub async fn read_file(path: impl AsRef) -> io::Result> { fs::read(path.as_ref()).await } diff --git a/ecstore/src/utils/path.rs b/ecstore/src/utils/path.rs index 0e38eac2..0c63b960 100644 --- a/ecstore/src/utils/path.rs +++ b/ecstore/src/utils/path.rs @@ -52,7 +52,7 @@ pub fn strings_has_prefix_fold(s: &str, prefix: &str) -> bool { s.len() >= prefix.len() && (s[..prefix.len()] == *prefix || s[..prefix.len()].eq_ignore_ascii_case(prefix)) } -pub fn has_profix(s: &str, prefix: &str) -> bool { +pub fn has_prefix(s: &str, prefix: &str) -> bool { if cfg!(target_os = "windows") { return strings_has_prefix_fold(s, prefix); } diff --git a/rustfs/Cargo.toml b/rustfs/Cargo.toml index 3019c060..7f7ec690 100644 --- a/rustfs/Cargo.toml +++ b/rustfs/Cargo.toml @@ -30,7 +30,7 @@ clap.workspace = true crypto = { workspace = true } datafusion = { workspace = true } common.workspace = true -const-str = { version = "0.6.1", features = ["std", "proc"] } +const-str = { workspace = true } ecstore.workspace = true policy.workspace = true flatbuffers.workspace = true @@ -42,7 +42,6 @@ http.workspace = true http-body.workspace = true iam = { workspace = true } lock.workspace = true -local-ip-address = { workspace = true } matchit = { workspace = true } mime.workspace = true mime_guess = { workspace = true } @@ -51,17 +50,18 @@ pin-project-lite.workspace = true protos.workspace = true query = { workspace = true } rmp-serde.workspace = true +rustfs-config = { workspace = true } rustfs-event-notifier = { workspace = true } rustfs-obs = { workspace = true } +rustfs-utils = { workspace = true } rustls.workspace = true -rustls-pemfile.workspace = true -rustls-pki-types.workspace = true rust-embed = { workspace = true, features = ["interpolate-folder-path"] } s3s.workspace = true serde.workspace = true serde_json.workspace = true serde_urlencoded = { workspace = true } shadow-rs = { workspace = true, features = ["build", "metadata"] } +socket2 = { workspace = true } tracing.workspace = true time = { workspace = true, features = ["parsing", "formatting", "serde"] } tokio-util.workspace = true diff --git a/rustfs/README.md b/rustfs/README.md index 86b2ab76..147e7ed7 100644 --- a/rustfs/README.md +++ b/rustfs/README.md @@ -1,30 +1,36 @@ -rustfs/ -├── Cargo.toml -├── src/ -│ ├── main.rs # 主入口 -│ ├── admin/ -│ │ └── mod.rs # 管理接口 -│ ├── auth/ -│ │ └── mod.rs # 认证模块 -│ ├── config/ -│ │ ├── mod.rs # 配置模块 -│ │ └── options.rs # 命令行参数 -│ ├── console/ -│ │ ├── mod.rs # 控制台模块 -│ │ └── server.rs # 控制台服务器 -│ ├── grpc/ -│ │ └── mod.rs # gRPC 服务 -│ ├── license/ -│ │ └── mod.rs # 许可证管理 -│ ├── logging/ -│ │ └── mod.rs # 日志管理 -│ ├── server/ -│ │ ├── mod.rs # 服务器实现 -│ │ ├── connection.rs # 连接处理 -│ │ ├── service.rs # 服务实现 -│ │ └── state.rs # 状态管理 -│ ├── storage/ -│ │ ├── mod.rs # 存储模块 -│ │ └── fs.rs # 文件系统实现 -│ └── utils/ -│ └── mod.rs # 工具函数 \ No newline at end of file +# RustFS + +RustFS is a simple file system written in Rust. It is designed to be a learning project for those who want to understand +how file systems work and how to implement them in Rust. + +## Features + +- Simple file system structure +- Basic file operations (create, read, write, delete) +- Directory support +- File metadata (size, creation time, etc.) +- Basic error handling +- Unit tests for core functionality +- Documentation for public API +- Example usage +- License information +- Contributing guidelines +- Changelog +- Code of conduct +- Acknowledgements +- Contact information +- Links to additional resources + +## Getting Started + +To get started with RustFS, clone the repository and build the project: + +```bash +git clone git@github.com:rustfs/s3-rustfs.git +cd rustfs +cargo build +``` + +## Usage + +To use RustFS, you can create a new file system instance and perform basic file operations. Here is an example: diff --git a/rustfs/src/config/mod.rs b/rustfs/src/config/mod.rs index 7a7bdc4c..a93b6dda 100644 --- a/rustfs/src/config/mod.rs +++ b/rustfs/src/config/mod.rs @@ -1,40 +1,8 @@ use clap::Parser; use const_str::concat; -use ecstore::global::DEFAULT_PORT; use std::string::ToString; shadow_rs::shadow!(build); -/// Default Access Key -/// Default value: rustfsadmin -/// Environment variable: RUSTFS_ACCESS_KEY -/// Command line argument: --access-key -/// Example: RUSTFS_ACCESS_KEY=rustfsadmin -/// Example: --access-key rustfsadmin -pub const DEFAULT_ACCESS_KEY: &str = "rustfsadmin"; -/// Default Secret Key -/// Default value: rustfsadmin -/// Environment variable: RUSTFS_SECRET_KEY -/// Command line argument: --secret-key -/// Example: RUSTFS_SECRET_KEY=rustfsadmin -/// Example: --secret-key rustfsadmin -pub const DEFAULT_SECRET_KEY: &str = "rustfsadmin"; -/// Default configuration file for observability -/// Default value: config/obs.toml -/// Environment variable: RUSTFS_OBS_CONFIG -/// Command line argument: --obs-config -/// Example: RUSTFS_OBS_CONFIG=config/obs.toml -/// Example: --obs-config config/obs.toml -/// Example: --obs-config /etc/rustfs/obs.toml -pub const DEFAULT_OBS_CONFIG: &str = "config/obs.toml"; - -/// Default TLS key for rustfs -/// This is the default key for TLS. -pub(crate) const RUSTFS_TLS_KEY: &str = "rustfs_key.pem"; - -/// Default TLS cert for rustfs -/// This is the default cert for TLS. -pub(crate) const RUSTFS_TLS_CERT: &str = "rustfs_cert.pem"; - #[allow(clippy::const_is_empty)] const SHORT_VERSION: &str = { if !build::TAG.is_empty() { @@ -67,7 +35,7 @@ pub struct Opt { pub volumes: Vec, /// bind to a specific ADDRESS:PORT, ADDRESS can be an IP or hostname - #[arg(long, default_value_t = format!("0.0.0.0:{}", DEFAULT_PORT), env = "RUSTFS_ADDRESS")] + #[arg(long, default_value_t = rustfs_config::DEFAULT_ADDRESS.to_string(), env = "RUSTFS_ADDRESS")] pub address: String, /// Domain name used for virtual-hosted-style requests. @@ -75,17 +43,19 @@ pub struct Opt { pub server_domains: Vec, /// Access key used for authentication. - #[arg(long, default_value_t = DEFAULT_ACCESS_KEY.to_string(), env = "RUSTFS_ACCESS_KEY")] + #[arg(long, default_value_t = rustfs_config::DEFAULT_ACCESS_KEY.to_string(), env = "RUSTFS_ACCESS_KEY")] pub access_key: String, /// Secret key used for authentication. - #[arg(long, default_value_t = DEFAULT_SECRET_KEY.to_string(), env = "RUSTFS_SECRET_KEY")] + #[arg(long, default_value_t = rustfs_config::DEFAULT_SECRET_KEY.to_string(), env = "RUSTFS_SECRET_KEY")] pub secret_key: String, + /// Enable console server #[arg(long, default_value_t = false, env = "RUSTFS_CONSOLE_ENABLE")] pub console_enable: bool, - #[arg(long, default_value_t = format!("127.0.0.1:{}", 9002), env = "RUSTFS_CONSOLE_ADDRESS")] + /// Console server bind address + #[arg(long, default_value_t = rustfs_config::DEFAULT_CONSOLE_ADDRESS.to_string(), env = "RUSTFS_CONSOLE_ADDRESS")] pub console_address: String, /// rustfs endpoint for console @@ -94,7 +64,7 @@ pub struct Opt { /// Observability configuration file /// Default value: config/obs.toml - #[arg(long, default_value_t = DEFAULT_OBS_CONFIG.to_string(), env = "RUSTFS_OBS_CONFIG")] + #[arg(long, default_value_t = rustfs_config::DEFAULT_OBS_CONFIG.to_string(), env = "RUSTFS_OBS_CONFIG")] pub obs_config: String, /// tls path for rustfs api and console. diff --git a/rustfs/src/console.rs b/rustfs/src/console.rs index c3edbaaa..0523991d 100644 --- a/rustfs/src/console.rs +++ b/rustfs/src/console.rs @@ -1,4 +1,3 @@ -use crate::config::{RUSTFS_TLS_CERT, RUSTFS_TLS_KEY}; use crate::license::get_license; use axum::{ body::Body, @@ -8,6 +7,7 @@ use axum::{ Router, }; use axum_extra::extract::Host; +use rustfs_config::{RUSTFS_TLS_CERT, RUSTFS_TLS_KEY}; use std::io; use axum::response::Redirect; @@ -17,7 +17,7 @@ use mime_guess::from_path; use rust_embed::RustEmbed; use serde::Serialize; use shadow_rs::shadow; -use std::net::{Ipv4Addr, SocketAddr}; +use std::net::{IpAddr, SocketAddr}; use std::sync::OnceLock; use std::time::Duration; use tokio::signal; @@ -33,7 +33,8 @@ const RUSTFS_ADMIN_PREFIX: &str = "/rustfs/admin/v3"; #[folder = "$CARGO_MANIFEST_DIR/static"] struct StaticFiles; -async fn static_handler(uri: axum::http::Uri) -> impl IntoResponse { +/// Static file handler +async fn static_handler(uri: Uri) -> impl IntoResponse { let mut path = uri.path().trim_start_matches('/'); if path.is_empty() { path = "index.html" @@ -72,7 +73,7 @@ pub(crate) struct Config { } impl Config { - fn new(local_ip: Ipv4Addr, port: u16, version: &str, date: &str) -> Self { + fn new(local_ip: IpAddr, port: u16, version: &str, date: &str) -> Self { Config { port, api: Api { @@ -143,7 +144,7 @@ struct License { pub(crate) static CONSOLE_CONFIG: OnceLock = OnceLock::new(); #[allow(clippy::const_is_empty)] -pub(crate) fn init_console_cfg(local_ip: Ipv4Addr, port: u16) { +pub(crate) fn init_console_cfg(local_ip: IpAddr, port: u16) { CONSOLE_CONFIG.get_or_init(|| { let ver = { if !build::TAG.is_empty() { @@ -193,7 +194,7 @@ fn _is_private_ip(ip: std::net::IpAddr) -> bool { async fn config_handler(uri: Uri, Host(host): Host) -> impl IntoResponse { let scheme = uri.scheme().map(|s| s.as_str()).unwrap_or("http"); - // 从 uri 中获取 host,如果没有则使用 Host extractor 的值 + // Get the host from the uri and use the value of the host extractor if it doesn't have one let host = uri.host().unwrap_or(host.as_str()); let host = if host.contains(':') { @@ -203,7 +204,7 @@ async fn config_handler(uri: Uri, Host(host): Host) -> impl IntoResponse { host }; - // 将当前配置复制一份 + // Make a copy of the current configuration let mut cfg = CONSOLE_CONFIG.get().unwrap().clone(); let url = format!("{}://{}:{}", scheme, host, cfg.port); @@ -219,14 +220,14 @@ async fn config_handler(uri: Uri, Host(host): Host) -> impl IntoResponse { pub async fn start_static_file_server( addrs: &str, - local_ip: Ipv4Addr, + local_ip: IpAddr, access_key: &str, secret_key: &str, tls_path: Option, ) { - // 配置 CORS + // Configure CORS let cors = CorsLayer::new() - .allow_origin(Any) // 生产环境建议指定具体域名 + .allow_origin(Any) // In the production environment, we recommend that you specify a specific domain name .allow_methods([http::Method::GET, http::Method::POST]) .allow_headers([header::CONTENT_TYPE]); // Create a route @@ -298,7 +299,7 @@ async fn start_server(server_addr: SocketAddr, tls_path: Option, app: Ro } #[allow(dead_code)] -/// HTTP 到 HTTPS 的 308 重定向 +/// 308 redirect for HTTP to HTTPS fn redirect_to_https(https_port: u16) -> Router { Router::new().route( "/*path", diff --git a/rustfs/src/event.rs b/rustfs/src/event.rs new file mode 100644 index 00000000..99e2a75c --- /dev/null +++ b/rustfs/src/event.rs @@ -0,0 +1,21 @@ +use rustfs_event_notifier::NotifierConfig; +use tracing::{error, info, instrument}; + +#[instrument] +pub(crate) async fn init_event_notifier(notifier_config: Option) { + // Initialize event notifier + if notifier_config.is_some() { + info!("event_config is not empty"); + tokio::spawn(async move { + let config = NotifierConfig::event_load_config(notifier_config); + let result = rustfs_event_notifier::initialize(config).await; + if let Err(e) = result { + error!("Failed to initialize event notifier: {}", e); + } else { + info!("Event notifier initialized successfully"); + } + }); + } else { + info!("event_config is empty"); + } +} diff --git a/rustfs/src/license.rs b/rustfs/src/license.rs index 25d09c82..2206c40e 100644 --- a/rustfs/src/license.rs +++ b/rustfs/src/license.rs @@ -10,6 +10,7 @@ lazy_static::lazy_static! { static ref LICENSE: OnceLock = OnceLock::new(); } +/// Initialize the license pub fn init_license(license: Option) { if license.is_none() { error!("License is None"); @@ -23,10 +24,13 @@ pub fn init_license(license: Option) { }); } +/// Get the license pub fn get_license() -> Option { LICENSE.get().cloned() } +/// Check the license +/// This function checks if the license is valid. #[allow(unreachable_code)] pub fn license_check() -> Result<()> { return Ok(()); diff --git a/rustfs/src/main.rs b/rustfs/src/main.rs index 7871fba5..bf94a6f6 100644 --- a/rustfs/src/main.rs +++ b/rustfs/src/main.rs @@ -2,18 +2,18 @@ mod admin; mod auth; mod config; mod console; +mod event; mod grpc; pub mod license; mod logging; mod server; mod service; mod storage; -mod utils; + use crate::auth::IAMAuth; use crate::console::{init_console_cfg, CONSOLE_CONFIG}; // Ensure the correct path for parse_license is imported use crate::server::{wait_for_shutdown, ServiceState, ServiceStateManager, ShutdownSignal, SHUTDOWN_TIMEOUT}; -use crate::utils::error; use bytes::Bytes; use chrono::Datelike; use clap::Parser; @@ -21,7 +21,6 @@ use common::{ error::{Error, Result}, globals::set_global_addr, }; -use config::{DEFAULT_ACCESS_KEY, DEFAULT_SECRET_KEY, RUSTFS_TLS_CERT, RUSTFS_TLS_KEY}; use ecstore::bucket::metadata_sys::init_bucket_metadata_sys; use ecstore::config as ecconfig; use ecstore::config::GLOBAL_ConfigSys; @@ -48,11 +47,12 @@ use hyper_util::{ use iam::init_iam_sys; use license::init_license; use protos::proto_gen::node_service::node_service_server::NodeServiceServer; -use rustfs_event_notifier::NotifierConfig; +use rustfs_config::{DEFAULT_ACCESS_KEY, DEFAULT_SECRET_KEY, RUSTFS_TLS_CERT, RUSTFS_TLS_KEY}; use rustfs_obs::{init_obs, init_process_observer, load_config, set_global_guard}; use rustls::ServerConfig; use s3s::{host::MultiDomain, service::S3ServiceBuilder}; use service::hybrid; +use socket2::SockRef; use std::net::SocketAddr; use std::sync::Arc; use std::time::Duration; @@ -62,8 +62,10 @@ use tokio_rustls::TlsAcceptor; use tonic::{metadata::MetadataValue, Request, Status}; use tower_http::cors::CorsLayer; use tower_http::trace::TraceLayer; -use tracing::Span; -use tracing::{debug, error, info, info_span, warn}; +use tracing::{debug, error, info, warn}; +use tracing::{instrument, Span}; + +const MI_B: usize = 1024 * 1024; #[cfg(all(target_os = "linux", target_env = "gnu"))] #[global_allocator] @@ -77,12 +79,12 @@ fn check_auth(req: Request<()>) -> Result, Status> { _ => Err(Status::unauthenticated("No valid auth token")), } } - +#[instrument] fn print_server_info() { let cfg = CONSOLE_CONFIG.get().unwrap(); let current_year = chrono::Utc::now().year(); - // 使用自定义宏打印服务器信息 + // Use custom macros to print server information info!("RustFS Object Storage Server"); info!("Copyright: 2024-{} RustFS, Inc", current_year); info!("License: {}", cfg.license()); @@ -95,8 +97,7 @@ async fn main() -> Result<()> { // Parse the obtained parameters let opt = config::Opt::parse(); - // config::init_config(opt.clone()); - + // Initialize the configuration init_license(opt.license.clone()); // Load the configuration file @@ -108,50 +109,34 @@ async fn main() -> Result<()> { // Store in global storage set_global_guard(guard)?; - // Initialize event notifier - let notifier_config = opt.clone().event_config; - if notifier_config.is_some() { - info!("event_config is not empty"); - tokio::spawn(async move { - let config = NotifierConfig::event_load_config(notifier_config); - let result = rustfs_event_notifier::initialize(config).await; - if let Err(e) = result { - error!("Failed to initialize event notifier: {}", e); - } else { - info!("Event notifier initialized successfully"); - } - }); - } else { - info!("event_config is empty"); - } - // Run parameters run(opt).await } -// #[tokio::main] +#[instrument(skip(opt))] async fn run(opt: config::Opt) -> Result<()> { - let span = info_span!("trace-main-run"); - let _enter = span.enter(); - debug!("opt: {:?}", &opt); + // Initialize event notifier + event::init_event_notifier(opt.event_config).await; + let server_addr = net::parse_and_resolve_address(opt.address.as_str())?; let server_port = server_addr.port(); let server_address = server_addr.to_string(); debug!("server_address {}", &server_address); - //设置 AK 和 SK + // Set up AK and SK iam::init_global_action_cred(Some(opt.access_key.clone()), Some(opt.secret_key.clone()))?; set_global_rustfs_port(server_port); - //监听地址,端口从参数中获取 + // The listening address and port are obtained from the parameters let listener = TcpListener::bind(server_address.clone()).await?; - //获取监听地址 + // Obtain the listener address let local_addr: SocketAddr = listener.local_addr()?; - let local_ip = utils::get_local_ip().ok_or(local_addr.ip()).unwrap(); + // let local_ip = utils::get_local_ip().ok_or(local_addr.ip()).unwrap(); + let local_ip = rustfs_utils::get_local_ip().ok_or(local_addr.ip()).unwrap(); // 用于 rpc let (endpoint_pools, setup_type) = EndpointServerPools::from_volumes(server_address.clone().as_str(), opt.volumes.clone()) @@ -175,7 +160,7 @@ async fn run(opt: config::Opt) -> Result<()> { // Detailed endpoint information (showing all API endpoints) let api_endpoints = format!("http://{}:{}", local_ip, server_port); let localhost_endpoint = format!("http://127.0.0.1:{}", server_port); - info!("API: {} {}", api_endpoints, localhost_endpoint); + info!(" API: {} {}", api_endpoints, localhost_endpoint); info!(" RootUser: {}", opt.access_key.clone()); info!(" RootPass: {}", opt.secret_key.clone()); if DEFAULT_ACCESS_KEY.eq(&opt.access_key) && DEFAULT_SECRET_KEY.eq(&opt.secret_key) { @@ -198,13 +183,13 @@ async fn run(opt: config::Opt) -> Result<()> { set_global_endpoints(endpoint_pools.as_ref().clone()); update_erasure_type(setup_type).await; - // 初始化本地磁盘 + // Initialize the local disk init_local_disks(endpoint_pools.clone()) .await .map_err(|err| Error::from_string(err.to_string()))?; // Setup S3 service - // 本项目使用 s3s 库来实现 s3 服务 + // This project uses the S3S library to implement S3 services let s3_service = { let store = storage::ecfs::FS::new(); // let mut b = S3ServiceBuilder::new(storage::ecfs::FS::new(server_address.clone(), endpoint_pools).await?); @@ -212,7 +197,7 @@ async fn run(opt: config::Opt) -> Result<()> { let access_key = opt.access_key.clone(); let secret_key = opt.secret_key.clone(); - //显示 info 信息 + // Displays info information debug!("authentication is enabled {}, {}", &access_key, &secret_key); b.set_auth(IAMAuth::new(access_key, secret_key)); @@ -263,7 +248,7 @@ async fn run(opt: config::Opt) -> Result<()> { debug!("Found TLS directory, checking for certificates"); // 1. Try to load all certificates directly (including root and subdirectories) - match utils::load_all_certs_from_directory(&tls_path) { + match rustfs_utils::load_all_certs_from_directory(&tls_path) { Ok(cert_key_pairs) if !cert_key_pairs.is_empty() => { debug!("Found {} certificates, starting with HTTPS", cert_key_pairs.len()); let _ = rustls::crypto::aws_lc_rs::default_provider().install_default(); @@ -271,7 +256,7 @@ async fn run(opt: config::Opt) -> Result<()> { // create a multi certificate configuration let mut server_config = ServerConfig::builder() .with_no_client_auth() - .with_cert_resolver(Arc::new(utils::create_multi_cert_resolver(cert_key_pairs)?)); + .with_cert_resolver(Arc::new(rustfs_utils::create_multi_cert_resolver(cert_key_pairs)?)); server_config.alpn_protocols = vec![b"h2".to_vec(), b"http/1.1".to_vec(), b"http/1.0".to_vec()]; Some(TlsAcceptor::from(Arc::new(server_config))) @@ -286,12 +271,14 @@ async fn run(opt: config::Opt) -> Result<()> { if has_single_cert { debug!("Found legacy single TLS certificate, starting with HTTPS"); let _ = rustls::crypto::aws_lc_rs::default_provider().install_default(); - let certs = utils::load_certs(cert_path.as_str()).map_err(|e| error(e.to_string()))?; - let key = utils::load_private_key(key_path.as_str()).map_err(|e| error(e.to_string()))?; + let certs = + rustfs_utils::load_certs(cert_path.as_str()).map_err(|e| rustfs_utils::certs_error(e.to_string()))?; + let key = rustfs_utils::load_private_key(key_path.as_str()) + .map_err(|e| rustfs_utils::certs_error(e.to_string()))?; let mut server_config = ServerConfig::builder() .with_no_client_auth() .with_single_cert(certs, key) - .map_err(|e| error(e.to_string()))?; + .map_err(|e| rustfs_utils::certs_error(e.to_string()))?; server_config.alpn_protocols = vec![b"h2".to_vec(), b"http/1.1".to_vec(), b"http/1.0".to_vec()]; Some(TlsAcceptor::from(Arc::new(server_config))) } else { @@ -339,7 +326,7 @@ async fn run(opt: config::Opt) -> Result<()> { .layer( TraceLayer::new_for_http() .make_span_with(|request: &HttpRequest<_>| { - let span = tracing::debug_span!("http-request", + let span = tracing::info_span!("http-request", status_code = tracing::field::Empty, method = %request.method(), uri = %request.uri(), @@ -368,13 +355,14 @@ async fn run(opt: config::Opt) -> Result<()> { debug!("http response generated in {:?}", latency) }) .on_body_chunk(|chunk: &Bytes, latency: Duration, _span: &Span| { - info!(histogram.request.body.len = chunk.len(), "histogram request body lenght",); + info!(histogram.request.body.len = chunk.len(), "histogram request body length",); debug!("http body sending {} bytes in {:?}", chunk.len(), latency) }) .on_eos(|_trailers: Option<&HeaderMap>, stream_duration: Duration, _span: &Span| { debug!("http stream closed after {:?}", stream_duration) }) .on_failure(|_error, latency: Duration, _span: &Span| { + info!(counter.rustfs_api_requests_failure_total = 1_u64, "handle request api failure total"); debug!("http request failure error: {:?} in {:?}", _error, latency) }), ) @@ -425,15 +413,22 @@ async fn run(opt: config::Opt) -> Result<()> { } }; - if let Err(err) = socket.set_nodelay(true) { + let socket_ref = SockRef::from(&socket); + if let Err(err) = socket_ref.set_nodelay(true) { warn!(?err, "Failed to set TCP_NODELAY"); } + if let Err(err) = socket_ref.set_recv_buffer_size(4 * MI_B) { + warn!(?err, "Failed to set set_recv_buffer_size"); + } + if let Err(err) = socket_ref.set_send_buffer_size(4 * MI_B) { + warn!(?err, "Failed to set set_send_buffer_size"); + } if has_tls_certs { debug!("TLS certificates found, starting with SIGINT"); let tls_socket = match tls_acceptor .as_ref() - .ok_or_else(|| error("TLS not configured".to_string())) + .ok_or_else(|| rustfs_utils::certs_error("TLS not configured".to_string())) .unwrap() .accept(socket) .await diff --git a/rustfs/src/server/service_state.rs b/rustfs/src/server/service_state.rs index ad137f66..5390fb1e 100644 --- a/rustfs/src/server/service_state.rs +++ b/rustfs/src/server/service_state.rs @@ -129,7 +129,7 @@ impl Default for ServiceStateManager { } } -// 使用示例 +// Example of use #[cfg(test)] mod tests { use super::*; @@ -138,18 +138,18 @@ mod tests { fn test_service_state_manager() { let manager = ServiceStateManager::new(); - // 初始状态应该是 Starting + // The initial state should be Starting assert_eq!(manager.current_state(), ServiceState::Starting); - // 更新状态到 Ready + // Update the status to Ready manager.update(ServiceState::Ready); assert_eq!(manager.current_state(), ServiceState::Ready); - // 更新状态到 Stopping + // Update the status to Stopping manager.update(ServiceState::Stopping); assert_eq!(manager.current_state(), ServiceState::Stopping); - // 更新状态到 Stopped + // Update the status to Stopped manager.update(ServiceState::Stopped); assert_eq!(manager.current_state(), ServiceState::Stopped); } diff --git a/rustfs/src/storage/access.rs b/rustfs/src/storage/access.rs index e2cad0c0..3589923f 100644 --- a/rustfs/src/storage/access.rs +++ b/rustfs/src/storage/access.rs @@ -20,6 +20,7 @@ pub(crate) struct ReqInfo { pub version_id: Option, } +/// Authorizes the request based on the action and credentials. pub async fn authorize_request(req: &mut S3Request, action: Action) -> S3Result<()> { let req_info = req.extensions.get_mut::().expect("ReqInfo not found"); diff --git a/rustfs/src/storage/ecfs.rs b/rustfs/src/storage/ecfs.rs index 252cd316..fd7b4c0c 100644 --- a/rustfs/src/storage/ecfs.rs +++ b/rustfs/src/storage/ecfs.rs @@ -62,6 +62,7 @@ use s3s::S3; use s3s::{S3Request, S3Response}; use std::fmt::Debug; use std::str::FromStr; +use std::sync::Arc; use tokio::sync::mpsc; use tokio_stream::wrappers::ReceiverStream; use tokio_util::io::ReaderStream; @@ -141,6 +142,7 @@ impl S3 for FS { Ok(S3Response::new(output)) } + /// Copy an object from one location to another #[tracing::instrument(level = "debug", skip(self, req))] async fn copy_object(&self, req: S3Request) -> S3Result> { let CopyObjectInput { @@ -227,6 +229,7 @@ impl S3 for FS { Ok(S3Response::new(output)) } + /// Delete a bucket #[tracing::instrument(level = "debug", skip(self, req))] async fn delete_bucket(&self, req: S3Request) -> S3Result> { let input = req.input; @@ -249,6 +252,7 @@ impl S3 for FS { Ok(S3Response::new(DeleteBucketOutput {})) } + /// Delete an object #[tracing::instrument(level = "debug", skip(self, req))] async fn delete_object(&self, req: S3Request) -> S3Result> { let DeleteObjectInput { @@ -308,6 +312,7 @@ impl S3 for FS { Ok(S3Response::new(output)) } + /// Delete multiple objects #[tracing::instrument(level = "debug", skip(self, req))] async fn delete_objects(&self, req: S3Request) -> S3Result> { // info!("delete_objects args {:?}", req.input); @@ -367,6 +372,7 @@ impl S3 for FS { Ok(S3Response::new(output)) } + /// Get bucket location #[tracing::instrument(level = "debug", skip(self, req))] async fn get_bucket_location(&self, req: S3Request) -> S3Result> { // mc get 1 @@ -385,6 +391,7 @@ impl S3 for FS { Ok(S3Response::new(output)) } + /// Get bucket notification #[tracing::instrument( level = "debug", skip(self, req), @@ -1890,14 +1897,14 @@ impl S3 for FS { ) -> S3Result> { info!("handle select_object_content"); - let input = req.input; + let input = Arc::new(req.input); info!("{:?}", input); let db = make_rustfsms(input.clone(), false).await.map_err(|e| { error!("make db failed, {}", e.to_string()); s3_error!(InternalError, "{}", e.to_string()) })?; - let query = Query::new(Context { input: input.clone() }, input.request.expression); + let query = Query::new(Context { input: input.clone() }, input.request.expression.clone()); let result = db .execute(&query) .await diff --git a/rustfs/src/storage/event_notifier.rs b/rustfs/src/storage/event_notifier.rs new file mode 100644 index 00000000..292b45e3 --- /dev/null +++ b/rustfs/src/storage/event_notifier.rs @@ -0,0 +1,17 @@ +use rustfs_event_notifier::{Event, Metadata}; + +/// Create a new metadata object +#[allow(dead_code)] +pub(crate) fn create_metadata() -> Metadata { + // Create a new metadata object + let mut metadata = Metadata::new(); + metadata.set_configuration_id("test-config".to_string()); + // Return the created metadata object + metadata +} + +/// Create a new event object +#[allow(dead_code)] +pub(crate) async fn send_event(event: Event) -> Result<(), Box> { + rustfs_event_notifier::send_event(event).await.map_err(|e| e.into()) +} diff --git a/rustfs/src/storage/mod.rs b/rustfs/src/storage/mod.rs index d7d9d87e..2f8ec8b8 100644 --- a/rustfs/src/storage/mod.rs +++ b/rustfs/src/storage/mod.rs @@ -1,4 +1,5 @@ pub mod access; pub mod ecfs; pub mod error; +mod event_notifier; pub mod options; diff --git a/rustfs/src/storage/options.rs b/rustfs/src/storage/options.rs index 18a13974..ae0df539 100644 --- a/rustfs/src/storage/options.rs +++ b/rustfs/src/storage/options.rs @@ -8,6 +8,7 @@ use lazy_static::lazy_static; use std::collections::HashMap; use uuid::Uuid; +/// Creates options for deleting an object in a bucket. pub async fn del_opts( bucket: &str, object: &str, @@ -56,6 +57,7 @@ pub async fn del_opts( Ok(opts) } +/// Creates options for getting an object from a bucket. pub async fn get_opts( bucket: &str, object: &str, @@ -105,6 +107,7 @@ pub async fn get_opts( Ok(opts) } +/// Creates options for putting an object in a bucket. pub async fn put_opts( bucket: &str, object: &str, @@ -151,6 +154,7 @@ pub async fn put_opts( Ok(opts) } +/// Creates options for copying an object in a bucket. pub async fn copy_dst_opts( bucket: &str, object: &str, @@ -172,6 +176,7 @@ pub fn put_opts_from_headers( get_default_opts(headers, metadata, false) } +/// Creates default options for getting an object from a bucket. pub fn get_default_opts( _headers: &HeaderMap, metadata: Option>, @@ -183,6 +188,7 @@ pub fn get_default_opts( }) } +/// Extracts metadata from headers and returns it as a HashMap. pub fn extract_metadata(headers: &HeaderMap) -> HashMap { let mut metadata = HashMap::new(); @@ -191,6 +197,7 @@ pub fn extract_metadata(headers: &HeaderMap) -> HashMap, metadata: &mut HashMap) { for (k, v) in headers.iter() { if let Some(key) = k.as_str().strip_prefix("x-amz-meta-") { @@ -219,7 +226,9 @@ pub fn extract_metadata_from_mime(headers: &HeaderMap, metadata: &m metadata.insert("content-type".to_owned(), "binary/octet-stream".to_owned()); } } + lazy_static! { + /// List of supported headers. static ref SUPPORTED_HEADERS: Vec<&'static str> = vec![ "content-type", "cache-control", diff --git a/s3select/api/Cargo.toml b/s3select/api/Cargo.toml index 27e260c4..125849cd 100644 --- a/s3select/api/Cargo.toml +++ b/s3select/api/Cargo.toml @@ -7,12 +7,14 @@ edition.workspace = true async-trait.workspace = true bytes.workspace = true chrono.workspace = true +common.workspace = true datafusion = { workspace = true } ecstore.workspace = true futures = { workspace = true } futures-core = { workspace = true } http.workspace = true object_store = { workspace = true } +pin-project-lite.workspace = true s3s.workspace = true snafu = { workspace = true, features = ["backtrace"] } tokio.workspace = true diff --git a/s3select/api/src/object_store.rs b/s3select/api/src/object_store.rs index 7772936c..52132bc1 100644 --- a/s3select/api/src/object_store.rs +++ b/s3select/api/src/object_store.rs @@ -1,6 +1,7 @@ use async_trait::async_trait; use bytes::Bytes; use chrono::Utc; +use common::DEFAULT_DELIMITER; use ecstore::io::READ_BUFFER_SIZE; use ecstore::new_object_layer_fn; use ecstore::store::ECStore; @@ -24,29 +25,54 @@ use object_store::PutOptions; use object_store::PutPayload; use object_store::PutResult; use object_store::{Error as o_Error, Result}; +use pin_project_lite::pin_project; use s3s::dto::SelectObjectContentInput; use s3s::s3_error; use s3s::S3Result; use std::ops::Range; +use std::pin::Pin; use std::sync::Arc; +use std::task::ready; +use std::task::Poll; +use tokio::io::AsyncRead; use tokio_util::io::ReaderStream; use tracing::info; use transform_stream::AsyncTryStream; #[derive(Debug)] pub struct EcObjectStore { - input: SelectObjectContentInput, + input: Arc, + need_convert: bool, + delimiter: String, store: Arc, } - impl EcObjectStore { - pub fn new(input: SelectObjectContentInput) -> S3Result { + pub fn new(input: Arc) -> S3Result { let Some(store) = new_object_layer_fn() else { return Err(s3_error!(InternalError, "ec store not inited")); }; - Ok(Self { input, store }) + let (need_convert, delimiter) = if let Some(csv) = input.request.input_serialization.csv.as_ref() { + if let Some(delimiter) = csv.field_delimiter.as_ref() { + if delimiter.len() > 1 { + (true, delimiter.to_owned()) + } else { + (false, String::new()) + } + } else { + (false, String::new()) + } + } else { + (false, String::new()) + }; + + Ok(Self { + input, + need_convert, + delimiter, + store, + }) } } @@ -79,16 +105,6 @@ impl ObjectStore for EcObjectStore { source: "can not get object info".into(), })?; - // let stream = stream::unfold(reader.stream, |mut blob| async move { - // match blob.next().await { - // Some(Ok(chunk)) => { - // let bytes = chunk; - // Some((Ok(bytes), blob)) - // } - // _ => None, - // } - // }) - // .boxed(); let meta = ObjectMeta { location: location.clone(), last_modified: Utc::now(), @@ -98,10 +114,21 @@ impl ObjectStore for EcObjectStore { }; let attributes = Attributes::default(); - Ok(GetResult { - payload: object_store::GetResultPayload::Stream( + let payload = if self.need_convert { + object_store::GetResultPayload::Stream( + bytes_stream( + ReaderStream::with_capacity(ConvertStream::new(reader.stream, self.delimiter.clone()), READ_BUFFER_SIZE), + reader.object_info.size, + ) + .boxed(), + ) + } else { + object_store::GetResultPayload::Stream( bytes_stream(ReaderStream::with_capacity(reader.stream, READ_BUFFER_SIZE), reader.object_info.size).boxed(), - ), + ) + }; + Ok(GetResult { + payload, meta, range: 0..reader.object_info.size, attributes, @@ -154,6 +181,54 @@ impl ObjectStore for EcObjectStore { } } +pin_project! { + struct ConvertStream { + inner: R, + delimiter: Vec, + } +} + +impl ConvertStream { + fn new(inner: R, delimiter: String) -> Self { + ConvertStream { + inner, + delimiter: delimiter.as_bytes().to_vec(), + } + } +} + +impl AsyncRead for ConvertStream { + #[tracing::instrument(level = "debug", skip_all)] + fn poll_read( + self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &mut tokio::io::ReadBuf<'_>, + ) -> std::task::Poll> { + let me = self.project(); + ready!(Pin::new(&mut *me.inner).poll_read(cx, buf))?; + let bytes = buf.filled(); + let replaced = replace_symbol(me.delimiter, bytes); + buf.clear(); + buf.put_slice(&replaced); + Poll::Ready(Ok(())) + } +} + +fn replace_symbol(delimiter: &[u8], slice: &[u8]) -> Vec { + let mut result = Vec::with_capacity(slice.len()); + let mut i = 0; + while i < slice.len() { + if slice[i..].starts_with(delimiter) { + result.push(DEFAULT_DELIMITER); + i += delimiter.len(); + } else { + result.push(slice[i]); + i += 1; + } + } + result +} + pub fn bytes_stream(stream: S, content_length: usize) -> impl Stream> + Send + 'static where S: Stream> + Send + 'static, @@ -175,3 +250,21 @@ where Ok(()) }) } + +#[cfg(test)] +mod test { + use super::replace_symbol; + + #[test] + fn test_replace() { + let ss = String::from("dandan&&is&&best"); + let slice = ss.as_bytes(); + let delimiter = b"&&"; + println!("len: {}", "╦".len()); + let result = replace_symbol(delimiter, slice); + match String::from_utf8(result) { + Ok(s) => println!("slice: {}", s), + Err(e) => eprintln!("Error converting to string: {}", e), + } + } +} diff --git a/s3select/api/src/query/mod.rs b/s3select/api/src/query/mod.rs index 6ddd2dc8..93736be2 100644 --- a/s3select/api/src/query/mod.rs +++ b/s3select/api/src/query/mod.rs @@ -1,3 +1,5 @@ +use std::sync::Arc; + use s3s::dto::SelectObjectContentInput; pub mod analyzer; @@ -16,7 +18,7 @@ pub mod session; #[derive(Clone)] pub struct Context { // maybe we need transfer some info? - pub input: SelectObjectContentInput, + pub input: Arc, } #[derive(Clone)] diff --git a/s3select/api/src/query/session.rs b/s3select/api/src/query/session.rs index 286ee9f8..581cdf39 100644 --- a/s3select/api/src/query/session.rs +++ b/s3select/api/src/query/session.rs @@ -66,7 +66,7 @@ impl SessionCtxFactory { 9,Ivy,24,Marketing,4800 10,Jack,38,Finance,7500"; let data_bytes = data.as_bytes(); - // let data = r#""year"╦"gender"╦"ethnicity"╦"firstname"╦"count"╦"rank" + // let data = r#""year"╦"gender"╦"ethnicity"╦"firstname"╦"count"╦"rank" // "2011"╦"FEMALE"╦"ASIAN AND PACIFIC ISLANDER"╦"SOPHIA"╦"119"╦"1" // "2011"╦"FEMALE"╦"ASIAN AND PACIFIC ISLANDER"╦"CHLOE"╦"106"╦"2" // "2011"╦"FEMALE"╦"ASIAN AND PACIFIC ISLANDER"╦"EMILY"╦"93"╦"3" diff --git a/s3select/query/src/dispatcher/manager.rs b/s3select/query/src/dispatcher/manager.rs index 80543ed6..ee5386e2 100644 --- a/s3select/query/src/dispatcher/manager.rs +++ b/s3select/query/src/dispatcher/manager.rs @@ -49,7 +49,7 @@ lazy_static! { #[derive(Clone)] pub struct SimpleQueryDispatcher { - input: SelectObjectContentInput, + input: Arc, // client for default tenant _default_table_provider: TableHandleProviderRef, session_factory: Arc, @@ -164,7 +164,9 @@ impl SimpleQueryDispatcher { .map(|e| e.as_bytes().first().copied().unwrap_or_default()), ); if let Some(delimiter) = csv.field_delimiter.as_ref() { - file_format = file_format.with_delimiter(delimiter.as_bytes().first().copied().unwrap_or_default()); + if delimiter.len() == 1 { + file_format = file_format.with_delimiter(delimiter.as_bytes()[0]); + } } // TODO waiting for processing @junxiang Mu // if csv.file_header_info.is_some() {} @@ -272,7 +274,7 @@ impl Stream for TrackedRecordBatchStream { #[derive(Default, Clone)] pub struct SimpleQueryDispatcherBuilder { - input: Option, + input: Option>, default_table_provider: Option, session_factory: Option>, parser: Option>, @@ -283,7 +285,7 @@ pub struct SimpleQueryDispatcherBuilder { } impl SimpleQueryDispatcherBuilder { - pub fn with_input(mut self, input: SelectObjectContentInput) -> Self { + pub fn with_input(mut self, input: Arc) -> Self { self.input = Some(input); self } diff --git a/s3select/query/src/instance.rs b/s3select/query/src/instance.rs index 44952a4a..34492063 100644 --- a/s3select/query/src/instance.rs +++ b/s3select/query/src/instance.rs @@ -63,7 +63,7 @@ where } } -pub async fn make_rustfsms(input: SelectObjectContentInput, is_test: bool) -> QueryResult { +pub async fn make_rustfsms(input: Arc, is_test: bool) -> QueryResult { // init Function Manager, we can define some UDF if need let func_manager = SimpleFunctionMetadataManager::default(); // TODO session config need load global system config @@ -95,6 +95,8 @@ pub async fn make_rustfsms(input: SelectObjectContentInput, is_test: bool) -> Qu #[cfg(test)] mod tests { + use std::sync::Arc; + use api::{ query::{Context, Query}, server::dbms::DatabaseManagerSystem, @@ -111,7 +113,7 @@ mod tests { #[ignore] async fn test_simple_sql() { let sql = "select * from S3Object"; - let input = SelectObjectContentInput { + let input = Arc::new(SelectObjectContentInput { bucket: "dandan".to_string(), expected_bucket_owner: None, key: "test.csv".to_string(), @@ -135,7 +137,7 @@ mod tests { request_progress: None, scan_range: None, }, - }; + }); let db = make_rustfsms(input.clone(), true).await.unwrap(); let query = Query::new(Context { input }, sql.to_string()); @@ -167,8 +169,8 @@ mod tests { #[tokio::test] #[ignore] async fn test_func_sql() { - let sql = "SELECT s._1 FROM S3Object s"; - let input = SelectObjectContentInput { + let sql = "SELECT * FROM S3Object s"; + let input = Arc::new(SelectObjectContentInput { bucket: "dandan".to_string(), expected_bucket_owner: None, key: "test.csv".to_string(), @@ -194,7 +196,7 @@ mod tests { request_progress: None, scan_range: None, }, - }; + }); let db = make_rustfsms(input.clone(), true).await.unwrap(); let query = Query::new(Context { input }, sql.to_string()); diff --git a/scripts/run.sh b/scripts/run.sh index 352568ec..59703af9 100755 --- a/scripts/run.sh +++ b/scripts/run.sh @@ -33,7 +33,7 @@ export RUSTFS_CONSOLE_ENABLE=true export RUSTFS_CONSOLE_ADDRESS=":9002" # export RUSTFS_SERVER_DOMAINS="localhost:9000" # HTTPS 证书目录 -# export RUSTFS_TLS_PATH="./deploy/certs" +# export RUSTFS_TLS_PATH="./deploy/certs" # 具体路径修改为配置文件真实路径,obs.example.toml 仅供参考 其中`RUSTFS_OBS_CONFIG` 和下面变量二选一 export RUSTFS_OBS_CONFIG="./deploy/config/obs.example.toml"