feat(ecstore): LocalDisk::write_all_internal use InternalBuf

This commit is contained in:
Nugine
2025-06-15 21:01:38 +08:00
parent 3c5e20b633
commit 2f3dbac59b

View File

@@ -83,6 +83,12 @@ impl FormatInfo {
}
}
/// A helper enum to handle internal buffer types for writing data.
pub enum InternalBuf<'a> {
Ref(&'a [u8]),
Owned(Bytes),
}
pub struct LocalDisk {
pub root: PathBuf,
pub format_path: PathBuf,
@@ -596,8 +602,14 @@ impl LocalDisk {
let volume_dir = self.get_bucket_path(volume)?;
self.write_all_private(volume, format!("{}/{}", path, STORAGE_FORMAT_FILE).as_str(), &buf, true, volume_dir)
.await?;
self.write_all_private(
volume,
format!("{}/{}", path, STORAGE_FORMAT_FILE).as_str(),
buf.into(),
true,
&volume_dir,
)
.await?;
Ok(())
}
@@ -610,7 +622,8 @@ impl LocalDisk {
let tmp_volume_dir = self.get_bucket_path(super::RUSTFS_META_TMP_BUCKET)?;
let tmp_file_path = tmp_volume_dir.join(Path::new(Uuid::new_v4().to_string().as_str()));
self.write_all_internal(&tmp_file_path, buf, sync, tmp_volume_dir).await?;
self.write_all_internal(&tmp_file_path, InternalBuf::Ref(buf), sync, &tmp_volume_dir)
.await?;
rename_all(tmp_file_path, file_path, volume_dir).await
}
@@ -624,47 +637,46 @@ impl LocalDisk {
let volume_dir = self.get_bucket_path(volume)?;
self.write_all_private(volume, path, &data, true, volume_dir).await?;
self.write_all_private(volume, path, data.into(), true, &volume_dir).await?;
Ok(())
}
// write_all_private with check_path_length
#[tracing::instrument(level = "debug", skip_all)]
pub async fn write_all_private(
&self,
volume: &str,
path: &str,
buf: &[u8],
sync: bool,
skip_parent: impl AsRef<Path>,
) -> Result<()> {
pub async fn write_all_private(&self, volume: &str, path: &str, buf: Bytes, sync: bool, skip_parent: &Path) -> Result<()> {
let volume_dir = self.get_bucket_path(volume)?;
let file_path = volume_dir.join(Path::new(&path));
check_path_length(file_path.to_string_lossy().as_ref())?;
self.write_all_internal(file_path, buf, sync, skip_parent).await
self.write_all_internal(&file_path, InternalBuf::Owned(buf), sync, skip_parent)
.await
}
// write_all_internal do write file
pub async fn write_all_internal(
&self,
file_path: impl AsRef<Path>,
data: impl AsRef<[u8]>,
file_path: &Path,
data: InternalBuf<'_>,
sync: bool,
skip_parent: impl AsRef<Path>,
skip_parent: &Path,
) -> Result<()> {
let flags = O_CREATE | O_WRONLY | O_TRUNC;
let mut f = {
if sync {
// TODO: suport sync
self.open_file(file_path.as_ref(), flags, skip_parent.as_ref()).await?
self.open_file(file_path, flags, skip_parent).await?
} else {
self.open_file(file_path.as_ref(), flags, skip_parent.as_ref()).await?
self.open_file(file_path, flags, skip_parent).await?
}
};
f.write_all(data.as_ref()).await.map_err(to_file_error)?;
let data: &[u8] = match &data {
InternalBuf::Ref(buf) => buf,
InternalBuf::Owned(buf) => buf.as_ref(),
};
f.write_all(data).await.map_err(to_file_error)?;
Ok(())
}
@@ -1691,7 +1703,7 @@ impl DiskAPI for LocalDisk {
.write_all_private(
dst_volume,
format!("{}/{}/{}", &dst_path, &old_data_dir.to_string(), STORAGE_FORMAT_FILE).as_str(),
&dst_buf,
dst_buf.into(),
true,
&skip_parent,
)