mirror of
https://github.com/rustfs/rustfs.git
synced 2026-01-17 01:30:33 +00:00
refactor(ecstore): DiskAPI::write_all use Bytes
This commit is contained in:
@@ -68,7 +68,7 @@ use uuid::Uuid;
|
||||
#[derive(Debug)]
|
||||
pub struct FormatInfo {
|
||||
pub id: Option<Uuid>,
|
||||
pub data: Vec<u8>,
|
||||
pub data: Bytes,
|
||||
pub file_info: Option<Metadata>,
|
||||
pub last_check: Option<OffsetDateTime>,
|
||||
}
|
||||
@@ -153,7 +153,7 @@ impl LocalDisk {
|
||||
|
||||
let format_info = FormatInfo {
|
||||
id,
|
||||
data: format_data,
|
||||
data: format_data.into(),
|
||||
file_info: format_meta,
|
||||
last_check: format_last_check,
|
||||
};
|
||||
@@ -629,7 +629,7 @@ impl LocalDisk {
|
||||
}
|
||||
|
||||
// write_all_public for trail
|
||||
async fn write_all_public(&self, volume: &str, path: &str, data: Vec<u8>) -> Result<()> {
|
||||
async fn write_all_public(&self, volume: &str, path: &str, data: Bytes) -> Result<()> {
|
||||
if volume == RUSTFS_META_BUCKET && path == super::FORMAT_CONFIG_FILE {
|
||||
let mut format_info = self.format_info.write().await;
|
||||
format_info.data.clone_from(&data);
|
||||
@@ -637,7 +637,7 @@ impl LocalDisk {
|
||||
|
||||
let volume_dir = self.get_bucket_path(volume)?;
|
||||
|
||||
self.write_all_private(volume, path, data.into(), true, &volume_dir).await?;
|
||||
self.write_all_private(volume, path, data, true, &volume_dir).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -1131,7 +1131,7 @@ impl DiskAPI for LocalDisk {
|
||||
|
||||
format_info.id = Some(disk_id);
|
||||
format_info.file_info = Some(file_meta);
|
||||
format_info.data = b;
|
||||
format_info.data = b.into();
|
||||
format_info.last_check = Some(OffsetDateTime::now_utc());
|
||||
|
||||
Ok(Some(disk_id))
|
||||
@@ -1151,7 +1151,7 @@ impl DiskAPI for LocalDisk {
|
||||
if volume == RUSTFS_META_BUCKET && path == super::FORMAT_CONFIG_FILE {
|
||||
let format_info = self.format_info.read().await;
|
||||
if !format_info.data.is_empty() {
|
||||
return Ok(format_info.data.clone());
|
||||
return Ok(format_info.data.to_vec());
|
||||
}
|
||||
}
|
||||
// TOFIX:
|
||||
@@ -1162,7 +1162,7 @@ impl DiskAPI for LocalDisk {
|
||||
}
|
||||
|
||||
#[tracing::instrument(level = "debug", skip_all)]
|
||||
async fn write_all(&self, volume: &str, path: &str, data: Vec<u8>) -> Result<()> {
|
||||
async fn write_all(&self, volume: &str, path: &str, data: Bytes) -> Result<()> {
|
||||
self.write_all_public(volume, path, data).await
|
||||
}
|
||||
|
||||
@@ -1331,7 +1331,7 @@ impl DiskAPI for LocalDisk {
|
||||
|
||||
rename_all(&src_file_path, &dst_file_path, &dst_volume_dir).await?;
|
||||
|
||||
self.write_all(dst_volume, format!("{}.meta", dst_path).as_str(), meta.to_vec())
|
||||
self.write_all(dst_volume, format!("{}.meta", dst_path).as_str(), meta)
|
||||
.await?;
|
||||
|
||||
if let Some(parent) = src_file_path.parent() {
|
||||
@@ -1700,7 +1700,7 @@ impl DiskAPI for LocalDisk {
|
||||
|
||||
let new_dst_buf = xlmeta.marshal_msg()?;
|
||||
|
||||
self.write_all(src_volume, format!("{}/{}", &src_path, STORAGE_FORMAT_FILE).as_str(), new_dst_buf)
|
||||
self.write_all(src_volume, format!("{}/{}", &src_path, STORAGE_FORMAT_FILE).as_str(), new_dst_buf.into())
|
||||
.await?;
|
||||
if let Some((src_data_path, dst_data_path)) = has_data_dir_path.as_ref() {
|
||||
let no_inline = fi.data.is_none() && fi.size > 0;
|
||||
@@ -1902,7 +1902,7 @@ impl DiskAPI for LocalDisk {
|
||||
|
||||
let fm_data = meta.marshal_msg()?;
|
||||
|
||||
self.write_all(volume, format!("{}/{}", path, STORAGE_FORMAT_FILE).as_str(), fm_data)
|
||||
self.write_all(volume, format!("{}/{}", path, STORAGE_FORMAT_FILE).as_str(), fm_data.into())
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
@@ -2461,8 +2461,8 @@ mod test {
|
||||
disk.make_volume("test-volume").await.unwrap();
|
||||
|
||||
// Test write and read operations
|
||||
let test_data = vec![1, 2, 3, 4, 5];
|
||||
disk.write_all("test-volume", "test-file.txt", test_data.clone())
|
||||
let test_data: Vec<u8> = vec![1, 2, 3, 4, 5];
|
||||
disk.write_all("test-volume", "test-file.txt", test_data.clone().into())
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
@@ -2587,7 +2587,7 @@ mod test {
|
||||
// Valid format info
|
||||
let valid_format_info = FormatInfo {
|
||||
id: Some(Uuid::new_v4()),
|
||||
data: vec![1, 2, 3],
|
||||
data: vec![1, 2, 3].into(),
|
||||
file_info: Some(fs::metadata(".").await.unwrap()),
|
||||
last_check: Some(now),
|
||||
};
|
||||
@@ -2596,7 +2596,7 @@ mod test {
|
||||
// Invalid format info (missing id)
|
||||
let invalid_format_info = FormatInfo {
|
||||
id: None,
|
||||
data: vec![1, 2, 3],
|
||||
data: vec![1, 2, 3].into(),
|
||||
file_info: Some(fs::metadata(".").await.unwrap()),
|
||||
last_check: Some(now),
|
||||
};
|
||||
@@ -2606,7 +2606,7 @@ mod test {
|
||||
let old_time = OffsetDateTime::now_utc() - time::Duration::seconds(10);
|
||||
let old_format_info = FormatInfo {
|
||||
id: Some(Uuid::new_v4()),
|
||||
data: vec![1, 2, 3],
|
||||
data: vec![1, 2, 3].into(),
|
||||
file_info: Some(fs::metadata(".").await.unwrap()),
|
||||
last_check: Some(old_time),
|
||||
};
|
||||
|
||||
@@ -364,7 +364,7 @@ impl DiskAPI for Disk {
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip(self))]
|
||||
async fn write_all(&self, volume: &str, path: &str, data: Vec<u8>) -> Result<()> {
|
||||
async fn write_all(&self, volume: &str, path: &str, data: Bytes) -> Result<()> {
|
||||
match self {
|
||||
Disk::Local(local_disk) => local_disk.write_all(volume, path, data).await,
|
||||
Disk::Remote(remote_disk) => remote_disk.write_all(volume, path, data).await,
|
||||
@@ -504,7 +504,7 @@ pub trait DiskAPI: Debug + Send + Sync + 'static {
|
||||
// ReadParts
|
||||
async fn read_multiple(&self, req: ReadMultipleReq) -> Result<Vec<ReadMultipleResp>>;
|
||||
// CleanAbandonedData
|
||||
async fn write_all(&self, volume: &str, path: &str, data: Vec<u8>) -> Result<()>;
|
||||
async fn write_all(&self, volume: &str, path: &str, data: Bytes) -> Result<()>;
|
||||
async fn read_all(&self, volume: &str, path: &str) -> Result<Vec<u8>>;
|
||||
async fn disk_info(&self, opts: &DiskInfoOptions) -> Result<DiskInfo>;
|
||||
async fn ns_scanner(
|
||||
|
||||
@@ -804,7 +804,7 @@ impl DiskAPI for RemoteDisk {
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip(self))]
|
||||
async fn write_all(&self, volume: &str, path: &str, data: Vec<u8>) -> Result<()> {
|
||||
async fn write_all(&self, volume: &str, path: &str, data: Bytes) -> Result<()> {
|
||||
info!("write_all");
|
||||
let mut client = node_service_time_out_client(&self.addr)
|
||||
.await
|
||||
@@ -813,7 +813,7 @@ impl DiskAPI for RemoteDisk {
|
||||
disk: self.endpoint.to_string(),
|
||||
volume: volume.to_string(),
|
||||
path: path.to_string(),
|
||||
data: data.into(),
|
||||
data,
|
||||
});
|
||||
|
||||
let response = client.write_all(request).await?.into_inner();
|
||||
|
||||
@@ -232,7 +232,7 @@ impl HealingTracker {
|
||||
|
||||
if let Some(disk) = &self.disk {
|
||||
let file_path = Path::new(BUCKET_META_PREFIX).join(HEALING_TRACKER_FILENAME);
|
||||
disk.write_all(RUSTFS_META_BUCKET, file_path.to_str().unwrap(), htracker_bytes)
|
||||
disk.write_all(RUSTFS_META_BUCKET, file_path.to_str().unwrap(), htracker_bytes.into())
|
||||
.await?;
|
||||
}
|
||||
Ok(())
|
||||
|
||||
@@ -311,7 +311,7 @@ pub async fn save_format_file(disk: &Option<DiskStore>, format: &Option<FormatV3
|
||||
let tmpfile = Uuid::new_v4().to_string();
|
||||
|
||||
let disk = disk.as_ref().unwrap();
|
||||
disk.write_all(RUSTFS_META_BUCKET, tmpfile.as_str(), json_data.into_bytes())
|
||||
disk.write_all(RUSTFS_META_BUCKET, tmpfile.as_str(), json_data.into_bytes().into())
|
||||
.await?;
|
||||
|
||||
disk.rename_file(RUSTFS_META_BUCKET, tmpfile.as_str(), RUSTFS_META_BUCKET, FORMAT_CONFIG_FILE)
|
||||
|
||||
@@ -298,7 +298,7 @@ impl Node for NodeService {
|
||||
async fn write_all(&self, request: Request<WriteAllRequest>) -> Result<Response<WriteAllResponse>, Status> {
|
||||
let request = request.into_inner();
|
||||
if let Some(disk) = self.find_disk(&request.disk).await {
|
||||
match disk.write_all(&request.volume, &request.path, request.data.into()).await {
|
||||
match disk.write_all(&request.volume, &request.path, request.data).await {
|
||||
Ok(_) => Ok(tonic::Response::new(WriteAllResponse {
|
||||
success: true,
|
||||
error: None,
|
||||
|
||||
Reference in New Issue
Block a user