test global_disks

This commit is contained in:
weisd
2024-09-10 17:32:39 +08:00
parent c930fb45f1
commit fc533c7b2b
2 changed files with 75 additions and 69 deletions

View File

@@ -69,84 +69,90 @@ async fn run(opt: config::Opt) -> Result<()> {
// 用于rpc
let (endpoint_pools, _) = EndpointServerPools::from_volumes(opt.address.clone().as_str(), opt.volumes.clone())?;
// Setup S3 service
// 本项目使用s3s库来实现s3服务
let service = {
// let mut b = S3ServiceBuilder::new(storage::ecfs::FS::new(opt.address.clone(), endpoint_pools).await?);
let mut b = S3ServiceBuilder::new(storage::ecfs::FS::new().await?);
//设置AK和SK
//其中部份内容从config配置文件中读取
let mut access_key = String::from_str(config::DEFAULT_ACCESS_KEY).unwrap();
let mut secret_key = String::from_str(config::DEFAULT_SECRET_KEY).unwrap();
tokio::spawn(async move {
// Setup S3 service
// 本项目使用s3s库来实现s3服务
let service = {
// let mut b = S3ServiceBuilder::new(storage::ecfs::FS::new(opt.address.clone(), endpoint_pools).await?);
let mut b = S3ServiceBuilder::new(storage::ecfs::FS::new());
//设置AK和SK
//其中部份内容从config配置文件中读取
let mut access_key = String::from_str(config::DEFAULT_ACCESS_KEY).unwrap();
let mut secret_key = String::from_str(config::DEFAULT_SECRET_KEY).unwrap();
// Enable authentication
if let (Some(ak), Some(sk)) = (opt.access_key, opt.secret_key) {
access_key = ak;
secret_key = sk;
}
//显示info信息
info!("authentication is enabled {}, {}", &access_key, &secret_key);
b.set_auth(SimpleAuth::from_single(access_key, secret_key));
// Enable parsing virtual-hosted-style requests
if let Some(dm) = opt.domain_name {
info!("virtual-hosted-style requests are enabled use domain_name {}", &dm);
b.set_base_domain(dm);
}
// if domain_name.is_some() {
// info!(
// "virtual-hosted-style requests are enabled use domain_name {}",
// domain_name.as_ref().unwrap()
// );
// b.set_base_domain(domain_name.unwrap());
// }
b.build()
};
let hyper_service = service.into_shared();
let http_server = ConnBuilder::new(TokioExecutor::new());
let graceful = hyper_util::server::graceful::GracefulShutdown::new();
let mut ctrl_c = std::pin::pin!(tokio::signal::ctrl_c());
info!("server is running at http://{local_addr}");
loop {
let (socket, _) = tokio::select! {
res = listener.accept() => {
match res {
Ok(conn) => conn,
Err(err) => {
tracing::error!("error accepting connection: {err}");
continue;
}
}
// Enable authentication
if let (Some(ak), Some(sk)) = (opt.access_key, opt.secret_key) {
access_key = ak;
secret_key = sk;
}
_ = ctrl_c.as_mut() => {
break;
//显示info信息
info!("authentication is enabled {}, {}", &access_key, &secret_key);
b.set_auth(SimpleAuth::from_single(access_key, secret_key));
// Enable parsing virtual-hosted-style requests
if let Some(dm) = opt.domain_name {
info!("virtual-hosted-style requests are enabled use domain_name {}", &dm);
b.set_base_domain(dm);
}
// if domain_name.is_some() {
// info!(
// "virtual-hosted-style requests are enabled use domain_name {}",
// domain_name.as_ref().unwrap()
// );
// b.set_base_domain(domain_name.unwrap());
// }
b.build()
};
let conn = http_server.serve_connection(TokioIo::new(socket), hyper_service.clone());
let conn = graceful.watch(conn.into_owned());
tokio::spawn(async move {
let _ = conn.await;
});
}
let hyper_service = service.into_shared();
let http_server = ConnBuilder::new(TokioExecutor::new());
let mut ctrl_c = std::pin::pin!(tokio::signal::ctrl_c());
let graceful = hyper_util::server::graceful::GracefulShutdown::new();
info!("server is running at http://{local_addr}");
loop {
let (socket, _) = tokio::select! {
res = listener.accept() => {
match res {
Ok(conn) => conn,
Err(err) => {
tracing::error!("error accepting connection: {err}");
continue;
}
}
}
_ = ctrl_c.as_mut() => {
break;
}
};
let conn = http_server.serve_connection(TokioIo::new(socket), hyper_service.clone());
let conn = graceful.watch(conn.into_owned());
tokio::spawn(async move {
let _ = conn.await;
});
}
tokio::select! {
() = graceful.shutdown() => {
tracing::debug!("Gracefully shutdown!");
},
() = tokio::time::sleep(std::time::Duration::from_secs(10)) => {
tracing::debug!("Waited 10 seconds for graceful shutdown, aborting...");
}
}
});
warn!(" init store");
// init store
ECStore::new(opt.address.clone(), endpoint_pools.clone()).await?;
tokio::select! {
() = graceful.shutdown() => {
tracing::debug!("Gracefully shutdown!");
},
() = tokio::time::sleep(std::time::Duration::from_secs(10)) => {
tracing::debug!("Waited 10 seconds for graceful shutdown, aborting...");
_ = tokio::signal::ctrl_c() => {
}
}

View File

@@ -45,9 +45,9 @@ pub struct FS {
}
impl FS {
pub async fn new() -> Result<Self> {
pub fn new() -> Self {
// let store: ECStore = ECStore::new(address, endpoint_pools).await?;
Ok(Self {})
Self {}
}
}
#[async_trait::async_trait]