support multi node

Signed-off-by: junxiang Mu <1948535941@qq.com>
This commit is contained in:
junxiang Mu
2024-09-11 11:21:01 +08:00
parent 51fe557b2f
commit 128fe42d98
15 changed files with 562 additions and 137 deletions

98
Cargo.lock generated
View File

@@ -94,9 +94,9 @@ dependencies = [
[[package]]
name = "anyhow"
version = "1.0.86"
version = "1.0.88"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b3d1d046238990b9cf5bcde22a3fb3584ee5cf65fb2765f454ed428c7a0063da"
checksum = "4e1496f8fb1fbf272686b8d37f523dab3e4a7443300055e74cdaa449f3114356"
[[package]]
name = "arrayvec"
@@ -205,6 +205,17 @@ dependencies = [
"tower-service",
]
[[package]]
name = "backon"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e4fa97bb310c33c811334143cf64c5bb2b7b3c06e453db6b095d7061eff8f113"
dependencies = [
"fastrand",
"gloo-timers",
"tokio",
]
[[package]]
name = "backtrace"
version = "0.3.73"
@@ -257,6 +268,12 @@ dependencies = [
"generic-array",
]
[[package]]
name = "bumpalo"
version = "3.16.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "79296716171880943b8470b5f8d03aa55eb2e645a4874bdbb28adb49162e012c"
[[package]]
name = "byteorder"
version = "1.5.0"
@@ -420,6 +437,7 @@ name = "ecstore"
version = "0.1.0"
dependencies = [
"async-trait",
"backon",
"base64-simd",
"byteorder",
"bytes",
@@ -657,6 +675,18 @@ version = "0.29.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "40ecd4077b5ae9fd2e9e169b102c6c330d0605168eb0e8bf79952b256dbefffd"
[[package]]
name = "gloo-timers"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bbb143cf96099802033e0d4f4963b19fd2e0b728bcf076cd9cf7f6634f092994"
dependencies = [
"futures-channel",
"futures-core",
"js-sys",
"wasm-bindgen",
]
[[package]]
name = "h2"
version = "0.4.5"
@@ -882,6 +912,15 @@ version = "1.0.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "49f1f14873335454500d59611f1cf4a4b0f786f9ac11f4312a78e4cf2566695b"
[[package]]
name = "js-sys"
version = "0.3.70"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1868808506b929d7b0cfa8f75951347aa71bb21144b7791bae35d9bccfcfe37a"
dependencies = [
"wasm-bindgen",
]
[[package]]
name = "lazy_static"
version = "1.5.0"
@@ -2291,6 +2330,61 @@ version = "0.11.0+wasi-snapshot-preview1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423"
[[package]]
name = "wasm-bindgen"
version = "0.2.93"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a82edfc16a6c469f5f44dc7b571814045d60404b55a0ee849f9bcfa2e63dd9b5"
dependencies = [
"cfg-if",
"once_cell",
"wasm-bindgen-macro",
]
[[package]]
name = "wasm-bindgen-backend"
version = "0.2.93"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9de396da306523044d3302746f1208fa71d7532227f15e347e2d93e4145dd77b"
dependencies = [
"bumpalo",
"log",
"once_cell",
"proc-macro2",
"quote",
"syn",
"wasm-bindgen-shared",
]
[[package]]
name = "wasm-bindgen-macro"
version = "0.2.93"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "585c4c91a46b072c92e908d99cb1dcdf95c5218eeb6f3bf1efa991ee7a68cccf"
dependencies = [
"quote",
"wasm-bindgen-macro-support",
]
[[package]]
name = "wasm-bindgen-macro-support"
version = "0.2.93"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "afc340c74d9005395cf9dd098506f7f44e38f2b4a21c6aaacf9a105ea5e1e836"
dependencies = [
"proc-macro2",
"quote",
"syn",
"wasm-bindgen-backend",
"wasm-bindgen-shared",
]
[[package]]
name = "wasm-bindgen-shared"
version = "0.2.93"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c62a0a307cb4a311d3a07867860911ca130c3494e8c2719593806c08bc5d0484"
[[package]]
name = "winapi"
version = "0.3.9"

View File

@@ -11,6 +11,7 @@ version = "0.0.1"
[workspace.dependencies]
async-trait = "0.1.80"
backon = "1.2.0"
bytes = "1.6.0"
clap = { version = "4.5.7", features = ["derive"] }
ecstore = { path = "./ecstore" }

View File

@@ -419,6 +419,28 @@ pub struct ReadXlResponse {
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct DeleteVersionsRequest {
#[prost(string, tag = "1")]
pub disk: ::prost::alloc::string::String,
#[prost(string, tag = "2")]
pub volume: ::prost::alloc::string::String,
#[prost(string, repeated, tag = "3")]
pub versions: ::prost::alloc::vec::Vec<::prost::alloc::string::String>,
#[prost(string, tag = "4")]
pub opts: ::prost::alloc::string::String,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct DeleteVersionsResponse {
#[prost(bool, tag = "1")]
pub success: bool,
#[prost(string, repeated, tag = "2")]
pub errors: ::prost::alloc::vec::Vec<::prost::alloc::string::String>,
#[prost(string, optional, tag = "3")]
pub error_info: ::core::option::Option<::prost::alloc::string::String>,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ReadMultipleRequest {
#[prost(string, tag = "1")]
pub disk: ::prost::alloc::string::String,
@@ -1048,6 +1070,31 @@ pub mod node_service_client {
.insert(GrpcMethod::new("node_service.NodeService", "ReadXL"));
self.inner.unary(req, path, codec).await
}
pub async fn delete_versions(
&mut self,
request: impl tonic::IntoRequest<super::DeleteVersionsRequest>,
) -> std::result::Result<
tonic::Response<super::DeleteVersionsResponse>,
tonic::Status,
> {
self.inner
.ready()
.await
.map_err(|e| {
tonic::Status::new(
tonic::Code::Unknown,
format!("Service was not ready: {}", e.into()),
)
})?;
let codec = tonic::codec::ProstCodec::default();
let path = http::uri::PathAndQuery::from_static(
"/node_service.NodeService/DeleteVersions",
);
let mut req = request.into_request();
req.extensions_mut()
.insert(GrpcMethod::new("node_service.NodeService", "DeleteVersions"));
self.inner.unary(req, path, codec).await
}
pub async fn read_multiple(
&mut self,
request: impl tonic::IntoRequest<super::ReadMultipleRequest>,
@@ -1232,6 +1279,13 @@ pub mod node_service_server {
&self,
request: tonic::Request<super::ReadXlRequest>,
) -> std::result::Result<tonic::Response<super::ReadXlResponse>, tonic::Status>;
async fn delete_versions(
&self,
request: tonic::Request<super::DeleteVersionsRequest>,
) -> std::result::Result<
tonic::Response<super::DeleteVersionsResponse>,
tonic::Status,
>;
async fn read_multiple(
&self,
request: tonic::Request<super::ReadMultipleRequest>,
@@ -2264,6 +2318,51 @@ pub mod node_service_server {
};
Box::pin(fut)
}
"/node_service.NodeService/DeleteVersions" => {
#[allow(non_camel_case_types)]
struct DeleteVersionsSvc<T: NodeService>(pub Arc<T>);
impl<
T: NodeService,
> tonic::server::UnaryService<super::DeleteVersionsRequest>
for DeleteVersionsSvc<T> {
type Response = super::DeleteVersionsResponse;
type Future = BoxFuture<
tonic::Response<Self::Response>,
tonic::Status,
>;
fn call(
&mut self,
request: tonic::Request<super::DeleteVersionsRequest>,
) -> Self::Future {
let inner = Arc::clone(&self.0);
let fut = async move {
<T as NodeService>::delete_versions(&inner, request).await
};
Box::pin(fut)
}
}
let accept_compression_encodings = self.accept_compression_encodings;
let send_compression_encodings = self.send_compression_encodings;
let max_decoding_message_size = self.max_decoding_message_size;
let max_encoding_message_size = self.max_encoding_message_size;
let inner = self.inner.clone();
let fut = async move {
let method = DeleteVersionsSvc(inner);
let codec = tonic::codec::ProstCodec::default();
let mut grpc = tonic::server::Grpc::new(codec)
.apply_compression_config(
accept_compression_encodings,
send_compression_encodings,
)
.apply_max_message_size_config(
max_decoding_message_size,
max_encoding_message_size,
);
let res = grpc.unary(method, req).await;
Ok(res)
};
Box::pin(fut)
}
"/node_service.NodeService/ReadMultiple" => {
#[allow(non_camel_case_types)]
struct ReadMultipleSvc<T: NodeService>(pub Arc<T>);

View File

@@ -258,6 +258,19 @@ message ReadXLResponse {
optional string error_info = 3;
}
message DeleteVersionsRequest {
string disk = 1;
string volume = 2;
repeated string versions = 3;
string opts = 4;
}
message DeleteVersionsResponse {
bool success = 1;
repeated string errors = 2;
optional string error_info = 3;
}
message ReadMultipleRequest {
string disk = 1;
string read_multiple_req = 2;
@@ -308,6 +321,7 @@ service NodeService {
rpc WriteMetadata(WriteMetadataRequest) returns (WriteMetadataResponse) {};
rpc ReadVersion(ReadVersionRequest) returns (ReadVersionResponse) {};
rpc ReadXL(ReadXLRequest) returns (ReadXLResponse) {};
rpc DeleteVersions(DeleteVersionsRequest) returns (DeleteVersionsResponse) {};
rpc ReadMultiple(ReadMultipleRequest) returns (ReadMultipleResponse) {};
rpc DeleteVolume(DeleteVolumeRequest) returns (DeleteVolumeResponse) {};
}

View File

@@ -1,15 +0,0 @@
[package]
name = "e2e_test"
version.workspace = true
edition.workspace = true
license.workspace = true
repository.workspace = true
rust-version.workspace = true
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
flatbuffers.workspace = true
protos.workspace = true
tonic = { version = "0.12.1", features = ["gzip"] }
tokio = { workspace = true }

View File

@@ -1,5 +0,0 @@
#[async_trait::async_trait]
pub trait Coordinator: Send + Sync {
async fn ping();
async fn create_bucket() -> Result<>
}

View File

@@ -4,13 +4,14 @@ use ecstore::disk::VolumeInfo;
use protos::{
models::{PingBody, PingBodyBuilder},
proto_gen::node_service::{
node_service_client::NodeServiceClient, ListVolumesRequest, MakeVolumeRequest, PingRequest, PingResponse,
node_service_client::NodeServiceClient, ListVolumesRequest, MakeVolumeRequest, PingRequest, PingResponse, ReadAllRequest,
},
};
use std::error::Error;
use tonic::Request;
async fn get_client() -> Result<NodeServiceClient<tonic::transport::Channel>, Box<dyn Error>> {
// Ok(NodeServiceClient::connect("http://220.181.1.138:9000").await?)
Ok(NodeServiceClient::connect("http://localhost:9000").await?)
}
@@ -30,7 +31,7 @@ async fn ping() -> Result<(), Box<dyn Error>> {
assert!(decoded_payload.is_ok());
// 创建客户端
let mut client = NodeServiceClient::connect("http://localhost:9000").await?;
let mut client = get_client().await?;
// 构造 PingRequest
let request = Request::new(PingRequest {
@@ -86,3 +87,20 @@ async fn list_volumes() -> Result<(), Box<dyn Error>> {
println!("{:?}", volume_infos);
Ok(())
}
#[tokio::test]
async fn read_all() -> Result<(), Box<dyn Error>> {
let mut client = get_client().await?;
let request = Request::new(ReadAllRequest {
disk: "data".to_string(),
volume: "ff".to_string(),
path: "format.json".to_string(),
});
let response = client.read_all(request).await?.into_inner();
let volume_infos = response.data;
println!("{}", response.success);
println!("{:?}", volume_infos);
Ok(())
}

View File

@@ -9,7 +9,7 @@ rust-version.workspace = true
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
tokio = { workspace = true, features = ["io-util"] }
backon.workspace = true
bytes.workspace = true
thiserror.workspace = true
futures.workspace = true
@@ -38,6 +38,7 @@ base64-simd = "0.8.0"
sha2 = "0.10.8"
hex-simd = "0.8.0"
path-clean = "1.0.1"
tokio = { workspace = true, features = ["io-util"] }
tokio-stream = "0.1.15"
tonic.workspace = true
tower.workspace = true

View File

@@ -28,7 +28,6 @@ use tokio::{
io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt},
};
use tonic::{transport::Channel, Request};
use tower::timeout::Timeout;
use uuid::Uuid;
pub type DiskStore = Arc<Box<dyn DiskAPI>>;
@@ -96,7 +95,7 @@ pub trait DiskAPI: Debug + Send + Sync + 'static {
async fn read_multiple(&self, req: ReadMultipleReq) -> Result<Vec<ReadMultipleResp>>;
}
#[derive(Debug, Default, Clone)]
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct FileInfoVersions {
// Name of the volume.
pub volume: String,
@@ -350,17 +349,11 @@ pub struct RemoteFileWriter {
pub volume: String,
pub path: String,
pub is_append: bool,
client: NodeServiceClient<Timeout<Channel>>,
client: NodeServiceClient<Channel>,
}
impl RemoteFileWriter {
pub fn new(
root: PathBuf,
volume: String,
path: String,
is_append: bool,
client: NodeServiceClient<Timeout<Channel>>,
) -> Self {
pub fn new(root: PathBuf, volume: String, path: String, is_append: bool, client: NodeServiceClient<Channel>) -> Self {
Self {
root,
volume,
@@ -433,11 +426,11 @@ pub struct RemoteFileReader {
pub root: PathBuf,
pub volume: String,
pub path: String,
client: NodeServiceClient<Timeout<Channel>>,
client: NodeServiceClient<Channel>,
}
impl RemoteFileReader {
pub fn new(root: PathBuf, volume: String, path: String, client: NodeServiceClient<Timeout<Channel>>) -> Self {
pub fn new(root: PathBuf, volume: String, path: String, client: NodeServiceClient<Channel>) -> Self {
Self {
root,
volume,

View File

@@ -1,17 +1,18 @@
use std::{path::PathBuf, sync::RwLock, time::Duration};
use std::{path::PathBuf, sync::Arc, time::Duration};
use bytes::Bytes;
use futures::lock::Mutex;
use protos::{
node_service_time_out_client,
proto_gen::node_service::{
node_service_client::NodeServiceClient, DeleteRequest, DeleteVolumeRequest, ListDirRequest, ListVolumesRequest,
MakeVolumeRequest, MakeVolumesRequest, ReadAllRequest, ReadMultipleRequest, ReadVersionRequest, ReadXlRequest,
RenameDataRequest, RenameFileRequst, StatVolumeRequest, WalkDirRequest, WriteAllRequest, WriteMetadataRequest,
node_service_client::NodeServiceClient, DeleteRequest, DeleteVersionsRequest, DeleteVolumeRequest, ListDirRequest,
ListVolumesRequest, MakeVolumeRequest, MakeVolumesRequest, ReadAllRequest, ReadMultipleRequest, ReadVersionRequest,
ReadXlRequest, RenameDataRequest, RenameFileRequst, StatVolumeRequest, WalkDirRequest, WriteAllRequest,
WriteMetadataRequest,
},
DEFAULT_GRPC_SERVER_MESSAGE_LEN,
};
use tokio::fs;
use tokio::{fs, sync::RwLock};
use tonic::{
transport::{Channel, Endpoint as tonic_Endpoint},
Request,
@@ -21,6 +22,7 @@ use tracing::info;
use uuid::Uuid;
use crate::{
disk::error::DiskError,
error::{Error, Result},
store_api::{FileInfo, RawFileInfo},
};
@@ -34,7 +36,7 @@ use super::{
#[derive(Debug)]
pub struct RemoteDisk {
id: Mutex<Option<Uuid>>,
channel: RwLock<Option<Channel>>,
channel: Arc<RwLock<Option<Channel>>>,
url: url::Url,
pub root: PathBuf,
}
@@ -44,39 +46,49 @@ impl RemoteDisk {
let root = fs::canonicalize(ep.url.path()).await?;
Ok(Self {
channel: RwLock::new(None),
channel: Arc::new(RwLock::new(None)),
url: ep.url.clone(),
root,
id: Mutex::new(None),
})
}
fn get_client(&self) -> NodeServiceClient<Timeout<Channel>> {
#[allow(dead_code)]
async fn get_client(&self) -> Result<NodeServiceClient<Timeout<Channel>>> {
let channel_clone = self.channel.clone();
let channel = {
let read_lock = self.channel.read().unwrap();
let read_lock = channel_clone.read().await;
if let Some(ref channel) = *read_lock {
channel.clone()
} else {
let addr = format!("{}://{}:{}", self.url.scheme(), self.url.host_str().unwrap(), self.url.port().unwrap());
info!("disk url: {:?}", addr);
let connector = tonic_Endpoint::from_shared(addr).unwrap();
info!("disk url: {}", addr);
let connector = tonic_Endpoint::from_shared(addr.clone())?;
let new_channel = tokio::runtime::Runtime::new().unwrap().block_on(connector.connect()).unwrap();
let new_channel = connector.connect().await.map_err(|_err| DiskError::DiskNotFound)?;
*self.channel.write().unwrap() = Some(new_channel.clone());
info!("get channel success");
*self.channel.write().await = Some(new_channel.clone());
new_channel
}
};
node_service_time_out_client(
Ok(node_service_time_out_client(
channel,
Duration::new(30, 0), // TODO: use config setting
DEFAULT_GRPC_SERVER_MESSAGE_LEN,
// grpc_enable_gzip,
false, // TODO: use config setting
)
))
}
async fn get_client_v2(&self) -> Result<NodeServiceClient<tonic::transport::Channel>> {
// Ok(NodeServiceClient::connect("http://220.181.1.138:9000").await?)
let addr = format!("{}://{}:{}", self.url.scheme(), self.url.host_str().unwrap(), self.url.port().unwrap());
Ok(NodeServiceClient::connect(addr).await?)
}
}
@@ -104,7 +116,8 @@ impl DiskAPI for RemoteDisk {
}
async fn read_all(&self, volume: &str, path: &str) -> Result<Bytes> {
let mut client = self.get_client();
info!("read_all");
let mut client = self.get_client_v2().await?;
let request = Request::new(ReadAllRequest {
disk: self.root.to_string_lossy().to_string(),
volume: volume.to_string(),
@@ -113,11 +126,18 @@ impl DiskAPI for RemoteDisk {
let response = client.read_all(request).await?.into_inner();
info!("read_all success");
if !response.success {
return Err(DiskError::FileNotFound.into());
}
Ok(Bytes::from(response.data))
}
async fn write_all(&self, volume: &str, path: &str, data: Vec<u8>) -> Result<()> {
let mut client = self.get_client();
info!("write_all");
let mut client = self.get_client_v2().await?;
let request = Request::new(WriteAllRequest {
disk: self.root.to_string_lossy().to_string(),
volume: volume.to_string(),
@@ -125,14 +145,19 @@ impl DiskAPI for RemoteDisk {
data,
});
let _response = client.write_all(request).await?.into_inner();
let response = client.write_all(request).await?.into_inner();
if !response.success {
return Err(Error::from_string(response.error_info.unwrap_or("".to_string())));
}
Ok(())
}
async fn delete(&self, volume: &str, path: &str, opt: DeleteOptions) -> Result<()> {
info!("delete");
let options = serde_json::to_string(&opt)?;
let mut client = self.get_client();
let mut client = self.get_client_v2().await?;
let request = Request::new(DeleteRequest {
disk: self.root.to_string_lossy().to_string(),
volume: volume.to_string(),
@@ -140,13 +165,18 @@ impl DiskAPI for RemoteDisk {
options,
});
let _response = client.delete(request).await?.into_inner();
let response = client.delete(request).await?.into_inner();
if !response.success {
return Err(Error::from_string(response.error_info.unwrap_or("".to_string())));
}
Ok(())
}
async fn rename_file(&self, src_volume: &str, src_path: &str, dst_volume: &str, dst_path: &str) -> Result<()> {
let mut client = self.get_client();
info!("rename_file");
let mut client = self.get_client_v2().await?;
let request = Request::new(RenameFileRequst {
disk: self.root.to_string_lossy().to_string(),
src_volume: src_volume.to_string(),
@@ -155,42 +185,50 @@ impl DiskAPI for RemoteDisk {
dst_path: dst_path.to_string(),
});
let _response = client.rename_file(request).await?.into_inner();
let response = client.rename_file(request).await?.into_inner();
if !response.success {
return Err(Error::from_string(response.error_info.unwrap_or("".to_string())));
}
Ok(())
}
async fn create_file(&self, _origvolume: &str, volume: &str, path: &str, _file_size: usize) -> Result<FileWriter> {
info!("create_file");
Ok(FileWriter::Remote(RemoteFileWriter::new(
self.root.clone(),
volume.to_string(),
path.to_string(),
false,
self.get_client(),
self.get_client_v2().await?,
)))
}
async fn append_file(&self, volume: &str, path: &str) -> Result<FileWriter> {
info!("append_file");
Ok(FileWriter::Remote(RemoteFileWriter::new(
self.root.clone(),
volume.to_string(),
path.to_string(),
true,
self.get_client(),
self.get_client_v2().await?,
)))
}
async fn read_file(&self, volume: &str, path: &str) -> Result<FileReader> {
info!("read_file");
Ok(FileReader::Remote(RemoteFileReader::new(
self.root.clone(),
volume.to_string(),
path.to_string(),
self.get_client(),
self.get_client_v2().await?,
)))
}
async fn list_dir(&self, _origvolume: &str, volume: &str, _dir_path: &str, _count: i32) -> Result<Vec<String>> {
let mut client = self.get_client();
info!("list_dir");
let mut client = self.get_client_v2().await?;
let request = Request::new(ListDirRequest {
disk: self.root.to_string_lossy().to_string(),
volume: volume.to_string(),
@@ -198,18 +236,28 @@ impl DiskAPI for RemoteDisk {
let response = client.list_dir(request).await?.into_inner();
if !response.success {
return Err(Error::from_string(response.error_info.unwrap_or("".to_string())));
}
Ok(response.volumes)
}
async fn walk_dir(&self, opts: WalkDirOptions) -> Result<Vec<MetaCacheEntry>> {
info!("walk_dir");
let walk_dir_options = serde_json::to_string(&opts)?;
let mut client = self.get_client();
let mut client = self.get_client_v2().await?;
let request = Request::new(WalkDirRequest {
disk: self.root.to_string_lossy().to_string(),
walk_dir_options,
});
let response = client.walk_dir(request).await?.into_inner();
if !response.success {
return Err(Error::from_string(response.error_info.unwrap_or("".to_string())));
}
let entries = response
.meta_cache_entry
.into_iter()
@@ -227,8 +275,9 @@ impl DiskAPI for RemoteDisk {
dst_volume: &str,
dst_path: &str,
) -> Result<RenameDataResp> {
info!("rename_data");
let file_info = serde_json::to_string(&fi)?;
let mut client = self.get_client();
let mut client = self.get_client_v2().await?;
let request = Request::new(RenameDataRequest {
disk: self.root.to_string_lossy().to_string(),
src_volume: src_volume.to_string(),
@@ -239,42 +288,63 @@ impl DiskAPI for RemoteDisk {
});
let response = client.rename_data(request).await?.into_inner();
if !response.success {
return Err(Error::from_string(response.error_info.unwrap_or("".to_string())));
}
let rename_data_resp = serde_json::from_str::<RenameDataResp>(&response.rename_data_resp)?;
Ok(rename_data_resp)
}
async fn make_volumes(&self, volumes: Vec<&str>) -> Result<()> {
let mut client = self.get_client();
info!("make_volumes");
let mut client = self.get_client_v2().await?;
let request = Request::new(MakeVolumesRequest {
disk: self.root.to_string_lossy().to_string(),
volumes: volumes.iter().map(|s| (*s).to_string()).collect(),
});
let _response = client.make_volumes(request).await?.into_inner();
let response = client.make_volumes(request).await?.into_inner();
if !response.success {
return Err(Error::from_string(response.error_info.unwrap_or("".to_string())));
}
Ok(())
}
async fn make_volume(&self, volume: &str) -> Result<()> {
let mut client = self.get_client();
info!("make_volume");
let mut client = self.get_client_v2().await?;
let request = Request::new(MakeVolumeRequest {
disk: self.root.to_string_lossy().to_string(),
volume: volume.to_string(),
});
let _response = client.make_volume(request).await?.into_inner();
let response = client.make_volume(request).await?.into_inner();
if !response.success {
return Err(Error::from_string(response.error_info.unwrap_or("".to_string())));
}
Ok(())
}
async fn list_volumes(&self) -> Result<Vec<VolumeInfo>> {
let mut client = self.get_client();
info!("list_volumes");
let mut client = self.get_client_v2().await?;
let request = Request::new(ListVolumesRequest {
disk: self.root.to_string_lossy().to_string(),
});
let response = client.list_volumes(request).await?.into_inner();
if !response.success {
return Err(Error::from_string(response.error_info.unwrap_or("".to_string())));
}
let infos = response
.volume_infos
.into_iter()
@@ -285,21 +355,28 @@ impl DiskAPI for RemoteDisk {
}
async fn stat_volume(&self, volume: &str) -> Result<VolumeInfo> {
let mut client = self.get_client();
info!("stat_volume");
let mut client = self.get_client_v2().await?;
let request = Request::new(StatVolumeRequest {
disk: self.root.to_string_lossy().to_string(),
volume: volume.to_string(),
});
let response = client.stat_volume(request).await?.into_inner();
if !response.success {
return Err(Error::from_string(response.error_info.unwrap_or("".to_string())));
}
let volume_info = serde_json::from_str::<VolumeInfo>(&response.volume_info)?;
Ok(volume_info)
}
async fn write_metadata(&self, _org_volume: &str, volume: &str, path: &str, fi: FileInfo) -> Result<()> {
info!("write_metadata");
let file_info = serde_json::to_string(&fi)?;
let mut client = self.get_client();
let mut client = self.get_client_v2().await?;
let request = Request::new(WriteMetadataRequest {
disk: self.root.to_string_lossy().to_string(),
volume: volume.to_string(),
@@ -307,7 +384,11 @@ impl DiskAPI for RemoteDisk {
file_info,
});
let _response = client.write_metadata(request).await?.into_inner();
let response = client.write_metadata(request).await?.into_inner();
if !response.success {
return Err(Error::from_string(response.error_info.unwrap_or("".to_string())));
}
Ok(())
}
@@ -320,8 +401,9 @@ impl DiskAPI for RemoteDisk {
version_id: &str,
opts: &ReadOptions,
) -> Result<FileInfo> {
info!("read_version");
let opts = serde_json::to_string(opts)?;
let mut client = self.get_client();
let mut client = self.get_client_v2().await?;
let request = Request::new(ReadVersionRequest {
disk: self.root.to_string_lossy().to_string(),
volume: volume.to_string(),
@@ -331,13 +413,19 @@ impl DiskAPI for RemoteDisk {
});
let response = client.read_version(request).await?.into_inner();
if !response.success {
return Err(Error::from_string(response.error_info.unwrap_or("".to_string())));
}
let file_info = serde_json::from_str::<FileInfo>(&response.file_info)?;
Ok(file_info)
}
async fn read_xl(&self, volume: &str, path: &str, read_data: bool) -> Result<RawFileInfo> {
let mut client = self.get_client();
info!("read_xl");
let mut client = self.get_client_v2().await?;
let request = Request::new(ReadXlRequest {
disk: self.root.to_string_lossy().to_string(),
volume: volume.to_string(),
@@ -346,6 +434,11 @@ impl DiskAPI for RemoteDisk {
});
let response = client.read_xl(request).await?.into_inner();
if !response.success {
return Err(Error::from_string(response.error_info.unwrap_or("".to_string())));
}
let raw_file_info = serde_json::from_str::<RawFileInfo>(&response.raw_file_info)?;
Ok(raw_file_info)
@@ -353,22 +446,61 @@ impl DiskAPI for RemoteDisk {
async fn delete_versions(
&self,
_volume: &str,
_versions: Vec<FileInfoVersions>,
_opts: DeleteOptions,
volume: &str,
versions: Vec<FileInfoVersions>,
opts: DeleteOptions,
) -> Result<Vec<Option<Error>>> {
unimplemented!()
info!("delete_versions");
let opts = serde_json::to_string(&opts)?;
let mut versions_str = Vec::with_capacity(versions.len());
for file_info_versions in versions.iter() {
versions_str.push(serde_json::to_string(file_info_versions)?);
}
let mut client = self.get_client_v2().await?;
let request = Request::new(DeleteVersionsRequest {
disk: self.root.to_string_lossy().to_string(),
volume: volume.to_string(),
versions: versions_str,
opts,
});
let response = client.delete_versions(request).await?.into_inner();
if !response.success {
return Err(Error::from_string(format!(
"delete versions remote err: {}",
response.error_info.unwrap_or("None".to_string())
)));
}
let errors = response
.errors
.iter()
.map(|error| {
if error.is_empty() {
None
} else {
Some(Error::from_string(error))
}
})
.collect();
Ok(errors)
}
async fn read_multiple(&self, req: ReadMultipleReq) -> Result<Vec<ReadMultipleResp>> {
info!("read_multiple");
let read_multiple_req = serde_json::to_string(&req)?;
let mut client = self.get_client();
let mut client = self.get_client_v2().await?;
let request = Request::new(ReadMultipleRequest {
disk: self.root.to_string_lossy().to_string(),
read_multiple_req,
});
let response = client.read_multiple(request).await?.into_inner();
if !response.success {
return Err(Error::from_string(response.error_info.unwrap_or("".to_string())));
}
let read_multiple_resps = response
.read_multiple_resps
.into_iter()
@@ -379,13 +511,18 @@ impl DiskAPI for RemoteDisk {
}
async fn delete_volume(&self, volume: &str) -> Result<()> {
let mut client = self.get_client();
info!("delete_volume");
let mut client = self.get_client_v2().await?;
let request = Request::new(DeleteVolumeRequest {
disk: self.root.to_string_lossy().to_string(),
volume: volume.to_string(),
});
let _response = client.delete_volume(request).await?.into_inner();
let response = client.delete_volume(request).await?.into_inner();
if !response.success {
return Err(Error::from_string(response.error_info.unwrap_or("".to_string())));
}
Ok(())
}

View File

@@ -1,12 +1,15 @@
use async_trait::async_trait;
use futures::future::join_all;
use protos::proto_gen::node_service::node_service_client::NodeServiceClient;
use protos::proto_gen::node_service::{DeleteBucketRequest, GetBucketInfoRequest, ListBucketRequest, MakeBucketRequest};
use protos::{node_service_time_out_client, DEFAULT_GRPC_SERVER_MESSAGE_LEN};
use regex::Regex;
use std::{collections::HashMap, fmt::Debug, sync::Arc, time::Duration};
use tokio::sync::RwLock;
use tonic::transport::{Channel, Endpoint};
use tonic::Request;
use tracing::warn;
use tower::timeout::Timeout;
use tracing::{info, warn};
use crate::store::all_local_disk;
use crate::{
@@ -386,22 +389,58 @@ impl PeerS3Client for LocalPeerS3Client {
#[derive(Debug)]
pub struct RemotePeerS3Client {
pub _node: Option<Node>,
pub node: Option<Node>,
pub pools: Option<Vec<usize>>,
pub channel: Channel,
connector: Endpoint,
channel: Arc<RwLock<Option<Channel>>>,
}
impl RemotePeerS3Client {
fn new(node: Option<Node>, pools: Option<Vec<usize>>) -> Self {
let connector =
Endpoint::from_shared(format!("{}", node.as_ref().map(|v| { v.url.to_string() }).unwrap_or_default())).unwrap();
let channel = tokio::runtime::Runtime::new().unwrap().block_on(connector.connect()).unwrap();
Self {
_node: node,
node,
pools,
channel,
connector,
channel: Arc::new(RwLock::new(None)),
}
}
#[allow(dead_code)]
async fn get_client(&self) -> Result<NodeServiceClient<Timeout<Channel>>> {
let channel_clone = self.channel.clone();
let channel = {
let read_lock = channel_clone.read().await;
if let Some(ref channel) = *read_lock {
channel.clone()
} else {
let new_channel = self.connector.connect().await?;
info!("get channel success");
*self.channel.write().await = Some(new_channel.clone());
new_channel
}
};
Ok(node_service_time_out_client(
channel,
Duration::new(30, 0), // TODO: use config setting
DEFAULT_GRPC_SERVER_MESSAGE_LEN,
// grpc_enable_gzip,
false, // TODO: use config setting
))
}
async fn get_client_v2(&self) -> Result<NodeServiceClient<tonic::transport::Channel>> {
// Ok(NodeServiceClient::connect("http://220.181.1.138:9000").await?)
// let addr = format!("{}://{}:{}", self.url.scheme(), self.url.host_str().unwrap(), self.url.port().unwrap());
let addr = format!("{}", self.node.as_ref().map(|v| { v.url.to_string() }).unwrap_or_default());
Ok(NodeServiceClient::connect(addr).await?)
}
}
#[async_trait]
@@ -411,13 +450,7 @@ impl PeerS3Client for RemotePeerS3Client {
}
async fn list_bucket(&self, opts: &BucketOptions) -> Result<Vec<BucketInfo>> {
let options = serde_json::to_string(opts)?;
let mut client = node_service_time_out_client(
self.channel.clone(),
Duration::new(30, 0), // TODO: use config setting
DEFAULT_GRPC_SERVER_MESSAGE_LEN,
// grpc_enable_gzip,
false, // TODO: use config setting
);
let mut client = self.get_client_v2().await?;
let request = Request::new(ListBucketRequest { options });
let response = client.list_bucket(request).await?.into_inner();
let bucket_infos = response
@@ -430,32 +463,23 @@ impl PeerS3Client for RemotePeerS3Client {
}
async fn make_bucket(&self, bucket: &str, opts: &MakeBucketOptions) -> Result<()> {
let options = serde_json::to_string(opts)?;
let mut client = node_service_time_out_client(
self.channel.clone(),
Duration::new(30, 0), // TODO: use config setting
DEFAULT_GRPC_SERVER_MESSAGE_LEN,
// grpc_enable_gzip,
false, // TODO: use config setting
);
let mut client = self.get_client_v2().await?;
let request = Request::new(MakeBucketRequest {
name: bucket.to_string(),
options,
});
let _response = client.make_bucket(request).await?.into_inner();
let response = client.make_bucket(request).await?.into_inner();
// TODO: deal with error
if !response.success {
warn!("make bucket error: {:?}", response.error_info);
}
Ok(())
}
async fn get_bucket_info(&self, bucket: &str, opts: &BucketOptions) -> Result<BucketInfo> {
let options = serde_json::to_string(opts)?;
let mut client = node_service_time_out_client(
self.channel.clone(),
Duration::new(30, 0), // TODO: use config setting
DEFAULT_GRPC_SERVER_MESSAGE_LEN,
// grpc_enable_gzip,
false, // TODO: use config setting
);
let mut client = self.get_client_v2().await?;
let request = Request::new(GetBucketInfoRequest {
bucket: bucket.to_string(),
options,
@@ -467,13 +491,7 @@ impl PeerS3Client for RemotePeerS3Client {
}
async fn delete_bucket(&self, bucket: &str) -> Result<()> {
let mut client = node_service_time_out_client(
self.channel.clone(),
Duration::new(30, 0), // TODO: use config setting
DEFAULT_GRPC_SERVER_MESSAGE_LEN,
// grpc_enable_gzip,
false, // TODO: use config setting
);
let mut client = self.get_client_v2().await?;
let request = Request::new(DeleteBucketRequest {
bucket: bucket.to_string(),
});

View File

@@ -14,16 +14,18 @@ use crate::{
},
store_init, utils,
};
use backon::{ExponentialBuilder, Retryable};
use futures::future::join_all;
use http::HeaderMap;
use s3s::{dto::StreamingBlob, Body};
use std::{
collections::{HashMap, HashSet},
sync::Arc,
time::Duration,
};
use time::OffsetDateTime;
use tokio::{fs, sync::RwLock};
use tracing::{debug, warn};
use tracing::{debug, info, warn};
use uuid::Uuid;
use lazy_static::lazy_static;
@@ -168,6 +170,8 @@ impl ECStore {
let mut local_disks = Vec::new();
info!("endpoint_pools: {:?}", endpoint_pools);
for (i, pool_eps) in endpoint_pools.as_ref().iter().enumerate() {
// TODO: read from config parseStorageClass
let partiy_count = store_init::default_partiy_count(pool_eps.drives_per_set);
@@ -183,17 +187,23 @@ impl ECStore {
DiskError::check_disk_fatal_errs(&errs)?;
let fm = store_init::connect_load_init_formats(
first_is_local,
&disks,
pool_eps.set_count,
pool_eps.drives_per_set,
deployment_id,
)
let fm = (|| async {
store_init::connect_load_init_formats(
first_is_local,
&disks,
pool_eps.set_count,
pool_eps.drives_per_set,
deployment_id,
)
.await
})
.retry(ExponentialBuilder::default().with_max_times(usize::MAX))
.sleep(tokio::time::sleep)
.notify(|err, dur: Duration| {
info!("retrying get formats {:?} after {:?}", err, dur);
})
.await?;
// TODO: 失败 重试 等试 3次
if deployment_id.is_none() {
deployment_id = Some(fm.id);
}

View File

@@ -133,7 +133,7 @@ fn get_format_erasure_in_quorum(formats: &[Option<FormatV3>]) -> Result<FormatV3
warn!("get_format_erasure_in_quorum fi: {:?}", &formats);
if *max_drives == 0 || *max_count < formats.len() / 2 {
if *max_drives == 0 || *max_count <= formats.len() / 2 {
warn!(
"*max_drives == 0 || *max_count < formats.len() / 2, {} || {}<{}",
max_drives,

View File

@@ -1,5 +1,5 @@
use ecstore::{
disk::{DeleteOptions, DiskStore, ReadMultipleReq, ReadOptions, WalkDirOptions},
disk::{DeleteOptions, DiskStore, FileInfoVersions, ReadMultipleReq, ReadOptions, WalkDirOptions},
erasure::{ReadAt, Write},
peer::{LocalPeerS3Client, PeerS3Client},
store::{all_local_disk_path, find_local_disk},
@@ -12,14 +12,14 @@ use protos::{
models::{PingBody, PingBodyBuilder},
proto_gen::node_service::{
node_service_server::{NodeService as Node, NodeServiceServer as NodeServer},
DeleteBucketRequest, DeleteBucketResponse, DeleteRequest, DeleteResponse, DeleteVolumeRequest, DeleteVolumeResponse,
GetBucketInfoRequest, GetBucketInfoResponse, ListBucketRequest, ListBucketResponse, ListDirRequest, ListDirResponse,
ListVolumesRequest, ListVolumesResponse, MakeBucketRequest, MakeBucketResponse, MakeVolumeRequest, MakeVolumeResponse,
MakeVolumesRequest, MakeVolumesResponse, PingRequest, PingResponse, ReadAllRequest, ReadAllResponse, ReadAtRequest,
ReadAtResponse, ReadMultipleRequest, ReadMultipleResponse, ReadVersionRequest, ReadVersionResponse, ReadXlRequest,
ReadXlResponse, RenameDataRequest, RenameDataResponse, RenameFileRequst, RenameFileResponse, StatVolumeRequest,
StatVolumeResponse, WalkDirRequest, WalkDirResponse, WriteAllRequest, WriteAllResponse, WriteMetadataRequest,
WriteMetadataResponse, WriteRequest, WriteResponse,
DeleteBucketRequest, DeleteBucketResponse, DeleteRequest, DeleteResponse, DeleteVersionsRequest, DeleteVersionsResponse,
DeleteVolumeRequest, DeleteVolumeResponse, GetBucketInfoRequest, GetBucketInfoResponse, ListBucketRequest,
ListBucketResponse, ListDirRequest, ListDirResponse, ListVolumesRequest, ListVolumesResponse, MakeBucketRequest,
MakeBucketResponse, MakeVolumeRequest, MakeVolumeResponse, MakeVolumesRequest, MakeVolumesResponse, PingRequest,
PingResponse, ReadAllRequest, ReadAllResponse, ReadAtRequest, ReadAtResponse, ReadMultipleRequest, ReadMultipleResponse,
ReadVersionRequest, ReadVersionResponse, ReadXlRequest, ReadXlResponse, RenameDataRequest, RenameDataResponse,
RenameFileRequst, RenameFileResponse, StatVolumeRequest, StatVolumeResponse, WalkDirRequest, WalkDirResponse,
WriteAllRequest, WriteAllResponse, WriteMetadataRequest, WriteMetadataResponse, WriteRequest, WriteResponse,
},
};
@@ -201,6 +201,8 @@ impl Node for NodeService {
}
async fn read_all(&self, request: Request<ReadAllRequest>) -> Result<Response<ReadAllResponse>, Status> {
debug!("read all");
let request = request.into_inner();
if let Some(disk) = self.find_disk(&request.disk).await {
match disk.read_all(&request.volume, &request.path).await {
@@ -693,6 +695,63 @@ impl Node for NodeService {
}
}
async fn delete_versions(&self, request: Request<DeleteVersionsRequest>) -> Result<Response<DeleteVersionsResponse>, Status> {
let request = request.into_inner();
if let Some(disk) = self.find_disk(&request.disk).await {
let mut versions = Vec::with_capacity(request.versions.len());
for version in request.versions.iter() {
match serde_json::from_str::<FileInfoVersions>(&version) {
Ok(version) => versions.push(version),
Err(_) => {
return Ok(tonic::Response::new(DeleteVersionsResponse {
success: false,
errors: Vec::new(),
error_info: Some("can not decode FileInfoVersions".to_string()),
}));
}
};
}
let opts = match serde_json::from_str::<DeleteOptions>(&request.opts) {
Ok(opts) => opts,
Err(_) => {
return Ok(tonic::Response::new(DeleteVersionsResponse {
success: false,
errors: Vec::new(),
error_info: Some("can not decode DeleteOptions".to_string()),
}));
}
};
match disk.delete_versions(&request.volume, versions, opts).await {
Ok(errors) => {
let errors = errors
.into_iter()
.map(|error| match error {
Some(e) => e.to_string(),
None => "".to_string(),
})
.collect();
Ok(tonic::Response::new(DeleteVersionsResponse {
success: true,
errors,
error_info: None,
}))
}
Err(err) => Ok(tonic::Response::new(DeleteVersionsResponse {
success: false,
errors: Vec::new(),
error_info: Some(err.to_string()),
})),
}
} else {
Ok(tonic::Response::new(DeleteVersionsResponse {
success: false,
errors: Vec::new(),
error_info: Some("can not find disk".to_string()),
}))
}
}
async fn read_multiple(&self, request: Request<ReadMultipleRequest>) -> Result<Response<ReadMultipleResponse>, Status> {
let request = request.into_inner();
if let Some(disk) = self.find_disk(&request.disk).await {

View File

@@ -167,6 +167,7 @@ async fn run(opt: config::Opt) -> Result<()> {
warn!(" init store");
// init store
ECStore::new(opt.address.clone(), endpoint_pools.clone()).await?;
warn!(" init store success!");
tokio::select! {
_ = tokio::signal::ctrl_c() => {