test:encode

This commit is contained in:
weisd
2024-07-02 18:09:00 +08:00
parent 5eb6d52daf
commit d1a62f6697
8 changed files with 126 additions and 46 deletions

View File

@@ -6,11 +6,16 @@ use std::{
use anyhow::{Error, Result};
use bytes::Bytes;
use futures::future::join_all;
use futures::{future::join_all, Stream};
use path_absolutize::Absolutize;
use s3s::StdError;
use time::OffsetDateTime;
use tokio::fs::{self, File};
use tokio::io::ErrorKind;
use tokio::io::{self, AsyncRead, AsyncReadExt, AsyncSeekExt};
use tokio::io::{AsyncWrite, BufWriter, ErrorKind};
use tokio::{
fs::{self, File},
io::DuplexStream,
};
use uuid::Uuid;
use crate::{
@@ -43,10 +48,7 @@ pub async fn new_disk(ep: &Endpoint, opt: &DiskOption) -> Result<DiskStore> {
}
}
pub async fn init_disks(
eps: &Endpoints,
opt: &DiskOption,
) -> (Vec<Option<DiskStore>>, Vec<Option<Error>>) {
pub async fn init_disks(eps: &Endpoints, opt: &DiskOption) -> (Vec<Option<DiskStore>>, Vec<Option<Error>>) {
let mut futures = Vec::with_capacity(eps.len());
for ep in eps.iter() {
@@ -141,12 +143,7 @@ impl LocalDisk {
let multipart = format!("{}/{}", RUSTFS_META_BUCKET, "multipart");
let config = format!("{}/{}", RUSTFS_META_BUCKET, "config");
let tmp = format!("{}/{}", RUSTFS_META_BUCKET, "tmp");
let defaults = vec![
buckets.as_str(),
multipart.as_str(),
config.as_str(),
tmp.as_str(),
];
let defaults = vec![buckets.as_str(), multipart.as_str(), config.as_str(), tmp.as_str()];
self.make_volumes(defaults).await
}
@@ -166,12 +163,19 @@ impl LocalDisk {
self.resolve_abs_path(dir)
}
// pub async fn load_format(&self) -> Result<Option<FormatV3>> {
// let p = self.get_object_path(RUSTFS_META_BUCKET, FORMAT_CONFIG_FILE)?;
// let content = fs::read(&p).await?;
// unimplemented!()
// }
/// Write to the filesystem atomically.
/// This is done by first writing to a temporary location and then moving the file.
pub(crate) async fn prepare_file_write<'a>(&self, path: &'a PathBuf) -> Result<FileWriter<'a>> {
let tmp_path = self.get_object_path(RUSTFS_META_TMP_BUCKET, Uuid::new_v4().to_string().as_str())?;
let file = File::create(&path).await?;
let writer = BufWriter::new(file);
Ok(FileWriter {
tmp_path,
dest_path: path,
writer,
clean_tmp: true,
})
}
}
// 过滤 std::io::ErrorKind::NotFound
@@ -265,13 +269,7 @@ impl DiskAPI for LocalDisk {
Ok(())
}
async fn rename_file(
&self,
src_volume: &str,
src_path: &str,
dst_volume: &str,
dst_path: &str,
) -> Result<()> {
async fn rename_file(&self, src_volume: &str, src_path: &str, dst_volume: &str, dst_path: &str) -> Result<()> {
if !skip_access_checks(&src_volume) {
check_volume_exists(&src_volume).await?;
}
@@ -310,6 +308,18 @@ impl DiskAPI for LocalDisk {
Ok(())
}
async fn CreateFile(&self, origvolume: &str, volume: &str, path: &str, fileSize: usize, mut r: DuplexStream) -> Result<()> {
let fpath = self.get_object_path(volume, path)?;
let mut writer = self.prepare_file_write(&fpath).await?;
io::copy(&mut r, writer.writer()).await?;
writer.done().await?;
Ok(())
}
async fn make_volumes(&self, volumes: Vec<&str>) -> Result<()> {
for vol in volumes {
if let Err(e) = self.make_volume(vol).await {
@@ -340,6 +350,24 @@ impl DiskAPI for LocalDisk {
}
}
// pub async fn copy_bytes<S, W>(mut stream: S, writer: &mut W) -> Result<u64>
// where
// S: Stream<Item = Result<Bytes, StdError>> + Unpin,
// W: AsyncWrite + Unpin,
// {
// let mut nwritten: u64 = 0;
// while let Some(result) = stream.next().await {
// let bytes = match result {
// Ok(x) => x,
// Err(e) => return Err(Error::new(e)),
// };
// writer.write_all(&bytes).await?;
// nwritten += bytes.len() as u64;
// }
// writer.flush().await?;
// Ok(nwritten)
// }
// pub struct RemoteDisk {}
// impl RemoteDisk {
@@ -462,6 +490,45 @@ impl PartialEq for DiskError {
}
}
pub(crate) struct FileWriter<'a> {
tmp_path: PathBuf,
dest_path: &'a Path,
writer: BufWriter<File>,
clean_tmp: bool,
}
impl<'a> FileWriter<'a> {
pub(crate) fn tmp_path(&self) -> &Path {
&self.tmp_path
}
pub(crate) fn dest_path(&self) -> &'a Path {
self.dest_path
}
pub(crate) fn writer(&mut self) -> &mut BufWriter<File> {
&mut self.writer
}
pub(crate) async fn done(mut self) -> Result<()> {
if let Some(final_dir_path) = self.dest_path().parent() {
fs::create_dir_all(&final_dir_path).await?;
}
fs::rename(&self.tmp_path, self.dest_path()).await?;
self.clean_tmp = false;
Ok(())
}
}
impl<'a> Drop for FileWriter<'a> {
fn drop(&mut self) {
if self.clean_tmp {
let _ = std::fs::remove_file(&self.tmp_path);
}
}
}
#[cfg(test)]
mod test {
@@ -500,9 +567,7 @@ mod test {
let disk = LocalDisk::new(&ep, false).await.unwrap();
let tmpp = disk
.resolve_abs_path(Path::new(RUSTFS_META_TMP_DELETED_BUCKET))
.unwrap();
let tmpp = disk.resolve_abs_path(Path::new(RUSTFS_META_TMP_DELETED_BUCKET)).unwrap();
println!("ppp :{:?}", &tmpp);

View File

@@ -2,6 +2,7 @@ use std::fmt::Debug;
use anyhow::Result;
use bytes::Bytes;
use tokio::io::DuplexStream;
#[async_trait::async_trait]
pub trait DiskAPI: Debug + Send + Sync + 'static {
@@ -9,13 +10,8 @@ pub trait DiskAPI: Debug + Send + Sync + 'static {
async fn read_all(&self, volume: &str, path: &str) -> Result<Bytes>;
async fn write_all(&self, volume: &str, path: &str, data: Bytes) -> Result<()>;
async fn rename_file(
&self,
src_volume: &str,
src_path: &str,
dst_volume: &str,
dst_path: &str,
) -> Result<()>;
async fn rename_file(&self, src_volume: &str, src_path: &str, dst_volume: &str, dst_path: &str) -> Result<()>;
async fn CreateFile(&self, origvolume: &str, volume: &str, path: &str, fileSize: usize, r: DuplexStream) -> Result<()>;
async fn make_volumes(&self, volume: Vec<&str>) -> Result<()>;
async fn make_volume(&self, volume: &str) -> Result<()>;

View File

@@ -6,6 +6,7 @@ use reed_solomon_erasure::galois_8::ReedSolomon;
use s3s::StdError;
use tokio::io::AsyncWrite;
use tokio::io::AsyncWriteExt;
use tracing::debug;
use crate::chunk_stream::ChunkedStream;
@@ -51,6 +52,7 @@ impl Erasure {
}
}
debug!("encode_data write errs:{:?}", errs);
// TODO: reduceWriteQuorumErrs
}
Err(e) => return Err(anyhow!(e)),

View File

@@ -2,8 +2,9 @@ use std::sync::Arc;
use anyhow::Result;
use futures::AsyncWrite;
use futures::{AsyncWrite, StreamExt};
use time::OffsetDateTime;
use tracing::debug;
use uuid::Uuid;
use crate::{
@@ -107,7 +108,7 @@ impl StorageAPI for Sets {
unimplemented!()
}
async fn put_object(&self, bucket: &str, object: &str, data: &PutObjReader, opts: &ObjectOptions) -> Result<()> {
async fn put_object(&self, bucket: &str, object: &str, data: PutObjReader, opts: ObjectOptions) -> Result<()> {
let disks = self.get_disks_by_key(object);
let mut parity_drives = self.partiy_count;
@@ -131,15 +132,31 @@ impl StorageAPI for Sets {
let mut writers = Vec::with_capacity(disks.len());
for disk in disks.iter() {
for disk in shuffle_disks.iter() {
let (reader, writer) = tokio::io::duplex(fi.erasure.block_size);
let disk = disk.as_ref().unwrap().clone();
let bucket = bucket.to_string();
let object = object.to_string();
tokio::spawn(async move {
debug!("do createfile");
match disk
.CreateFile("", bucket.as_str(), object.as_str(), data.content_length, reader)
.await
{
Ok(_) => (),
Err(e) => debug!("creatfile err :{:?}", e),
}
});
writers.push(writer);
}
let erasure = Erasure::new(fi.erasure.data_blocks, fi.erasure.parity_blocks);
erasure.encode(data.stream, &mut writers, fi.erasure.block_size, data.content_length, write_quorum);
erasure
.encode(data.stream, &mut writers, fi.erasure.block_size, data.content_length, write_quorum)
.await?;
unimplemented!()
}

View File

@@ -126,14 +126,14 @@ impl StorageAPI for ECStore {
let reader = PutObjReader::new(StreamingBlob::from(body), content_len);
self.put_object(RUSTFS_META_BUCKET, &file_path, &reader, &ObjectOptions { max_parity: true })
self.put_object(RUSTFS_META_BUCKET, &file_path, reader, ObjectOptions { max_parity: true })
.await?;
// TODO: toObjectErr
Ok(())
}
async fn put_object(&self, bucket: &str, object: &str, data: &PutObjReader, opts: &ObjectOptions) -> Result<()> {
async fn put_object(&self, bucket: &str, object: &str, data: PutObjReader, opts: ObjectOptions) -> Result<()> {
// checkPutObjectArgs
let object = utils::path::encode_dir_object(object);

View File

@@ -139,5 +139,5 @@ pub struct ObjectOptions {
pub trait StorageAPI {
async fn make_bucket(&self, bucket: &str, opts: &MakeBucketOptions) -> Result<()>;
async fn put_object(&self, bucket: &str, object: &str, data: &PutObjReader, opts: &ObjectOptions) -> Result<()>;
async fn put_object(&self, bucket: &str, object: &str, data: PutObjReader, opts: ObjectOptions) -> Result<()>;
}

View File

@@ -169,7 +169,7 @@ impl S3 for FS {
let reader = PutObjReader::new(body.into(), content_length as usize);
try_!(self.store.put_object(&bucket, &key, &reader, &ObjectOptions::default()).await);
try_!(self.store.put_object(&bucket, &key, reader, ObjectOptions::default()).await);
// self.store.put_object(bucket, object, data, opts);

View File

@@ -9,12 +9,12 @@ if [ -n "$1" ]; then
fi
if [ -z "$RUST_LOG" ]; then
export RUST_LOG="rustfs=debug"
export RUST_LOG="s3s-rustfs=debug,s3s=debug"
fi
cargo run \
-- --access-key AKEXAMPLERUSTFS \
--secret-key SKEXAMPLERUSTFS \
--address 0.0.0.0:9010 \
--domain-name localhost:9010 \
--domain-name 127.0.0.1:9010 \
"$DATA_DIR"