peer rest frame

Signed-off-by: mujunxiang <1948535941@qq.com>
This commit is contained in:
mujunxiang
2024-11-25 15:34:46 +08:00
parent b79b967036
commit 7029524504
16 changed files with 3012 additions and 26 deletions

6
Cargo.lock generated
View File

@@ -594,6 +594,8 @@ dependencies = [
"lazy_static",
"lock",
"protos",
"rmp-serde",
"serde",
"serde_json",
"tokio",
"tonic",
@@ -614,6 +616,7 @@ dependencies = [
"bytesize",
"common",
"crc32fast",
"flatbuffers",
"futures",
"glob",
"hex-simd",
@@ -2152,6 +2155,8 @@ dependencies = [
"prost-types",
"protobuf",
"protos",
"rmp-serde",
"router",
"s3s",
"serde",
"serde_json",
@@ -2169,6 +2174,7 @@ dependencies = [
"tracing-error",
"tracing-subscriber",
"transform-stream",
"url",
"uuid",
]

View File

@@ -51,6 +51,8 @@ prost-types = "0.13.3"
protobuf = "3.7"
protos = { path = "./common/protos" }
rand = "0.8.5"
rmp = "0.8.14"
rmp-serde = "1.3.0"
s3s = { git = "https://github.com/Nugine/s3s.git", rev = "207170f526c75a8190e8f9afadf961909fd01d34", default-features = true, features = [
"tower",
] }

File diff suppressed because it is too large Load Diff

View File

@@ -411,8 +411,327 @@ message GenerallyLockRequest {
}
message GenerallyLockResponse {
bool success = 1;
optional string error_info = 2;
bool success = 1;
optional string error_info = 2;
}
message Mss {
map<string, string> value = 1;
}
message LocalStorageInfoRequest {
bool metrics = 1;
}
message LocalStorageInfoResponse {
bool success = 1;
bytes storage_info = 2;
optional string error_info = 3;
}
message ServerInfoRequest {
bool metrics = 1;
}
message ServerInfoResponse {
bool success = 1;
bytes server_properties = 2;
optional string error_info = 3;
}
message GetCpusRequest {}
message GetCpusResponse {
bool success = 1;
bytes cpus = 2;
optional string error_info = 3;
}
message GetNetInfoRequest {}
message GetNetInfoResponse {
bool success = 1;
bytes net_info = 2;
optional string error_info = 3;
}
message GetPartitionsRequest {}
message GetPartitionsResponse {
bool success = 1;
bytes partitions = 2;
optional string error_info = 3;
}
message GetOsInfoRequest {}
message GetOsInfoResponse {
bool success = 1;
bytes os_info = 2;
optional string error_info = 3;
}
message GetSELinuxInfoRequest {}
message GetSELinuxInfoResponse {
bool success = 1;
bytes sys_services = 2;
optional string error_info = 3;
}
message GetSysConfigRequest {}
message GetSysConfigResponse {
bool success = 1;
bytes sys_config = 2;
optional string error_info = 3;
}
message GetSysErrorsRequest {}
message GetSysErrorsResponse {
bool success = 1;
bytes sys_errors = 2;
optional string error_info = 3;
}
message GetMemInfoRequest {}
message GetMemInfoResponse {
bool success = 1;
bytes mem_info = 2;
optional string error_info = 3;
}
message GetMetricsRequest {
uint64 metric_type = 1;
bytes opts = 2;
}
message GetMetricsResponse {
bool success = 1;
bytes realtime_metrics = 2;
optional string error_info = 3;
}
message GetProcInfoRequest {}
message GetProcInfoResponse {
bool success = 1;
bytes proc_info = 2;
optional string error_info = 3;
}
message StartProfilingRequest {
string profiler = 1;
}
message StartProfilingResponse {
bool success = 1;
optional string error_info = 2;
}
message DownloadProfileDataRequest {}
message DownloadProfileDataResponse {
bool success = 1;
map<string, bytes> data = 2;
optional string error_info = 3;
}
message GetBucketStatsDataRequest {
string bucket = 1;
}
message GetBucketStatsDataResponse {
bool success = 1;
bytes bucket_stats = 2;
optional string error_info = 3;
}
message GetSRMetricsDataRequest {}
message GetSRMetricsDataResponse {
bool success = 1;
bytes sr_metrics_summary = 2;
optional string error_info = 3;
}
message GetAllBucketStatsRequest {}
message GetAllBucketStatsResponse {
bool success = 1;
bytes bucket_stats_map = 2;
optional string error_info = 3;
}
message LoadBucketMetadataRequest {
string bucket = 1;
}
message LoadBucketMetadataResponse {
bool success = 1;
optional string error_info = 2;
}
message DeleteBucketMetadataRequest {
string bucket = 1;
}
message DeleteBucketMetadataResponse {
bool success = 1;
optional string error_info = 2;
}
message DeletePolicyRequest {
string policy_name = 1;
}
message DeletePolicyResponse {
bool success = 1;
optional string error_info = 2;
}
message LoadPolicyRequest {
string policy_name = 1;
}
message LoadPolicyResponse {
bool success = 1;
optional string error_info = 2;
}
message LoadPolicyMappingRequest {
string user_or_group = 1;
uint64 user_type = 2;
bool is_group = 3;
}
message LoadPolicyMappingResponse {
bool success = 1;
optional string error_info = 2;
}
message DeleteUserRequest {
string access_key = 1;
}
message DeleteUserResponse {
bool success = 1;
optional string error_info = 2;
}
message DeleteServiceAccountRequest {
string access_key = 1;
}
message DeleteServiceAccountResponse {
bool success = 1;
optional string error_info = 2;
}
message LoadUserRequest {
string access_key = 1;
bool temp = 2;
}
message LoadUserResponse {
bool success = 1;
optional string error_info = 2;
}
message LoadServiceAccountRequest {
string access_key = 1;
}
message LoadServiceAccountResponse {
bool success = 1;
optional string error_info = 2;
}
message LoadGroupRequest {
string group = 1;
}
message LoadGroupResponse {
bool success = 1;
optional string error_info = 2;
}
message ReloadSiteReplicationConfigRequest {}
message ReloadSiteReplicationConfigResponse {
bool success = 1;
optional string error_info = 2;
}
message SignalServiceRequest {
Mss vars = 1;
}
message SignalServiceResponse {
bool success = 1;
optional string error_info = 2;
}
message BackgroundHealStatusRequest {}
message BackgroundHealStatusResponse {
bool success = 1;
bytes bg_heal_state = 2;
optional string error_info = 3;
}
message GetMetacacheListingRequest {
bytes opts = 1;
}
message GetMetacacheListingResponse {
bool success = 1;
bytes metacache = 2;
optional string error_info = 3;
}
message UpdateMetacacheListingRequest {
bytes metacache = 1;
}
message UpdateMetacacheListingResponse {
bool success = 1;
bytes metacache = 2;
optional string error_info = 3;
}
message ReloadPoolMetaRequest {}
message ReloadPoolMetaResponse {
bool success = 1;
optional string error_info = 2;
}
message StopRebalanceRequest {}
message StopRebalanceResponse {
bool success = 1;
optional string error_info = 2;
}
message LoadRebalanceMetaRequest {
bool start_rebalance = 1;
}
message LoadRebalanceMetaResponse {
bool success = 1;
optional string error_info = 2;
}
message LoadTransitionTierConfigRequest {
bool start_rebalance = 1;
}
message LoadTransitionTierConfigResponse {
bool success = 1;
optional string error_info = 2;
}
/* -------------------------------------------------------------------- */
@@ -466,4 +785,45 @@ service NodeService {
rpc RUnLock(GenerallyLockRequest) returns (GenerallyLockResponse) {};
rpc ForceUnLock(GenerallyLockRequest) returns (GenerallyLockResponse) {};
rpc Refresh(GenerallyLockRequest) returns (GenerallyLockResponse) {};
/* -------------------------------peer rest service-------------------------- */
rpc LocalStorageInfo(LocalStorageInfoRequest) returns (LocalStorageInfoResponse) {};
rpc ServerInfo(ServerInfoRequest) returns (ServerInfoResponse) {};
rpc GetCpus(GetCpusRequest) returns (GetCpusResponse) {};
rpc GetNetInfo(GetNetInfoRequest) returns (GetNetInfoResponse) {};
rpc GetPartitions(GetPartitionsRequest) returns (GetPartitionsResponse) {};
rpc GetOsInfo(GetOsInfoRequest) returns (GetOsInfoResponse) {};
rpc GetSELinuxInfo(GetSELinuxInfoRequest) returns (GetSELinuxInfoResponse) {};
rpc GetSysConfig(GetSysConfigRequest) returns (GetSysConfigResponse) {};
rpc GetSysErrors(GetSysErrorsRequest) returns (GetSysErrorsResponse) {};
rpc GetMemInfo(GetMemInfoRequest) returns (GetMemInfoResponse) {};
rpc GetMetrics(GetMetricsRequest) returns (GetMetricsResponse) {};
rpc GetProcInfo(GetProcInfoRequest) returns (GetProcInfoResponse) {};
rpc StartProfiling(StartProfilingRequest) returns (StartProfilingResponse) {};
rpc DownloadProfileData(DownloadProfileDataRequest) returns (DownloadProfileDataResponse) {};
rpc GetBucketStats(GetBucketStatsDataRequest) returns (GetBucketStatsDataResponse) {};
rpc GetSRMetrics(GetSRMetricsDataRequest) returns (GetSRMetricsDataResponse) {};
rpc GetAllBucketStats(GetAllBucketStatsRequest) returns (GetAllBucketStatsResponse) {};
rpc LoadBucketMetadata(LoadBucketMetadataRequest) returns (LoadBucketMetadataResponse) {};
rpc DeleteBucketMetadata(DeleteBucketMetadataRequest) returns (DeleteBucketMetadataResponse) {};
rpc DeletePolicy(DeletePolicyRequest) returns (DeletePolicyResponse) {};
rpc LoadPolicy(LoadPolicyRequest) returns (LoadPolicyResponse) {};
rpc LoadPolicyMapping(LoadPolicyMappingRequest) returns (LoadPolicyMappingResponse) {};
rpc DeleteUser(DeleteUserRequest) returns (DeleteUserResponse) {};
rpc DeleteServiceAccount(DeleteServiceAccountRequest) returns (DeleteServiceAccountResponse) {};
rpc LoadUser(LoadUserRequest) returns (LoadUserResponse) {};
rpc LoadServiceAccount(LoadServiceAccountRequest) returns (LoadServiceAccountResponse) {};
rpc LoadGroup(LoadGroupRequest) returns (LoadGroupResponse) {};
rpc ReloadSiteReplicationConfig(ReloadSiteReplicationConfigRequest) returns (ReloadSiteReplicationConfigResponse) {};
// rpc VerifyBinary() returns () {};
// rpc CommitBinary() returns () {};
rpc SignalService(SignalServiceRequest) returns (SignalServiceResponse) {};
rpc BackgroundHealStatus(BackgroundHealStatusRequest) returns (BackgroundHealStatusResponse) {};
rpc GetMetacacheListing(GetMetacacheListingRequest) returns (GetMetacacheListingResponse) {};
rpc UpdateMetacacheListing(UpdateMetacacheListingRequest) returns (UpdateMetacacheListingResponse) {};
rpc ReloadPoolMeta(ReloadPoolMetaRequest) returns (ReloadPoolMetaResponse) {};
rpc StopRebalance(StopRebalanceRequest) returns (StopRebalanceResponse) {};
rpc LoadRebalanceMeta(LoadRebalanceMetaRequest) returns (LoadRebalanceMetaResponse) {};
rpc LoadTransitionTierConfig(LoadTransitionTierConfigRequest) returns (LoadTransitionTierConfigResponse) {};
}

View File

@@ -14,6 +14,8 @@ flatbuffers.workspace = true
lazy_static.workspace = true
lock.workspace = true
protos.workspace = true
rmp-serde.workspace = true
serde.workspace = true
serde_json.workspace = true
tonic = { version = "0.12.3", features = ["gzip"] }
tokio = { workspace = true }

View File

@@ -1,12 +1,16 @@
#![cfg(test)]
use ecstore::disk::VolumeInfo;
use ecstore::{disk::VolumeInfo, store_api::StorageInfo};
use protos::{
models::{PingBody, PingBodyBuilder},
node_service_time_out_client,
proto_gen::node_service::{ListVolumesRequest, MakeVolumeRequest, PingRequest, PingResponse, ReadAllRequest},
proto_gen::node_service::{
ListVolumesRequest, LocalStorageInfoRequest, MakeVolumeRequest, PingRequest, PingResponse, ReadAllRequest,
},
};
use std::error::Error;
use rmp_serde::Deserializer;
use serde::Deserialize;
use std::{error::Error, io::Cursor};
use tonic::Request;
const CLUSTER_ADDR: &str = "http://localhost:9000";
@@ -100,3 +104,21 @@ async fn read_all() -> Result<(), Box<dyn Error>> {
println!("{:?}", volume_infos);
Ok(())
}
#[tokio::test]
async fn storage_info() -> Result<(), Box<dyn Error>> {
let mut client = node_service_time_out_client(&CLUSTER_ADDR.to_string()).await?;
let request = Request::new(LocalStorageInfoRequest { metrics: true });
let response = client.local_storage_info(request).await?.into_inner();
if !response.success {
println!("{:?}", response.error_info);
return Ok(());
}
let info = response.storage_info;
let mut buf = Deserializer::new(Cursor::new(info));
let storage_info: StorageInfo = Deserialize::deserialize(&mut buf).unwrap();
println!("{:?}", storage_info);
Ok(())
}

View File

@@ -17,6 +17,7 @@ common.workspace = true
reader.workspace = true
glob = "0.3.1"
thiserror.workspace = true
flatbuffers.workspace = true
futures.workspace = true
tracing.workspace = true
serde.workspace = true
@@ -38,7 +39,8 @@ netif = "0.1.6"
nix = { version = "0.29.0", features = ["fs"] }
path-absolutize = "3.1.1"
protos.workspace = true
rmp-serde = "1.3.0"
rmp.workspace = true
rmp-serde.workspace = true
tokio-util = { version = "0.7.12", features = ["io", "compat"] }
crc32fast = "1.4.2"
siphasher = "1.0.1"
@@ -51,7 +53,6 @@ tokio = { workspace = true, features = ["io-util", "sync", "signal"] }
tokio-stream = "0.1.15"
tonic.workspace = true
tower.workspace = true
rmp = "0.8.14"
byteorder = "1.5.0"
xxhash-rust = { version = "0.8.12", features = ["xxh64"] }
num = "0.4.3"

View File

@@ -0,0 +1,169 @@
use std::{
collections::{HashMap, HashSet},
time::{SystemTime, UNIX_EPOCH},
};
use common::{
error::{Error, Result},
globals::GLOBAL_Local_Node_Name,
};
use protos::{
models::{PingBody, PingBodyBuilder},
node_service_time_out_client,
proto_gen::node_service::{PingRequest, PingResponse},
};
use serde::{Deserialize, Serialize};
use tonic::Request;
use crate::{
disk::endpoint::Endpoint,
global::GLOBAL_Endpoints,
new_object_layer_fn,
store_api::{StorageAPI, StorageDisk},
};
pub const ITEM_OFFLINE: &str = "offline";
pub const ITEM_INITIALIZING: &str = "initializing";
pub const ITEM_ONLINE: &str = "online";
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct MemStats {
alloc: u64,
total_alloc: u64,
mallocs: u64,
frees: u64,
heap_alloc: u64,
}
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct ServerProperties {
state: String,
endpoint: String,
scheme: String,
uptime: u64,
version: String,
commit_id: String,
network: HashMap<String, String>,
disks: Vec<StorageDisk>,
pool_number: i32,
pool_numbers: Vec<i32>,
mem_stats: MemStats,
max_procs: u64,
num_cpu: u64,
runtime_version: String,
rustfs_env_vars: HashMap<String, String>,
}
async fn is_server_resolvable(endpoint: &Endpoint) -> Result<()> {
let addr = format!(
"{}://{}:{}",
endpoint.url.scheme(),
endpoint.url.host_str().unwrap(),
endpoint.url.port().unwrap()
);
let mut fbb = flatbuffers::FlatBufferBuilder::new();
let payload = fbb.create_vector(b"hello world");
let mut builder = PingBodyBuilder::new(&mut fbb);
builder.add_payload(payload);
let root = builder.finish();
fbb.finish(root, None);
let finished_data = fbb.finished_data();
let decoded_payload = flatbuffers::root::<PingBody>(finished_data);
assert!(decoded_payload.is_ok());
// 创建客户端
let mut client = node_service_time_out_client(&addr)
.await
.map_err(|err| Error::msg(err.to_string()))?;
// 构造 PingRequest
let request = Request::new(PingRequest {
version: 1,
body: finished_data.to_vec(),
});
// 发送请求并获取响应
let response: PingResponse = client.ping(request).await?.into_inner();
// 打印响应
let ping_response_body = flatbuffers::root::<PingBody>(&response.body);
if let Err(e) = ping_response_body {
eprintln!("{}", e);
} else {
println!("ping_resp:body(flatbuffer): {:?}", ping_response_body);
}
Ok(())
}
pub async fn get_local_server_property() -> ServerProperties {
let addr = GLOBAL_Local_Node_Name.read().await.clone();
let mut pool_numbers = HashSet::new();
let mut network = HashMap::new();
for ep in GLOBAL_Endpoints.read().await.as_ref().iter() {
for endpoint in ep.endpoints.as_ref().iter() {
let node_name = match endpoint.url.host_str() {
Some(s) => s.to_string(),
None => addr.clone(),
};
if endpoint.is_local {
pool_numbers.insert(endpoint.pool_idx + 1);
network.insert(node_name, ITEM_ONLINE.to_string());
continue;
}
if !network.contains_key(&node_name) {
if is_server_resolvable(endpoint).await.is_err() {
network.insert(node_name, ITEM_OFFLINE.to_string());
} else {
network.insert(node_name, ITEM_ONLINE.to_string());
}
}
}
}
// todo: mem collect
// let mem_stats =
let mut props = ServerProperties {
endpoint: addr,
uptime: SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs(),
network,
..Default::default()
};
for pool_num in pool_numbers.iter() {
props.pool_numbers.push(*pool_num);
}
props.pool_numbers.sort();
props.pool_number = if props.pool_numbers.len() == 1 {
props.pool_numbers[1]
} else {
i32::MAX
};
// let mut sensitive = HashSet::new();
// sensitive.insert(ENV_ACCESS_KEY.to_string());
// sensitive.insert(ENV_SECRET_KEY.to_string());
// sensitive.insert(ENV_ROOT_USER.to_string());
// sensitive.insert(ENV_ROOT_PASSWORD.to_string());
let layer = new_object_layer_fn();
let lock = layer.read().await;
match lock.as_ref() {
Some(store) => {
let storage_info = store.local_storage_info().await;
props.state = ITEM_ONLINE.to_string();
props.disks = storage_info.disks;
}
None => {
props.state = ITEM_INITIALIZING.to_string();
// todo: get_offline_disks
// props.disks =
}
};
props
}

View File

@@ -19,6 +19,11 @@ lazy_static! {
pub static ref GLOBAL_ConfigSys: ConfigSys = ConfigSys::new();
}
pub const ENV_ACCESS_KEY: &str = "RUSTFS_ACCESS_KEY";
pub const ENV_SECRET_KEY: &str = "RUSTFS_SECRET_KEY";
pub const ENV_ROOT_USER: &str = "RUSTFS_ROOT_USER";
pub const ENV_ROOT_PASSWORD: &str = "RUSTFS_ROOT_PASSWORD";
pub static RUSTFS_CONFIG_PREFIX: &str = "config";
pub struct ConfigSys {}

View File

@@ -97,6 +97,7 @@ impl Default for HealStartSuccess {
pub type HealStopSuccess = HealStartSuccess;
#[derive(Debug, Default)]
pub struct HealingDisk {
pub id: String,
pub heal_id: String,

View File

@@ -1,3 +1,4 @@
pub mod admin_server_info;
pub mod bitrot;
pub mod cache_value;
mod chunk_stream;

View File

@@ -795,7 +795,7 @@ pub struct DeletedObject {
// pub replication_state: ReplicationState,
}
#[derive(Debug, Default, Serialize)]
#[derive(Debug, Default, Serialize, Deserialize)]
pub enum BackendByte {
#[default]
Unknown,
@@ -833,13 +833,13 @@ pub struct StorageDisk {
pub disk_index: i32,
}
#[derive(Debug, Default)]
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct StorageInfo {
pub disks: Vec<StorageDisk>,
pub backend: BackendInfo,
}
#[derive(Debug, Default, Serialize)]
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct BackendDisks(HashMap<String, usize>);
impl BackendDisks {
@@ -851,7 +851,7 @@ impl BackendDisks {
}
}
#[derive(Debug, Default, Serialize)]
#[derive(Debug, Default, Serialize, Deserialize)]
#[serde(rename_all = "PascalCase", default)]
pub struct BackendInfo {
pub backend_type: BackendByte,

View File

@@ -31,7 +31,9 @@ prost.workspace = true
prost-types.workspace = true
protos.workspace = true
protobuf.workspace = true
rmp-serde.workspace = true
s3s.workspace = true
serde.workspace = true
serde_json.workspace = true
tracing.workspace = true
time = { workspace = true, features = ["parsing", "formatting"] }
@@ -51,6 +53,7 @@ tracing-error.workspace = true
tracing-subscriber.workspace = true
transform-stream.workspace = true
uuid = "1.11.0"
url.workspace = true
admin = { path = "../api/admin" }
axum.workspace = true
matchit = "0.8.4"

View File

@@ -1,36 +1,27 @@
use std::{error::Error, io::ErrorKind, pin::Pin};
use ecstore::{
admin_server_info::get_local_server_property,
disk::{
DeleteOptions, DiskAPI, DiskInfoOptions, DiskStore, FileInfoVersions, ReadMultipleReq, ReadOptions, Reader,
UpdateMetadataOpts, WalkDirOptions,
},
erasure::Writer,
heal::{data_usage_cache::DataUsageCache, heal_commands::HealOpts},
new_object_layer_fn,
peer::{LocalPeerS3Client, PeerS3Client},
store::{all_local_disk_path, find_local_disk},
store_api::{BucketOptions, DeleteBucketOptions, FileInfo, MakeBucketOptions},
store_api::{BucketOptions, DeleteBucketOptions, FileInfo, MakeBucketOptions, StorageAPI},
};
use futures::{Stream, StreamExt};
use lock::{lock_args::LockArgs, Locker, GLOBAL_LOCAL_SERVER};
use protos::{
models::{PingBody, PingBodyBuilder},
proto_gen::node_service::{
node_service_server::NodeService as Node, CheckPartsRequest, CheckPartsResponse, DeleteBucketRequest,
DeleteBucketResponse, DeletePathsRequest, DeletePathsResponse, DeleteRequest, DeleteResponse, DeleteVersionRequest,
DeleteVersionResponse, DeleteVersionsRequest, DeleteVersionsResponse, DeleteVolumeRequest, DeleteVolumeResponse,
DiskInfoRequest, DiskInfoResponse, GenerallyLockRequest, GenerallyLockResponse, GetBucketInfoRequest,
GetBucketInfoResponse, HealBucketRequest, HealBucketResponse, ListBucketRequest, ListBucketResponse, ListDirRequest,
ListDirResponse, ListVolumesRequest, ListVolumesResponse, MakeBucketRequest, MakeBucketResponse, MakeVolumeRequest,
MakeVolumeResponse, MakeVolumesRequest, MakeVolumesResponse, NsScannerRequest, NsScannerResponse, PingRequest,
PingResponse, ReadAllRequest, ReadAllResponse, ReadAtRequest, ReadAtResponse, ReadMultipleRequest, ReadMultipleResponse,
ReadVersionRequest, ReadVersionResponse, ReadXlRequest, ReadXlResponse, RenameDataRequest, RenameDataResponse,
RenameFileRequst, RenameFileResponse, RenamePartRequst, RenamePartResponse, StatVolumeRequest, StatVolumeResponse,
UpdateMetadataRequest, UpdateMetadataResponse, VerifyFileRequest, VerifyFileResponse, WalkDirRequest, WalkDirResponse,
WriteAllRequest, WriteAllResponse, WriteMetadataRequest, WriteMetadataResponse, WriteRequest, WriteResponse,
},
proto_gen::node_service::{node_service_server::NodeService as Node, *},
};
use rmp_serde::{Deserializer, Serializer};
use serde::{Deserialize, Serialize};
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use tonic::{Request, Response, Status, Streaming};
@@ -1506,4 +1497,246 @@ impl Node for NodeService {
})),
}
}
async fn local_storage_info(
&self,
_request: Request<LocalStorageInfoRequest>,
) -> Result<Response<LocalStorageInfoResponse>, Status> {
// let request = request.into_inner();
let layer = new_object_layer_fn();
let lock = layer.read().await;
let store = match lock.as_ref() {
Some(s) => s,
None => {
return Ok(tonic::Response::new(LocalStorageInfoResponse {
success: false,
storage_info: vec![],
error_info: Some("errServerNotInitialized".to_string()),
}))
}
};
let info = store.local_storage_info().await;
let mut buf = Vec::new();
if let Err(err) = info.serialize(&mut Serializer::new(&mut buf)) {
return Ok(tonic::Response::new(LocalStorageInfoResponse {
success: false,
storage_info: vec![],
error_info: Some(err.to_string()),
}));
}
Ok(tonic::Response::new(LocalStorageInfoResponse {
success: true,
storage_info: buf,
error_info: None,
}))
}
async fn server_info(&self, _request: Request<ServerInfoRequest>) -> Result<Response<ServerInfoResponse>, Status> {
let info = get_local_server_property().await;
let mut buf = Vec::new();
if let Err(err) = info.serialize(&mut Serializer::new(&mut buf)) {
return Ok(tonic::Response::new(ServerInfoResponse {
success: false,
server_properties: vec![],
error_info: Some(err.to_string()),
}));
}
Ok(tonic::Response::new(ServerInfoResponse {
success: true,
server_properties: buf,
error_info: None,
}))
}
async fn get_cpus(&self, _request: Request<GetCpusRequest>) -> Result<Response<GetCpusResponse>, Status> {
todo!()
}
async fn get_net_info(&self, _request: Request<GetNetInfoRequest>) -> Result<Response<GetNetInfoResponse>, Status> {
todo!()
}
async fn get_partitions(&self, _request: Request<GetPartitionsRequest>) -> Result<Response<GetPartitionsResponse>, Status> {
todo!()
}
async fn get_os_info(&self, _request: Request<GetOsInfoRequest>) -> Result<Response<GetOsInfoResponse>, Status> {
todo!()
}
async fn get_se_linux_info(
&self,
_request: Request<GetSeLinuxInfoRequest>,
) -> Result<Response<GetSeLinuxInfoResponse>, Status> {
todo!()
}
async fn get_sys_config(&self, _request: Request<GetSysConfigRequest>) -> Result<Response<GetSysConfigResponse>, Status> {
todo!()
}
async fn get_sys_errors(&self, _request: Request<GetSysErrorsRequest>) -> Result<Response<GetSysErrorsResponse>, Status> {
todo!()
}
async fn get_mem_info(&self, _request: Request<GetMemInfoRequest>) -> Result<Response<GetMemInfoResponse>, Status> {
todo!()
}
async fn get_metrics(&self, _request: Request<GetMetricsRequest>) -> Result<Response<GetMetricsResponse>, Status> {
todo!()
}
async fn get_proc_info(&self, _request: Request<GetProcInfoRequest>) -> Result<Response<GetProcInfoResponse>, Status> {
todo!()
}
async fn start_profiling(
&self,
_request: Request<StartProfilingRequest>,
) -> Result<Response<StartProfilingResponse>, Status> {
todo!()
}
async fn download_profile_data(
&self,
_request: Request<DownloadProfileDataRequest>,
) -> Result<Response<DownloadProfileDataResponse>, Status> {
todo!()
}
async fn get_bucket_stats(
&self,
_request: Request<GetBucketStatsDataRequest>,
) -> Result<Response<GetBucketStatsDataResponse>, Status> {
todo!()
}
async fn get_sr_metrics(
&self,
_request: Request<GetSrMetricsDataRequest>,
) -> Result<Response<GetSrMetricsDataResponse>, Status> {
todo!()
}
async fn get_all_bucket_stats(
&self,
_request: Request<GetAllBucketStatsRequest>,
) -> Result<Response<GetAllBucketStatsResponse>, Status> {
todo!()
}
async fn load_bucket_metadata(
&self,
_request: Request<LoadBucketMetadataRequest>,
) -> Result<Response<LoadBucketMetadataResponse>, Status> {
todo!()
}
async fn delete_bucket_metadata(
&self,
_request: Request<DeleteBucketMetadataRequest>,
) -> Result<Response<DeleteBucketMetadataResponse>, Status> {
todo!()
}
async fn delete_policy(&self, _request: Request<DeletePolicyRequest>) -> Result<Response<DeletePolicyResponse>, Status> {
todo!()
}
async fn load_policy(&self, _request: Request<LoadPolicyRequest>) -> Result<Response<LoadPolicyResponse>, Status> {
todo!()
}
async fn load_policy_mapping(
&self,
_request: Request<LoadPolicyMappingRequest>,
) -> Result<Response<LoadPolicyMappingResponse>, Status> {
todo!()
}
async fn delete_user(&self, _request: Request<DeleteUserRequest>) -> Result<Response<DeleteUserResponse>, Status> {
todo!()
}
async fn delete_service_account(
&self,
_request: Request<DeleteServiceAccountRequest>,
) -> Result<Response<DeleteServiceAccountResponse>, Status> {
todo!()
}
async fn load_user(&self, _request: Request<LoadUserRequest>) -> Result<Response<LoadUserResponse>, Status> {
todo!()
}
async fn load_service_account(
&self,
_request: Request<LoadServiceAccountRequest>,
) -> Result<Response<LoadServiceAccountResponse>, Status> {
todo!()
}
async fn load_group(&self, _request: Request<LoadGroupRequest>) -> Result<Response<LoadGroupResponse>, Status> {
todo!()
}
async fn reload_site_replication_config(
&self,
_request: Request<ReloadSiteReplicationConfigRequest>,
) -> Result<Response<ReloadSiteReplicationConfigResponse>, Status> {
todo!()
}
async fn signal_service(&self, _request: Request<SignalServiceRequest>) -> Result<Response<SignalServiceResponse>, Status> {
todo!()
}
async fn background_heal_status(
&self,
_request: Request<BackgroundHealStatusRequest>,
) -> Result<Response<BackgroundHealStatusResponse>, Status> {
todo!()
}
async fn get_metacache_listing(
&self,
_request: Request<GetMetacacheListingRequest>,
) -> Result<Response<GetMetacacheListingResponse>, Status> {
todo!()
}
async fn update_metacache_listing(
&self,
_request: Request<UpdateMetacacheListingRequest>,
) -> Result<Response<UpdateMetacacheListingResponse>, Status> {
todo!()
}
async fn reload_pool_meta(
&self,
_request: Request<ReloadPoolMetaRequest>,
) -> Result<Response<ReloadPoolMetaResponse>, Status> {
todo!()
}
async fn stop_rebalance(&self, _request: Request<StopRebalanceRequest>) -> Result<Response<StopRebalanceResponse>, Status> {
todo!()
}
async fn load_rebalance_meta(
&self,
_request: Request<LoadRebalanceMetaRequest>,
) -> Result<Response<LoadRebalanceMetaResponse>, Status> {
todo!()
}
async fn load_transition_tier_config(
&self,
_request: Request<LoadTransitionTierConfigRequest>,
) -> Result<Response<LoadTransitionTierConfigResponse>, Status> {
todo!()
}
}

View File

@@ -1,6 +1,7 @@
mod admin;
mod config;
mod grpc;
mod peer_rest_client;
mod service;
mod storage;

View File

@@ -0,0 +1,47 @@
use std::io::Cursor;
use common::error::{Error, Result};
use ecstore::{admin_server_info::ServerProperties, store_api::StorageInfo};
use protos::{node_service_time_out_client, proto_gen::node_service::LocalStorageInfoRequest};
use rmp_serde::Deserializer;
use serde::Deserialize;
use tonic::Request;
struct PeerRestClient {
addr: String,
}
impl PeerRestClient {
pub fn new(url: url::Url) -> Self {
Self {
addr: format!("{}://{}:{}", url.scheme(), url.host_str().unwrap(), url.port().unwrap()),
}
}
}
impl PeerRestClient {
pub async fn local_storage_info(&self) -> Result<StorageInfo> {
let mut client = node_service_time_out_client(&self.addr)
.await
.map_err(|err| Error::msg(err.to_string()))?;
let request = Request::new(LocalStorageInfoRequest { metrics: true });
let response = client.local_storage_info(request).await?.into_inner();
if !response.success {
if let Some(msg) = response.error_info {
return Err(Error::msg(msg));
}
return Err(Error::msg(""));
}
let info = response.storage_info;
let mut buf = Deserializer::new(Cursor::new(info));
let storage_info: StorageInfo = Deserialize::deserialize(&mut buf).unwrap();
Ok(storage_info)
}
pub async fn server_info(&self) -> Request<ServerProperties> {
todo!()
}
}