Compare commits

..

2 Commits

Author SHA1 Message Date
houseme
705cc0c9f6 Merge branch 'main' of github.com:rustfs/rustfs into feature/metric-1205 2025-12-21 17:56:06 +08:00
houseme
6273b138f6 upgrade mio version to 1.1.1 2025-12-05 14:55:17 +08:00
301 changed files with 8773 additions and 21311 deletions

View File

@@ -1,64 +0,0 @@
## —— Development/Source builds using direct buildx commands ---------------------------------------
.PHONY: docker-dev
docker-dev: ## Build dev multi-arch image (cannot load locally)
@echo "🏗️ Building multi-architecture development Docker images with buildx..."
@echo "💡 This builds from source code and is intended for local development and testing"
@echo "⚠️ Multi-arch images cannot be loaded locally, use docker-dev-push to push to registry"
$(DOCKER_CLI) buildx build \
--platform linux/amd64,linux/arm64 \
--file $(DOCKERFILE_SOURCE) \
--tag rustfs:source-latest \
--tag rustfs:dev-latest \
.
.PHONY: docker-dev-local
docker-dev-local: ## Build dev single-arch image (local load)
@echo "🏗️ Building single-architecture development Docker image for local use..."
@echo "💡 This builds from source code for the current platform and loads locally"
$(DOCKER_CLI) buildx build \
--file $(DOCKERFILE_SOURCE) \
--tag rustfs:source-latest \
--tag rustfs:dev-latest \
--load \
.
.PHONY: docker-dev-push
docker-dev-push: ## Build and push multi-arch development image # e.g (make docker-dev-push REGISTRY=xxx)
@if [ -z "$(REGISTRY)" ]; then \
echo "❌ Error: Please specify registry, example: make docker-dev-push REGISTRY=ghcr.io/username"; \
exit 1; \
fi
@echo "🚀 Building and pushing multi-architecture development Docker images..."
@echo "💡 Pushing to registry: $(REGISTRY)"
$(DOCKER_CLI) buildx build \
--platform linux/amd64,linux/arm64 \
--file $(DOCKERFILE_SOURCE) \
--tag $(REGISTRY)/rustfs:source-latest \
--tag $(REGISTRY)/rustfs:dev-latest \
--push \
.
.PHONY: dev-env-start
dev-env-start: ## Start development container environment
@echo "🚀 Starting development environment..."
$(DOCKER_CLI) buildx build \
--file $(DOCKERFILE_SOURCE) \
--tag rustfs:dev \
--load \
.
$(DOCKER_CLI) stop $(CONTAINER_NAME) 2>/dev/null || true
$(DOCKER_CLI) rm $(CONTAINER_NAME) 2>/dev/null || true
$(DOCKER_CLI) run -d --name $(CONTAINER_NAME) \
-p 9010:9010 -p 9000:9000 \
-v $(shell pwd):/workspace \
-it rustfs:dev
.PHONY: dev-env-stop
dev-env-stop: ## Stop development container environment
@echo "🛑 Stopping development environment..."
$(DOCKER_CLI) stop $(CONTAINER_NAME) 2>/dev/null || true
$(DOCKER_CLI) rm $(CONTAINER_NAME) 2>/dev/null || true
.PHONY: dev-env-restart
dev-env-restart: dev-env-stop dev-env-start ## Restart development container environment

View File

@@ -1,41 +0,0 @@
## —— Production builds using docker buildx (for CI/CD and production) -----------------------------
.PHONY: docker-buildx
docker-buildx: ## Build production multi-arch image (no push)
@echo "🏗️ Building multi-architecture production Docker images with buildx..."
./docker-buildx.sh
.PHONY: docker-buildx-push
docker-buildx-push: ## Build and push production multi-arch image
@echo "🚀 Building and pushing multi-architecture production Docker images with buildx..."
./docker-buildx.sh --push
.PHONY: docker-buildx-version
docker-buildx-version: ## Build and version production multi-arch image # e.g (make docker-buildx-version VERSION=v1.0.0)
@if [ -z "$(VERSION)" ]; then \
echo "❌ Error: Please specify version, example: make docker-buildx-version VERSION=v1.0.0"; \
exit 1; \
fi
@echo "🏗️ Building multi-architecture production Docker images (version: $(VERSION))..."
./docker-buildx.sh --release $(VERSION)
.PHONY: docker-buildx-push-version
docker-buildx-push-version: ## Build and version and push production multi-arch image # e.g (make docker-buildx-push-version VERSION=v1.0.0)
@if [ -z "$(VERSION)" ]; then \
echo "❌ Error: Please specify version, example: make docker-buildx-push-version VERSION=v1.0.0"; \
exit 1; \
fi
@echo "🚀 Building and pushing multi-architecture production Docker images (version: $(VERSION))..."
./docker-buildx.sh --release $(VERSION) --push
.PHONY: docker-buildx-production-local
docker-buildx-production-local: ## Build production single-arch image locally
@echo "🏗️ Building single-architecture production Docker image locally..."
@echo "💡 Alternative to docker-buildx.sh for local testing"
$(DOCKER_CLI) buildx build \
--file $(DOCKERFILE_PRODUCTION) \
--tag rustfs:production-latest \
--tag rustfs:latest \
--load \
--build-arg RELEASE=latest \
.

View File

@@ -1,16 +0,0 @@
## —— Single Architecture Docker Builds (Traditional) ----------------------------------------------
.PHONY: docker-build-production
docker-build-production: ## Build single-arch production image
@echo "🏗️ Building single-architecture production Docker image..."
@echo "💡 Consider using 'make docker-buildx-production-local' for multi-arch support"
$(DOCKER_CLI) build -f $(DOCKERFILE_PRODUCTION) -t rustfs:latest .
.PHONY: docker-build-source
docker-build-source: ## Build single-arch source image
@echo "🏗️ Building single-architecture source Docker image..."
@echo "💡 Consider using 'make docker-dev-local' for multi-arch support"
DOCKER_BUILDKIT=1 $(DOCKER_CLI) build \
--build-arg BUILDKIT_INLINE_CACHE=1 \
-f $(DOCKERFILE_SOURCE) -t rustfs:source .

View File

@@ -1,22 +0,0 @@
## —— Docker-based build (alternative approach) ----------------------------------------------------
# Usage: make BUILD_OS=ubuntu22.04 build-docker
# Output: target/ubuntu22.04/release/rustfs
.PHONY: build-docker
build-docker: SOURCE_BUILD_IMAGE_NAME = rustfs-$(BUILD_OS):v1
build-docker: SOURCE_BUILD_CONTAINER_NAME = rustfs-$(BUILD_OS)-build
build-docker: BUILD_CMD = /root/.cargo/bin/cargo build --release --bin rustfs --target-dir /root/s3-rustfs/target/$(BUILD_OS)
build-docker: ## Build using Docker container # e.g (make build-docker BUILD_OS=ubuntu22.04)
@echo "🐳 Building RustFS using Docker ($(BUILD_OS))..."
$(DOCKER_CLI) buildx build -t $(SOURCE_BUILD_IMAGE_NAME) -f $(DOCKERFILE_SOURCE) .
$(DOCKER_CLI) run --rm --name $(SOURCE_BUILD_CONTAINER_NAME) -v $(shell pwd):/root/s3-rustfs -it $(SOURCE_BUILD_IMAGE_NAME) $(BUILD_CMD)
.PHONY: docker-inspect-multiarch
docker-inspect-multiarch: ## Check image architecture support
@if [ -z "$(IMAGE)" ]; then \
echo "❌ Error: Please specify image, example: make docker-inspect-multiarch IMAGE=rustfs/rustfs:latest"; \
exit 1; \
fi
@echo "🔍 Inspecting multi-architecture image: $(IMAGE)"
docker buildx imagetools inspect $(IMAGE)

View File

@@ -1,55 +0,0 @@
## —— Local Native Build using build-rustfs.sh script (Recommended) --------------------------------
.PHONY: build
build: ## Build RustFS binary (includes console by default)
@echo "🔨 Building RustFS using build-rustfs.sh script..."
./build-rustfs.sh
.PHONY: build-dev
build-dev: ## Build RustFS in Development mode
@echo "🔨 Building RustFS in development mode..."
./build-rustfs.sh --dev
.PHONY: build-musl
build-musl: ## Build x86_64 musl version
@echo "🔨 Building rustfs for x86_64-unknown-linux-musl..."
@echo "💡 On macOS/Windows, use 'make build-docker' or 'make docker-dev' instead"
./build-rustfs.sh --platform x86_64-unknown-linux-musl
.PHONY: build-gnu
build-gnu: ## Build x86_64 GNU version
@echo "🔨 Building rustfs for x86_64-unknown-linux-gnu..."
@echo "💡 On macOS/Windows, use 'make build-docker' or 'make docker-dev' instead"
./build-rustfs.sh --platform x86_64-unknown-linux-gnu
.PHONY: build-musl-arm64
build-musl-arm64: ## Build aarch64 musl version
@echo "🔨 Building rustfs for aarch64-unknown-linux-musl..."
@echo "💡 On macOS/Windows, use 'make build-docker' or 'make docker-dev' instead"
./build-rustfs.sh --platform aarch64-unknown-linux-musl
.PHONY: build-gnu-arm64
build-gnu-arm64: ## Build aarch64 GNU version
@echo "🔨 Building rustfs for aarch64-unknown-linux-gnu..."
@echo "💡 On macOS/Windows, use 'make build-docker' or 'make docker-dev' instead"
./build-rustfs.sh --platform aarch64-unknown-linux-gnu
.PHONY: build-cross-all
build-cross-all: core-deps ## Build binaries for all architectures
@echo "🔧 Building all target architectures..."
@echo "💡 On macOS/Windows, use 'make docker-dev' for reliable multi-arch builds"
@echo "🔨 Generating protobuf code..."
cargo run --bin gproto || true
@echo "🔨 Building rustfs for x86_64-unknown-linux-musl..."
./build-rustfs.sh --platform x86_64-unknown-linux-musl
@echo "🔨 Building rustfs for x86_64-unknown-linux-gnu..."
./build-rustfs.sh --platform x86_64-unknown-linux-gnu
@echo "🔨 Building rustfs for aarch64-unknown-linux-musl..."
./build-rustfs.sh --platform aarch64-unknown-linux-musl
@echo "🔨 Building rustfs for aarch64-unknown-linux-gnu..."
./build-rustfs.sh --platform aarch64-unknown-linux-gnu

View File

@@ -1,24 +0,0 @@
## —— Check and Inform Dependencies ----------------------------------------------------------------
# Fatal check
# Checks all required dependencies and exits with error if not found
# (e.g., cargo, rustfmt)
check-%:
@command -v $* >/dev/null 2>&1 || { \
echo >&2 "❌ '$*' is not installed."; \
exit 1; \
}
# Warning-only check
# Checks for optional dependencies and issues a warning if not found
# (e.g., cargo-nextest for enhanced testing)
warn-%:
@command -v $* >/dev/null 2>&1 || { \
echo >&2 "⚠️ '$*' is not installed."; \
}
# For checking dependencies use check-<dep-name> or warn-<dep-name>
.PHONY: core-deps fmt-deps test-deps
core-deps: check-cargo ## Check core dependencies
fmt-deps: check-rustfmt ## Check lint and formatting dependencies
test-deps: warn-cargo-nextest ## Check tests dependencies

View File

@@ -1,6 +0,0 @@
## —— Deploy using dev_deploy.sh script ------------------------------------------------------------
.PHONY: deploy-dev
deploy-dev: build-musl ## Deploy to dev server
@echo "🚀 Deploying to dev server: $${IP}"
./scripts/dev_deploy.sh $${IP}

View File

@@ -1,38 +0,0 @@
## —— Help, Help Build and Help Docker -------------------------------------------------------------
.PHONY: help
help: ## Shows This Help Menu
echo -e "$$HEADER"
grep -E '(^[a-zA-Z0-9_-]+:.*?## .*$$)|(^## )' $(MAKEFILE_LIST) | sed 's/^[^:]*://g' | awk 'BEGIN {FS = ":.*?## | #"} ; {printf "${cyan}%-30s${reset} ${white}%s${reset} ${green}%s${reset}\n", $$1, $$2, $$3}' | sed -e 's/\[36m##/\n[32m##/'
.PHONY: help-build
help-build: ## Shows RustFS build help
@echo ""
@echo "💡 build-rustfs.sh script provides more options, smart detection and binary verification"
@echo ""
@echo "🔧 Direct usage of build-rustfs.sh script:"
@echo ""
@echo " ./build-rustfs.sh --help # View script help"
@echo " ./build-rustfs.sh --no-console # Build without console resources"
@echo " ./build-rustfs.sh --force-console-update # Force update console resources"
@echo " ./build-rustfs.sh --dev # Development mode build"
@echo " ./build-rustfs.sh --sign # Sign binary files"
@echo " ./build-rustfs.sh --platform x86_64-unknown-linux-gnu # Specify target platform"
@echo " ./build-rustfs.sh --skip-verification # Skip binary verification"
@echo ""
.PHONY: help-docker
help-docker: ## Shows docker environment and suggestion help
@echo ""
@echo "📋 Environment Variables:"
@echo " REGISTRY Image registry address (required for push)"
@echo " DOCKERHUB_USERNAME Docker Hub username"
@echo " DOCKERHUB_TOKEN Docker Hub access token"
@echo " GITHUB_TOKEN GitHub access token"
@echo ""
@echo "💡 Suggestions:"
@echo " Production use: Use docker-buildx* commands (based on precompiled binaries)"
@echo " Local development: Use docker-dev* commands (build from source)"
@echo " Development environment: Use dev-env-* commands to manage dev containers"
@echo ""

View File

@@ -1,22 +0,0 @@
## —— Code quality and Formatting ------------------------------------------------------------------
.PHONY: fmt
fmt: core-deps fmt-deps ## Format code
@echo "🔧 Formatting code..."
cargo fmt --all
.PHONY: fmt-check
fmt-check: core-deps fmt-deps ## Check code formatting
@echo "📝 Checking code formatting..."
cargo fmt --all --check
.PHONY: clippy-check
clippy-check: core-deps ## Run clippy checks
@echo "🔍 Running clippy checks..."
cargo clippy --fix --allow-dirty
cargo clippy --all-targets --all-features -- -D warnings
.PHONY: compilation-check
compilation-check: core-deps ## Run compilation check
@echo "🔨 Running compilation check..."
cargo check --all-targets

View File

@@ -1,11 +0,0 @@
## —— Pre Commit Checks ----------------------------------------------------------------------------
.PHONY: setup-hooks
setup-hooks: ## Set up git hooks
@echo "🔧 Setting up git hooks..."
chmod +x .git/hooks/pre-commit
@echo "✅ Git hooks setup complete!"
.PHONY: pre-commit
pre-commit: fmt clippy-check compilation-check test ## Run pre-commit checks
@echo "✅ All pre-commit checks passed!"

View File

@@ -1,20 +0,0 @@
## —— Tests and e2e test ---------------------------------------------------------------------------
.PHONY: test
test: core-deps test-deps ## Run all tests
@echo "🧪 Running tests..."
@if command -v cargo-nextest >/dev/null 2>&1; then \
cargo nextest run --all --exclude e2e_test; \
else \
echo " cargo-nextest not found; falling back to 'cargo test'"; \
cargo test --workspace --exclude e2e_test -- --nocapture; \
fi
cargo test --all --doc
.PHONY: e2e-server
e2e-server: ## Run e2e-server tests
sh $(shell pwd)/scripts/run.sh
.PHONY: probe-e2e
probe-e2e: ## Probe e2e tests
sh $(shell pwd)/scripts/probe.sh

0
.docker/observability/prometheus-data/.gitignore vendored Executable file → Normal file
View File

View File

@@ -40,7 +40,7 @@ env:
jobs:
security-audit:
name: Security Audit
runs-on: ubicloud-standard-2
runs-on: ubicloud-standard-4
timeout-minutes: 15
steps:
- name: Checkout repository
@@ -57,7 +57,7 @@ jobs:
- name: Upload audit results
if: always()
uses: actions/upload-artifact@v6
uses: actions/upload-artifact@v4
with:
name: security-audit-results-${{ github.run_number }}
path: audit-results.json
@@ -65,7 +65,7 @@ jobs:
dependency-review:
name: Dependency Review
runs-on: ubicloud-standard-2
runs-on: ubicloud-standard-4
if: github.event_name == 'pull_request'
permissions:
contents: read

View File

@@ -83,7 +83,7 @@ jobs:
# Build strategy check - determine build type based on trigger
build-check:
name: Build Strategy Check
runs-on: ubicloud-standard-2
runs-on: ubicloud-standard-4
outputs:
should_build: ${{ steps.check.outputs.should_build }}
build_type: ${{ steps.check.outputs.build_type }}
@@ -167,19 +167,19 @@ jobs:
matrix:
include:
# Linux builds
- os: ubicloud-standard-2
- os: ubicloud-standard-4
target: x86_64-unknown-linux-musl
cross: false
platform: linux
- os: ubicloud-standard-2
- os: ubicloud-standard-4
target: aarch64-unknown-linux-musl
cross: true
platform: linux
- os: ubicloud-standard-2
- os: ubicloud-standard-4
target: x86_64-unknown-linux-gnu
cross: false
platform: linux
- os: ubicloud-standard-2
- os: ubicloud-standard-4
target: aarch64-unknown-linux-gnu
cross: true
platform: linux
@@ -442,7 +442,7 @@ jobs:
echo "📊 Version: ${VERSION}"
- name: Upload to GitHub artifacts
uses: actions/upload-artifact@v6
uses: actions/upload-artifact@v4
with:
name: ${{ steps.package.outputs.package_name }}
path: "rustfs-*.zip"
@@ -532,7 +532,7 @@ jobs:
name: Build Summary
needs: [ build-check, build-rustfs ]
if: always() && needs.build-check.outputs.should_build == 'true'
runs-on: ubicloud-standard-2
runs-on: ubicloud-standard-4
steps:
- name: Build completion summary
shell: bash
@@ -584,7 +584,7 @@ jobs:
name: Create GitHub Release
needs: [ build-check, build-rustfs ]
if: startsWith(github.ref, 'refs/tags/') && needs.build-check.outputs.build_type != 'development'
runs-on: ubicloud-standard-2
runs-on: ubicloud-standard-4
permissions:
contents: write
outputs:
@@ -670,7 +670,7 @@ jobs:
name: Upload Release Assets
needs: [ build-check, build-rustfs, create-release ]
if: startsWith(github.ref, 'refs/tags/') && needs.build-check.outputs.build_type != 'development'
runs-on: ubicloud-standard-2
runs-on: ubicloud-standard-4
permissions:
contents: write
actions: read
@@ -679,7 +679,7 @@ jobs:
uses: actions/checkout@v6
- name: Download all build artifacts
uses: actions/download-artifact@v7
uses: actions/download-artifact@v5
with:
path: ./artifacts
pattern: rustfs-*
@@ -751,7 +751,7 @@ jobs:
name: Update Latest Version
needs: [ build-check, upload-release-assets ]
if: startsWith(github.ref, 'refs/tags/')
runs-on: ubicloud-standard-2
runs-on: ubicloud-standard-4
steps:
- name: Update latest.json
env:
@@ -801,7 +801,7 @@ jobs:
name: Publish Release
needs: [ build-check, create-release, upload-release-assets ]
if: startsWith(github.ref, 'refs/tags/') && needs.build-check.outputs.build_type != 'development'
runs-on: ubicloud-standard-2
runs-on: ubicloud-standard-4
permissions:
contents: write
steps:

View File

@@ -69,7 +69,7 @@ concurrency:
env:
CARGO_TERM_COLOR: always
RUST_BACKTRACE: 1
CARGO_BUILD_JOBS: 2
CARGO_BUILD_JOBS: 8
jobs:
@@ -78,7 +78,7 @@ jobs:
permissions:
actions: write
contents: read
runs-on: ubicloud-standard-2
runs-on: ubicloud-standard-4
outputs:
should_skip: ${{ steps.skip_check.outputs.should_skip }}
steps:
@@ -93,7 +93,7 @@ jobs:
typos:
name: Typos
runs-on: ubicloud-standard-2
runs-on: ubicloud-standard-4
steps:
- uses: actions/checkout@v6
- uses: dtolnay/rust-toolchain@stable
@@ -136,7 +136,7 @@ jobs:
name: End-to-End Tests
needs: skip-check
if: needs.skip-check.outputs.should_skip != 'true'
runs-on: ubicloud-standard-2
runs-on: ubicloud-standard-4
timeout-minutes: 30
steps:
- name: Checkout repository
@@ -160,13 +160,13 @@ jobs:
with:
tool: s3s-e2e
git: https://github.com/Nugine/s3s.git
rev: 9e41304ed549b89cfb03ede98e9c0d2ac7522051
rev: b7714bfaa17ddfa9b23ea01774a1e7bbdbfc2ca3
- name: Build debug binary
run: |
touch rustfs/build.rs
# Limit concurrency to prevent OOM
cargo build -p rustfs --bins --jobs 2
cargo build -p rustfs --bins --jobs 4
- name: Run end-to-end tests
run: |
@@ -175,7 +175,7 @@ jobs:
- name: Upload test logs
if: failure()
uses: actions/upload-artifact@v6
uses: actions/upload-artifact@v4
with:
name: e2e-test-logs-${{ github.run_number }}
path: /tmp/rustfs.log

View File

@@ -72,7 +72,7 @@ jobs:
# Check if we should build Docker images
build-check:
name: Docker Build Check
runs-on: ubicloud-standard-2
runs-on: ubicloud-standard-4
outputs:
should_build: ${{ steps.check.outputs.should_build }}
should_push: ${{ steps.check.outputs.should_push }}
@@ -264,7 +264,7 @@ jobs:
name: Build Docker Images
needs: build-check
if: needs.build-check.outputs.should_build == 'true'
runs-on: ubicloud-standard-2
runs-on: ubicloud-standard-4
timeout-minutes: 60
steps:
- name: Checkout repository
@@ -404,7 +404,7 @@ jobs:
name: Docker Build Summary
needs: [ build-check, build-docker ]
if: always() && needs.build-check.outputs.should_build == 'true'
runs-on: ubicloud-standard-2
runs-on: ubicloud-standard-4
steps:
- name: Docker build completion summary
run: |

260
.github/workflows/e2e-mint.yml vendored Normal file
View File

@@ -0,0 +1,260 @@
# Copyright 2024 RustFS Team
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
name: e2e-mint
on:
push:
branches: [ main ]
paths:
- ".github/workflows/e2e-mint.yml"
- "Dockerfile.source"
- "rustfs/**"
- "crates/**"
workflow_dispatch:
inputs:
run-multi:
description: "Run multi-node Mint as well"
required: false
default: "false"
env:
ACCESS_KEY: rustfsadmin
SECRET_KEY: rustfsadmin
RUST_LOG: info
PLATFORM: linux/amd64
jobs:
mint-single:
runs-on: ubicloud-standard-4
timeout-minutes: 40
steps:
- name: Checkout
uses: actions/checkout@v6
- name: Enable buildx
uses: docker/setup-buildx-action@v3
- name: Build RustFS image (source)
run: |
DOCKER_BUILDKIT=1 docker buildx build --load \
--platform ${PLATFORM} \
-t rustfs-ci \
-f Dockerfile.source .
- name: Create network
run: |
docker network inspect rustfs-net >/dev/null 2>&1 || docker network create rustfs-net
- name: Remove existing rustfs-single (if any)
run: docker rm -f rustfs-single >/dev/null 2>&1 || true
- name: Start single RustFS
run: |
docker run -d --name rustfs-single \
--network rustfs-net \
-e RUSTFS_ADDRESS=0.0.0.0:9000 \
-e RUSTFS_ACCESS_KEY=$ACCESS_KEY \
-e RUSTFS_SECRET_KEY=$SECRET_KEY \
-e RUSTFS_VOLUMES="/data/rustfs0 /data/rustfs1 /data/rustfs2 /data/rustfs3" \
-v /tmp/rustfs-single:/data \
rustfs-ci
- name: Wait for RustFS ready
run: |
for i in {1..30}; do
if docker exec rustfs-single curl -sf http://localhost:9000/health >/dev/null; then
exit 0
fi
sleep 2
done
echo "RustFS did not become ready" >&2
docker logs rustfs-single || true
exit 1
- name: Run Mint (single, S3-only)
run: |
mkdir -p artifacts/mint-single
docker run --rm --network rustfs-net \
--platform ${PLATFORM} \
-e SERVER_ENDPOINT=rustfs-single:9000 \
-e ACCESS_KEY=$ACCESS_KEY \
-e SECRET_KEY=$SECRET_KEY \
-e ENABLE_HTTPS=0 \
-e SERVER_REGION=us-east-1 \
-e RUN_ON_FAIL=1 \
-e MINT_MODE=core \
-v ${GITHUB_WORKSPACE}/artifacts/mint-single:/mint/log \
--entrypoint /mint/mint.sh \
minio/mint:edge \
awscli aws-sdk-go aws-sdk-java-v2 aws-sdk-php aws-sdk-ruby s3cmd s3select
- name: Collect RustFS logs
run: |
mkdir -p artifacts/rustfs-single
docker logs rustfs-single > artifacts/rustfs-single/rustfs.log || true
- name: Upload artifacts
uses: actions/upload-artifact@v4
with:
name: mint-single
path: artifacts/**
mint-multi:
if: github.event_name == 'workflow_dispatch' && github.event.inputs.run-multi == 'true'
needs: mint-single
runs-on: ubicloud-standard-4
timeout-minutes: 60
steps:
- name: Checkout
uses: actions/checkout@v6
- name: Enable buildx
uses: docker/setup-buildx-action@v3
- name: Build RustFS image (source)
run: |
DOCKER_BUILDKIT=1 docker buildx build --load \
--platform ${PLATFORM} \
-t rustfs-ci \
-f Dockerfile.source .
- name: Prepare cluster compose
run: |
cat > compose.yml <<'EOF'
version: '3.8'
services:
rustfs1:
image: rustfs-ci
hostname: rustfs1
networks: [rustfs-net]
environment:
- RUSTFS_ADDRESS=0.0.0.0:9000
- RUSTFS_ACCESS_KEY=${ACCESS_KEY}
- RUSTFS_SECRET_KEY=${SECRET_KEY}
- RUSTFS_VOLUMES=/data/rustfs0 /data/rustfs1 /data/rustfs2 /data/rustfs3
volumes:
- rustfs1-data:/data
rustfs2:
image: rustfs-ci
hostname: rustfs2
networks: [rustfs-net]
environment:
- RUSTFS_ADDRESS=0.0.0.0:9000
- RUSTFS_ACCESS_KEY=${ACCESS_KEY}
- RUSTFS_SECRET_KEY=${SECRET_KEY}
- RUSTFS_VOLUMES=/data/rustfs0 /data/rustfs1 /data/rustfs2 /data/rustfs3
volumes:
- rustfs2-data:/data
rustfs3:
image: rustfs-ci
hostname: rustfs3
networks: [rustfs-net]
environment:
- RUSTFS_ADDRESS=0.0.0.0:9000
- RUSTFS_ACCESS_KEY=${ACCESS_KEY}
- RUSTFS_SECRET_KEY=${SECRET_KEY}
- RUSTFS_VOLUMES=/data/rustfs0 /data/rustfs1 /data/rustfs2 /data/rustfs3
volumes:
- rustfs3-data:/data
rustfs4:
image: rustfs-ci
hostname: rustfs4
networks: [rustfs-net]
environment:
- RUSTFS_ADDRESS=0.0.0.0:9000
- RUSTFS_ACCESS_KEY=${ACCESS_KEY}
- RUSTFS_SECRET_KEY=${SECRET_KEY}
- RUSTFS_VOLUMES=/data/rustfs0 /data/rustfs1 /data/rustfs2 /data/rustfs3
volumes:
- rustfs4-data:/data
lb:
image: haproxy:2.9
hostname: lb
networks: [rustfs-net]
ports:
- "9000:9000"
volumes:
- ./haproxy.cfg:/usr/local/etc/haproxy/haproxy.cfg:ro
networks:
rustfs-net:
name: rustfs-net
volumes:
rustfs1-data:
rustfs2-data:
rustfs3-data:
rustfs4-data:
EOF
cat > haproxy.cfg <<'EOF'
defaults
mode http
timeout connect 5s
timeout client 30s
timeout server 30s
frontend fe_s3
bind *:9000
default_backend be_s3
backend be_s3
balance roundrobin
server s1 rustfs1:9000 check
server s2 rustfs2:9000 check
server s3 rustfs3:9000 check
server s4 rustfs4:9000 check
EOF
- name: Launch cluster
run: docker compose -f compose.yml up -d
- name: Wait for LB ready
run: |
for i in {1..60}; do
if docker run --rm --network rustfs-net curlimages/curl -sf http://lb:9000/health >/dev/null; then
exit 0
fi
sleep 2
done
echo "LB or backend not ready" >&2
docker compose -f compose.yml logs --tail=200 || true
exit 1
- name: Run Mint (multi, S3-only)
run: |
mkdir -p artifacts/mint-multi
docker run --rm --network rustfs-net \
--platform ${PLATFORM} \
-e SERVER_ENDPOINT=lb:9000 \
-e ACCESS_KEY=$ACCESS_KEY \
-e SECRET_KEY=$SECRET_KEY \
-e ENABLE_HTTPS=0 \
-e SERVER_REGION=us-east-1 \
-e RUN_ON_FAIL=1 \
-e MINT_MODE=core \
-v ${GITHUB_WORKSPACE}/artifacts/mint-multi:/mint/log \
--entrypoint /mint/mint.sh \
minio/mint:edge \
awscli aws-sdk-go aws-sdk-java-v2 aws-sdk-php aws-sdk-ruby s3cmd s3select
- name: Collect logs
run: |
mkdir -p artifacts/cluster
docker compose -f compose.yml logs --no-color > artifacts/cluster/cluster.log || true
- name: Upload artifacts
uses: actions/upload-artifact@v4
with:
name: mint-multi
path: artifacts/**

View File

@@ -58,7 +58,7 @@ defaults:
jobs:
s3tests-single:
if: github.event.inputs.test-mode == 'single'
runs-on: ubicloud-standard-2
runs-on: ubicloud-standard-4
timeout-minutes: 120
steps:
- uses: actions/checkout@v6
@@ -205,14 +205,14 @@ jobs:
- name: Upload artifacts
if: always() && env.ACT != 'true'
uses: actions/upload-artifact@v6
uses: actions/upload-artifact@v4
with:
name: s3tests-single
path: artifacts/**
s3tests-multi:
if: github.event_name == 'workflow_dispatch' && github.event.inputs.test-mode == 'multi'
runs-on: ubicloud-standard-2
runs-on: ubicloud-standard-4
timeout-minutes: 150
steps:
- uses: actions/checkout@v6
@@ -416,7 +416,7 @@ jobs:
- name: Upload artifacts
if: always() && env.ACT != 'true'
uses: actions/upload-artifact@v6
uses: actions/upload-artifact@v4
with:
name: s3tests-multi
path: artifacts/**

View File

@@ -27,7 +27,7 @@ env:
jobs:
build-helm-package:
runs-on: ubicloud-standard-2
runs-on: ubicloud-standard-4
# Only run on successful builds triggered by tag pushes (version format: x.y.z or x.y.z-suffix)
if: |
github.event.workflow_run.conclusion == 'success' &&
@@ -44,6 +44,7 @@ jobs:
set -x
old_version=$(grep "^appVersion:" helm/rustfs/Chart.yaml | awk '{print $2}')
sed -i "s/$old_version/$new_version/g" helm/rustfs/Chart.yaml
sed -i "/^image:/,/^[^ ]/ s/tag:.*/tag: "$new_version"/" helm/rustfs/values.yaml
- name: Set up Helm
uses: azure/setup-helm@v4.3.0
@@ -55,14 +56,14 @@ jobs:
helm package ./helm/rustfs --destination helm/rustfs/ --version "0.0.$package_version"
- name: Upload helm package as artifact
uses: actions/upload-artifact@v6
uses: actions/upload-artifact@v4
with:
name: helm-package
path: helm/rustfs/*.tgz
retention-days: 1
publish-helm-package:
runs-on: ubicloud-standard-2
runs-on: ubicloud-standard-4
needs: [ build-helm-package ]
steps:
@@ -73,7 +74,7 @@ jobs:
token: ${{ secrets.RUSTFS_HELM_PACKAGE }}
- name: Download helm package
uses: actions/download-artifact@v7
uses: actions/download-artifact@v4
with:
name: helm-package
path: ./

View File

@@ -40,7 +40,7 @@ env:
jobs:
performance-profile:
name: Performance Profiling
runs-on: ubicloud-standard-2
runs-on: ubicloud-standard-4
timeout-minutes: 30
steps:
- name: Checkout repository
@@ -107,7 +107,7 @@ jobs:
- name: Upload profile data
if: steps.profiling.outputs.profile_generated == 'true'
uses: actions/upload-artifact@v6
uses: actions/upload-artifact@v4
with:
name: performance-profile-${{ github.run_number }}
path: samply-profile.json
@@ -115,7 +115,7 @@ jobs:
benchmark:
name: Benchmark Tests
runs-on: ubicloud-standard-2
runs-on: ubicloud-standard-4
timeout-minutes: 45
steps:
- name: Checkout repository
@@ -135,7 +135,7 @@ jobs:
tee benchmark-results.json
- name: Upload benchmark results
uses: actions/upload-artifact@v6
uses: actions/upload-artifact@v4
with:
name: benchmark-results-${{ github.run_number }}
path: benchmark-results.json

View File

@@ -1,20 +0,0 @@
name: "Mark stale issues"
on:
schedule:
- cron: "30 1 * * *"
jobs:
stale:
runs-on: ubuntu-latest
steps:
- uses: actions/stale@v9
with:
repo-token: ${{ secrets.GITHUB_TOKEN }}
stale-issue-message: 'This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs.'
stale-issue-label: 'stale'
## Mark if there is no activity for more than 7 days
days-before-stale: 7
# If no one responds after 3 days, the tag will be closed.
days-before-close: 3
# These tags are exempt and will not close automatically.
exempt-issue-labels: 'pinned,security'

4
.gitignore vendored
View File

@@ -25,7 +25,7 @@ profile.json
*.pb
*.svg
deploy/logs/*.log.*
artifacts/
# s3-tests local artifacts (root directory only)
/s3-tests/
/s3-tests-local/
@@ -33,4 +33,4 @@ artifacts/
/s3tests.conf.*
*.events
*.audit
*.snappy
*.snappy

View File

@@ -1,18 +1,8 @@
# Repository Guidelines
## ⚠️ Pre-Commit Checklist (MANDATORY)
**Before EVERY commit, you MUST run and pass ALL of the following:**
```bash
cargo fmt --all --check # Code formatting
cargo clippy --all-targets --all-features -- -D warnings # Lints
cargo test --workspace --exclude e2e_test # Unit tests
```
Or simply run `make pre-commit` which covers all checks. **DO NOT commit if any check fails.**
## Communication Rules
- Respond to the user in Chinese; use English in all other contexts.
- Code and documentation must be written in English only. Chinese text is allowed solely as test data/fixtures when a case explicitly requires Chinese-language content for validation.
- **Pull Request titles and descriptions must be written in English** to ensure consistency and accessibility for all contributors.
## Project Structure & Module Organization
The workspace root hosts shared dependencies in `Cargo.toml`. The service binary lives under `rustfs/src/main.rs`, while reusable crates sit in `crates/` (`crypto`, `iam`, `kms`, and `e2e_test`). Local fixtures for standalone flows reside in `test_standalone/`, deployment manifests are under `deploy/`, Docker assets sit at the root, and automation lives in `scripts/`. Skim each crates README or module docs before contributing changes.
@@ -29,13 +19,7 @@ Co-locate unit tests with their modules and give behavior-led names such as `han
When fixing bugs or adding features, include regression tests that capture the new behavior so future changes cannot silently break it.
## Commit & Pull Request Guidelines
Work on feature branches (e.g., `feat/...`) after syncing `main`. Follow Conventional Commits under 72 characters (e.g., `feat: add kms key rotation`). Each commit must compile, format cleanly, and pass `make pre-commit`.
**Pull Request Requirements:**
- PR titles and descriptions **MUST be written in English**
- Open PRs with a concise summary, note verification commands, link relevant issues
- Follow the PR template format and fill in all required sections
- Wait for reviewer approval before merging
Work on feature branches (e.g., `feat/...`) after syncing `main`. Follow Conventional Commits under 72 characters (e.g., `feat: add kms key rotation`). Each commit must compile, format cleanly, and pass `make pre-commit`. Open PRs with a concise summary, note verification commands, link relevant issues, and wait for reviewer approval.
## Security & Configuration Tips
Do not commit secrets or cloud credentials; prefer environment variables or vault tooling. Review IAM- and KMS-related changes with a second maintainer. Confirm proxy settings before running sensitive tests to avoid leaking traffic outside localhost.

3
CLA.md
View File

@@ -83,3 +83,6 @@ that body of laws known as conflict of laws. The parties expressly agree that th
for the International Sale of Goods will not apply. Any legal action or proceeding arising under this Agreement will be
brought exclusively in the courts located in Beijing, China, and the parties hereby irrevocably consent to the personal
jurisdiction and venue therein.
For your reading convenience, this Agreement is written in parallel English and Chinese sections. To the extent there is
a conflict between the English and Chinese sections, the English sections shall govern.

View File

@@ -186,39 +186,6 @@ cargo clippy --all-targets --all-features -- -D warnings
cargo clippy --fix --all-targets --all-features
```
## 📝 Pull Request Guidelines
### Language Requirements
**All Pull Request titles and descriptions MUST be written in English.**
This ensures:
- Consistency across all contributions
- Accessibility for international contributors
- Better integration with automated tools and CI/CD systems
- Clear communication in a globally understood language
#### PR Description Requirements
When creating a Pull Request, ensure:
1. **Title**: Use English and follow Conventional Commits format (e.g., `fix: improve s3-tests readiness detection`)
2. **Description**: Write in English, following the PR template format
3. **Code Comments**: Must be in English (as per coding standards)
4. **Commit Messages**: Must be in English (as per commit guidelines)
#### PR Template
Always use the PR template (`.github/pull_request_template.md`) and fill in all sections:
- Type of Change
- Related Issues
- Summary of Changes
- Checklist
- Impact
- Additional Notes
**Note**: While you may communicate with reviewers in Chinese during discussions, the PR itself (title, description, and all formal documentation) must be in English.
---
Following these guidelines ensures high code quality and smooth collaboration across the RustFS project! 🚀

2061
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -19,7 +19,6 @@ members = [
"crates/audit", # Audit target management system with multi-target fan-out
"crates/common", # Shared utilities and data structures
"crates/config", # Configuration management
"crates/credentials", # Credential management system
"crates/crypto", # Cryptography and security features
"crates/ecstore", # Erasure coding storage implementation
"crates/e2e_test", # End-to-end test suite
@@ -50,7 +49,7 @@ resolver = "2"
edition = "2024"
license = "Apache-2.0"
repository = "https://github.com/rustfs/rustfs"
rust-version = "1.90"
rust-version = "1.85"
version = "0.0.5"
homepage = "https://rustfs.com"
description = "RustFS is a high-performance distributed object storage software built using Rust, one of the most popular languages worldwide. "
@@ -72,7 +71,6 @@ rustfs-audit = { path = "crates/audit", version = "0.0.5" }
rustfs-checksums = { path = "crates/checksums", version = "0.0.5" }
rustfs-common = { path = "crates/common", version = "0.0.5" }
rustfs-config = { path = "./crates/config", version = "0.0.5" }
rustfs-credentials = { path = "crates/credentials", version = "0.0.5" }
rustfs-crypto = { path = "crates/crypto", version = "0.0.5" }
rustfs-ecstore = { path = "crates/ecstore", version = "0.0.5" }
rustfs-filemeta = { path = "crates/filemeta", version = "0.0.5" }
@@ -100,24 +98,24 @@ async-compression = { version = "0.4.19" }
async-recursion = "1.1.1"
async-trait = "0.1.89"
axum = "0.8.8"
axum-server = { version = "0.8.0", features = ["tls-rustls"], default-features = false }
axum-extra = "0.12.3"
axum-server = { version = "0.8.0", features = ["tls-rustls-no-provider"], default-features = false }
futures = "0.3.31"
futures-core = "0.3.31"
futures-util = "0.3.31"
pollster = "0.4.0"
hyper = { version = "1.8.1", features = ["http2", "http1", "server"] }
hyper-rustls = { version = "0.27.7", default-features = false, features = ["native-tokio", "http1", "tls12", "logging", "http2", "aws-lc-rs", "webpki-roots"] }
hyper-rustls = { version = "0.27.7", default-features = false, features = ["native-tokio", "http1", "tls12", "logging", "http2", "ring", "webpki-roots"] }
hyper-util = { version = "0.1.19", features = ["tokio", "server-auto", "server-graceful"] }
http = "1.4.0"
http-body = "1.0.1"
http-body-util = "0.1.3"
reqwest = { version = "0.12.28", default-features = false, features = ["rustls-tls-no-provider", "charset", "http2", "system-proxy", "stream", "json", "blocking"] }
reqwest = { version = "0.12.26", default-features = false, features = ["rustls-tls-webpki-roots", "charset", "http2", "system-proxy", "stream", "json", "blocking"] }
socket2 = "0.6.1"
tokio = { version = "1.49.0", features = ["fs", "rt-multi-thread"] }
tokio-rustls = { version = "0.26.4", default-features = false, features = ["logging", "tls12", "aws-lc-rs"] }
tokio-stream = { version = "0.1.18" }
tokio-test = "0.4.5"
tokio-util = { version = "0.7.18", features = ["io", "compat"] }
tokio = { version = "1.48.0", features = ["fs", "rt-multi-thread"] }
tokio-rustls = { version = "0.26.4", default-features = false, features = ["logging", "tls12", "ring"] }
tokio-stream = { version = "0.1.17" }
tokio-test = "0.4.4"
tokio-util = { version = "0.7.17", features = ["io", "compat"] }
tonic = { version = "0.14.2", features = ["gzip"] }
tonic-prost = { version = "0.14.2" }
tonic-prost-build = { version = "0.14.2" }
@@ -133,12 +131,12 @@ form_urlencoded = "1.2.2"
prost = "0.14.1"
quick-xml = "0.38.4"
rmcp = { version = "0.12.0" }
rmp = { version = "0.8.15" }
rmp-serde = { version = "1.3.1" }
rmp = { version = "0.8.14" }
rmp-serde = { version = "1.3.0" }
serde = { version = "1.0.228", features = ["derive"] }
serde_json = { version = "1.0.149", features = ["raw_value"] }
serde_json = { version = "1.0.145", features = ["raw_value"] }
serde_urlencoded = "0.7.1"
schemars = "1.2.0"
schemars = "1.1.0"
# Cryptography and Security
aes-gcm = { version = "0.11.0-rc.2", features = ["rand_core"] }
@@ -149,8 +147,8 @@ crc-fast = "1.6.0"
hmac = { version = "0.13.0-rc.3" }
jsonwebtoken = { version = "10.2.0", features = ["rust_crypto"] }
pbkdf2 = "0.13.0-rc.5"
rsa = { version = "0.10.0-rc.11" }
rustls = { version = "0.23.36", default-features = false, features = ["aws-lc-rs", "logging", "tls12", "prefer-post-quantum", "std"] }
rsa = { version = "0.10.0-rc.10" }
rustls = { version = "0.23.35", features = ["ring", "logging", "std", "tls12"], default-features = false }
rustls-pemfile = "2.2.0"
rustls-pki-types = "1.13.2"
sha1 = "0.11.0-rc.3"
@@ -165,26 +163,25 @@ time = { version = "0.3.44", features = ["std", "parsing", "formatting", "macros
# Utilities and Tools
anyhow = "1.0.100"
arc-swap = "1.8.0"
arc-swap = "1.7.1"
astral-tokio-tar = "0.5.6"
atoi = "2.0.0"
atomic_enum = "0.3.0"
aws-config = { version = "1.8.12" }
aws-credential-types = { version = "1.2.11" }
aws-sdk-s3 = { version = "1.119.0", default-features = false, features = ["sigv4a", "default-https-client", "rt-tokio"] }
aws-sdk-s3 = { version = "1.117.0", default-features = false, features = ["sigv4a", "rustls", "rt-tokio"] }
aws-smithy-types = { version = "1.3.5" }
base64 = "0.22.1"
base64-simd = "0.8.0"
brotli = "8.0.2"
cfg-if = "1.0.4"
clap = { version = "4.5.54", features = ["derive", "env"] }
const-str = { version = "1.0.0", features = ["std", "proc"] }
clap = { version = "4.5.53", features = ["derive", "env"] }
const-str = { version = "0.7.1", features = ["std", "proc"] }
convert_case = "0.10.0"
criterion = { version = "0.8", features = ["html_reports"] }
crossbeam-queue = "0.3.12"
datafusion = "51.0.0"
derive_builder = "0.20.2"
dunce = "1.0.5"
enumset = "1.1.10"
faster-hex = "0.10.0"
flate2 = "1.1.5"
@@ -198,11 +195,11 @@ hex-simd = "0.8.0"
highway = { version = "1.3.0" }
ipnetwork = { version = "0.21.1", features = ["serde"] }
lazy_static = "1.5.0"
libc = "0.2.179"
libc = "0.2.178"
libsystemd = "0.7.2"
local-ip-address = "0.6.8"
lz4 = "1.28.1"
matchit = "0.9.1"
matchit = "0.9.0"
md-5 = "0.11.0-rc.3"
md5 = "0.8.0"
mime_guess = "2.0.5"
@@ -218,15 +215,15 @@ path-absolutize = "3.1.1"
path-clean = "1.0.1"
pin-project-lite = "0.2.16"
pretty_assertions = "1.4.1"
rand = { version = "0.10.0-rc.6", features = ["serde"] }
rand = { version = "0.10.0-rc.5", features = ["serde"] }
rayon = "1.11.0"
reed-solomon-simd = { version = "3.1.0" }
regex = { version = "1.12.2" }
rumqttc = { version = "0.25.1" }
rust-embed = { version = "8.9.0" }
rustc-hash = { version = "2.1.1" }
s3s = { version = "0.13.0-alpha", features = ["minio"], git = "https://github.com/s3s-project/s3s.git", branch = "main" }
serial_test = "3.3.1"
s3s = { version = "0.12.0-rc.6", features = ["minio"], git = "https://github.com/s3s-project/s3s.git", branch = "main" }
serial_test = "3.2.0"
shadow-rs = { version = "1.5.0", default-features = false }
siphasher = "1.0.1"
smallvec = { version = "1.15.1", features = ["serde"] }
@@ -235,9 +232,10 @@ snafu = "0.8.9"
snap = "1.1.1"
starshard = { version = "0.6.0", features = ["rayon", "async", "serde"] }
strum = { version = "0.27.2", features = ["derive"] }
sysctl = "0.7.1"
sysinfo = "0.37.2"
temp-env = "0.3.6"
tempfile = "3.24.0"
tempfile = "3.23.0"
test-case = "3.3.1"
thiserror = "2.0.17"
tracing = { version = "0.1.44" }
@@ -246,13 +244,13 @@ tracing-error = "0.2.1"
tracing-opentelemetry = "0.32.0"
tracing-subscriber = { version = "0.3.22", features = ["env-filter", "time"] }
transform-stream = "0.3.1"
url = "2.5.8"
url = "2.5.7"
urlencoding = "2.1.3"
uuid = { version = "1.19.0", features = ["v4", "fast-rng", "macro-diagnostics"] }
vaultrs = { version = "0.7.4" }
walkdir = "2.5.0"
wildmatch = { version = "2.6.1", features = ["serde"] }
windows = { version = "0.62.2" }
winapi = { version = "0.3.9" }
xxhash-rust = { version = "0.8.15", features = ["xxh64", "xxh3"] }
zip = "7.0.0"
zstd = "0.13.3"
@@ -266,14 +264,6 @@ opentelemetry_sdk = { version = "0.31.0" }
opentelemetry-semantic-conventions = { version = "0.31.0", features = ["semconv_experimental"] }
opentelemetry-stdout = { version = "0.31.0" }
# FTP and SFTP
libunftp = "0.21.0"
russh = { version = "0.56.0", features = ["aws-lc-rs", "rsa"], default-features = false }
russh-sftp = "2.1.1"
ssh-key = { version = "0.7.0-rc.4", features = ["std", "rsa", "ed25519"] }
suppaftp = { version = "7.0.7", features = ["tokio", "tokio-rustls", "rustls"] }
rcgen = "0.14.6"
# Performance Analysis and Memory Profiling
mimalloc = "0.1"
# Use tikv-jemallocator as memory allocator and enable performance analysis
@@ -285,6 +275,8 @@ jemalloc_pprof = { version = "0.8.1", features = ["symbolize", "flamegraph"] }
# Used to generate CPU performance analysis data and flame diagrams
pprof = { version = "0.15.0", features = ["flamegraph", "protobuf-codec"] }
[workspace.metadata.cargo-shear]
ignored = ["rustfs", "rustfs-mcp"]

View File

@@ -1,4 +1,4 @@
FROM alpine:3.23 AS build
FROM alpine:3.22 AS build
ARG TARGETARCH
ARG RELEASE=latest
@@ -40,7 +40,7 @@ RUN set -eux; \
rm -rf rustfs.zip /build/.tmp || true
FROM alpine:3.23
FROM alpine:3.22
ARG RELEASE=latest
ARG BUILD_DATE

View File

@@ -16,7 +16,7 @@ ARG BUILDPLATFORM
# -----------------------------
# Build stage
# -----------------------------
FROM rust:1.91-trixie AS builder
FROM rust:1.88-bookworm AS builder
# Re-declare args after FROM
ARG TARGETPLATFORM
@@ -148,8 +148,8 @@ ENV RUSTFS_ADDRESS=":9000" \
RUSTFS_OBS_LOG_DIRECTORY="/logs" \
RUSTFS_USERNAME="rustfs" \
RUSTFS_GROUPNAME="rustfs" \
RUSTFS_UID="10001" \
RUSTFS_GID="10001"
RUSTFS_UID="1000" \
RUSTFS_GID="1000"
# Note: We don't COPY source here because we expect it to be mounted at /app
# We rely on cargo run to build and run
@@ -187,8 +187,8 @@ RUN set -eux; \
# Create a conventional runtime user/group (final switch happens in entrypoint via chroot --userspec)
RUN set -eux; \
groupadd -g 10001 rustfs; \
useradd -u 10001 -g rustfs -M -s /usr/sbin/nologin rustfs
groupadd -g 1000 rustfs; \
useradd -u 1000 -g rustfs -M -s /usr/sbin/nologin rustfs
WORKDIR /app
@@ -212,8 +212,8 @@ ENV RUSTFS_ADDRESS=":9000" \
RUST_LOG="warn" \
RUSTFS_USERNAME="rustfs" \
RUSTFS_GROUPNAME="rustfs" \
RUSTFS_UID="10001" \
RUSTFS_GID="10001"
RUSTFS_UID="1000" \
RUSTFS_GID="1000"
EXPOSE 9000
VOLUME ["/data"]

411
Makefile
View File

@@ -2,80 +2,375 @@
# Remote development requires VSCode with Dev Containers, Remote SSH, Remote Explorer
# https://code.visualstudio.com/docs/remote/containers
###########
.PHONY: SHELL
# Makefile global config
# Use config.mak to override any of the following variables.
# Do not make changes here.
.DEFAULT_GOAL := help
.EXPORT_ALL_VARIABLES:
.ONESHELL:
.SILENT:
NUM_CORES := $(shell nproc 2>/dev/null || sysctl -n hw.ncpu)
MAKEFLAGS += -j$(NUM_CORES) -l$(NUM_CORES)
MAKEFLAGS += --silent
SHELL := $(shell which bash)
.SHELLFLAGS = -eu -o pipefail -c
DOCKER_CLI ?= docker
IMAGE_NAME ?= rustfs:v1.0.0
CONTAINER_NAME ?= rustfs-dev
# Docker build configurations
DOCKERFILE_PRODUCTION = Dockerfile
DOCKERFILE_SOURCE = Dockerfile.source
# Code quality and formatting targets
.PHONY: fmt
fmt:
@echo "🔧 Formatting code..."
cargo fmt --all
.PHONY: fmt-check
fmt-check:
@echo "📝 Checking code formatting..."
cargo fmt --all --check
.PHONY: clippy
clippy:
@echo "🔍 Running clippy checks..."
cargo clippy --fix --allow-dirty
cargo clippy --all-targets --all-features -- -D warnings
.PHONY: check
check:
@echo "🔨 Running compilation check..."
cargo check --all-targets
.PHONY: test
test:
@echo "🧪 Running tests..."
@if command -v cargo-nextest >/dev/null 2>&1; then \
cargo nextest run --all --exclude e2e_test; \
else \
echo " cargo-nextest not found; falling back to 'cargo test'"; \
cargo test --workspace --exclude e2e_test -- --nocapture; \
fi
cargo test --all --doc
.PHONY: pre-commit
pre-commit: fmt clippy check test
@echo "✅ All pre-commit checks passed!"
.PHONY: setup-hooks
setup-hooks:
@echo "🔧 Setting up git hooks..."
chmod +x .git/hooks/pre-commit
@echo "✅ Git hooks setup complete!"
.PHONY: e2e-server
e2e-server:
sh $(shell pwd)/scripts/run.sh
.PHONY: probe-e2e
probe-e2e:
sh $(shell pwd)/scripts/probe.sh
# Native build using build-rustfs.sh script
.PHONY: build
build:
@echo "🔨 Building RustFS using build-rustfs.sh script..."
./build-rustfs.sh
.PHONY: build-dev
build-dev:
@echo "🔨 Building RustFS in development mode..."
./build-rustfs.sh --dev
# Docker-based build (alternative approach)
# Usage: make BUILD_OS=ubuntu22.04 build-docker
# Output: target/ubuntu22.04/release/rustfs
BUILD_OS ?= rockylinux9.3
.PHONY: build-docker
build-docker: SOURCE_BUILD_IMAGE_NAME = rustfs-$(BUILD_OS):v1
build-docker: SOURCE_BUILD_CONTAINER_NAME = rustfs-$(BUILD_OS)-build
build-docker: BUILD_CMD = /root/.cargo/bin/cargo build --release --bin rustfs --target-dir /root/s3-rustfs/target/$(BUILD_OS)
build-docker:
@echo "🐳 Building RustFS using Docker ($(BUILD_OS))..."
$(DOCKER_CLI) buildx build -t $(SOURCE_BUILD_IMAGE_NAME) -f $(DOCKERFILE_SOURCE) .
$(DOCKER_CLI) run --rm --name $(SOURCE_BUILD_CONTAINER_NAME) -v $(shell pwd):/root/s3-rustfs -it $(SOURCE_BUILD_IMAGE_NAME) $(BUILD_CMD)
# Makefile colors config
bold := $(shell tput bold)
normal := $(shell tput sgr0)
errorTitle := $(shell tput setab 1 && tput bold && echo '\n')
recommendation := $(shell tput setab 4)
underline := $(shell tput smul)
reset := $(shell tput -Txterm sgr0)
black := $(shell tput setaf 0)
red := $(shell tput setaf 1)
green := $(shell tput setaf 2)
yellow := $(shell tput setaf 3)
blue := $(shell tput setaf 4)
magenta := $(shell tput setaf 5)
cyan := $(shell tput setaf 6)
white := $(shell tput setaf 7)
.PHONY: build-musl
build-musl:
@echo "🔨 Building rustfs for x86_64-unknown-linux-musl..."
@echo "💡 On macOS/Windows, use 'make build-docker' or 'make docker-dev' instead"
./build-rustfs.sh --platform x86_64-unknown-linux-musl
define HEADER
How to use me:
# To get help for each target
${bold}make help${reset}
.PHONY: build-gnu
build-gnu:
@echo "🔨 Building rustfs for x86_64-unknown-linux-gnu..."
@echo "💡 On macOS/Windows, use 'make build-docker' or 'make docker-dev' instead"
./build-rustfs.sh --platform x86_64-unknown-linux-gnu
# To run and execute a target
${bold}make ${cyan}<target>${reset}
.PHONY: build-musl-arm64
build-musl-arm64:
@echo "🔨 Building rustfs for aarch64-unknown-linux-musl..."
@echo "💡 On macOS/Windows, use 'make build-docker' or 'make docker-dev' instead"
./build-rustfs.sh --platform aarch64-unknown-linux-musl
💡 For more help use 'make help', 'make help-build' or 'make help-docker'
.PHONY: build-gnu-arm64
build-gnu-arm64:
@echo "🔨 Building rustfs for aarch64-unknown-linux-gnu..."
@echo "💡 On macOS/Windows, use 'make build-docker' or 'make docker-dev' instead"
./build-rustfs.sh --platform aarch64-unknown-linux-gnu
🦀 RustFS Makefile Help:
.PHONY: deploy-dev
deploy-dev: build-musl
@echo "🚀 Deploying to dev server: $${IP}"
./scripts/dev_deploy.sh $${IP}
📋 Main Command Categories:
make help-build # Show build-related help
make help-docker # Show Docker-related help
# ========================================================================================
# Docker Multi-Architecture Builds (Primary Methods)
# ========================================================================================
🔧 Code Quality:
make fmt # Format code
make clippy # Run clippy checks
make test # Run tests
make pre-commit # Run all pre-commit checks
# Production builds using docker-buildx.sh (for CI/CD and production)
.PHONY: docker-buildx
docker-buildx:
@echo "🏗️ Building multi-architecture production Docker images with buildx..."
./docker-buildx.sh
🚀 Quick Start:
make build # Build RustFS binary
make docker-dev-local # Build development Docker image (local)
make dev-env-start # Start development environment
.PHONY: docker-buildx-push
docker-buildx-push:
@echo "🚀 Building and pushing multi-architecture production Docker images with buildx..."
./docker-buildx.sh --push
.PHONY: docker-buildx-version
docker-buildx-version:
@if [ -z "$(VERSION)" ]; then \
echo "❌ Error: Please specify version, example: make docker-buildx-version VERSION=v1.0.0"; \
exit 1; \
fi
@echo "🏗️ Building multi-architecture production Docker images (version: $(VERSION))..."
./docker-buildx.sh --release $(VERSION)
.PHONY: docker-buildx-push-version
docker-buildx-push-version:
@if [ -z "$(VERSION)" ]; then \
echo "❌ Error: Please specify version, example: make docker-buildx-push-version VERSION=v1.0.0"; \
exit 1; \
fi
@echo "🚀 Building and pushing multi-architecture production Docker images (version: $(VERSION))..."
./docker-buildx.sh --release $(VERSION) --push
# Development/Source builds using direct buildx commands
.PHONY: docker-dev
docker-dev:
@echo "🏗️ Building multi-architecture development Docker images with buildx..."
@echo "💡 This builds from source code and is intended for local development and testing"
@echo "⚠️ Multi-arch images cannot be loaded locally, use docker-dev-push to push to registry"
$(DOCKER_CLI) buildx build \
--platform linux/amd64,linux/arm64 \
--file $(DOCKERFILE_SOURCE) \
--tag rustfs:source-latest \
--tag rustfs:dev-latest \
.
.PHONY: docker-dev-local
docker-dev-local:
@echo "🏗️ Building single-architecture development Docker image for local use..."
@echo "💡 This builds from source code for the current platform and loads locally"
$(DOCKER_CLI) buildx build \
--file $(DOCKERFILE_SOURCE) \
--tag rustfs:source-latest \
--tag rustfs:dev-latest \
--load \
.
.PHONY: docker-dev-push
docker-dev-push:
@if [ -z "$(REGISTRY)" ]; then \
echo "❌ Error: Please specify registry, example: make docker-dev-push REGISTRY=ghcr.io/username"; \
exit 1; \
fi
@echo "🚀 Building and pushing multi-architecture development Docker images..."
@echo "💡 Pushing to registry: $(REGISTRY)"
$(DOCKER_CLI) buildx build \
--platform linux/amd64,linux/arm64 \
--file $(DOCKERFILE_SOURCE) \
--tag $(REGISTRY)/rustfs:source-latest \
--tag $(REGISTRY)/rustfs:dev-latest \
--push \
.
endef
export HEADER
-include $(addsuffix /*.mak, $(shell find .config/make -type d))
# Local production builds using direct buildx (alternative to docker-buildx.sh)
.PHONY: docker-buildx-production-local
docker-buildx-production-local:
@echo "🏗️ Building single-architecture production Docker image locally..."
@echo "💡 Alternative to docker-buildx.sh for local testing"
$(DOCKER_CLI) buildx build \
--file $(DOCKERFILE_PRODUCTION) \
--tag rustfs:production-latest \
--tag rustfs:latest \
--load \
--build-arg RELEASE=latest \
.
# ========================================================================================
# Single Architecture Docker Builds (Traditional)
# ========================================================================================
.PHONY: docker-build-production
docker-build-production:
@echo "🏗️ Building single-architecture production Docker image..."
@echo "💡 Consider using 'make docker-buildx-production-local' for multi-arch support"
$(DOCKER_CLI) build -f $(DOCKERFILE_PRODUCTION) -t rustfs:latest .
.PHONY: docker-build-source
docker-build-source:
@echo "🏗️ Building single-architecture source Docker image..."
@echo "💡 Consider using 'make docker-dev-local' for multi-arch support"
DOCKER_BUILDKIT=1 $(DOCKER_CLI) build \
--build-arg BUILDKIT_INLINE_CACHE=1 \
-f $(DOCKERFILE_SOURCE) -t rustfs:source .
# ========================================================================================
# Development Environment
# ========================================================================================
.PHONY: dev-env-start
dev-env-start:
@echo "🚀 Starting development environment..."
$(DOCKER_CLI) buildx build \
--file $(DOCKERFILE_SOURCE) \
--tag rustfs:dev \
--load \
.
$(DOCKER_CLI) stop $(CONTAINER_NAME) 2>/dev/null || true
$(DOCKER_CLI) rm $(CONTAINER_NAME) 2>/dev/null || true
$(DOCKER_CLI) run -d --name $(CONTAINER_NAME) \
-p 9010:9010 -p 9000:9000 \
-v $(shell pwd):/workspace \
-it rustfs:dev
.PHONY: dev-env-stop
dev-env-stop:
@echo "🛑 Stopping development environment..."
$(DOCKER_CLI) stop $(CONTAINER_NAME) 2>/dev/null || true
$(DOCKER_CLI) rm $(CONTAINER_NAME) 2>/dev/null || true
.PHONY: dev-env-restart
dev-env-restart: dev-env-stop dev-env-start
# ========================================================================================
# Build Utilities
# ========================================================================================
.PHONY: docker-inspect-multiarch
docker-inspect-multiarch:
@if [ -z "$(IMAGE)" ]; then \
echo "❌ Error: Please specify image, example: make docker-inspect-multiarch IMAGE=rustfs/rustfs:latest"; \
exit 1; \
fi
@echo "🔍 Inspecting multi-architecture image: $(IMAGE)"
docker buildx imagetools inspect $(IMAGE)
.PHONY: build-cross-all
build-cross-all:
@echo "🔧 Building all target architectures..."
@echo "💡 On macOS/Windows, use 'make docker-dev' for reliable multi-arch builds"
@echo "🔨 Generating protobuf code..."
cargo run --bin gproto || true
@echo "🔨 Building x86_64-unknown-linux-gnu..."
./build-rustfs.sh --platform x86_64-unknown-linux-gnu
@echo "🔨 Building aarch64-unknown-linux-gnu..."
./build-rustfs.sh --platform aarch64-unknown-linux-gnu
@echo "🔨 Building x86_64-unknown-linux-musl..."
./build-rustfs.sh --platform x86_64-unknown-linux-musl
@echo "🔨 Building aarch64-unknown-linux-musl..."
./build-rustfs.sh --platform aarch64-unknown-linux-musl
@echo "✅ All architectures built successfully!"
# ========================================================================================
# Help and Documentation
# ========================================================================================
.PHONY: help-build
help-build:
@echo "🔨 RustFS Build Help:"
@echo ""
@echo "🚀 Local Build (Recommended):"
@echo " make build # Build RustFS binary (includes console by default)"
@echo " make build-dev # Development mode build"
@echo " make build-musl # Build x86_64 musl version"
@echo " make build-gnu # Build x86_64 GNU version"
@echo " make build-musl-arm64 # Build aarch64 musl version"
@echo " make build-gnu-arm64 # Build aarch64 GNU version"
@echo ""
@echo "🐳 Docker Build:"
@echo " make build-docker # Build using Docker container"
@echo " make build-docker BUILD_OS=ubuntu22.04 # Specify build system"
@echo ""
@echo "🏗️ Cross-architecture Build:"
@echo " make build-cross-all # Build binaries for all architectures"
@echo ""
@echo "🔧 Direct usage of build-rustfs.sh script:"
@echo " ./build-rustfs.sh --help # View script help"
@echo " ./build-rustfs.sh --no-console # Build without console resources"
@echo " ./build-rustfs.sh --force-console-update # Force update console resources"
@echo " ./build-rustfs.sh --dev # Development mode build"
@echo " ./build-rustfs.sh --sign # Sign binary files"
@echo " ./build-rustfs.sh --platform x86_64-unknown-linux-gnu # Specify target platform"
@echo " ./build-rustfs.sh --skip-verification # Skip binary verification"
@echo ""
@echo "💡 build-rustfs.sh script provides more options, smart detection and binary verification"
.PHONY: help-docker
help-docker:
@echo "🐳 Docker Multi-architecture Build Help:"
@echo ""
@echo "🚀 Production Image Build (Recommended to use docker-buildx.sh):"
@echo " make docker-buildx # Build production multi-arch image (no push)"
@echo " make docker-buildx-push # Build and push production multi-arch image"
@echo " make docker-buildx-version VERSION=v1.0.0 # Build specific version"
@echo " make docker-buildx-push-version VERSION=v1.0.0 # Build and push specific version"
@echo ""
@echo "🔧 Development/Source Image Build (Local development testing):"
@echo " make docker-dev # Build dev multi-arch image (cannot load locally)"
@echo " make docker-dev-local # Build dev single-arch image (local load)"
@echo " make docker-dev-push REGISTRY=xxx # Build and push dev image"
@echo ""
@echo "🏗️ Local Production Image Build (Alternative):"
@echo " make docker-buildx-production-local # Build production single-arch image locally"
@echo ""
@echo "📦 Single-architecture Build (Traditional way):"
@echo " make docker-build-production # Build single-arch production image"
@echo " make docker-build-source # Build single-arch source image"
@echo ""
@echo "🚀 Development Environment Management:"
@echo " make dev-env-start # Start development container environment"
@echo " make dev-env-stop # Stop development container environment"
@echo " make dev-env-restart # Restart development container environment"
@echo ""
@echo "🔧 Auxiliary Tools:"
@echo " make build-cross-all # Build binaries for all architectures"
@echo " make docker-inspect-multiarch IMAGE=xxx # Check image architecture support"
@echo ""
@echo "📋 Environment Variables:"
@echo " REGISTRY Image registry address (required for push)"
@echo " DOCKERHUB_USERNAME Docker Hub username"
@echo " DOCKERHUB_TOKEN Docker Hub access token"
@echo " GITHUB_TOKEN GitHub access token"
@echo ""
@echo "💡 Suggestions:"
@echo " - Production use: Use docker-buildx* commands (based on precompiled binaries)"
@echo " - Local development: Use docker-dev* commands (build from source)"
@echo " - Development environment: Use dev-env-* commands to manage dev containers"
.PHONY: help
help:
@echo "🦀 RustFS Makefile Help:"
@echo ""
@echo "📋 Main Command Categories:"
@echo " make help-build # Show build-related help"
@echo " make help-docker # Show Docker-related help"
@echo ""
@echo "🔧 Code Quality:"
@echo " make fmt # Format code"
@echo " make clippy # Run clippy checks"
@echo " make test # Run tests"
@echo " make pre-commit # Run all pre-commit checks"
@echo ""
@echo "🚀 Quick Start:"
@echo " make build # Build RustFS binary"
@echo " make docker-dev-local # Build development Docker image (local)"
@echo " make dev-env-start # Start development environment"
@echo ""
@echo "💡 For more help use 'make help-build' or 'make help-docker'"

View File

@@ -10,11 +10,6 @@
<a href="https://hellogithub.com/repository/rustfs/rustfs" target="_blank"><img src="https://abroad.hellogithub.com/v1/widgets/recommend.svg?rid=b95bcb72bdc340b68f16fdf6790b7d5b&claim_uid=MsbvjYeLDKAH457&theme=small" alt="FeaturedHelloGitHub" /></a>
</p>
<p align="center">
<a href="https://trendshift.io/repositories/14181" target="_blank"><img src="https://trendshift.io/api/badge/repositories/14181" alt="rustfs%2Frustfs | Trendshift" style="width: 250px; height: 55px;" width="250" height="55"/></a>
</p>
<p align="center">
<a href="https://docs.rustfs.com/installation/">Getting Started</a>
· <a href="https://docs.rustfs.com/">Docs</a>
@@ -50,10 +45,10 @@ Unlike other storage systems, RustFS is released under the permissible Apache 2.
| :--- | :--- | :--- | :--- |
| **S3 Core Features** | ✅ Available | **Bitrot Protection** | ✅ Available |
| **Upload / Download** | ✅ Available | **Single Node Mode** | ✅ Available |
| **Versioning** | ✅ Available | **Bucket Replication** | ✅ Available |
| **Versioning** | ✅ Available | **Bucket Replication** | ⚠️ Partial Support |
| **Logging** | ✅ Available | **Lifecycle Management** | 🚧 Under Testing |
| **Event Notifications** | ✅ Available | **Distributed Mode** | 🚧 Under Testing |
| **K8s Helm Charts** | ✅ Available | **RustFS KMS** | 🚧 Under Testing |
| **K8s Helm Charts** | ✅ Available | **OPA (Open Policy Agent)** | 🚧 Under Testing |
@@ -83,13 +78,6 @@ Unlike other storage systems, RustFS is released under the permissible Apache 2.
| **Edge & IoT** | **Strong Edge Support**<br>Ideal for secure, innovative edge devices. | **Weak Edge Support**<br>Often too heavy for edge gateways. |
| **Risk Profile** | **Enterprise Risk Mitigation**<br>Clear IP rights and safe for commercial use. | **Legal Risks**<br>Intellectual property ambiguity and usage restrictions. |
## Staying ahead
Star RustFS on GitHub and be instantly notified of new releases.
<img src="https://github.com/user-attachments/assets/7ee40bb4-3e46-4eac-b0d0-5fbeb85ff8f3" />
## Quickstart
To get started with RustFS, follow these steps:
@@ -186,7 +174,7 @@ nix run
### Accessing RustFS
5. **Access the Console**: Open your web browser and navigate to `http://localhost:9001` to access the RustFS console.
5. **Access the Console**: Open your web browser and navigate to `http://localhost:9000` to access the RustFS console.
* Default credentials: `rustfsadmin` / `rustfsadmin`
6. **Create a Bucket**: Use the console to create a new bucket for your objects.
7. **Upload Objects**: You can upload files directly through the console or use S3-compatible APIs/clients to interact with your RustFS instance.
@@ -227,6 +215,11 @@ RustFS is a community-driven project, and we appreciate all contributions. Check
<img src="https://opencollective.com/rustfs/contributors.svg?width=890&limit=500&button=false" alt="Contributors" />
</a>
## Github Trending Top
🚀 RustFS is beloved by open-source enthusiasts and enterprise users worldwide, often appearing on the GitHub Trending top charts.
<a href="https://trendshift.io/repositories/14181" target="_blank"><img src="https://raw.githubusercontent.com/rustfs/rustfs/refs/heads/main/docs/rustfs-trending.jpg" alt="rustfs%2Frustfs | Trendshift" /></a>
## Star History

View File

@@ -10,10 +10,6 @@
<a href="https://hellogithub.com/repository/rustfs/rustfs" target="_blank"><img src="https://abroad.hellogithub.com/v1/widgets/recommend.svg?rid=b95bcb72bdc340b68f16fdf6790b7d5b&claim_uid=MsbvjYeLDKAH457&theme=small" alt="FeaturedHelloGitHub" /></a>
</p>
<p align="center">
<a href="https://trendshift.io/repositories/14181" target="_blank"><img src="https://trendshift.io/api/badge/repositories/14181" alt="rustfs%2Frustfs | Trendshift" style="width: 250px; height: 55px;" width="250" height="55"/></a>
</p>
<p align="center">
<a href="https://docs.rustfs.com/installation/">快速开始</a>
· <a href="https://docs.rustfs.com/">文档</a>
@@ -21,8 +17,6 @@
· <a href="https://github.com/rustfs/rustfs/discussions">社区讨论</a>
</p>
<p align="center">
<a href="https://github.com/rustfs/rustfs/blob/main/README.md">English</a> | 简体中文 |
<a href="https://readme-i18n.com/rustfs/rustfs?lang=de">Deutsch</a> |
@@ -52,7 +46,7 @@ RustFS 是一个基于 Rust 构建的高性能分布式对象存储系统。Rust
| :--- | :--- | :--- | :--- |
| **S3 核心功能** | ✅ 可用 | **Bitrot (防数据腐烂)** | ✅ 可用 |
| **上传 / 下载** | ✅ 可用 | **单机模式** | ✅ 可用 |
| **版本控制** | ✅ 可用 | **存储桶复制** | 可用 |
| **版本控制** | ✅ 可用 | **存储桶复制** | ⚠️ 部分可用 |
| **日志功能** | ✅ 可用 | **生命周期管理** | 🚧 测试中 |
| **事件通知** | ✅ 可用 | **分布式模式** | 🚧 测试中 |
| **K8s Helm Chart** | ✅ 可用 | **OPA (策略引擎)** | 🚧 测试中 |
@@ -86,15 +80,6 @@ RustFS 是一个基于 Rust 构建的高性能分布式对象存储系统。Rust
| **成本** | **稳定且免费**<br>免费社区支持,稳定的商业定价。 | **高昂成本**<br>1PiB 的成本可能高达 250,000 美元。 |
| **风险控制** | **企业级风险规避**<br>清晰的知识产权,商业使用安全无忧。 | **法律风险**<br>知识产权归属模糊及使用限制风险。 |
## 保持领先
在 GitHub 上为 RustFS 点赞,即可第一时间收到新版本发布通知。
<img src="https://github.com/user-attachments/assets/7ee40bb4-3e46-4eac-b0d0-5fbeb85ff8f3" />
## 快速开始
请按照以下步骤快速上手 RustFS
@@ -215,7 +200,11 @@ RustFS 是一个社区驱动的项目,我们感谢所有的贡献。请查看
<img src="https://opencollective.com/rustfs/contributors.svg?width=890&limit=500&button=false" alt="Contributors" />
</a>
## Github Trending Top
🚀 RustFS 深受全球开源爱好者和企业用户的喜爱,经常荣登 GitHub Trending 榜单。
<a href="https://trendshift.io/repositories/14181" target="_blank"><img src="https://raw.githubusercontent.com/rustfs/rustfs/refs/heads/main/docs/rustfs-trending.jpg" alt="rustfs%2Frustfs | Trendshift" /></a>
## Star 历史

View File

@@ -1,40 +1,19 @@
# Security Policy
## Security Philosophy
At RustFS, we take security seriously. We believe that **transparency leads to better security**. The more open our code is, the more eyes are on it, and the faster we can identify and resolve potential issues.
We highly value the contributions of the security community and welcome anyone to audit our code. Your efforts help us make RustFS safer for everyone.
## Supported Versions
To help us focus our security efforts, please refer to the table below to see which versions of RustFS are currently supported with security updates.
Security updates are provided for the latest released version of this project.
| Version | Supported |
| ------- | ------------------ |
| Latest | :white_check_mark: |
| < 1.0 | :x: |
| 1.x.x | :white_check_mark: |
## Reporting a Vulnerability
If you discover a security vulnerability in RustFS, we appreciate your help in disclosing it to us responsibly.
Please report security vulnerabilities **privately** via GitHub Security Advisories:
**Please do not open a public GitHub issue for security vulnerabilities.** Publicly disclosing a vulnerability can put the entire community at risk before a fix is available.
https://github.com/rustfs/rustfs/security/advisories/new
### How to Report
Do **not** open a public issue for security-sensitive bugs.
1. https://github.com/rustfs/rustfs/security/advisories/new
2. Please email us directly at: **security@rustfs.com**
In your email, please include:
1. **Description**: A detailed description of the vulnerability.
2. **Steps to Reproduce**: Steps or a script to reproduce the issue.
3. **Impact**: The potential impact of the vulnerability.
### Our Response Process
1. **Acknowledgment**: We will acknowledge your email within 48 hours.
2. **Assessment**: We will investigate the issue and determine its severity.
3. **Fix & Disclosure**: We will work on a patch. Once the patch is released, we will publicly announce the vulnerability and acknowledge your contribution (unless you prefer to remain anonymous).
Thank you for helping keep RustFS and its users safe!
You can expect an initial response within a reasonable timeframe. Further updates will be provided as the report is triaged.

View File

@@ -36,9 +36,6 @@ clen = "clen"
datas = "datas"
bre = "bre"
abd = "abd"
mak = "mak"
# s3-tests original test names (cannot be changed)
nonexisted = "nonexisted"
[files]
extend-exclude = []
extend-exclude = []

View File

@@ -348,7 +348,7 @@ impl ErasureSetHealer {
}
// save checkpoint periodically
if global_obj_idx.is_multiple_of(100) {
if global_obj_idx % 100 == 0 {
checkpoint_manager
.update_position(bucket_index, *current_object_index)
.await?;

View File

@@ -468,17 +468,14 @@ impl HealManager {
let active_heals = self.active_heals.clone();
let cancel_token = self.cancel_token.clone();
let storage = self.storage.clone();
let mut duration = {
let config = config.read().await;
config.heal_interval
};
if duration < Duration::from_secs(1) {
duration = Duration::from_secs(1);
}
info!("start_auto_disk_scanner: Starting auto disk scanner with interval: {:?}", duration);
info!(
"start_auto_disk_scanner: Starting auto disk scanner with interval: {:?}",
config.read().await.heal_interval
);
tokio::spawn(async move {
let mut interval = interval(duration);
let mut interval = interval(config.read().await.heal_interval);
loop {
tokio::select! {
@@ -492,11 +489,12 @@ impl HealManager {
for (_, disk_opt) in GLOBAL_LOCAL_DISK_MAP.read().await.iter() {
if let Some(disk) = disk_opt {
// detect unformatted disk via get_disk_id()
if let Err(err) = disk.get_disk_id().await
&& err == DiskError::UnformattedDisk {
if let Err(err) = disk.get_disk_id().await {
if err == DiskError::UnformattedDisk {
endpoints.push(disk.endpoint());
continue;
}
}
}
}

View File

@@ -541,10 +541,10 @@ impl ResumeUtils {
for entry in entries {
if entry.ends_with(&format!("_{RESUME_STATE_FILE}")) {
// Extract task ID from filename: {task_id}_ahm_resume_state.json
if let Some(task_id) = entry.strip_suffix(&format!("_{RESUME_STATE_FILE}"))
&& !task_id.is_empty()
{
task_ids.push(task_id.to_string());
if let Some(task_id) = entry.strip_suffix(&format!("_{RESUME_STATE_FILE}")) {
if !task_id.is_empty() {
task_ids.push(task_id.to_string());
}
}
}
}

View File

@@ -83,10 +83,10 @@ pub struct CheckpointManager {
impl CheckpointManager {
pub fn new(node_id: &str, data_dir: &Path) -> Self {
if !data_dir.exists()
&& let Err(e) = std::fs::create_dir_all(data_dir)
{
error!("create data dir failed {:?}: {}", data_dir, e);
if !data_dir.exists() {
if let Err(e) = std::fs::create_dir_all(data_dir) {
error!("create data dir failed {:?}: {}", data_dir, e);
}
}
let checkpoint_file = data_dir.join(format!("scanner_checkpoint_{node_id}.json"));

View File

@@ -30,7 +30,7 @@ use rustfs_ecstore::{
bucket::versioning::VersioningApi,
bucket::versioning_sys::BucketVersioningSys,
data_usage::{aggregate_local_snapshots, compute_bucket_usage, store_data_usage_in_backend},
disk::{DiskAPI, DiskStore, RUSTFS_META_BUCKET, WalkDirOptions},
disk::{Disk, DiskAPI, DiskStore, RUSTFS_META_BUCKET, WalkDirOptions},
set_disk::SetDisks,
store_api::ObjectInfo,
};
@@ -401,10 +401,10 @@ impl Scanner {
let mut latest_update: Option<SystemTime> = None;
for snapshot in &outcome.snapshots {
if let Some(update) = snapshot.last_update
&& latest_update.is_none_or(|current| update > current)
{
latest_update = Some(update);
if let Some(update) = snapshot.last_update {
if latest_update.is_none_or(|current| update > current) {
latest_update = Some(update);
}
}
aggregated.objects_total_count = aggregated.objects_total_count.saturating_add(snapshot.objects_total_count);
@@ -527,20 +527,28 @@ impl Scanner {
let (disks, _) = set_disks.get_online_disks_with_healing(false).await;
if let Some(disk) = disks.first() {
let bucket_path = disk.path().join(bucket_name);
if bucket_path.exists()
&& let Ok(entries) = std::fs::read_dir(&bucket_path)
{
for entry in entries.flatten() {
if let Ok(file_type) = entry.file_type()
&& file_type.is_dir()
&& let Some(object_name) = entry.file_name().to_str()
&& !object_name.starts_with('.')
{
debug!("Deep scanning object: {}/{}", bucket_name, object_name);
if let Err(e) = self.verify_object_integrity(bucket_name, object_name).await {
warn!("Object integrity verification failed for {}/{}: {}", bucket_name, object_name, e);
} else {
debug!("Object integrity verification passed for {}/{}", bucket_name, object_name);
if bucket_path.exists() {
if let Ok(entries) = std::fs::read_dir(&bucket_path) {
for entry in entries.flatten() {
if let Ok(file_type) = entry.file_type() {
if file_type.is_dir() {
if let Some(object_name) = entry.file_name().to_str() {
if !object_name.starts_with('.') {
debug!("Deep scanning object: {}/{}", bucket_name, object_name);
if let Err(e) = self.verify_object_integrity(bucket_name, object_name).await {
warn!(
"Object integrity verification failed for {}/{}: {}",
bucket_name, object_name, e
);
} else {
debug!(
"Object integrity verification passed for {}/{}",
bucket_name, object_name
);
}
}
}
}
}
}
}
@@ -851,10 +859,10 @@ impl Scanner {
// Phase 2: Minimal EC verification for critical objects only
// Note: The main scanning is now handled by NodeScanner in the background
if let Some(ecstore) = rustfs_ecstore::new_object_layer_fn()
&& let Err(e) = self.minimal_ec_verification(&ecstore).await
{
error!("Minimal EC verification failed: {}", e);
if let Some(ecstore) = rustfs_ecstore::new_object_layer_fn() {
if let Err(e) = self.minimal_ec_verification(&ecstore).await {
error!("Minimal EC verification failed: {}", e);
}
}
// Update scan duration
@@ -942,12 +950,13 @@ impl Scanner {
}
// If there is still no data, try backend before persisting zeros
if data_usage.buckets_usage.is_empty()
&& let Ok(existing) = rustfs_ecstore::data_usage::load_data_usage_from_backend(ecstore.clone()).await
&& !existing.buckets_usage.is_empty()
{
info!("Using existing backend data usage during fallback backoff");
data_usage = existing;
if data_usage.buckets_usage.is_empty() {
if let Ok(existing) = rustfs_ecstore::data_usage::load_data_usage_from_backend(ecstore.clone()).await {
if !existing.buckets_usage.is_empty() {
info!("Using existing backend data usage during fallback backoff");
data_usage = existing;
}
}
}
// Avoid overwriting valid backend stats with zeros when fallback is throttled
@@ -1712,34 +1721,36 @@ impl Scanner {
// check disk status, if offline, submit erasure set heal task
if !metrics.is_online {
let enable_healing = self.config.read().await.enable_healing;
if enable_healing && let Some(heal_manager) = &self.heal_manager {
// Get bucket list for erasure set healing
let buckets = match rustfs_ecstore::new_object_layer_fn() {
Some(ecstore) => match ecstore.list_bucket(&ecstore::store_api::BucketOptions::default()).await {
Ok(buckets) => buckets.iter().map(|b| b.name.clone()).collect::<Vec<String>>(),
Err(e) => {
error!("Failed to get bucket list for disk healing: {}", e);
return Err(Error::Storage(e));
if enable_healing {
if let Some(heal_manager) = &self.heal_manager {
// Get bucket list for erasure set healing
let buckets = match rustfs_ecstore::new_object_layer_fn() {
Some(ecstore) => match ecstore.list_bucket(&ecstore::store_api::BucketOptions::default()).await {
Ok(buckets) => buckets.iter().map(|b| b.name.clone()).collect::<Vec<String>>(),
Err(e) => {
error!("Failed to get bucket list for disk healing: {}", e);
return Err(Error::Storage(e));
}
},
None => {
error!("No ECStore available for getting bucket list");
return Err(Error::Storage(ecstore::error::StorageError::other("No ECStore available")));
}
},
None => {
error!("No ECStore available for getting bucket list");
return Err(Error::Storage(ecstore::error::StorageError::other("No ECStore available")));
}
};
};
let set_disk_id = format!("pool_{}_set_{}", disk.endpoint().pool_idx, disk.endpoint().set_idx);
let req = HealRequest::new(
crate::heal::task::HealType::ErasureSet { buckets, set_disk_id },
crate::heal::task::HealOptions::default(),
crate::heal::task::HealPriority::High,
);
match heal_manager.submit_heal_request(req).await {
Ok(task_id) => {
warn!("disk offline, submit erasure set heal task: {} {}", task_id, disk_path);
}
Err(e) => {
error!("disk offline, submit erasure set heal task failed: {} {}", disk_path, e);
let set_disk_id = format!("pool_{}_set_{}", disk.endpoint().pool_idx, disk.endpoint().set_idx);
let req = HealRequest::new(
crate::heal::task::HealType::ErasureSet { buckets, set_disk_id },
crate::heal::task::HealOptions::default(),
crate::heal::task::HealPriority::High,
);
match heal_manager.submit_heal_request(req).await {
Ok(task_id) => {
warn!("disk offline, submit erasure set heal task: {} {}", task_id, disk_path);
}
Err(e) => {
error!("disk offline, submit erasure set heal task failed: {} {}", disk_path, e);
}
}
}
}
@@ -1767,34 +1778,36 @@ impl Scanner {
// disk access failed, submit erasure set heal task
let enable_healing = self.config.read().await.enable_healing;
if enable_healing && let Some(heal_manager) = &self.heal_manager {
// Get bucket list for erasure set healing
let buckets = match rustfs_ecstore::new_object_layer_fn() {
Some(ecstore) => match ecstore.list_bucket(&ecstore::store_api::BucketOptions::default()).await {
Ok(buckets) => buckets.iter().map(|b| b.name.clone()).collect::<Vec<String>>(),
Err(e) => {
error!("Failed to get bucket list for disk healing: {}", e);
return Err(Error::Storage(e));
if enable_healing {
if let Some(heal_manager) = &self.heal_manager {
// Get bucket list for erasure set healing
let buckets = match rustfs_ecstore::new_object_layer_fn() {
Some(ecstore) => match ecstore.list_bucket(&ecstore::store_api::BucketOptions::default()).await {
Ok(buckets) => buckets.iter().map(|b| b.name.clone()).collect::<Vec<String>>(),
Err(e) => {
error!("Failed to get bucket list for disk healing: {}", e);
return Err(Error::Storage(e));
}
},
None => {
error!("No ECStore available for getting bucket list");
return Err(Error::Storage(ecstore::error::StorageError::other("No ECStore available")));
}
},
None => {
error!("No ECStore available for getting bucket list");
return Err(Error::Storage(ecstore::error::StorageError::other("No ECStore available")));
}
};
};
let set_disk_id = format!("pool_{}_set_{}", disk.endpoint().pool_idx, disk.endpoint().set_idx);
let req = HealRequest::new(
crate::heal::task::HealType::ErasureSet { buckets, set_disk_id },
crate::heal::task::HealOptions::default(),
crate::heal::task::HealPriority::Urgent,
);
match heal_manager.submit_heal_request(req).await {
Ok(task_id) => {
warn!("disk access failed, submit erasure set heal task: {} {}", task_id, disk_path);
}
Err(heal_err) => {
error!("disk access failed, submit erasure set heal task failed: {} {}", disk_path, heal_err);
let set_disk_id = format!("pool_{}_set_{}", disk.endpoint().pool_idx, disk.endpoint().set_idx);
let req = HealRequest::new(
crate::heal::task::HealType::ErasureSet { buckets, set_disk_id },
crate::heal::task::HealOptions::default(),
crate::heal::task::HealPriority::Urgent,
);
match heal_manager.submit_heal_request(req).await {
Ok(task_id) => {
warn!("disk access failed, submit erasure set heal task: {} {}", task_id, disk_path);
}
Err(heal_err) => {
error!("disk access failed, submit erasure set heal task failed: {} {}", disk_path, heal_err);
}
}
}
}
@@ -1807,11 +1820,11 @@ impl Scanner {
let mut disk_objects = HashMap::new();
for volume in volumes {
// check cancel token
if let Some(cancel_token) = get_ahm_services_cancel_token()
&& cancel_token.is_cancelled()
{
info!("Cancellation requested, stopping disk scan");
break;
if let Some(cancel_token) = get_ahm_services_cancel_token() {
if cancel_token.is_cancelled() {
info!("Cancellation requested, stopping disk scan");
break;
}
}
match self.scan_volume(disk, &volume.name).await {
@@ -1942,96 +1955,104 @@ impl Scanner {
// object metadata damaged, submit metadata heal task
let enable_healing = self.config.read().await.enable_healing;
if enable_healing && let Some(heal_manager) = &self.heal_manager {
let req = HealRequest::metadata(bucket.to_string(), entry.name.clone());
match heal_manager.submit_heal_request(req).await {
Ok(task_id) => {
warn!("object metadata damaged, submit heal task: {} {} / {}", task_id, bucket, entry.name);
}
Err(e) => {
error!("object metadata damaged, submit heal task failed: {} / {} {}", bucket, entry.name, e);
if enable_healing {
if let Some(heal_manager) = &self.heal_manager {
let req = HealRequest::metadata(bucket.to_string(), entry.name.clone());
match heal_manager.submit_heal_request(req).await {
Ok(task_id) => {
warn!(
"object metadata damaged, submit heal task: {} {} / {}",
task_id, bucket, entry.name
);
}
Err(e) => {
error!(
"object metadata damaged, submit heal task failed: {} / {} {}",
bucket, entry.name, e
);
}
}
}
}
} else {
// Apply lifecycle actions
if let Some(lifecycle_config) = &lifecycle_config
&& disk.is_local()
{
let vcfg = BucketVersioningSys::get(bucket).await.ok();
if let Some(lifecycle_config) = &lifecycle_config {
if let Disk::Local(_local_disk) = &**disk {
let vcfg = BucketVersioningSys::get(bucket).await.ok();
let mut scanner_item = ScannerItem {
bucket: bucket.to_string(),
object_name: entry.name.clone(),
lifecycle: Some(lifecycle_config.clone()),
versioning: versioning_config.clone(),
};
//ScannerItem::new(bucket.to_string(), Some(lifecycle_config.clone()), versioning_config.clone());
let fivs = match entry.clone().file_info_versions(&scanner_item.bucket) {
Ok(fivs) => fivs,
Err(_err) => {
stop_fn();
return Err(Error::other("skip this file"));
}
};
let mut size_s = SizeSummary::default();
let obj_infos = match scanner_item.apply_versions_actions(&fivs.versions).await {
Ok(obj_infos) => obj_infos,
Err(_err) => {
stop_fn();
return Err(Error::other("skip this file"));
}
};
let versioned = if let Some(vcfg) = vcfg.as_ref() {
vcfg.versioned(&scanner_item.object_name)
} else {
false
};
#[allow(unused_assignments)]
let mut obj_deleted = false;
for info in obj_infos.iter() {
let sz: i64;
(obj_deleted, sz) = scanner_item.apply_actions(info, &mut size_s).await;
if obj_deleted {
break;
}
let actual_sz = match info.get_actual_size() {
Ok(size) => size,
Err(_) => continue,
let mut scanner_item = ScannerItem {
bucket: bucket.to_string(),
object_name: entry.name.clone(),
lifecycle: Some(lifecycle_config.clone()),
versioning: versioning_config.clone(),
};
//ScannerItem::new(bucket.to_string(), Some(lifecycle_config.clone()), versioning_config.clone());
let fivs = match entry.clone().file_info_versions(&scanner_item.bucket) {
Ok(fivs) => fivs,
Err(_err) => {
stop_fn();
return Err(Error::other("skip this file"));
}
};
let mut size_s = SizeSummary::default();
let obj_infos = match scanner_item.apply_versions_actions(&fivs.versions).await {
Ok(obj_infos) => obj_infos,
Err(_err) => {
stop_fn();
return Err(Error::other("skip this file"));
}
};
if info.delete_marker {
size_s.delete_markers += 1;
let versioned = if let Some(vcfg) = vcfg.as_ref() {
vcfg.versioned(&scanner_item.object_name)
} else {
false
};
#[allow(unused_assignments)]
let mut obj_deleted = false;
for info in obj_infos.iter() {
let sz: i64;
(obj_deleted, sz) = scanner_item.apply_actions(info, &mut size_s).await;
if obj_deleted {
break;
}
let actual_sz = match info.get_actual_size() {
Ok(size) => size,
Err(_) => continue,
};
if info.delete_marker {
size_s.delete_markers += 1;
}
if info.version_id.is_some() && sz == actual_sz {
size_s.versions += 1;
}
size_s.total_size += sz as usize;
if info.delete_marker {
continue;
}
}
if info.version_id.is_some() && sz == actual_sz {
size_s.versions += 1;
for free_version in fivs.free_versions.iter() {
let _obj_info = rustfs_ecstore::store_api::ObjectInfo::from_file_info(
free_version,
&scanner_item.bucket,
&scanner_item.object_name,
versioned,
);
}
size_s.total_size += sz as usize;
if info.delete_marker {
continue;
}
// todo: global trace
/*if obj_deleted {
return Err(Error::other(ERR_IGNORE_FILE_CONTRIB).into());
}*/
}
for free_version in fivs.free_versions.iter() {
let _obj_info = rustfs_ecstore::store_api::ObjectInfo::from_file_info(
free_version,
&scanner_item.bucket,
&scanner_item.object_name,
versioned,
);
}
// todo: global trace
/*if obj_deleted {
return Err(Error::other(ERR_IGNORE_FILE_CONTRIB).into());
}*/
}
// Store object metadata for later analysis
@@ -2043,17 +2064,22 @@ impl Scanner {
// object metadata parse failed, submit metadata heal task
let enable_healing = self.config.read().await.enable_healing;
if enable_healing && let Some(heal_manager) = &self.heal_manager {
let req = HealRequest::metadata(bucket.to_string(), entry.name.clone());
match heal_manager.submit_heal_request(req).await {
Ok(task_id) => {
warn!("object metadata parse failed, submit heal task: {} {} / {}", task_id, bucket, entry.name);
}
Err(e) => {
error!(
"object metadata parse failed, submit heal task failed: {} / {} {}",
bucket, entry.name, e
);
if enable_healing {
if let Some(heal_manager) = &self.heal_manager {
let req = HealRequest::metadata(bucket.to_string(), entry.name.clone());
match heal_manager.submit_heal_request(req).await {
Ok(task_id) => {
warn!(
"object metadata parse failed, submit heal task: {} {} / {}",
task_id, bucket, entry.name
);
}
Err(e) => {
error!(
"object metadata parse failed, submit heal task failed: {} / {} {}",
bucket, entry.name, e
);
}
}
}
}
@@ -2164,14 +2190,17 @@ impl Scanner {
// the delete marker, but we keep it conservative here.
let mut has_latest_delete_marker = false;
for &disk_idx in locations {
if let Some(bucket_map) = all_disk_objects.get(disk_idx)
&& let Some(file_map) = bucket_map.get(bucket)
&& let Some(fm) = file_map.get(object_name)
&& let Some(first_ver) = fm.versions.first()
&& first_ver.header.version_type == VersionType::Delete
{
has_latest_delete_marker = true;
break;
if let Some(bucket_map) = all_disk_objects.get(disk_idx) {
if let Some(file_map) = bucket_map.get(bucket) {
if let Some(fm) = file_map.get(object_name) {
if let Some(first_ver) = fm.versions.first() {
if first_ver.header.version_type == VersionType::Delete {
has_latest_delete_marker = true;
break;
}
}
}
}
}
}
if has_latest_delete_marker {
@@ -2219,26 +2248,28 @@ impl Scanner {
// submit heal task
let enable_healing = self.config.read().await.enable_healing;
if enable_healing && let Some(heal_manager) = &self.heal_manager {
use crate::heal::{HealPriority, HealRequest};
let req = HealRequest::new(
crate::heal::HealType::Object {
bucket: bucket.clone(),
object: object_name.clone(),
version_id: None,
},
crate::heal::HealOptions::default(),
HealPriority::High,
);
match heal_manager.submit_heal_request(req).await {
Ok(task_id) => {
warn!(
"object missing, submit heal task: {} {} / {} (missing disks: {:?})",
task_id, bucket, object_name, missing_disks
);
}
Err(e) => {
error!("object missing, submit heal task failed: {} / {} {}", bucket, object_name, e);
if enable_healing {
if let Some(heal_manager) = &self.heal_manager {
use crate::heal::{HealPriority, HealRequest};
let req = HealRequest::new(
crate::heal::HealType::Object {
bucket: bucket.clone(),
object: object_name.clone(),
version_id: None,
},
crate::heal::HealOptions::default(),
HealPriority::High,
);
match heal_manager.submit_heal_request(req).await {
Ok(task_id) => {
warn!(
"object missing, submit heal task: {} {} / {} (missing disks: {:?})",
task_id, bucket, object_name, missing_disks
);
}
Err(e) => {
error!("object missing, submit heal task failed: {} / {} {}", bucket, object_name, e);
}
}
}
}
@@ -2246,11 +2277,11 @@ impl Scanner {
// Step 3: Deep scan EC verification
let config = self.config.read().await;
if config.scan_mode == ScanMode::Deep
&& let Err(e) = self.verify_object_integrity(bucket, object_name).await
{
objects_with_ec_issues += 1;
warn!("Object integrity verification failed for object {}/{}: {}", bucket, object_name, e);
if config.scan_mode == ScanMode::Deep {
if let Err(e) = self.verify_object_integrity(bucket, object_name).await {
objects_with_ec_issues += 1;
warn!("Object integrity verification failed for object {}/{}: {}", bucket, object_name, e);
}
}
}
}
@@ -2262,10 +2293,10 @@ impl Scanner {
// Step 4: Collect data usage statistics if enabled
let config = self.config.read().await;
if config.enable_data_usage_stats
&& let Err(e) = self.collect_data_usage_statistics(all_disk_objects).await
{
error!("Failed to collect data usage statistics: {}", e);
if config.enable_data_usage_stats {
if let Err(e) = self.collect_data_usage_statistics(all_disk_objects).await {
error!("Failed to collect data usage statistics: {}", e);
}
}
drop(config);
@@ -2495,11 +2526,11 @@ impl Scanner {
info!("Starting legacy scan loop for backward compatibility");
loop {
if let Some(token) = get_ahm_services_cancel_token()
&& token.is_cancelled()
{
info!("Cancellation requested, exiting legacy scan loop");
break;
if let Some(token) = get_ahm_services_cancel_token() {
if token.is_cancelled() {
info!("Cancellation requested, exiting legacy scan loop");
break;
}
}
let (enable_data_usage_stats, scan_interval) = {
@@ -2507,8 +2538,10 @@ impl Scanner {
(config.enable_data_usage_stats, config.scan_interval)
};
if enable_data_usage_stats && let Err(e) = self.collect_and_persist_data_usage().await {
warn!("Background data usage collection failed: {}", e);
if enable_data_usage_stats {
if let Err(e) = self.collect_and_persist_data_usage().await {
warn!("Background data usage collection failed: {}", e);
}
}
// Update local stats in aggregator after latest scan
@@ -2623,10 +2656,10 @@ mod tests {
// create temp dir as 4 disks
let test_base_dir = test_dir.unwrap_or("/tmp/rustfs_ahm_test");
let temp_dir = std::path::PathBuf::from(test_base_dir);
if temp_dir.exists()
&& let Err(e) = fs::remove_dir_all(&temp_dir)
{
panic!("Failed to remove test directory: {e}");
if temp_dir.exists() {
if let Err(e) = fs::remove_dir_all(&temp_dir) {
panic!("Failed to remove test directory: {e}");
}
}
if let Err(e) = fs::create_dir_all(&temp_dir) {
panic!("Failed to create test directory: {e}");

View File

@@ -305,10 +305,10 @@ fn compute_object_usage(bucket: &str, object: &str, file_meta: &FileMeta) -> Res
has_live_object = true;
versions_count = versions_count.saturating_add(1);
if latest_file_info.is_none()
&& let Ok(info) = file_meta.into_fileinfo(bucket, object, "", false, false, false)
{
latest_file_info = Some(info);
if latest_file_info.is_none() {
if let Ok(info) = file_meta.into_fileinfo(bucket, object, "", false, false) {
latest_file_info = Some(info);
}
}
}
}

View File

@@ -112,10 +112,10 @@ impl LocalStatsManager {
/// create new local stats manager
pub fn new(node_id: &str, data_dir: &Path) -> Self {
// ensure data directory exists
if !data_dir.exists()
&& let Err(e) = std::fs::create_dir_all(data_dir)
{
error!("create stats data directory failed {:?}: {}", data_dir, e);
if !data_dir.exists() {
if let Err(e) = std::fs::create_dir_all(data_dir) {
error!("create stats data directory failed {:?}: {}", data_dir, e);
}
}
let stats_file = data_dir.join(format!("scanner_stats_{node_id}.json"));

View File

@@ -436,10 +436,10 @@ impl NodeScanner {
/// create a new node scanner
pub fn new(node_id: String, config: NodeScannerConfig) -> Self {
// Ensure data directory exists
if !config.data_dir.exists()
&& let Err(e) = std::fs::create_dir_all(&config.data_dir)
{
error!("create data directory failed {:?}: {}", config.data_dir, e);
if !config.data_dir.exists() {
if let Err(e) = std::fs::create_dir_all(&config.data_dir) {
error!("create data directory failed {:?}: {}", config.data_dir, e);
}
}
let stats_manager = Arc::new(LocalStatsManager::new(&node_id, &config.data_dir));

View File

@@ -327,16 +327,16 @@ impl DecentralizedStatsAggregator {
);
// Check cache validity if timestamp is not initial value (UNIX_EPOCH)
if cache_timestamp != SystemTime::UNIX_EPOCH
&& let Ok(elapsed) = now.duration_since(cache_timestamp)
{
if elapsed < cache_ttl {
if let Some(cached) = self.cached_stats.read().await.as_ref() {
debug!("Returning cached aggregated stats, remaining TTL: {:?}", cache_ttl - elapsed);
return Ok(cached.clone());
if cache_timestamp != SystemTime::UNIX_EPOCH {
if let Ok(elapsed) = now.duration_since(cache_timestamp) {
if elapsed < cache_ttl {
if let Some(cached) = self.cached_stats.read().await.as_ref() {
debug!("Returning cached aggregated stats, remaining TTL: {:?}", cache_ttl - elapsed);
return Ok(cached.clone());
}
} else {
debug!("Cache expired: elapsed={:?} >= ttl={:?}", elapsed, cache_ttl);
}
} else {
debug!("Cache expired: elapsed={:?} >= ttl={:?}", elapsed, cache_ttl);
}
}

View File

@@ -21,11 +21,10 @@ use rustfs_ecstore::bucket::metadata_sys::{BucketMetadataSys, GLOBAL_BucketMetad
use rustfs_ecstore::endpoints::EndpointServerPools;
use rustfs_ecstore::store::ECStore;
use rustfs_ecstore::store_api::{ObjectIO, PutObjReader, StorageAPI};
use std::sync::{Arc, Once};
use std::sync::Arc;
use tempfile::TempDir;
use tokio::sync::RwLock;
use tokio_util::sync::CancellationToken;
use tracing::Level;
/// Build a minimal single-node ECStore over a temp directory and populate objects.
async fn create_store_with_objects(count: usize) -> (TempDir, std::sync::Arc<ECStore>) {
@@ -75,22 +74,8 @@ async fn create_store_with_objects(count: usize) -> (TempDir, std::sync::Arc<ECS
(temp_dir, store)
}
static INIT: Once = Once::new();
fn init_tracing(filter_level: Level) {
INIT.call_once(|| {
let _ = tracing_subscriber::fmt()
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
.with_max_level(filter_level)
.with_timer(tracing_subscriber::fmt::time::UtcTime::rfc_3339())
.with_thread_names(true)
.try_init();
});
}
#[tokio::test]
async fn fallback_builds_full_counts_over_100_objects() {
init_tracing(Level::ERROR);
let (_tmp, store) = create_store_with_objects(1000).await;
let scanner = Scanner::new(None, None);

View File

@@ -38,13 +38,9 @@ use walkdir::WalkDir;
static GLOBAL_ENV: OnceLock<(Vec<PathBuf>, Arc<ECStore>, Arc<ECStoreHealStorage>)> = OnceLock::new();
static INIT: Once = Once::new();
pub fn init_tracing() {
fn init_tracing() {
INIT.call_once(|| {
let _ = tracing_subscriber::fmt()
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
.with_timer(tracing_subscriber::fmt::time::UtcTime::rfc_3339())
.with_thread_names(true)
.try_init();
let _ = tracing_subscriber::fmt::try_init();
});
}
@@ -360,7 +356,7 @@ mod serial_tests {
// Create heal manager with faster interval
let cfg = HealConfig {
heal_interval: Duration::from_secs(1),
heal_interval: Duration::from_secs(2),
..Default::default()
};
let heal_manager = HealManager::new(heal_storage.clone(), Some(cfg));

View File

@@ -421,86 +421,86 @@ mod serial_tests {
}
};
if let Some(lmdb_env) = GLOBAL_LMDB_ENV.get()
&& let Some(lmdb) = GLOBAL_LMDB_DB.get()
{
let mut wtxn = lmdb_env.write_txn().unwrap();
if let Some(lmdb_env) = GLOBAL_LMDB_ENV.get() {
if let Some(lmdb) = GLOBAL_LMDB_DB.get() {
let mut wtxn = lmdb_env.write_txn().unwrap();
/*if let Ok((lc_config, _)) = rustfs_ecstore::bucket::metadata_sys::get_lifecycle_config(bucket_name.as_str()).await {
if let Ok(object_info) = ecstore
.get_object_info(bucket_name.as_str(), object_name, &rustfs_ecstore::store_api::ObjectOptions::default())
.await
{
let event = rustfs_ecstore::bucket::lifecycle::bucket_lifecycle_ops::eval_action_from_lifecycle(
&lc_config,
None,
None,
&object_info,
/*if let Ok((lc_config, _)) = rustfs_ecstore::bucket::metadata_sys::get_lifecycle_config(bucket_name.as_str()).await {
if let Ok(object_info) = ecstore
.get_object_info(bucket_name.as_str(), object_name, &rustfs_ecstore::store_api::ObjectOptions::default())
.await
{
let event = rustfs_ecstore::bucket::lifecycle::bucket_lifecycle_ops::eval_action_from_lifecycle(
&lc_config,
None,
None,
&object_info,
)
.await;
rustfs_ecstore::bucket::lifecycle::bucket_lifecycle_ops::apply_expiry_on_non_transitioned_objects(
ecstore.clone(),
&object_info,
&event,
&rustfs_ecstore::bucket::lifecycle::bucket_lifecycle_audit::LcEventSrc::Scanner,
)
.await;
expired = wait_for_object_absence(&ecstore, bucket_name.as_str(), object_name, Duration::from_secs(2)).await;
}
}*/
for record in records {
if !record.usage.has_live_object {
continue;
}
let object_info = convert_record_to_object_info(record);
println!("object_info2: {object_info:?}");
let mod_time = object_info.mod_time.unwrap_or(OffsetDateTime::now_utc());
let expiry_time = rustfs_ecstore::bucket::lifecycle::lifecycle::expected_expiry_time(mod_time, 1);
let version_id = if let Some(version_id) = object_info.version_id {
version_id.to_string()
} else {
"zzzzzzzz-zzzz-zzzz-zzzz-zzzzzzzzzzzz".to_string()
};
lmdb.put(
&mut wtxn,
&expiry_time.unix_timestamp(),
&LifecycleContent {
ver_no: 0,
ver_id: version_id,
mod_time,
type_: LifecycleType::TransitionNoncurrent,
object_name: object_info.name,
},
)
.await;
rustfs_ecstore::bucket::lifecycle::bucket_lifecycle_ops::apply_expiry_on_non_transitioned_objects(
ecstore.clone(),
&object_info,
&event,
&rustfs_ecstore::bucket::lifecycle::bucket_lifecycle_audit::LcEventSrc::Scanner,
)
.await;
expired = wait_for_object_absence(&ecstore, bucket_name.as_str(), object_name, Duration::from_secs(2)).await;
}
}*/
for record in records {
if !record.usage.has_live_object {
continue;
.unwrap();
}
let object_info = convert_record_to_object_info(record);
println!("object_info2: {object_info:?}");
let mod_time = object_info.mod_time.unwrap_or(OffsetDateTime::now_utc());
let expiry_time = rustfs_ecstore::bucket::lifecycle::lifecycle::expected_expiry_time(mod_time, 1);
wtxn.commit().unwrap();
let version_id = if let Some(version_id) = object_info.version_id {
version_id.to_string()
} else {
"zzzzzzzz-zzzz-zzzz-zzzz-zzzzzzzzzzzz".to_string()
};
lmdb.put(
&mut wtxn,
&expiry_time.unix_timestamp(),
&LifecycleContent {
ver_no: 0,
ver_id: version_id,
mod_time,
type_: LifecycleType::TransitionNoncurrent,
object_name: object_info.name,
},
)
.unwrap();
let mut wtxn = lmdb_env.write_txn().unwrap();
let iter = lmdb.iter_mut(&mut wtxn).unwrap();
//let _ = unsafe { iter.del_current().unwrap() };
for row in iter {
if let Ok(ref elm) = row {
let LifecycleContent {
ver_no,
ver_id,
mod_time,
type_,
object_name,
} = &elm.1;
println!("cache row:{ver_no} {ver_id} {mod_time} {type_:?} {object_name}");
}
println!("row:{row:?}");
}
//drop(iter);
wtxn.commit().unwrap();
}
wtxn.commit().unwrap();
let mut wtxn = lmdb_env.write_txn().unwrap();
let iter = lmdb.iter_mut(&mut wtxn).unwrap();
//let _ = unsafe { iter.del_current().unwrap() };
for row in iter {
if let Ok(ref elm) = row {
let LifecycleContent {
ver_no,
ver_id,
mod_time,
type_,
object_name,
} = &elm.1;
println!("cache row:{ver_no} {ver_id} {mod_time} {type_:?} {object_name}");
}
println!("row:{row:?}");
}
//drop(iter);
wtxn.commit().unwrap();
}
println!("Lifecycle cache test completed");

View File

@@ -415,28 +415,29 @@ mod serial_tests {
.await;
println!("Pending expiry tasks: {pending}");
if let Ok((lc_config, _)) = rustfs_ecstore::bucket::metadata_sys::get_lifecycle_config(bucket_name.as_str()).await
&& let Ok(object_info) = ecstore
if let Ok((lc_config, _)) = rustfs_ecstore::bucket::metadata_sys::get_lifecycle_config(bucket_name.as_str()).await {
if let Ok(object_info) = ecstore
.get_object_info(bucket_name.as_str(), object_name, &rustfs_ecstore::store_api::ObjectOptions::default())
.await
{
let event = rustfs_ecstore::bucket::lifecycle::bucket_lifecycle_ops::eval_action_from_lifecycle(
&lc_config,
None,
None,
&object_info,
)
.await;
{
let event = rustfs_ecstore::bucket::lifecycle::bucket_lifecycle_ops::eval_action_from_lifecycle(
&lc_config,
None,
None,
&object_info,
)
.await;
rustfs_ecstore::bucket::lifecycle::bucket_lifecycle_ops::apply_expiry_on_non_transitioned_objects(
ecstore.clone(),
&object_info,
&event,
&rustfs_ecstore::bucket::lifecycle::bucket_lifecycle_audit::LcEventSrc::Scanner,
)
.await;
rustfs_ecstore::bucket::lifecycle::bucket_lifecycle_ops::apply_expiry_on_non_transitioned_objects(
ecstore.clone(),
&object_info,
&event,
&rustfs_ecstore::bucket::lifecycle::bucket_lifecycle_audit::LcEventSrc::Scanner,
)
.await;
expired = wait_for_object_absence(&ecstore, bucket_name.as_str(), object_name, Duration::from_secs(2)).await;
expired = wait_for_object_absence(&ecstore, bucket_name.as_str(), object_name, Duration::from_secs(2)).await;
}
}
if !expired {
@@ -549,31 +550,32 @@ mod serial_tests {
.await;
println!("Pending expiry tasks: {pending}");
if let Ok((lc_config, _)) = rustfs_ecstore::bucket::metadata_sys::get_lifecycle_config(bucket_name.as_str()).await
&& let Ok(obj_info) = ecstore
if let Ok((lc_config, _)) = rustfs_ecstore::bucket::metadata_sys::get_lifecycle_config(bucket_name.as_str()).await {
if let Ok(obj_info) = ecstore
.get_object_info(bucket_name.as_str(), object_name, &rustfs_ecstore::store_api::ObjectOptions::default())
.await
{
let event = rustfs_ecstore::bucket::lifecycle::bucket_lifecycle_ops::eval_action_from_lifecycle(
&lc_config, None, None, &obj_info,
)
.await;
{
let event = rustfs_ecstore::bucket::lifecycle::bucket_lifecycle_ops::eval_action_from_lifecycle(
&lc_config, None, None, &obj_info,
)
.await;
rustfs_ecstore::bucket::lifecycle::bucket_lifecycle_ops::apply_expiry_on_non_transitioned_objects(
ecstore.clone(),
&obj_info,
&event,
&rustfs_ecstore::bucket::lifecycle::bucket_lifecycle_audit::LcEventSrc::Scanner,
)
.await;
rustfs_ecstore::bucket::lifecycle::bucket_lifecycle_ops::apply_expiry_on_non_transitioned_objects(
ecstore.clone(),
&obj_info,
&event,
&rustfs_ecstore::bucket::lifecycle::bucket_lifecycle_audit::LcEventSrc::Scanner,
)
.await;
deleted = wait_for_object_absence(&ecstore, bucket_name.as_str(), object_name, Duration::from_secs(2)).await;
deleted = wait_for_object_absence(&ecstore, bucket_name.as_str(), object_name, Duration::from_secs(2)).await;
if !deleted {
println!(
"Object info: name={}, size={}, mod_time={:?}",
obj_info.name, obj_info.size, obj_info.mod_time
);
if !deleted {
println!(
"Object info: name={}, size={}, mod_time={:?}",
obj_info.name, obj_info.size, obj_info.mod_time
);
}
}
}

View File

@@ -60,9 +60,8 @@ impl TargetFactory for WebhookTargetFactory {
let endpoint = config
.lookup(WEBHOOK_ENDPOINT)
.ok_or_else(|| TargetError::Configuration("Missing webhook endpoint".to_string()))?;
let parsed_endpoint = endpoint.trim();
let endpoint_url = Url::parse(parsed_endpoint)
.map_err(|e| TargetError::Configuration(format!("Invalid endpoint URL: {e} (value: '{parsed_endpoint}')")))?;
let endpoint_url = Url::parse(&endpoint)
.map_err(|e| TargetError::Configuration(format!("Invalid endpoint URL: {e} (value: '{endpoint}')")))?;
let args = WebhookArgs {
enable: true, // If we are here, it's already enabled.
@@ -204,10 +203,10 @@ impl TargetFactory for MQTTTargetFactory {
if !std::path::Path::new(&queue_dir).is_absolute() {
return Err(TargetError::Configuration("MQTT queue directory must be an absolute path".to_string()));
}
if let Some(qos_str) = config.lookup(MQTT_QOS)
&& qos_str == "0"
{
warn!("Using queue_dir with QoS 0 may result in event loss");
if let Some(qos_str) = config.lookup(MQTT_QOS) {
if qos_str == "0" {
warn!("Using queue_dir with QoS 0 may result in event loss");
}
}
}

View File

@@ -21,7 +21,6 @@ use futures::stream::FuturesUnordered;
use hashbrown::{HashMap, HashSet};
use rustfs_config::{DEFAULT_DELIMITER, ENABLE_KEY, ENV_PREFIX, EnableState, audit::AUDIT_ROUTE_PREFIX};
use rustfs_ecstore::config::{Config, KVS};
use rustfs_targets::arn::TargetID;
use rustfs_targets::{Target, TargetError, target::ChannelTargetType};
use std::str::FromStr;
use std::sync::Arc;
@@ -139,11 +138,12 @@ impl AuditRegistry {
format!("{ENV_PREFIX}{AUDIT_ROUTE_PREFIX}{target_type}{DEFAULT_DELIMITER}{ENABLE_KEY}{DEFAULT_DELIMITER}")
.to_uppercase();
for (key, value) in &all_env {
if EnableState::from_str(value).ok().map(|s| s.is_enabled()).unwrap_or(false)
&& let Some(id) = key.strip_prefix(&enable_prefix)
&& !id.is_empty()
{
instance_ids_from_env.insert(id.to_lowercase());
if EnableState::from_str(value).ok().map(|s| s.is_enabled()).unwrap_or(false) {
if let Some(id) = key.strip_prefix(&enable_prefix) {
if !id.is_empty() {
instance_ids_from_env.insert(id.to_lowercase());
}
}
}
}
@@ -292,10 +292,10 @@ impl AuditRegistry {
for section in sections {
let mut section_map: std::collections::HashMap<String, KVS> = std::collections::HashMap::new();
// Add default item
if let Some(default_kvs) = section_defaults.get(&section)
&& !default_kvs.is_empty()
{
section_map.insert(DEFAULT_DELIMITER.to_string(), default_kvs.clone());
if let Some(default_kvs) = section_defaults.get(&section) {
if !default_kvs.is_empty() {
section_map.insert(DEFAULT_DELIMITER.to_string(), default_kvs.clone());
}
}
// Add successful instance item
@@ -393,80 +393,4 @@ impl AuditRegistry {
Ok(())
}
/// Creates a unique key for a target based on its type and ID
///
/// # Arguments
/// * `target_type` - The type of the target (e.g., "webhook", "mqtt").
/// * `target_id` - The identifier for the target instance.
///
/// # Returns
/// * `String` - The unique key for the target.
pub fn create_key(&self, target_type: &str, target_id: &str) -> String {
let key = TargetID::new(target_id.to_string(), target_type.to_string());
info!(target_type = %target_type, "Create key for {}", key);
key.to_string()
}
/// Enables a target (placeholder, assumes target exists)
///
/// # Arguments
/// * `target_type` - The type of the target (e.g., "webhook", "mqtt").
/// * `target_id` - The identifier for the target instance.
///
/// # Returns
/// * `AuditResult<()>` - Result indicating success or failure.
pub fn enable_target(&self, target_type: &str, target_id: &str) -> AuditResult<()> {
let key = self.create_key(target_type, target_id);
if self.get_target(&key).is_some() {
info!("Target {}-{} enabled", target_type, target_id);
Ok(())
} else {
Err(AuditError::Configuration(
format!("Target not found: {}-{}", target_type, target_id),
None,
))
}
}
/// Disables a target (placeholder, assumes target exists)
///
/// # Arguments
/// * `target_type` - The type of the target (e.g., "webhook", "mqtt").
/// * `target_id` - The identifier for the target instance.
///
/// # Returns
/// * `AuditResult<()>` - Result indicating success or failure.
pub fn disable_target(&self, target_type: &str, target_id: &str) -> AuditResult<()> {
let key = self.create_key(target_type, target_id);
if self.get_target(&key).is_some() {
info!("Target {}-{} disabled", target_type, target_id);
Ok(())
} else {
Err(AuditError::Configuration(
format!("Target not found: {}-{}", target_type, target_id),
None,
))
}
}
/// Upserts a target into the registry
///
/// # Arguments
/// * `target_type` - The type of the target (e.g., "webhook", "mqtt").
/// * `target_id` - The identifier for the target instance.
/// * `target` - The target instance to be upserted.
///
/// # Returns
/// * `AuditResult<()>` - Result indicating success or failure.
pub fn upsert_target(
&mut self,
target_type: &str,
target_id: &str,
target: Box<dyn Target<AuditEntry> + Send + Sync>,
) -> AuditResult<()> {
let key = self.create_key(target_type, target_id);
self.targets.insert(key, target);
Ok(())
}
}

View File

@@ -274,9 +274,9 @@ impl AuditSystem {
drop(state);
let registry = self.registry.lock().await;
let target_keys = registry.list_targets();
let target_ids = registry.list_targets();
if target_keys.is_empty() {
if target_ids.is_empty() {
warn!("No audit targets configured for dispatch");
return Ok(());
}
@@ -284,22 +284,22 @@ impl AuditSystem {
// Dispatch to all targets concurrently
let mut tasks = Vec::new();
for target_key in target_keys {
if let Some(target) = registry.get_target(&target_key) {
for target_id in target_ids {
if let Some(target) = registry.get_target(&target_id) {
let entry_clone = Arc::clone(&entry);
let target_key_clone = target_key.clone();
let target_id_clone = target_id.clone();
// Create EntityTarget for the audit log entry
let entity_target = EntityTarget {
object_name: entry.api.name.clone().unwrap_or_default(),
bucket_name: entry.api.bucket.clone().unwrap_or_default(),
event_name: entry.event, // Default, should be derived from entry
event_name: rustfs_targets::EventName::ObjectCreatedPut, // Default, should be derived from entry
data: (*entry_clone).clone(),
};
let task = async move {
let result = target.save(Arc::new(entity_target)).await;
(target_key_clone, result)
(target_id_clone, result)
};
tasks.push(task);
@@ -312,14 +312,14 @@ impl AuditSystem {
let mut errors = Vec::new();
let mut success_count = 0;
for (target_key, result) in results {
for (target_id, result) in results {
match result {
Ok(_) => {
success_count += 1;
observability::record_target_success();
}
Err(e) => {
error!(target_id = %target_key, error = %e, "Failed to dispatch audit log to target");
error!(target_id = %target_id, error = %e, "Failed to dispatch audit log to target");
errors.push(e);
observability::record_target_failure();
}
@@ -360,18 +360,18 @@ impl AuditSystem {
drop(state);
let registry = self.registry.lock().await;
let target_keys = registry.list_targets();
let target_ids = registry.list_targets();
if target_keys.is_empty() {
if target_ids.is_empty() {
warn!("No audit targets configured for batch dispatch");
return Ok(());
}
let mut tasks = Vec::new();
for target_key in target_keys {
if let Some(target) = registry.get_target(&target_key) {
for target_id in target_ids {
if let Some(target) = registry.get_target(&target_id) {
let entries_clone: Vec<_> = entries.iter().map(Arc::clone).collect();
let target_key_clone = target_key.clone();
let target_id_clone = target_id.clone();
let task = async move {
let mut success_count = 0;
@@ -380,7 +380,7 @@ impl AuditSystem {
let entity_target = EntityTarget {
object_name: entry.api.name.clone().unwrap_or_default(),
bucket_name: entry.api.bucket.clone().unwrap_or_default(),
event_name: entry.event,
event_name: rustfs_targets::EventName::ObjectCreatedPut,
data: (*entry).clone(),
};
match target.save(Arc::new(entity_target)).await {
@@ -388,7 +388,7 @@ impl AuditSystem {
Err(e) => errors.push(e),
}
}
(target_key_clone, success_count, errors)
(target_id_clone, success_count, errors)
};
tasks.push(task);
}
@@ -418,7 +418,6 @@ impl AuditSystem {
}
/// Starts the audit stream processing for a target with batching and retry logic
///
/// # Arguments
/// * `store` - The store from which to read audit entries
/// * `target` - The target to which audit entries will be sent
@@ -502,7 +501,7 @@ impl AuditSystem {
/// Enables a specific target
///
/// # Arguments
/// * `target_id` - The ID of the target to enable, TargetID to string
/// * `target_id` - The ID of the target to enable
///
/// # Returns
/// * `AuditResult<()>` - Result indicating success or failure
@@ -521,7 +520,7 @@ impl AuditSystem {
/// Disables a specific target
///
/// # Arguments
/// * `target_id` - The ID of the target to disable, TargetID to string
/// * `target_id` - The ID of the target to disable
///
/// # Returns
/// * `AuditResult<()>` - Result indicating success or failure
@@ -540,7 +539,7 @@ impl AuditSystem {
/// Removes a target from the system
///
/// # Arguments
/// * `target_id` - The ID of the target to remove, TargetID to string
/// * `target_id` - The ID of the target to remove
///
/// # Returns
/// * `AuditResult<()>` - Result indicating success or failure
@@ -560,7 +559,7 @@ impl AuditSystem {
/// Updates or inserts a target
///
/// # Arguments
/// * `target_id` - The ID of the target to upsert, TargetID to string
/// * `target_id` - The ID of the target to upsert
/// * `target` - The target instance to insert or update
///
/// # Returns
@@ -574,10 +573,10 @@ impl AuditSystem {
}
// Remove existing target if present
if let Some(old_target) = registry.remove_target(&target_id)
&& let Err(e) = old_target.close().await
{
error!(target_id = %target_id, error = %e, "Failed to close old target during upsert");
if let Some(old_target) = registry.remove_target(&target_id) {
if let Err(e) = old_target.close().await {
error!(target_id = %target_id, error = %e, "Failed to close old target during upsert");
}
}
registry.add_target(target_id.clone(), target);
@@ -597,7 +596,7 @@ impl AuditSystem {
/// Gets information about a specific target
///
/// # Arguments
/// * `target_id` - The ID of the target to retrieve, TargetID to string
/// * `target_id` - The ID of the target to retrieve
///
/// # Returns
/// * `Option<String>` - Target ID if found

View File

@@ -39,4 +39,4 @@ path-clean = { workspace = true }
rmp-serde = { workspace = true }
async-trait = { workspace = true }
s3s = { workspace = true }
tracing = { workspace = true }
tracing = { workspace = true }

View File

@@ -605,12 +605,13 @@ impl DataUsageCache {
pub fn search_parent(&self, hash: &DataUsageHash) -> Option<DataUsageHash> {
let want = hash.key();
if let Some(last_index) = want.rfind('/')
&& let Some(v) = self.find(&want[0..last_index])
&& v.children.contains(&want)
{
let found = hash_path(&want[0..last_index]);
return Some(found);
if let Some(last_index) = want.rfind('/') {
if let Some(v) = self.find(&want[0..last_index]) {
if v.children.contains(&want) {
let found = hash_path(&want[0..last_index]);
return Some(found);
}
}
}
for (k, v) in self.cache.iter() {
@@ -1149,10 +1150,10 @@ impl DataUsageInfo {
self.buckets_count = self.buckets_usage.len() as u64;
// Update last update time
if let Some(other_update) = other.last_update
&& (self.last_update.is_none() || other_update > self.last_update.unwrap())
{
self.last_update = Some(other_update);
if let Some(other_update) = other.last_update {
if self.last_update.is_none() || other_update > self.last_update.unwrap() {
self.last_update = Some(other_update);
}
}
}
}

View File

@@ -14,7 +14,6 @@
#![allow(non_upper_case_globals)] // FIXME
use chrono::{DateTime, Utc};
use std::collections::HashMap;
use std::sync::LazyLock;
use tokio::sync::RwLock;
@@ -26,66 +25,18 @@ pub static GLOBAL_RUSTFS_PORT: LazyLock<RwLock<String>> = LazyLock::new(|| RwLoc
pub static GLOBAL_RUSTFS_ADDR: LazyLock<RwLock<String>> = LazyLock::new(|| RwLock::new("".to_string()));
pub static GLOBAL_CONN_MAP: LazyLock<RwLock<HashMap<String, Channel>>> = LazyLock::new(|| RwLock::new(HashMap::new()));
pub static GLOBAL_ROOT_CERT: LazyLock<RwLock<Option<Vec<u8>>>> = LazyLock::new(|| RwLock::new(None));
pub static GLOBAL_MTLS_IDENTITY: LazyLock<RwLock<Option<MtlsIdentityPem>>> = LazyLock::new(|| RwLock::new(None));
/// Global initialization time of the RustFS node.
pub static GLOBAL_INIT_TIME: LazyLock<RwLock<Option<DateTime<Utc>>>> = LazyLock::new(|| RwLock::new(None));
/// Set the global local node name.
///
/// # Arguments
/// * `name` - A string slice representing the local node name.
pub async fn set_global_local_node_name(name: &str) {
*GLOBAL_LOCAL_NODE_NAME.write().await = name.to_string();
}
/// Set the global RustFS initialization time to the current UTC time.
pub async fn set_global_init_time_now() {
let now = Utc::now();
*GLOBAL_INIT_TIME.write().await = Some(now);
}
/// Get the global RustFS initialization time.
///
/// # Returns
/// * `Option<DateTime<Utc>>` - The initialization time if set.
pub async fn get_global_init_time() -> Option<DateTime<Utc>> {
*GLOBAL_INIT_TIME.read().await
}
/// Set the global RustFS address used for gRPC connections.
///
/// # Arguments
/// * `addr` - A string slice representing the RustFS address (e.g., "https://node1:9000").
pub async fn set_global_addr(addr: &str) {
*GLOBAL_RUSTFS_ADDR.write().await = addr.to_string();
}
/// Set the global root CA certificate for outbound gRPC clients.
/// This certificate is used to validate server TLS certificates.
/// When set to None, clients use the system default root CAs.
///
/// # Arguments
/// * `cert` - A vector of bytes representing the PEM-encoded root CA certificate.
pub async fn set_global_root_cert(cert: Vec<u8>) {
*GLOBAL_ROOT_CERT.write().await = Some(cert);
}
/// Set the global mTLS identity (cert+key PEM) for outbound gRPC clients.
/// When set, clients will present this identity to servers requesting/requiring mTLS.
/// When None, clients proceed with standard server-authenticated TLS.
///
/// # Arguments
/// * `identity` - An optional MtlsIdentityPem struct containing the cert and key PEM.
pub async fn set_global_mtls_identity(identity: Option<MtlsIdentityPem>) {
*GLOBAL_MTLS_IDENTITY.write().await = identity;
}
/// Evict a stale/dead connection from the global connection cache.
/// This is critical for cluster recovery when a node dies unexpectedly (e.g., power-off).
/// By removing the cached connection, subsequent requests will establish a fresh connection.
///
/// # Arguments
/// * `addr` - The address of the connection to evict.
pub async fn evict_connection(addr: &str) {
let removed = GLOBAL_CONN_MAP.write().await.remove(addr);
if removed.is_some() {
@@ -94,12 +45,6 @@ pub async fn evict_connection(addr: &str) {
}
/// Check if a connection exists in the cache for the given address.
///
/// # Arguments
/// * `addr` - The address to check.
///
/// # Returns
/// * `bool` - True if a cached connection exists, false otherwise.
pub async fn has_cached_connection(addr: &str) -> bool {
GLOBAL_CONN_MAP.read().await.contains_key(addr)
}
@@ -113,12 +58,3 @@ pub async fn clear_all_connections() {
tracing::warn!("Cleared {} cached connections from global map", count);
}
}
/// Optional client identity (cert+key PEM) for outbound mTLS.
///
/// When present, gRPC clients will present this identity to servers requesting/requiring mTLS.
/// When absent, clients proceed with standard server-authenticated TLS.
#[derive(Clone, Debug)]
pub struct MtlsIdentityPem {
pub cert_pem: Vec<u8>,
pub key_pem: Vec<u8>,
}

View File

@@ -403,10 +403,10 @@ fn lc_get_prefix(rule: &LifecycleRule) -> String {
} else if let Some(filter) = &rule.filter {
if let Some(p) = &filter.prefix {
return p.to_string();
} else if let Some(and) = &filter.and
&& let Some(p) = &and.prefix
{
return p.to_string();
} else if let Some(and) = &filter.and {
if let Some(p) = &and.prefix {
return p.to_string();
}
}
}
@@ -475,19 +475,21 @@ pub fn rep_has_active_rules(config: &ReplicationConfiguration, prefix: &str, rec
{
continue;
}
if !prefix.is_empty()
&& let Some(filter) = &rule.filter
&& let Some(r_prefix) = &filter.prefix
&& !r_prefix.is_empty()
{
// incoming prefix must be in rule prefix
if !recursive && !prefix.starts_with(r_prefix) {
continue;
}
// If recursive, we can skip this rule if it doesn't match the tested prefix or level below prefix
// does not match
if recursive && !r_prefix.starts_with(prefix) && !prefix.starts_with(r_prefix) {
continue;
if !prefix.is_empty() {
if let Some(filter) = &rule.filter {
if let Some(r_prefix) = &filter.prefix {
if !r_prefix.is_empty() {
// incoming prefix must be in rule prefix
if !recursive && !prefix.starts_with(r_prefix) {
continue;
}
// If recursive, we can skip this rule if it doesn't match the tested prefix or level below prefix
// does not match
if recursive && !r_prefix.starts_with(prefix) && !prefix.starts_with(r_prefix) {
continue;
}
}
}
}
}
return true;

View File

@@ -19,10 +19,6 @@ pub mod globals;
pub mod heal_channel;
pub mod last_minute;
pub mod metrics;
mod readiness;
pub use globals::*;
pub use readiness::{GlobalReadiness, SystemStage};
// is ','
pub static DEFAULT_DELIMITER: u8 = 44;

View File

@@ -18,7 +18,6 @@ use rustfs_madmin::metrics::ScannerMetrics as M_ScannerMetrics;
use std::{
collections::HashMap,
fmt::Display,
future::Future,
pin::Pin,
sync::{
Arc, OnceLock,
@@ -116,7 +115,7 @@ pub enum Metric {
impl Metric {
/// Convert to string representation for metrics
pub fn as_str(&self) -> &'static str {
pub fn as_str(self) -> &'static str {
match self {
Self::ReadMetadata => "read_metadata",
Self::CheckMissing => "check_missing",
@@ -461,32 +460,27 @@ impl Metrics {
metrics.current_started = cycle.started;
}
// Replace default start time with global init time if it's the placeholder
if let Some(init_time) = crate::get_global_init_time().await {
metrics.current_started = init_time;
}
metrics.collected_at = Utc::now();
metrics.active_paths = self.get_current_paths().await;
// Lifetime operations
for i in 0..Metric::Last as usize {
let count = self.operations[i].load(Ordering::Relaxed);
if count > 0
&& let Some(metric) = Metric::from_index(i)
{
metrics.life_time_ops.insert(metric.as_str().to_string(), count);
if count > 0 {
if let Some(metric) = Metric::from_index(i) {
metrics.life_time_ops.insert(metric.as_str().to_string(), count);
}
}
}
// Last minute statistics for realtime metrics
for i in 0..Metric::LastRealtime as usize {
let last_min = self.latency[i].total().await;
if last_min.n > 0
&& let Some(_metric) = Metric::from_index(i)
{
// Convert to madmin TimedAction format if needed
// This would require implementing the conversion
if last_min.n > 0 {
if let Some(_metric) = Metric::from_index(i) {
// Convert to madmin TimedAction format if needed
// This would require implementing the conversion
}
}
}
@@ -495,8 +489,8 @@ impl Metrics {
}
// Type aliases for compatibility with existing code
pub type UpdateCurrentPathFn = Arc<dyn Fn(&str) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync>;
pub type CloseDiskFn = Arc<dyn Fn() -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync>;
pub type UpdateCurrentPathFn = Arc<dyn Fn(&str) -> Pin<Box<dyn std::future::Future<Output = ()> + Send>> + Send + Sync>;
pub type CloseDiskFn = Arc<dyn Fn() -> Pin<Box<dyn std::future::Future<Output = ()> + Send>> + Send + Sync>;
/// Create a current path updater for tracking scan progress
pub fn current_path_updater(disk: &str, initial: &str) -> (UpdateCurrentPathFn, CloseDiskFn) {
@@ -512,7 +506,7 @@ pub fn current_path_updater(disk: &str, initial: &str) -> (UpdateCurrentPathFn,
let update_fn = {
let tracker = Arc::clone(&tracker);
Arc::new(move |path: &str| -> Pin<Box<dyn Future<Output = ()> + Send>> {
Arc::new(move |path: &str| -> Pin<Box<dyn std::future::Future<Output = ()> + Send>> {
let tracker = Arc::clone(&tracker);
let path = path.to_string();
Box::pin(async move {
@@ -523,7 +517,7 @@ pub fn current_path_updater(disk: &str, initial: &str) -> (UpdateCurrentPathFn,
let done_fn = {
let disk_name = disk_name.clone();
Arc::new(move || -> Pin<Box<dyn Future<Output = ()> + Send>> {
Arc::new(move || -> Pin<Box<dyn std::future::Future<Output = ()> + Send>> {
let disk_name = disk_name.clone();
Box::pin(async move {
global_metrics().current_paths.write().await.remove(&disk_name);

View File

@@ -1,136 +0,0 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::atomic::{AtomicU8, Ordering};
/// Represents the various stages of system startup
#[repr(u8)]
pub enum SystemStage {
Booting = 0,
StorageReady = 1, // Disks online, Quorum met
IamReady = 2, // Users and Policies loaded into cache
FullReady = 3, // System ready to serve all traffic
}
/// Global readiness tracker for the service
/// This struct uses atomic operations to track the readiness status of various components
/// of the service in a thread-safe manner.
pub struct GlobalReadiness {
status: AtomicU8,
}
impl Default for GlobalReadiness {
fn default() -> Self {
Self::new()
}
}
impl GlobalReadiness {
/// Create a new GlobalReadiness instance with initial status as Starting
/// # Returns
/// A new instance of GlobalReadiness
pub fn new() -> Self {
Self {
status: AtomicU8::new(SystemStage::Booting as u8),
}
}
/// Update the system to a new stage
///
/// # Arguments
/// * `step` - The SystemStage step to mark as ready
pub fn mark_stage(&self, step: SystemStage) {
self.status.fetch_max(step as u8, Ordering::SeqCst);
}
/// Check if the service is fully ready
/// # Returns
/// `true` if the service is fully ready, `false` otherwise
pub fn is_ready(&self) -> bool {
self.status.load(Ordering::SeqCst) == SystemStage::FullReady as u8
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::sync::Arc;
use std::thread;
#[test]
fn test_initial_state() {
let readiness = GlobalReadiness::new();
assert!(!readiness.is_ready());
assert_eq!(readiness.status.load(Ordering::SeqCst), SystemStage::Booting as u8);
}
#[test]
fn test_mark_stage_progression() {
let readiness = GlobalReadiness::new();
readiness.mark_stage(SystemStage::StorageReady);
assert!(!readiness.is_ready());
assert_eq!(readiness.status.load(Ordering::SeqCst), SystemStage::StorageReady as u8);
readiness.mark_stage(SystemStage::IamReady);
assert!(!readiness.is_ready());
assert_eq!(readiness.status.load(Ordering::SeqCst), SystemStage::IamReady as u8);
readiness.mark_stage(SystemStage::FullReady);
assert!(readiness.is_ready());
}
#[test]
fn test_no_regression() {
let readiness = GlobalReadiness::new();
readiness.mark_stage(SystemStage::FullReady);
readiness.mark_stage(SystemStage::IamReady); // Should not regress
assert!(readiness.is_ready());
}
#[test]
fn test_concurrent_marking() {
let readiness = Arc::new(GlobalReadiness::new());
let mut handles = vec![];
for _ in 0..10 {
let r = Arc::clone(&readiness);
handles.push(thread::spawn(move || {
r.mark_stage(SystemStage::StorageReady);
r.mark_stage(SystemStage::IamReady);
r.mark_stage(SystemStage::FullReady);
}));
}
for h in handles {
h.join().unwrap();
}
assert!(readiness.is_ready());
}
#[test]
fn test_is_ready_only_at_full_ready() {
let readiness = GlobalReadiness::new();
assert!(!readiness.is_ready());
readiness.mark_stage(SystemStage::StorageReady);
assert!(!readiness.is_ready());
readiness.mark_stage(SystemStage::IamReady);
assert!(!readiness.is_ready());
readiness.mark_stage(SystemStage::FullReady);
assert!(readiness.is_ready());
}
}

View File

@@ -49,6 +49,21 @@ pub const SERVICE_VERSION: &str = "1.0.0";
/// Default value: production
pub const ENVIRONMENT: &str = "production";
/// Default Access Key
/// Default value: rustfsadmin
/// Environment variable: RUSTFS_ACCESS_KEY
/// Command line argument: --access-key
/// Example: RUSTFS_ACCESS_KEY=rustfsadmin
/// Example: --access-key rustfsadmin
pub const DEFAULT_ACCESS_KEY: &str = "rustfsadmin";
/// Default Secret Key
/// Default value: rustfsadmin
/// Environment variable: RUSTFS_SECRET_KEY
/// Command line argument: --secret-key
/// Example: RUSTFS_SECRET_KEY=rustfsadmin
/// Example: --secret-key rustfsadmin
pub const DEFAULT_SECRET_KEY: &str = "rustfsadmin";
/// Default console enable
/// This is the default value for the console server.
/// It is used to enable or disable the console server.
@@ -210,6 +225,20 @@ mod tests {
);
}
#[test]
fn test_security_constants() {
// Test security related constants
assert_eq!(DEFAULT_ACCESS_KEY, "rustfsadmin");
assert!(DEFAULT_ACCESS_KEY.len() >= 8, "Access key should be at least 8 characters");
assert_eq!(DEFAULT_SECRET_KEY, "rustfsadmin");
assert!(DEFAULT_SECRET_KEY.len() >= 8, "Secret key should be at least 8 characters");
// In production environment, access key and secret key should be different
// These are default values, so being the same is acceptable, but should be warned in documentation
println!("Warning: Default access key and secret key are the same. Change them in production!");
}
#[test]
fn test_file_path_constants() {
assert_eq!(RUSTFS_TLS_KEY, "rustfs_key.pem");
@@ -271,6 +300,8 @@ mod tests {
DEFAULT_LOG_LEVEL,
SERVICE_VERSION,
ENVIRONMENT,
DEFAULT_ACCESS_KEY,
DEFAULT_SECRET_KEY,
RUSTFS_TLS_KEY,
RUSTFS_TLS_CERT,
DEFAULT_ADDRESS,
@@ -300,6 +331,29 @@ mod tests {
assert_ne!(DEFAULT_CONSOLE_PORT, 0, "Console port should not be zero");
}
#[test]
fn test_security_best_practices() {
// Test security best practices
// These are default values, should be changed in production environments
println!("Security Warning: Default credentials detected!");
println!("Access Key: {DEFAULT_ACCESS_KEY}");
println!("Secret Key: {DEFAULT_SECRET_KEY}");
println!("These should be changed in production environments!");
// Verify that key lengths meet minimum security requirements
assert!(DEFAULT_ACCESS_KEY.len() >= 8, "Access key should be at least 8 characters");
assert!(DEFAULT_SECRET_KEY.len() >= 8, "Secret key should be at least 8 characters");
// Check if default credentials contain common insecure patterns
let _insecure_patterns = ["admin", "password", "123456", "default"];
let _access_key_lower = DEFAULT_ACCESS_KEY.to_lowercase();
let _secret_key_lower = DEFAULT_SECRET_KEY.to_lowercase();
// Note: More security check logic can be added here
// For example, check if keys contain insecure patterns
}
#[test]
fn test_configuration_consistency() {
// Test configuration consistency

View File

@@ -20,7 +20,6 @@ pub(crate) mod env;
pub(crate) mod heal;
pub(crate) mod object;
pub(crate) mod profiler;
pub(crate) mod protocols;
pub(crate) mod runtime;
pub(crate) mod targets;
pub(crate) mod tls;

View File

@@ -1,40 +0,0 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Protocol server configuration constants
/// Default FTPS server bind address
pub const DEFAULT_FTPS_ADDRESS: &str = "0.0.0.0:8021";
/// Default SFTP server bind address
pub const DEFAULT_SFTP_ADDRESS: &str = "0.0.0.0:8022";
/// Default FTPS passive ports range (optional)
pub const DEFAULT_FTPS_PASSIVE_PORTS: Option<&str> = None;
/// Default FTPS external IP (auto-detected)
pub const DEFAULT_FTPS_EXTERNAL_IP: Option<&str> = None;
/// Environment variable names
pub const ENV_FTPS_ENABLE: &str = "RUSTFS_FTPS_ENABLE";
pub const ENV_FTPS_ADDRESS: &str = "RUSTFS_FTPS_ADDRESS";
pub const ENV_FTPS_CERTS_FILE: &str = "RUSTFS_FTPS_CERTS_FILE";
pub const ENV_FTPS_KEY_FILE: &str = "RUSTFS_FTPS_KEY_FILE";
pub const ENV_FTPS_PASSIVE_PORTS: &str = "RUSTFS_FTPS_PASSIVE_PORTS";
pub const ENV_FTPS_EXTERNAL_IP: &str = "RUSTFS_FTPS_EXTERNAL_IP";
pub const ENV_SFTP_ENABLE: &str = "RUSTFS_SFTP_ENABLE";
pub const ENV_SFTP_ADDRESS: &str = "RUSTFS_SFTP_ADDRESS";
pub const ENV_SFTP_HOST_KEY: &str = "RUSTFS_SFTP_HOST_KEY";
pub const ENV_SFTP_AUTHORIZED_KEYS: &str = "RUSTFS_SFTP_AUTHORIZED_KEYS";

View File

@@ -39,10 +39,3 @@ pub const DEFAULT_MAX_IO_EVENTS_PER_TICK: usize = 1024;
/// Event polling default (Tokio default 61)
pub const DEFAULT_EVENT_INTERVAL: u32 = 61;
pub const DEFAULT_RNG_SEED: Option<u64> = None; // None means random
/// Threshold for small object seek support in megabytes.
///
/// When an object is smaller than this size, rustfs will provide seek support.
///
/// Default is set to 10MB.
pub const DEFAULT_OBJECT_SEEK_SUPPORT_THRESHOLD: usize = 10 * 1024 * 1024;

View File

@@ -35,52 +35,3 @@ pub const ENV_TRUST_SYSTEM_CA: &str = "RUSTFS_TRUST_SYSTEM_CA";
/// By default, RustFS does not trust system CA certificates.
/// To change this behavior, set the environment variable RUSTFS_TRUST_SYSTEM_CA=1
pub const DEFAULT_TRUST_SYSTEM_CA: bool = false;
/// Environment variable to trust leaf certificates as CA
/// When set to "1", RustFS will treat leaf certificates as CA certificates for trust validation.
/// By default, this is disabled.
/// To enable, set the environment variable RUSTFS_TRUST_LEAF_CERT_AS_CA=1
pub const ENV_TRUST_LEAF_CERT_AS_CA: &str = "RUSTFS_TRUST_LEAF_CERT_AS_CA";
/// Default value for trusting leaf certificates as CA
/// By default, RustFS does not trust leaf certificates as CA.
/// To change this behavior, set the environment variable RUSTFS_TRUST_LEAF_CERT_AS_CA=1
pub const DEFAULT_TRUST_LEAF_CERT_AS_CA: bool = false;
/// Default filename for client CA certificate
/// client_ca.crt (CA bundle for verifying client certificates in server mTLS)
pub const RUSTFS_CLIENT_CA_CERT_FILENAME: &str = "client_ca.crt";
/// Environment variable for client certificate file path
/// RUSTFS_MTLS_CLIENT_CERT
/// Specifies the file path to the client certificate used for mTLS authentication.
/// If not set, RustFS will look for the default filename "client_cert.pem" in the current directory.
/// To set, use the environment variable RUSTFS_MTLS_CLIENT_CERT=/path/to/client_cert.pem
pub const ENV_MTLS_CLIENT_CERT: &str = "RUSTFS_MTLS_CLIENT_CERT";
/// Default filename for client certificate
/// client_cert.pem
pub const RUSTFS_CLIENT_CERT_FILENAME: &str = "client_cert.pem";
/// Environment variable for client private key file path
/// RUSTFS_MTLS_CLIENT_KEY
/// Specifies the file path to the client private key used for mTLS authentication.
/// If not set, RustFS will look for the default filename "client_key.pem" in the current directory.
/// To set, use the environment variable RUSTFS_MTLS_CLIENT_KEY=/path/to/client_key.pem
pub const ENV_MTLS_CLIENT_KEY: &str = "RUSTFS_MTLS_CLIENT_KEY";
/// Default filename for client private key
/// client_key.pem
pub const RUSTFS_CLIENT_KEY_FILENAME: &str = "client_key.pem";
/// RUSTFS_SERVER_MTLS_ENABLE
/// Environment variable to enable server mTLS
/// When set to "1", RustFS server will require client certificates for authentication.
/// By default, this is disabled.
/// To enable, set the environment variable RUSTFS_SERVER_MTLS_ENABLE=1
pub const ENV_SERVER_MTLS_ENABLE: &str = "RUSTFS_SERVER_MTLS_ENABLE";
/// Default value for enabling server mTLS
/// By default, RustFS server mTLS is disabled.
/// To change this behavior, set the environment variable RUSTFS_SERVER_MTLS_ENABLE=1
pub const DEFAULT_SERVER_MTLS_ENABLE: bool = false;

View File

@@ -31,8 +31,6 @@ pub use constants::object::*;
#[cfg(feature = "constants")]
pub use constants::profiler::*;
#[cfg(feature = "constants")]
pub use constants::protocols::*;
#[cfg(feature = "constants")]
pub use constants::runtime::*;
#[cfg(feature = "constants")]
pub use constants::targets::*;

View File

@@ -51,18 +51,6 @@ pub const ENV_NOTIFY_TARGET_STREAM_CONCURRENCY: &str = "RUSTFS_NOTIFY_TARGET_STR
/// Adjust this value based on your system's capabilities and expected load.
pub const DEFAULT_NOTIFY_TARGET_STREAM_CONCURRENCY: usize = 20;
/// Name of the environment variable that configures send concurrency.
/// Controls how many send operations are processed in parallel by the notification system.
/// Defaults to [`DEFAULT_NOTIFY_SEND_CONCURRENCY`] if not set.
/// Example: `RUSTFS_NOTIFY_SEND_CONCURRENCY=64`.
pub const ENV_NOTIFY_SEND_CONCURRENCY: &str = "RUSTFS_NOTIFY_SEND_CONCURRENCY";
/// Default concurrency for send operations in the notification system
/// This value is used if the environment variable `RUSTFS_NOTIFY_SEND_CONCURRENCY` is not set.
/// It defines how many send operations can be processed in parallel by the notification system at any given time.
/// Adjust this value based on your system's capabilities and expected load.
pub const DEFAULT_NOTIFY_SEND_CONCURRENCY: usize = 64;
#[allow(dead_code)]
pub const NOTIFY_SUB_SYSTEMS: &[&str] = &[NOTIFY_MQTT_SUB_SYS, NOTIFY_WEBHOOK_SUB_SYS];

View File

@@ -1,21 +0,0 @@
[package]
name = "rustfs-credentials"
edition.workspace = true
license.workspace = true
repository.workspace = true
rust-version.workspace = true
version.workspace = true
homepage.workspace = true
description = "Credentials management utilities for RustFS, enabling secure handling of authentication and authorization data."
keywords = ["rustfs", "Minio", "credentials", "authentication", "authorization"]
categories = ["web-programming", "development-tools", "data-structures", "security"]
[dependencies]
base64-simd = { workspace = true }
rand = { workspace = true }
serde = { workspace = true }
serde_json.workspace = true
time = { workspace = true, features = ["serde-human-readable"] }
[lints]
workspace = true

View File

@@ -1,44 +0,0 @@
[![RustFS](https://rustfs.com/images/rustfs-github.png)](https://rustfs.com)
# RustFS Credentials - Credential Management Module
<p align="center">
<strong>A module for managing credentials within the RustFS distributed object storage system.</strong>
</p>
<p align="center">
<a href="https://github.com/rustfs/rustfs/actions/workflows/ci.yml"><img alt="CI" src="https://github.com/rustfs/rustfs/actions/workflows/ci.yml/badge.svg" /></a>
<a href="https://docs.rustfs.com/">📖 Documentation</a>
· <a href="https://github.com/rustfs/rustfs/issues">🐛 Bug Reports</a>
· <a href="https://github.com/rustfs/rustfs/discussions">💬 Discussions</a>
</p>
---
This module provides a secure and efficient way to handle various types of credentials,
such as API keys, access tokens, and cryptographic keys, required for interacting with
the RustFS ecosystem and external services.
## 📖 Overview
**RustFS Credentials** is a module dedicated to managing credentials for the [RustFS](https://rustfs.com) distributed
object storage system. For the complete RustFS experience,
please visit the [main RustFS repository](https://github.com/rustfs/rustfs)
## ✨ Features
- Secure storage and retrieval of credentials
- Support for multiple credential types (API keys, tokens, etc.)
- Encryption of sensitive credential data
- Integration with external secret management systems
- Easy-to-use API for credential management
- Credential rotation and expiration handling
## 📚 Documentation
For comprehensive documentation, examples, and usage guides, please visit the
main [RustFS repository](https://github.com/rustfs/rustfs).
## 📄 License
This project is licensed under the Apache License 2.0 - see the [LICENSE](../../LICENSE) file for details.

View File

@@ -1,94 +0,0 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
/// Default Access Key
/// Default value: rustfsadmin
/// Environment variable: RUSTFS_ACCESS_KEY
/// Command line argument: --access-key
/// Example: RUSTFS_ACCESS_KEY=rustfsadmin
/// Example: --access-key rustfsadmin
pub const DEFAULT_ACCESS_KEY: &str = "rustfsadmin";
/// Default Secret Key
/// Default value: rustfsadmin
/// Environment variable: RUSTFS_SECRET_KEY
/// Command line argument: --secret-key
/// Example: RUSTFS_SECRET_KEY=rustfsadmin
/// Example: --secret-key rustfsadmin
pub const DEFAULT_SECRET_KEY: &str = "rustfsadmin";
/// Environment variable for RPC authentication token
/// Used to set the authentication token for RPC communication
/// Example: RUSTFS_RPC_SECRET=your_token_here
/// Default value: No default value. RUSTFS_SECRET_KEY value is recommended.
pub const ENV_RPC_SECRET: &str = "RUSTFS_RPC_SECRET";
/// IAM Policy Types
/// Used to differentiate between embedded and inherited policies
/// Example: "embedded-policy" or "inherited-policy"
/// Default value: "embedded-policy"
pub const EMBEDDED_POLICY_TYPE: &str = "embedded-policy";
/// IAM Policy Types
/// Used to differentiate between embedded and inherited policies
/// Example: "embedded-policy" or "inherited-policy"
/// Default value: "inherited-policy"
pub const INHERITED_POLICY_TYPE: &str = "inherited-policy";
/// IAM Policy Claim Name for Service Account
/// Used to identify the service account policy claim in JWT tokens
/// Example: "sa-policy"
/// Default value: "sa-policy"
pub const IAM_POLICY_CLAIM_NAME_SA: &str = "sa-policy";
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_security_constants() {
// Test security related constants
assert_eq!(DEFAULT_ACCESS_KEY, "rustfsadmin");
assert!(DEFAULT_ACCESS_KEY.len() >= 8, "Access key should be at least 8 characters");
assert_eq!(DEFAULT_SECRET_KEY, "rustfsadmin");
assert!(DEFAULT_SECRET_KEY.len() >= 8, "Secret key should be at least 8 characters");
// In production environment, access key and secret key should be different
// These are default values, so being the same is acceptable, but should be warned in documentation
println!("Warning: Default access key and secret key are the same. Change them in production!");
}
#[test]
fn test_security_best_practices() {
// Test security best practices
// These are default values, should be changed in production environments
println!("Security Warning: Default credentials detected!");
println!("Access Key: {DEFAULT_ACCESS_KEY}");
println!("Secret Key: {DEFAULT_SECRET_KEY}");
println!("These should be changed in production environments!");
// Verify that key lengths meet minimum security requirements
assert!(DEFAULT_ACCESS_KEY.len() >= 8, "Access key should be at least 8 characters");
assert!(DEFAULT_SECRET_KEY.len() >= 8, "Secret key should be at least 8 characters");
// Check if default credentials contain common insecure patterns
let _insecure_patterns = ["admin", "password", "123456", "default"];
let _access_key_lower = DEFAULT_ACCESS_KEY.to_lowercase();
let _secret_key_lower = DEFAULT_SECRET_KEY.to_lowercase();
// Note: More security check logic can be added here
// For example, check if keys contain insecure patterns
}
}

View File

@@ -1,386 +0,0 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::{DEFAULT_SECRET_KEY, ENV_RPC_SECRET, IAM_POLICY_CLAIM_NAME_SA, INHERITED_POLICY_TYPE};
use rand::{Rng, RngCore};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::collections::HashMap;
use std::env;
use std::io::Error;
use std::sync::OnceLock;
use time::OffsetDateTime;
/// Global active credentials
static GLOBAL_ACTIVE_CRED: OnceLock<Credentials> = OnceLock::new();
/// Global RPC authentication token
pub static GLOBAL_RUSTFS_RPC_SECRET: OnceLock<String> = OnceLock::new();
/// Initialize the global action credentials
///
/// # Arguments
/// * `ak` - Optional access key
/// * `sk` - Optional secret key
///
/// # Returns
/// * `Result<(), Box<Credentials>>` - Ok if successful, Err with existing credentials if already initialized
///
/// # Panics
/// This function panics if automatic credential generation fails when `ak` or `sk`
/// are `None`, for example if the random number generator fails while calling
/// `gen_access_key` or `gen_secret_key`.
pub fn init_global_action_credentials(ak: Option<String>, sk: Option<String>) -> Result<(), Box<Credentials>> {
let ak = ak.unwrap_or_else(|| gen_access_key(20).expect("Failed to generate access key"));
let sk = sk.unwrap_or_else(|| gen_secret_key(32).expect("Failed to generate secret key"));
let cred = Credentials {
access_key: ak,
secret_key: sk,
..Default::default()
};
GLOBAL_ACTIVE_CRED.set(cred).map_err(|e| {
Box::new(Credentials {
access_key: e.access_key.clone(),
..Default::default()
})
})
}
/// Get the global action credentials
pub fn get_global_action_cred() -> Option<Credentials> {
GLOBAL_ACTIVE_CRED.get().cloned()
}
/// Get the global secret key
///
/// # Returns
/// * `Option<String>` - The global secret key, if set
///
pub fn get_global_secret_key_opt() -> Option<String> {
GLOBAL_ACTIVE_CRED.get().map(|cred| cred.secret_key.clone())
}
/// Get the global secret key
///
/// # Returns
/// * `String` - The global secret key, or empty string if not set
///
pub fn get_global_secret_key() -> String {
GLOBAL_ACTIVE_CRED
.get()
.map(|cred| cred.secret_key.clone())
.unwrap_or_default()
}
/// Get the global access key
///
/// # Returns
/// * `Option<String>` - The global access key, if set
///
pub fn get_global_access_key_opt() -> Option<String> {
GLOBAL_ACTIVE_CRED.get().map(|cred| cred.access_key.clone())
}
/// Get the global access key
///
/// # Returns
/// * `String` - The global access key, or empty string if not set
///
pub fn get_global_access_key() -> String {
GLOBAL_ACTIVE_CRED
.get()
.map(|cred| cred.access_key.clone())
.unwrap_or_default()
}
/// Generates a random access key of the specified length.
///
/// # Arguments
/// * `length` - The length of the access key to generate
///
/// # Returns
/// * `Result<String>` - A result containing the generated access key or an error if the length is too short
///
/// # Errors
/// This function will return an error if the specified length is less than 3.
///
/// Examples
/// ```no_run
/// use rustfs_credentials::gen_access_key;
///
/// let access_key = gen_access_key(16).unwrap();
/// println!("Generated access key: {}", access_key);
/// ```
///
pub fn gen_access_key(length: usize) -> std::io::Result<String> {
const ALPHA_NUMERIC_TABLE: [char; 36] = [
'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N',
'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z',
];
if length < 3 {
return Err(Error::other("access key length is too short"));
}
let mut result = String::with_capacity(length);
let mut rng = rand::rng();
for _ in 0..length {
result.push(ALPHA_NUMERIC_TABLE[rng.random_range(0..ALPHA_NUMERIC_TABLE.len())]);
}
Ok(result)
}
/// Generates a random secret key of the specified length.
///
/// # Arguments
/// * `length` - The length of the secret key to generate
///
/// # Returns
/// * `Result<String>` - A result containing the generated secret key or an error if the length is too short
///
/// # Errors
/// This function will return an error if the specified length is less than 8.
///
/// # Examples
/// ```no_run
/// use rustfs_credentials::gen_secret_key;
///
/// let secret_key = gen_secret_key(32).unwrap();
/// println!("Generated secret key: {}", secret_key);
/// ```
///
pub fn gen_secret_key(length: usize) -> std::io::Result<String> {
use base64_simd::URL_SAFE_NO_PAD;
if length < 8 {
return Err(Error::other("secret key length is too short"));
}
let mut rng = rand::rng();
let mut key = vec![0u8; URL_SAFE_NO_PAD.estimated_decoded_length(length)];
rng.fill_bytes(&mut key);
let encoded = URL_SAFE_NO_PAD.encode_to_string(&key);
let key_str = encoded.replace("/", "+");
Ok(key_str)
}
/// Get the RPC authentication token from environment variable
///
/// # Returns
/// * `String` - The RPC authentication token
///
pub fn get_rpc_token() -> String {
GLOBAL_RUSTFS_RPC_SECRET
.get_or_init(|| {
env::var(ENV_RPC_SECRET)
.unwrap_or_else(|_| get_global_secret_key_opt().unwrap_or_else(|| DEFAULT_SECRET_KEY.to_string()))
})
.clone()
}
/// Credentials structure
///
/// Fields:
/// - access_key: Access key string
/// - secret_key: Secret key string
/// - session_token: Session token string
/// - expiration: Optional expiration time as OffsetDateTime
/// - status: Status string (e.g., "active", "off")
/// - parent_user: Parent user string
/// - groups: Optional list of groups
/// - claims: Optional map of claims
/// - name: Optional name string
/// - description: Optional description string
///
#[derive(Serialize, Deserialize, Clone, Default, Debug)]
pub struct Credentials {
pub access_key: String,
pub secret_key: String,
pub session_token: String,
pub expiration: Option<OffsetDateTime>,
pub status: String,
pub parent_user: String,
pub groups: Option<Vec<String>>,
pub claims: Option<HashMap<String, Value>>,
pub name: Option<String>,
pub description: Option<String>,
}
impl Credentials {
pub fn is_expired(&self) -> bool {
if self.expiration.is_none() {
return false;
}
self.expiration
.as_ref()
.map(|e| OffsetDateTime::now_utc() > *e)
.unwrap_or(false)
}
pub fn is_temp(&self) -> bool {
!self.session_token.is_empty() && !self.is_expired()
}
pub fn is_service_account(&self) -> bool {
self.claims
.as_ref()
.map(|x| x.get(IAM_POLICY_CLAIM_NAME_SA).is_some_and(|_| !self.parent_user.is_empty()))
.unwrap_or_default()
}
pub fn is_implied_policy(&self) -> bool {
if self.is_service_account() {
return self
.claims
.as_ref()
.map(|x| x.get(IAM_POLICY_CLAIM_NAME_SA).is_some_and(|v| v == INHERITED_POLICY_TYPE))
.unwrap_or_default();
}
false
}
pub fn is_valid(&self) -> bool {
if self.status == "off" {
return false;
}
self.access_key.len() >= 3 && self.secret_key.len() >= 8 && !self.is_expired()
}
pub fn is_owner(&self) -> bool {
false
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{IAM_POLICY_CLAIM_NAME_SA, INHERITED_POLICY_TYPE};
use time::Duration;
#[test]
fn test_credentials_is_expired() {
let mut cred = Credentials::default();
assert!(!cred.is_expired());
cred.expiration = Some(OffsetDateTime::now_utc() + Duration::hours(1));
assert!(!cred.is_expired());
cred.expiration = Some(OffsetDateTime::now_utc() - Duration::hours(1));
assert!(cred.is_expired());
}
#[test]
fn test_credentials_is_temp() {
let mut cred = Credentials::default();
assert!(!cred.is_temp());
cred.session_token = "token".to_string();
assert!(cred.is_temp());
cred.expiration = Some(OffsetDateTime::now_utc() - Duration::hours(1));
assert!(!cred.is_temp());
}
#[test]
fn test_credentials_is_service_account() {
let mut cred = Credentials::default();
assert!(!cred.is_service_account());
let mut claims = HashMap::new();
claims.insert(IAM_POLICY_CLAIM_NAME_SA.to_string(), Value::String("policy".to_string()));
cred.claims = Some(claims);
cred.parent_user = "parent".to_string();
assert!(cred.is_service_account());
}
#[test]
fn test_credentials_is_implied_policy() {
let mut cred = Credentials::default();
assert!(!cred.is_implied_policy());
let mut claims = HashMap::new();
claims.insert(IAM_POLICY_CLAIM_NAME_SA.to_string(), Value::String(INHERITED_POLICY_TYPE.to_string()));
cred.claims = Some(claims);
cred.parent_user = "parent".to_string();
assert!(cred.is_implied_policy());
}
#[test]
fn test_credentials_is_valid() {
let mut cred = Credentials::default();
assert!(!cred.is_valid());
cred.access_key = "abc".to_string();
cred.secret_key = "12345678".to_string();
assert!(cred.is_valid());
cred.status = "off".to_string();
assert!(!cred.is_valid());
}
#[test]
fn test_credentials_is_owner() {
let cred = Credentials::default();
assert!(!cred.is_owner());
}
#[test]
fn test_global_credentials_flow() {
// Since OnceLock can only be set once, we put together all globally related tests
// If it has already been initialized (possibly from other tests), we verify the results directly
if get_global_action_cred().is_none() {
// Verify that the initial state is empty
assert!(get_global_access_key_opt().is_none());
assert_eq!(get_global_access_key(), "");
assert!(get_global_secret_key_opt().is_none());
assert_eq!(get_global_secret_key(), "");
// Initialize
let test_ak = "test_access_key".to_string();
let test_sk = "test_secret_key_123456".to_string();
init_global_action_credentials(Some(test_ak.clone()), Some(test_sk.clone())).ok();
}
// Verify the state after initialization
let cred = get_global_action_cred().expect("Global credentials should be set");
assert!(!cred.access_key.is_empty());
assert!(!cred.secret_key.is_empty());
assert!(get_global_access_key_opt().is_some());
assert!(!get_global_access_key().is_empty());
assert!(get_global_secret_key_opt().is_some());
assert!(!get_global_secret_key().is_empty());
}
#[test]
fn test_init_global_credentials_auto_gen() {
// If it hasn't already been initialized, the test automatically generates logic
if get_global_action_cred().is_none() {
init_global_action_credentials(None, None).ok();
let ak = get_global_access_key();
let sk = get_global_secret_key();
assert_eq!(ak.len(), 20);
assert_eq!(sk.len(), 32);
}
}
}

View File

@@ -1,19 +0,0 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
mod constants;
mod credentials;
pub use constants::*;
pub use credentials::*;

View File

@@ -51,8 +51,3 @@ base64 = { workspace = true }
rand = { workspace = true }
chrono = { workspace = true }
md5 = { workspace = true }
suppaftp.workspace = true
rcgen.workspace = true
anyhow.workspace = true
rustls.workspace = true
rustls-pemfile.workspace = true

View File

@@ -34,8 +34,8 @@ use tracing::{error, info, warn};
use uuid::Uuid;
// Common constants for all E2E tests
pub const DEFAULT_ACCESS_KEY: &str = "rustfsadmin";
pub const DEFAULT_SECRET_KEY: &str = "rustfsadmin";
pub const DEFAULT_ACCESS_KEY: &str = "minioadmin";
pub const DEFAULT_SECRET_KEY: &str = "minioadmin";
pub const TEST_BUCKET: &str = "e2e-test-bucket";
pub fn workspace_root() -> PathBuf {
let mut path = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
@@ -165,7 +165,7 @@ impl RustFSTestEnvironment {
}
/// Find an available port for the test
pub async fn find_available_port() -> Result<u16, Box<dyn std::error::Error + Send + Sync>> {
async fn find_available_port() -> Result<u16, Box<dyn std::error::Error + Send + Sync>> {
use std::net::TcpListener;
let listener = TcpListener::bind("127.0.0.1:0")?;
let port = listener.local_addr()?.port();
@@ -178,11 +178,11 @@ impl RustFSTestEnvironment {
info!("Cleaning up any existing RustFS processes");
let output = Command::new("pkill").args(["-f", "rustfs"]).output();
if let Ok(output) = output
&& output.status.success()
{
info!("Killed existing RustFS processes");
sleep(Duration::from_millis(1000)).await;
if let Ok(output) = output {
if output.status.success() {
info!("Killed existing RustFS processes");
sleep(Duration::from_millis(1000)).await;
}
}
Ok(())
}

View File

@@ -406,11 +406,11 @@ impl VaultTestEnvironment {
let port_check = TcpStream::connect(VAULT_ADDRESS).await.is_ok();
if port_check {
// Additional check by making a health request
if let Ok(response) = reqwest::get(&format!("{VAULT_URL}/v1/sys/health")).await
&& response.status().is_success()
{
info!("Vault server is ready after {} seconds", i);
return Ok(());
if let Ok(response) = reqwest::get(&format!("{VAULT_URL}/v1/sys/health")).await {
if response.status().is_success() {
info!("Vault server is ready after {} seconds", i);
return Ok(());
}
}
}

View File

@@ -18,9 +18,6 @@ mod reliant;
#[cfg(test)]
pub mod common;
#[cfg(test)]
mod version_id_regression_test;
// Data usage regression tests
#[cfg(test)]
mod data_usage_test;
@@ -40,6 +37,3 @@ mod content_encoding_test;
// Policy variables tests
#[cfg(test)]
mod policy;
#[cfg(test)]
mod protocols;

View File

@@ -1,44 +0,0 @@
# Protocol E2E Tests
FTPS and SFTP protocol end-to-end tests for RustFS.
## Prerequisites
### Required Tools
```bash
# Ubuntu/Debian
sudo apt-get install sshpass ssh-keygen
# RHEL/CentOS
sudo yum install sshpass openssh-clients
# macOS
brew install sshpass openssh
```
## Running Tests
Run all protocol tests:
```bash
cargo test --package e2e_test test_protocol_core_suite -- --test-threads=1 --nocapture
```
Run only FTPS tests:
```bash
cargo test --package e2e_test test_ftps_core_operations -- --test-threads=1 --nocapture
```
## Test Coverage
### FTPS Tests
- mkdir bucket
- cd to bucket
- put file
- ls list objects
- cd . (stay in current directory)
- cd / (return to root)
- cd nonexistent bucket (should fail)
- delete object
- cdup
- rmdir delete bucket

View File

@@ -1,235 +0,0 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Core FTPS tests
use crate::common::rustfs_binary_path;
use crate::protocols::test_env::{DEFAULT_ACCESS_KEY, DEFAULT_SECRET_KEY, ProtocolTestEnvironment};
use anyhow::Result;
use rcgen::generate_simple_self_signed;
use rustls::crypto::aws_lc_rs::default_provider;
use rustls::{ClientConfig, RootCertStore};
use std::io::Cursor;
use std::path::PathBuf;
use std::sync::Arc;
use suppaftp::RustlsConnector;
use suppaftp::RustlsFtpStream;
use tokio::process::Command;
use tracing::info;
// Fixed FTPS port for testing
const FTPS_PORT: u16 = 9021;
const FTPS_ADDRESS: &str = "127.0.0.1:9021";
/// Test FTPS: put, ls, mkdir, rmdir, delete operations
pub async fn test_ftps_core_operations() -> Result<()> {
let env = ProtocolTestEnvironment::new().map_err(|e| anyhow::anyhow!("{}", e))?;
// Generate and write certificate
let cert = generate_simple_self_signed(vec!["localhost".to_string(), "127.0.0.1".to_string()])?;
let cert_path = PathBuf::from(&env.temp_dir).join("ftps.crt");
let key_path = PathBuf::from(&env.temp_dir).join("ftps.key");
let cert_pem = cert.cert.pem();
let key_pem = cert.signing_key.serialize_pem();
tokio::fs::write(&cert_path, &cert_pem).await?;
tokio::fs::write(&key_path, &key_pem).await?;
// Start server manually
info!("Starting FTPS server on {}", FTPS_ADDRESS);
let binary_path = rustfs_binary_path();
let mut server_process = Command::new(&binary_path)
.env("RUSTFS_FTPS_ENABLE", "true")
.env("RUSTFS_FTPS_ADDRESS", FTPS_ADDRESS)
.env("RUSTFS_FTPS_CERTS_FILE", cert_path.to_str().unwrap())
.env("RUSTFS_FTPS_KEY_FILE", key_path.to_str().unwrap())
.arg(&env.temp_dir)
.spawn()?;
// Ensure server is cleaned up even on failure
let result = async {
// Wait for server to be ready
ProtocolTestEnvironment::wait_for_port_ready(FTPS_PORT, 30)
.await
.map_err(|e| anyhow::anyhow!("{}", e))?;
// Install the aws-lc-rs crypto provider
default_provider()
.install_default()
.map_err(|e| anyhow::anyhow!("Failed to install crypto provider: {:?}", e))?;
// Create a simple rustls config that accepts any certificate for testing
let mut root_store = RootCertStore::empty();
// Add the self-signed certificate to the trust store for e2e
// Note: In a real environment, you'd use proper root certificates
let cert_pem = cert.cert.pem();
let cert_der = rustls_pemfile::certs(&mut Cursor::new(cert_pem))
.collect::<Result<Vec<_>, _>>()
.map_err(|e| anyhow::anyhow!("Failed to parse cert: {}", e))?;
root_store.add_parsable_certificates(cert_der);
let config = ClientConfig::builder()
.with_root_certificates(root_store)
.with_no_client_auth();
// Wrap in suppaftp's RustlsConnector
let tls_connector = RustlsConnector::from(Arc::new(config));
// Connect to FTPS server
let ftp_stream = RustlsFtpStream::connect(FTPS_ADDRESS).map_err(|e| anyhow::anyhow!("Failed to connect: {}", e))?;
// Upgrade to secure connection
let mut ftp_stream = ftp_stream
.into_secure(tls_connector, "127.0.0.1")
.map_err(|e| anyhow::anyhow!("Failed to upgrade to TLS: {}", e))?;
ftp_stream.login(DEFAULT_ACCESS_KEY, DEFAULT_SECRET_KEY)?;
info!("Testing FTPS: mkdir bucket");
let bucket_name = "testbucket";
ftp_stream.mkdir(bucket_name)?;
info!("PASS: mkdir bucket '{}' successful", bucket_name);
info!("Testing FTPS: cd to bucket");
ftp_stream.cwd(bucket_name)?;
info!("PASS: cd to bucket '{}' successful", bucket_name);
info!("Testing FTPS: put file");
let filename = "test.txt";
let content = "Hello, FTPS!";
ftp_stream.put_file(filename, &mut Cursor::new(content.as_bytes()))?;
info!("PASS: put file '{}' ({} bytes) successful", filename, content.len());
info!("Testing FTPS: download file");
let downloaded_content = ftp_stream.retr(filename, |stream| {
let mut buffer = Vec::new();
stream.read_to_end(&mut buffer).map_err(suppaftp::FtpError::ConnectionError)?;
Ok(buffer)
})?;
let downloaded_str = String::from_utf8(downloaded_content)?;
assert_eq!(downloaded_str, content, "Downloaded content should match uploaded content");
info!("PASS: download file '{}' successful, content matches", filename);
info!("Testing FTPS: ls list objects in bucket");
let list = ftp_stream.list(None)?;
assert!(list.iter().any(|line| line.contains(filename)), "File should appear in list");
info!("PASS: ls command successful, file '{}' found in bucket", filename);
info!("Testing FTPS: ls . (list current directory)");
let list_dot = ftp_stream.list(Some(".")).unwrap_or_else(|_| ftp_stream.list(None).unwrap());
assert!(list_dot.iter().any(|line| line.contains(filename)), "File should appear in ls .");
info!("PASS: ls . successful, file '{}' found", filename);
info!("Testing FTPS: ls / (list root directory)");
let list_root = ftp_stream.list(Some("/")).unwrap();
assert!(list_root.iter().any(|line| line.contains(bucket_name)), "Bucket should appear in ls /");
assert!(!list_root.iter().any(|line| line.contains(filename)), "File should not appear in ls /");
info!(
"PASS: ls / successful, bucket '{}' found, file '{}' not found in root",
bucket_name, filename
);
info!("Testing FTPS: ls /. (list root directory with /.)");
let list_root_dot = ftp_stream
.list(Some("/."))
.unwrap_or_else(|_| ftp_stream.list(Some("/")).unwrap());
assert!(
list_root_dot.iter().any(|line| line.contains(bucket_name)),
"Bucket should appear in ls /."
);
info!("PASS: ls /. successful, bucket '{}' found", bucket_name);
info!("Testing FTPS: ls /bucket (list bucket by absolute path)");
let list_bucket = ftp_stream.list(Some(&format!("/{}", bucket_name))).unwrap();
assert!(list_bucket.iter().any(|line| line.contains(filename)), "File should appear in ls /bucket");
info!("PASS: ls /{} successful, file '{}' found", bucket_name, filename);
info!("Testing FTPS: cd . (stay in current directory)");
ftp_stream.cwd(".")?;
info!("PASS: cd . successful (stays in current directory)");
info!("Testing FTPS: ls after cd . (should still see file)");
let list_after_dot = ftp_stream.list(None)?;
assert!(
list_after_dot.iter().any(|line| line.contains(filename)),
"File should still appear in list after cd ."
);
info!("PASS: ls after cd . successful, file '{}' still found in bucket", filename);
info!("Testing FTPS: cd / (go to root directory)");
ftp_stream.cwd("/")?;
info!("PASS: cd / successful (back to root directory)");
info!("Testing FTPS: ls after cd / (should see bucket only)");
let root_list_after = ftp_stream.list(None)?;
assert!(
!root_list_after.iter().any(|line| line.contains(filename)),
"File should not appear in root ls"
);
assert!(
root_list_after.iter().any(|line| line.contains(bucket_name)),
"Bucket should appear in root ls"
);
info!("PASS: ls after cd / successful, file not in root, bucket '{}' found in root", bucket_name);
info!("Testing FTPS: cd back to bucket");
ftp_stream.cwd(bucket_name)?;
info!("PASS: cd back to bucket '{}' successful", bucket_name);
info!("Testing FTPS: delete object");
ftp_stream.rm(filename)?;
info!("PASS: delete object '{}' successful", filename);
info!("Testing FTPS: ls verify object deleted");
let list_after = ftp_stream.list(None)?;
assert!(!list_after.iter().any(|line| line.contains(filename)), "File should be deleted");
info!("PASS: ls after delete successful, file '{}' is not found", filename);
info!("Testing FTPS: cd up to root directory");
ftp_stream.cdup()?;
info!("PASS: cd up to root directory successful");
info!("Testing FTPS: cd to nonexistent bucket (should fail)");
let nonexistent_bucket = "nonexistent-bucket";
let cd_result = ftp_stream.cwd(nonexistent_bucket);
assert!(cd_result.is_err(), "cd to nonexistent bucket should fail");
info!("PASS: cd to nonexistent bucket '{}' failed as expected", nonexistent_bucket);
info!("Testing FTPS: ls verify bucket exists in root");
let root_list = ftp_stream.list(None)?;
assert!(root_list.iter().any(|line| line.contains(bucket_name)), "Bucket should exist in root");
info!("PASS: ls root successful, bucket '{}' found in root", bucket_name);
info!("Testing FTPS: rmdir delete bucket");
ftp_stream.rmdir(bucket_name)?;
info!("PASS: rmdir bucket '{}' successful", bucket_name);
info!("Testing FTPS: ls verify bucket deleted");
let root_list_after = ftp_stream.list(None)?;
assert!(!root_list_after.iter().any(|line| line.contains(bucket_name)), "Bucket should be deleted");
info!("PASS: ls root after delete successful, bucket '{}' is not found", bucket_name);
ftp_stream.quit()?;
info!("FTPS core tests passed");
Ok(())
}
.await;
// Always cleanup server process
let _ = server_process.kill().await;
let _ = server_process.wait().await;
result
}

View File

@@ -1,19 +0,0 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Protocol tests for FTPS and SFTP
pub mod ftps_core;
pub mod test_env;
pub mod test_runner;

View File

@@ -1,72 +0,0 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Protocol test environment for FTPS and SFTP
use std::net::TcpStream;
use std::time::Duration;
use tokio::time::sleep;
use tracing::{info, warn};
/// Default credentials
pub const DEFAULT_ACCESS_KEY: &str = "rustfsadmin";
pub const DEFAULT_SECRET_KEY: &str = "rustfsadmin";
/// Custom test environment that doesn't automatically stop servers
pub struct ProtocolTestEnvironment {
pub temp_dir: String,
}
impl ProtocolTestEnvironment {
/// Create a new test environment
/// This environment won't stop any server when dropped
pub fn new() -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
let temp_dir = format!("/tmp/rustfs_protocol_test_{}", uuid::Uuid::new_v4());
std::fs::create_dir_all(&temp_dir)?;
Ok(Self { temp_dir })
}
/// Wait for server to be ready
pub async fn wait_for_port_ready(port: u16, max_attempts: u32) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let address = format!("127.0.0.1:{}", port);
info!("Waiting for server to be ready on {}", address);
for i in 0..max_attempts {
if TcpStream::connect(&address).is_ok() {
info!("Server is ready after {} s", i + 1);
return Ok(());
}
if i == max_attempts - 1 {
return Err(format!("Server did not become ready within {} s", max_attempts).into());
}
sleep(Duration::from_secs(1)).await;
}
Ok(())
}
}
// Implement Drop trait that doesn't stop servers
impl Drop for ProtocolTestEnvironment {
fn drop(&mut self) {
// Clean up temp directory only, don't stop any server
if let Err(e) = std::fs::remove_dir_all(&self.temp_dir) {
warn!("Failed to clean up temp directory {}: {}", self.temp_dir, e);
}
}
}

View File

@@ -1,171 +0,0 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Protocol test runner
use crate::common::init_logging;
use crate::protocols::ftps_core::test_ftps_core_operations;
use std::time::Instant;
use tokio::time::{Duration, sleep};
use tracing::{error, info};
/// Test result
#[derive(Debug, Clone)]
pub struct TestResult {
pub test_name: String,
pub success: bool,
pub error_message: Option<String>,
}
impl TestResult {
pub fn success(test_name: String) -> Self {
Self {
test_name,
success: true,
error_message: None,
}
}
pub fn failure(test_name: String, error: String) -> Self {
Self {
test_name,
success: false,
error_message: Some(error),
}
}
}
/// Protocol test suite
pub struct ProtocolTestSuite {
tests: Vec<TestDefinition>,
}
#[derive(Debug, Clone)]
struct TestDefinition {
name: String,
}
impl ProtocolTestSuite {
/// Create default test suite
pub fn new() -> Self {
let tests = vec![
TestDefinition {
name: "test_ftps_core_operations".to_string(),
},
// TestDefinition { name: "test_sftp_core_operations".to_string() },
];
Self { tests }
}
/// Run test suite
pub async fn run_test_suite(&self) -> Vec<TestResult> {
init_logging();
info!("Starting Protocol test suite");
let start_time = Instant::now();
let mut results = Vec::new();
info!("Scheduled {} tests", self.tests.len());
// Run tests
for (i, test_def) in self.tests.iter().enumerate() {
let test_description = match test_def.name.as_str() {
"test_ftps_core_operations" => {
info!("=== Starting FTPS Module Test ===");
"FTPS core operations (put, ls, mkdir, rmdir, delete)"
}
"test_sftp_core_operations" => {
info!("=== Starting SFTP Module Test ===");
"SFTP core operations (put, ls, mkdir, rmdir, delete)"
}
_ => "",
};
info!("Test {}/{} - {}", i + 1, self.tests.len(), test_description);
info!("Running: {}", test_def.name);
let test_start = Instant::now();
let result = self.run_single_test(test_def).await;
let test_duration = test_start.elapsed();
match result {
Ok(_) => {
info!("Test passed: {} ({:.2}s)", test_def.name, test_duration.as_secs_f64());
results.push(TestResult::success(test_def.name.clone()));
}
Err(e) => {
error!("Test failed: {} ({:.2}s): {}", test_def.name, test_duration.as_secs_f64(), e);
results.push(TestResult::failure(test_def.name.clone(), e.to_string()));
}
}
// Delay between tests to avoid resource conflicts
if i < self.tests.len() - 1 {
sleep(Duration::from_secs(2)).await;
}
}
// Print summary
self.print_summary(&results, start_time.elapsed());
results
}
/// Run a single test
async fn run_single_test(&self, test_def: &TestDefinition) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
match test_def.name.as_str() {
"test_ftps_core_operations" => test_ftps_core_operations().await.map_err(|e| e.into()),
// "test_sftp_core_operations" => test_sftp_core_operations().await.map_err(|e| e.into()),
_ => Err(format!("Test {} not implemented", test_def.name).into()),
}
}
/// Print test summary
fn print_summary(&self, results: &[TestResult], total_duration: Duration) {
info!("=== Test Suite Summary ===");
info!("Total duration: {:.2}s", total_duration.as_secs_f64());
info!("Total tests: {}", results.len());
let passed = results.iter().filter(|r| r.success).count();
let failed = results.len() - passed;
let success_rate = (passed as f64 / results.len() as f64) * 100.0;
info!("Passed: {} | Failed: {}", passed, failed);
info!("Success rate: {:.1}%", success_rate);
if failed > 0 {
error!("Failed tests:");
for result in results.iter().filter(|r| !r.success) {
error!(" - {}: {}", result.test_name, result.error_message.as_ref().unwrap());
}
}
}
}
/// Test suite
#[tokio::test]
async fn test_protocol_core_suite() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let suite = ProtocolTestSuite::new();
let results = suite.run_test_suite().await;
let failed = results.iter().filter(|r| !r.success).count();
if failed > 0 {
return Err(format!("Protocol tests failed: {failed} failures").into());
}
info!("All protocol tests passed");
Ok(())
}

View File

@@ -1,138 +0,0 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Test for HeadObject on deleted objects with versioning enabled
//!
//! This test reproduces the issue where getting a deleted object returns
//! 200 OK instead of 404 NoSuchKey when versioning is enabled.
#![cfg(test)]
use aws_config::meta::region::RegionProviderChain;
use aws_sdk_s3::Client;
use aws_sdk_s3::config::{Credentials, Region};
use aws_sdk_s3::error::SdkError;
use aws_sdk_s3::types::{BucketVersioningStatus, VersioningConfiguration};
use bytes::Bytes;
use serial_test::serial;
use std::error::Error;
use tracing::info;
const ENDPOINT: &str = "http://localhost:9000";
const ACCESS_KEY: &str = "rustfsadmin";
const SECRET_KEY: &str = "rustfsadmin";
const BUCKET: &str = "test-head-deleted-versioning-bucket";
async fn create_aws_s3_client() -> Result<Client, Box<dyn Error>> {
let region_provider = RegionProviderChain::default_provider().or_else(Region::new("us-east-1"));
let shared_config = aws_config::defaults(aws_config::BehaviorVersion::latest())
.region(region_provider)
.credentials_provider(Credentials::new(ACCESS_KEY, SECRET_KEY, None, None, "static"))
.endpoint_url(ENDPOINT)
.load()
.await;
let client = Client::from_conf(
aws_sdk_s3::Config::from(&shared_config)
.to_builder()
.force_path_style(true)
.build(),
);
Ok(client)
}
/// Setup test bucket, creating it if it doesn't exist, and enable versioning
async fn setup_test_bucket(client: &Client) -> Result<(), Box<dyn Error>> {
match client.create_bucket().bucket(BUCKET).send().await {
Ok(_) => {}
Err(SdkError::ServiceError(e)) => {
let e = e.into_err();
let error_code = e.meta().code().unwrap_or("");
if !error_code.eq("BucketAlreadyExists") && !error_code.eq("BucketAlreadyOwnedByYou") {
return Err(e.into());
}
}
Err(e) => {
return Err(e.into());
}
}
// Enable versioning
client
.put_bucket_versioning()
.bucket(BUCKET)
.versioning_configuration(
VersioningConfiguration::builder()
.status(BucketVersioningStatus::Enabled)
.build(),
)
.send()
.await?;
Ok(())
}
/// Test that HeadObject on a deleted object returns NoSuchKey when versioning is enabled
#[tokio::test]
#[serial]
#[ignore = "requires running RustFS server at localhost:9000"]
async fn test_head_deleted_object_versioning_returns_nosuchkey() -> Result<(), Box<dyn std::error::Error>> {
let _ = tracing_subscriber::fmt()
.with_max_level(tracing::Level::INFO)
.with_test_writer()
.try_init();
info!("🧪 Starting test_head_deleted_object_versioning_returns_nosuchkey");
let client = create_aws_s3_client().await?;
setup_test_bucket(&client).await?;
let key = "test-head-deleted-versioning.txt";
let content = b"Test content for HeadObject with versioning";
// Upload and verify
client
.put_object()
.bucket(BUCKET)
.key(key)
.body(Bytes::from_static(content).into())
.send()
.await?;
// Delete the object (creates a delete marker)
client.delete_object().bucket(BUCKET).key(key).send().await?;
// Try to head the deleted object (latest version is delete marker)
let head_result = client.head_object().bucket(BUCKET).key(key).send().await;
assert!(head_result.is_err(), "HeadObject on deleted object should return an error");
match head_result.unwrap_err() {
SdkError::ServiceError(service_err) => {
let s3_err = service_err.into_err();
assert!(
s3_err.meta().code() == Some("NoSuchKey")
|| s3_err.meta().code() == Some("NotFound")
|| s3_err.meta().code() == Some("404"),
"Error should be NoSuchKey or NotFound, got: {s3_err:?}"
);
info!("✅ HeadObject correctly returns NoSuchKey/NotFound");
}
other_err => {
panic!("Expected ServiceError but got: {other_err:?}");
}
}
Ok(())
}

View File

@@ -15,12 +15,11 @@
use async_trait::async_trait;
use rustfs_ecstore::disk::endpoint::Endpoint;
use rustfs_ecstore::rpc::RemoteClient;
use rustfs_lock::client::{LockClient, local::LocalClient};
use rustfs_lock::client::{LockClient, local::LocalClient, remote::RemoteClient};
use rustfs_lock::types::{LockInfo, LockResponse, LockStats};
use rustfs_lock::{LockId, LockMetadata, LockPriority, LockType};
use rustfs_lock::{LockRequest, NamespaceLock, NamespaceLockManager};
use rustfs_protos::proto_gen::node_service::GenerallyLockRequest;
use rustfs_protos::{node_service_time_out_client, proto_gen::node_service::GenerallyLockRequest};
use serial_test::serial;
use std::{collections::HashMap, error::Error, sync::Arc, time::Duration};
use tokio::time::sleep;
@@ -157,7 +156,7 @@ async fn test_lock_unlock_rpc() -> Result<(), Box<dyn Error>> {
};
let args = serde_json::to_string(&args)?;
let mut client = RemoteClient::new(CLUSTER_ADDR.to_string()).get_client().await?;
let mut client = node_service_time_out_client(&CLUSTER_ADDR.to_string()).await?;
println!("got client");
let request = Request::new(GenerallyLockRequest { args: args.clone() });
@@ -615,7 +614,7 @@ async fn test_rpc_read_lock() -> Result<(), Box<dyn Error>> {
};
let args_str = serde_json::to_string(&args)?;
let mut client = RemoteClient::new(CLUSTER_ADDR.to_string()).get_client().await?;
let mut client = node_service_time_out_client(&CLUSTER_ADDR.to_string()).await?;
// First read lock
let request = Request::new(GenerallyLockRequest { args: args_str.clone() });
@@ -670,7 +669,7 @@ async fn test_lock_refresh() -> Result<(), Box<dyn Error>> {
};
let args_str = serde_json::to_string(&args)?;
let mut client = RemoteClient::new(CLUSTER_ADDR.to_string()).get_client().await?;
let mut client = node_service_time_out_client(&CLUSTER_ADDR.to_string()).await?;
// Acquire lock
let request = Request::new(GenerallyLockRequest { args: args_str.clone() });
@@ -714,7 +713,7 @@ async fn test_force_unlock() -> Result<(), Box<dyn Error>> {
};
let args_str = serde_json::to_string(&args)?;
let mut client = RemoteClient::new(CLUSTER_ADDR.to_string()).get_client().await?;
let mut client = node_service_time_out_client(&CLUSTER_ADDR.to_string()).await?;
// Acquire lock
let request = Request::new(GenerallyLockRequest { args: args_str.clone() });

View File

@@ -14,7 +14,6 @@
mod conditional_writes;
mod get_deleted_object_test;
mod head_deleted_object_versioning_test;
mod lifecycle;
mod lock;
mod node_interact_test;

View File

@@ -17,11 +17,11 @@ use crate::common::workspace_root;
use futures::future::join_all;
use rmp_serde::{Deserializer, Serializer};
use rustfs_ecstore::disk::{VolumeInfo, WalkDirOptions};
use rustfs_ecstore::rpc::{TonicInterceptor, gen_tonic_signature_interceptor, node_service_time_out_client};
use rustfs_filemeta::{MetaCacheEntry, MetacacheReader, MetacacheWriter};
use rustfs_protos::proto_gen::node_service::WalkDirRequest;
use rustfs_protos::{
models::{PingBody, PingBodyBuilder},
node_service_time_out_client,
proto_gen::node_service::{
ListVolumesRequest, LocalStorageInfoRequest, MakeVolumeRequest, PingRequest, PingResponse, ReadAllRequest,
},
@@ -53,9 +53,7 @@ async fn ping() -> Result<(), Box<dyn Error>> {
assert!(decoded_payload.is_ok());
// Create client
let mut client =
node_service_time_out_client(&CLUSTER_ADDR.to_string(), TonicInterceptor::Signature(gen_tonic_signature_interceptor()))
.await?;
let mut client = node_service_time_out_client(&CLUSTER_ADDR.to_string()).await?;
// Construct PingRequest
let request = Request::new(PingRequest {
@@ -80,9 +78,7 @@ async fn ping() -> Result<(), Box<dyn Error>> {
#[tokio::test]
#[ignore = "requires running RustFS server at localhost:9000"]
async fn make_volume() -> Result<(), Box<dyn Error>> {
let mut client =
node_service_time_out_client(&CLUSTER_ADDR.to_string(), TonicInterceptor::Signature(gen_tonic_signature_interceptor()))
.await?;
let mut client = node_service_time_out_client(&CLUSTER_ADDR.to_string()).await?;
let request = Request::new(MakeVolumeRequest {
disk: "data".to_string(),
volume: "dandan".to_string(),
@@ -100,9 +96,7 @@ async fn make_volume() -> Result<(), Box<dyn Error>> {
#[tokio::test]
#[ignore = "requires running RustFS server at localhost:9000"]
async fn list_volumes() -> Result<(), Box<dyn Error>> {
let mut client =
node_service_time_out_client(&CLUSTER_ADDR.to_string(), TonicInterceptor::Signature(gen_tonic_signature_interceptor()))
.await?;
let mut client = node_service_time_out_client(&CLUSTER_ADDR.to_string()).await?;
let request = Request::new(ListVolumesRequest {
disk: "data".to_string(),
});
@@ -132,9 +126,7 @@ async fn walk_dir() -> Result<(), Box<dyn Error>> {
let (rd, mut wr) = tokio::io::duplex(1024);
let mut buf = Vec::new();
opts.serialize(&mut Serializer::new(&mut buf))?;
let mut client =
node_service_time_out_client(&CLUSTER_ADDR.to_string(), TonicInterceptor::Signature(gen_tonic_signature_interceptor()))
.await?;
let mut client = node_service_time_out_client(&CLUSTER_ADDR.to_string()).await?;
let disk_path = std::env::var_os("RUSTFS_DISK_PATH").map(PathBuf::from).unwrap_or_else(|| {
let mut path = workspace_root();
path.push("target");
@@ -187,9 +179,7 @@ async fn walk_dir() -> Result<(), Box<dyn Error>> {
#[tokio::test]
#[ignore = "requires running RustFS server at localhost:9000"]
async fn read_all() -> Result<(), Box<dyn Error>> {
let mut client =
node_service_time_out_client(&CLUSTER_ADDR.to_string(), TonicInterceptor::Signature(gen_tonic_signature_interceptor()))
.await?;
let mut client = node_service_time_out_client(&CLUSTER_ADDR.to_string()).await?;
let request = Request::new(ReadAllRequest {
disk: "data".to_string(),
volume: "ff".to_string(),
@@ -207,9 +197,7 @@ async fn read_all() -> Result<(), Box<dyn Error>> {
#[tokio::test]
#[ignore = "requires running RustFS server at localhost:9000"]
async fn storage_info() -> Result<(), Box<dyn Error>> {
let mut client =
node_service_time_out_client(&CLUSTER_ADDR.to_string(), TonicInterceptor::Signature(gen_tonic_signature_interceptor()))
.await?;
let mut client = node_service_time_out_client(&CLUSTER_ADDR.to_string()).await?;
let request = Request::new(LocalStorageInfoRequest { metrics: true });
let response = client.local_storage_info(request).await?.into_inner();

View File

@@ -1,398 +0,0 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Regression test for Issue #1066: Veeam VBR - S3 returned empty versionId
//!
//! This test verifies that:
//! 1. PutObject returns version_id when versioning is enabled
//! 2. CopyObject returns version_id when versioning is enabled
//! 3. CompleteMultipartUpload returns version_id when versioning is enabled
//! 4. Basic S3 operations still work correctly (no regression)
//! 5. Operations on non-versioned buckets work as expected
#[cfg(test)]
mod tests {
use crate::common::{RustFSTestEnvironment, init_logging};
use aws_sdk_s3::Client;
use aws_sdk_s3::primitives::ByteStream;
use aws_sdk_s3::types::{BucketVersioningStatus, CompletedMultipartUpload, CompletedPart, VersioningConfiguration};
use serial_test::serial;
use tracing::info;
fn create_s3_client(env: &RustFSTestEnvironment) -> Client {
env.create_s3_client()
}
async fn create_bucket(client: &Client, bucket: &str) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
match client.create_bucket().bucket(bucket).send().await {
Ok(_) => {
info!("✅ Bucket {} created successfully", bucket);
Ok(())
}
Err(e) => {
if e.to_string().contains("BucketAlreadyOwnedByYou") || e.to_string().contains("BucketAlreadyExists") {
info!(" Bucket {} already exists", bucket);
Ok(())
} else {
Err(Box::new(e))
}
}
}
}
async fn enable_versioning(client: &Client, bucket: &str) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let versioning_config = VersioningConfiguration::builder()
.status(BucketVersioningStatus::Enabled)
.build();
client
.put_bucket_versioning()
.bucket(bucket)
.versioning_configuration(versioning_config)
.send()
.await?;
info!("✅ Versioning enabled for bucket {}", bucket);
Ok(())
}
/// Test 1: PutObject should return version_id when versioning is enabled
/// This directly addresses the Veeam issue from #1066
#[tokio::test]
#[serial]
async fn test_put_object_returns_version_id_with_versioning() {
init_logging();
info!("🧪 TEST: PutObject returns version_id with versioning enabled");
let mut env = RustFSTestEnvironment::new().await.expect("Failed to create test environment");
env.start_rustfs_server(vec![]).await.expect("Failed to start RustFS");
let client = create_s3_client(&env);
let bucket = "test-put-version-id";
create_bucket(&client, bucket).await.expect("Failed to create bucket");
enable_versioning(&client, bucket).await.expect("Failed to enable versioning");
let key = "test-file.txt";
let content = b"Test content for version ID test";
info!("📤 Uploading object with key: {}", key);
let result = client
.put_object()
.bucket(bucket)
.key(key)
.body(ByteStream::from_static(content))
.send()
.await;
assert!(result.is_ok(), "PutObject failed: {:?}", result.err());
let output = result.unwrap();
info!("📥 PutObject response - version_id: {:?}", output.version_id);
assert!(
output.version_id.is_some(),
"❌ FAILED: version_id should be present when versioning is enabled"
);
assert!(
!output.version_id.as_ref().unwrap().is_empty(),
"❌ FAILED: version_id should not be empty"
);
info!("✅ PASSED: PutObject correctly returns version_id");
}
/// Test 2: CopyObject should return version_id when versioning is enabled
#[tokio::test]
#[serial]
async fn test_copy_object_returns_version_id_with_versioning() {
init_logging();
info!("🧪 TEST: CopyObject returns version_id with versioning enabled");
let mut env = RustFSTestEnvironment::new().await.expect("Failed to create test environment");
env.start_rustfs_server(vec![]).await.expect("Failed to start RustFS");
let client = create_s3_client(&env);
let bucket = "test-copy-version-id";
create_bucket(&client, bucket).await.expect("Failed to create bucket");
enable_versioning(&client, bucket).await.expect("Failed to enable versioning");
let source_key = "source-file.txt";
let dest_key = "dest-file.txt";
let content = b"Content to copy";
// First, create source object
client
.put_object()
.bucket(bucket)
.key(source_key)
.body(ByteStream::from_static(content))
.send()
.await
.expect("Failed to create source object");
info!("📤 Copying object from {} to {}", source_key, dest_key);
let copy_result = client
.copy_object()
.bucket(bucket)
.key(dest_key)
.copy_source(format!("{}/{}", bucket, source_key))
.send()
.await;
assert!(copy_result.is_ok(), "CopyObject failed: {:?}", copy_result.err());
let output = copy_result.unwrap();
info!("📥 CopyObject response - version_id: {:?}", output.version_id);
assert!(
output.version_id.is_some(),
"❌ FAILED: version_id should be present when versioning is enabled"
);
assert!(
!output.version_id.as_ref().unwrap().is_empty(),
"❌ FAILED: version_id should not be empty"
);
info!("✅ PASSED: CopyObject correctly returns version_id");
}
/// Test 3: CompleteMultipartUpload should return version_id when versioning is enabled
#[tokio::test]
#[serial]
async fn test_multipart_upload_returns_version_id_with_versioning() {
init_logging();
info!("🧪 TEST: CompleteMultipartUpload returns version_id with versioning enabled");
let mut env = RustFSTestEnvironment::new().await.expect("Failed to create test environment");
env.start_rustfs_server(vec![]).await.expect("Failed to start RustFS");
let client = create_s3_client(&env);
let bucket = "test-multipart-version-id";
create_bucket(&client, bucket).await.expect("Failed to create bucket");
enable_versioning(&client, bucket).await.expect("Failed to enable versioning");
let key = "multipart-file.txt";
let content = b"Part 1 content for multipart upload test";
info!("📤 Creating multipart upload for key: {}", key);
let create_result = client
.create_multipart_upload()
.bucket(bucket)
.key(key)
.send()
.await
.expect("Failed to create multipart upload");
let upload_id = create_result.upload_id().expect("No upload_id returned");
info!("📤 Uploading part 1");
let upload_part_result = client
.upload_part()
.bucket(bucket)
.key(key)
.upload_id(upload_id)
.part_number(1)
.body(ByteStream::from_static(content))
.send()
.await
.expect("Failed to upload part");
let etag = upload_part_result.e_tag().expect("No etag returned").to_string();
let completed_part = CompletedPart::builder().part_number(1).e_tag(etag).build();
let completed_upload = CompletedMultipartUpload::builder().parts(completed_part).build();
info!("📤 Completing multipart upload");
let complete_result = client
.complete_multipart_upload()
.bucket(bucket)
.key(key)
.upload_id(upload_id)
.multipart_upload(completed_upload)
.send()
.await;
assert!(complete_result.is_ok(), "CompleteMultipartUpload failed: {:?}", complete_result.err());
let output = complete_result.unwrap();
info!("📥 CompleteMultipartUpload response - version_id: {:?}", output.version_id);
assert!(
output.version_id.is_some(),
"❌ FAILED: version_id should be present when versioning is enabled"
);
assert!(
!output.version_id.as_ref().unwrap().is_empty(),
"❌ FAILED: version_id should not be empty"
);
info!("✅ PASSED: CompleteMultipartUpload correctly returns version_id");
}
/// Test 4: PutObject should NOT return version_id when versioning is NOT enabled
/// This ensures we didn't break non-versioned buckets
#[tokio::test]
#[serial]
async fn test_put_object_without_versioning() {
init_logging();
info!("🧪 TEST: PutObject behavior without versioning (no regression)");
let mut env = RustFSTestEnvironment::new().await.expect("Failed to create test environment");
env.start_rustfs_server(vec![]).await.expect("Failed to start RustFS");
let client = create_s3_client(&env);
let bucket = "test-no-versioning";
create_bucket(&client, bucket).await.expect("Failed to create bucket");
// Note: NOT enabling versioning here
let key = "test-file.txt";
let content = b"Test content without versioning";
info!("📤 Uploading object to non-versioned bucket");
let result = client
.put_object()
.bucket(bucket)
.key(key)
.body(ByteStream::from_static(content))
.send()
.await;
assert!(result.is_ok(), "PutObject failed: {:?}", result.err());
let output = result.unwrap();
info!("📥 PutObject response - version_id: {:?}", output.version_id);
// version_id can be None or Some("null") for non-versioned buckets
info!("✅ PASSED: PutObject works correctly without versioning");
}
/// Test 5: Basic S3 operations still work correctly (no regression)
#[tokio::test]
#[serial]
async fn test_basic_s3_operations_no_regression() {
init_logging();
info!("🧪 TEST: Basic S3 operations work correctly (no regression)");
let mut env = RustFSTestEnvironment::new().await.expect("Failed to create test environment");
env.start_rustfs_server(vec![]).await.expect("Failed to start RustFS");
let client = create_s3_client(&env);
let bucket = "test-basic-operations";
create_bucket(&client, bucket).await.expect("Failed to create bucket");
enable_versioning(&client, bucket).await.expect("Failed to enable versioning");
let key = "test-basic-file.txt";
let content = b"Basic operations test content";
// Test PUT
info!("📤 Testing PUT operation");
let put_result = client
.put_object()
.bucket(bucket)
.key(key)
.body(ByteStream::from_static(content))
.send()
.await;
assert!(put_result.is_ok(), "PUT operation failed");
let _version_id = put_result.unwrap().version_id;
// Test GET
info!("📥 Testing GET operation");
let get_result = client.get_object().bucket(bucket).key(key).send().await;
assert!(get_result.is_ok(), "GET operation failed");
let body = get_result.unwrap().body.collect().await.unwrap().to_vec();
assert_eq!(body, content, "Content mismatch after GET");
// Test HEAD
info!("📋 Testing HEAD operation");
let head_result = client.head_object().bucket(bucket).key(key).send().await;
assert!(head_result.is_ok(), "HEAD operation failed");
// Test LIST
info!("📝 Testing LIST operation");
let list_result = client.list_objects_v2().bucket(bucket).send().await;
assert!(list_result.is_ok(), "LIST operation failed");
let list_output = list_result.unwrap();
let objects = list_output.contents();
assert!(objects.iter().any(|obj| obj.key() == Some(key)), "Object not found in LIST");
// Test DELETE
info!("🗑️ Testing DELETE operation");
let delete_result = client.delete_object().bucket(bucket).key(key).send().await;
assert!(delete_result.is_ok(), "DELETE operation failed");
// Verify object is deleted (should return NoSuchKey or version marker)
let get_after_delete = client.get_object().bucket(bucket).key(key).send().await;
assert!(
get_after_delete.is_err() || get_after_delete.unwrap().delete_marker == Some(true),
"Object should be deleted or have delete marker"
);
info!("✅ PASSED: All basic S3 operations work correctly");
}
/// Test 6: Veeam-specific scenario simulation
/// Simulates the exact workflow that Veeam uses when backing up data
#[tokio::test]
#[serial]
async fn test_veeam_backup_workflow_simulation() {
init_logging();
info!("🧪 TEST: Veeam VBR backup workflow simulation (Issue #1066)");
let mut env = RustFSTestEnvironment::new().await.expect("Failed to create test environment");
env.start_rustfs_server(vec![]).await.expect("Failed to start RustFS");
let client = create_s3_client(&env);
let bucket = "veeam-backup-test";
create_bucket(&client, bucket).await.expect("Failed to create bucket");
enable_versioning(&client, bucket).await.expect("Failed to enable versioning");
// Veeam typically creates multiple objects in a backup session
let test_paths = vec![
"Veeam/Backup/Clients/test-client-id/test-backup-id/CloudStg/Meta/Blocks/History/CheckpointHistory.dat",
"Veeam/Backup/Clients/test-client-id/test-backup-id/Metadata/Lock/create.checkpoint/declare",
];
for path in test_paths {
info!("📤 Simulating Veeam upload to: {}", path);
let content = format!("Veeam backup data for {}", path);
let put_result = client
.put_object()
.bucket(bucket)
.key(path)
.body(ByteStream::from(content.into_bytes()))
.send()
.await;
assert!(put_result.is_ok(), "Veeam upload failed for path: {}", path);
let output = put_result.unwrap();
info!("📥 Response version_id: {:?}", output.version_id);
assert!(output.version_id.is_some(), "❌ FAILED: Veeam expects version_id for path: {}", path);
assert!(
!output.version_id.as_ref().unwrap().is_empty(),
"❌ FAILED: version_id should not be empty for path: {}",
path
);
info!("✅ Veeam upload successful with version_id for: {}", path);
}
info!("✅ PASSED: Veeam backup workflow simulation completed successfully");
}
}

View File

@@ -34,19 +34,12 @@ workspace = true
default = []
[dependencies]
rustfs-filemeta.workspace = true
rustfs-utils = { workspace = true, features = ["full"] }
rustfs-rio.workspace = true
rustfs-signer.workspace = true
rustfs-checksums.workspace = true
rustfs-config = { workspace = true, features = ["constants", "notify", "audit"] }
rustfs-credentials = { workspace = true }
rustfs-common.workspace = true
rustfs-policy.workspace = true
rustfs-protos.workspace = true
async-trait.workspace = true
bytes.workspace = true
byteorder = { workspace = true }
rustfs-common.workspace = true
rustfs-policy.workspace = true
chrono.workspace = true
glob = { workspace = true }
thiserror.workspace = true
@@ -67,6 +60,7 @@ lazy_static.workspace = true
rustfs-lock.workspace = true
regex = { workspace = true }
path-absolutize = { workspace = true }
rustfs-protos.workspace = true
rmp.workspace = true
rmp-serde.workspace = true
tokio-util = { workspace = true, features = ["io", "compat"] }
@@ -97,6 +91,11 @@ aws-sdk-s3 = { workspace = true }
urlencoding = { workspace = true }
smallvec = { workspace = true }
shadow-rs.workspace = true
rustfs-filemeta.workspace = true
rustfs-utils = { workspace = true, features = ["full"] }
rustfs-rio.workspace = true
rustfs-signer.workspace = true
rustfs-checksums.workspace = true
async-recursion.workspace = true
aws-credential-types = { workspace = true }
aws-smithy-types = { workspace = true }
@@ -108,14 +107,12 @@ google-cloud-storage = { workspace = true }
google-cloud-auth = { workspace = true }
aws-config = { workspace = true }
faster-hex = { workspace = true }
dunce = { workspace = true }
[dev-dependencies]
tokio = { workspace = true, features = ["rt-multi-thread", "macros"] }
criterion = { workspace = true, features = ["html_reports"] }
temp-env = { workspace = true }
tracing-subscriber = { workspace = true }
[build-dependencies]
shadow-rs = { workspace = true, features = ["build", "metadata"] }

View File

@@ -0,0 +1,19 @@
# ECStore - Erasure Coding Storage
ECStore provides erasure coding functionality for the RustFS project, using high-performance Reed-Solomon SIMD implementation for optimal performance.
## Features
- **Reed-Solomon Implementation**: High-performance SIMD-optimized erasure coding
- **Cross-Platform Compatibility**: Support for x86_64, aarch64, and other architectures
- **Performance Optimized**: SIMD instructions for maximum throughput
- **Thread Safety**: Safe concurrent access with caching optimizations
- **Scalable**: Excellent performance for high-throughput scenarios
## Documentation
For complete documentation, examples, and usage information, please visit the main [RustFS repository](https://github.com/rustfs/rustfs).
## License
This project is licensed under the Apache License, Version 2.0.

View File

@@ -14,7 +14,6 @@
use crate::data_usage::{DATA_USAGE_CACHE_NAME, DATA_USAGE_ROOT, load_data_usage_from_backend};
use crate::error::{Error, Result};
use crate::rpc::{TonicInterceptor, gen_tonic_signature_interceptor, node_service_time_out_client};
use crate::{
disk::endpoint::Endpoint,
global::{GLOBAL_BOOT_TIME, GLOBAL_Endpoints},
@@ -24,12 +23,13 @@ use crate::{
};
use crate::data_usage::load_data_usage_cache;
use rustfs_common::{GLOBAL_LOCAL_NODE_NAME, heal_channel::DriveState};
use rustfs_common::{globals::GLOBAL_LOCAL_NODE_NAME, heal_channel::DriveState};
use rustfs_madmin::{
BackendDisks, Disk, ErasureSetInfo, ITEM_INITIALIZING, ITEM_OFFLINE, ITEM_ONLINE, InfoMessage, ServerProperties,
};
use rustfs_protos::{
models::{PingBody, PingBodyBuilder},
node_service_time_out_client,
proto_gen::node_service::{PingRequest, PingResponse},
};
use std::{
@@ -101,9 +101,9 @@ async fn is_server_resolvable(endpoint: &Endpoint) -> Result<()> {
let decoded_payload = flatbuffers::root::<PingBody>(finished_data);
assert!(decoded_payload.is_ok());
let mut client = node_service_time_out_client(&addr, TonicInterceptor::Signature(gen_tonic_signature_interceptor()))
let mut client = node_service_time_out_client(&addr)
.await
.map_err(|err| Error::other(format!("can not get client, err: {err}")))?;
.map_err(|err| Error::other(err.to_string()))?;
let request = Request::new(PingRequest {
version: 1,

View File

@@ -498,19 +498,19 @@ impl BucketTargetSys {
bucket: bucket.to_string(),
})?;
if arn.arn_type == BucketTargetType::ReplicationService
&& let Ok((config, _)) = get_replication_config(bucket).await
{
for rule in config.filter_target_arns(&ObjectOpts {
op_type: ReplicationType::All,
..Default::default()
}) {
if rule == arn_str || config.role == arn_str {
let arn_remotes_map = self.arn_remotes_map.read().await;
if arn_remotes_map.get(arn_str).is_some() {
return Err(BucketTargetError::BucketRemoteRemoveDisallowed {
bucket: bucket.to_string(),
});
if arn.arn_type == BucketTargetType::ReplicationService {
if let Ok((config, _)) = get_replication_config(bucket).await {
for rule in config.filter_target_arns(&ObjectOpts {
op_type: ReplicationType::All,
..Default::default()
}) {
if rule == arn_str || config.role == arn_str {
let arn_remotes_map = self.arn_remotes_map.read().await;
if arn_remotes_map.get(arn_str).is_some() {
return Err(BucketTargetError::BucketRemoteRemoveDisallowed {
bucket: bucket.to_string(),
});
}
}
}
}
@@ -691,22 +691,22 @@ impl BucketTargetSys {
}
// Add new targets
if let Some(new_targets) = targets
&& !new_targets.is_empty()
{
for target in &new_targets.targets {
if let Ok(client) = self.get_remote_target_client_internal(target).await {
arn_remotes_map.insert(
target.arn.clone(),
ArnTarget {
client: Some(Arc::new(client)),
last_refresh: OffsetDateTime::now_utc(),
},
);
self.update_bandwidth_limit(bucket, &target.arn, target.bandwidth_limit);
if let Some(new_targets) = targets {
if !new_targets.is_empty() {
for target in &new_targets.targets {
if let Ok(client) = self.get_remote_target_client_internal(target).await {
arn_remotes_map.insert(
target.arn.clone(),
ArnTarget {
client: Some(Arc::new(client)),
last_refresh: OffsetDateTime::now_utc(),
},
);
self.update_bandwidth_limit(bucket, &target.arn, target.bandwidth_limit);
}
}
targets_map.insert(bucket.to_string(), new_targets.targets.clone());
}
targets_map.insert(bucket.to_string(), new_targets.targets.clone());
}
}

View File

@@ -31,10 +31,10 @@ impl BucketObjectLockSys {
}
pub async fn get(bucket: &str) -> Option<DefaultRetention> {
if let Ok(object_lock_config) = get_object_lock_config(bucket).await
&& let Some(object_lock_rule) = object_lock_config.0.rule
{
return object_lock_rule.default_retention;
if let Ok(object_lock_config) = get_object_lock_config(bucket).await {
if let Some(object_lock_rule) = object_lock_config.0.rule {
return object_lock_rule.default_retention;
}
}
None
}

View File

@@ -55,10 +55,10 @@ impl ReplicationConfigurationExt for ReplicationConfiguration {
if !has_arn {
has_arn = true;
}
if let Some(status) = &rule.existing_object_replication
&& status.status == ExistingObjectReplicationStatus::from_static(ExistingObjectReplicationStatus::ENABLED)
{
return (true, true);
if let Some(status) = &rule.existing_object_replication {
if status.status == ExistingObjectReplicationStatus::from_static(ExistingObjectReplicationStatus::ENABLED) {
return (true, true);
}
}
}
}
@@ -86,11 +86,12 @@ impl ReplicationConfigurationExt for ReplicationConfiguration {
continue;
}
if let Some(status) = &rule.existing_object_replication
&& obj.existing_object
&& status.status == ExistingObjectReplicationStatus::from_static(ExistingObjectReplicationStatus::DISABLED)
{
continue;
if let Some(status) = &rule.existing_object_replication {
if obj.existing_object
&& status.status == ExistingObjectReplicationStatus::from_static(ExistingObjectReplicationStatus::DISABLED)
{
continue;
}
}
if !obj.name.starts_with(rule.prefix()) {
@@ -144,11 +145,12 @@ impl ReplicationConfigurationExt for ReplicationConfiguration {
continue;
}
if let Some(status) = &rule.existing_object_replication
&& obj.existing_object
&& status.status == ExistingObjectReplicationStatus::from_static(ExistingObjectReplicationStatus::DISABLED)
{
return false;
if let Some(status) = &rule.existing_object_replication {
if obj.existing_object
&& status.status == ExistingObjectReplicationStatus::from_static(ExistingObjectReplicationStatus::DISABLED)
{
return false;
}
}
if obj.op_type == ReplicationType::Delete {
@@ -184,20 +186,20 @@ impl ReplicationConfigurationExt for ReplicationConfiguration {
continue;
}
if let Some(filter) = &rule.filter
&& let Some(filter_prefix) = &filter.prefix
{
if !prefix.is_empty() && !filter_prefix.is_empty() {
// The provided prefix must fall within the rule prefix
if !recursive && !prefix.starts_with(filter_prefix) {
if let Some(filter) = &rule.filter {
if let Some(filter_prefix) = &filter.prefix {
if !prefix.is_empty() && !filter_prefix.is_empty() {
// The provided prefix must fall within the rule prefix
if !recursive && !prefix.starts_with(filter_prefix) {
continue;
}
}
// When recursive, skip this rule if it does not match the test prefix or hierarchy
if recursive && !rule.prefix().starts_with(prefix) && !prefix.starts_with(rule.prefix()) {
continue;
}
}
// When recursive, skip this rule if it does not match the test prefix or hierarchy
if recursive && !rule.prefix().starts_with(prefix) && !prefix.starts_with(rule.prefix()) {
continue;
}
}
return true;
}

View File

@@ -512,20 +512,20 @@ impl<S: StorageAPI> ReplicationPool<S> {
if !lrg_workers.is_empty() {
let index = (hash as usize) % lrg_workers.len();
if let Some(worker) = lrg_workers.get(index)
&& worker.try_send(ReplicationOperation::Object(Box::new(ri.clone()))).is_err()
{
// Queue to MRF if worker is busy
let _ = self.mrf_save_tx.try_send(ri.to_mrf_entry());
if let Some(worker) = lrg_workers.get(index) {
if worker.try_send(ReplicationOperation::Object(Box::new(ri.clone()))).is_err() {
// Queue to MRF if worker is busy
let _ = self.mrf_save_tx.try_send(ri.to_mrf_entry());
// Try to add more workers if possible
let max_l_workers = *self.max_l_workers.read().await;
let existing = lrg_workers.len();
if self.active_lrg_workers() < std::cmp::min(max_l_workers, LARGE_WORKER_COUNT) as i32 {
let workers = std::cmp::min(existing + 1, max_l_workers);
// Try to add more workers if possible
let max_l_workers = *self.max_l_workers.read().await;
let existing = lrg_workers.len();
if self.active_lrg_workers() < std::cmp::min(max_l_workers, LARGE_WORKER_COUNT) as i32 {
let workers = std::cmp::min(existing + 1, max_l_workers);
drop(lrg_workers);
self.resize_lrg_workers(workers, existing).await;
drop(lrg_workers);
self.resize_lrg_workers(workers, existing).await;
}
}
}
}
@@ -539,45 +539,47 @@ impl<S: StorageAPI> ReplicationPool<S> {
_ => self.get_worker_ch(&ri.bucket, &ri.name, ri.size).await,
};
if let Some(channel) = ch
&& channel.try_send(ReplicationOperation::Object(Box::new(ri.clone()))).is_err()
{
// Queue to MRF if all workers are busy
let _ = self.mrf_save_tx.try_send(ri.to_mrf_entry());
if let Some(channel) = ch {
if channel.try_send(ReplicationOperation::Object(Box::new(ri.clone()))).is_err() {
// Queue to MRF if all workers are busy
let _ = self.mrf_save_tx.try_send(ri.to_mrf_entry());
// Try to scale up workers based on priority
let priority = self.priority.read().await.clone();
let max_workers = *self.max_workers.read().await;
// Try to scale up workers based on priority
let priority = self.priority.read().await.clone();
let max_workers = *self.max_workers.read().await;
match priority {
ReplicationPriority::Fast => {
// Log warning about unable to keep up
info!("Warning: Unable to keep up with incoming traffic");
}
ReplicationPriority::Slow => {
info!("Warning: Unable to keep up with incoming traffic - recommend increasing replication priority to auto");
}
ReplicationPriority::Auto => {
let max_w = std::cmp::min(max_workers, WORKER_MAX_LIMIT);
let active_workers = self.active_workers();
if active_workers < max_w as i32 {
let workers = self.workers.read().await;
let new_count = std::cmp::min(workers.len() + 1, max_w);
let existing = workers.len();
drop(workers);
self.resize_workers(new_count, existing).await;
match priority {
ReplicationPriority::Fast => {
// Log warning about unable to keep up
info!("Warning: Unable to keep up with incoming traffic");
}
ReplicationPriority::Slow => {
info!(
"Warning: Unable to keep up with incoming traffic - recommend increasing replication priority to auto"
);
}
ReplicationPriority::Auto => {
let max_w = std::cmp::min(max_workers, WORKER_MAX_LIMIT);
let active_workers = self.active_workers();
let max_mrf_workers = std::cmp::min(max_workers, MRF_WORKER_MAX_LIMIT);
let active_mrf = self.active_mrf_workers();
if active_workers < max_w as i32 {
let workers = self.workers.read().await;
let new_count = std::cmp::min(workers.len() + 1, max_w);
let existing = workers.len();
if active_mrf < max_mrf_workers as i32 {
let current_mrf = self.mrf_worker_size.load(Ordering::SeqCst);
let new_mrf = std::cmp::min(current_mrf + 1, max_mrf_workers as i32);
drop(workers);
self.resize_workers(new_count, existing).await;
}
self.resize_failed_workers(new_mrf).await;
let max_mrf_workers = std::cmp::min(max_workers, MRF_WORKER_MAX_LIMIT);
let active_mrf = self.active_mrf_workers();
if active_mrf < max_mrf_workers as i32 {
let current_mrf = self.mrf_worker_size.load(Ordering::SeqCst);
let new_mrf = std::cmp::min(current_mrf + 1, max_mrf_workers as i32);
self.resize_failed_workers(new_mrf).await;
}
}
}
}
@@ -591,29 +593,31 @@ impl<S: StorageAPI> ReplicationPool<S> {
_ => self.get_worker_ch(&doi.bucket, &doi.delete_object.object_name, 0).await,
};
if let Some(channel) = ch
&& channel.try_send(ReplicationOperation::Delete(Box::new(doi.clone()))).is_err()
{
let _ = self.mrf_save_tx.try_send(doi.to_mrf_entry());
if let Some(channel) = ch {
if channel.try_send(ReplicationOperation::Delete(Box::new(doi.clone()))).is_err() {
let _ = self.mrf_save_tx.try_send(doi.to_mrf_entry());
let priority = self.priority.read().await.clone();
let max_workers = *self.max_workers.read().await;
let priority = self.priority.read().await.clone();
let max_workers = *self.max_workers.read().await;
match priority {
ReplicationPriority::Fast => {
info!("Warning: Unable to keep up with incoming deletes");
}
ReplicationPriority::Slow => {
info!("Warning: Unable to keep up with incoming deletes - recommend increasing replication priority to auto");
}
ReplicationPriority::Auto => {
let max_w = std::cmp::min(max_workers, WORKER_MAX_LIMIT);
if self.active_workers() < max_w as i32 {
let workers = self.workers.read().await;
let new_count = std::cmp::min(workers.len() + 1, max_w);
let existing = workers.len();
drop(workers);
self.resize_workers(new_count, existing).await;
match priority {
ReplicationPriority::Fast => {
info!("Warning: Unable to keep up with incoming deletes");
}
ReplicationPriority::Slow => {
info!(
"Warning: Unable to keep up with incoming deletes - recommend increasing replication priority to auto"
);
}
ReplicationPriority::Auto => {
let max_w = std::cmp::min(max_workers, WORKER_MAX_LIMIT);
if self.active_workers() < max_w as i32 {
let workers = self.workers.read().await;
let new_count = std::cmp::min(workers.len() + 1, max_w);
let existing = workers.len();
drop(workers);
self.resize_workers(new_count, existing).await;
}
}
}
}

View File

@@ -242,10 +242,11 @@ impl ReplicationResyncer {
if let Some(last_update) = status.last_update
&& last_update > *last_update_times.get(bucket).unwrap_or(&OffsetDateTime::UNIX_EPOCH) {
if let Some(last_update) = status.last_update {
if last_update > *last_update_times.get(bucket).unwrap_or(&OffsetDateTime::UNIX_EPOCH) {
update = true;
}
}
if update {
if let Err(err) = save_resync_status(bucket, status, api.clone()).await {
@@ -344,12 +345,13 @@ impl ReplicationResyncer {
return;
};
if !heal
&& let Err(e) = self
if !heal {
if let Err(e) = self
.mark_status(ResyncStatusType::ResyncStarted, opts.clone(), storage.clone())
.await
{
error!("Failed to mark resync status: {}", e);
{
error!("Failed to mark resync status: {}", e);
}
}
let (tx, mut rx) = tokio::sync::mpsc::channel(100);
@@ -1461,18 +1463,21 @@ async fn replicate_delete_to_target(dobj: &DeletedObjectReplicationInfo, tgt_cli
Some(version_id.to_string())
};
if dobj.delete_object.delete_marker_version_id.is_some()
&& let Err(e) = tgt_client
if dobj.delete_object.delete_marker_version_id.is_some() {
if let Err(e) = tgt_client
.head_object(&tgt_client.bucket, &dobj.delete_object.object_name, version_id.clone())
.await
&& let SdkError::ServiceError(service_err) = &e
&& !service_err.err().is_not_found()
{
rinfo.replication_status = ReplicationStatusType::Failed;
rinfo.error = Some(e.to_string());
{
if let SdkError::ServiceError(service_err) = &e {
if !service_err.err().is_not_found() {
rinfo.replication_status = ReplicationStatusType::Failed;
rinfo.error = Some(e.to_string());
return rinfo;
};
return rinfo;
}
}
};
}
match tgt_client
.remove_object(

View File

@@ -49,13 +49,13 @@ impl ExponentialMovingAverage {
pub fn update_exponential_moving_average(&self, now: SystemTime) {
if let Ok(mut last_update_guard) = self.last_update.try_lock() {
let last_update = *last_update_guard;
if let Ok(duration) = now.duration_since(last_update)
&& duration.as_secs() > 0
{
let decay = (-duration.as_secs_f64() / 60.0).exp(); // 1 minute decay
let current_value = f64::from_bits(self.value.load(AtomicOrdering::Relaxed));
self.value.store((current_value * decay).to_bits(), AtomicOrdering::Relaxed);
*last_update_guard = now;
if let Ok(duration) = now.duration_since(last_update) {
if duration.as_secs() > 0 {
let decay = (-duration.as_secs_f64() / 60.0).exp(); // 1 minute decay
let current_value = f64::from_bits(self.value.load(AtomicOrdering::Relaxed));
self.value.store((current_value * decay).to_bits(), AtomicOrdering::Relaxed);
*last_update_guard = now;
}
}
}
}
@@ -757,10 +757,10 @@ impl ReplicationStats {
/// Check if bucket replication statistics have usage
pub fn has_replication_usage(&self, bucket: &str) -> bool {
if let Ok(cache) = self.cache.try_read()
&& let Some(stats) = cache.get(bucket)
{
return stats.has_replication_usage();
if let Ok(cache) = self.cache.try_read() {
if let Some(stats) = cache.get(bucket) {
return stats.has_replication_usage();
}
}
false
}

View File

@@ -13,8 +13,7 @@
// limitations under the License.
use crate::disk::RUSTFS_META_BUCKET;
use crate::error::{Error, Result, StorageError};
use rustfs_utils::path::SLASH_SEPARATOR;
use crate::error::{Error, Result};
use s3s::xml;
pub fn is_meta_bucketname(name: &str) -> bool {
@@ -22,7 +21,6 @@ pub fn is_meta_bucketname(name: &str) -> bool {
}
use regex::Regex;
use tracing::instrument;
lazy_static::lazy_static! {
static ref VALID_BUCKET_NAME: Regex = Regex::new(r"^[A-Za-z0-9][A-Za-z0-9\.\-\_\:]{1,61}[A-Za-z0-9]$").unwrap();
@@ -115,420 +113,3 @@ pub fn serialize<T: xml::Serialize>(val: &T) -> xml::SerResult<Vec<u8>> {
}
Ok(buf)
}
pub fn has_bad_path_component(path: &str) -> bool {
let n = path.len();
if n > 32 << 10 {
// At 32K we are beyond reasonable.
return true;
}
let bytes = path.as_bytes();
let mut i = 0;
// Skip leading slashes (for sake of Windows \ is included as well)
while i < n && (bytes[i] == b'/' || bytes[i] == b'\\') {
i += 1;
}
while i < n {
// Find the next segment
let start = i;
while i < n && bytes[i] != b'/' && bytes[i] != b'\\' {
i += 1;
}
// Trim whitespace of segment
let mut segment_start = start;
let mut segment_end = i;
while segment_start < segment_end && bytes[segment_start].is_ascii_whitespace() {
segment_start += 1;
}
while segment_end > segment_start && bytes[segment_end - 1].is_ascii_whitespace() {
segment_end -= 1;
}
// Check for ".." or "."
match segment_end - segment_start {
2 if segment_start + 1 < n && bytes[segment_start] == b'.' && bytes[segment_start + 1] == b'.' => {
return true;
}
1 if bytes[segment_start] == b'.' => {
return true;
}
_ => {}
}
if i < n {
i += 1;
}
}
false
}
pub fn is_valid_object_prefix(object: &str) -> bool {
if has_bad_path_component(object) {
return false;
}
if !object.is_char_boundary(0) || std::str::from_utf8(object.as_bytes()).is_err() {
return false;
}
if object.contains("//") {
return false;
}
// This is valid for AWS S3 but it will never
// work with file systems, we will reject here
// to return object name invalid rather than
// a cryptic error from the file system.
!object.contains('\0')
}
pub fn is_valid_object_name(object: &str) -> bool {
// Implement object name validation
if object.is_empty() {
return false;
}
if object.ends_with(SLASH_SEPARATOR) {
return false;
}
is_valid_object_prefix(object)
}
pub fn check_object_name_for_length_and_slash(bucket: &str, object: &str) -> Result<()> {
if object.len() > 1024 {
return Err(StorageError::ObjectNameTooLong(bucket.to_owned(), object.to_owned()));
}
if object.starts_with(SLASH_SEPARATOR) {
return Err(StorageError::ObjectNamePrefixAsSlash(bucket.to_owned(), object.to_owned()));
}
#[cfg(target_os = "windows")]
{
if object.contains(':')
|| object.contains('*')
|| object.contains('?')
|| object.contains('"')
|| object.contains('|')
|| object.contains('<')
|| object.contains('>')
// || object.contains('\\')
{
return Err(StorageError::ObjectNameInvalid(bucket.to_owned(), object.to_owned()));
}
}
Ok(())
}
pub fn check_copy_obj_args(bucket: &str, object: &str) -> Result<()> {
check_bucket_and_object_names(bucket, object)
}
pub fn check_get_obj_args(bucket: &str, object: &str) -> Result<()> {
check_bucket_and_object_names(bucket, object)
}
pub fn check_del_obj_args(bucket: &str, object: &str) -> Result<()> {
check_bucket_and_object_names(bucket, object)
}
pub fn check_bucket_and_object_names(bucket: &str, object: &str) -> Result<()> {
if !is_meta_bucketname(bucket) && check_valid_bucket_name_strict(bucket).is_err() {
return Err(StorageError::BucketNameInvalid(bucket.to_string()));
}
if object.is_empty() {
return Err(StorageError::ObjectNameInvalid(bucket.to_string(), object.to_string()));
}
if !is_valid_object_prefix(object) {
return Err(StorageError::ObjectNameInvalid(bucket.to_string(), object.to_string()));
}
// if cfg!(target_os = "windows") && object.contains('\\') {
// return Err(StorageError::ObjectNameInvalid(bucket.to_string(), object.to_string()));
// }
Ok(())
}
pub fn check_list_objs_args(bucket: &str, prefix: &str, _marker: &Option<String>) -> Result<()> {
if !is_meta_bucketname(bucket) && check_valid_bucket_name_strict(bucket).is_err() {
return Err(StorageError::BucketNameInvalid(bucket.to_string()));
}
if !is_valid_object_prefix(prefix) {
return Err(StorageError::ObjectNameInvalid(bucket.to_string(), prefix.to_string()));
}
Ok(())
}
pub fn check_list_multipart_args(
bucket: &str,
prefix: &str,
key_marker: &Option<String>,
upload_id_marker: &Option<String>,
_delimiter: &Option<String>,
) -> Result<()> {
check_list_objs_args(bucket, prefix, key_marker)?;
if let Some(upload_id_marker) = upload_id_marker {
if let Some(key_marker) = key_marker
&& key_marker.ends_with('/')
{
return Err(StorageError::InvalidUploadIDKeyCombination(
upload_id_marker.to_string(),
key_marker.to_string(),
));
}
if let Err(_e) = base64_simd::URL_SAFE_NO_PAD.decode_to_vec(upload_id_marker.as_bytes()) {
return Err(StorageError::MalformedUploadID(upload_id_marker.to_owned()));
}
}
Ok(())
}
pub fn check_object_args(bucket: &str, object: &str) -> Result<()> {
if !is_meta_bucketname(bucket) && check_valid_bucket_name_strict(bucket).is_err() {
return Err(StorageError::BucketNameInvalid(bucket.to_string()));
}
check_object_name_for_length_and_slash(bucket, object)?;
if !is_valid_object_name(object) {
return Err(StorageError::ObjectNameInvalid(bucket.to_string(), object.to_string()));
}
Ok(())
}
pub fn check_new_multipart_args(bucket: &str, object: &str) -> Result<()> {
check_object_args(bucket, object)
}
pub fn check_multipart_object_args(bucket: &str, object: &str, upload_id: &str) -> Result<()> {
if let Err(e) = base64_simd::URL_SAFE_NO_PAD.decode_to_vec(upload_id.as_bytes()) {
return Err(StorageError::MalformedUploadID(format!("{bucket}/{object}-{upload_id},err:{e}")));
};
check_object_args(bucket, object)
}
pub fn check_put_object_part_args(bucket: &str, object: &str, upload_id: &str) -> Result<()> {
check_multipart_object_args(bucket, object, upload_id)
}
pub fn check_list_parts_args(bucket: &str, object: &str, upload_id: &str) -> Result<()> {
check_multipart_object_args(bucket, object, upload_id)
}
pub fn check_complete_multipart_args(bucket: &str, object: &str, upload_id: &str) -> Result<()> {
check_multipart_object_args(bucket, object, upload_id)
}
pub fn check_abort_multipart_args(bucket: &str, object: &str, upload_id: &str) -> Result<()> {
check_multipart_object_args(bucket, object, upload_id)
}
#[instrument(level = "debug")]
pub fn check_put_object_args(bucket: &str, object: &str) -> Result<()> {
if !is_meta_bucketname(bucket) && check_valid_bucket_name_strict(bucket).is_err() {
return Err(StorageError::BucketNameInvalid(bucket.to_string()));
}
check_object_name_for_length_and_slash(bucket, object)?;
if object.is_empty() || !is_valid_object_prefix(object) {
return Err(StorageError::ObjectNameInvalid(bucket.to_string(), object.to_string()));
}
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
// Test validation functions
#[test]
fn test_is_valid_object_name() {
// Valid cases
assert!(is_valid_object_name("valid-object-name"));
assert!(is_valid_object_name("object/with/slashes"));
assert!(is_valid_object_name("object with spaces"));
assert!(is_valid_object_name("object_with_underscores"));
assert!(is_valid_object_name("object.with.dots"));
assert!(is_valid_object_name("single"));
assert!(is_valid_object_name("file.txt"));
assert!(is_valid_object_name("path/to/file.txt"));
assert!(is_valid_object_name("a/b/c/d/e/f"));
assert!(is_valid_object_name("object-123"));
assert!(is_valid_object_name("object(1)"));
assert!(is_valid_object_name("object[1]"));
assert!(is_valid_object_name("object@domain.com"));
// Invalid cases - empty string
assert!(!is_valid_object_name(""));
// Invalid cases - ends with slash (object names cannot end with slash)
assert!(!is_valid_object_name("object/"));
assert!(!is_valid_object_name("path/to/file/"));
assert!(!is_valid_object_name("ends/with/slash/"));
// Invalid cases - bad path components (inherited from is_valid_object_prefix)
assert!(!is_valid_object_name("."));
assert!(!is_valid_object_name(".."));
assert!(!is_valid_object_name("object/.."));
assert!(!is_valid_object_name("object/."));
assert!(!is_valid_object_name("../object"));
assert!(!is_valid_object_name("./object"));
assert!(!is_valid_object_name("path/../other"));
assert!(!is_valid_object_name("path/./other"));
assert!(!is_valid_object_name("a/../b/../c"));
assert!(!is_valid_object_name("a/./b/./c"));
// Invalid cases - double slashes
assert!(!is_valid_object_name("object//with//double//slashes"));
assert!(!is_valid_object_name("//leading/double/slash"));
assert!(!is_valid_object_name("trailing/double/slash//"));
// Invalid cases - null characters
assert!(!is_valid_object_name("object\x00with\x00null"));
assert!(!is_valid_object_name("object\x00"));
assert!(!is_valid_object_name("\x00object"));
// Invalid cases - overly long path (>32KB)
let long_path = "a/".repeat(16385); // 16385 * 2 = 32770 bytes, over 32KB (32768)
assert!(!is_valid_object_name(&long_path));
// Valid cases - prefixes that are valid for object names too
assert!(is_valid_object_name("prefix"));
assert!(is_valid_object_name("deep/nested/object"));
assert!(is_valid_object_name("normal_object"));
}
#[test]
fn test_is_valid_object_prefix() {
// Valid cases
assert!(is_valid_object_prefix("valid-prefix"));
assert!(is_valid_object_prefix(""));
assert!(is_valid_object_prefix("prefix/with/slashes"));
assert!(is_valid_object_prefix("prefix/"));
assert!(is_valid_object_prefix("deep/nested/prefix/"));
assert!(is_valid_object_prefix("normal-prefix"));
assert!(is_valid_object_prefix("prefix_with_underscores"));
assert!(is_valid_object_prefix("prefix.with.dots"));
// Invalid cases - bad path components
assert!(!is_valid_object_prefix("."));
assert!(!is_valid_object_prefix(".."));
assert!(!is_valid_object_prefix("prefix/.."));
assert!(!is_valid_object_prefix("prefix/."));
assert!(!is_valid_object_prefix("../prefix"));
assert!(!is_valid_object_prefix("./prefix"));
assert!(!is_valid_object_prefix("prefix/../other"));
assert!(!is_valid_object_prefix("prefix/./other"));
assert!(!is_valid_object_prefix("a/../b/../c"));
assert!(!is_valid_object_prefix("a/./b/./c"));
// Invalid cases - double slashes
assert!(!is_valid_object_prefix("prefix//with//double//slashes"));
assert!(!is_valid_object_prefix("//leading/double/slash"));
assert!(!is_valid_object_prefix("trailing/double/slash//"));
// Invalid cases - null characters
assert!(!is_valid_object_prefix("prefix\x00with\x00null"));
assert!(!is_valid_object_prefix("prefix\x00"));
assert!(!is_valid_object_prefix("\x00prefix"));
// Invalid cases - overly long path (>32KB)
let long_path = "a/".repeat(16385); // 16385 * 2 = 32770 bytes, over 32KB (32768)
assert!(!is_valid_object_prefix(&long_path));
}
#[test]
fn test_check_bucket_and_object_names() {
// Valid names
assert!(check_bucket_and_object_names("valid-bucket", "valid-object").is_ok());
// Invalid bucket names
assert!(check_bucket_and_object_names("", "valid-object").is_err());
assert!(check_bucket_and_object_names("INVALID", "valid-object").is_err());
// Invalid object names
assert!(check_bucket_and_object_names("valid-bucket", "").is_err());
}
#[test]
fn test_check_list_objs_args() {
assert!(check_list_objs_args("valid-bucket", "", &None).is_ok());
assert!(check_list_objs_args("", "", &None).is_err());
assert!(check_list_objs_args("INVALID", "", &None).is_err());
}
#[test]
fn test_check_multipart_args() {
assert!(check_new_multipart_args("valid-bucket", "valid-object").is_ok());
assert!(check_new_multipart_args("", "valid-object").is_err());
assert!(check_new_multipart_args("valid-bucket", "").is_err());
// Use valid base64 encoded upload_id
let valid_upload_id = "dXBsb2FkLWlk"; // base64 encoded "upload-id"
assert!(check_multipart_object_args("valid-bucket", "valid-object", valid_upload_id).is_ok());
assert!(check_multipart_object_args("", "valid-object", valid_upload_id).is_err());
assert!(check_multipart_object_args("valid-bucket", "", valid_upload_id).is_err());
// Empty string is valid base64 (decodes to empty vec), so this should pass bucket/object validation
// but fail on empty upload_id check in the function logic
assert!(check_multipart_object_args("valid-bucket", "valid-object", "").is_ok());
assert!(check_multipart_object_args("valid-bucket", "valid-object", "invalid-base64!").is_err());
}
#[test]
fn test_validation_functions_comprehensive() {
// Test object name validation edge cases
assert!(!is_valid_object_name(""));
assert!(is_valid_object_name("a"));
assert!(is_valid_object_name("test.txt"));
assert!(is_valid_object_name("folder/file.txt"));
assert!(is_valid_object_name("very-long-object-name-with-many-characters"));
// Test prefix validation
assert!(is_valid_object_prefix(""));
assert!(is_valid_object_prefix("prefix"));
assert!(is_valid_object_prefix("prefix/"));
assert!(is_valid_object_prefix("deep/nested/prefix/"));
}
#[test]
fn test_argument_validation_comprehensive() {
// Test bucket and object name validation
assert!(check_bucket_and_object_names("test-bucket", "test-object").is_ok());
assert!(check_bucket_and_object_names("test-bucket", "folder/test-object").is_ok());
// Test list objects arguments
assert!(check_list_objs_args("test-bucket", "prefix", &Some("marker".to_string())).is_ok());
assert!(check_list_objs_args("test-bucket", "", &None).is_ok());
// Test multipart upload arguments with valid base64 upload_id
let valid_upload_id = "dXBsb2FkLWlk"; // base64 encoded "upload-id"
assert!(check_put_object_part_args("test-bucket", "test-object", valid_upload_id).is_ok());
assert!(check_list_parts_args("test-bucket", "test-object", valid_upload_id).is_ok());
assert!(check_complete_multipart_args("test-bucket", "test-object", valid_upload_id).is_ok());
assert!(check_abort_multipart_args("test-bucket", "test-object", valid_upload_id).is_ok());
// Test put object arguments
assert!(check_put_object_args("test-bucket", "test-object").is_ok());
assert!(check_put_object_args("", "test-object").is_err());
assert!(check_put_object_args("test-bucket", "").is_err());
}
}

View File

@@ -37,11 +37,10 @@ impl VersioningApi for VersioningConfiguration {
return true;
}
if let Some(exclude_folders) = self.exclude_folders
&& exclude_folders
&& prefix.ends_with('/')
{
return false;
if let Some(exclude_folders) = self.exclude_folders {
if exclude_folders && prefix.ends_with('/') {
return false;
}
}
if let Some(ref excluded_prefixes) = self.excluded_prefixes {
@@ -68,11 +67,10 @@ impl VersioningApi for VersioningConfiguration {
return false;
}
if let Some(exclude_folders) = self.exclude_folders
&& exclude_folders
&& prefix.ends_with('/')
{
return true;
if let Some(exclude_folders) = self.exclude_folders {
if exclude_folders && prefix.ends_with('/') {
return true;
}
}
if let Some(ref excluded_prefixes) = self.excluded_prefixes {

View File

@@ -16,7 +16,7 @@ use crate::disk::error::DiskError;
use crate::disk::{self, DiskAPI, DiskStore, WalkDirOptions};
use futures::future::join_all;
use rustfs_filemeta::{MetaCacheEntries, MetaCacheEntry, MetacacheReader, is_io_eof};
use std::{future::Future, pin::Pin};
use std::{future::Future, pin::Pin, sync::Arc};
use tokio::spawn;
use tokio_util::sync::CancellationToken;
use tracing::{error, info, warn};
@@ -71,14 +71,14 @@ pub async fn list_path_raw(rx: CancellationToken, opts: ListPathRawOptions) -> d
let mut jobs: Vec<tokio::task::JoinHandle<std::result::Result<(), DiskError>>> = Vec::new();
let mut readers = Vec::with_capacity(opts.disks.len());
let fds = opts.fallback_disks.iter().flatten().cloned().collect::<Vec<_>>();
let fds = Arc::new(opts.fallback_disks.clone());
let cancel_rx = CancellationToken::new();
for disk in opts.disks.iter() {
let opdisk = disk.clone();
let opts_clone = opts.clone();
let mut fds_clone = fds.clone();
let fds_clone = fds.clone();
let cancel_rx_clone = cancel_rx.clone();
let (rd, mut wr) = tokio::io::duplex(64);
readers.push(MetacacheReader::new(rd));
@@ -113,20 +113,21 @@ pub async fn list_path_raw(rx: CancellationToken, opts: ListPathRawOptions) -> d
}
while need_fallback {
let disk_op = {
if fds_clone.is_empty() {
None
} else {
let disk = fds_clone.remove(0);
if disk.is_online().await { Some(disk.clone()) } else { None }
// warn!("list_path_raw: while need_fallback start");
let disk = match fds_clone.iter().find(|d| d.is_some()) {
Some(d) => {
if let Some(disk) = d.clone() {
disk
} else {
warn!("list_path_raw: fallback disk is none");
break;
}
}
None => {
warn!("list_path_raw: fallback disk is none2");
break;
}
};
let Some(disk) = disk_op else {
warn!("list_path_raw: fallback disk is none");
break;
};
match disk
.as_ref()
.walk_dir(
@@ -308,10 +309,12 @@ pub async fn list_path_raw(rx: CancellationToken, opts: ListPathRawOptions) -> d
// Break if all at EOF or error.
if at_eof + has_err == readers.len() {
if has_err > 0
&& let Some(finished_fn) = opts.finished.as_ref()
{
finished_fn(&errs).await;
if has_err > 0 {
if let Some(finished_fn) = opts.finished.as_ref() {
if has_err > 0 {
finished_fn(&errs).await;
}
}
}
// error!("list_path_raw: at_eof + has_err == readers.len() break {:?}", &errs);

View File

@@ -0,0 +1,350 @@
#![allow(clippy::map_entry)]
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#![allow(unused_imports)]
#![allow(unused_variables)]
#![allow(unused_mut)]
#![allow(unused_assignments)]
#![allow(unused_must_use)]
#![allow(clippy::all)]
use lazy_static::lazy_static;
use rustfs_checksums::ChecksumAlgorithm;
use std::collections::HashMap;
use crate::client::{api_put_object::PutObjectOptions, api_s3_datatypes::ObjectPart};
use crate::{disk::DiskAPI, store_api::GetObjectReader};
use rustfs_utils::crypto::{base64_decode, base64_encode};
use s3s::header::{
X_AMZ_CHECKSUM_ALGORITHM, X_AMZ_CHECKSUM_CRC32, X_AMZ_CHECKSUM_CRC32C, X_AMZ_CHECKSUM_SHA1, X_AMZ_CHECKSUM_SHA256,
};
use enumset::{EnumSet, EnumSetType, enum_set};
#[derive(Debug, EnumSetType, Default)]
#[enumset(repr = "u8")]
pub enum ChecksumMode {
#[default]
ChecksumNone,
ChecksumSHA256,
ChecksumSHA1,
ChecksumCRC32,
ChecksumCRC32C,
ChecksumCRC64NVME,
ChecksumFullObject,
}
lazy_static! {
static ref C_ChecksumMask: EnumSet<ChecksumMode> = {
let mut s = EnumSet::all();
s.remove(ChecksumMode::ChecksumFullObject);
s
};
static ref C_ChecksumFullObjectCRC32: EnumSet<ChecksumMode> =
enum_set!(ChecksumMode::ChecksumCRC32 | ChecksumMode::ChecksumFullObject);
static ref C_ChecksumFullObjectCRC32C: EnumSet<ChecksumMode> =
enum_set!(ChecksumMode::ChecksumCRC32C | ChecksumMode::ChecksumFullObject);
}
const AMZ_CHECKSUM_CRC64NVME: &str = "x-amz-checksum-crc64nvme";
impl ChecksumMode {
//pub const CRC64_NVME_POLYNOMIAL: i64 = 0xad93d23594c93659;
pub fn base(&self) -> ChecksumMode {
let s = EnumSet::from(*self).intersection(*C_ChecksumMask);
match s.as_u8() {
1_u8 => ChecksumMode::ChecksumNone,
2_u8 => ChecksumMode::ChecksumSHA256,
4_u8 => ChecksumMode::ChecksumSHA1,
8_u8 => ChecksumMode::ChecksumCRC32,
16_u8 => ChecksumMode::ChecksumCRC32C,
32_u8 => ChecksumMode::ChecksumCRC64NVME,
_ => panic!("enum err."),
}
}
pub fn is(&self, t: ChecksumMode) -> bool {
*self & t == t
}
pub fn key(&self) -> String {
//match c & checksumMask {
match self {
ChecksumMode::ChecksumCRC32 => {
return X_AMZ_CHECKSUM_CRC32.to_string();
}
ChecksumMode::ChecksumCRC32C => {
return X_AMZ_CHECKSUM_CRC32C.to_string();
}
ChecksumMode::ChecksumSHA1 => {
return X_AMZ_CHECKSUM_SHA1.to_string();
}
ChecksumMode::ChecksumSHA256 => {
return X_AMZ_CHECKSUM_SHA256.to_string();
}
ChecksumMode::ChecksumCRC64NVME => {
return AMZ_CHECKSUM_CRC64NVME.to_string();
}
_ => {
return "".to_string();
}
}
}
pub fn can_composite(&self) -> bool {
let s = EnumSet::from(*self).intersection(*C_ChecksumMask);
match s.as_u8() {
2_u8 => true,
4_u8 => true,
8_u8 => true,
16_u8 => true,
_ => false,
}
}
pub fn can_merge_crc(&self) -> bool {
let s = EnumSet::from(*self).intersection(*C_ChecksumMask);
match s.as_u8() {
8_u8 => true,
16_u8 => true,
32_u8 => true,
_ => false,
}
}
pub fn full_object_requested(&self) -> bool {
let s = EnumSet::from(*self).intersection(*C_ChecksumMask);
match s.as_u8() {
//C_ChecksumFullObjectCRC32 as u8 => true,
//C_ChecksumFullObjectCRC32C as u8 => true,
32_u8 => true,
_ => false,
}
}
pub fn key_capitalized(&self) -> String {
self.key()
}
pub fn raw_byte_len(&self) -> usize {
let u = EnumSet::from(*self).intersection(*C_ChecksumMask).as_u8();
if u == ChecksumMode::ChecksumCRC32 as u8 || u == ChecksumMode::ChecksumCRC32C as u8 {
4
} else if u == ChecksumMode::ChecksumSHA1 as u8 {
use sha1::Digest;
sha1::Sha1::output_size() as usize
} else if u == ChecksumMode::ChecksumSHA256 as u8 {
use sha2::Digest;
sha2::Sha256::output_size() as usize
} else if u == ChecksumMode::ChecksumCRC64NVME as u8 {
8
} else {
0
}
}
pub fn hasher(&self) -> Result<Box<dyn rustfs_checksums::http::HttpChecksum>, std::io::Error> {
match /*C_ChecksumMask & **/self {
ChecksumMode::ChecksumCRC32 => {
return Ok(ChecksumAlgorithm::Crc32.into_impl());
}
ChecksumMode::ChecksumCRC32C => {
return Ok(ChecksumAlgorithm::Crc32c.into_impl());
}
ChecksumMode::ChecksumSHA1 => {
return Ok(ChecksumAlgorithm::Sha1.into_impl());
}
ChecksumMode::ChecksumSHA256 => {
return Ok(ChecksumAlgorithm::Sha256.into_impl());
}
ChecksumMode::ChecksumCRC64NVME => {
return Ok(ChecksumAlgorithm::Crc64Nvme.into_impl());
}
_ => return Err(std::io::Error::other("unsupported checksum type")),
}
}
pub fn is_set(&self) -> bool {
let s = EnumSet::from(*self).intersection(*C_ChecksumMask);
s.len() == 1
}
pub fn set_default(&mut self, t: ChecksumMode) {
if !self.is_set() {
*self = t;
}
}
pub fn encode_to_string(&self, b: &[u8]) -> Result<String, std::io::Error> {
if !self.is_set() {
return Ok("".to_string());
}
let mut h = self.hasher()?;
h.update(b);
let hash = h.finalize();
Ok(base64_encode(hash.as_ref()))
}
pub fn to_string(&self) -> String {
//match c & checksumMask {
match self {
ChecksumMode::ChecksumCRC32 => {
return "CRC32".to_string();
}
ChecksumMode::ChecksumCRC32C => {
return "CRC32C".to_string();
}
ChecksumMode::ChecksumSHA1 => {
return "SHA1".to_string();
}
ChecksumMode::ChecksumSHA256 => {
return "SHA256".to_string();
}
ChecksumMode::ChecksumNone => {
return "".to_string();
}
ChecksumMode::ChecksumCRC64NVME => {
return "CRC64NVME".to_string();
}
_ => {
return "<invalid>".to_string();
}
}
}
// pub fn check_sum_reader(&self, r: GetObjectReader) -> Result<Checksum, std::io::Error> {
// let mut h = self.hasher()?;
// Ok(Checksum::new(self.clone(), h.sum().as_bytes()))
// }
// pub fn check_sum_bytes(&self, b: &[u8]) -> Result<Checksum, std::io::Error> {
// let mut h = self.hasher()?;
// Ok(Checksum::new(self.clone(), h.sum().as_bytes()))
// }
pub fn composite_checksum(&self, p: &mut [ObjectPart]) -> Result<Checksum, std::io::Error> {
if !self.can_composite() {
return Err(std::io::Error::other("cannot do composite checksum"));
}
p.sort_by(|i, j| {
if i.part_num < j.part_num {
std::cmp::Ordering::Less
} else if i.part_num > j.part_num {
std::cmp::Ordering::Greater
} else {
std::cmp::Ordering::Equal
}
});
let c = self.base();
let crc_bytes = Vec::<u8>::with_capacity(p.len() * self.raw_byte_len() as usize);
let mut h = self.hasher()?;
h.update(crc_bytes.as_ref());
let hash = h.finalize();
Ok(Checksum {
checksum_type: self.clone(),
r: hash.as_ref().to_vec(),
computed: false,
})
}
pub fn full_object_checksum(&self, p: &mut [ObjectPart]) -> Result<Checksum, std::io::Error> {
todo!();
}
}
#[derive(Default)]
pub struct Checksum {
checksum_type: ChecksumMode,
r: Vec<u8>,
computed: bool,
}
#[allow(dead_code)]
impl Checksum {
fn new(t: ChecksumMode, b: &[u8]) -> Checksum {
if t.is_set() && b.len() == t.raw_byte_len() {
return Checksum {
checksum_type: t,
r: b.to_vec(),
computed: false,
};
}
Checksum::default()
}
#[allow(dead_code)]
fn new_checksum_string(t: ChecksumMode, s: &str) -> Result<Checksum, std::io::Error> {
let b = match base64_decode(s.as_bytes()) {
Ok(b) => b,
Err(err) => return Err(std::io::Error::other(err.to_string())),
};
if t.is_set() && b.len() == t.raw_byte_len() {
return Ok(Checksum {
checksum_type: t,
r: b,
computed: false,
});
}
Ok(Checksum::default())
}
fn is_set(&self) -> bool {
self.checksum_type.is_set() && self.r.len() == self.checksum_type.raw_byte_len()
}
fn encoded(&self) -> String {
if !self.is_set() {
return "".to_string();
}
base64_encode(&self.r)
}
#[allow(dead_code)]
fn raw(&self) -> Option<Vec<u8>> {
if !self.is_set() {
return None;
}
Some(self.r.clone())
}
}
pub fn add_auto_checksum_headers(opts: &mut PutObjectOptions) {
opts.user_metadata
.insert("X-Amz-Checksum-Algorithm".to_string(), opts.auto_checksum.to_string());
if opts.auto_checksum.full_object_requested() {
opts.user_metadata
.insert("X-Amz-Checksum-Type".to_string(), "FULL_OBJECT".to_string());
}
}
pub fn apply_auto_checksum(opts: &mut PutObjectOptions, all_parts: &mut [ObjectPart]) -> Result<(), std::io::Error> {
if opts.auto_checksum.can_composite() && !opts.auto_checksum.is(ChecksumMode::ChecksumFullObject) {
let crc = opts.auto_checksum.composite_checksum(all_parts)?;
opts.user_metadata = {
let mut hm = HashMap::new();
hm.insert(opts.auto_checksum.key(), crc.encoded());
hm
}
} else if opts.auto_checksum.can_merge_crc() {
let crc = opts.auto_checksum.full_object_checksum(all_parts)?;
opts.user_metadata = {
let mut hm = HashMap::new();
hm.insert(opts.auto_checksum.key_capitalized(), crc.encoded());
hm.insert("X-Amz-Checksum-Type".to_string(), "FULL_OBJECT".to_string());
hm
}
}
Ok(())
}

View File

@@ -0,0 +1,270 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// use crate::error::StdError;
// use bytes::Bytes;
// use futures::pin_mut;
// use futures::stream::{Stream, StreamExt};
// use std::future::Future;
// use std::pin::Pin;
// use std::task::{Context, Poll};
// use transform_stream::AsyncTryStream;
// pub type SyncBoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + Sync + 'a>>;
// pub struct ChunkedStream<'a> {
// /// inner
// inner: AsyncTryStream<Bytes, StdError, SyncBoxFuture<'a, Result<(), StdError>>>,
// remaining_length: usize,
// }
// impl<'a> ChunkedStream<'a> {
// pub fn new<S>(body: S, content_length: usize, chunk_size: usize, need_padding: bool) -> Self
// where
// S: Stream<Item = Result<Bytes, StdError>> + Send + Sync + 'a,
// {
// let inner = AsyncTryStream::<_, _, SyncBoxFuture<'a, Result<(), StdError>>>::new(|mut y| {
// #[allow(clippy::shadow_same)] // necessary for `pin_mut!`
// Box::pin(async move {
// pin_mut!(body);
// // Data left over from the previous call
// let mut prev_bytes = Bytes::new();
// let mut read_size = 0;
// loop {
// let data: Vec<Bytes> = {
// // Read a fixed-size chunk
// match Self::read_data(body.as_mut(), prev_bytes, chunk_size).await {
// None => break,
// Some(Err(e)) => return Err(e),
// Some(Ok((data, remaining_bytes))) => {
// // debug!(
// // "content_length:{},read_size:{}, read_data data:{}, remaining_bytes: {} ",
// // content_length,
// // read_size,
// // data.len(),
// // remaining_bytes.len()
// // );
// prev_bytes = remaining_bytes;
// data
// }
// }
// };
// for bytes in data {
// read_size += bytes.len();
// // debug!("read_size {}, content_length {}", read_size, content_length,);
// y.yield_ok(bytes).await;
// }
// if read_size + prev_bytes.len() >= content_length {
// // debug!(
// // "Finished reading: read_size:{} + prev_bytes.len({}) == content_length {}",
// // read_size,
// // prev_bytes.len(),
// // content_length,
// // );
// // Pad with zeros?
// if !need_padding {
// y.yield_ok(prev_bytes).await;
// break;
// }
// let mut bytes = vec![0u8; chunk_size];
// let (left, _) = bytes.split_at_mut(prev_bytes.len());
// left.copy_from_slice(&prev_bytes);
// y.yield_ok(Bytes::from(bytes)).await;
// break;
// }
// }
// // debug!("chunked stream exit");
// Ok(())
// })
// });
// Self {
// inner,
// remaining_length: content_length,
// }
// }
// /// read data and return remaining bytes
// async fn read_data<S>(
// mut body: Pin<&mut S>,
// prev_bytes: Bytes,
// data_size: usize,
// ) -> Option<Result<(Vec<Bytes>, Bytes), StdError>>
// where
// S: Stream<Item = Result<Bytes, StdError>> + Send,
// {
// let mut bytes_buffer = Vec::new();
// // Run only once
// let mut push_data_bytes = |mut bytes: Bytes| {
// // debug!("read from body {} split per {}, prev_bytes: {}", bytes.len(), data_size, prev_bytes.len());
// if bytes.is_empty() {
// return None;
// }
// if data_size == 0 {
// return Some(bytes);
// }
// // Merge with the previous data
// if !prev_bytes.is_empty() {
// let need_size = data_size.wrapping_sub(prev_bytes.len());
// // debug!(
// // "Previous leftover {}, take {} now, total: {}",
// // prev_bytes.len(),
// // need_size,
// // prev_bytes.len() + need_size
// // );
// if bytes.len() >= need_size {
// let data = bytes.split_to(need_size);
// let mut combined = Vec::new();
// combined.extend_from_slice(&prev_bytes);
// combined.extend_from_slice(&data);
// // debug!(
// // "Fetched more bytes than needed: {}, merged result {}, remaining bytes {}",
// // need_size,
// // combined.len(),
// // bytes.len(),
// // );
// bytes_buffer.push(Bytes::from(combined));
// } else {
// let mut combined = Vec::new();
// combined.extend_from_slice(&prev_bytes);
// combined.extend_from_slice(&bytes);
// // debug!(
// // "Fetched fewer bytes than needed: {}, merged result {}, remaining bytes {}, return immediately",
// // need_size,
// // combined.len(),
// // bytes.len(),
// // );
// return Some(Bytes::from(combined));
// }
// }
// // If the fetched data exceeds the chunk, slice the required size
// if data_size <= bytes.len() {
// let n = bytes.len() / data_size;
// for _ in 0..n {
// let data = bytes.split_to(data_size);
// // println!("bytes_buffer.push: {}, remaining: {}", data.len(), bytes.len());
// bytes_buffer.push(data);
// }
// Some(bytes)
// } else {
// // Insufficient data
// Some(bytes)
// }
// };
// // Remaining data
// let remaining_bytes = 'outer: {
// // // Exit if the previous data was sufficient
// // if let Some(remaining_bytes) = push_data_bytes(prev_bytes) {
// // println!("Consuming leftovers");
// // break 'outer remaining_bytes;
// // }
// loop {
// match body.next().await? {
// Err(e) => return Some(Err(e)),
// Ok(bytes) => {
// if let Some(remaining_bytes) = push_data_bytes(bytes) {
// break 'outer remaining_bytes;
// }
// }
// }
// }
// };
// Some(Ok((bytes_buffer, remaining_bytes)))
// }
// fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes, StdError>>> {
// let ans = Pin::new(&mut self.inner).poll_next(cx);
// if let Poll::Ready(Some(Ok(ref bytes))) = ans {
// self.remaining_length = self.remaining_length.saturating_sub(bytes.len());
// }
// ans
// }
// // pub fn exact_remaining_length(&self) -> usize {
// // self.remaining_length
// // }
// }
// impl Stream for ChunkedStream<'_> {
// type Item = Result<Bytes, StdError>;
// fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
// self.poll(cx)
// }
// fn size_hint(&self) -> (usize, Option<usize>) {
// (0, None)
// }
// }
// #[cfg(test)]
// mod test {
// use super::*;
// #[tokio::test]
// async fn test_chunked_stream() {
// let chunk_size = 4;
// let data1 = vec![1u8; 7777]; // 65536
// let data2 = vec![1u8; 7777]; // 65536
// let content_length = data1.len() + data2.len();
// let chunk1 = Bytes::from(data1);
// let chunk2 = Bytes::from(data2);
// let chunk_results: Vec<Result<Bytes, _>> = vec![Ok(chunk1), Ok(chunk2)];
// let stream = futures::stream::iter(chunk_results);
// let mut chunked_stream = ChunkedStream::new(stream, content_length, chunk_size, true);
// loop {
// let ans1 = chunked_stream.next().await;
// if ans1.is_none() {
// break;
// }
// let bytes = ans1.unwrap().unwrap();
// assert!(bytes.len() == chunk_size)
// }
// // assert_eq!(ans1.unwrap(), chunk1_data.as_slice());
// }
// }

View File

@@ -41,7 +41,7 @@ use crate::{
use rustfs_utils::hash::EMPTY_STRING_SHA256_HASH;
pub struct RemoveBucketOptions {
_forced_delete: bool,
_forced_elete: bool,
}
#[derive(Debug)]

View File

@@ -0,0 +1,59 @@
#![allow(clippy::map_entry)]
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::{collections::HashMap, sync::Arc};
use crate::{
disk::{
error::{is_unformatted_disk, DiskError},
format::{DistributionAlgoVersion, FormatV3},
new_disk, DiskAPI, DiskInfo, DiskOption, DiskStore,
},
store_api::{
BucketInfo, BucketOptions, CompletePart, DeleteBucketOptions, DeletedObject, GetObjectReader, HTTPRangeSpec,
ListMultipartsInfo, ListObjectVersionsInfo, ListObjectsV2Info, MakeBucketOptions, MultipartInfo, MultipartUploadResult,
ObjectIO, ObjectInfo, ObjectOptions, ObjectToDelete, PartInfo, PutObjReader, StorageAPI,
},
credentials::{Credentials, SignatureType,},
api_put_object_multipart::UploadPartParams,
};
use http::HeaderMap;
use tokio_util::sync::CancellationToken;
use tracing::warn;
use tracing::{error, info};
use url::Url;
struct HookReader {
source: GetObjectReader,
hook: GetObjectReader,
}
impl HookReader {
pub fn new(source: GetObjectReader, hook: GetObjectReader) -> HookReader {
HookReader {
source,
hook,
}
}
fn seek(&self, offset: i64, whence: i64) -> Result<i64> {
todo!();
}
fn read(&self, b: &[u8]) -> Result<i64> {
todo!();
}
}

View File

@@ -132,25 +132,6 @@ pub enum BucketLookupType {
BucketLookupPath,
}
fn load_root_store_from_tls_path() -> Option<rustls::RootCertStore> {
// Load the root certificate bundle from the path specified by the
// RUSTFS_TLS_PATH environment variable.
let tp = std::env::var("RUSTFS_TLS_PATH").ok()?;
let ca = std::path::Path::new(&tp).join(rustfs_config::RUSTFS_CA_CERT);
if !ca.exists() {
return None;
}
let der_list = rustfs_utils::load_cert_bundle_der_bytes(ca.to_str().unwrap_or_default()).ok()?;
let mut store = rustls::RootCertStore::empty();
for der in der_list {
if let Err(e) = store.add(der.into()) {
warn!("Warning: failed to add certificate from '{}' to root store: {e}", ca.display());
}
}
Some(store)
}
impl TransitionClient {
pub async fn new(endpoint: &str, opts: Options, tier_type: &str) -> Result<TransitionClient, std::io::Error> {
let clnt = Self::private_new(endpoint, opts, tier_type).await?;
@@ -161,22 +142,18 @@ impl TransitionClient {
async fn private_new(endpoint: &str, opts: Options, tier_type: &str) -> Result<TransitionClient, std::io::Error> {
let endpoint_url = get_endpoint_url(endpoint, opts.secure)?;
let _ = rustls::crypto::aws_lc_rs::default_provider().install_default();
//#[cfg(feature = "ring")]
let _ = rustls::crypto::ring::default_provider().install_default();
//#[cfg(feature = "aws-lc-rs")]
// let _ = rustls::crypto::aws_lc_rs::default_provider().install_default();
let scheme = endpoint_url.scheme();
let client;
let tls = if let Some(store) = load_root_store_from_tls_path() {
rustls::ClientConfig::builder()
.with_root_certificates(store)
.with_no_client_auth()
} else {
rustls::ClientConfig::builder().with_native_roots()?.with_no_client_auth()
};
let tls = rustls::ClientConfig::builder().with_native_roots()?.with_no_client_auth();
let https = hyper_rustls::HttpsConnectorBuilder::new()
.with_tls_config(tls)
.https_or_http()
.enable_http1()
.enable_http2()
.build();
client = Client::builder(TokioExecutor::new()).build(https);

Some files were not shown because too many files have changed in this diff Show More