todo:rename_data

weisd
2024-07-03 17:47:28 +08:00
parent fdfa68be2e
commit 5794017cdd
6 changed files with 144 additions and 22 deletions

View File

@@ -22,17 +22,21 @@ use uuid::Uuid;
use crate::{
disk_api::DiskAPI,
endpoint::{Endpoint, Endpoints},
file_meta::FileMeta,
format::{DistributionAlgoVersion, FormatV3},
store_api::FileInfo,
utils,
};
pub type DiskStore = Arc<Box<dyn DiskAPI>>;
pub const RUSTFS_META_BUCKET: &str = ".rustfs.sys";
pub const RUSTFS_META_MULTIPART_BUCKET: &str = ".rustfs.sys/multipart";
pub const RUSTFS_META_TMP_BUCKET: &str = ".rustfs.sys/tmp";
pub const RUSTFS_META_TMP_DELETED_BUCKET: &str = ".rustfs.sys/tmp/.trash";
pub const BUCKET_META_PREFIX: &str = "buckets";
pub const FORMAT_CONFIG_FILE: &str = "format.json";
pub type DiskStore = Arc<Box<dyn DiskAPI>>;
const STORAGE_FORMAT_FILE: &str = "xl.meta";
pub struct DiskOption {
pub cleanup: bool,
@@ -87,7 +91,7 @@ pub struct LocalDisk {
pub format_data: Vec<u8>,
pub format_meta: Option<Metadata>,
pub format_path: PathBuf,
pub format_legacy: bool,
// pub format_legacy: bool, // drop
pub format_last_check: OffsetDateTime,
}
@@ -107,7 +111,7 @@ impl LocalDisk {
let (format_data, format_meta) = read_file_exists(&format_path).await?;
let mut id = Uuid::nil();
let mut format_legacy = false;
// let mut format_legacy = false;
let mut format_last_check = OffsetDateTime::UNIX_EPOCH;
if !format_data.is_empty() {
@@ -120,7 +124,7 @@ impl LocalDisk {
}
id = fm.erasure.this;
format_legacy = fm.erasure.distribution_algo == DistributionAlgoVersion::V1;
// format_legacy = fm.erasure.distribution_algo == DistributionAlgoVersion::V1;
format_last_check = OffsetDateTime::now_utc();
}
@@ -130,7 +134,7 @@ impl LocalDisk {
format_meta,
format_data: format_data,
format_path,
format_legacy,
// format_legacy,
format_last_check,
};
@@ -275,10 +279,12 @@ impl DiskAPI for LocalDisk {
async fn rename_file(&self, src_volume: &str, src_path: &str, dst_volume: &str, dst_path: &str) -> Result<()> {
if !skip_access_checks(&src_volume) {
check_volume_exists(&src_volume).await?;
let vol_path = self.get_bucket_path(&src_volume)?;
check_volume_exists(&vol_path).await?;
}
if !skip_access_checks(&dst_volume) {
check_volume_exists(&dst_volume).await?;
let vol_path = self.get_bucket_path(&dst_volume)?;
check_volume_exists(&vol_path).await?;
}
let srcp = self.get_object_path(&src_volume, &src_path)?;
@@ -330,6 +336,39 @@ impl DiskAPI for LocalDisk {
Ok(())
}
async fn rename_data(&self, src_volume: &str, src_path: &str, fi: &FileInfo, dst_volume: &str, dst_path: &str) -> Result<()> {
if !skip_access_checks(&src_volume) {
let vol_path = self.get_bucket_path(&src_volume)?;
check_volume_exists(&vol_path).await?;
}
if !skip_access_checks(&dst_volume) {
let vol_path = self.get_bucket_path(&dst_volume)?;
check_volume_exists(&vol_path).await?;
}
let src_file_path = self.get_object_path(&src_volume, format!("{}/{}", &src_path, STORAGE_FORMAT_FILE).as_str())?;
let dst_file_path = self.get_object_path(&dst_volume, format!("{}/{}", &dst_path, STORAGE_FORMAT_FILE).as_str())?;
// let mut data_dir = String::new();
// if !fi.is_remote() {
// data_dir = utils::path::retain_slash(&fi.data_dir);
// }
// if !data_dir.is_empty() {}
let current_data_path = self.get_object_path(&dst_volume, &dst_path)?;
let meta = FileMeta::new();
let (dst_buf, _) = read_file_exists(&dst_file_path).await?;
if !dst_buf.is_empty() {
// xl.load
// meta.from(dst_buf);
}
unimplemented!()
}
async fn make_volumes(&self, volumes: Vec<&str>) -> Result<()> {
for vol in volumes {
if let Err(e) = self.make_volume(vol).await {

View File

@@ -4,6 +4,8 @@ use anyhow::Result;
use bytes::Bytes;
use tokio::io::DuplexStream;
use crate::store_api::FileInfo;
#[async_trait::async_trait]
pub trait DiskAPI: Debug + Send + Sync + 'static {
fn is_local(&self) -> bool;
@@ -12,6 +14,14 @@ pub trait DiskAPI: Debug + Send + Sync + 'static {
async fn write_all(&self, volume: &str, path: &str, data: Bytes) -> Result<()>;
async fn rename_file(&self, src_volume: &str, src_path: &str, dst_volume: &str, dst_path: &str) -> Result<()>;
async fn create_file(&self, origvolume: &str, volume: &str, path: &str, fileSize: usize, r: DuplexStream) -> Result<()>;
async fn rename_data(
&self,
src_volume: &str,
src_path: &str,
file_info: &FileInfo,
dst_volume: &str,
dst_path: &str,
) -> Result<()>;
async fn make_volumes(&self, volume: Vec<&str>) -> Result<()>;
async fn make_volume(&self, volume: &str) -> Result<()>;

View File

@@ -7,6 +7,7 @@ mod ellipses;
mod endpoint;
mod erasure;
pub mod error;
mod file_meta;
mod format;
mod peer;
mod sets;

View File

@@ -1,6 +1,6 @@
use std::sync::Arc;
use anyhow::Result;
use anyhow::{Error, Result};
use futures::{future::join_all, AsyncWrite, StreamExt};
use time::OffsetDateTime;
@@ -91,7 +91,51 @@ impl Sets {
}
}
async fn rename_data(&self) -> Result<()> {
async fn rename_data(
&self,
disks: &Vec<Option<DiskStore>>,
src_bucket: &str,
src_object: &str,
file_infos: &Vec<FileInfo>,
dst_bucket: &str,
dst_object: &str,
// write_quorum: usize,
) -> Vec<Option<Error>> {
let mut futures = Vec::with_capacity(disks.len());
for (i, disk) in disks.iter().enumerate() {
let disk = disk.as_ref().unwrap();
let file_info = &file_infos[i];
futures.push(async move {
disk.rename_data(src_bucket, src_object, file_info, dst_bucket, dst_object)
.await
})
}
let mut errors = Vec::with_capacity(disks.len());
let results = join_all(futures).await;
for result in results {
match result {
Ok(_) => {
errors.push(None);
}
Err(e) => {
errors.push(Some(e));
}
}
}
errors
}
async fn commit_rename_data_dir(
&self,
disks: &Vec<Option<DiskStore>>,
bucket: &str,
object: &str,
data_dir: &str,
// write_quorum: usize,
) -> Vec<Option<Error>> {
unimplemented!()
}
}
@@ -148,6 +192,8 @@ impl StorageAPI for Sets {
let disk = disk.as_ref().unwrap().clone();
let tmp_object = tmp_object.clone();
// TODO: store small files inline in FileInfo.data instead of writing them to disk
futures.push(async move {
disk.create_file("", RUSTFS_META_TMP_BUCKET, tmp_object.as_str(), data.content_length, reader)
.await
@@ -189,6 +235,21 @@ impl StorageAPI for Sets {
// TODO: reduceWriteQuorumErrs
// evalDisks
let rename_errs = self
.rename_data(
&shuffle_disks,
RUSTFS_META_TMP_BUCKET,
tmp_dir.as_str(),
&shuffle_parts_metadata,
&bucket,
&object,
)
.await;
// TODO: reduceWriteQuorumErrs
// self.commit_rename_data_dir(&shuffle_disks,&bucket,&object,)
Ok(())
}
}
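
Both `// TODO: reduceWriteQuorumErrs` notes above could be served by a small helper along these lines. This is a sketch, not part of the commit: the function name follows the TODO, `Error` is the anyhow::Error already imported in this file, and the error handling is deliberately simplified.

// Sketch: count successful disks and check them against the write quorum.
fn reduce_write_quorum_errs(errs: &[Option<Error>], write_quorum: usize) -> Option<Error> {
    let ok = errs.iter().filter(|e| e.is_none()).count();
    if ok >= write_quorum {
        None
    } else {
        // Simplified: surface the first concrete error, or a generic quorum error.
        errs.iter()
            .flatten()
            .next()
            .map(|e| Error::msg(e.to_string()))
            .or_else(|| Some(Error::msg("write quorum not met")))
    }
}

The put_object path would then check something like `if let Some(err) = reduce_write_quorum_errs(&rename_errs, write_quorum) { return Err(err); }` once a write_quorum value is threaded through (it is still commented out in the signatures above).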

View File

@@ -19,6 +19,13 @@ pub struct FileInfo {
pub mod_time: OffsetDateTime,
}
impl FileInfo {
pub fn is_remote(&self) -> bool {
// TODO: when lifecycle
false
}
}
impl Default for FileInfo {
fn default() -> Self {
Self {

View File

@@ -1,5 +1,6 @@
const GLOBAL_DIR_SUFFIX: &str = "__XLDIR__";
const SLASH_SEPARATOR: char = '/';
const SLASH_SEPARATOR: &str = "/";
pub fn has_suffix(s: &str, suffix: &str) -> bool {
if cfg!(target_os = "windows") {
@@ -10,12 +11,8 @@ pub fn has_suffix(s: &str, suffix: &str) -> bool {
}
pub fn encode_dir_object(object: &str) -> String {
if has_suffix(object, &SLASH_SEPARATOR.to_string()) {
format!(
"{}{}",
object.trim_end_matches(SLASH_SEPARATOR),
GLOBAL_DIR_SUFFIX
)
if has_suffix(object, SLASH_SEPARATOR) {
format!("{}{}", object.trim_end_matches(SLASH_SEPARATOR), GLOBAL_DIR_SUFFIX)
} else {
object.to_string()
}
@@ -23,12 +20,19 @@ pub fn encode_dir_object(object: &str) -> String {
pub fn decode_dir_object(object: &str) -> String {
if has_suffix(object, GLOBAL_DIR_SUFFIX) {
format!(
"{}{}",
object.trim_end_matches(GLOBAL_DIR_SUFFIX),
SLASH_SEPARATOR
)
format!("{}{}", object.trim_end_matches(GLOBAL_DIR_SUFFIX), SLASH_SEPARATOR)
} else {
object.to_string()
}
}
pub fn retain_slash(s: &str) -> String {
if s.is_empty() {
return s.to_string();
}
if s.ends_with(SLASH_SEPARATOR) {
s.to_string()
} else {
format!("{}{}", s, SLASH_SEPARATOR)
}
}
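
A quick test sketch, not part of this commit, that pins down the behaviour of the reworked helpers:

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn dir_object_round_trip() {
        // A trailing slash is swapped for the __XLDIR__ marker and back.
        assert_eq!(encode_dir_object("photos/"), "photos__XLDIR__");
        assert_eq!(decode_dir_object("photos__XLDIR__"), "photos/");
        // Plain object keys pass through untouched.
        assert_eq!(encode_dir_object("photos/cat.png"), "photos/cat.png");
    }

    #[test]
    fn retain_slash_appends_at_most_once() {
        assert_eq!(retain_slash(""), "");
        assert_eq!(retain_slash("buckets"), "buckets/");
        assert_eq!(retain_slash("buckets/"), "buckets/");
    }
}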