From 5794017cdd89ee8cee0141b930f52197f9cdfaaa Mon Sep 17 00:00:00 2001 From: weisd Date: Wed, 3 Jul 2024 17:47:28 +0800 Subject: [PATCH] todo:rename_data --- ecstore/src/disk.rs | 55 ++++++++++++++++++++++++++++----- ecstore/src/disk_api.rs | 10 ++++++ ecstore/src/lib.rs | 1 + ecstore/src/sets.rs | 65 +++++++++++++++++++++++++++++++++++++-- ecstore/src/store_api.rs | 7 +++++ ecstore/src/utils/path.rs | 28 +++++++++-------- 6 files changed, 144 insertions(+), 22 deletions(-) diff --git a/ecstore/src/disk.rs b/ecstore/src/disk.rs index 95f78a9e..22a42268 100644 --- a/ecstore/src/disk.rs +++ b/ecstore/src/disk.rs @@ -22,17 +22,21 @@ use uuid::Uuid; use crate::{ disk_api::DiskAPI, endpoint::{Endpoint, Endpoints}, + file_meta::FileMeta, format::{DistributionAlgoVersion, FormatV3}, + store_api::FileInfo, + utils, }; +pub type DiskStore = Arc>; + pub const RUSTFS_META_BUCKET: &str = ".rustfs.sys"; pub const RUSTFS_META_MULTIPART_BUCKET: &str = ".rustfs.sys/multipart"; pub const RUSTFS_META_TMP_BUCKET: &str = ".rustfs.sys/tmp"; pub const RUSTFS_META_TMP_DELETED_BUCKET: &str = ".rustfs.sys/tmp/.trash"; pub const BUCKET_META_PREFIX: &str = "buckets"; pub const FORMAT_CONFIG_FILE: &str = "format.json"; - -pub type DiskStore = Arc>; +const STORAGE_FORMAT_FILE: &str = "xl.meta"; pub struct DiskOption { pub cleanup: bool, @@ -87,7 +91,7 @@ pub struct LocalDisk { pub format_data: Vec, pub format_meta: Option, pub format_path: PathBuf, - pub format_legacy: bool, + // pub format_legacy: bool, // drop pub format_last_check: OffsetDateTime, } @@ -107,7 +111,7 @@ impl LocalDisk { let (format_data, format_meta) = read_file_exists(&format_path).await?; let mut id = Uuid::nil(); - let mut format_legacy = false; + // let mut format_legacy = false; let mut format_last_check = OffsetDateTime::UNIX_EPOCH; if !format_data.is_empty() { @@ -120,7 +124,7 @@ impl LocalDisk { } id = fm.erasure.this; - format_legacy = fm.erasure.distribution_algo == DistributionAlgoVersion::V1; + // format_legacy = fm.erasure.distribution_algo == DistributionAlgoVersion::V1; format_last_check = OffsetDateTime::now_utc(); } @@ -130,7 +134,7 @@ impl LocalDisk { format_meta, format_data: format_data, format_path, - format_legacy, + // format_legacy, format_last_check, }; @@ -275,10 +279,12 @@ impl DiskAPI for LocalDisk { async fn rename_file(&self, src_volume: &str, src_path: &str, dst_volume: &str, dst_path: &str) -> Result<()> { if !skip_access_checks(&src_volume) { - check_volume_exists(&src_volume).await?; + let vol_path = self.get_bucket_path(&src_volume)?; + check_volume_exists(&vol_path).await?; } if !skip_access_checks(&dst_volume) { - check_volume_exists(&dst_volume).await?; + let vol_path = self.get_bucket_path(&dst_volume)?; + check_volume_exists(&vol_path).await?; } let srcp = self.get_object_path(&src_volume, &src_path)?; @@ -330,6 +336,39 @@ impl DiskAPI for LocalDisk { Ok(()) } + async fn rename_data(&self, src_volume: &str, src_path: &str, fi: &FileInfo, dst_volume: &str, dst_path: &str) -> Result<()> { + if !skip_access_checks(&src_volume) { + let vol_path = self.get_bucket_path(&src_volume)?; + check_volume_exists(&vol_path).await?; + } + if !skip_access_checks(&dst_volume) { + let vol_path = self.get_bucket_path(&dst_volume)?; + check_volume_exists(&vol_path).await?; + } + + let src_file_path = self.get_object_path(&src_volume, format!("{}/{}", &src_path, STORAGE_FORMAT_FILE).as_str())?; + let dst_file_path = self.get_object_path(&dst_volume, format!("{}/{}", &dst_path, STORAGE_FORMAT_FILE).as_str())?; + + // let mut data_dir = String::new(); + // if !fi.is_remote() { + // data_dir = utils::path::retain_slash(&fi.data_dir); + // } + + // if !data_dir.is_empty() {} + + let curreng_data_path = self.get_object_path(&dst_volume, &dst_path); + + let meta = FileMeta::new(); + + let (dst_buf, _) = read_file_exists(&dst_file_path).await?; + if !dst_buf.is_empty() { + // xl.load + // meta.from(dst_buf); + } + + unimplemented!() + } + async fn make_volumes(&self, volumes: Vec<&str>) -> Result<()> { for vol in volumes { if let Err(e) = self.make_volume(vol).await { diff --git a/ecstore/src/disk_api.rs b/ecstore/src/disk_api.rs index 476e0e52..cf3da89d 100644 --- a/ecstore/src/disk_api.rs +++ b/ecstore/src/disk_api.rs @@ -4,6 +4,8 @@ use anyhow::Result; use bytes::Bytes; use tokio::io::DuplexStream; +use crate::store_api::FileInfo; + #[async_trait::async_trait] pub trait DiskAPI: Debug + Send + Sync + 'static { fn is_local(&self) -> bool; @@ -12,6 +14,14 @@ pub trait DiskAPI: Debug + Send + Sync + 'static { async fn write_all(&self, volume: &str, path: &str, data: Bytes) -> Result<()>; async fn rename_file(&self, src_volume: &str, src_path: &str, dst_volume: &str, dst_path: &str) -> Result<()>; async fn create_file(&self, origvolume: &str, volume: &str, path: &str, fileSize: usize, r: DuplexStream) -> Result<()>; + async fn rename_data( + &self, + src_volume: &str, + src_path: &str, + file_info: &FileInfo, + dst_volume: &str, + dst_path: &str, + ) -> Result<()>; async fn make_volumes(&self, volume: Vec<&str>) -> Result<()>; async fn make_volume(&self, volume: &str) -> Result<()>; diff --git a/ecstore/src/lib.rs b/ecstore/src/lib.rs index 257a702d..7335ba9a 100644 --- a/ecstore/src/lib.rs +++ b/ecstore/src/lib.rs @@ -7,6 +7,7 @@ mod ellipses; mod endpoint; mod erasure; pub mod error; +mod file_meta; mod format; mod peer; mod sets; diff --git a/ecstore/src/sets.rs b/ecstore/src/sets.rs index 393bd8c1..d7d1cd20 100644 --- a/ecstore/src/sets.rs +++ b/ecstore/src/sets.rs @@ -1,6 +1,6 @@ use std::sync::Arc; -use anyhow::Result; +use anyhow::{Error, Result}; use futures::{future::join_all, AsyncWrite, StreamExt}; use time::OffsetDateTime; @@ -91,7 +91,51 @@ impl Sets { } } - async fn rename_data(&self) -> Result<()> { + async fn rename_data( + &self, + disks: &Vec>, + src_bucket: &str, + src_object: &str, + file_infos: &Vec, + dst_bucket: &str, + dst_object: &str, + // write_quorum: usize, + ) -> Vec> { + let mut futures = Vec::with_capacity(disks.len()); + + for (i, disk) in disks.iter().enumerate() { + let disk = disk.as_ref().unwrap(); + let file_info = &file_infos[i]; + futures.push(async move { + disk.rename_data(src_bucket, src_object, file_info, dst_bucket, dst_object) + .await + }) + } + + let mut errors = Vec::with_capacity(disks.len()); + + let results = join_all(futures).await; + for result in results { + match result { + Ok(_) => { + errors.push(None); + } + Err(e) => { + errors.push(Some(e)); + } + } + } + errors + } + + async fn commit_rename_data_dir( + &self, + disks: &Vec>, + bucket: &str, + object: &str, + data_dir: &str, + // write_quorum: usize, + ) -> Vec> { unimplemented!() } } @@ -148,6 +192,8 @@ impl StorageAPI for Sets { let disk = disk.as_ref().unwrap().clone(); let tmp_object = tmp_object.clone(); + // TODO: save small file in fileinfo.data instead of write file; + futures.push(async move { disk.create_file("", RUSTFS_META_TMP_BUCKET, tmp_object.as_str(), data.content_length, reader) .await @@ -189,6 +235,21 @@ impl StorageAPI for Sets { // TODO: reduceWriteQuorumErrs // evalDisks + let rename_errs = self + .rename_data( + &shuffle_disks, + RUSTFS_META_TMP_BUCKET, + tmp_dir.as_str(), + &shuffle_parts_metadata, + &bucket, + &object, + ) + .await; + + // TODO: reduceWriteQuorumErrs + + // self.commit_rename_data_dir(&shuffle_disks,&bucket,&object,) + Ok(()) } } diff --git a/ecstore/src/store_api.rs b/ecstore/src/store_api.rs index 5ce0c67d..e488db4d 100644 --- a/ecstore/src/store_api.rs +++ b/ecstore/src/store_api.rs @@ -19,6 +19,13 @@ pub struct FileInfo { pub mod_time: OffsetDateTime, } +impl FileInfo { + pub fn is_remote(&self) -> bool { + // TODO: when lifecycle + false + } +} + impl Default for FileInfo { fn default() -> Self { Self { diff --git a/ecstore/src/utils/path.rs b/ecstore/src/utils/path.rs index 40559577..ab93e949 100644 --- a/ecstore/src/utils/path.rs +++ b/ecstore/src/utils/path.rs @@ -1,5 +1,6 @@ const GLOBAL_DIR_SUFFIX: &str = "__XLDIR__"; -const SLASH_SEPARATOR: char = '/'; + +const SLASH_SEPARATOR: &str = "/"; pub fn has_suffix(s: &str, suffix: &str) -> bool { if cfg!(target_os = "windows") { @@ -10,12 +11,8 @@ pub fn has_suffix(s: &str, suffix: &str) -> bool { } pub fn encode_dir_object(object: &str) -> String { - if has_suffix(object, &SLASH_SEPARATOR.to_string()) { - format!( - "{}{}", - object.trim_end_matches(SLASH_SEPARATOR), - GLOBAL_DIR_SUFFIX - ) + if has_suffix(object, SLASH_SEPARATOR) { + format!("{}{}", object.trim_end_matches(SLASH_SEPARATOR), GLOBAL_DIR_SUFFIX) } else { object.to_string() } @@ -23,12 +20,19 @@ pub fn encode_dir_object(object: &str) -> String { pub fn decode_dir_object(object: &str) -> String { if has_suffix(object, GLOBAL_DIR_SUFFIX) { - format!( - "{}{}", - object.trim_end_matches(GLOBAL_DIR_SUFFIX), - SLASH_SEPARATOR - ) + format!("{}{}", object.trim_end_matches(GLOBAL_DIR_SUFFIX), SLASH_SEPARATOR) } else { object.to_string() } } + +pub fn retain_slash(s: &str) -> String { + if s.is_empty() { + return s.to_string(); + } + if s.ends_with(SLASH_SEPARATOR) { + s.to_string() + } else { + format!("{}{}", s, SLASH_SEPARATOR) + } +}