From 91c3fc2fe1cb2e0f0373b22e2a32d7e759d314af Mon Sep 17 00:00:00 2001 From: weisd Date: Fri, 5 Jul 2024 11:48:34 +0800 Subject: [PATCH] test:FileMeta --- ecstore/src/disk.rs | 47 ++++++++++++++++++++++++++++++++++-------- ecstore/src/erasure.rs | 7 +++++-- ecstore/src/sets.rs | 11 ++++++++-- 3 files changed, 52 insertions(+), 13 deletions(-) diff --git a/ecstore/src/disk.rs b/ecstore/src/disk.rs index 22a42268..ab1597b4 100644 --- a/ecstore/src/disk.rs +++ b/ecstore/src/disk.rs @@ -341,32 +341,61 @@ impl DiskAPI for LocalDisk { let vol_path = self.get_bucket_path(&src_volume)?; check_volume_exists(&vol_path).await?; } + + let dst_volume_path = self.get_bucket_path(&dst_volume)?; if !skip_access_checks(&dst_volume) { - let vol_path = self.get_bucket_path(&dst_volume)?; - check_volume_exists(&vol_path).await?; + check_volume_exists(&dst_volume_path).await?; } let src_file_path = self.get_object_path(&src_volume, format!("{}/{}", &src_path, STORAGE_FORMAT_FILE).as_str())?; let dst_file_path = self.get_object_path(&dst_volume, format!("{}/{}", &dst_path, STORAGE_FORMAT_FILE).as_str())?; - // let mut data_dir = String::new(); - // if !fi.is_remote() { - // data_dir = utils::path::retain_slash(&fi.data_dir); - // } + let (src_data_path, dst_data_path) = { + let mut data_dir = String::new(); + if !fi.is_remote() { + data_dir = utils::path::retain_slash(fi.data_dir.to_string().as_str()); + } - // if !data_dir.is_empty() {} + if !data_dir.is_empty() { + let src_data_path = self.get_object_path( + &src_volume, + utils::path::retain_slash(format!("{}/{}", &src_path, data_dir).as_str()).as_str(), + )?; + let dst_data_path = self.get_object_path(&dst_volume, format!("{}/{}", &dst_path, data_dir).as_str())?; + + (src_data_path, dst_data_path) + } else { + (PathBuf::new(), PathBuf::new()) + } + }; let curreng_data_path = self.get_object_path(&dst_volume, &dst_path); - let meta = FileMeta::new(); + let mut meta = FileMeta::new(); let (dst_buf, _) = read_file_exists(&dst_file_path).await?; + + let mut skipParent = dst_volume_path.as_path(); + if !&dst_buf.is_empty() { + skipParent = skipParent.parent().unwrap_or(Path::new("/")); + } + if !dst_buf.is_empty() { + meta = match FileMeta::unmarshal(dst_buf) { + Ok(m) => m, + Err(e) => FileMeta::new(), + } // xl.load // meta.from(dst_buf); } - unimplemented!() + meta.add_version(fi.clone())?; + + let fm_data = meta.marshal_msg()?; + + fs::write(&src_file_path, fm_data).await?; + + Ok(()) } async fn make_volumes(&self, volumes: Vec<&str>) -> Result<()> { diff --git a/ecstore/src/erasure.rs b/ecstore/src/erasure.rs index 2d9cffbe..b560a443 100644 --- a/ecstore/src/erasure.rs +++ b/ecstore/src/erasure.rs @@ -32,12 +32,13 @@ impl Erasure { block_size: usize, data_size: usize, write_quorum: usize, - ) -> Result<()> + ) -> Result where S: Stream> + Send + Sync + 'static, W: AsyncWrite + Unpin, { let mut stream = ChunkedStream::new(body, data_size, block_size, true); + let mut total: usize = 0; while let Some(result) = stream.next().await { match result { Ok(data) => { @@ -46,6 +47,8 @@ impl Erasure { let mut errs = Vec::new(); for (i, w) in writers.iter_mut().enumerate() { + total += blocks[i].len(); + match w.write_all(blocks[i].as_ref()).await { Ok(_) => errs.push(None), Err(e) => errs.push(Some(e)), @@ -59,7 +62,7 @@ impl Erasure { } } - Ok(()) + Ok(total) // loop { // match rd.next().await { diff --git a/ecstore/src/sets.rs b/ecstore/src/sets.rs index 4096b6eb..3f981449 100644 --- a/ecstore/src/sets.rs +++ b/ecstore/src/sets.rs @@ -176,7 +176,7 @@ impl StorageAPI for Sets { let parts_metadata = vec![fi.clone(); disks.len()]; - let (shuffle_disks, shuffle_parts_metadata) = shuffle_disks_and_parts_metadata(&disks, &parts_metadata, &fi); + let (shuffle_disks, mut shuffle_parts_metadata) = shuffle_disks_and_parts_metadata(&disks, &parts_metadata, &fi); let mut writers = Vec::with_capacity(disks.len()); @@ -209,7 +209,7 @@ impl StorageAPI for Sets { let erasure = Erasure::new(fi.erasure.data_blocks, fi.erasure.parity_blocks); - erasure + let w_size = erasure .encode(data.stream, &mut writers, fi.erasure.block_size, data.content_length, write_quorum) .await?; @@ -235,6 +235,11 @@ impl StorageAPI for Sets { // TODO: reduceWriteQuorumErrs // evalDisks + for fi in shuffle_parts_metadata.iter_mut() { + fi.mod_time = OffsetDateTime::now_utc(); + fi.size = w_size; + } + let rename_errs = self .rename_data( &shuffle_disks, @@ -248,6 +253,8 @@ impl StorageAPI for Sets { // TODO: reduceWriteQuorumErrs + debug!("put_object rename_errs:{:?}", rename_errs); + // self.commit_rename_data_dir(&shuffle_disks,&bucket,&object,) Ok(())