安正超
2025-11-11 09:34:58 +08:00
committed by GitHub
parent 77a3489ed2
commit 8a020ec4d9
8 changed files with 403 additions and 97 deletions

View File

@@ -139,6 +139,8 @@ observability. If you want to start redis as well as nginx container, you can sp
make help-docker # Show all Docker-related commands
```
> **Heads-up (macOS cross-compilation)**: macOS keeps the default `ulimit -n` at 256, so `cargo zigbuild` or `./build-rustfs.sh --platform ...` may fail with `ProcessFdQuotaExceeded` when targeting Linux. The build script now tries to raise the limit automatically, but if you still see the warning, run `ulimit -n 4096` (or higher) in your shell before building.
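For example, a minimal manual sequence looks like the following; the target triple is only illustrative, adjust it to the platform you are actually building for:

```bash
# Raise the soft open-file limit for the current shell session only
ulimit -n 4096
# Then cross-compile as usual, e.g. via cargo-zigbuild
cargo zigbuild --release --target x86_64-unknown-linux-gnu
```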
4. **Build with Helm Chart (Option 4) - Cloud Native environment**
Follow the instructions in the [helm chart README](./helm/README.md) to install RustFS on a Kubernetes cluster.
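A minimal sketch of that flow, assuming the chart can be installed directly from this repository's `./helm` directory (the release name and namespace below are illustrative):

```bash
# Install the chart into its own namespace
helm install rustfs ./helm --namespace rustfs --create-namespace
# Verify the release
helm status rustfs --namespace rustfs
```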
@@ -207,4 +209,3 @@ top charts.
[Apache 2.0](https://opensource.org/licenses/Apache-2.0)
**RustFS** is a trademark of RustFS, Inc. All other trademarks are the property of their respective owners.

View File

@@ -113,12 +113,14 @@ RustFS is built using Rust, one of the world's most popular programming languages
You can also use the Makefile targets for convenience:
```bash
make docker-buildx # Build locally
make docker-buildx-push # Build and push
make docker-buildx-version VERSION=v1.0.0 # Build a specific version
make help-docker # Show all Docker-related commands
```
> **Tip (macOS cross-compilation)**: macOS defaults `ulimit -n` to 256, so compiling Linux targets with `cargo zigbuild` or `./build-rustfs.sh --platform ...` can easily hit a `ProcessFdQuotaExceeded` link error. The script tries to raise the limit automatically; if it still fails, run `ulimit -n 4096` (or a higher value) manually before building.
4. **Deploy with Helm Chart (Option 4) - Cloud Native environment**

View File

@@ -163,6 +163,35 @@ print_message() {
echo -e "${color}${message}${NC}"
}
# Prevent zig/ld from hitting macOS file descriptor defaults during linking
ensure_file_descriptor_limit() {
local required_limit=4096
local current_limit
current_limit=$(ulimit -Sn 2>/dev/null || echo "")
if [ -z "$current_limit" ] || [ "$current_limit" = "unlimited" ]; then
return
fi
if (( current_limit >= required_limit )); then
return
fi
local hard_limit target_limit
hard_limit=$(ulimit -Hn 2>/dev/null || echo "")
target_limit=$required_limit
if [ -n "$hard_limit" ] && [ "$hard_limit" != "unlimited" ] && (( hard_limit < required_limit )); then
target_limit=$hard_limit
fi
if ulimit -Sn "$target_limit" 2>/dev/null; then
print_message $YELLOW "🔧 Increased open file limit from $current_limit to $target_limit to avoid ProcessFdQuotaExceeded"
else
print_message $YELLOW "⚠️ Unable to raise ulimit -n automatically (current: $current_limit, needed: $required_limit). Please run 'ulimit -n $required_limit' manually before building."
fi
}
# Get version from git
get_version() {
if git describe --abbrev=0 --tags >/dev/null 2>&1; then
@@ -570,10 +599,11 @@ main() {
fi
fi
ensure_file_descriptor_limit
# Start build process
build_rustfs
}
# Run main function
main

View File

@@ -34,9 +34,10 @@ use rustfs_protos::{
};
use std::{
collections::{HashMap, HashSet},
time::{Duration, SystemTime},
};
use time::OffsetDateTime;
use tokio::time::timeout;
use tonic::Request;
use tracing::warn;
@@ -44,6 +45,8 @@ use shadow_rs::shadow;
shadow!(build);
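// Upper bound on how long a single peer ping may run before the endpoint is treated as unreachable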
const SERVER_PING_TIMEOUT: Duration = Duration::from_secs(1);
// pub const ITEM_OFFLINE: &str = "offline";
// pub const ITEM_INITIALIZING: &str = "initializing";
// pub const ITEM_ONLINE: &str = "online";
@@ -83,42 +86,45 @@ async fn is_server_resolvable(endpoint: &Endpoint) -> Result<()> {
endpoint.url.host_str().unwrap(),
endpoint.url.port().unwrap()
);
let ping_task = async {
let mut fbb = flatbuffers::FlatBufferBuilder::new();
let payload = fbb.create_vector(b"hello world");
let mut builder = PingBodyBuilder::new(&mut fbb);
builder.add_payload(payload);
let root = builder.finish();
fbb.finish(root, None);
let finished_data = fbb.finished_data();
let decoded_payload = flatbuffers::root::<PingBody>(finished_data);
assert!(decoded_payload.is_ok());
// Create the client
let mut client = node_service_time_out_client(&addr)
.await
.map_err(|err| Error::other(err.to_string()))?;
// Build the PingRequest
let request = Request::new(PingRequest {
version: 1,
body: bytes::Bytes::copy_from_slice(finished_data),
});
// Send the request and obtain the response
let response: PingResponse = client.ping(request).await?.into_inner();
// Print the response
let ping_response_body = flatbuffers::root::<PingBody>(&response.body);
if let Err(e) = ping_response_body {
eprintln!("{e}");
} else {
println!("ping_resp:body(flatbuffer): {ping_response_body:?}");
}
Ok(())
};
timeout(SERVER_PING_TIMEOUT, ping_task)
.await
.map_err(|_| Error::other("server ping timeout"))?
}
pub async fn get_local_server_property() -> ServerProperties {

View File

@@ -26,9 +26,11 @@ use rustfs_madmin::metrics::RealtimeMetrics;
use rustfs_madmin::net::NetInfo;
use rustfs_madmin::{ItemState, ServerProperties};
use std::collections::hash_map::DefaultHasher;
use std::future::Future;
use std::hash::{Hash, Hasher};
use std::sync::OnceLock;
use std::time::{Duration, SystemTime};
use tokio::time::timeout;
use tracing::{error, warn};
lazy_static! {
@@ -220,24 +222,21 @@ impl NotificationSys {
pub async fn server_info(&self) -> Vec<ServerProperties> {
let mut futures = Vec::with_capacity(self.peer_clients.len());
let endpoints = get_global_endpoints();
let peer_timeout = Duration::from_secs(2);
for client in self.peer_clients.iter() {
let endpoints = endpoints.clone();
futures.push(async move {
if let Some(client) = client {
let host = client.host.to_string();
call_peer_with_timeout(
peer_timeout,
&host,
|| client.server_info(),
|| offline_server_properties(&host, &endpoints),
)
.await
} else {
ServerProperties::default()
}
@@ -694,6 +693,43 @@ impl NotificationSys {
}
}
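/// Run a peer call with a timeout; on error or timeout, log a warning and fall back to the provided default.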
async fn call_peer_with_timeout<F, Fut>(
timeout_dur: Duration,
host_label: &str,
op: F,
fallback: impl FnOnce() -> ServerProperties,
) -> ServerProperties
where
F: FnOnce() -> Fut,
Fut: Future<Output = Result<ServerProperties>> + Send,
{
match timeout(timeout_dur, op()).await {
Ok(Ok(info)) => info,
Ok(Err(err)) => {
warn!("peer {host_label} server_info failed: {err}");
fallback()
}
Err(_) => {
warn!("peer {host_label} server_info timed out after {:?}", timeout_dur);
fallback()
}
}
}
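/// Build the ServerProperties reported for a peer that could not be reached.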
fn offline_server_properties(host: &str, endpoints: &EndpointServerPools) -> ServerProperties {
ServerProperties {
uptime: SystemTime::now()
.duration_since(*GLOBAL_BOOT_TIME.get().unwrap())
.unwrap_or_default()
.as_secs(),
version: get_commit_id(),
endpoint: host.to_string(),
state: ItemState::Offline.to_string().to_owned(),
disks: get_offline_disks(host, endpoints),
..Default::default()
}
}
fn get_offline_disks(offline_host: &str, endpoints: &EndpointServerPools) -> Vec<rustfs_madmin::Disk> {
let mut offline_disks = Vec::new();
@@ -714,3 +750,57 @@ fn get_offline_disks(offline_host: &str, endpoints: &EndpointServerPools) -> Vec
offline_disks
}
#[cfg(test)]
mod tests {
use super::*;
fn build_props(endpoint: &str) -> ServerProperties {
ServerProperties {
endpoint: endpoint.to_string(),
..Default::default()
}
}
#[tokio::test]
async fn call_peer_with_timeout_returns_value_when_fast() {
let result = call_peer_with_timeout(
Duration::from_millis(50),
"peer-1",
|| async { Ok::<_, Error>(build_props("fast")) },
|| build_props("fallback"),
)
.await;
assert_eq!(result.endpoint, "fast");
}
#[tokio::test]
async fn call_peer_with_timeout_uses_fallback_on_error() {
let result = call_peer_with_timeout(
Duration::from_millis(50),
"peer-2",
|| async { Err::<ServerProperties, _>(Error::other("boom")) },
|| build_props("fallback"),
)
.await;
assert_eq!(result.endpoint, "fallback");
}
#[tokio::test]
async fn call_peer_with_timeout_uses_fallback_on_timeout() {
let result = call_peer_with_timeout(
Duration::from_millis(5),
"peer-3",
|| async {
tokio::time::sleep(Duration::from_millis(25)).await;
Ok::<_, Error>(build_props("slow"))
},
|| build_props("fallback"),
)
.await;
assert_eq!(result.endpoint, "fallback");
}
}

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::{path::PathBuf, time::Duration};
use bytes::Bytes;
use futures::lock::Mutex;
@@ -40,7 +40,7 @@ use crate::{
use rustfs_filemeta::{FileInfo, ObjectPartInfo, RawFileInfo};
use rustfs_protos::proto_gen::node_service::RenamePartRequest;
use rustfs_rio::{HttpReader, HttpWriter};
use tokio::{io::AsyncWrite, net::TcpStream, time::timeout};
use tonic::Request;
use tracing::info;
use uuid::Uuid;
@@ -54,6 +54,8 @@ pub struct RemoteDisk {
endpoint: Endpoint,
}
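// Bound the TCP connect probe used by is_online() so an unreachable peer cannot stall callers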
const REMOTE_DISK_ONLINE_PROBE_TIMEOUT: Duration = Duration::from_millis(750);
impl RemoteDisk {
pub async fn new(ep: &Endpoint, _opt: &DiskOption) -> Result<Self> {
// let root = fs::canonicalize(ep.url.path()).await?;
@@ -83,11 +85,19 @@ impl DiskAPI for RemoteDisk {
#[tracing::instrument(skip(self))]
async fn is_online(&self) -> bool {
// TODO: connection status tracking
let Some(host) = self.endpoint.url.host_str().map(|host| host.to_string()) else {
return false;
};
let port = self.endpoint.url.port_or_known_default().unwrap_or(80);
match timeout(REMOTE_DISK_ONLINE_PROBE_TIMEOUT, TcpStream::connect((host, port))).await {
Ok(Ok(stream)) => {
drop(stream);
true
}
_ => false,
}
}
#[tracing::instrument(skip(self))]
@@ -957,6 +967,7 @@ impl DiskAPI for RemoteDisk {
#[cfg(test)]
mod tests {
use super::*;
use tokio::net::TcpListener;
use uuid::Uuid;
#[tokio::test]
@@ -1040,6 +1051,58 @@ mod tests {
assert!(path.to_string_lossy().contains("storage"));
}
#[tokio::test]
async fn test_remote_disk_is_online_detects_active_listener() {
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
let url = url::Url::parse(&format!("http://{}:{}/data/rustfs0", addr.ip(), addr.port())).unwrap();
let endpoint = Endpoint {
url,
is_local: false,
pool_idx: 0,
set_idx: 0,
disk_idx: 0,
};
let disk_option = DiskOption {
cleanup: false,
health_check: false,
};
let remote_disk = RemoteDisk::new(&endpoint, &disk_option).await.unwrap();
assert!(remote_disk.is_online().await);
drop(listener);
}
#[tokio::test]
async fn test_remote_disk_is_online_detects_missing_listener() {
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
let ip = addr.ip();
let port = addr.port();
drop(listener);
let url = url::Url::parse(&format!("http://{}:{}/data/rustfs0", ip, port)).unwrap();
let endpoint = Endpoint {
url,
is_local: false,
pool_idx: 0,
set_idx: 0,
disk_idx: 0,
};
let disk_option = DiskOption {
cleanup: false,
health_check: false,
};
let remote_disk = RemoteDisk::new(&endpoint, &disk_option).await.unwrap();
assert!(!remote_disk.is_online().await);
}
#[tokio::test]
async fn test_remote_disk_disk_id() {
let url = url::Url::parse("http://remote-server:9000").unwrap();

View File

@@ -88,7 +88,7 @@ use s3s::header::X_AMZ_RESTORE;
use sha2::{Digest, Sha256};
use std::hash::Hash;
use std::mem::{self};
use std::time::{Instant, SystemTime};
use std::{
collections::{HashMap, HashSet},
io::{Cursor, Write},
@@ -104,7 +104,7 @@ use tokio::{
use tokio::{
select,
sync::mpsc::{self, Sender},
time::{interval, timeout},
};
use tokio_util::sync::CancellationToken;
use tracing::error;
@@ -113,6 +113,8 @@ use uuid::Uuid;
pub const DEFAULT_READ_BUFFER_SIZE: usize = 1024 * 1024;
pub const MAX_PARTS_COUNT: usize = 10000;
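// Cap each disk is_online() probe and reuse its result for a short TTL to avoid re-probing slow disks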
const DISK_ONLINE_TIMEOUT: Duration = Duration::from_secs(1);
const DISK_HEALTH_CACHE_TTL: Duration = Duration::from_millis(750);
#[derive(Clone, Debug)]
pub struct SetDisks {
@@ -125,6 +127,23 @@ pub struct SetDisks {
pub set_index: usize,
pub pool_index: usize,
pub format: FormatV3,
disk_health_cache: Arc<RwLock<Vec<Option<DiskHealthEntry>>>>,
}
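/// Most recent health probe result for one disk slot, plus when it was taken.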
#[derive(Clone, Debug)]
struct DiskHealthEntry {
last_check: Instant,
online: bool,
}
impl DiskHealthEntry {
fn cached_value(&self) -> Option<bool> {
if self.last_check.elapsed() <= DISK_HEALTH_CACHE_TTL {
Some(self.online)
} else {
None
}
}
}
impl SetDisks {
@@ -150,8 +169,60 @@ impl SetDisks {
pool_index,
format,
set_endpoints,
disk_health_cache: Arc::new(RwLock::new(Vec::new())),
})
}
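/// Return the cached health state for the disk at `index` if a fresh entry exists.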
async fn cached_disk_health(&self, index: usize) -> Option<bool> {
let cache = self.disk_health_cache.read().await;
cache
.get(index)
.and_then(|entry| entry.as_ref().and_then(|state| state.cached_value()))
}
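/// Record the latest probe result for the disk at `index`, growing the cache if needed.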
async fn update_disk_health(&self, index: usize, online: bool) {
let mut cache = self.disk_health_cache.write().await;
if cache.len() <= index {
cache.resize(index + 1, None);
}
cache[index] = Some(DiskHealthEntry {
last_check: Instant::now(),
online,
});
}
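/// Prefer a fresh cached value; otherwise probe the disk with a timeout and cache the outcome.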
async fn is_disk_online_cached(&self, index: usize, disk: &DiskStore) -> bool {
if let Some(online) = self.cached_disk_health(index).await {
return online;
}
let disk_clone = disk.clone();
let online = timeout(DISK_ONLINE_TIMEOUT, async move { disk_clone.is_online().await })
.await
.unwrap_or(false);
self.update_disk_health(index, online).await;
online
}
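/// Replace offline disks with `None` and return the filtered list plus the online count.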
async fn filter_online_disks(&self, disks: Vec<Option<DiskStore>>) -> (Vec<Option<DiskStore>>, usize) {
let mut filtered = Vec::with_capacity(disks.len());
let mut online_count = 0;
for (idx, disk) in disks.into_iter().enumerate() {
if let Some(disk_store) = disk {
if self.is_disk_online_cached(idx, &disk_store).await {
filtered.push(Some(disk_store));
online_count += 1;
} else {
filtered.push(None);
}
} else {
filtered.push(None);
}
}
(filtered, online_count)
}
fn format_lock_error(&self, bucket: &str, object: &str, mode: &str, err: &LockResult) -> String {
match err {
LockResult::Timeout => {
@@ -187,25 +258,9 @@ impl SetDisks {
}
async fn get_online_disks(&self) -> Vec<Option<DiskStore>> {
let disks = self.get_disks_internal().await;
let (filtered, _) = self.filter_online_disks(disks).await;
filtered.into_iter().filter(|disk| disk.is_some()).collect()
}
async fn get_online_local_disks(&self) -> Vec<Option<DiskStore>> {
let mut disks = self.get_online_disks().await;
@@ -3581,7 +3636,8 @@ impl ObjectIO for SetDisks {
#[tracing::instrument(level = "debug", skip(self, data,))]
async fn put_object(&self, bucket: &str, object: &str, data: &mut PutObjReader, opts: &ObjectOptions) -> Result<ObjectInfo> {
let disks_snapshot = self.get_disks_internal().await;
let (disks, filtered_online) = self.filter_online_disks(disks_snapshot).await;
// Acquire per-object exclusive lock via RAII guard. It auto-releases asynchronously on drop.
let _object_lock_guard = if !opts.no_lock {
@@ -3622,6 +3678,14 @@ impl ObjectIO for SetDisks {
write_quorum += 1
}
if filtered_online < write_quorum {
warn!(
"online disk snapshot {} below write quorum {} for {}/{}; returning erasure write quorum error",
filtered_online, write_quorum, bucket, object
);
return Err(to_object_err(Error::ErasureWriteQuorum, vec![bucket, object]));
}
let mut fi = FileInfo::new([bucket, object].join("/").as_str(), data_drives, parity_drives);
fi.version_id = {
@@ -4901,7 +4965,16 @@ impl StorageAPI for SetDisks {
return Err(Error::other(format!("checksum mismatch: {checksum}")));
}
let disks_snapshot = self.get_disks_internal().await;
let (disks, filtered_online) = self.filter_online_disks(disks_snapshot).await;
if filtered_online < write_quorum {
warn!(
"online disk snapshot {} below write quorum {} for multipart {}/{}; returning erasure write quorum error",
filtered_online, write_quorum, bucket, object
);
return Err(to_object_err(Error::ErasureWriteQuorum, vec![bucket, object]));
}
let shuffle_disks = Self::shuffle_disks(&disks, &fi.erasure.distribution);
@@ -6562,6 +6635,26 @@ mod tests {
use std::collections::HashMap;
use time::OffsetDateTime;
#[test]
fn disk_health_entry_returns_cached_value_within_ttl() {
let entry = DiskHealthEntry {
last_check: Instant::now(),
online: true,
};
assert_eq!(entry.cached_value(), Some(true));
}
#[test]
fn disk_health_entry_expires_after_ttl() {
let entry = DiskHealthEntry {
last_check: Instant::now() - (DISK_HEALTH_CACHE_TTL + Duration::from_millis(100)),
online: true,
};
assert!(entry.cached_value().is_none());
}
#[test]
fn test_check_part_constants() {
// Test that all CHECK_PART constants have expected values

View File

@@ -14,6 +14,8 @@
use bytes::Bytes;
use futures::{Stream, StreamExt, pin_mut};
#[cfg(test)]
use std::sync::MutexGuard;
use std::{
collections::{HashMap, HashSet},
fmt::Display,
@@ -71,18 +73,41 @@ fn clear_dns_cache() {
}
#[cfg(test)]
static DNS_RESOLVER_TEST_LOCK: LazyLock<Mutex<()>> = LazyLock::new(|| Mutex::new(()));
#[cfg(test)]
fn reset_dns_resolver_inner() {
*CUSTOM_DNS_RESOLVER.write().unwrap() = None;
clear_dns_cache();
}
#[cfg(test)]
pub struct MockResolverGuard {
_lock: MutexGuard<'static, ()>,
}
#[cfg(test)]
impl Drop for MockResolverGuard {
fn drop(&mut self) {
reset_dns_resolver_inner();
}
}
#[cfg(test)]
pub fn set_mock_dns_resolver<F>(resolver: F) -> MockResolverGuard
where
F: Fn(&str) -> std::io::Result<HashSet<IpAddr>> + Send + Sync + 'static,
{
let lock = DNS_RESOLVER_TEST_LOCK.lock().unwrap();
*CUSTOM_DNS_RESOLVER.write().unwrap() = Some(Arc::new(resolver));
clear_dns_cache();
MockResolverGuard { _lock: lock }
}
#[cfg(test)]
pub fn reset_dns_resolver() {
let _lock = DNS_RESOLVER_TEST_LOCK.lock().unwrap();
reset_dns_resolver_inner();
}
/// helper for validating if the provided arg is an ip address.
@@ -403,7 +428,7 @@ mod test {
#[test]
fn test_is_local_host() {
let _resolver_guard = set_mock_dns_resolver(mock_resolver);
// Test localhost domain
let localhost_host = Host::Domain("localhost");
@@ -429,13 +454,11 @@ mod test {
// Test invalid domain should return error
let invalid_host = Host::Domain("invalid.nonexistent.domain.example");
assert!(is_local_host(invalid_host, 0, 0).is_err());
}
#[tokio::test]
async fn test_get_host_ip() {
let _resolver_guard = set_mock_dns_resolver(mock_resolver);
// Test IPv4 address
let ipv4_host = Host::Ipv4(Ipv4Addr::new(192, 168, 1, 1));
@@ -462,8 +485,6 @@ mod test {
// Test invalid domain
let invalid_host = Host::Domain("invalid.nonexistent.domain.example");
assert!(get_host_ip(invalid_host).await.is_err());
}
#[test]