Merge branch 'upload' into erasure

This commit is contained in:
weisd
2024-07-05 17:30:00 +08:00
4 changed files with 110 additions and 16 deletions

View File

@@ -184,6 +184,43 @@ impl LocalDisk {
clean_tmp: true,
})
}
/// Moves `src_data_path` to `dst_data_path`, first creating the destination's
/// parent directory when needed.
///
/// The parent-directory creation is skipped when `skip` lies under the
/// source tree. NOTE(review): the skip condition compares against the
/// *source* path — confirm that is the intended semantics, since the
/// directory being created belongs to the destination.
pub async fn rename_all(&self, src_data_path: &PathBuf, dst_data_path: &PathBuf, skip: &PathBuf) -> Result<()> {
    if !skip.starts_with(&src_data_path) {
        // Fall back to "/" only when the destination has no parent at all.
        let dst_parent = dst_data_path.parent().unwrap_or(Path::new("/"));
        fs::create_dir_all(dst_parent).await?;
    }
    debug!(
        "rename_all from \n {:?} \n to \n {:?} \n skip:{:?}",
        &src_data_path, &dst_data_path, &skip
    );
    fs::rename(&src_data_path, &dst_data_path).await?;
    Ok(())
}
/// Deletes `delete_path` (a directory or a regular file) provided it is a
/// strict descendant of `base_path`; all other inputs are silently ignored
/// and reported as success.
pub async fn delete_file(&self, base_path: &PathBuf, delete_path: &PathBuf) -> Result<()> {
    // Guard 1: never operate on a filesystem root.
    if is_root_path(base_path) || is_root_path(delete_path) {
        return Ok(());
    }
    // Guard 2: the target must live inside the base path and must not be
    // the base path itself.
    let inside_base = delete_path.starts_with(base_path);
    if !inside_base || delete_path == base_path {
        return Ok(());
    }
    // Directories go through remove_dir (which fails if non-empty);
    // anything else is treated as a regular file.
    // NOTE(review): `is_dir()` is a blocking std call inside an async fn —
    // consider tokio's async metadata if this runs on the hot path.
    if delete_path.is_dir() {
        fs::remove_dir(delete_path).await?;
        return Ok(());
    }
    fs::remove_file(delete_path).await?;
    Ok(())
}
}
/// Returns `true` when `path` is a bare filesystem root (e.g. `/` on Unix):
/// it has a root component and nothing after it.
///
/// Takes `&Path` instead of `&PathBuf` (the idiomatic borrowed form);
/// existing `&PathBuf` call sites still compile via deref coercion.
fn is_root_path(path: &Path) -> bool {
    // Exactly one component, and that component must be the root.
    path.components().count() == 1 && path.has_root()
}
// Filter out std::io::ErrorKind::NotFound
@@ -337,36 +374,79 @@ impl DiskAPI for LocalDisk {
}
// NOTE(review): this span is a rendered diff hunk — removed and added lines
// are interleaved with their +/- markers stripped, so several statements
// below appear in both an old and a new form (e.g. the paired
// `check_volume_exists` calls and the two `let ... meta = FileMeta::new();`
// lines, and the removed `unimplemented!()` followed by the new tail).
// Do not read it as straight-line code; reconstruct the post-image from the
// repository before editing.
async fn rename_data(&self, src_volume: &str, src_path: &str, fi: &FileInfo, dst_volume: &str, dst_path: &str) -> Result<()> {
// Resolve the source bucket directory; verify it exists unless the volume
// is exempt from access checks.
let src_volume_path = self.get_bucket_path(&src_volume)?;
if !skip_access_checks(&src_volume) {
// (old line) re-resolved the bucket path into a local…
let vol_path = self.get_bucket_path(&src_volume)?;
// (old line) …and checked that copy.
check_volume_exists(&vol_path).await?;
// (new line) checks the path resolved once above instead.
check_volume_exists(&src_volume_path).await?;
}
// Same old/new pairing for the destination volume.
let dst_volume_path = self.get_bucket_path(&dst_volume)?;
if !skip_access_checks(&dst_volume) {
let vol_path = self.get_bucket_path(&dst_volume)?;
check_volume_exists(&vol_path).await?;
check_volume_exists(&dst_volume_path).await?;
}
// Metadata file paths (STORAGE_FORMAT_FILE under each object prefix).
let src_file_path = self.get_object_path(&src_volume, format!("{}/{}", &src_path, STORAGE_FORMAT_FILE).as_str())?;
let dst_file_path = self.get_object_path(&dst_volume, format!("{}/{}", &dst_path, STORAGE_FORMAT_FILE).as_str())?;
// let mut data_dir = String::new();
// if !fi.is_remote() {
//     data_dir = utils::path::retain_slash(&fi.data_dir);
// }
// Compute the data-dir paths; empty PathBufs act as "no data dir" sentinels.
let (src_data_path, dst_data_path) = {
let mut data_dir = String::new();
if !fi.is_remote() {
data_dir = utils::path::retain_slash(fi.data_dir.to_string().as_str());
}
// if !data_dir.is_empty() {}
if !data_dir.is_empty() {
let src_data_path = self.get_object_path(
&src_volume,
utils::path::retain_slash(format!("{}/{}", &src_path, data_dir).as_str()).as_str(),
)?;
let dst_data_path = self.get_object_path(&dst_volume, format!("{}/{}", &dst_path, data_dir).as_str())?;
// NOTE(review): typo "curreng" (current?); the `?` is also missing here
// and the binding is unused in this hunk.
let curreng_data_path = self.get_object_path(&dst_volume, &dst_path);
(src_data_path, dst_data_path)
} else {
(PathBuf::new(), PathBuf::new())
}
};
// (old line) immutable binding, replaced by the `mut` version two lines down.
let meta = FileMeta::new();
// let curreng_data_path = self.get_object_path(&dst_volume, &dst_path);
let mut meta = FileMeta::new();
// Load any existing destination metadata (empty buffer when absent).
let (dst_buf, _) = read_file_exists(&dst_file_path).await?;
// NOTE(review): `skipParent` is not snake_case; rename to `skip_parent`.
let mut skipParent = dst_volume_path;
// NOTE(review): `!&dst_buf.is_empty()` — the `&` is redundant; same
// condition as the `if` below, so these two blocks could be merged.
if !&dst_buf.is_empty() {
skipParent = PathBuf::from(&dst_file_path.parent().unwrap_or(Path::new("/")));
}
if !dst_buf.is_empty() {
// NOTE(review): unmarshal failure is silently replaced with a fresh
// FileMeta and `e` is unused — consider at least logging the error.
meta = match FileMeta::unmarshal(dst_buf) {
Ok(m) => m,
Err(e) => FileMeta::new(),
}
// xl.load
// meta.from(dst_buf);
}
// (old line) the previous stub ended here.
unimplemented!()
// Record this version, serialize, and write it to the *source* meta file
// before the rename moves it into place.
meta.add_version(fi.clone())?;
let fm_data = meta.marshal_msg()?;
fs::write(&src_file_path, fm_data).await?;
// Data is renamed separately only when it is not inlined in the meta file.
let no_inline = src_data_path.has_root() && fi.data.is_empty() && fi.size > 0;
if no_inline {
self.rename_all(&src_data_path, &dst_data_path, &skipParent).await?;
}
self.rename_all(&src_file_path, &dst_file_path, &skipParent).await?;
// Clean up the now-empty source object directory.
// NOTE(review): confirm the `!=` direction — multipart buckets take the
// guarded delete_file path, everything else an unguarded remove_dir with
// `unwrap()` on `parent()`.
if src_volume != RUSTFS_META_MULTIPART_BUCKET {
fs::remove_dir(&src_file_path.parent().unwrap()).await?;
} else {
self.delete_file(&src_volume_path, &PathBuf::from(src_file_path.parent().unwrap()))
.await?;
}
Ok(())
}
async fn make_volumes(&self, volumes: Vec<&str>) -> Result<()> {

View File

@@ -32,12 +32,13 @@ impl Erasure {
block_size: usize,
data_size: usize,
write_quorum: usize,
) -> Result<()>
) -> Result<usize>
where
S: Stream<Item = Result<Bytes, StdError>> + Send + Sync + 'static,
W: AsyncWrite + Unpin,
{
let mut stream = ChunkedStream::new(body, data_size, block_size, true);
let mut total: usize = 0;
while let Some(result) = stream.next().await {
match result {
Ok(data) => {
@@ -46,6 +47,8 @@ impl Erasure {
let mut errs = Vec::new();
for (i, w) in writers.iter_mut().enumerate() {
total += blocks[i].len();
match w.write_all(blocks[i].as_ref()).await {
Ok(_) => errs.push(None),
Err(e) => errs.push(Some(e)),
@@ -59,7 +62,7 @@ impl Erasure {
}
}
Ok(())
Ok(total)
// loop {
// match rd.next().await {

View File

@@ -16,6 +16,8 @@ use crate::{
utils::hash,
};
const DEFAULT_INLINE_BLOCKS: usize = 128 * 1024;
#[derive(Debug)]
pub struct Sets {
pub id: Uuid,
@@ -176,7 +178,7 @@ impl StorageAPI for Sets {
let parts_metadata = vec![fi.clone(); disks.len()];
let (shuffle_disks, shuffle_parts_metadata) = shuffle_disks_and_parts_metadata(&disks, &parts_metadata, &fi);
let (shuffle_disks, mut shuffle_parts_metadata) = shuffle_disks_and_parts_metadata(&disks, &parts_metadata, &fi);
let mut writers = Vec::with_capacity(disks.len());
@@ -209,7 +211,7 @@ impl StorageAPI for Sets {
let erasure = Erasure::new(fi.erasure.data_blocks, fi.erasure.parity_blocks);
erasure
let w_size = erasure
.encode(data.stream, &mut writers, fi.erasure.block_size, data.content_length, write_quorum)
.await?;
@@ -235,6 +237,11 @@ impl StorageAPI for Sets {
// TODO: reduceWriteQuorumErrs
// evalDisks
for fi in shuffle_parts_metadata.iter_mut() {
fi.mod_time = OffsetDateTime::now_utc();
fi.size = w_size;
}
let rename_errs = self
.rename_data(
&shuffle_disks,
@@ -248,6 +255,8 @@ impl StorageAPI for Sets {
// TODO: reduceWriteQuorumErrs
debug!("put_object rename_errs:{:?}", rename_errs);
// self.commit_rename_data_dir(&shuffle_disks,&bucket,&object,)
Ok(())

View File

@@ -20,6 +20,7 @@ pub struct FileInfo {
pub data_dir: Uuid,
pub mod_time: OffsetDateTime,
pub size: usize,
pub data: Vec<u8>,
}
impl FileInfo {
@@ -38,6 +39,7 @@ impl Default for FileInfo {
data_dir: Uuid::nil(),
mod_time: OffsetDateTime::UNIX_EPOCH,
size: Default::default(),
data: Default::default(),
}
}
}