mirror of
https://github.com/rustfs/rustfs.git
synced 2026-03-17 14:24:08 +00:00
refactor(app): complete phase 5 gate and equivalence guards (#1979)
This commit is contained in:
@@ -15,6 +15,7 @@
|
||||
use crate::admin::{
|
||||
handlers::{bucket_meta, heal, health, kms, pools, profile_admin, quota, rebalance, replication, sts, system, tier, user},
|
||||
router::{AdminOperation, S3Router},
|
||||
rpc,
|
||||
};
|
||||
use crate::server::{ADMIN_PREFIX, HEALTH_PREFIX, HEALTH_READY_PATH, PROFILE_CPU_PATH, PROFILE_MEMORY_PATH};
|
||||
use hyper::Method;
|
||||
@@ -49,6 +50,7 @@ fn test_register_routes_cover_representative_admin_paths() {
|
||||
replication::register_replication_route(&mut router).expect("register replication route");
|
||||
profile_admin::register_profiling_route(&mut router).expect("register profile route");
|
||||
kms::register_kms_route(&mut router).expect("register kms route");
|
||||
rpc::register_rpc_route(&mut router).expect("register rpc route");
|
||||
|
||||
assert_route(&router, Method::GET, HEALTH_PREFIX);
|
||||
assert_route(&router, Method::HEAD, HEALTH_PREFIX);
|
||||
@@ -100,4 +102,44 @@ fn test_register_routes_cover_representative_admin_paths() {
|
||||
assert_route(&router, Method::POST, &admin_path("/v3/kms/keys"));
|
||||
assert_route(&router, Method::GET, &admin_path("/v3/kms/keys"));
|
||||
assert_route(&router, Method::GET, &admin_path("/v3/kms/keys/test-key"));
|
||||
assert_route(&router, Method::GET, "/rustfs/rpc/read_file_stream");
|
||||
assert_route(&router, Method::HEAD, "/rustfs/rpc/read_file_stream");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_phase5_admin_info_and_rpc_read_file_contract() {
|
||||
let system_src = include_str!("handlers/system.rs");
|
||||
let rpc_src = include_str!("rpc.rs");
|
||||
|
||||
let server_info_impl_marker = "impl Operation for ServerInfoHandler";
|
||||
let server_info_impl_start = system_src
|
||||
.find(server_info_impl_marker)
|
||||
.expect("Expected impl Operation for ServerInfoHandler in handlers/system.rs");
|
||||
let server_info_impl_block = &system_src[server_info_impl_start..];
|
||||
|
||||
assert!(
|
||||
server_info_impl_block.contains("DefaultAdminUsecase::from_global()")
|
||||
&& server_info_impl_block.contains("execute_query_server_info(QueryServerInfoRequest { include_pools: true })"),
|
||||
"admin server info path must be served through DefaultAdminUsecase::execute_query_server_info"
|
||||
);
|
||||
|
||||
let register_route_marker = "pub fn register_rpc_route";
|
||||
let register_route_start = rpc_src
|
||||
.find(register_route_marker)
|
||||
.expect("Expected register_rpc_route in rpc.rs");
|
||||
let register_route_block = &rpc_src[register_route_start..];
|
||||
|
||||
let read_file_marker = "pub struct ReadFile {}";
|
||||
let read_file_start = rpc_src.find(read_file_marker).expect("Expected ReadFile operation in rpc.rs");
|
||||
let read_file_block = &rpc_src[read_file_start..];
|
||||
|
||||
assert!(
|
||||
register_route_block.contains("format!(\"{}{}\", RPC_PREFIX, \"/read_file_stream\")"),
|
||||
"rpc read_file_stream route path must remain registered with RPC_PREFIX"
|
||||
);
|
||||
|
||||
assert!(
|
||||
read_file_block.contains(".read_file_stream(&query.volume, &query.path, query.offset, query.length)"),
|
||||
"rpc read_file_stream route must remain wired to disk.read_file_stream"
|
||||
);
|
||||
}
|
||||
|
||||
@@ -384,14 +384,14 @@ pub fn default_buffer_config_interface() -> Arc<dyn BufferConfigInterface> {
|
||||
Arc::new(BufferConfigHandle)
|
||||
}
|
||||
|
||||
static GLOBAL_APP_CONTEXT: OnceLock<Arc<AppContext>> = OnceLock::new();
|
||||
static APP_CONTEXT_SINGLETON: OnceLock<Arc<AppContext>> = OnceLock::new();
|
||||
|
||||
/// Initialize global application context once and return the canonical instance.
|
||||
pub fn init_global_app_context(context: AppContext) -> Arc<AppContext> {
|
||||
GLOBAL_APP_CONTEXT.get_or_init(|| Arc::new(context)).clone()
|
||||
APP_CONTEXT_SINGLETON.get_or_init(|| Arc::new(context)).clone()
|
||||
}
|
||||
|
||||
/// Get global application context if it has been initialized.
|
||||
pub fn get_global_app_context() -> Option<Arc<AppContext>> {
|
||||
GLOBAL_APP_CONTEXT.get().cloned()
|
||||
APP_CONTEXT_SINGLETON.get().cloned()
|
||||
}
|
||||
|
||||
@@ -24,7 +24,7 @@ use std::sync::OnceLock;
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
|
||||
/// Global buffer configuration that can be set at application startup
|
||||
static GLOBAL_BUFFER_CONFIG: OnceLock<RustFSBufferConfig> = OnceLock::new();
|
||||
static BUFFER_CONFIG_SINGLETON: OnceLock<RustFSBufferConfig> = OnceLock::new();
|
||||
|
||||
/// Global flag indicating whether buffer profiles are enabled
|
||||
static BUFFER_PROFILE_ENABLED: AtomicBool = AtomicBool::new(false);
|
||||
@@ -60,14 +60,14 @@ pub fn is_buffer_profile_enabled() -> bool {
|
||||
/// init_global_buffer_config(RustFSBufferConfig::new(WorkloadProfile::AiTraining));
|
||||
/// ```
|
||||
pub fn init_global_buffer_config(config: RustFSBufferConfig) {
|
||||
let _ = GLOBAL_BUFFER_CONFIG.set(config);
|
||||
let _ = BUFFER_CONFIG_SINGLETON.set(config);
|
||||
}
|
||||
|
||||
/// Get the global buffer configuration
|
||||
///
|
||||
/// Returns the configured profile, or GeneralPurpose if not initialized.
|
||||
pub fn get_global_buffer_config() -> &'static RustFSBufferConfig {
|
||||
GLOBAL_BUFFER_CONFIG.get_or_init(RustFSBufferConfig::default)
|
||||
BUFFER_CONFIG_SINGLETON.get_or_init(RustFSBufferConfig::default)
|
||||
}
|
||||
|
||||
/// Workload profile types that define buffer sizing strategies
|
||||
|
||||
@@ -359,6 +359,44 @@ mod tests {
|
||||
set_buffer_profile_enabled(false);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_phase5_s3_entrypoints_delegate_to_usecases() {
|
||||
fn assert_delegates_within_method(src: &str, signature: &str, delegation_call: &str, error_msg: &str) {
|
||||
let sig_pos = src
|
||||
.find(signature)
|
||||
.unwrap_or_else(|| panic!("Expected to find method signature: {signature}"));
|
||||
|
||||
let after_sig = &src[sig_pos + signature.len()..];
|
||||
let method_body_end_rel = after_sig.find("async fn ").unwrap_or(after_sig.len());
|
||||
let method_body = &after_sig[..method_body_end_rel];
|
||||
|
||||
assert!(method_body.contains(delegation_call), "{error_msg}");
|
||||
}
|
||||
|
||||
let src = include_str!("ecfs.rs");
|
||||
|
||||
assert_delegates_within_method(
|
||||
src,
|
||||
"async fn put_object(&self, req: S3Request<PutObjectInput>)",
|
||||
"usecase.execute_put_object(self, req).await",
|
||||
"put_object must delegate to DefaultObjectUsecase::execute_put_object",
|
||||
);
|
||||
|
||||
assert_delegates_within_method(
|
||||
src,
|
||||
"async fn get_object(&self, req: S3Request<GetObjectInput>)",
|
||||
"usecase.execute_get_object(req).await",
|
||||
"get_object must delegate to DefaultObjectUsecase::execute_get_object",
|
||||
);
|
||||
|
||||
assert_delegates_within_method(
|
||||
src,
|
||||
"async fn list_objects_v2(&self, req: S3Request<ListObjectsV2Input>)",
|
||||
"usecase.execute_list_objects_v2(req).await",
|
||||
"list_objects_v2 must delegate to DefaultBucketUsecase::execute_list_objects_v2",
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_validate_list_object_unordered_with_delimiter() {
|
||||
// [1] Normal case: No delimiter specified.
|
||||
|
||||
Reference in New Issue
Block a user