support remote disk

Signed-off-by: junxiang Mu <1948535941@qq.com>
This commit is contained in:
junxiang Mu
2024-08-29 18:28:47 +08:00
parent 38fafed013
commit a8e546c9af
13 changed files with 2688 additions and 67 deletions

3
Cargo.lock generated
View File

@@ -407,8 +407,10 @@ dependencies = [
name = "e2e_test"
version = "0.0.1"
dependencies = [
"ecstore",
"flatbuffers",
"protos",
"serde_json",
"tokio",
"tonic",
]
@@ -1550,6 +1552,7 @@ dependencies = [
"protobuf",
"protos",
"s3s",
"serde_json",
"time",
"tokio",
"tonic",

File diff suppressed because it is too large Load Diff

View File

@@ -26,9 +26,259 @@ message MakeBucketResponse {
optional string error_info = 2;
}
message ReadAllRequest {
string disk = 1; // indicate which one in the disks
string volume = 2;
string path = 3;
}
message ReadAllResponse {
bool success = 1;
bytes data = 2;
optional string error_info = 3;
}
message WriteAllRequest {
string disk = 1; // indicate which one in the disks
string volume = 2;
string path = 3;
bytes data = 4;
}
message WriteAllResponse {
bool success = 1;
optional string error_info = 2;
}
message DeleteRequest {
string disk = 1; // indicate which one in the disks
string volume = 2;
string path = 3;
string options = 4;
}
message DeleteResponse {
bool success = 1;
optional string error_info = 2;
}
message RenameFileRequst {
string disk = 1;
string src_volume = 2;
string src_path = 3;
string dst_volume = 4;
string dst_path = 5;
}
message RenameFileResponse {
bool success = 1;
optional string error_info = 2;
}
message WriteRequest {
string disk = 1; // indicate which one in the disks
string volume = 2;
string path = 3;
bool is_append = 4;
bytes data = 5;
}
message WriteResponse {
bool success = 1;
optional string error_info = 2;
}
// message AppendRequest {
// string disk = 1; // indicate which one in the disks
// string volume = 2;
// string path = 3;
// bytes data = 4;
// }
//
// message AppendResponse {
// bool success = 1;
// optional string error_info = 2;
// }
message ReadAtRequest {
string disk = 1; // indicate which one in the disks
string volume = 2;
string path = 3;
int64 offset = 4;
int64 length = 5;
}
message ReadAtResponse {
bool success = 1;
bytes data = 2;
int64 read_size = 3;
optional string error_info = 4;
}
message ListDirRequest {
string disk = 1; // indicate which one in the disks
string volume = 2;
}
message ListDirResponse {
bool success = 1;
repeated string volumes = 2;
optional string error_info = 3;
}
message WalkDirRequest {
string disk = 1; // indicate which one in the disks
string walk_dir_options = 2;
}
message WalkDirResponse {
bool success = 1;
repeated string meta_cache_entry = 2;
optional string error_info = 3;
}
message RenameDataRequest {
string disk = 1; // indicate which one in the disks
string src_volume = 2;
string src_path = 3;
string file_info = 4;
string dst_volume = 5;
string dst_path = 6;
}
message RenameDataResponse {
bool success = 1;
string rename_data_resp = 2;
optional string error_info = 3;
}
message MakeVolumesRequest {
string disk = 1; // indicate which one in the disks
repeated string volumes = 2;
}
message MakeVolumesResponse {
bool success = 1;
optional string error_info = 2;
}
message MakeVolumeRequest {
string disk = 1; // indicate which one in the disks
string volume = 2;
}
message MakeVolumeResponse {
bool success = 1;
optional string error_info = 2;
}
message ListVolumesRequest {
string disk = 1; // indicate which one in the disks
}
message ListVolumesResponse {
bool success = 1;
repeated string volume_infos = 2;
optional string error_info = 3;
}
message StatVolumeRequest {
string disk = 1; // indicate which one in the disks
string volume = 2;
}
message StatVolumeResponse {
bool success = 1;
string volume_info = 2;
optional string error_info = 3;
}
message WriteMetadataRequest {
string disk = 1; // indicate which one in the disks
string volume = 2;
string path = 3;
string file_info = 4;
}
message WriteMetadataResponse {
bool success = 1;
optional string error_info = 2;
}
message ReadVersionRequest {
string disk = 1;
string volume = 2;
string path = 3;
string version_id = 4;
string opts = 5;
}
message ReadVersionResponse {
bool success = 1;
string file_info = 2;
optional string error_info = 3;
}
message ReadXLRequest {
string disk = 1;
string volume = 2;
string path = 3;
bool read_data = 4;
}
message ReadXLResponse {
bool success = 1;
string raw_file_info = 2;
optional string error_info = 3;
}
message ReadMultipleRequest {
string disk = 1;
string read_multiple_req = 2;
}
message ReadMultipleResponse {
bool success = 1;
repeated string read_multiple_resps = 2;
optional string error_info = 3;
}
message DeleteVolumeRequest {
string disk = 1;
string volume = 2;
}
message DeleteVolumeResponse {
bool success = 1;
optional string error_info = 2;
}
/* -------------------------------------------------------------------- */
service NodeService {
/* -------------------------------meta service-------------------------- */
rpc Ping(PingRequest) returns (PingResponse) {};
rpc MakeBucket(MakeBucketRequest) returns (MakeBucketResponse) {};
}
/* -------------------------------disk service-------------------------- */
rpc ReadAll(ReadAllRequest) returns (ReadAllResponse) {};
rpc WriteAll(WriteAllRequest) returns (WriteAllResponse) {};
rpc Delete(DeleteRequest) returns (DeleteResponse) {};
rpc RenameFile(RenameFileRequst) returns (RenameFileResponse) {};
rpc Write(WriteRequest) returns (WriteResponse) {};
// rpc Append(AppendRequest) returns (AppendResponse) {};
rpc ReadAt(ReadAtRequest) returns (ReadAtResponse) {};
rpc ListDir(ListDirRequest) returns (ListDirResponse) {};
rpc WalkDir(WalkDirRequest) returns (WalkDirResponse) {};
rpc RenameData(RenameDataRequest) returns (RenameDataResponse) {};
rpc MakeVolumes(MakeVolumesRequest) returns (MakeVolumesResponse) {};
rpc MakeVolume(MakeVolumeRequest) returns (MakeVolumeResponse) {};
rpc ListVolumes(ListVolumesRequest) returns (ListVolumesResponse) {};
rpc StatVolume(StatVolumeRequest) returns (StatVolumeResponse) {};
rpc WriteMetadata(WriteMetadataRequest) returns (WriteMetadataResponse) {};
rpc ReadVersion(ReadVersionRequest) returns (ReadVersionResponse) {};
rpc ReadXL(ReadXLRequest) returns (ReadXLResponse) {};
rpc ReadMultiple(ReadMultipleRequest) returns (ReadMultipleResponse) {};
rpc DeleteVolume(DeleteVolumeRequest) returns (DeleteVolumeResponse) {};
}

View File

@@ -9,7 +9,9 @@ rust-version.workspace = true
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
ecstore.workspace = true
flatbuffers.workspace = true
protos.workspace = true
serde_json.workspace = true
tonic = { version = "0.12.1", features = ["gzip"] }
tokio = { workspace = true }

View File

@@ -1,14 +1,21 @@
#![cfg(test)]
use ecstore::disk::VolumeInfo;
use protos::{
models::{PingBody, PingBodyBuilder},
proto_gen::node_service::{node_service_client::NodeServiceClient, PingRequest, PingResponse},
proto_gen::node_service::{
node_service_client::NodeServiceClient, ListVolumesRequest, MakeVolumeRequest, PingRequest, PingResponse,
},
};
use std::error::Error;
use tonic::Request;
async fn get_client() -> Result<NodeServiceClient<tonic::transport::Channel>, Box<dyn Error>> {
Ok(NodeServiceClient::connect("http://localhost:9000").await?)
}
#[tokio::test]
async fn main() -> Result<(), Box<dyn Error>> {
async fn ping() -> Result<(), Box<dyn Error>> {
let mut fbb = flatbuffers::FlatBufferBuilder::new();
let payload = fbb.create_vector(b"hello world");
@@ -44,3 +51,38 @@ async fn main() -> Result<(), Box<dyn Error>> {
Ok(())
}
#[tokio::test]
async fn make_volume() -> Result<(), Box<dyn Error>> {
let mut client = get_client().await?;
let request = Request::new(MakeVolumeRequest {
disk: "data".to_string(),
volume: "dandan".to_string(),
});
let response = client.make_volume(request).await?.into_inner();
if response.success {
println!("success");
} else {
println!("failed: {:?}", response.error_info);
}
Ok(())
}
#[tokio::test]
async fn list_volumes() -> Result<(), Box<dyn Error>> {
let mut client = get_client().await?;
let request = Request::new(ListVolumesRequest {
disk: "data".to_string(),
});
let response = client.list_volumes(request).await?.into_inner();
let volume_infos: Vec<VolumeInfo> = response
.volume_infos
.into_iter()
.filter_map(|json_str| serde_json::from_str::<VolumeInfo>(&json_str).ok())
.collect();
println!("{:?}", volume_infos);
Ok(())
}

View File

@@ -3,7 +3,7 @@ use super::{
DeleteOptions, DiskAPI, FileReader, FileWriter, MetaCacheEntry, ReadMultipleReq, ReadMultipleResp, ReadOptions,
RenameDataResp, VolumeInfo, WalkDirOptions,
};
use crate::disk::STORAGE_FORMAT_FILE;
use crate::disk::{LocalFileReader, LocalFileWriter, STORAGE_FORMAT_FILE};
use crate::{
error::{Error, Result},
file_meta::FileMeta,
@@ -344,6 +344,10 @@ impl DiskAPI for LocalDisk {
self.id
}
fn path(&self) -> PathBuf {
self.root.clone()
}
#[must_use]
async fn read_all(&self, volume: &str, path: &str) -> Result<Bytes> {
let p = self.get_object_path(volume, path)?;
@@ -443,7 +447,8 @@ impl DiskAPI for LocalDisk {
let file = File::create(&fpath).await?;
Ok(FileWriter::new(file))
Ok(FileWriter::Local(LocalFileWriter::new(file)))
// Ok(FileWriter::new(file))
// let mut writer = BufWriter::new(file);
@@ -469,7 +474,8 @@ impl DiskAPI for LocalDisk {
.open(&p)
.await?;
Ok(FileWriter::new(file))
Ok(FileWriter::Local(LocalFileWriter::new(file)))
// Ok(FileWriter::new(file))
// let mut writer = BufWriter::new(file);
@@ -486,7 +492,7 @@ impl DiskAPI for LocalDisk {
debug!("read_file {:?}", &p);
let file = File::options().read(true).open(&p).await?;
Ok(FileReader::new(file))
Ok(FileReader::Local(LocalFileReader::new(file)))
// file.seek(SeekFrom::Start(offset as u64)).await?;
@@ -683,9 +689,7 @@ impl DiskAPI for LocalDisk {
.await?;
}
Ok(RenameDataResp {
old_data_dir: old_data_dir,
})
Ok(RenameDataResp { old_data_dir })
}
async fn make_volumes(&self, volumes: Vec<&str>) -> Result<()> {

View File

@@ -2,6 +2,7 @@ pub mod endpoint;
pub mod error;
pub mod format;
mod local;
mod remote;
pub const RUSTFS_META_BUCKET: &str = ".rustfs.sys";
pub const RUSTFS_META_MULTIPART_BUCKET: &str = ".rustfs.sys/multipart";
@@ -12,18 +13,22 @@ pub const FORMAT_CONFIG_FILE: &str = "format.json";
const STORAGE_FORMAT_FILE: &str = "xl.meta";
use crate::{
erasure::ReadAt,
erasure::{ReadAt, Write},
error::Result,
file_meta::FileMeta,
store_api::{FileInfo, RawFileInfo},
};
use bytes::Bytes;
use std::{fmt::Debug, io::SeekFrom, pin::Pin, sync::Arc};
use protos::proto_gen::node_service::{node_service_client::NodeServiceClient, ReadAtRequest, WriteRequest};
use serde::{Deserialize, Serialize};
use std::{fmt::Debug, io::SeekFrom, path::PathBuf, sync::Arc};
use time::OffsetDateTime;
use tokio::{
fs::File,
io::{AsyncReadExt, AsyncSeekExt, AsyncWrite},
io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt},
};
use tonic::{transport::Channel, Request};
use tower::timeout::Timeout;
use uuid::Uuid;
pub type DiskStore = Arc<Box<dyn DiskAPI>>;
@@ -33,8 +38,8 @@ pub async fn new_disk(ep: &endpoint::Endpoint, opt: &DiskOption) -> Result<DiskS
let s = local::LocalDisk::new(ep, opt.cleanup).await?;
Ok(Arc::new(Box::new(s)))
} else {
let _ = opt.health_check;
unimplemented!()
let remote_disk = remote::RemoteDisk::new(ep, opt).await?;
Ok(Arc::new(Box::new(remote_disk)))
}
}
@@ -42,6 +47,7 @@ pub async fn new_disk(ep: &endpoint::Endpoint, opt: &DiskOption) -> Result<DiskS
pub trait DiskAPI: Debug + Send + Sync + 'static {
fn is_local(&self) -> bool;
fn id(&self) -> Uuid;
fn path(&self) -> PathBuf;
async fn delete(&self, volume: &str, path: &str, opt: DeleteOptions) -> Result<()>;
async fn read_all(&self, volume: &str, path: &str) -> Result<Bytes>;
@@ -82,7 +88,7 @@ pub trait DiskAPI: Debug + Send + Sync + 'static {
async fn read_multiple(&self, req: ReadMultipleReq) -> Result<Vec<ReadMultipleResp>>;
}
#[derive(Debug, Default, Clone)]
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct WalkDirOptions {
// Bucket to scanner
pub bucket: String,
@@ -109,7 +115,7 @@ pub struct WalkDirOptions {
pub disk_id: String,
}
#[derive(Debug, Default)]
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct MetaCacheEntry {
// name is the full name of the object including prefixes
pub name: String,
@@ -184,17 +190,18 @@ pub struct DiskOption {
pub health_check: bool,
}
#[derive(Serialize, Deserialize)]
pub struct RenameDataResp {
pub old_data_dir: Option<Uuid>,
}
#[derive(Debug, Clone, Default)]
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct DeleteOptions {
pub recursive: bool,
pub immediate: bool,
}
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReadMultipleReq {
pub bucket: String,
pub prefix: String,
@@ -205,7 +212,7 @@ pub struct ReadMultipleReq {
pub max_results: usize,
}
#[derive(Debug, Clone, Default)]
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct ReadMultipleResp {
pub bucket: String,
pub prefix: String,
@@ -230,65 +237,160 @@ pub struct ReadMultipleResp {
// }
// }
#[derive(Debug, Deserialize, Serialize)]
pub struct VolumeInfo {
pub name: String,
pub created: Option<OffsetDateTime>,
}
#[derive(Deserialize, Serialize)]
pub struct ReadOptions {
pub read_data: bool,
pub healing: bool,
}
pub struct FileWriter {
pub inner: Pin<Box<dyn AsyncWrite + Send + Sync + 'static>>,
// pub struct FileWriter {
// pub inner: Pin<Box<dyn AsyncWrite + Send + Sync + 'static>>,
// }
// impl AsyncWrite for FileWriter {
// fn poll_write(
// mut self: Pin<&mut Self>,
// cx: &mut std::task::Context<'_>,
// buf: &[u8],
// ) -> std::task::Poll<std::result::Result<usize, std::io::Error>> {
// Pin::new(&mut self.inner).poll_write(cx, buf)
// }
// fn poll_flush(
// mut self: Pin<&mut Self>,
// cx: &mut std::task::Context<'_>,
// ) -> std::task::Poll<std::result::Result<(), std::io::Error>> {
// Pin::new(&mut self.inner).poll_flush(cx)
// }
// fn poll_shutdown(
// mut self: Pin<&mut Self>,
// cx: &mut std::task::Context<'_>,
// ) -> std::task::Poll<std::result::Result<(), std::io::Error>> {
// Pin::new(&mut self.inner).poll_shutdown(cx)
// }
// }
// impl FileWriter {
// pub fn new<W>(inner: W) -> Self
// where
// W: AsyncWrite + Send + Sync + 'static,
// {
// Self { inner: Box::pin(inner) }
// }
// }
pub enum FileWriter {
Local(LocalFileWriter),
Remote(RemoteFileWriter),
}
impl AsyncWrite for FileWriter {
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &[u8],
) -> std::task::Poll<std::result::Result<usize, std::io::Error>> {
Pin::new(&mut self.inner).poll_write(cx, buf)
}
fn poll_flush(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<std::result::Result<(), std::io::Error>> {
Pin::new(&mut self.inner).poll_flush(cx)
}
fn poll_shutdown(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<std::result::Result<(), std::io::Error>> {
Pin::new(&mut self.inner).poll_shutdown(cx)
#[async_trait::async_trait]
impl Write for FileWriter {
async fn write(&mut self, buf: &[u8]) -> Result<()> {
match self {
Self::Local(local_file_writer) => local_file_writer.write(buf).await,
Self::Remote(remote_file_writer) => remote_file_writer.write(buf).await,
}
}
}
impl FileWriter {
pub fn new<W>(inner: W) -> Self
where
W: AsyncWrite + Send + Sync + 'static,
{
Self { inner: Box::pin(inner) }
}
}
#[derive(Debug)]
pub struct FileReader {
pub struct LocalFileWriter {
pub inner: File,
}
impl FileReader {
impl LocalFileWriter {
pub fn new(inner: File) -> Self {
Self { inner }
}
}
#[async_trait::async_trait]
impl Write for LocalFileWriter {
async fn write(&mut self, buf: &[u8]) -> Result<()> {
self.inner.write(buf).await?;
self.inner.flush().await?;
Ok(())
}
}
pub struct RemoteFileWriter {
pub root: PathBuf,
pub volume: String,
pub path: String,
pub is_append: bool,
client: NodeServiceClient<Timeout<Channel>>,
}
impl RemoteFileWriter {
pub fn new(
root: PathBuf,
volume: String,
path: String,
is_append: bool,
client: NodeServiceClient<Timeout<Channel>>,
) -> Self {
Self {
root,
volume,
path,
is_append,
client,
}
}
}
#[async_trait::async_trait]
impl Write for RemoteFileWriter {
async fn write(&mut self, buf: &[u8]) -> Result<()> {
let request = Request::new(WriteRequest {
disk: self.root.to_string_lossy().to_string(),
volume: self.volume.to_string(),
path: self.path.to_string(),
is_append: self.is_append,
data: buf.to_vec(),
});
let _response = self.client.write(request).await?.into_inner();
Ok(())
}
}
#[derive(Debug)]
pub enum FileReader {
Local(LocalFileReader),
Remote(RemoteFileReader),
}
#[async_trait::async_trait]
impl ReadAt for FileReader {
async fn read_at(&mut self, offset: usize, length: usize) -> Result<(Vec<u8>, usize)> {
match self {
Self::Local(local_file_writer) => local_file_writer.read_at(offset, length).await,
Self::Remote(remote_file_writer) => remote_file_writer.read_at(offset, length).await,
}
}
}
#[derive(Debug)]
pub struct LocalFileReader {
pub inner: File,
}
impl LocalFileReader {
pub fn new(inner: File) -> Self {
Self { inner }
}
}
#[async_trait::async_trait]
impl ReadAt for LocalFileReader {
async fn read_at(&mut self, offset: usize, length: usize) -> Result<(Vec<u8>, usize)> {
self.inner.seek(SeekFrom::Start(offset as u64)).await?;
@@ -301,3 +403,38 @@ impl ReadAt for FileReader {
Ok((buffer, bytes_read))
}
}
#[derive(Debug)]
pub struct RemoteFileReader {
pub root: PathBuf,
pub volume: String,
pub path: String,
client: NodeServiceClient<Timeout<Channel>>,
}
impl RemoteFileReader {
pub fn new(root: PathBuf, volume: String, path: String, client: NodeServiceClient<Timeout<Channel>>) -> Self {
Self {
root,
volume,
path,
client,
}
}
}
#[async_trait::async_trait]
impl ReadAt for RemoteFileReader {
async fn read_at(&mut self, offset: usize, length: usize) -> Result<(Vec<u8>, usize)> {
let request = Request::new(ReadAtRequest {
disk: self.root.to_string_lossy().to_string(),
volume: self.volume.to_string(),
path: self.path.to_string(),
offset: offset.try_into().unwrap(),
length: length.try_into().unwrap(),
});
let response = self.client.read_at(request).await?.into_inner();
Ok((response.data, response.read_size.try_into().unwrap()))
}
}

356
ecstore/src/disk/remote.rs Normal file
View File

@@ -0,0 +1,356 @@
use std::{path::PathBuf, time::Duration};
use bytes::Bytes;
use protos::{
node_service_time_out_client,
proto_gen::node_service::{
node_service_client::NodeServiceClient, DeleteRequest, DeleteVolumeRequest, ListDirRequest, ListVolumesRequest,
MakeVolumeRequest, MakeVolumesRequest, ReadAllRequest, ReadMultipleRequest, ReadVersionRequest, ReadXlRequest,
RenameDataRequest, RenameFileRequst, StatVolumeRequest, WalkDirRequest, WriteAllRequest, WriteMetadataRequest,
},
DEFAULT_GRPC_SERVER_MESSAGE_LEN,
};
use tokio::fs;
use tonic::{
transport::{Channel, Endpoint as tonic_Endpoint},
Request,
};
use tower::timeout::Timeout;
use uuid::Uuid;
use crate::{
error::Result,
store_api::{FileInfo, RawFileInfo},
};
use super::{
endpoint::Endpoint, DeleteOptions, DiskAPI, DiskOption, FileReader, FileWriter, MetaCacheEntry, ReadMultipleReq,
ReadMultipleResp, ReadOptions, RemoteFileReader, RemoteFileWriter, RenameDataResp, VolumeInfo, WalkDirOptions,
};
#[derive(Debug)]
pub struct RemoteDisk {
channel: Channel,
pub root: PathBuf,
}
impl RemoteDisk {
pub async fn new(ep: &Endpoint, _opt: &DiskOption) -> Result<Self> {
let connector = tonic_Endpoint::from_shared(format!(
"{}://{}{}",
ep.url.scheme(),
ep.url.host_str().unwrap(),
ep.url.port().unwrap()
))
.unwrap();
let channel = tokio::runtime::Runtime::new().unwrap().block_on(connector.connect()).unwrap();
let root = fs::canonicalize(ep.url.path()).await?;
Ok(Self { channel, root })
}
fn get_client(&self) -> NodeServiceClient<Timeout<Channel>> {
node_service_time_out_client(
self.channel.clone(),
Duration::new(30, 0), // TODO: use config setting
DEFAULT_GRPC_SERVER_MESSAGE_LEN,
// grpc_enable_gzip,
false, // TODO: use config setting
)
}
}
// TODO: all api need to handle errors
#[async_trait::async_trait]
impl DiskAPI for RemoteDisk {
fn is_local(&self) -> bool {
false
}
fn id(&self) -> Uuid {
Uuid::nil()
}
fn path(&self) -> PathBuf {
self.root.clone()
}
async fn read_all(&self, volume: &str, path: &str) -> Result<Bytes> {
let mut client = self.get_client();
let request = Request::new(ReadAllRequest {
disk: self.root.to_string_lossy().to_string(),
volume: volume.to_string(),
path: path.to_string(),
});
let response = client.read_all(request).await?.into_inner();
Ok(Bytes::from(response.data))
}
async fn write_all(&self, volume: &str, path: &str, data: Vec<u8>) -> Result<()> {
let mut client = self.get_client();
let request = Request::new(WriteAllRequest {
disk: self.root.to_string_lossy().to_string(),
volume: volume.to_string(),
path: path.to_string(),
data,
});
let _response = client.write_all(request).await?.into_inner();
Ok(())
}
async fn delete(&self, volume: &str, path: &str, opt: DeleteOptions) -> Result<()> {
let options = serde_json::to_string(&opt)?;
let mut client = self.get_client();
let request = Request::new(DeleteRequest {
disk: self.root.to_string_lossy().to_string(),
volume: volume.to_string(),
path: path.to_string(),
options,
});
let _response = client.delete(request).await?.into_inner();
Ok(())
}
async fn rename_file(&self, src_volume: &str, src_path: &str, dst_volume: &str, dst_path: &str) -> Result<()> {
let mut client = self.get_client();
let request = Request::new(RenameFileRequst {
disk: self.root.to_string_lossy().to_string(),
src_volume: src_volume.to_string(),
src_path: src_path.to_string(),
dst_volume: dst_volume.to_string(),
dst_path: dst_path.to_string(),
});
let _response = client.rename_file(request).await?.into_inner();
Ok(())
}
async fn create_file(&self, _origvolume: &str, volume: &str, path: &str, _file_size: usize) -> Result<FileWriter> {
Ok(FileWriter::Remote(RemoteFileWriter::new(
self.root.clone(),
volume.to_string(),
path.to_string(),
false,
self.get_client(),
)))
}
async fn append_file(&self, volume: &str, path: &str) -> Result<FileWriter> {
Ok(FileWriter::Remote(RemoteFileWriter::new(
self.root.clone(),
volume.to_string(),
path.to_string(),
true,
self.get_client(),
)))
}
async fn read_file(&self, volume: &str, path: &str) -> Result<FileReader> {
Ok(FileReader::Remote(RemoteFileReader::new(
self.root.clone(),
volume.to_string(),
path.to_string(),
self.get_client(),
)))
}
async fn list_dir(&self, _origvolume: &str, volume: &str, _dir_path: &str, _count: i32) -> Result<Vec<String>> {
let mut client = self.get_client();
let request = Request::new(ListDirRequest {
disk: self.root.to_string_lossy().to_string(),
volume: volume.to_string(),
});
let response = client.list_dir(request).await?.into_inner();
Ok(response.volumes)
}
async fn walk_dir(&self, opts: WalkDirOptions) -> Result<Vec<MetaCacheEntry>> {
let walk_dir_options = serde_json::to_string(&opts)?;
let mut client = self.get_client();
let request = Request::new(WalkDirRequest {
disk: self.root.to_string_lossy().to_string(),
walk_dir_options,
});
let response = client.walk_dir(request).await?.into_inner();
let entries = response
.meta_cache_entry
.into_iter()
.filter_map(|json_str| serde_json::from_str::<MetaCacheEntry>(&json_str).ok())
.collect();
Ok(entries)
}
async fn rename_data(
&self,
src_volume: &str,
src_path: &str,
fi: FileInfo,
dst_volume: &str,
dst_path: &str,
) -> Result<RenameDataResp> {
let file_info = serde_json::to_string(&fi)?;
let mut client = self.get_client();
let request = Request::new(RenameDataRequest {
disk: self.root.to_string_lossy().to_string(),
src_volume: src_volume.to_string(),
src_path: src_path.to_string(),
file_info,
dst_volume: dst_volume.to_string(),
dst_path: dst_path.to_string(),
});
let response = client.rename_data(request).await?.into_inner();
let rename_data_resp = serde_json::from_str::<RenameDataResp>(&response.rename_data_resp)?;
Ok(rename_data_resp)
}
async fn make_volumes(&self, volumes: Vec<&str>) -> Result<()> {
let mut client = self.get_client();
let request = Request::new(MakeVolumesRequest {
disk: self.root.to_string_lossy().to_string(),
volumes: volumes.iter().map(|s| (*s).to_string()).collect(),
});
let _response = client.make_volumes(request).await?.into_inner();
Ok(())
}
async fn make_volume(&self, volume: &str) -> Result<()> {
let mut client = self.get_client();
let request = Request::new(MakeVolumeRequest {
disk: self.root.to_string_lossy().to_string(),
volume: volume.to_string(),
});
let _response = client.make_volume(request).await?.into_inner();
Ok(())
}
async fn list_volumes(&self) -> Result<Vec<VolumeInfo>> {
let mut client = self.get_client();
let request = Request::new(ListVolumesRequest {
disk: self.root.to_string_lossy().to_string(),
});
let response = client.list_volumes(request).await?.into_inner();
let infos = response
.volume_infos
.into_iter()
.filter_map(|json_str| serde_json::from_str::<VolumeInfo>(&json_str).ok())
.collect();
Ok(infos)
}
async fn stat_volume(&self, volume: &str) -> Result<VolumeInfo> {
let mut client = self.get_client();
let request = Request::new(StatVolumeRequest {
disk: self.root.to_string_lossy().to_string(),
volume: volume.to_string(),
});
let response = client.stat_volume(request).await?.into_inner();
let volume_info = serde_json::from_str::<VolumeInfo>(&response.volume_info)?;
Ok(volume_info)
}
async fn write_metadata(&self, _org_volume: &str, volume: &str, path: &str, fi: FileInfo) -> Result<()> {
let file_info = serde_json::to_string(&fi)?;
let mut client = self.get_client();
let request = Request::new(WriteMetadataRequest {
disk: self.root.to_string_lossy().to_string(),
volume: volume.to_string(),
path: path.to_string(),
file_info,
});
let _response = client.write_metadata(request).await?.into_inner();
Ok(())
}
async fn read_version(
&self,
_org_volume: &str,
volume: &str,
path: &str,
version_id: &str,
opts: &ReadOptions,
) -> Result<FileInfo> {
let opts = serde_json::to_string(opts)?;
let mut client = self.get_client();
let request = Request::new(ReadVersionRequest {
disk: self.root.to_string_lossy().to_string(),
volume: volume.to_string(),
path: path.to_string(),
version_id: version_id.to_string(),
opts,
});
let response = client.read_version(request).await?.into_inner();
let file_info = serde_json::from_str::<FileInfo>(&response.file_info)?;
Ok(file_info)
}
async fn read_xl(&self, volume: &str, path: &str, read_data: bool) -> Result<RawFileInfo> {
let mut client = self.get_client();
let request = Request::new(ReadXlRequest {
disk: self.root.to_string_lossy().to_string(),
volume: volume.to_string(),
path: path.to_string(),
read_data,
});
let response = client.read_xl(request).await?.into_inner();
let raw_file_info = serde_json::from_str::<RawFileInfo>(&response.raw_file_info)?;
Ok(raw_file_info)
}
async fn read_multiple(&self, req: ReadMultipleReq) -> Result<Vec<ReadMultipleResp>> {
let read_multiple_req = serde_json::to_string(&req)?;
let mut client = self.get_client();
let request = Request::new(ReadMultipleRequest {
disk: self.root.to_string_lossy().to_string(),
read_multiple_req,
});
let response = client.read_multiple(request).await?.into_inner();
let read_multiple_resps = response
.read_multiple_resps
.into_iter()
.filter_map(|json_str| serde_json::from_str::<ReadMultipleResp>(&json_str).ok())
.collect();
Ok(read_multiple_resps)
}
async fn delete_volume(&self, volume: &str) -> Result<()> {
let mut client = self.get_client();
let request = Request::new(DeleteVolumeRequest {
disk: self.root.to_string_lossy().to_string(),
volume: volume.to_string(),
});
let _response = client.delete_volume(request).await?.into_inner();
Ok(())
}
}

View File

@@ -3,7 +3,7 @@ use bytes::Bytes;
use futures::future::join_all;
use futures::{Stream, StreamExt};
use reed_solomon_erasure::galois_8::ReedSolomon;
use tokio::io::AsyncWrite;
use std::fmt::Debug;
use tokio::io::AsyncWriteExt;
use tokio::io::DuplexStream;
use tracing::debug;
@@ -13,7 +13,7 @@ use uuid::Uuid;
use crate::chunk_stream::ChunkedStream;
use crate::disk::error::DiskError;
use crate::disk::FileReader;
use crate::disk::{FileReader, FileWriter};
pub struct Erasure {
data_shards: usize,
@@ -43,17 +43,16 @@ impl Erasure {
}
}
pub async fn encode<S, W>(
pub async fn encode<S>(
&self,
body: S,
writers: &mut [W],
writers: &mut [FileWriter],
// block_size: usize,
total_size: usize,
_write_quorum: usize,
) -> Result<usize>
where
S: Stream<Item = Result<Bytes, StdError>> + Send + Sync + 'static,
W: AsyncWrite + Unpin,
{
let mut stream = ChunkedStream::new(body, total_size, self.block_size, false);
let mut total: usize = 0;
@@ -85,7 +84,7 @@ impl Erasure {
let mut errs = Vec::new();
for (i, w) in writers.iter_mut().enumerate() {
match w.write_all(blocks[i].as_ref()).await {
match w.write(blocks[i].as_ref()).await {
Ok(_) => errs.push(None),
Err(e) => errs.push(Some(e)),
}
@@ -318,6 +317,12 @@ impl Erasure {
}
}
#[async_trait::async_trait]
pub trait Write {
async fn write(&mut self, buf: &[u8]) -> Result<()>;
}
#[async_trait::async_trait]
pub trait ReadAt {
async fn read_at(&mut self, offset: usize, length: usize) -> Result<(Vec<u8>, usize)>;
}

View File

@@ -3,7 +3,7 @@ mod chunk_stream;
pub mod disk;
mod disks_layout;
mod endpoints;
mod erasure;
pub mod erasure;
pub mod error;
mod file_meta;
pub mod peer;

View File

@@ -197,6 +197,7 @@ pub struct ObjectPartInfo {
// }
// }
#[derive(Serialize, Deserialize)]
pub struct RawFileInfo {
pub buf: Vec<u8>,
}

View File

@@ -28,6 +28,7 @@ prost-types.workspace = true
protos.workspace = true
protobuf.workspace = true
s3s.workspace = true
serde_json.workspace = true
tracing.workspace = true
time = { workspace = true, features = ["parsing", "formatting"] }
tokio = { workspace = true, features = [

View File

@@ -1,4 +1,10 @@
use ecstore::{disk::DiskStore, peer::LocalPeerS3Client};
use ecstore::{
disk::{DeleteOptions, DiskStore, ReadMultipleReq, ReadOptions, WalkDirOptions},
erasure::{ReadAt, Write},
peer::{LocalPeerS3Client, PeerS3Client},
store_api::{FileInfo, MakeBucketOptions},
};
use tokio::fs;
use tonic::{Request, Response, Status};
use tracing::{debug, error, info};
@@ -6,7 +12,13 @@ use protos::{
models::{PingBody, PingBodyBuilder},
proto_gen::node_service::{
node_service_server::{NodeService as Node, NodeServiceServer as NodeServer},
MakeBucketRequest, MakeBucketResponse, PingRequest, PingResponse,
DeleteRequest, DeleteResponse, DeleteVolumeRequest, DeleteVolumeResponse, ListDirRequest, ListDirResponse,
ListVolumesRequest, ListVolumesResponse, MakeBucketRequest, MakeBucketResponse, MakeVolumeRequest, MakeVolumeResponse,
MakeVolumesRequest, MakeVolumesResponse, PingRequest, PingResponse, ReadAllRequest, ReadAllResponse, ReadAtRequest,
ReadAtResponse, ReadMultipleRequest, ReadMultipleResponse, ReadVersionRequest, ReadVersionResponse, ReadXlRequest,
ReadXlResponse, RenameDataRequest, RenameDataResponse, RenameFileRequst, RenameFileResponse, StatVolumeRequest,
StatVolumeResponse, WalkDirRequest, WalkDirResponse, WriteAllRequest, WriteAllResponse, WriteMetadataRequest,
WriteMetadataResponse, WriteRequest, WriteResponse,
},
};
@@ -20,6 +32,24 @@ pub fn make_server(local_disks: Vec<DiskStore>) -> NodeServer<impl Node> {
NodeServer::new(NodeService { local_peer })
}
impl NodeService {
async fn find_disk(&self, disk_path: &String) -> Option<DiskStore> {
let disk_path = match fs::canonicalize(disk_path).await {
Ok(disk_path) => disk_path,
Err(_) => return None,
};
self.local_peer.local_disks.iter().find(|&x| x.path() == disk_path).cloned()
}
fn all_disk(&self) -> Vec<String> {
self.local_peer
.local_disks
.iter()
.map(|disk| disk.path().to_string_lossy().to_string())
.collect()
}
}
#[tonic::async_trait]
impl Node for NodeService {
async fn ping(&self, request: Request<PingRequest>) -> Result<Response<PingResponse>, Status> {
@@ -52,9 +82,578 @@ impl Node for NodeService {
async fn make_bucket(&self, request: Request<MakeBucketRequest>) -> Result<Response<MakeBucketResponse>, Status> {
debug!("make bucket");
let _req = request.into_inner();
let make_bucket_request = request.into_inner();
let options = if let Some(opts) = make_bucket_request.options {
MakeBucketOptions {
force_create: opts.force_create,
}
} else {
MakeBucketOptions::default()
};
match self.local_peer.make_bucket(&make_bucket_request.name, &options).await {
Ok(_) => Ok(tonic::Response::new(MakeBucketResponse {
success: true,
error_info: None,
})),
Err(err) => Ok(tonic::Response::new(MakeBucketResponse {
success: false,
error_info: Some(format!("make failed: {}", err.to_string())),
})),
}
}
// match self.local_peer.make_bucket(&name, &MakeBucketOptions::default()).await {}
unimplemented!()
async fn read_all(&self, request: Request<ReadAllRequest>) -> Result<Response<ReadAllResponse>, Status> {
let request = request.into_inner();
if let Some(disk) = self.find_disk(&request.disk).await {
match disk.read_all(&request.volume, &request.path).await {
Ok(data) => Ok(tonic::Response::new(ReadAllResponse {
success: true,
data: data.to_vec(),
error_info: None,
})),
Err(err) => Ok(tonic::Response::new(ReadAllResponse {
success: false,
data: Vec::new(),
error_info: Some(err.to_string()),
})),
}
} else {
Ok(tonic::Response::new(ReadAllResponse {
success: false,
data: Vec::new(),
error_info: Some("can not find disk".to_string()),
}))
}
}
async fn write_all(&self, request: Request<WriteAllRequest>) -> Result<Response<WriteAllResponse>, Status> {
let request = request.into_inner();
if let Some(disk) = self.find_disk(&request.disk).await {
match disk.write_all(&request.volume, &request.path, request.data).await {
Ok(_) => Ok(tonic::Response::new(WriteAllResponse {
success: true,
error_info: None,
})),
Err(err) => Ok(tonic::Response::new(WriteAllResponse {
success: false,
error_info: Some(err.to_string()),
})),
}
} else {
Ok(tonic::Response::new(WriteAllResponse {
success: false,
error_info: Some("can not find disk".to_string()),
}))
}
}
async fn delete(&self, request: Request<DeleteRequest>) -> Result<Response<DeleteResponse>, Status> {
let request = request.into_inner();
if let Some(disk) = self.find_disk(&request.disk).await {
let options = match serde_json::from_str::<DeleteOptions>(&request.options) {
Ok(options) => options,
Err(_) => {
return Ok(tonic::Response::new(DeleteResponse {
success: false,
error_info: Some("can not decode DeleteOptions".to_string()),
}));
}
};
match disk.delete(&request.volume, &request.path, options).await {
Ok(_) => Ok(tonic::Response::new(DeleteResponse {
success: true,
error_info: None,
})),
Err(err) => Ok(tonic::Response::new(DeleteResponse {
success: false,
error_info: Some(err.to_string()),
})),
}
} else {
Ok(tonic::Response::new(DeleteResponse {
success: false,
error_info: Some("can not find disk".to_string()),
}))
}
}
async fn rename_file(&self, request: Request<RenameFileRequst>) -> Result<Response<RenameFileResponse>, Status> {
let request = request.into_inner();
if let Some(disk) = self.find_disk(&request.disk).await {
match disk
.rename_file(&request.src_volume, &request.src_path, &request.dst_volume, &request.dst_path)
.await
{
Ok(_) => Ok(tonic::Response::new(RenameFileResponse {
success: true,
error_info: None,
})),
Err(err) => Ok(tonic::Response::new(RenameFileResponse {
success: false,
error_info: Some(err.to_string()),
})),
}
} else {
Ok(tonic::Response::new(RenameFileResponse {
success: false,
error_info: Some("can not find disk".to_string()),
}))
}
}
async fn write(&self, request: Request<WriteRequest>) -> Result<Response<WriteResponse>, Status> {
let request = request.into_inner();
if let Some(disk) = self.find_disk(&request.disk).await {
let file_writer = if request.is_append {
disk.append_file(&request.volume, &request.path).await
} else {
disk.create_file("", &request.volume, &request.path, 0).await
};
match file_writer {
Ok(mut file_writer) => match file_writer.write(&request.data).await {
Ok(_) => Ok(tonic::Response::new(WriteResponse {
success: true,
error_info: None,
})),
Err(err) => Ok(tonic::Response::new(WriteResponse {
success: false,
error_info: Some(err.to_string()),
})),
},
Err(err) => Ok(tonic::Response::new(WriteResponse {
success: false,
error_info: Some(err.to_string()),
})),
}
} else {
Ok(tonic::Response::new(WriteResponse {
success: false,
error_info: Some("can not find disk".to_string()),
}))
}
}
async fn read_at(&self, request: Request<ReadAtRequest>) -> Result<Response<ReadAtResponse>, Status> {
let request = request.into_inner();
if let Some(disk) = self.find_disk(&request.disk).await {
match disk.read_file(&request.volume, &request.path).await {
Ok(mut file_reader) => {
match file_reader
.read_at(request.offset.try_into().unwrap(), request.length.try_into().unwrap())
.await
{
Ok((data, read_size)) => Ok(tonic::Response::new(ReadAtResponse {
success: true,
data,
read_size: read_size.try_into().unwrap(),
error_info: None,
})),
Err(err) => Ok(tonic::Response::new(ReadAtResponse {
success: false,
data: Vec::new(),
read_size: -1,
error_info: Some(err.to_string()),
})),
}
}
Err(err) => Ok(tonic::Response::new(ReadAtResponse {
success: false,
data: Vec::new(),
read_size: -1,
error_info: Some(err.to_string()),
})),
}
} else {
Ok(tonic::Response::new(ReadAtResponse {
success: false,
data: Vec::new(),
read_size: -1,
error_info: Some("can not find disk".to_string()),
}))
}
}
async fn list_dir(&self, request: Request<ListDirRequest>) -> Result<Response<ListDirResponse>, Status> {
let request = request.into_inner();
if let Some(disk) = self.find_disk(&request.disk).await {
match disk.list_dir("", &request.volume, "", 0).await {
Ok(volumes) => Ok(tonic::Response::new(ListDirResponse {
success: true,
volumes,
error_info: None,
})),
Err(err) => Ok(tonic::Response::new(ListDirResponse {
success: false,
volumes: Vec::new(),
error_info: Some(err.to_string()),
})),
}
} else {
Ok(tonic::Response::new(ListDirResponse {
success: false,
volumes: Vec::new(),
error_info: Some("can not find disk".to_string()),
}))
}
}
async fn walk_dir(&self, request: Request<WalkDirRequest>) -> Result<Response<WalkDirResponse>, Status> {
let request = request.into_inner();
if let Some(disk) = self.find_disk(&request.disk).await {
let opts = match serde_json::from_str::<WalkDirOptions>(&request.walk_dir_options) {
Ok(options) => options,
Err(_) => {
return Ok(tonic::Response::new(WalkDirResponse {
success: false,
meta_cache_entry: Vec::new(),
error_info: Some("can not decode DeleteOptions".to_string()),
}));
}
};
match disk.walk_dir(opts).await {
Ok(entries) => {
let entries = entries
.into_iter()
.filter_map(|entry| serde_json::to_string(&entry).ok())
.collect();
Ok(tonic::Response::new(WalkDirResponse {
success: true,
meta_cache_entry: entries,
error_info: None,
}))
}
Err(err) => Ok(tonic::Response::new(WalkDirResponse {
success: false,
meta_cache_entry: Vec::new(),
error_info: Some(err.to_string()),
})),
}
} else {
Ok(tonic::Response::new(WalkDirResponse {
success: false,
meta_cache_entry: Vec::new(),
error_info: Some("can not find disk".to_string()),
}))
}
}
async fn rename_data(&self, request: Request<RenameDataRequest>) -> Result<Response<RenameDataResponse>, Status> {
let request = request.into_inner();
if let Some(disk) = self.find_disk(&request.disk).await {
let file_info = match serde_json::from_str::<FileInfo>(&request.file_info) {
Ok(file_info) => file_info,
Err(_) => {
return Ok(tonic::Response::new(RenameDataResponse {
success: false,
rename_data_resp: String::new(),
error_info: Some("can not decode DeleteOptions".to_string()),
}));
}
};
match disk
.rename_data(&request.src_volume, &request.src_path, file_info, &request.dst_volume, &request.dst_path)
.await
{
Ok(rename_data_resp) => {
let rename_data_resp = match serde_json::to_string(&rename_data_resp) {
Ok(file_info) => file_info,
Err(_) => {
return Ok(tonic::Response::new(RenameDataResponse {
success: false,
rename_data_resp: String::new(),
error_info: Some("can not encode RenameDataResp".to_string()),
}));
}
};
Ok(tonic::Response::new(RenameDataResponse {
success: true,
rename_data_resp,
error_info: None,
}))
}
Err(err) => Ok(tonic::Response::new(RenameDataResponse {
success: false,
rename_data_resp: String::new(),
error_info: Some(err.to_string()),
})),
}
} else {
Ok(tonic::Response::new(RenameDataResponse {
success: false,
rename_data_resp: String::new(),
error_info: Some("can not find disk".to_string()),
}))
}
}
async fn make_volumes(&self, request: Request<MakeVolumesRequest>) -> Result<Response<MakeVolumesResponse>, Status> {
let request = request.into_inner();
if let Some(disk) = self.find_disk(&request.disk).await {
match disk.make_volumes(request.volumes.iter().map(|s| &**s).collect()).await {
Ok(_) => Ok(tonic::Response::new(MakeVolumesResponse {
success: true,
error_info: None,
})),
Err(err) => Ok(tonic::Response::new(MakeVolumesResponse {
success: false,
error_info: Some(err.to_string()),
})),
}
} else {
Ok(tonic::Response::new(MakeVolumesResponse {
success: false,
error_info: Some(format!("can not find disk, all disks: {:?}", self.all_disk())),
}))
}
}
async fn make_volume(&self, request: Request<MakeVolumeRequest>) -> Result<Response<MakeVolumeResponse>, Status> {
let request = request.into_inner();
if let Some(disk) = self.find_disk(&request.disk).await {
match disk.make_volume(&request.volume).await {
Ok(_) => Ok(tonic::Response::new(MakeVolumeResponse {
success: true,
error_info: None,
})),
Err(err) => Ok(tonic::Response::new(MakeVolumeResponse {
success: false,
error_info: Some(err.to_string()),
})),
}
} else {
Ok(tonic::Response::new(MakeVolumeResponse {
success: false,
error_info: Some(format!("can not find disk, all disks: {:?}", self.all_disk())),
}))
}
}
async fn list_volumes(&self, request: Request<ListVolumesRequest>) -> Result<Response<ListVolumesResponse>, Status> {
let request = request.into_inner();
if let Some(disk) = self.find_disk(&request.disk).await {
match disk.list_volumes().await {
Ok(volume_infos) => {
let volume_infos = volume_infos
.into_iter()
.filter_map(|volume_info| serde_json::to_string(&volume_info).ok())
.collect();
Ok(tonic::Response::new(ListVolumesResponse {
success: true,
volume_infos,
error_info: None,
}))
}
Err(err) => Ok(tonic::Response::new(ListVolumesResponse {
success: false,
volume_infos: Vec::new(),
error_info: Some(err.to_string()),
})),
}
} else {
Ok(tonic::Response::new(ListVolumesResponse {
success: false,
volume_infos: Vec::new(),
error_info: Some("can not find disk".to_string()),
}))
}
}
async fn stat_volume(&self, request: Request<StatVolumeRequest>) -> Result<Response<StatVolumeResponse>, Status> {
let request = request.into_inner();
if let Some(disk) = self.find_disk(&request.disk).await {
match disk.stat_volume(&request.volume).await {
Ok(volume_info) => match serde_json::to_string(&volume_info) {
Ok(volume_info) => Ok(tonic::Response::new(StatVolumeResponse {
success: true,
volume_info,
error_info: None,
})),
Err(err) => Ok(tonic::Response::new(StatVolumeResponse {
success: false,
volume_info: String::new(),
error_info: Some(format!("encode VolumeInfo failed, {}", err.to_string())),
})),
},
Err(err) => Ok(tonic::Response::new(StatVolumeResponse {
success: false,
volume_info: String::new(),
error_info: Some(err.to_string()),
})),
}
} else {
Ok(tonic::Response::new(StatVolumeResponse {
success: false,
volume_info: String::new(),
error_info: Some("can not find disk".to_string()),
}))
}
}
async fn write_metadata(&self, request: Request<WriteMetadataRequest>) -> Result<Response<WriteMetadataResponse>, Status> {
let request = request.into_inner();
if let Some(disk) = self.find_disk(&request.disk).await {
let file_info = match serde_json::from_str::<FileInfo>(&request.file_info) {
Ok(file_info) => file_info,
Err(err) => {
return Ok(tonic::Response::new(WriteMetadataResponse {
success: false,
error_info: Some(format!("decode FileInfo failed, {}", err.to_string())),
}));
}
};
match disk.write_metadata("", &request.volume, &request.path, file_info).await {
Ok(_) => Ok(tonic::Response::new(WriteMetadataResponse {
success: true,
error_info: None,
})),
Err(err) => Ok(tonic::Response::new(WriteMetadataResponse {
success: false,
error_info: Some(err.to_string()),
})),
}
} else {
Ok(tonic::Response::new(WriteMetadataResponse {
success: false,
error_info: Some("can not find disk".to_string()),
}))
}
}
async fn read_version(&self, request: Request<ReadVersionRequest>) -> Result<Response<ReadVersionResponse>, Status> {
let request = request.into_inner();
if let Some(disk) = self.find_disk(&request.disk).await {
let opts = match serde_json::from_str::<ReadOptions>(&request.opts) {
Ok(options) => options,
Err(_) => {
return Ok(tonic::Response::new(ReadVersionResponse {
success: false,
file_info: String::new(),
error_info: Some("can not decode DeleteOptions".to_string()),
}));
}
};
match disk
.read_version("", &request.volume, &request.path, &request.version_id, &opts)
.await
{
Ok(file_info) => match serde_json::to_string(&file_info) {
Ok(file_info) => Ok(tonic::Response::new(ReadVersionResponse {
success: true,
file_info,
error_info: None,
})),
Err(err) => Ok(tonic::Response::new(ReadVersionResponse {
success: false,
file_info: String::new(),
error_info: Some(format!("encode VolumeInfo failed, {}", err.to_string())),
})),
},
Err(err) => Ok(tonic::Response::new(ReadVersionResponse {
success: false,
file_info: String::new(),
error_info: Some(err.to_string()),
})),
}
} else {
Ok(tonic::Response::new(ReadVersionResponse {
success: false,
file_info: String::new(),
error_info: Some("can not find disk".to_string()),
}))
}
}
async fn read_xl(&self, request: Request<ReadXlRequest>) -> Result<Response<ReadXlResponse>, Status> {
let request = request.into_inner();
if let Some(disk) = self.find_disk(&request.disk).await {
match disk.read_xl(&request.volume, &request.path, request.read_data).await {
Ok(raw_file_info) => match serde_json::to_string(&raw_file_info) {
Ok(raw_file_info) => Ok(tonic::Response::new(ReadXlResponse {
success: true,
raw_file_info,
error_info: None,
})),
Err(err) => Ok(tonic::Response::new(ReadXlResponse {
success: false,
raw_file_info: String::new(),
error_info: Some(format!("encode RawFileInfo failed, {}", err.to_string())),
})),
},
Err(err) => Ok(tonic::Response::new(ReadXlResponse {
success: false,
raw_file_info: String::new(),
error_info: Some(err.to_string()),
})),
}
} else {
Ok(tonic::Response::new(ReadXlResponse {
success: false,
raw_file_info: String::new(),
error_info: Some("can not find disk".to_string()),
}))
}
}
async fn read_multiple(&self, request: Request<ReadMultipleRequest>) -> Result<Response<ReadMultipleResponse>, Status> {
let request = request.into_inner();
if let Some(disk) = self.find_disk(&request.disk).await {
let read_multiple_req = match serde_json::from_str::<ReadMultipleReq>(&request.read_multiple_req) {
Ok(read_multiple_req) => read_multiple_req,
Err(_) => {
return Ok(tonic::Response::new(ReadMultipleResponse {
success: false,
read_multiple_resps: Vec::new(),
error_info: Some("can not decode ReadMultipleReq".to_string()),
}));
}
};
match disk.read_multiple(read_multiple_req).await {
Ok(read_multiple_resps) => {
let read_multiple_resps = read_multiple_resps
.into_iter()
.filter_map(|read_multiple_resp| serde_json::to_string(&read_multiple_resp).ok())
.collect();
Ok(tonic::Response::new(ReadMultipleResponse {
success: true,
read_multiple_resps,
error_info: None,
}))
}
Err(err) => Ok(tonic::Response::new(ReadMultipleResponse {
success: false,
read_multiple_resps: Vec::new(),
error_info: Some(err.to_string()),
})),
}
} else {
Ok(tonic::Response::new(ReadMultipleResponse {
success: false,
read_multiple_resps: Vec::new(),
error_info: Some("can not find disk".to_string()),
}))
}
}
async fn delete_volume(&self, request: Request<DeleteVolumeRequest>) -> Result<Response<DeleteVolumeResponse>, Status> {
let request = request.into_inner();
if let Some(disk) = self.find_disk(&request.disk).await {
match disk.delete_volume(&request.volume).await {
Ok(_) => Ok(tonic::Response::new(DeleteVolumeResponse {
success: true,
error_info: None,
})),
Err(err) => Ok(tonic::Response::new(DeleteVolumeResponse {
success: false,
error_info: Some(err.to_string()),
})),
}
} else {
Ok(tonic::Response::new(DeleteVolumeResponse {
success: false,
error_info: Some("can not find disk".to_string()),
}))
}
}
}