From 751be4bdf64af2cfe2ba12c319ab337ee309f369 Mon Sep 17 00:00:00 2001 From: houseme Date: Thu, 29 May 2025 08:34:17 +0800 Subject: [PATCH 01/12] init openobserve docker yml --- .docker/openobserve-otel/docker-compose.yml | 66 +++++++++++++++++++ .../otel-collector-config.yaml | 58 ++++++++++++++++ scripts/run.sh | 2 +- 3 files changed, 125 insertions(+), 1 deletion(-) create mode 100644 .docker/openobserve-otel/docker-compose.yml create mode 100644 .docker/openobserve-otel/otel-collector-config.yaml diff --git a/.docker/openobserve-otel/docker-compose.yml b/.docker/openobserve-otel/docker-compose.yml new file mode 100644 index 00000000..13d77625 --- /dev/null +++ b/.docker/openobserve-otel/docker-compose.yml @@ -0,0 +1,66 @@ +services: + openobserve: + image: public.ecr.aws/zinclabs/openobserve:latest + restart: unless-stopped + environment: + ZO_ROOT_USER_EMAIL: "root@rustfs.com" + ZO_ROOT_USER_PASSWORD: "rustfs123" + ZO_TRACING_HEADER_KEY: "Authorization" + ZO_TRACING_HEADER_VALUE: "Bearer cm9vdEBydXN0ZnMuY29tOmxIV0RqQmZMWXJ6MnZOajU=" + ZO_DATA_DIR: "/data" + ZO_MEMORY_CACHE_ENABLED: "true" + ZO_MEMORY_CACHE_MAX_SIZE: "256" + RUST_LOG: "info" + ports: + - "5080:5080" + volumes: + - ./data:/data + healthcheck: + test: [ "CMD", "curl", "-f", "http://localhost:5080/health" ] + interval: 10s + timeout: 2s + retries: 3 + networks: + - otel-network + deploy: + resources: + limits: + memory: 512M + reservations: + memory: 256M + + otel-collector: + image: otel/opentelemetry-collector-contrib:latest + restart: unless-stopped + volumes: + - ./otel-collector-config.yaml:/etc/otelcol-contrib/config.yaml + ports: + - "4317:4317" # OTLP gRPC + - "4318:4318" # OTLP HTTP + - "13133:13133" # Health check + - "1777:1777" # pprof + - "55679:55679" # zpages + depends_on: + openobserve: + condition: service_healthy + networks: + - otel-network + deploy: + resources: + limits: + memory: 400M + reservations: + memory: 200M + +networks: + otel-network: + driver: bridge + name: otel-network + ipam: + config: + - subnet: 172.28.0.0/16 + gateway: 172.28.0.1 + labels: + com.example.description: "Network for OpenObserve and OpenTelemetry Collector" +volumes: + data: \ No newline at end of file diff --git a/.docker/openobserve-otel/otel-collector-config.yaml b/.docker/openobserve-otel/otel-collector-config.yaml new file mode 100644 index 00000000..8cdca6f8 --- /dev/null +++ b/.docker/openobserve-otel/otel-collector-config.yaml @@ -0,0 +1,58 @@ +receivers: + otlp: + protocols: + grpc: + endpoint: 0.0.0.0:4317 + http: + endpoint: 0.0.0.0:4318 + filelog: + include: [ "/var/log/app/*.log" ] + start_at: end + +processors: + batch: + timeout: 1s + send_batch_size: 1024 + memory_limiter: + check_interval: 1s + limit_mib: 400 + spike_limit_mib: 100 + +exporters: + otlphttp/openobserve: + endpoint: http://openobserve:5080/api/default + headers: + Authorization: "Bearer cm9vdEBydXN0ZnMuY29tOmxIV0RqQmZMWXJ6MnZOajU=" + stream-name: default + organization: default + tls: + insecure: true + compression: gzip + retry_on_failure: + enabled: true + initial_interval: 5s + max_interval: 30s + max_elapsed_time: 300s + +service: + pipelines: + traces: + receivers: [ otlp ] + processors: [ memory_limiter, batch ] + exporters: [ otlphttp/openobserve ] + metrics: + receivers: [ otlp ] + processors: [ memory_limiter, batch ] + exporters: [ otlphttp/openobserve ] + logs: + receivers: [ otlp, filelog ] + processors: [ memory_limiter, batch ] + exporters: [ otlphttp/openobserve ] + extensions: + health_check: + endpoint: 0.0.0.0:13133 + 
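  # NOTE: in the OpenTelemetry Collector, `extensions` must be a top-level
+  # section referenced via `service.extensions`; nested inside `service` as
+  # here, the collector will not actually start health_check/pprof/zpages.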
pprof:
+    endpoint: 0.0.0.0:1777
+  zpages:
+    endpoint: 0.0.0.0:55679
+
diff --git a/scripts/run.sh b/scripts/run.sh
index 2004e05a..29605648 100755
--- a/scripts/run.sh
+++ b/scripts/run.sh
@@ -37,7 +37,7 @@ export RUSTFS_CONSOLE_ADDRESS=":9002"
 # export RUSTFS_TLS_PATH="./deploy/certs"
 
 # Set this to the real path of the configuration file; obs.example.toml is for reference only. Use either `RUSTFS_OBS_CONFIG` or the variables below
-export RUSTFS_OBS_CONFIG="./deploy/config/obs.example.toml"
+# export RUSTFS_OBS_CONFIG="./deploy/config/obs.example.toml"
 
 # The variables below take effect only when all required ones have values, and they override values from the configuration file
 #export RUSTFS__OBSERVABILITY__ENDPOINT=http://localhost:4317

From 41e378fc2819dc98d48a537ba1298ecd63a18abd Mon Sep 17 00:00:00 2001
From: houseme
Date: Thu, 29 May 2025 09:53:03 +0800
Subject: [PATCH 02/12] upgrade config

---
 .docker/openobserve-otel/docker-compose.yml | 16 +++++++-----
 .../otel-collector-config.yaml              | 25 +++++++++++--------
 .gitignore                                  |  3 ++-
 3 files changed, 27 insertions(+), 17 deletions(-)

diff --git a/.docker/openobserve-otel/docker-compose.yml b/.docker/openobserve-otel/docker-compose.yml
index 13d77625..68788aba 100644
--- a/.docker/openobserve-otel/docker-compose.yml
+++ b/.docker/openobserve-otel/docker-compose.yml
@@ -13,25 +13,29 @@ services:
       RUST_LOG: "info"
     ports:
       - "5080:5080"
+      - "5081:5081"
     volumes:
       - ./data:/data
     healthcheck:
       test: [ "CMD", "curl", "-f", "http://localhost:5080/health" ]
+      start_period: 60s
       interval: 10s
-      timeout: 2s
-      retries: 3
+      timeout: 5s
+      retries: 6
     networks:
       - otel-network
     deploy:
       resources:
         limits:
-          memory: 512M
+          memory: 1024M
         reservations:
-          memory: 256M
+          memory: 512M
 
   otel-collector:
     image: otel/opentelemetry-collector-contrib:latest
     restart: unless-stopped
+    environment:
+      - TZ=Asia/Shanghai
     volumes:
       - ./otel-collector-config.yaml:/etc/otelcol-contrib/config.yaml
     ports:
@@ -48,9 +52,9 @@ services:
     deploy:
       resources:
         limits:
-          memory: 400M
+          memory: 10240M
         reservations:
-          memory: 200M
+          memory: 512M
 
 networks:
   otel-network:
diff --git a/.docker/openobserve-otel/otel-collector-config.yaml b/.docker/openobserve-otel/otel-collector-config.yaml
index 8cdca6f8..288ac9a2 100644
--- a/.docker/openobserve-otel/otel-collector-config.yaml
+++ b/.docker/openobserve-otel/otel-collector-config.yaml
@@ -25,16 +25,24 @@ exporters:
       Authorization: "Bearer cm9vdEBydXN0ZnMuY29tOmxIV0RqQmZMWXJ6MnZOajU="
       stream-name: default
       organization: default
-    tls:
-      insecure: true
     compression: gzip
     retry_on_failure:
       enabled: true
       initial_interval: 5s
       max_interval: 30s
       max_elapsed_time: 300s
+    timeout: 10s
+
+extensions:
+  health_check:
+    endpoint: 0.0.0.0:13133
+  pprof:
+    endpoint: 0.0.0.0:1777
+  zpages:
+    endpoint: 0.0.0.0:55679
 
 service:
+  extensions: [ health_check, pprof, zpages ]
   pipelines:
     traces:
       receivers: [ otlp ]
@@ -48,11 +56,8 @@ service:
       receivers: [ otlp, filelog ]
       processors: [ memory_limiter, batch ]
       exporters: [ otlphttp/openobserve ]
-  extensions:
-    health_check:
-      endpoint: 0.0.0.0:13133
-    pprof:
-      endpoint: 0.0.0.0:1777
-    zpages:
-      endpoint: 0.0.0.0:55679
-
+  telemetry:
+    logs:
+      level: "info" # Collector log level
+    metrics:
+      address: "0.0.0.0:8888" # Exposes the collector's own metrics
diff --git a/.gitignore b/.gitignore
index 46bfa8d4..d38ffae3 100644
--- a/.gitignore
+++ b/.gitignore
@@ -16,4 +16,5 @@ deploy/certs/*
 .env
 .rustfs.sys
 .cargo
-profile.json
\ No newline at end of file
+profile.json
+.docker/openobserve-otel/data
\ No newline at end of file

From 2fd5ef75cf650258d1681ba30bda4c31b369a5e3 Mon Sep 17 00:00:00 2001
From: houseme
Date: Thu, 29 May 2025 13:24:56 +0800
Subject: [PATCH 03/12] modify config for obs

---
 .docker/openobserve-otel/docker-compose.yml |  9 ++++---
 .../otel-collector-config.yaml              | 25 +++++++++++++++----
 crates/obs/src/config.rs                    | 21 +++++++++++-----
 crates/obs/src/system/collector.rs          |  4 +--
 scripts/run.sh                              | 21 ++++++++--------
 5 files changed, 54 insertions(+), 26 deletions(-)

diff --git a/.docker/openobserve-otel/docker-compose.yml b/.docker/openobserve-otel/docker-compose.yml
index 68788aba..a4083bca 100644
--- a/.docker/openobserve-otel/docker-compose.yml
+++ b/.docker/openobserve-otel/docker-compose.yml
@@ -6,11 +6,12 @@ services:
       ZO_ROOT_USER_EMAIL: "root@rustfs.com"
       ZO_ROOT_USER_PASSWORD: "rustfs123"
       ZO_TRACING_HEADER_KEY: "Authorization"
-      ZO_TRACING_HEADER_VALUE: "Bearer cm9vdEBydXN0ZnMuY29tOmxIV0RqQmZMWXJ6MnZOajU="
+      ZO_TRACING_HEADER_VALUE: "Basic cm9vdEBydXN0ZnMuY29tOmQ4SXlCSEJTUkk3RGVlcEQ="
       ZO_DATA_DIR: "/data"
       ZO_MEMORY_CACHE_ENABLED: "true"
       ZO_MEMORY_CACHE_MAX_SIZE: "256"
       RUST_LOG: "info"
+      TZ: Asia/Shanghai
     ports:
       - "5080:5080"
       - "5081:5081"
@@ -44,9 +45,11 @@ services:
       - "13133:13133" # Health check
       - "1777:1777" # pprof
       - "55679:55679" # zpages
+      - "1888:1888" # Metrics
+      - "8888:8888" # Prometheus metrics
+      - "8889:8889" # Additional metrics endpoint
     depends_on:
-      openobserve:
-        condition: service_healthy
+      - openobserve
     networks:
       - otel-network
     deploy:
diff --git a/.docker/openobserve-otel/otel-collector-config.yaml b/.docker/openobserve-otel/otel-collector-config.yaml
index 288ac9a2..561692ad 100644
--- a/.docker/openobserve-otel/otel-collector-config.yaml
+++ b/.docker/openobserve-otel/otel-collector-config.yaml
@@ -20,9 +20,9 @@ processors:
 
 exporters:
   otlphttp/openobserve:
-    endpoint: http://openobserve:5080/api/default
+    endpoint: http://openobserve:5080/api/default # http://127.0.0.1:5080/api/default
     headers:
-      Authorization: "Bearer cm9vdEBydXN0ZnMuY29tOmxIV0RqQmZMWXJ6MnZOajU="
+      Authorization: "Basic cm9vdEBydXN0ZnMuY29tOmQ4SXlCSEJTUkk3RGVlcEQ="
       stream-name: default
       organization: default
     compression: gzip
@@ -32,6 +32,21 @@ exporters:
       max_interval: 30s
       max_elapsed_time: 300s
     timeout: 10s
+  otlp/openobserve:
+    endpoint: openobserve:5081 # http://127.0.0.1:5080/api/default
+    headers:
+      Authorization: "Basic cm9vdEBydXN0ZnMuY29tOmQ4SXlCSEJTUkk3RGVlcEQ="
+      stream-name: default
+      organization: default
+    compression: gzip
+    retry_on_failure:
+      enabled: true
+      initial_interval: 5s
+      max_interval: 30s
+      max_elapsed_time: 300s
+    timeout: 10s
+    tls:
+      insecure: true
 
 extensions:
   health_check:
@@ -47,15 +62,15 @@ service:
     traces:
       receivers: [ otlp ]
       processors: [ memory_limiter, batch ]
-      exporters: [ otlphttp/openobserve ]
+      exporters: [ otlp/openobserve ]
     metrics:
       receivers: [ otlp ]
       processors: [ memory_limiter, batch ]
-      exporters: [ otlphttp/openobserve ]
+      exporters: [ otlp/openobserve ]
     logs:
       receivers: [ otlp, filelog ]
       processors: [ memory_limiter, batch ]
-      exporters: [ otlphttp/openobserve ]
+      exporters: [ otlp/openobserve ]
   telemetry:
     logs:
       level: "info" # Collector log level
     metrics:
       address: "0.0.0.0:8888" # Exposes the collector's own metrics
diff --git a/crates/obs/src/config.rs b/crates/obs/src/config.rs
index 770203de..9d6b0617 100644
--- a/crates/obs/src/config.rs
+++ b/crates/obs/src/config.rs
@@ -26,12 +26,18 @@ pub struct OtelConfig {
 
 /// Helper function: Extract observable configuration from environment variables
 fn extract_otel_config_from_env() -> OtelConfig {
+    let endpoint = env::var("RUSTFS_OBSERVABILITY_ENDPOINT").unwrap_or_else(|_| "".to_string());
+    let mut use_stdout = env::var("RUSTFS_OBSERVABILITY_USE_STDOUT")
+        .ok()
+        .and_then(|v| v.parse().ok())
+        .or(Some(USE_STDOUT));
+    if endpoint.is_empty() {
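// No OTLP endpoint is configured: force stdout output so telemetry is not silently dropped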
+        use_stdout = Some(true);
+    }
+
     OtelConfig {
-        endpoint: env::var("RUSTFS_OBSERVABILITY_ENDPOINT").unwrap_or_else(|_| "".to_string()),
-        use_stdout: env::var("RUSTFS_OBSERVABILITY_USE_STDOUT")
-            .ok()
-            .and_then(|v| v.parse().ok())
-            .or(Some(USE_STDOUT)),
+        endpoint,
+        use_stdout,
         sample_ratio: env::var("RUSTFS_OBSERVABILITY_SAMPLE_RATIO")
             .ok()
             .and_then(|v| v.parse().ok())
@@ -308,7 +314,10 @@ pub fn load_config(config_dir: Option<String>) -> AppConfig {
     } else {
         // If no path provided, use current directory + default config file
         match env::current_dir() {
-            Ok(dir) => dir.join(DEFAULT_CONFIG_FILE).to_string_lossy().into_owned(),
+            Ok(dir) => {
+                println!("Current directory: {:?}", dir);
+                dir.join(DEFAULT_CONFIG_FILE).to_string_lossy().into_owned()
+            }
             Err(_) => {
                 eprintln!("Warning: Failed to get current directory, using default config file");
                 DEFAULT_CONFIG_FILE.to_string()
diff --git a/crates/obs/src/system/collector.rs b/crates/obs/src/system/collector.rs
index c4184051..ea9bae3b 100644
--- a/crates/obs/src/system/collector.rs
+++ b/crates/obs/src/system/collector.rs
@@ -23,7 +23,7 @@ pub struct Collector {
 
 impl Collector {
     pub fn new(pid: Pid, meter: opentelemetry::metrics::Meter, interval_ms: u64) -> Result<Self> {
-        let mut system = System::new_all();
+        let mut system = System::new();
         let attributes = ProcessAttributes::new(pid, &mut system)?;
         let core_count = System::physical_core_count().ok_or(GlobalError::CoreCountError)?;
         let metrics = Metrics::new(&meter);
@@ -52,7 +52,7 @@ impl Collector {
 
     fn collect(&mut self) -> Result<(), GlobalError> {
         self.system
-            .refresh_processes(sysinfo::ProcessesToUpdate::Some(&[self.pid]), true);
+            .refresh_processes(sysinfo::ProcessesToUpdate::Some(&[self.pid]), false);
 
         // refresh the network interface list and statistics
         self.networks.refresh(false);
diff --git a/scripts/run.sh b/scripts/run.sh
index 29605648..6d528ca2 100755
--- a/scripts/run.sh
+++ b/scripts/run.sh
@@ -37,18 +37,19 @@ export RUSTFS_CONSOLE_ADDRESS=":9002"
 # export RUSTFS_TLS_PATH="./deploy/certs"
 
 # Set this to the real path of the configuration file; obs.example.toml is for reference only. Use either `RUSTFS_OBS_CONFIG` or the variables below
-# export RUSTFS_OBS_CONFIG="./deploy/config/obs.example.toml"
+# export RUSTFS_OBS_CONFIG="./deploy/config/obs.toml"
 
 # The variables below take effect only when all required ones have values, and they override values from the configuration file
-#export RUSTFS__OBSERVABILITY__ENDPOINT=http://localhost:4317
-#export RUSTFS__OBSERVABILITY__USE_STDOUT=false
-#export RUSTFS__OBSERVABILITY__SAMPLE_RATIO=2.0
-#export RUSTFS__OBSERVABILITY__METER_INTERVAL=31
-#export RUSTFS__OBSERVABILITY__SERVICE_NAME=rustfs
-#export RUSTFS__OBSERVABILITY__SERVICE_VERSION=0.1.0
-#export RUSTFS__OBSERVABILITY__ENVIRONMENT=develop
-#export RUSTFS__OBSERVABILITY__LOGGER_LEVEL=debug
-#export RUSTFS__OBSERVABILITY__LOCAL_LOGGING_ENABLED=true
+export RUSTFS_OBSERVABILITY_ENDPOINT=http://localhost:4317
+#export RUSTFS_OBSERVABILITY_ENDPOINT=http://localhost:4317
+#export RUSTFS_OBSERVABILITY_USE_STDOUT=false
+#export RUSTFS_OBSERVABILITY_SAMPLE_RATIO=2.0
+#export RUSTFS_OBSERVABILITY_METER_INTERVAL=31
+#export RUSTFS_OBSERVABILITY_SERVICE_NAME=rustfs
+#export RUSTFS_OBSERVABILITY_SERVICE_VERSION=0.1.0
+#export RUSTFS_OBSERVABILITY_ENVIRONMENT=develop
+#export RUSTFS_OBSERVABILITY_LOGGER_LEVEL=debug
+#export RUSTFS_OBSERVABILITY_LOCAL_LOGGING_ENABLED=true
 #
 #export RUSTFS__SINKS_0__type=File
 #export RUSTFS__SINKS_0__path=./deploy/logs/rustfs.log

From 8887b7ea90824ad5302c975b9fa94f3e1ffaa3ac Mon Sep 17 00:00:00 2001
From: houseme
Date: Thu, 29 May 2025 21:10:04 +0800
Subject: [PATCH 04/12] improve code for obs

---
 Cargo.lock                         | 275 +++++++++++++++++-
Cargo.toml | 2 + crates/config/src/constants/app.rs | 55 +++- crates/obs/Cargo.toml | 4 +- crates/obs/src/config.rs | 223 +++++++-------- crates/obs/src/telemetry.rs | 436 ++++++++++++++++++++--------- rustfs/src/config/mod.rs | 4 +- rustfs/src/main.rs | 2 +- scripts/run.sh | 42 +-- 9 files changed, 755 insertions(+), 288 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 85ccd7d5..fdc90b69 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1745,6 +1745,15 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "crossbeam-queue" +version = "0.3.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0f58bbc28f91df819d0aa2a2c00cd19754769c2fad90579b3592b1c9ba7a3115" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "crossbeam-utils" version = "0.8.21" @@ -3250,6 +3259,16 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "877a4ace8713b0bcf2a4e7eec82529c029f1d0619886d18145fea96c3ffe5c0f" +[[package]] +name = "erased-serde" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e004d887f51fcb9fef17317a2f3525c887d8aa3f4f50fed920816a688284a5b7" +dependencies = [ + "serde", + "typeid", +] + [[package]] name = "errno" version = "0.3.11" @@ -3364,6 +3383,27 @@ dependencies = [ "miniz_oxide", ] +[[package]] +name = "flexi_logger" +version = "0.30.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cb03342077df16d5b1400d7bed00156882846d7a479ff61a6f10594bcc3423d8" +dependencies = [ + "chrono", + "crossbeam-channel", + "crossbeam-queue", + "log", + "notify-debouncer-mini", + "nu-ansi-term 0.50.1", + "regex", + "serde", + "serde_derive", + "thiserror 2.0.12", + "toml", + "tracing", + "tracing-subscriber", +] + [[package]] name = "flume" version = "0.11.1" @@ -3439,6 +3479,15 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "42703706b716c37f96a77aea830392ad231f44c9e9a67872fa5548707e11b11c" +[[package]] +name = "fsevent-sys" +version = "4.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "76ee7a02da4d231650c7cea31349b889be2f45ddb3ef3032d2ec8185f6313fd2" +dependencies = [ + "libc", +] + [[package]] name = "futf" version = "0.1.5" @@ -4466,6 +4515,26 @@ dependencies = [ "cfb", ] +[[package]] +name = "inotify" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f37dccff2791ab604f9babef0ba14fbe0be30bd368dc541e2b08d07c8aa908f3" +dependencies = [ + "bitflags 2.9.0", + "inotify-sys", + "libc", +] + +[[package]] +name = "inotify-sys" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e05c02b5e89bff3b946cedeca278abc628fe811e604f027c45a8aa3cf793d0eb" +dependencies = [ + "libc", +] + [[package]] name = "inout" version = "0.1.4" @@ -4682,6 +4751,26 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "kqueue" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eac30106d7dce88daf4a3fcb4879ea939476d5074a9b7ddd0fb97fa4bed5596a" +dependencies = [ + "kqueue-sys", + "libc", +] + +[[package]] +name = "kqueue-sys" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed9625ffda8729b85e45cf04090035ac368927b8cebc34898e7c120f52e4838b" +dependencies = [ + "bitflags 1.3.2", + "libc", +] + [[package]] name = "kuchikiki" version = "0.8.2" @@ -4970,6 +5059,10 @@ name = "log" version = "0.4.27" source 
= "registry+https://github.com/rust-lang/crates.io-index" checksum = "13dc2df351e3202783a1fe0d44375f7295ffb4049267b0f3018346dc122a1d94" +dependencies = [ + "serde", + "value-bag", +] [[package]] name = "longest-increasing-subsequence" @@ -5181,6 +5274,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2886843bf800fba2e3377cff24abf6379b4c4d5c6681eaf9ea5b0d15090450bd" dependencies = [ "libc", + "log", "wasi 0.11.0+wasi-snapshot-preview1", "windows-sys 0.52.0", ] @@ -5350,6 +5444,43 @@ dependencies = [ "memchr", ] +[[package]] +name = "notify" +version = "8.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2fee8403b3d66ac7b26aee6e40a897d85dc5ce26f44da36b8b73e987cc52e943" +dependencies = [ + "bitflags 2.9.0", + "filetime", + "fsevent-sys", + "inotify", + "kqueue", + "libc", + "log", + "mio", + "notify-types", + "walkdir", + "windows-sys 0.59.0", +] + +[[package]] +name = "notify-debouncer-mini" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a689eb4262184d9a1727f9087cd03883ea716682ab03ed24efec57d7716dccb8" +dependencies = [ + "log", + "notify", + "notify-types", + "tempfile", +] + +[[package]] +name = "notify-types" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5e0826a989adedc2a244799e823aece04662b66609d96af8dff7ac6df9a8925d" + [[package]] name = "ntapi" version = "0.4.1" @@ -5369,6 +5500,15 @@ dependencies = [ "winapi", ] +[[package]] +name = "nu-ansi-term" +version = "0.50.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d4a28e057d01f97e61255210fcff094d74ed0466038633e95017f5beb68e4399" +dependencies = [ + "windows-sys 0.52.0", +] + [[package]] name = "nugine-rust-utils" version = "0.3.1" @@ -7464,8 +7604,8 @@ version = "0.0.1" dependencies = [ "async-trait", "chrono", - "config", - "local-ip-address", + "flexi_logger", + "nu-ansi-term 0.50.1", "nvml-wrapper", "opentelemetry", "opentelemetry-appender-tracing", @@ -7834,6 +7974,15 @@ dependencies = [ "syn 2.0.100", ] +[[package]] +name = "serde_fmt" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e1d4ddca14104cd60529e8c7f7ba71a2c8acd8f7f5cfcdc2faf97eeb7c3010a4" +dependencies = [ + "serde", +] + [[package]] name = "serde_json" version = "1.0.140" @@ -8395,6 +8544,84 @@ version = "2.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "13c2bddecc57b384dee18652358fb23172facb8a2c51ccc10d74c157bdea3292" +[[package]] +name = "sval" +version = "2.14.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7cc9739f56c5d0c44a5ed45473ec868af02eb896af8c05f616673a31e1d1bb09" + +[[package]] +name = "sval_buffer" +version = "2.14.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f39b07436a8c271b34dad5070c634d1d3d76d6776e938ee97b4a66a5e8003d0b" +dependencies = [ + "sval", + "sval_ref", +] + +[[package]] +name = "sval_dynamic" +version = "2.14.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ffcb072d857431bf885580dacecf05ed987bac931230736739a79051dbf3499b" +dependencies = [ + "sval", +] + +[[package]] +name = "sval_fmt" +version = "2.14.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f214f427ad94a553e5ca5514c95c6be84667cbc5568cce957f03f3477d03d5c" +dependencies = [ + "itoa 1.0.15", + "ryu", + "sval", +] + +[[package]] +name = "sval_json" +version = "2.14.1" 
+source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "389ed34b32e638dec9a99c8ac92d0aa1220d40041026b625474c2b6a4d6f4feb" +dependencies = [ + "itoa 1.0.15", + "ryu", + "sval", +] + +[[package]] +name = "sval_nested" +version = "2.14.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "14bae8fcb2f24fee2c42c1f19037707f7c9a29a0cda936d2188d48a961c4bb2a" +dependencies = [ + "sval", + "sval_buffer", + "sval_ref", +] + +[[package]] +name = "sval_ref" +version = "2.14.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2a4eaea3821d3046dcba81d4b8489421da42961889902342691fb7eab491d79e" +dependencies = [ + "sval", +] + +[[package]] +name = "sval_serde" +version = "2.14.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "172dd4aa8cb3b45c8ac8f3b4111d644cd26938b0643ede8f93070812b87fb339" +dependencies = [ + "serde", + "sval", + "sval_nested", +] + [[package]] name = "syn" version = "1.0.109" @@ -9146,7 +9373,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e8189decb5ac0fa7bc8b96b7cb9b2701d60d48805aca84a238004d665fcc4008" dependencies = [ "matchers", - "nu-ansi-term", + "nu-ansi-term 0.46.0", "once_cell", "regex", "serde", @@ -9242,6 +9469,12 @@ dependencies = [ "static_assertions", ] +[[package]] +name = "typeid" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bc7d623258602320d5c55d1bc22793b57daff0ec7efc270ea7d55ce1d5f5471c" + [[package]] name = "typenum" version = "1.18.0" @@ -9410,6 +9643,42 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ba73ea9cf16a25df0c8caa16c51acb937d5712a8429db78a3ee29d5dcacd3a65" +[[package]] +name = "value-bag" +version = "1.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "943ce29a8a743eb10d6082545d861b24f9d1b160b7d741e0f2cdf726bec909c5" +dependencies = [ + "value-bag-serde1", + "value-bag-sval2", +] + +[[package]] +name = "value-bag-serde1" +version = "1.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "35540706617d373b118d550d41f5dfe0b78a0c195dc13c6815e92e2638432306" +dependencies = [ + "erased-serde", + "serde", + "serde_fmt", +] + +[[package]] +name = "value-bag-sval2" +version = "1.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6fe7e140a2658cc16f7ee7a86e413e803fc8f9b5127adc8755c19f9fefa63a52" +dependencies = [ + "sval", + "sval_buffer", + "sval_dynamic", + "sval_fmt", + "sval_json", + "sval_ref", + "sval_serde", +] + [[package]] name = "vcpkg" version = "0.2.15" diff --git a/Cargo.toml b/Cargo.toml index 31b95436..581e370b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -75,6 +75,7 @@ derive_builder = "0.20.2" dioxus = { version = "0.6.3", features = ["router"] } dirs = "6.0.0" flatbuffers = "25.2.10" +flexi_logger = { version = "0.30.2", features = ["trc"] } futures = "0.3.31" futures-core = "0.3.31" futures-util = "0.3.31" @@ -105,6 +106,7 @@ mime = "0.3.17" mime_guess = "2.0.5" netif = "0.1.6" nix = { version = "0.30.1", features = ["fs"] } +nu-ansi-term = "0.50.1" num_cpus = { version = "1.16.0" } nvml-wrapper = "0.10.0" object_store = "0.11.2" diff --git a/crates/config/src/constants/app.rs b/crates/config/src/constants/app.rs index c4f22f93..e6baaba8 100644 --- a/crates/config/src/constants/app.rs +++ b/crates/config/src/constants/app.rs @@ -56,14 +56,13 @@ pub const DEFAULT_ACCESS_KEY: &str = "rustfsadmin"; /// Example: 
RUSTFS_SECRET_KEY=rustfsadmin
 /// Example: --secret-key rustfsadmin
 pub const DEFAULT_SECRET_KEY: &str = "rustfsadmin";
-/// Default configuration file for observability
-/// Default value: config/obs.toml
-/// Environment variable: RUSTFS_OBS_CONFIG
-/// Command line argument: --obs-config
-/// Example: RUSTFS_OBS_CONFIG=config/obs.toml
-/// Example: --obs-config config/obs.toml
-/// Example: --obs-config /etc/rustfs/obs.toml
-pub const DEFAULT_OBS_CONFIG: &str = "./deploy/config/obs.toml";
+
+/// Default OBS configuration endpoint
+/// Environment variable: RUSTFS_OBS_ENDPOINT
+/// Command line argument: --obs-endpoint
+/// Example: RUSTFS_OBS_ENDPOINT="http://localhost:4317"
+/// Example: --obs-endpoint http://localhost:4317
+pub const DEFAULT_OBS_ENDPOINT: &str = "";
 
 /// Default TLS key for rustfs
 /// This is the default key for TLS.
@@ -90,6 +89,41 @@ pub const DEFAULT_CONSOLE_PORT: u16 = 9001;
 /// This is the default address for rustfs console.
 pub const DEFAULT_CONSOLE_ADDRESS: &str = concat!(":", DEFAULT_CONSOLE_PORT);
 
+/// Default log filename for rustfs
+/// This is the default log filename for rustfs.
+/// It is used to store the logs of the application.
+/// Default value: rustfs.log
+/// Environment variable: RUSTFS_OBSERVABILITY_LOG_FILENAME
+pub const DEFAULT_LOG_FILENAME: &str = "rustfs.log";
+
+/// Default log directory for rustfs
+/// This is the default log directory for rustfs.
+/// It is used to store the logs of the application.
+/// Default value: logs
+/// Environment variable: RUSTFS_OBSERVABILITY_LOG_DIRECTORY
+pub const DEFAULT_LOG_DIR: &str = "logs";
+
+/// Default log rotation size (MB) for rustfs
+/// This is the default log rotation size for rustfs.
+/// It is used to rotate the logs of the application.
+/// Default value: 100 MB
+/// Environment variable: RUSTFS_OBSERVABILITY_LOG_ROTATION_SIZE_MB
+pub const DEFAULT_LOG_ROTATION_SIZE_MB: u64 = 100;
+
+/// Default log rotation time for rustfs
+/// This is the default log rotation time for rustfs.
+/// It is used to rotate the logs of the application.
+/// Default value: day; options: day, hour, minute, second
+/// Environment variable: RUSTFS_OBSERVABILITY_LOG_ROTATION_TIME
+pub const DEFAULT_LOG_ROTATION_TIME: &str = "day";
+
+/// Default number of log files to keep for rustfs
+/// This is the default number of rotated log files to retain.
+/// It is used to keep the logs of the application.
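+/// Files beyond this count are removed by the logger's cleanup (see Cleanup::KeepLogFiles in the telemetry setup).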
+/// Default value: 30
+/// Environment variable: RUSTFS_OBSERVABILITY_LOG_KEEP_FILES
+pub const DEFAULT_LOG_KEEP_FILES: u16 = 30;
+
 #[cfg(test)]
 mod tests {
     use super::*;
@@ -154,10 +188,6 @@ mod tests {
 
     #[test]
     fn test_file_path_constants() {
-        // Test file path related constants
-        assert_eq!(DEFAULT_OBS_CONFIG, "./deploy/config/obs.toml");
-        assert!(DEFAULT_OBS_CONFIG.ends_with(".toml"), "Config file should be TOML format");
-
         assert_eq!(RUSTFS_TLS_KEY, "rustfs_key.pem");
         assert!(RUSTFS_TLS_KEY.ends_with(".pem"), "TLS key should be PEM format");
 
@@ -219,7 +249,6 @@ mod tests {
         ENVIRONMENT,
         DEFAULT_ACCESS_KEY,
         DEFAULT_SECRET_KEY,
-        DEFAULT_OBS_CONFIG,
         RUSTFS_TLS_KEY,
         RUSTFS_TLS_CERT,
         DEFAULT_ADDRESS,
diff --git a/crates/obs/Cargo.toml b/crates/obs/Cargo.toml
index 12c9831e..e2265103 100644
--- a/crates/obs/Cargo.toml
+++ b/crates/obs/Cargo.toml
@@ -20,8 +20,8 @@ kafka = ["dep:rdkafka"]
 rustfs-config = { workspace = true }
 async-trait = { workspace = true }
 chrono = { workspace = true }
-config = { workspace = true }
-local-ip-address = { workspace = true }
+flexi_logger = { workspace = true, features = ["trc", "kv"] }
+nu-ansi-term = { workspace = true }
 nvml-wrapper = { workspace = true, optional = true }
 opentelemetry = { workspace = true }
 opentelemetry-appender-tracing = { workspace = true, features = ["experimental_use_tracing_span_context", "experimental_metadata_attributes"] }
diff --git a/crates/obs/src/config.rs b/crates/obs/src/config.rs
index 9d6b0617..b1f4850d 100644
--- a/crates/obs/src/config.rs
+++ b/crates/obs/src/config.rs
@@ -1,5 +1,7 @@
-use config::{Config, File, FileFormat};
-use rustfs_config::{APP_NAME, DEFAULT_LOG_LEVEL, ENVIRONMENT, METER_INTERVAL, SAMPLE_RATIO, SERVICE_VERSION, USE_STDOUT};
+use rustfs_config::{
+    APP_NAME, DEFAULT_LOG_DIR, DEFAULT_LOG_FILENAME, DEFAULT_LOG_KEEP_FILES, DEFAULT_LOG_LEVEL, DEFAULT_LOG_ROTATION_SIZE_MB,
+    DEFAULT_LOG_ROTATION_TIME, ENVIRONMENT, METER_INTERVAL, SAMPLE_RATIO, SERVICE_VERSION, USE_STDOUT,
+};
 use serde::{Deserialize, Serialize};
 use std::env;
 
@@ -22,60 +24,94 @@ pub struct OtelConfig {
     pub environment: Option<String>, // Environment
     pub logger_level: Option<String>, // Logger level
     pub local_logging_enabled: Option<bool>, // Local logging enabled
-}
-
-/// Helper function: Extract observable configuration from environment variables
-fn extract_otel_config_from_env() -> OtelConfig {
-    let endpoint = env::var("RUSTFS_OBSERVABILITY_ENDPOINT").unwrap_or_else(|_| "".to_string());
-    let mut use_stdout = env::var("RUSTFS_OBSERVABILITY_USE_STDOUT")
-        .ok()
-        .and_then(|v| v.parse().ok())
-        .or(Some(USE_STDOUT));
-    if endpoint.is_empty() {
-        use_stdout = Some(true);
-    }
-
-    OtelConfig {
-        endpoint,
-        use_stdout,
-        sample_ratio: env::var("RUSTFS_OBSERVABILITY_SAMPLE_RATIO")
-            .ok()
-            .and_then(|v| v.parse().ok())
-            .or(Some(SAMPLE_RATIO)),
-        meter_interval: env::var("RUSTFS_OBSERVABILITY_METER_INTERVAL")
-            .ok()
-            .and_then(|v| v.parse().ok())
-            .or(Some(METER_INTERVAL)),
-        service_name: env::var("RUSTFS_OBSERVABILITY_SERVICE_NAME")
-            .ok()
-            .and_then(|v| v.parse().ok())
-            .or(Some(APP_NAME.to_string())),
-        service_version: env::var("RUSTFS_OBSERVABILITY_SERVICE_VERSION")
-            .ok()
-            .and_then(|v| v.parse().ok())
-            .or(Some(SERVICE_VERSION.to_string())),
-        environment: env::var("RUSTFS_OBSERVABILITY_ENVIRONMENT")
-            .ok()
-            .and_then(|v| v.parse().ok())
-            .or(Some(ENVIRONMENT.to_string())),
-        logger_level: env::var("RUSTFS_OBSERVABILITY_LOGGER_LEVEL")
-            .ok()
-            .and_then(|v| v.parse().ok())
-            .or(Some(DEFAULT_LOG_LEVEL.to_string())),
-
local_logging_enabled: env::var("RUSTFS_OBSERVABILITY_LOCAL_LOGGING_ENABLED") - .ok() - .and_then(|v| v.parse().ok()) - .or(Some(false)), - } + // 新增 flexi_logger 相关配置 + pub log_directory: Option, // 日志文件目录 + pub log_filename: Option, // 日志文件名称 + pub log_rotation_size_mb: Option, // 日志文件大小切割阈值 (MB) + pub log_rotation_time: Option, // 日志按时间切割 (Hour, Day) + pub log_keep_files: Option, // 保留日志文件数量 } impl OtelConfig { + /// Helper function: Extract observable configuration from environment variables + pub fn extract_otel_config_from_env(endpoint: Option) -> OtelConfig { + let endpoint = if let Some(endpoint) = endpoint { + if endpoint.is_empty() { + env::var("RUSTFS_OBSERVABILITY_ENDPOINT").unwrap_or_else(|_| "".to_string()) + } else { + endpoint + } + } else { + env::var("RUSTFS_OBSERVABILITY_ENDPOINT").unwrap_or_else(|_| "".to_string()) + }; + let mut use_stdout = env::var("RUSTFS_OBSERVABILITY_USE_STDOUT") + .ok() + .and_then(|v| v.parse().ok()) + .or(Some(USE_STDOUT)); + if endpoint.is_empty() { + use_stdout = Some(true); + } + + OtelConfig { + endpoint, + use_stdout, + sample_ratio: env::var("RUSTFS_OBSERVABILITY_SAMPLE_RATIO") + .ok() + .and_then(|v| v.parse().ok()) + .or(Some(SAMPLE_RATIO)), + meter_interval: env::var("RUSTFS_OBSERVABILITY_METER_INTERVAL") + .ok() + .and_then(|v| v.parse().ok()) + .or(Some(METER_INTERVAL)), + service_name: env::var("RUSTFS_OBSERVABILITY_SERVICE_NAME") + .ok() + .and_then(|v| v.parse().ok()) + .or(Some(APP_NAME.to_string())), + service_version: env::var("RUSTFS_OBSERVABILITY_SERVICE_VERSION") + .ok() + .and_then(|v| v.parse().ok()) + .or(Some(SERVICE_VERSION.to_string())), + environment: env::var("RUSTFS_OBSERVABILITY_ENVIRONMENT") + .ok() + .and_then(|v| v.parse().ok()) + .or(Some(ENVIRONMENT.to_string())), + logger_level: env::var("RUSTFS_OBSERVABILITY_LOGGER_LEVEL") + .ok() + .and_then(|v| v.parse().ok()) + .or(Some(DEFAULT_LOG_LEVEL.to_string())), + local_logging_enabled: env::var("RUSTFS_OBSERVABILITY_LOCAL_LOGGING_ENABLED") + .ok() + .and_then(|v| v.parse().ok()) + .or(Some(false)), + log_directory: env::var("RUSTFS_OBSERVABILITY_LOG_DIRECTORY") + .ok() + .and_then(|v| v.parse().ok()) + .or(Some(DEFAULT_LOG_DIR.to_string())), + log_filename: env::var("RUSTFS_OBSERVABILITY_LOG_FILENAME") + .ok() + .and_then(|v| v.parse().ok()) + .or(Some(DEFAULT_LOG_FILENAME.to_string())), + log_rotation_size_mb: env::var("RUSTFS_OBSERVABILITY_LOG_ROTATION_SIZE_MB") + .ok() + .and_then(|v| v.parse().ok()) + .or(Some(DEFAULT_LOG_ROTATION_SIZE_MB)), // Default to 100 MB + log_rotation_time: env::var("RUSTFS_OBSERVABILITY_LOG_ROTATION_TIME") + .ok() + .and_then(|v| v.parse().ok()) + .or(Some(DEFAULT_LOG_ROTATION_TIME.to_string())), // Default to "Day" + log_keep_files: env::var("RUSTFS_OBSERVABILITY_LOG_KEEP_FILES") + .ok() + .and_then(|v| v.parse().ok()) + .or(Some(DEFAULT_LOG_KEEP_FILES)), // Default to keeping 30 log files + } + } + /// Create a new instance of OtelConfig with default values /// /// # Returns /// A new instance of OtelConfig pub fn new() -> Self { - extract_otel_config_from_env() + Self::extract_otel_config_from_env(None) } } @@ -128,6 +164,12 @@ pub struct WebhookSinkConfig { impl WebhookSinkConfig { pub fn new() -> Self { + Self::default() + } +} + +impl Default for WebhookSinkConfig { + fn default() -> Self { Self { endpoint: env::var("RUSTFS_SINKS_WEBHOOK_ENDPOINT") .ok() @@ -143,12 +185,6 @@ impl WebhookSinkConfig { } } -impl Default for WebhookSinkConfig { - fn default() -> Self { - Self::new() - } -} - /// File Sink Configuration - Add 
 #[derive(Debug, Deserialize, Serialize, Clone)]
 pub struct FileSinkConfig {
@@ -166,7 +202,6 @@ impl FileSinkConfig {
             eprintln!("Failed to create log directory: {}", e);
             return "rustfs/rustfs.log".to_string();
         }
-        println!("Using log directory: {:?}", temp_dir);
         temp_dir
             .join("rustfs.log")
             .to_str()
@@ -179,9 +214,18 @@ impl FileSinkConfig {
             .ok()
             .filter(|s| !s.trim().is_empty())
             .unwrap_or_else(Self::get_default_log_path),
-            buffer_size: Some(8192),
-            flush_interval_ms: Some(1000),
-            flush_threshold: Some(100),
+            buffer_size: env::var("RUSTFS_SINKS_FILE_BUFFER_SIZE")
+                .ok()
+                .and_then(|v| v.parse().ok())
+                .or(Some(8192)),
+            flush_interval_ms: env::var("RUSTFS_SINKS_FILE_FLUSH_INTERVAL_MS")
+                .ok()
+                .and_then(|v| v.parse().ok())
+                .or(Some(1000)),
+            flush_threshold: env::var("RUSTFS_SINKS_FILE_FLUSH_THRESHOLD")
+                .ok()
+                .and_then(|v| v.parse().ok())
+                .or(Some(100)),
         }
     }
}
@@ -266,6 +310,14 @@ impl AppConfig {
             logger: Some(LoggerConfig::default()),
         }
     }
+
+    pub fn new_with_endpoint(endpoint: Option<String>) -> Self {
+        Self {
+            observability: OtelConfig::extract_otel_config_from_env(endpoint),
+            sinks: vec![SinkConfig::new()],
+            logger: Some(LoggerConfig::new()),
+        }
+    }
 }
 
 // implement default for AppConfig
@@ -275,9 +327,6 @@ impl Default for AppConfig {
     }
 }
 
-/// Default configuration file name
-const DEFAULT_CONFIG_FILE: &str = "obs";
-
 /// Loading the configuration file
 /// Configuration is now assembled from environment variables; the optional argument overrides the observability endpoint
 ///
 /// let config = load_config(None);
 /// ```
-pub fn load_config(config_dir: Option<String>) -> AppConfig {
-    let config_dir = if let Some(path) = config_dir {
-        // If a path is provided, check if it's empty
-        if path.is_empty() {
-            // If empty, use the default config file name
-            DEFAULT_CONFIG_FILE.to_string()
-        } else {
-            // Use the provided path
-            let path = std::path::Path::new(&path);
-            if path.extension().is_some() {
-                // If path has extension, use it as is (extension will be added by Config::builder)
-                path.with_extension("").to_string_lossy().into_owned()
-            } else {
-                // If path is a directory, append the default config file name
-                path.to_string_lossy().into_owned()
-            }
-        }
-    } else {
-        // If no path provided, use current directory + default config file
-        match env::current_dir() {
-            Ok(dir) => {
-                println!("Current directory: {:?}", dir);
-                dir.join(DEFAULT_CONFIG_FILE).to_string_lossy().into_owned()
-            }
-            Err(_) => {
-                eprintln!("Warning: Failed to get current directory, using default config file");
-                DEFAULT_CONFIG_FILE.to_string()
-            }
-        }
-    };
-
-    // Log using proper logging instead of println when possible
-    println!("Using config file base: {}", config_dir);
-
-    let app_config = Config::builder()
-        .add_source(File::with_name(config_dir.as_str()).format(FileFormat::Toml).required(false))
-        .add_source(File::with_name(config_dir.as_str()).format(FileFormat::Yaml).required(false))
-        .build()
-        .unwrap_or_default();
-
-    match app_config.try_deserialize::<AppConfig>() {
-        Ok(app_config) => {
-            println!("Parsed AppConfig: {:?}", app_config);
-            app_config
-        }
-        Err(e) => {
-            println!("Failed to deserialize config: {}", e);
-            AppConfig::default()
-        }
-    }
+pub fn load_config(config: Option<String>) -> AppConfig {
+    AppConfig::new_with_endpoint(config)
 }
diff --git a/crates/obs/src/telemetry.rs b/crates/obs/src/telemetry.rs
index d4ccaa82..cb982240 100644
--- a/crates/obs/src/telemetry.rs
+++ b/crates/obs/src/telemetry.rs
@@ -1,4 +1,7 @@
+// Added flexi_logger related dependencies
 use crate::OtelConfig;
+use flexi_logger::{style, Age, Cleanup, Criterion, DeferredNow, FileSpec, LogSpecification, Naming, Record, WriteMode};
+use nu_ansi_term::Color;
 use opentelemetry::trace::TracerProvider;
 use opentelemetry::{global, KeyValue};
 use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge;
@@ -13,7 +16,9 @@ use opentelemetry_semantic_conventions::{
     attribute::{DEPLOYMENT_ENVIRONMENT_NAME, NETWORK_LOCAL_ADDRESS, SERVICE_VERSION as OTEL_SERVICE_VERSION},
     SCHEMA_URL,
 };
-use rustfs_config::{APP_NAME, DEFAULT_LOG_LEVEL, ENVIRONMENT, METER_INTERVAL, SAMPLE_RATIO, SERVICE_VERSION, USE_STDOUT};
+use rustfs_config::{
+    APP_NAME, DEFAULT_LOG_KEEP_FILES, DEFAULT_LOG_LEVEL, ENVIRONMENT, METER_INTERVAL, SAMPLE_RATIO, SERVICE_VERSION, USE_STDOUT,
+};
 use rustfs_utils::get_local_ip_with_default;
 use smallvec::SmallVec;
 use std::borrow::Cow;
@@ -47,23 +52,44 @@ use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilte
 /// // When it's dropped, all telemetry components are properly shut down
 /// drop(otel_guard);
 /// ```
-#[derive(Debug)]
+// Implement Debug manually rather than deriving it, since some fields may not implement Debug
 pub struct OtelGuard {
-    tracer_provider: SdkTracerProvider,
-    meter_provider: SdkMeterProvider,
-    logger_provider: SdkLoggerProvider,
+    tracer_provider: Option<SdkTracerProvider>,
+    meter_provider: Option<SdkMeterProvider>,
+    logger_provider: Option<SdkLoggerProvider>,
+    // Keep the flexi_logger handle alive so file logging stays active
+    _flexi_logger_handles: Option<flexi_logger::LoggerHandle>,
+}
+
+// Manual Debug implementation that does not require every field to implement Debug
+impl std::fmt::Debug for OtelGuard {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("OtelGuard")
+            .field("tracer_provider", &self.tracer_provider.is_some())
+            .field("meter_provider", &self.meter_provider.is_some())
+            .field("logger_provider", &self.logger_provider.is_some())
+            .field("_flexi_logger_handles", &self._flexi_logger_handles.is_some())
+            .finish()
+    }
 }
 
 impl Drop for OtelGuard {
     fn drop(&mut self) {
-        if let Err(err) = self.tracer_provider.shutdown() {
-            eprintln!("Tracer shutdown error: {:?}", err);
+        if let Some(provider) = self.tracer_provider.take() {
+            if let Err(err) = provider.shutdown() {
+                eprintln!("Tracer shutdown error: {:?}", err);
+            }
         }
-        if let Err(err) = self.meter_provider.shutdown() {
-            eprintln!("Meter shutdown error: {:?}", err);
+
+        if let Some(provider) = self.meter_provider.take() {
+            if let Err(err) = provider.shutdown() {
+                eprintln!("Meter shutdown error: {:?}", err);
+            }
         }
-        if let Err(err) = self.logger_provider.shutdown() {
-            eprintln!("Logger shutdown error: {:?}", err);
+        if let Some(provider) = self.logger_provider.take() {
+            if let Err(err) = provider.shutdown() {
+                eprintln!("Logger shutdown error: {:?}", err);
+            }
         }
     }
 }
@@ -106,156 +132,247 @@ pub fn init_telemetry(config: &OtelConfig) -> OtelGuard {
     let service_name = config.service_name.as_deref().unwrap_or(APP_NAME);
     let environment = config.environment.as_deref().unwrap_or(ENVIRONMENT);
 
-    // Pre-create resource objects to avoid repeated construction
-    let res = resource(config);
+    // With an endpoint, export via OTLP; without one, fall back to flexi_logger
+    // file logging rotated by time and size
+    let mut flexi_logger_handle = None;
+    if !endpoint.is_empty() {
+        // Pre-create resource objects to avoid repeated construction
+        let res = resource(config);
 
-    // initialize tracer provider
-    let tracer_provider = {
-        let sample_ratio = config.sample_ratio.unwrap_or(SAMPLE_RATIO);
-        let sampler = if sample_ratio > 0.0 && sample_ratio < 1.0 {
-
Sampler::TraceIdRatioBased(sample_ratio) - } else { - Sampler::AlwaysOn + // initialize tracer provider + let tracer_provider = { + let sample_ratio = config.sample_ratio.unwrap_or(SAMPLE_RATIO); + let sampler = if sample_ratio > 0.0 && sample_ratio < 1.0 { + Sampler::TraceIdRatioBased(sample_ratio) + } else { + Sampler::AlwaysOn + }; + + let builder = SdkTracerProvider::builder() + .with_sampler(sampler) + .with_id_generator(RandomIdGenerator::default()) + .with_resource(res.clone()); + + let tracer_provider = if endpoint.is_empty() { + builder + .with_batch_exporter(opentelemetry_stdout::SpanExporter::default()) + .build() + } else { + let exporter = opentelemetry_otlp::SpanExporter::builder() + .with_tonic() + .with_endpoint(endpoint) + .build() + .unwrap(); + + let builder = if use_stdout { + builder + .with_batch_exporter(exporter) + .with_batch_exporter(opentelemetry_stdout::SpanExporter::default()) + } else { + builder.with_batch_exporter(exporter) + }; + + builder.build() + }; + + global::set_tracer_provider(tracer_provider.clone()); + tracer_provider }; - let builder = SdkTracerProvider::builder() - .with_sampler(sampler) - .with_id_generator(RandomIdGenerator::default()) - .with_resource(res.clone()); + // initialize meter provider + let meter_provider = { + let mut builder = MeterProviderBuilder::default().with_resource(res.clone()); - let tracer_provider = if endpoint.is_empty() { - builder - .with_batch_exporter(opentelemetry_stdout::SpanExporter::default()) - .build() - } else { - let exporter = opentelemetry_otlp::SpanExporter::builder() - .with_tonic() - .with_endpoint(endpoint) - .build() - .unwrap(); - - let builder = if use_stdout { - builder - .with_batch_exporter(exporter) - .with_batch_exporter(opentelemetry_stdout::SpanExporter::default()) + if endpoint.is_empty() { + builder = builder.with_reader(create_periodic_reader(meter_interval)); } else { - builder.with_batch_exporter(exporter) - }; + let exporter = opentelemetry_otlp::MetricExporter::builder() + .with_tonic() + .with_endpoint(endpoint) + .with_temporality(opentelemetry_sdk::metrics::Temporality::default()) + .build() + .unwrap(); + + builder = builder.with_reader( + PeriodicReader::builder(exporter) + .with_interval(std::time::Duration::from_secs(meter_interval)) + .build(), + ); + + if use_stdout { + builder = builder.with_reader(create_periodic_reader(meter_interval)); + } + } + + let meter_provider = builder.build(); + global::set_meter_provider(meter_provider.clone()); + meter_provider + }; + + // initialize logger provider + let logger_provider = { + let mut builder = SdkLoggerProvider::builder().with_resource(res); + + if endpoint.is_empty() { + builder = builder.with_batch_exporter(opentelemetry_stdout::LogExporter::default()); + } else { + let exporter = opentelemetry_otlp::LogExporter::builder() + .with_tonic() + .with_endpoint(endpoint) + .build() + .unwrap(); + + builder = builder.with_batch_exporter(exporter); + + if use_stdout { + builder = builder.with_batch_exporter(opentelemetry_stdout::LogExporter::default()); + } + } builder.build() }; - global::set_tracer_provider(tracer_provider.clone()); - tracer_provider - }; + // configuring tracing + { + // configure the formatting layer + let fmt_layer = { + let enable_color = std::io::stdout().is_terminal(); + let mut layer = tracing_subscriber::fmt::layer() + .with_target(true) + .with_ansi(enable_color) + .with_thread_names(true) + .with_thread_ids(true) + .with_file(true) + .with_line_number(true); - // initialize meter provider - let 
meter_provider = { - let mut builder = MeterProviderBuilder::default().with_resource(res.clone()); + // Only add full span events tracking in the development environment + if environment != ENVIRONMENT { + layer = layer.with_span_events(FmtSpan::FULL); + } - if endpoint.is_empty() { - builder = builder.with_reader(create_periodic_reader(meter_interval)); - } else { - let exporter = opentelemetry_otlp::MetricExporter::builder() - .with_tonic() - .with_endpoint(endpoint) - .with_temporality(opentelemetry_sdk::metrics::Temporality::default()) - .build() - .unwrap(); + layer.with_filter(build_env_filter(logger_level, None)) + }; - builder = builder.with_reader( - PeriodicReader::builder(exporter) - .with_interval(std::time::Duration::from_secs(meter_interval)) - .build(), - ); + let filter = build_env_filter(logger_level, None); + let otel_filter = build_env_filter(logger_level, None); + let otel_layer = OpenTelemetryTracingBridge::new(&logger_provider).with_filter(otel_filter); + let tracer = tracer_provider.tracer(Cow::Borrowed(service_name).to_string()); - if use_stdout { - builder = builder.with_reader(create_periodic_reader(meter_interval)); + // Configure registry to avoid repeated calls to filter methods + tracing_subscriber::registry() + .with(filter) + .with(ErrorLayer::default()) + .with(if config.local_logging_enabled.unwrap_or(false) { + Some(fmt_layer) + } else { + None + }) + .with(OpenTelemetryLayer::new(tracer)) + .with(otel_layer) + .with(MetricsLayer::new(meter_provider.clone())) + .init(); + + if !endpoint.is_empty() { + info!( + "OpenTelemetry telemetry initialized with OTLP endpoint: {}, logger_level: {},RUST_LOG env: {}", + endpoint, + logger_level, + std::env::var("RUST_LOG").unwrap_or_else(|_| "Not set".to_string()) + ); } } - let meter_provider = builder.build(); - global::set_meter_provider(meter_provider.clone()); - meter_provider - }; - - // initialize logger provider - let logger_provider = { - let mut builder = SdkLoggerProvider::builder().with_resource(res); - - if endpoint.is_empty() { - builder = builder.with_batch_exporter(opentelemetry_stdout::LogExporter::default()); - } else { - let exporter = opentelemetry_otlp::LogExporter::builder() - .with_tonic() - .with_endpoint(endpoint) - .build() - .unwrap(); - - builder = builder.with_batch_exporter(exporter); - - if use_stdout { - builder = builder.with_batch_exporter(opentelemetry_stdout::LogExporter::default()); - } + OtelGuard { + tracer_provider: Some(tracer_provider), + meter_provider: Some(meter_provider), + logger_provider: Some(logger_provider), + _flexi_logger_handles: flexi_logger_handle, } + } else { + // Obtain the log directory and file name configuration + let log_directory = config.log_directory.as_deref().unwrap_or("logs"); + let log_filename = config.log_filename.as_deref().unwrap_or(service_name); - builder.build() - }; - - // configuring tracing - { - // configure the formatting layer - let fmt_layer = { - let enable_color = std::io::stdout().is_terminal(); - let mut layer = tracing_subscriber::fmt::layer() - .with_target(true) - .with_ansi(enable_color) - .with_thread_names(true) - .with_thread_ids(true) - .with_file(true) - .with_line_number(true); - - // Only add full span events tracking in the development environment - if environment != ENVIRONMENT { - layer = layer.with_span_events(FmtSpan::FULL); + // Build log cutting conditions + let rotation_criterion = match (config.log_rotation_time.as_deref(), config.log_rotation_size_mb) { + // Cut by time and size at the same time + (Some(time), 
Some(size)) => {
+                let age = match time.to_lowercase().as_str() {
+                    "hour" => Age::Hour,
+                    "day" => Age::Day,
+                    "minute" => Age::Minute,
+                    "second" => Age::Second,
+                    _ => Age::Day, // Default to daily rotation
+                };
+                Criterion::AgeOrSize(age, size * 1024 * 1024) // Convert to bytes
+            }
+            // Rotate by time only
+            (Some(time), None) => {
+                let age = match time.to_lowercase().as_str() {
+                    "hour" => Age::Hour,
+                    "day" => Age::Day,
+                    "minute" => Age::Minute,
+                    "second" => Age::Second,
+                    _ => Age::Day, // Default to daily rotation
+                };
+                Criterion::Age(age)
+            }
+            // Rotate by size only
+            (None, Some(size)) => {
+                Criterion::Size(size * 1024 * 1024) // Convert to bytes
+            }
+            // Default: rotate daily
+            _ => Criterion::Age(Age::Day),
+        };
 
+        // Number of rotated log files to retain
+        let keep_files = config.log_keep_files.unwrap_or(DEFAULT_LOG_KEEP_FILES);
 
+        // Parse the log level
+        let log_spec = LogSpecification::parse(logger_level).unwrap_or(LogSpecification::info());
 
+        // Configure the flexi_logger
+        let flexi_logger_result = flexi_logger::Logger::with(log_spec)
+            .log_to_file(
+                FileSpec::default()
+                    .directory(log_directory)
+                    .basename(log_filename)
+                    .suffix("log"),
+            )
+            .rotate(rotation_criterion, Naming::Timestamps, Cleanup::KeepLogFiles(keep_files.into()))
+            .format_for_files(format_for_file) // Add a custom formatting function for file output
+            .duplicate_to_stdout(flexi_logger::Duplicate::Info)
+            .format_for_stdout(format_with_color) // Add a custom formatting function for terminal output
+            .write_mode(WriteMode::Async)
+            .start();
 
+        // Match instead of if-let/else so the error case can still use the Result
+        match flexi_logger_result {
+            Ok(logger) => {
+                // Keep the logger handle so logging stays active
+                flexi_logger_handle = Some(logger);
 
+                info!("Flexi logger initialized with file logging to {}/{}.log", log_directory, log_filename);
 
+                // Log the configured rotation policy
+                match (config.log_rotation_time.as_deref(), config.log_rotation_size_mb) {
+                    (Some(time), Some(size)) => info!(
+                        "Log rotation configured for: every {} or when size exceeds {}MB, keeping {} files",
+                        time, size, keep_files
+                    ),
+                    (Some(time), None) => info!("Log rotation configured for: every {}, keeping {} files", time, keep_files),
+                    (None, Some(size)) => {
+                        info!("Log rotation configured for: when size exceeds {}MB, keeping {} files", size, keep_files)
+                    }
+                    _ => info!("Log rotation configured for: daily, keeping {} files", keep_files),
+                }
+            }
+            Err(err) => {
+                eprintln!("Failed to initialize flexi_logger: {:?}", err);
+            }
+        }
 
+        OtelGuard {
+            tracer_provider: None,
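+            // In file-logging mode no OTel providers are created; Drop then has
+            // nothing to shut down and only the logger handle is released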
+            meter_provider: None,
+            logger_provider: None,
+            _flexi_logger_handles: flexi_logger_handle,
+        }
+    }
 }
 
@@ -271,3 +388,50 @@ fn build_env_filter(logger_level: &str, default_level: Option<&str>) -> EnvFilte
 
     filter
 }
+
+/// Custom Log Formatter Function - Terminal Output (with Color)
+fn format_with_color(w: &mut dyn std::io::Write, now: &mut DeferredNow, record: &Record) -> Result<(), std::io::Error> {
+    let level = record.level();
+    let level_style = style(level);
+
+    // Get the current thread information
+    let binding = std::thread::current();
+    let thread_name = binding.name().unwrap_or("unnamed");
+    let thread_id = format!("{:?}", std::thread::current().id());
+
+    writeln!(
+        w,
+        "{} {} [{}] [{}:{}] [{} {}] {}",
+        now.now().format("%Y-%m-%d %H:%M:%S%.6f"),
+        level_style.paint(level.to_string()),
+        Color::Magenta.paint(record.target()),
+        Color::Blue.paint(record.file().unwrap_or("unknown")),
+        Color::Blue.paint(record.line().unwrap_or(0).to_string()),
+        Color::Green.paint(thread_name),
+        Color::Green.paint(thread_id),
+        record.args()
+    )
+}
+
+/// Custom Log Formatter - File Output (No Color)
+fn format_for_file(w: &mut dyn std::io::Write, now: &mut DeferredNow, record: &Record) -> Result<(), std::io::Error> {
+    let level = record.level();
+
+    // Get the current thread information
+    let binding = std::thread::current();
+    let thread_name = binding.name().unwrap_or("unnamed");
+    let thread_id = format!("{:?}", std::thread::current().id());
+
+    writeln!(
+        w,
+        "{} {} [{}] [{}:{}] [{} {}] {}",
+        now.now().format("%Y-%m-%d %H:%M:%S%.6f"),
+        level,
+        record.target(),
+        record.file().unwrap_or("unknown"),
+        record.line().unwrap_or(0),
+        thread_name,
+        thread_id,
+        record.args()
+    )
+}
diff --git a/rustfs/src/config/mod.rs b/rustfs/src/config/mod.rs
index a93b6dda..a720221f 100644
--- a/rustfs/src/config/mod.rs
+++ b/rustfs/src/config/mod.rs
@@ -64,8 +64,8 @@ pub struct Opt {
 
     /// Observability endpoint (OTLP) for telemetry export
     /// Default value: empty string (falls back to RUSTFS_OBSERVABILITY_ENDPOINT or local file logging)
-    #[arg(long, default_value_t = rustfs_config::DEFAULT_OBS_CONFIG.to_string(), env = "RUSTFS_OBS_CONFIG")]
-    pub obs_config: String,
+    #[arg(long, default_value_t = rustfs_config::DEFAULT_OBS_ENDPOINT.to_string(), env = "RUSTFS_OBS_ENDPOINT")]
+    pub obs_endpoint: String,
 
     /// tls path for rustfs api and console.
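     /// The directory is expected to contain the rustfs_key.pem and rustfs_cert.pem files.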
#[arg(long, env = "RUSTFS_TLS_PATH")] diff --git a/rustfs/src/main.rs b/rustfs/src/main.rs index e4df1e5f..cefdfcf9 100644 --- a/rustfs/src/main.rs +++ b/rustfs/src/main.rs @@ -103,7 +103,7 @@ async fn main() -> Result<()> { init_license(opt.license.clone()); // Load the configuration file - let config = load_config(Some(opt.clone().obs_config)); + let config = load_config(Some(opt.clone().obs_endpoint)); // Initialize Observability let (_logger, guard) = init_obs(config.clone()).await; diff --git a/scripts/run.sh b/scripts/run.sh index 6d528ca2..0df0cdfb 100755 --- a/scripts/run.sh +++ b/scripts/run.sh @@ -31,16 +31,16 @@ export RUSTFS_VOLUMES="./target/volume/test{0...4}" # export RUSTFS_VOLUMES="./target/volume/test" export RUSTFS_ADDRESS=":9000" export RUSTFS_CONSOLE_ENABLE=true -export RUSTFS_CONSOLE_ADDRESS=":9002" +export RUSTFS_CONSOLE_ADDRESS=":9001" # export RUSTFS_SERVER_DOMAINS="localhost:9000" # HTTPS 证书目录 # export RUSTFS_TLS_PATH="./deploy/certs" # 具体路径修改为配置文件真实路径,obs.example.toml 仅供参考 其中 `RUSTFS_OBS_CONFIG` 和下面变量二选一 -# export RUSTFS_OBS_CONFIG="./deploy/config/obs.toml" +# export RUSTFS_OBS_ENDPOINT=http://localhost:4317 # 如下变量需要必须参数都有值才可以,以及会覆盖配置文件中的值 -export RUSTFS_OBSERVABILITY_ENDPOINT=http://localhost:4317 +#export RUSTFS_OBSERVABILITY_ENDPOINT=http://localhost:4317 #export RUSTFS_OBSERVABILITY_ENDPOINT=http://localhost:4317 #export RUSTFS_OBSERVABILITY_USE_STDOUT=false #export RUSTFS_OBSERVABILITY_SAMPLE_RATIO=2.0 @@ -49,27 +49,29 @@ export RUSTFS_OBSERVABILITY_ENDPOINT=http://localhost:4317 #export RUSTFS_OBSERVABILITY_SERVICE_VERSION=0.1.0 #export RUSTFS_OBSERVABILITY_ENVIRONMENT=develop #export RUSTFS_OBSERVABILITY_LOGGER_LEVEL=debug -#export RUSTFS_OBSERVABILITY_LOCAL_LOGGING_ENABLED=true +export RUSTFS_OBSERVABILITY_LOCAL_LOGGING_ENABLED=true +#export RUSTFS_OBSERVABILITY_LOCAL_LOGGING_ENABLED=false # Whether to enable local logging +export RUSTFS_OBSERVABILITY_LOG_DIRECTORY="./deploy/logs" # Log directory +export RUSTFS_OBSERVABILITY_LOG_ROTATION_TIME="minute" # Log rotation time unit, can be "second", "minute", "hour", "day" +export RUSTFS_OBSERVABILITY_LOG_ROTATION_SIZE_MB=1 # Log rotation size in MB + # -#export RUSTFS__SINKS_0__type=File -#export RUSTFS__SINKS_0__path=./deploy/logs/rustfs.log -#export RUSTFS__SINKS_0__buffer_size=12 -#export RUSTFS__SINKS_0__flush_interval_ms=1000 -#export RUSTFS__SINKS_0__flush_threshold=100 +#export RUSTFS_SINKS_FILE_PATH=./deploy/logs/rustfs.log +#export RUSTFS_SINKS_FILE_BUFFER_SIZE=12 +#export RUSTFS_SINKS_FILE_FLUSH_INTERVAL_MS=1000 +#export RUSTFS_SINKS_FILE_FLUSH_THRESHOLD=100 # -#export RUSTFS__SINKS_1__type=Kakfa -#export RUSTFS__SINKS_1__brokers=localhost:9092 -#export RUSTFS__SINKS_1__topic=logs -#export RUSTFS__SINKS_1__batch_size=100 -#export RUSTFS__SINKS_1__batch_timeout_ms=1000 +#export RUSTFS_SINKS_KAKFA_BROKERS=localhost:9092 +#export RUSTFS_SINKS_KAKFA_TOPIC=logs +#export RUSTFS_SINKS_KAKFA_BATCH_SIZE=100 +#export RUSTFS_SINKS_KAKFA_BATCH_TIMEOUT_MS=1000 # -#export RUSTFS__SINKS_2__type=Webhook -#export RUSTFS__SINKS_2__endpoint=http://localhost:8080/webhook -#export RUSTFS__SINKS_2__auth_token=you-auth-token -#export RUSTFS__SINKS_2__batch_size=100 -#export RUSTFS__SINKS_2__batch_timeout_ms=1000 +#export RUSTFS_SINKS_WEBHOOK_ENDPOINT=http://localhost:8080/webhook +#export RUSTFS_SINKS_WEBHOOK_AUTH_TOKEN=you-auth-token +#export RUSTFS_SINKS_WEBHOOK_BATCH_SIZE=100 +#export RUSTFS_SINKS_WEBHOOK_BATCH_TIMEOUT_MS=1000 # -#export RUSTFS__LOGGER__QUEUE_CAPACITY=10 +#export RUSTFS_LOGGER_QUEUE_CAPACITY=10 export 
OTEL_INSTRUMENTATION_NAME="rustfs" export OTEL_INSTRUMENTATION_VERSION="0.1.1" From d2857467e05bd11f3e311558ff8ec35426c99867 Mon Sep 17 00:00:00 2001 From: junxiang Mu <1948535941@qq.com> Date: Wed, 28 May 2025 07:36:03 +0000 Subject: [PATCH 05/12] improve run_data_scanner Signed-off-by: junxiang Mu <1948535941@qq.com> --- ecstore/src/heal/data_scanner.rs | 206 ++++++++++++++++++++++--------- 1 file changed, 145 insertions(+), 61 deletions(-) diff --git a/ecstore/src/heal/data_scanner.rs b/ecstore/src/heal/data_scanner.rs index 627f044e..0e3f2107 100644 --- a/ecstore/src/heal/data_scanner.rs +++ b/ecstore/src/heal/data_scanner.rs @@ -143,92 +143,176 @@ fn new_dynamic_sleeper(factor: f64, max_wait: Duration, is_scanner: bool) -> Dyn } } +/// Initialize and start the data scanner in the background +/// +/// This function starts a background task that continuously runs the data scanner +/// with randomized intervals between cycles to avoid resource contention. +/// +/// # Features +/// - Graceful shutdown support via cancellation token +/// - Randomized sleep intervals to prevent synchronized scanning across nodes +/// - Minimum sleep duration to avoid excessive CPU usage +/// - Proper error handling and logging +/// +/// # Architecture +/// 1. Initialize with random seed for sleep intervals +/// 2. Run scanner cycles in a loop +/// 3. Use randomized sleep between cycles to avoid thundering herd +/// 4. Ensure minimum sleep duration to prevent CPU thrashing pub async fn init_data_scanner() { + info!("Initializing data scanner background task"); + tokio::spawn(async move { loop { + // Run the data scanner run_data_scanner().await; - let random = { - let mut r = rand::thread_rng(); - r.gen_range(0.0..1.0) + + // Calculate randomized sleep duration + // Use random factor (0.0 to 1.0) multiplied by the scanner cycle duration + let random_factor: f64 = { + let mut rng = rand::thread_rng(); + rng.gen_range(0.0..1.0) }; - let duration = Duration::from_secs_f64(random * (SCANNER_CYCLE.load(Ordering::SeqCst) as f64)); - let sleep_duration = if duration < Duration::new(1, 0) { - Duration::new(1, 0) + let base_cycle_duration = SCANNER_CYCLE.load(Ordering::SeqCst) as f64; + let sleep_duration_secs = random_factor * base_cycle_duration; + + // Ensure minimum sleep duration of 1 second to avoid high CPU usage + let sleep_duration = if sleep_duration_secs < 1.0 { + Duration::from_secs(1) } else { - duration + Duration::from_secs_f64(sleep_duration_secs) }; - info!("data scanner will sleeping {sleep_duration:?}"); + info!(duration_secs = sleep_duration.as_secs(), "Data scanner sleeping before next cycle"); + + // Sleep with the calculated duration sleep(sleep_duration).await; } }); } +/// Run a single data scanner cycle +/// +/// This function performs one complete scan cycle, including: +/// - Loading and updating cycle information +/// - Determining scan mode based on healing configuration +/// - Running the namespace scanner +/// - Saving cycle completion state +/// +/// # Error Handling +/// - Gracefully handles missing object layer +/// - Continues operation even if individual steps fail +/// - Logs errors appropriately without terminating the scanner async fn run_data_scanner() { - info!("run_data_scanner"); + info!("Starting data scanner cycle"); + + // Get the object layer, return early if not available let Some(store) = new_object_layer_fn() else { - error!("errServerNotInitialized"); + error!("Object layer not initialized, skipping scanner cycle"); return; }; + // Load current cycle information 
from persistent storage let buf = read_config(store.clone(), &DATA_USAGE_BLOOM_NAME_PATH) .await - .map_or(Vec::new(), |buf| buf); - - let mut buf_t = Deserializer::new(Cursor::new(buf)); - - let mut cycle_info: CurrentScannerCycle = Deserialize::deserialize(&mut buf_t).unwrap_or_default(); - - loop { - let stop_fn = ScannerMetrics::log(ScannerMetric::ScanCycle); - cycle_info.current = cycle_info.next; - cycle_info.started = Utc::now(); - { - globalScannerMetrics.write().await.set_cycle(Some(cycle_info.clone())).await; - } - - let bg_heal_info = read_background_heal_info(store.clone()).await; - let scan_mode = - get_cycle_scan_mode(cycle_info.current, bg_heal_info.bitrot_start_cycle, bg_heal_info.bitrot_start_time).await; - if bg_heal_info.current_scan_mode != scan_mode { - let mut new_heal_info = bg_heal_info; - new_heal_info.current_scan_mode = scan_mode; - if scan_mode == HEAL_DEEP_SCAN { - new_heal_info.bitrot_start_time = SystemTime::now(); - new_heal_info.bitrot_start_cycle = cycle_info.current; - } - save_background_heal_info(store.clone(), &new_heal_info).await; - } - // Wait before starting next cycle and wait on startup. - let (tx, rx) = mpsc::channel(100); - tokio::spawn(async { - store_data_usage_in_backend(rx).await; + .unwrap_or_else(|err| { + info!(error = %err, "Failed to read cycle info, starting fresh"); + Vec::new() }); - let mut res = HashMap::new(); - res.insert("cycle".to_string(), cycle_info.current.to_string()); - info!("start ns_scanner"); - match store.clone().ns_scanner(tx, cycle_info.current as usize, scan_mode).await { - Ok(_) => { - info!("ns_scanner completed"); - cycle_info.next += 1; - cycle_info.current = 0; - cycle_info.cycle_completed.push(Utc::now()); - if cycle_info.cycle_completed.len() > DATA_USAGE_UPDATE_DIR_CYCLES as usize { - let _ = cycle_info.cycle_completed.remove(0); - } - globalScannerMetrics.write().await.set_cycle(Some(cycle_info.clone())).await; - let mut wr = Vec::new(); - cycle_info.serialize(&mut Serializer::new(&mut wr)).unwrap(); - let _ = save_config(store.clone(), &DATA_USAGE_BLOOM_NAME_PATH, wr).await; + + let mut cycle_info = if buf.is_empty() { + CurrentScannerCycle::default() + } else { + let mut buf_cursor = Deserializer::new(Cursor::new(buf)); + Deserialize::deserialize(&mut buf_cursor).unwrap_or_else(|err| { + error!(error = %err, "Failed to deserialize cycle info, using default"); + CurrentScannerCycle::default() + }) + }; + + // Start metrics collection for this cycle + let stop_fn = ScannerMetrics::log(ScannerMetric::ScanCycle); + + // Update cycle information + cycle_info.current = cycle_info.next; + cycle_info.started = Utc::now(); + + // Update global scanner metrics + { + globalScannerMetrics.write().await.set_cycle(Some(cycle_info.clone())).await; + } + + // Read background healing information and determine scan mode + let bg_heal_info = read_background_heal_info(store.clone()).await; + let scan_mode = + get_cycle_scan_mode(cycle_info.current, bg_heal_info.bitrot_start_cycle, bg_heal_info.bitrot_start_time).await; + + // Update healing info if scan mode changed + if bg_heal_info.current_scan_mode != scan_mode { + let mut new_heal_info = bg_heal_info; + new_heal_info.current_scan_mode = scan_mode; + if scan_mode == HEAL_DEEP_SCAN { + new_heal_info.bitrot_start_time = SystemTime::now(); + new_heal_info.bitrot_start_cycle = cycle_info.current; + } + save_background_heal_info(store.clone(), &new_heal_info).await; + } + + // Set up data usage storage channel + let (tx, rx) = mpsc::channel(100); + tokio::spawn(async move { 
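+        // The receiver half moves into this background task, which drains
+        // usage updates sent by the scanner and persists them to the backend.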
+ let _ = store_data_usage_in_backend(rx).await; + }); + + // Prepare result tracking + let mut scan_result = HashMap::new(); + scan_result.insert("cycle".to_string(), cycle_info.current.to_string()); + + info!( + cycle = cycle_info.current, + scan_mode = ?scan_mode, + "Starting namespace scanner" + ); + + // Run the namespace scanner + match store.clone().ns_scanner(tx, cycle_info.current as usize, scan_mode).await { + Ok(_) => { + info!(cycle = cycle_info.current, "Namespace scanner completed successfully"); + + // Update cycle completion information + cycle_info.next += 1; + cycle_info.current = 0; + cycle_info.cycle_completed.push(Utc::now()); + + // Maintain cycle completion history (keep only recent cycles) + if cycle_info.cycle_completed.len() > DATA_USAGE_UPDATE_DIR_CYCLES as usize { + let _ = cycle_info.cycle_completed.remove(0); } - Err(err) => { - info!("ns_scanner failed: {:?}", err); - res.insert("error".to_string(), err.to_string()); + + // Update global metrics with completion info + globalScannerMetrics.write().await.set_cycle(Some(cycle_info.clone())).await; + + // Persist updated cycle information + // ignore error, continue. + let mut serialized_data = Vec::new(); + if let Err(err) = cycle_info.serialize(&mut Serializer::new(&mut serialized_data)) { + error!(error = %err, "Failed to serialize cycle info"); + } else if let Err(err) = save_config(store.clone(), &DATA_USAGE_BLOOM_NAME_PATH, serialized_data).await { + error!(error = %err, "Failed to save cycle info to storage"); } } - stop_fn(&res).await; - sleep(Duration::from_secs(SCANNER_CYCLE.load(Ordering::SeqCst))).await; + Err(err) => { + error!( + cycle = cycle_info.current, + error = %err, + "Namespace scanner failed" + ); + scan_result.insert("error".to_string(), err.to_string()); + } } + + // Complete metrics collection for this cycle + stop_fn(&scan_result).await; } #[derive(Debug, Serialize, Deserialize)] From 8dfb3643ec8174a86e8fe1fa3ba41e602b556ddb Mon Sep 17 00:00:00 2001 From: junxiang Mu <1948535941@qq.com> Date: Wed, 28 May 2025 10:17:41 +0000 Subject: [PATCH 06/12] improve scanner metric Signed-off-by: junxiang Mu <1948535941@qq.com> --- ecstore/src/heal/data_scanner.rs | 55 ++- ecstore/src/heal/data_scanner_metric.rs | 570 ++++++++++++++++-------- ecstore/src/metrics_realtime.rs | 2 +- 3 files changed, 419 insertions(+), 208 deletions(-) diff --git a/ecstore/src/heal/data_scanner.rs b/ecstore/src/heal/data_scanner.rs index 0e3f2107..7e9f5f9d 100644 --- a/ecstore/src/heal/data_scanner.rs +++ b/ecstore/src/heal/data_scanner.rs @@ -144,16 +144,16 @@ fn new_dynamic_sleeper(factor: f64, max_wait: Duration, is_scanner: bool) -> Dyn } /// Initialize and start the data scanner in the background -/// +/// /// This function starts a background task that continuously runs the data scanner /// with randomized intervals between cycles to avoid resource contention. -/// +/// /// # Features /// - Graceful shutdown support via cancellation token /// - Randomized sleep intervals to prevent synchronized scanning across nodes /// - Minimum sleep duration to avoid excessive CPU usage /// - Proper error handling and logging -/// +/// /// # Architecture /// 1. Initialize with random seed for sleep intervals /// 2. Run scanner cycles in a loop @@ -161,12 +161,12 @@ fn new_dynamic_sleeper(factor: f64, max_wait: Duration, is_scanner: bool) -> Dyn /// 4. 
Ensure minimum sleep duration to prevent CPU thrashing pub async fn init_data_scanner() { info!("Initializing data scanner background task"); - + tokio::spawn(async move { loop { // Run the data scanner run_data_scanner().await; - + // Calculate randomized sleep duration // Use random factor (0.0 to 1.0) multiplied by the scanner cycle duration let random_factor: f64 = { @@ -175,7 +175,7 @@ pub async fn init_data_scanner() { }; let base_cycle_duration = SCANNER_CYCLE.load(Ordering::SeqCst) as f64; let sleep_duration_secs = random_factor * base_cycle_duration; - + // Ensure minimum sleep duration of 1 second to avoid high CPU usage let sleep_duration = if sleep_duration_secs < 1.0 { Duration::from_secs(1) @@ -183,8 +183,11 @@ pub async fn init_data_scanner() { Duration::from_secs_f64(sleep_duration_secs) }; - info!(duration_secs = sleep_duration.as_secs(), "Data scanner sleeping before next cycle"); - + info!( + duration_secs = sleep_duration.as_secs(), + "Data scanner sleeping before next cycle" + ); + // Sleep with the calculated duration sleep(sleep_duration).await; } @@ -192,20 +195,20 @@ pub async fn init_data_scanner() { } /// Run a single data scanner cycle -/// +/// /// This function performs one complete scan cycle, including: /// - Loading and updating cycle information /// - Determining scan mode based on healing configuration /// - Running the namespace scanner /// - Saving cycle completion state -/// +/// /// # Error Handling /// - Gracefully handles missing object layer /// - Continues operation even if individual steps fail /// - Logs errors appropriately without terminating the scanner async fn run_data_scanner() { info!("Starting data scanner cycle"); - + // Get the object layer, return early if not available let Some(store) = new_object_layer_fn() else { error!("Object layer not initialized, skipping scanner cycle"); @@ -232,20 +235,22 @@ async fn run_data_scanner() { // Start metrics collection for this cycle let stop_fn = ScannerMetrics::log(ScannerMetric::ScanCycle); - + // Update cycle information cycle_info.current = cycle_info.next; cycle_info.started = Utc::now(); // Update global scanner metrics - { - globalScannerMetrics.write().await.set_cycle(Some(cycle_info.clone())).await; - } + globalScannerMetrics.set_cycle(Some(cycle_info.clone())).await; // Read background healing information and determine scan mode let bg_heal_info = read_background_heal_info(store.clone()).await; - let scan_mode = - get_cycle_scan_mode(cycle_info.current, bg_heal_info.bitrot_start_cycle, bg_heal_info.bitrot_start_time).await; + let scan_mode = get_cycle_scan_mode( + cycle_info.current, + bg_heal_info.bitrot_start_cycle, + bg_heal_info.bitrot_start_time, + ) + .await; // Update healing info if scan mode changed if bg_heal_info.current_scan_mode != scan_mode { @@ -275,22 +280,26 @@ async fn run_data_scanner() { ); // Run the namespace scanner - match store.clone().ns_scanner(tx, cycle_info.current as usize, scan_mode).await { + match store + .clone() + .ns_scanner(tx, cycle_info.current as usize, scan_mode) + .await + { Ok(_) => { info!(cycle = cycle_info.current, "Namespace scanner completed successfully"); - + // Update cycle completion information cycle_info.next += 1; cycle_info.current = 0; cycle_info.cycle_completed.push(Utc::now()); - + // Maintain cycle completion history (keep only recent cycles) if cycle_info.cycle_completed.len() > DATA_USAGE_UPDATE_DIR_CYCLES as usize { let _ = cycle_info.cycle_completed.remove(0); } // Update global metrics with completion info - 
globalScannerMetrics.write().await.set_cycle(Some(cycle_info.clone())).await;
+                globalScannerMetrics.set_cycle(Some(cycle_info.clone())).await;
 
                 // Persist updated cycle information
                 // ignore error, continue.
@@ -312,7 +321,7 @@
     }
 
     // Complete metrics collection for this cycle
-    stop_fn(&scan_result).await;
+    stop_fn(&scan_result);
 }
 
 #[derive(Debug, Serialize, Deserialize)]
@@ -560,7 +569,7 @@ impl ScannerItem {
     pub async fn apply_actions(&self, oi: &ObjectInfo, _size_s: &SizeSummary) -> (bool, usize) {
         let done = ScannerMetrics::time(ScannerMetric::Ilm);
         //todo: lifecycle
-        done().await;
+        done();
 
         (false, oi.size)
     }
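
The hunks above drop the `.await` on the timing callbacks: `ScannerMetrics::time` now hands back a plain closure that is invoked as `done()`. A minimal sketch of that pattern, using an illustrative stand-in `start_timer` rather than the actual `ScannerMetrics` API (the metric implementation follows in the next diff):

    use std::time::Instant;

    // Capture the start time eagerly, return a synchronous closure that
    // records the elapsed duration when called.
    fn start_timer(name: &'static str) -> impl FnOnce() {
        let start = Instant::now();
        move || println!("{name} took {:?}", start.elapsed())
    }

    fn main() {
        let done = start_timer("scan_object");
        // ... measured work ...
        done();
    }
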
diff --git a/ecstore/src/heal/data_scanner_metric.rs b/ecstore/src/heal/data_scanner_metric.rs
index dedd4417..44fe8550 100644
--- a/ecstore/src/heal/data_scanner_metric.rs
+++ b/ecstore/src/heal/data_scanner_metric.rs
@@ -1,30 +1,30 @@
 use chrono::Utc;
-use common::globals::GLOBAL_Local_Node_Name;
 use common::last_minute::{AccElem, LastMinuteLatency};
 use lazy_static::lazy_static;
 use madmin::metrics::ScannerMetrics as M_ScannerMetrics;
-use std::future::Future;
-use std::pin::Pin;
-use std::sync::atomic::AtomicU64;
-use std::sync::Once;
-use std::time::{Duration, UNIX_EPOCH};
 use std::{
     collections::HashMap,
-    sync::{atomic::Ordering, Arc},
-    time::SystemTime,
+    sync::{
+        atomic::{AtomicU64, Ordering},
+        Arc,
+    },
+    time::{Duration, SystemTime},
+    pin::Pin,
 };
-use tokio::sync::RwLock;
-use tracing::{debug, info};
+use tokio::sync::{Mutex, RwLock};
+use tracing::debug;
 
-use super::data_scanner::{CurrentScannerCycle, UpdateCurrentPathFn};
+use super::data_scanner::CurrentScannerCycle;
 
 lazy_static! {
-    pub static ref globalScannerMetrics: Arc<RwLock<ScannerMetrics>> = Arc::new(RwLock::new(ScannerMetrics::new()));
+    pub static ref globalScannerMetrics: Arc<ScannerMetrics> = Arc::new(ScannerMetrics::new());
 }
 
-#[derive(Clone, Debug, PartialEq, PartialOrd)]
+/// Scanner metric types, matching the Go version exactly
+#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
+#[repr(u8)]
 pub enum ScannerMetric {
-    // START Realtime metrics, that only to records
+    // START Realtime metrics, that only records
     // last minute latencies and total operation count.
     ReadMetadata = 0,
     CheckMissing,
@@ -58,202 +58,404 @@ pub enum ScannerMetric {
     Last,
 }
 
-static INIT: Once = Once::new();
+impl ScannerMetric {
+    /// Convert to string representation for metrics
+    pub fn as_str(self) -> &'static str {
+        match self {
+            Self::ReadMetadata => "read_metadata",
+            Self::CheckMissing => "check_missing",
+            Self::SaveUsage => "save_usage",
+            Self::ApplyAll => "apply_all",
+            Self::ApplyVersion => "apply_version",
+            Self::TierObjSweep => "tier_obj_sweep",
+            Self::HealCheck => "heal_check",
+            Self::Ilm => "ilm",
+            Self::CheckReplication => "check_replication",
+            Self::Yield => "yield",
+            Self::CleanAbandoned => "clean_abandoned",
+            Self::ApplyNonCurrent => "apply_non_current",
+            Self::HealAbandonedVersion => "heal_abandoned_version",
+            Self::StartTrace => "start_trace",
+            Self::ScanObject => "scan_object",
+            Self::HealAbandonedObject => "heal_abandoned_object",
+            Self::LastRealtime => "last_realtime",
+            Self::ScanFolder => "scan_folder",
+            Self::ScanCycle => "scan_cycle",
+            Self::ScanBucketDrive => "scan_bucket_drive",
+            Self::CompactFolder => "compact_folder",
+            Self::Last => "last",
+        }
+    }
+
+    /// Convert from index back to enum (safe version)
+    pub fn from_index(index: usize) -> Option<Self> {
+        if index >= Self::Last as usize {
+            return None;
+        }
+        // Safe conversion using match instead of unsafe transmute
+        match index {
+            0 => Some(Self::ReadMetadata),
+            1 => Some(Self::CheckMissing),
+            2 => Some(Self::SaveUsage),
+            3 => Some(Self::ApplyAll),
+            4 => Some(Self::ApplyVersion),
+            5 => Some(Self::TierObjSweep),
+            6 => Some(Self::HealCheck),
+            7 => Some(Self::Ilm),
+            8 => Some(Self::CheckReplication),
+            9 => Some(Self::Yield),
+            10 => Some(Self::CleanAbandoned),
+            11 => Some(Self::ApplyNonCurrent),
+            12 => Some(Self::HealAbandonedVersion),
+            13 => Some(Self::StartTrace),
+            14 => Some(Self::ScanObject),
+            15 => Some(Self::HealAbandonedObject),
+            16 => Some(Self::LastRealtime),
+            17 => Some(Self::ScanFolder),
+            18 => Some(Self::ScanCycle),
+            19 => Some(Self::ScanBucketDrive),
+            20 => Some(Self::CompactFolder),
+            21 => Some(Self::Last),
+            _ => None,
+        }
+    }
+}
 
+/// Thread-safe wrapper for LastMinuteLatency with atomic operations
 #[derive(Default)]
 pub struct LockedLastMinuteLatency {
-    cached_sec: AtomicU64,
-    cached: AccElem,
-    mu: RwLock<bool>,
-    latency: LastMinuteLatency,
+    latency: Arc<Mutex<LastMinuteLatency>>,
 }
 
 impl Clone for LockedLastMinuteLatency {
     fn clone(&self) -> Self {
         Self {
-            cached_sec: AtomicU64::new(0),
-            cached: self.cached.clone(),
-            mu: RwLock::new(true),
-            latency: self.latency.clone(),
+            latency: Arc::clone(&self.latency),
         }
     }
 }
 
 impl LockedLastMinuteLatency {
-    pub async fn add(&mut self, value: &Duration) {
-        self.add_size(value, 0).await;
-    }
-
-    pub async fn add_size(&mut self, value: &Duration, sz: u64) {
-        let t = SystemTime::now()
-            .duration_since(UNIX_EPOCH)
-            .expect("Time went backwards")
-            .as_secs();
-        INIT.call_once(|| {
-            self.cached = AccElem::default();
-            self.cached_sec.store(t, Ordering::SeqCst);
-        });
-        let last_t = self.cached_sec.load(Ordering::SeqCst);
-        if last_t != t
-            && self
-                .cached_sec
-                .compare_exchange(last_t, t, Ordering::SeqCst, Ordering::SeqCst)
-                .is_ok()
-        {
-            let old = self.cached.clone();
-            self.cached = AccElem::default();
-            let a = AccElem {
-                size: old.size,
-                total: old.total,
-                n: old.n,
-            };
-            let _ = self.mu.write().await;
-            self.latency.add_all(t - 1, &a);
+    pub fn new() -> Self {
+        Self {
+            latency: Arc::new(Mutex::new(LastMinuteLatency::default())),
         }
-        self.cached.n += 1;
-        self.cached.total += value.as_secs();
-        self.cached.size 
+= sz; } - pub async fn total(&mut self) -> AccElem { - let _ = self.mu.read().await; - self.latency.get_total() + /// Add a duration measurement + pub async fn add(&self, duration: Duration) { + self.add_size(duration, 0).await; + } + + /// Add a duration measurement with size + pub async fn add_size(&self, duration: Duration, size: u64) { + let mut latency = self.latency.lock().await; + let now = SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap_or_default() + .as_secs(); + + let elem = AccElem { + n: 1, + total: duration.as_secs(), + size, + }; + latency.add_all(now, &elem); + } + + /// Get total accumulated metrics for the last minute + pub async fn total(&self) -> AccElem { + let mut latency = self.latency.lock().await; + latency.get_total() } } -pub type LogFn = Arc) -> Pin + Send>> + Send + Sync + 'static>; -pub type TimeSizeFn = Arc Pin + Send>> + Send + Sync + 'static>; -pub type TimeFn = Arc Pin + Send>> + Send + Sync + 'static>; +/// Current path tracker for monitoring active scan paths +struct CurrentPathTracker { + current_path: Arc>, +} +impl CurrentPathTracker { + fn new(initial_path: String) -> Self { + Self { + current_path: Arc::new(RwLock::new(initial_path)), + } + } + + async fn update_path(&self, path: String) { + *self.current_path.write().await = path; + } + + async fn get_path(&self) -> String { + self.current_path.read().await.clone() + } +} + +/// Main scanner metrics structure pub struct ScannerMetrics { + // All fields must be accessed atomically and aligned. operations: Vec, latency: Vec, - cycle_info: RwLock>, - current_paths: HashMap, + + // Current paths contains disk -> tracker mappings + current_paths: Arc>>>, + + // Cycle information + cycle_info: Arc>>, +} + +impl ScannerMetrics { + pub fn new() -> Self { + let operations = (0..ScannerMetric::Last as usize) + .map(|_| AtomicU64::new(0)) + .collect(); + + let latency = (0..ScannerMetric::LastRealtime as usize) + .map(|_| LockedLastMinuteLatency::new()) + .collect(); + + Self { + operations, + latency, + current_paths: Arc::new(RwLock::new(HashMap::new())), + cycle_info: Arc::new(RwLock::new(None)), + } + } + + /// Log scanner action with custom metadata - compatible with existing usage + pub fn log(metric: ScannerMetric) -> impl Fn(&HashMap) { + let start_time = SystemTime::now(); + move |_custom: &HashMap| { + let duration = SystemTime::now().duration_since(start_time).unwrap_or_default(); + + // Update operation count + globalScannerMetrics.operations[metric as usize].fetch_add(1, Ordering::Relaxed); + + // Update latency for realtime metrics (spawn async task for this) + if (metric as usize) < ScannerMetric::LastRealtime as usize { + let metric_index = metric as usize; + tokio::spawn(async move { + globalScannerMetrics.latency[metric_index].add(duration).await; + }); + } + + // Log trace metrics + if metric as u8 > ScannerMetric::StartTrace as u8 { + debug!( + metric = metric.as_str(), + duration_ms = duration.as_millis(), + "Scanner trace metric" + ); + } + } + } + + /// Time scanner action with size - returns function that takes size + pub fn time_size(metric: ScannerMetric) -> impl Fn(u64) { + let start_time = SystemTime::now(); + move |size: u64| { + let duration = SystemTime::now().duration_since(start_time).unwrap_or_default(); + + // Update operation count + globalScannerMetrics.operations[metric as usize].fetch_add(1, Ordering::Relaxed); + + // Update latency for realtime metrics with size (spawn async task) + if (metric as usize) < ScannerMetric::LastRealtime as usize { + 
let metric_index = metric as usize; + tokio::spawn(async move { + globalScannerMetrics.latency[metric_index].add_size(duration, size).await; + }); + } + } + } + + /// Time a scanner action - returns a closure to call when done + pub fn time(metric: ScannerMetric) -> impl Fn() { + let start_time = SystemTime::now(); + move || { + let duration = SystemTime::now().duration_since(start_time).unwrap_or_default(); + + // Update operation count + globalScannerMetrics.operations[metric as usize].fetch_add(1, Ordering::Relaxed); + + // Update latency for realtime metrics (spawn async task) + if (metric as usize) < ScannerMetric::LastRealtime as usize { + let metric_index = metric as usize; + tokio::spawn(async move { + globalScannerMetrics.latency[metric_index].add(duration).await; + }); + } + } + } + + /// Time N scanner actions - returns function that takes count, then returns completion function + pub fn time_n(metric: ScannerMetric) -> Box Box + Send + Sync> { + let start_time = SystemTime::now(); + Box::new(move |count: usize| { + Box::new(move || { + let duration = SystemTime::now().duration_since(start_time).unwrap_or_default(); + + // Update operation count + globalScannerMetrics.operations[metric as usize].fetch_add(count as u64, Ordering::Relaxed); + + // Update latency for realtime metrics (spawn async task) + if (metric as usize) < ScannerMetric::LastRealtime as usize { + let metric_index = metric as usize; + tokio::spawn(async move { + globalScannerMetrics.latency[metric_index].add(duration).await; + }); + } + }) + }) + } + + /// Increment time with specific duration + pub async fn inc_time(metric: ScannerMetric, duration: Duration) { + // Update operation count + globalScannerMetrics.operations[metric as usize].fetch_add(1, Ordering::Relaxed); + + // Update latency for realtime metrics + if (metric as usize) < ScannerMetric::LastRealtime as usize { + globalScannerMetrics.latency[metric as usize].add(duration).await; + } + } + + /// Get lifetime operation count for a metric + pub fn lifetime(&self, metric: ScannerMetric) -> u64 { + if (metric as usize) >= ScannerMetric::Last as usize { + return 0; + } + self.operations[metric as usize].load(Ordering::Relaxed) + } + + /// Get last minute statistics for a metric + pub async fn last_minute(&self, metric: ScannerMetric) -> AccElem { + if (metric as usize) >= ScannerMetric::LastRealtime as usize { + return AccElem::default(); + } + self.latency[metric as usize].total().await + } + + /// Set current cycle information + pub async fn set_cycle(&self, cycle: Option) { + *self.cycle_info.write().await = cycle; + } + + /// Get current cycle information + pub async fn get_cycle(&self) -> Option { + self.cycle_info.read().await.clone() + } + + /// Get current active paths + pub async fn get_current_paths(&self) -> Vec { + let mut result = Vec::new(); + let paths = self.current_paths.read().await; + + for (disk, tracker) in paths.iter() { + let path = tracker.get_path().await; + result.push(format!("{}/{}", disk, path)); + } + + result + } + + /// Get number of active drives + pub async fn active_drives(&self) -> usize { + self.current_paths.read().await.len() + } + + /// Generate metrics report + pub async fn report(&self) -> M_ScannerMetrics { + let mut metrics = M_ScannerMetrics::default(); + + // Set cycle information + if let Some(cycle) = self.get_cycle().await { + metrics.current_cycle = cycle.current; + metrics.cycles_completed_at = cycle.cycle_completed; + metrics.current_started = cycle.started; + } + + metrics.collected_at = Utc::now(); + 
metrics.active_paths = self.get_current_paths().await; + + // Lifetime operations + for i in 0..ScannerMetric::Last as usize { + let count = self.operations[i].load(Ordering::Relaxed); + if count > 0 { + if let Some(metric) = ScannerMetric::from_index(i) { + metrics.life_time_ops.insert(metric.as_str().to_string(), count); + } + } + } + + // Last minute statistics for realtime metrics + for i in 0..ScannerMetric::LastRealtime as usize { + let last_min = self.latency[i].total().await; + if last_min.n > 0 { + if let Some(_metric) = ScannerMetric::from_index(i) { + // Convert to madmin TimedAction format if needed + // This would require implementing the conversion + } + } + } + + metrics + } +} + +// Type aliases for compatibility with existing code +pub type UpdateCurrentPathFn = Arc Pin + Send>> + Send + Sync>; +pub type CloseDiskFn = Arc Pin + Send>> + Send + Sync>; + +/// Create a current path updater for tracking scan progress +pub fn current_path_updater( + disk: &str, + initial: &str +) -> (UpdateCurrentPathFn, CloseDiskFn) { + let tracker = Arc::new(CurrentPathTracker::new(initial.to_string())); + let disk_name = disk.to_string(); + + // Store the tracker in global metrics + let tracker_clone = Arc::clone(&tracker); + let disk_clone = disk_name.clone(); + tokio::spawn(async move { + globalScannerMetrics + .current_paths + .write() + .await + .insert(disk_clone, tracker_clone); + }); + + let update_fn = { + let tracker = Arc::clone(&tracker); + Arc::new(move |path: &str| -> Pin + Send>> { + let tracker = Arc::clone(&tracker); + let path = path.to_string(); + Box::pin(async move { + tracker.update_path(path).await; + }) + }) + }; + + let done_fn = { + let disk_name = disk_name.clone(); + Arc::new(move || -> Pin + Send>> { + let disk_name = disk_name.clone(); + Box::pin(async move { + globalScannerMetrics + .current_paths + .write() + .await + .remove(&disk_name); + }) + }) + }; + + (update_fn, done_fn) } impl Default for ScannerMetrics { fn default() -> Self { Self::new() } -} - -impl ScannerMetrics { - pub fn new() -> Self { - Self { - operations: (0..ScannerMetric::Last as usize).map(|_| AtomicU64::new(0)).collect(), - latency: vec![LockedLastMinuteLatency::default(); ScannerMetric::LastRealtime as usize], - cycle_info: RwLock::new(None), - current_paths: HashMap::new(), - } - } - - pub async fn set_cycle(&mut self, c: Option) { - debug!("ScannerMetrics set_cycle {c:?}"); - *self.cycle_info.write().await = c; - } - - pub fn log(s: ScannerMetric) -> LogFn { - let start = SystemTime::now(); - let s_clone = s as usize; - Arc::new(move |_custom: &HashMap| { - Box::pin(async move { - let duration = SystemTime::now().duration_since(start).unwrap_or(Duration::from_secs(0)); - let mut sm_w = globalScannerMetrics.write().await; - sm_w.operations[s_clone].fetch_add(1, Ordering::SeqCst); - if s_clone < ScannerMetric::LastRealtime as usize { - sm_w.latency[s_clone].add(&duration).await; - } - }) - }) - } - - pub async fn time_size(s: ScannerMetric) -> TimeSizeFn { - let start = SystemTime::now(); - let s_clone = s as usize; - Arc::new(move |sz: u64| { - Box::pin(async move { - let duration = SystemTime::now().duration_since(start).unwrap_or(Duration::from_secs(0)); - let mut sm_w = globalScannerMetrics.write().await; - sm_w.operations[s_clone].fetch_add(1, Ordering::SeqCst); - if s_clone < ScannerMetric::LastRealtime as usize { - sm_w.latency[s_clone].add_size(&duration, sz).await; - } - }) - }) - } - - pub fn time(s: ScannerMetric) -> TimeFn { - let start = SystemTime::now(); - let s_clone = s 
as usize; - Arc::new(move || { - Box::pin(async move { - let duration = SystemTime::now().duration_since(start).unwrap_or(Duration::from_secs(0)); - let mut sm_w = globalScannerMetrics.write().await; - sm_w.operations[s_clone].fetch_add(1, Ordering::SeqCst); - if s_clone < ScannerMetric::LastRealtime as usize { - sm_w.latency[s_clone].add(&duration).await; - } - }) - }) - } - - pub async fn get_cycle(&self) -> Option { - let r = self.cycle_info.read().await; - if let Some(c) = r.as_ref() { - return Some(c.clone()); - } - None - } - - pub async fn get_current_paths(&self) -> Vec { - let mut res = Vec::new(); - let prefix = format!("{}/", GLOBAL_Local_Node_Name.read().await); - self.current_paths.iter().for_each(|(k, v)| { - res.push(format!("{}/{}/{}", prefix, k, v)); - }); - res - } - - pub async fn report(&self) -> M_ScannerMetrics { - let mut m = M_ScannerMetrics::default(); - if let Some(cycle) = self.get_cycle().await { - info!("cycle: {cycle:?}"); - m.current_cycle = cycle.current; - m.cycles_completed_at = cycle.cycle_completed; - m.current_started = cycle.started; - } - m.collected_at = Utc::now(); - m.active_paths = self.get_current_paths().await; - for (i, v) in self.operations.iter().enumerate() { - m.life_time_ops.insert(i.to_string(), v.load(Ordering::SeqCst)); - } - - m - } -} - -pub type CloseDiskFn = Arc Pin + Send>> + Send + Sync + 'static>; -pub fn current_path_updater(disk: &str, _initial: &str) -> (UpdateCurrentPathFn, CloseDiskFn) { - let disk_1 = disk.to_string(); - let disk_2 = disk.to_string(); - ( - Arc::new(move |path: &str| { - let disk_inner = disk_1.clone(); - let path = path.to_string(); - Box::pin(async move { - globalScannerMetrics - .write() - .await - .current_paths - .insert(disk_inner, path.to_string()); - }) - }), - Arc::new(move || { - let disk_inner = disk_2.clone(); - Box::pin(async move { - globalScannerMetrics.write().await.current_paths.remove(&disk_inner); - }) - }), - ) -} +} \ No newline at end of file diff --git a/ecstore/src/metrics_realtime.rs b/ecstore/src/metrics_realtime.rs index 5bfc189b..509bc76b 100644 --- a/ecstore/src/metrics_realtime.rs +++ b/ecstore/src/metrics_realtime.rs @@ -93,7 +93,7 @@ pub async fn collect_local_metrics(types: MetricType, opts: &CollectMetricsOpts) if types.contains(&MetricType::SCANNER) { info!("start get scanner metrics"); - let metrics = globalScannerMetrics.read().await.report().await; + let metrics = globalScannerMetrics.report().await; real_time_metrics.aggregated.scanner = Some(metrics); } From d6de724517f705230784946d595d97eee8ee5e73 Mon Sep 17 00:00:00 2001 From: junxiang Mu <1948535941@qq.com> Date: Wed, 28 May 2025 10:57:59 +0000 Subject: [PATCH 07/12] improve scanner(1) Signed-off-by: junxiang Mu <1948535941@qq.com> --- ecstore/src/disk/local.rs | 16 ++++++++-------- ecstore/src/set_disk.rs | 20 +++++++++++++------- ecstore/src/store.rs | 2 +- 3 files changed, 22 insertions(+), 16 deletions(-) diff --git a/ecstore/src/disk/local.rs b/ecstore/src/disk/local.rs index 601b4dba..ecb7f042 100644 --- a/ecstore/src/disk/local.rs +++ b/ecstore/src/disk/local.rs @@ -2384,16 +2384,16 @@ impl DiskAPI for LocalDisk { } let stop_fn = ScannerMetrics::log(ScannerMetric::ScanObject); let mut res = HashMap::new(); - let done_sz = ScannerMetrics::time_size(ScannerMetric::ReadMetadata).await; + let done_sz = ScannerMetrics::time_size(ScannerMetric::ReadMetadata); let buf = match disk.read_metadata(item.path.clone()).await { Ok(buf) => buf, Err(err) => { res.insert("err".to_string(), err.to_string()); - 
stop_fn(&res).await;
+                stop_fn(&res);
                 return Err(Error::from_string(ERR_SKIP_FILE));
             }
         };
-        done_sz(buf.len() as u64).await;
+        done_sz(buf.len() as u64);
         res.insert("metasize".to_string(), buf.len().to_string());
         item.transform_meda_dir();
         let meta_cache = MetaCacheEntry {
@@ -2405,7 +2405,7 @@ impl DiskAPI for LocalDisk {
             Ok(fivs) => fivs,
             Err(err) => {
                 res.insert("err".to_string(), err.to_string());
-                stop_fn(&res).await;
+                stop_fn(&res);
                 return Err(Error::from_string(ERR_SKIP_FILE));
             }
         };
@@ -2415,7 +2415,7 @@ impl DiskAPI for LocalDisk {
             Ok(obj_infos) => obj_infos,
             Err(err) => {
                 res.insert("err".to_string(), err.to_string());
-                stop_fn(&res).await;
+                stop_fn(&res);
                 return Err(Error::from_string(ERR_SKIP_FILE));
             }
         };
@@ -2431,7 +2431,7 @@ impl DiskAPI for LocalDisk {
                 let done = ScannerMetrics::time(ScannerMetric::ApplyVersion);
                 let sz: usize;
                 (obj_deleted, sz) = item.apply_actions(info, &size_s).await;
-                done().await;
+                done();
 
                 if obj_deleted {
                     break;
@@ -2461,14 +2461,14 @@ impl DiskAPI for LocalDisk {
                 let _obj_info =
                     frer_version.to_object_info(&item.bucket, &item.object_path().to_string_lossy(), versioned);
                 let done = ScannerMetrics::time(ScannerMetric::TierObjSweep);
-                done().await;
+                done();
             }
 
             // todo: global trace
             if obj_deleted {
                 return Err(Error::from_string(ERR_IGNORE_FILE_CONTRIB));
             }
-            done().await;
+            done();
             Ok(size_s)
         })
     }),
diff --git a/ecstore/src/set_disk.rs b/ecstore/src/set_disk.rs
index 83b00b40..0e70f992 100644
--- a/ecstore/src/set_disk.rs
+++ b/ecstore/src/set_disk.rs
@@ -2893,7 +2893,7 @@ impl SetDisks {
     }
 
     pub async fn ns_scanner(
-        &self,
+        self: Arc<Self>,
         buckets: &[BucketInfo],
         want_cycle: u32,
         updates: Sender<DataUsageInfo>,
@@ -2911,7 +2911,7 @@ impl SetDisks {
             return Ok(());
         }
 
-        let old_cache = DataUsageCache::load(self, DATA_USAGE_CACHE_NAME).await?;
+        let old_cache = DataUsageCache::load(&self, DATA_USAGE_CACHE_NAME).await?;
         let mut cache = DataUsageCache {
             info: DataUsageCacheInfo {
                 name: DATA_USAGE_ROOT.to_string(),
@@ -2934,6 +2934,7 @@ impl SetDisks {
             permutes.shuffle(&mut rng);
             permutes
         };
+
         // Add new buckets first
         for idx in permutes.iter() {
             let b = buckets[*idx].clone();
@@ -2954,6 +2955,7 @@ impl SetDisks {
             Duration::from_secs(30) + Duration::from_secs_f64(10.0 * rng.gen_range(0.0..1.0))
         };
         let mut ticker = interval(update_time);
+
         let task = tokio::spawn(async move {
             let last_save = Some(SystemTime::now());
             let mut need_loop = true;
@@ -2983,8 +2985,8 @@ impl SetDisks {
                 }
             }
         });
+
         // Restrict parallelism for disk usage scanner
-        // upto GOMAXPROCS if GOMAXPROCS is < len(disks)
         let max_procs = num_cpus::get();
         if max_procs < disks.len() {
             disks = disks[0..max_procs].to_vec();
@@ -2997,15 +2999,16 @@ impl SetDisks {
                 Some(disk) => disk.clone(),
                 None => continue,
             };
+            let self_clone = Arc::clone(&self);
             let bucket_rx_clone = bucket_rx.clone();
             let buckets_results_tx_clone = buckets_results_tx.clone();
-            futures.push(async move {
+            futures.push(tokio::spawn(async move {
                 loop {
                     match bucket_rx_clone.write().await.try_recv() {
                         Err(_) => return,
                         Ok(bucket_info) => {
                             let cache_name = Path::new(&bucket_info.name).join(DATA_USAGE_CACHE_NAME);
-                            let mut cache = match DataUsageCache::load(self, &cache_name.to_string_lossy()).await {
+                            let mut cache = match DataUsageCache::load(&self_clone, &cache_name.to_string_lossy()).await {
                                 Ok(cache) => cache,
                                 Err(_) => continue,
                             };
@@ -3022,6 +3025,7 @@ impl SetDisks {
                                 ..Default::default()
                             };
                         }
+
                        // Collect updates.
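                        // A channel of capacity 1 holds at most one pending
                        // update; further sends wait until it is consumed.
                        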
let (tx, mut rx) = mpsc::channel(1); let buckets_results_tx_inner_clone = buckets_results_tx_clone.clone(); @@ -3042,9 +3046,10 @@ impl SetDisks { } } }); + // Calc usage let before = cache.info.last_update; - let mut cache = match disk.clone().ns_scanner(&cache, tx, heal_scan_mode, None).await { + let mut cache = match disk.ns_scanner(&cache, tx, heal_scan_mode, None).await { Ok(cache) => cache, Err(_) => { if cache.info.last_update > before { @@ -3078,8 +3083,9 @@ impl SetDisks { } info!("continue scanner"); } - }); + })); } + info!("ns_scanner start"); let _ = join_all(futures).await; drop(buckets_results_tx); diff --git a/ecstore/src/store.rs b/ecstore/src/store.rs index dabdae4d..4256e984 100644 --- a/ecstore/src/store.rs +++ b/ecstore/src/store.rs @@ -827,7 +827,7 @@ impl ECStore { } } }); - if let Err(err) = set + if let Err(err) = set.clone() .ns_scanner(&all_buckets_clone, want_cycle as u32, tx, heal_scan_mode) .await { From d2a0c9fd62245ac113e0735c5699f66ae8eafd7e Mon Sep 17 00:00:00 2001 From: junxiang Mu <1948535941@qq.com> Date: Thu, 29 May 2025 03:17:47 +0000 Subject: [PATCH 08/12] decrease scanner frequency Signed-off-by: junxiang Mu <1948535941@qq.com> --- ecstore/src/heal/data_scanner.rs | 54 ++++++++------------ ecstore/src/heal/data_scanner_metric.rs | 65 ++++++++++--------------- ecstore/src/set_disk.rs | 5 +- ecstore/src/store.rs | 3 +- 4 files changed, 49 insertions(+), 78 deletions(-) diff --git a/ecstore/src/heal/data_scanner.rs b/ecstore/src/heal/data_scanner.rs index 7e9f5f9d..d8607ed1 100644 --- a/ecstore/src/heal/data_scanner.rs +++ b/ecstore/src/heal/data_scanner.rs @@ -144,16 +144,16 @@ fn new_dynamic_sleeper(factor: f64, max_wait: Duration, is_scanner: bool) -> Dyn } /// Initialize and start the data scanner in the background -/// +/// /// This function starts a background task that continuously runs the data scanner /// with randomized intervals between cycles to avoid resource contention. -/// +/// /// # Features /// - Graceful shutdown support via cancellation token /// - Randomized sleep intervals to prevent synchronized scanning across nodes /// - Minimum sleep duration to avoid excessive CPU usage /// - Proper error handling and logging -/// +/// /// # Architecture /// 1. Initialize with random seed for sleep intervals /// 2. Run scanner cycles in a loop @@ -161,33 +161,25 @@ fn new_dynamic_sleeper(factor: f64, max_wait: Duration, is_scanner: bool) -> Dyn /// 4. 
Ensure minimum sleep duration to prevent CPU thrashing pub async fn init_data_scanner() { info!("Initializing data scanner background task"); - + tokio::spawn(async move { loop { // Run the data scanner run_data_scanner().await; - + // Calculate randomized sleep duration // Use random factor (0.0 to 1.0) multiplied by the scanner cycle duration let random_factor: f64 = { let mut rng = rand::thread_rng(); - rng.gen_range(0.0..1.0) + rng.gen_range(1.0..10.0) }; let base_cycle_duration = SCANNER_CYCLE.load(Ordering::SeqCst) as f64; let sleep_duration_secs = random_factor * base_cycle_duration; - - // Ensure minimum sleep duration of 1 second to avoid high CPU usage - let sleep_duration = if sleep_duration_secs < 1.0 { - Duration::from_secs(1) - } else { - Duration::from_secs_f64(sleep_duration_secs) - }; - info!( - duration_secs = sleep_duration.as_secs(), - "Data scanner sleeping before next cycle" - ); - + let sleep_duration = Duration::from_secs_f64(sleep_duration_secs); + + info!(duration_secs = sleep_duration.as_secs(), "Data scanner sleeping before next cycle"); + // Sleep with the calculated duration sleep(sleep_duration).await; } @@ -195,20 +187,20 @@ pub async fn init_data_scanner() { } /// Run a single data scanner cycle -/// +/// /// This function performs one complete scan cycle, including: /// - Loading and updating cycle information /// - Determining scan mode based on healing configuration /// - Running the namespace scanner /// - Saving cycle completion state -/// +/// /// # Error Handling /// - Gracefully handles missing object layer /// - Continues operation even if individual steps fail /// - Logs errors appropriately without terminating the scanner async fn run_data_scanner() { info!("Starting data scanner cycle"); - + // Get the object layer, return early if not available let Some(store) = new_object_layer_fn() else { error!("Object layer not initialized, skipping scanner cycle"); @@ -235,7 +227,7 @@ async fn run_data_scanner() { // Start metrics collection for this cycle let stop_fn = ScannerMetrics::log(ScannerMetric::ScanCycle); - + // Update cycle information cycle_info.current = cycle_info.next; cycle_info.started = Utc::now(); @@ -245,12 +237,8 @@ async fn run_data_scanner() { // Read background healing information and determine scan mode let bg_heal_info = read_background_heal_info(store.clone()).await; - let scan_mode = get_cycle_scan_mode( - cycle_info.current, - bg_heal_info.bitrot_start_cycle, - bg_heal_info.bitrot_start_time, - ) - .await; + let scan_mode = + get_cycle_scan_mode(cycle_info.current, bg_heal_info.bitrot_start_cycle, bg_heal_info.bitrot_start_time).await; // Update healing info if scan mode changed if bg_heal_info.current_scan_mode != scan_mode { @@ -280,19 +268,15 @@ async fn run_data_scanner() { ); // Run the namespace scanner - match store - .clone() - .ns_scanner(tx, cycle_info.current as usize, scan_mode) - .await - { + match store.clone().ns_scanner(tx, cycle_info.current as usize, scan_mode).await { Ok(_) => { info!(cycle = cycle_info.current, "Namespace scanner completed successfully"); - + // Update cycle completion information cycle_info.next += 1; cycle_info.current = 0; cycle_info.cycle_completed.push(Utc::now()); - + // Maintain cycle completion history (keep only recent cycles) if cycle_info.cycle_completed.len() > DATA_USAGE_UPDATE_DIR_CYCLES as usize { let _ = cycle_info.cycle_completed.remove(0); diff --git a/ecstore/src/heal/data_scanner_metric.rs b/ecstore/src/heal/data_scanner_metric.rs index 44fe8550..d5bf1688 100644 --- 
a/ecstore/src/heal/data_scanner_metric.rs +++ b/ecstore/src/heal/data_scanner_metric.rs @@ -4,12 +4,12 @@ use lazy_static::lazy_static; use madmin::metrics::ScannerMetrics as M_ScannerMetrics; use std::{ collections::HashMap, + pin::Pin, sync::{ atomic::{AtomicU64, Ordering}, Arc, }, time::{Duration, SystemTime}, - pin::Pin, }; use tokio::sync::{Mutex, RwLock}; use tracing::debug; @@ -86,7 +86,7 @@ impl ScannerMetric { Self::Last => "last", } } - + /// Convert from index back to enum (safe version) pub fn from_index(index: usize) -> Option { if index >= Self::Last as usize { @@ -154,7 +154,7 @@ impl LockedLastMinuteLatency { .duration_since(SystemTime::UNIX_EPOCH) .unwrap_or_default() .as_secs(); - + let elem = AccElem { n: 1, total: duration.as_secs(), @@ -206,10 +206,8 @@ pub struct ScannerMetrics { impl ScannerMetrics { pub fn new() -> Self { - let operations = (0..ScannerMetric::Last as usize) - .map(|_| AtomicU64::new(0)) - .collect(); - + let operations = (0..ScannerMetric::Last as usize).map(|_| AtomicU64::new(0)).collect(); + let latency = (0..ScannerMetric::LastRealtime as usize) .map(|_| LockedLastMinuteLatency::new()) .collect(); @@ -227,10 +225,10 @@ impl ScannerMetrics { let start_time = SystemTime::now(); move |_custom: &HashMap| { let duration = SystemTime::now().duration_since(start_time).unwrap_or_default(); - + // Update operation count globalScannerMetrics.operations[metric as usize].fetch_add(1, Ordering::Relaxed); - + // Update latency for realtime metrics (spawn async task for this) if (metric as usize) < ScannerMetric::LastRealtime as usize { let metric_index = metric as usize; @@ -241,11 +239,7 @@ impl ScannerMetrics { // Log trace metrics if metric as u8 > ScannerMetric::StartTrace as u8 { - debug!( - metric = metric.as_str(), - duration_ms = duration.as_millis(), - "Scanner trace metric" - ); + debug!(metric = metric.as_str(), duration_ms = duration.as_millis(), "Scanner trace metric"); } } } @@ -255,10 +249,10 @@ impl ScannerMetrics { let start_time = SystemTime::now(); move |size: u64| { let duration = SystemTime::now().duration_since(start_time).unwrap_or_default(); - + // Update operation count globalScannerMetrics.operations[metric as usize].fetch_add(1, Ordering::Relaxed); - + // Update latency for realtime metrics with size (spawn async task) if (metric as usize) < ScannerMetric::LastRealtime as usize { let metric_index = metric as usize; @@ -274,10 +268,10 @@ impl ScannerMetrics { let start_time = SystemTime::now(); move || { let duration = SystemTime::now().duration_since(start_time).unwrap_or_default(); - + // Update operation count globalScannerMetrics.operations[metric as usize].fetch_add(1, Ordering::Relaxed); - + // Update latency for realtime metrics (spawn async task) if (metric as usize) < ScannerMetric::LastRealtime as usize { let metric_index = metric as usize; @@ -294,10 +288,10 @@ impl ScannerMetrics { Box::new(move |count: usize| { Box::new(move || { let duration = SystemTime::now().duration_since(start_time).unwrap_or_default(); - + // Update operation count globalScannerMetrics.operations[metric as usize].fetch_add(count as u64, Ordering::Relaxed); - + // Update latency for realtime metrics (spawn async task) if (metric as usize) < ScannerMetric::LastRealtime as usize { let metric_index = metric as usize; @@ -313,7 +307,7 @@ impl ScannerMetrics { pub async fn inc_time(metric: ScannerMetric, duration: Duration) { // Update operation count globalScannerMetrics.operations[metric as usize].fetch_add(1, Ordering::Relaxed); - + // Update latency 
for realtime metrics if (metric as usize) < ScannerMetric::LastRealtime as usize { globalScannerMetrics.latency[metric as usize].add(duration).await; @@ -350,12 +344,12 @@ impl ScannerMetrics { pub async fn get_current_paths(&self) -> Vec { let mut result = Vec::new(); let paths = self.current_paths.read().await; - + for (disk, tracker) in paths.iter() { let path = tracker.get_path().await; result.push(format!("{}/{}", disk, path)); } - + result } @@ -367,17 +361,17 @@ impl ScannerMetrics { /// Generate metrics report pub async fn report(&self) -> M_ScannerMetrics { let mut metrics = M_ScannerMetrics::default(); - + // Set cycle information if let Some(cycle) = self.get_cycle().await { metrics.current_cycle = cycle.current; metrics.cycles_completed_at = cycle.cycle_completed; metrics.current_started = cycle.started; } - + metrics.collected_at = Utc::now(); metrics.active_paths = self.get_current_paths().await; - + // Lifetime operations for i in 0..ScannerMetric::Last as usize { let count = self.operations[i].load(Ordering::Relaxed); @@ -387,7 +381,7 @@ impl ScannerMetrics { } } } - + // Last minute statistics for realtime metrics for i in 0..ScannerMetric::LastRealtime as usize { let last_min = self.latency[i].total().await; @@ -398,7 +392,7 @@ impl ScannerMetrics { } } } - + metrics } } @@ -408,13 +402,10 @@ pub type UpdateCurrentPathFn = Arc Pin Pin + Send>> + Send + Sync>; /// Create a current path updater for tracking scan progress -pub fn current_path_updater( - disk: &str, - initial: &str -) -> (UpdateCurrentPathFn, CloseDiskFn) { +pub fn current_path_updater(disk: &str, initial: &str) -> (UpdateCurrentPathFn, CloseDiskFn) { let tracker = Arc::new(CurrentPathTracker::new(initial.to_string())); let disk_name = disk.to_string(); - + // Store the tracker in global metrics let tracker_clone = Arc::clone(&tracker); let disk_clone = disk_name.clone(); @@ -442,11 +433,7 @@ pub fn current_path_updater( Arc::new(move || -> Pin + Send>> { let disk_name = disk_name.clone(); Box::pin(async move { - globalScannerMetrics - .current_paths - .write() - .await - .remove(&disk_name); + globalScannerMetrics.current_paths.write().await.remove(&disk_name); }) }) }; @@ -458,4 +445,4 @@ impl Default for ScannerMetrics { fn default() -> Self { Self::new() } -} \ No newline at end of file +} diff --git a/ecstore/src/set_disk.rs b/ecstore/src/set_disk.rs index 0e70f992..1f3301d4 100644 --- a/ecstore/src/set_disk.rs +++ b/ecstore/src/set_disk.rs @@ -3002,7 +3002,7 @@ impl SetDisks { let self_clone = Arc::clone(&self); let bucket_rx_clone = bucket_rx.clone(); let buckets_results_tx_clone = buckets_results_tx.clone(); - futures.push(tokio::spawn(async move { + futures.push(async move { loop { match bucket_rx_clone.write().await.try_recv() { Err(_) => return, @@ -3083,12 +3083,11 @@ impl SetDisks { } info!("continue scanner"); } - })); + }); } info!("ns_scanner start"); let _ = join_all(futures).await; - drop(buckets_results_tx); let _ = task.await; info!("ns_scanner completed"); Ok(()) diff --git a/ecstore/src/store.rs b/ecstore/src/store.rs index 4256e984..608f06be 100644 --- a/ecstore/src/store.rs +++ b/ecstore/src/store.rs @@ -827,7 +827,8 @@ impl ECStore { } } }); - if let Err(err) = set.clone() + if let Err(err) = set + .clone() .ns_scanner(&all_buckets_clone, want_cycle as u32, tx, heal_scan_mode) .await { From 756e70cc937d0d0e33c34085454d945022a3518c Mon Sep 17 00:00:00 2001 From: loverustfs <155562731+loverustfs@users.noreply.github.com> Date: Thu, 29 May 2025 14:50:02 +0800 Subject: [PATCH 09/12] add 
linux arm64 package

---
 .github/workflows/build.yml | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml
index bf729d21..07c80c8e 100644
--- a/.github/workflows/build.yml
+++ b/.github/workflows/build.yml
@@ -21,7 +21,7 @@ jobs:
           - { profile: release, target: x86_64-unknown-linux-gnu, glibc: "default" }
           - { profile: release, target: aarch64-apple-darwin, glibc: "default" }
           #- { profile: release, target: aarch64-unknown-linux-gnu, glibc: "default" }
-          #- { profile: release, target: aarch64-unknown-linux-musl, glibc: "default" }
+          - { profile: release, target: aarch64-unknown-linux-musl, glibc: "default" }
           #- { profile: release, target: x86_64-pc-windows-msvc, glibc: "default" }
         exclude:
           # Linux targets on non-Linux systems
@@ -413,4 +413,4 @@ jobs:
     with:
       name: rustfs-packages
       pattern: "rustfs-*"
-      delete-merged: true
\ No newline at end of file
+      delete-merged: true

From 88b2d893dcb64aa21eebca9673f8390ef822d0c2 Mon Sep 17 00:00:00 2001
From: houseme
Date: Thu, 29 May 2025 21:15:45 +0800
Subject: [PATCH 10/12] fix

---
 crates/obs/src/telemetry.rs | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/crates/obs/src/telemetry.rs b/crates/obs/src/telemetry.rs
index cb982240..fa8cd276 100644
--- a/crates/obs/src/telemetry.rs
+++ b/crates/obs/src/telemetry.rs
@@ -401,7 +401,7 @@ fn format_with_color(w: &mut dyn std::io::Write, now: &mut DeferredNow, record: &Record
 
     write!(
         w,
-        "{} {} [{}] [{}:{}] [{} {}] {}\n",
+        "{} {} [{}] [{}:{}] [{}:{}] {}\n",
         now.now().format("%Y-%m-%d %H:%M:%S%.6f"),
         level_style.paint(level.to_string()),
         Color::Magenta.paint(record.target()),
@@ -424,7 +424,7 @@ fn format_for_file(w: &mut dyn std::io::Write, now: &mut DeferredNow, record: &Record
 
     write!(
         w,
-        "{} {} [{}] [{}:{}] [{} {}] {}\n",
+        "{} {} [{}] [{}:{}] [{}:{}] {}\n",
         now.now().format("%Y-%m-%d %H:%M:%S%.6f"),
         level.to_string(),
         record.target(),

From 03af4369bdabc60211ea021a97105eeb3fc7ec5b Mon Sep 17 00:00:00 2001
From: houseme
Date: Thu, 29 May 2025 21:30:26 +0800
Subject: [PATCH 11/12] improve comment `run.sh`

---
 scripts/run.sh | 28 +++++++++++++---------------
 1 file changed, 13 insertions(+), 15 deletions(-)

diff --git a/scripts/run.sh b/scripts/run.sh
index 0df0cdfb..aa8258b5 100755
--- a/scripts/run.sh
+++ b/scripts/run.sh
@@ -40,17 +40,15 @@ export RUSTFS_CONSOLE_ADDRESS=":9001"
 # export RUSTFS_OBS_ENDPOINT=http://localhost:4317
 
 # The following variables only take effect when all required ones are set, and they override values from the configuration file
-#export RUSTFS_OBSERVABILITY_ENDPOINT=http://localhost:4317
-#export RUSTFS_OBSERVABILITY_ENDPOINT=http://localhost:4317
-#export RUSTFS_OBSERVABILITY_USE_STDOUT=false
-#export RUSTFS_OBSERVABILITY_SAMPLE_RATIO=2.0
-#export RUSTFS_OBSERVABILITY_METER_INTERVAL=31
-#export RUSTFS_OBSERVABILITY_SERVICE_NAME=rustfs
-#export RUSTFS_OBSERVABILITY_SERVICE_VERSION=0.1.0
-#export RUSTFS_OBSERVABILITY_ENVIRONMENT=develop
-#export RUSTFS_OBSERVABILITY_LOGGER_LEVEL=debug
-export RUSTFS_OBSERVABILITY_LOCAL_LOGGING_ENABLED=true
-#export RUSTFS_OBSERVABILITY_LOCAL_LOGGING_ENABLED=false # Whether to enable local logging
+#export RUSTFS_OBSERVABILITY_ENDPOINT=http://localhost:4317 # Address of the OpenTelemetry Collector
+#export RUSTFS_OBSERVABILITY_USE_STDOUT=false # Whether to log to stdout
+#export RUSTFS_OBSERVABILITY_SAMPLE_RATIO=2.0 # Sampling ratio, between 0.0 and 1.0; 0.0 means no sampling, 1.0 samples everything
+#export RUSTFS_OBSERVABILITY_METER_INTERVAL=1 # Sampling interval in seconds
+#export RUSTFS_OBSERVABILITY_SERVICE_NAME=rustfs # Service name
+#export RUSTFS_OBSERVABILITY_SERVICE_VERSION=0.1.0 # Service version
+#export RUSTFS_OBSERVABILITY_ENVIRONMENT=develop # Environment name
+#export RUSTFS_OBSERVABILITY_LOGGER_LEVEL=debug # Log level, supports trace, debug, info, warn, error
+export RUSTFS_OBSERVABILITY_LOCAL_LOGGING_ENABLED=true # Whether to enable local logging
 export RUSTFS_OBSERVABILITY_LOG_DIRECTORY="./deploy/logs" # Log directory
 export RUSTFS_OBSERVABILITY_LOG_ROTATION_TIME="minute" # Log rotation time unit, can be "second", "minute", "hour", "day"
 export RUSTFS_OBSERVABILITY_LOG_ROTATION_SIZE_MB=1 # Log rotation size in MB
@@ -61,10 +59,10 @@ export RUSTFS_OBSERVABILITY_LOG_ROTATION_SIZE_MB=1 # Log rotation size in MB
 #export RUSTFS_SINKS_FILE_FLUSH_INTERVAL_MS=1000
 #export RUSTFS_SINKS_FILE_FLUSH_THRESHOLD=100
 #
-#export RUSTFS_SINKS_KAKFA_BROKERS=localhost:9092
-#export RUSTFS_SINKS_KAKFA_TOPIC=logs
-#export RUSTFS_SINKS_KAKFA_BATCH_SIZE=100
-#export RUSTFS_SINKS_KAKFA_BATCH_TIMEOUT_MS=1000
+#export RUSTFS_SINKS_KAFKA_BROKERS=localhost:9092
+#export RUSTFS_SINKS_KAFKA_TOPIC=logs
+#export RUSTFS_SINKS_KAFKA_BATCH_SIZE=100
+#export RUSTFS_SINKS_KAFKA_BATCH_TIMEOUT_MS=1000
 #
 #export RUSTFS_SINKS_WEBHOOK_ENDPOINT=http://localhost:8080/webhook
 #export RUSTFS_SINKS_WEBHOOK_AUTH_TOKEN=you-auth-token
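
The rotation variables documented above accept a small closed set of values. As an illustrative sketch only, where the enum and function are assumptions rather than the actual rustfs types, a consumer might map them like so:

    use std::env;

    #[derive(Debug, Clone, Copy)]
    enum RotationTime {
        Second,
        Minute,
        Hour,
        Day,
    }

    // Parse RUSTFS_OBSERVABILITY_LOG_ROTATION_TIME, defaulting to the
    // "minute" value that run.sh sets above.
    fn rotation_time_from_env() -> RotationTime {
        match env::var("RUSTFS_OBSERVABILITY_LOG_ROTATION_TIME").as_deref() {
            Ok("second") => RotationTime::Second,
            Ok("hour") => RotationTime::Hour,
            Ok("day") => RotationTime::Day,
            _ => RotationTime::Minute,
        }
    }
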
From 1d45123968d75dc514a01c0a8bae480384de667b Mon Sep 17 00:00:00 2001
From: houseme
Date: Thu, 29 May 2025 22:30:33 +0800
Subject: [PATCH 12/12] fix

---
 crates/obs/src/telemetry.rs | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/crates/obs/src/telemetry.rs b/crates/obs/src/telemetry.rs
index fa8cd276..ad021b74 100644
--- a/crates/obs/src/telemetry.rs
+++ b/crates/obs/src/telemetry.rs
@@ -399,9 +399,9 @@ fn format_with_color(w: &mut dyn std::io::Write, now: &mut DeferredNow, record: &Record
     let thread_name = binding.name().unwrap_or("unnamed");
     let thread_id = format!("{:?}", std::thread::current().id());
 
-    write!(
+    writeln!(
         w,
-        "{} {} [{}] [{}:{}] [{}:{}] {}\n",
+        "{} {} [{}] [{}:{}] [{}:{}] {}",
         now.now().format("%Y-%m-%d %H:%M:%S%.6f"),
         level_style.paint(level.to_string()),
         Color::Magenta.paint(record.target()),
@@ -422,11 +422,11 @@ fn format_for_file(w: &mut dyn std::io::Write, now: &mut DeferredNow, record: &Record
     let thread_name = binding.name().unwrap_or("unnamed");
     let thread_id = format!("{:?}", std::thread::current().id());
 
-    write!(
+    writeln!(
        w,
-        "{} {} [{}] [{}:{}] [{}:{}] {}\n",
+        "{} {} [{}] [{}:{}] [{}:{}] {}",
         now.now().format("%Y-%m-%d %H:%M:%S%.6f"),
-        level.to_string(),
+        level,
         record.target(),
         record.file().unwrap_or("unknown"),
         record.line().unwrap_or(0),
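         // the remaining arguments supply the thread name, thread id, and message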