mirror of
https://github.com/rustfs/rustfs.git
synced 2026-01-17 01:30:33 +00:00
test:FileMeta
This commit is contained in:
@@ -341,32 +341,61 @@ impl DiskAPI for LocalDisk {
|
||||
let vol_path = self.get_bucket_path(&src_volume)?;
|
||||
check_volume_exists(&vol_path).await?;
|
||||
}
|
||||
|
||||
let dst_volume_path = self.get_bucket_path(&dst_volume)?;
|
||||
if !skip_access_checks(&dst_volume) {
|
||||
let vol_path = self.get_bucket_path(&dst_volume)?;
|
||||
check_volume_exists(&vol_path).await?;
|
||||
check_volume_exists(&dst_volume_path).await?;
|
||||
}
|
||||
|
||||
let src_file_path = self.get_object_path(&src_volume, format!("{}/{}", &src_path, STORAGE_FORMAT_FILE).as_str())?;
|
||||
let dst_file_path = self.get_object_path(&dst_volume, format!("{}/{}", &dst_path, STORAGE_FORMAT_FILE).as_str())?;
|
||||
|
||||
// let mut data_dir = String::new();
|
||||
// if !fi.is_remote() {
|
||||
// data_dir = utils::path::retain_slash(&fi.data_dir);
|
||||
// }
|
||||
let (src_data_path, dst_data_path) = {
|
||||
let mut data_dir = String::new();
|
||||
if !fi.is_remote() {
|
||||
data_dir = utils::path::retain_slash(fi.data_dir.to_string().as_str());
|
||||
}
|
||||
|
||||
// if !data_dir.is_empty() {}
|
||||
if !data_dir.is_empty() {
|
||||
let src_data_path = self.get_object_path(
|
||||
&src_volume,
|
||||
utils::path::retain_slash(format!("{}/{}", &src_path, data_dir).as_str()).as_str(),
|
||||
)?;
|
||||
let dst_data_path = self.get_object_path(&dst_volume, format!("{}/{}", &dst_path, data_dir).as_str())?;
|
||||
|
||||
(src_data_path, dst_data_path)
|
||||
} else {
|
||||
(PathBuf::new(), PathBuf::new())
|
||||
}
|
||||
};
|
||||
|
||||
let curreng_data_path = self.get_object_path(&dst_volume, &dst_path);
|
||||
|
||||
let meta = FileMeta::new();
|
||||
let mut meta = FileMeta::new();
|
||||
|
||||
let (dst_buf, _) = read_file_exists(&dst_file_path).await?;
|
||||
|
||||
let mut skipParent = dst_volume_path.as_path();
|
||||
if !&dst_buf.is_empty() {
|
||||
skipParent = skipParent.parent().unwrap_or(Path::new("/"));
|
||||
}
|
||||
|
||||
if !dst_buf.is_empty() {
|
||||
meta = match FileMeta::unmarshal(dst_buf) {
|
||||
Ok(m) => m,
|
||||
Err(e) => FileMeta::new(),
|
||||
}
|
||||
// xl.load
|
||||
// meta.from(dst_buf);
|
||||
}
|
||||
|
||||
unimplemented!()
|
||||
meta.add_version(fi.clone())?;
|
||||
|
||||
let fm_data = meta.marshal_msg()?;
|
||||
|
||||
fs::write(&src_file_path, fm_data).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn make_volumes(&self, volumes: Vec<&str>) -> Result<()> {
|
||||
|
||||
@@ -32,12 +32,13 @@ impl Erasure {
|
||||
block_size: usize,
|
||||
data_size: usize,
|
||||
write_quorum: usize,
|
||||
) -> Result<()>
|
||||
) -> Result<usize>
|
||||
where
|
||||
S: Stream<Item = Result<Bytes, StdError>> + Send + Sync + 'static,
|
||||
W: AsyncWrite + Unpin,
|
||||
{
|
||||
let mut stream = ChunkedStream::new(body, data_size, block_size, true);
|
||||
let mut total: usize = 0;
|
||||
while let Some(result) = stream.next().await {
|
||||
match result {
|
||||
Ok(data) => {
|
||||
@@ -46,6 +47,8 @@ impl Erasure {
|
||||
let mut errs = Vec::new();
|
||||
|
||||
for (i, w) in writers.iter_mut().enumerate() {
|
||||
total += blocks[i].len();
|
||||
|
||||
match w.write_all(blocks[i].as_ref()).await {
|
||||
Ok(_) => errs.push(None),
|
||||
Err(e) => errs.push(Some(e)),
|
||||
@@ -59,7 +62,7 @@ impl Erasure {
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
Ok(total)
|
||||
|
||||
// loop {
|
||||
// match rd.next().await {
|
||||
|
||||
@@ -176,7 +176,7 @@ impl StorageAPI for Sets {
|
||||
|
||||
let parts_metadata = vec![fi.clone(); disks.len()];
|
||||
|
||||
let (shuffle_disks, shuffle_parts_metadata) = shuffle_disks_and_parts_metadata(&disks, &parts_metadata, &fi);
|
||||
let (shuffle_disks, mut shuffle_parts_metadata) = shuffle_disks_and_parts_metadata(&disks, &parts_metadata, &fi);
|
||||
|
||||
let mut writers = Vec::with_capacity(disks.len());
|
||||
|
||||
@@ -209,7 +209,7 @@ impl StorageAPI for Sets {
|
||||
|
||||
let erasure = Erasure::new(fi.erasure.data_blocks, fi.erasure.parity_blocks);
|
||||
|
||||
erasure
|
||||
let w_size = erasure
|
||||
.encode(data.stream, &mut writers, fi.erasure.block_size, data.content_length, write_quorum)
|
||||
.await?;
|
||||
|
||||
@@ -235,6 +235,11 @@ impl StorageAPI for Sets {
|
||||
// TODO: reduceWriteQuorumErrs
|
||||
// evalDisks
|
||||
|
||||
for fi in shuffle_parts_metadata.iter_mut() {
|
||||
fi.mod_time = OffsetDateTime::now_utc();
|
||||
fi.size = w_size;
|
||||
}
|
||||
|
||||
let rename_errs = self
|
||||
.rename_data(
|
||||
&shuffle_disks,
|
||||
@@ -248,6 +253,8 @@ impl StorageAPI for Sets {
|
||||
|
||||
// TODO: reduceWriteQuorumErrs
|
||||
|
||||
debug!("put_object rename_errs:{:?}", rename_errs);
|
||||
|
||||
// self.commit_rename_data_dir(&shuffle_disks,&bucket,&object,)
|
||||
|
||||
Ok(())
|
||||
|
||||
Reference in New Issue
Block a user