diff --git a/ecstore/src/disk.rs b/ecstore/src/disk.rs index c2d725c6..b10a7d88 100644 --- a/ecstore/src/disk.rs +++ b/ecstore/src/disk.rs @@ -6,11 +6,16 @@ use std::{ use anyhow::{Error, Result}; use bytes::Bytes; -use futures::future::join_all; +use futures::{future::join_all, Stream}; use path_absolutize::Absolutize; +use s3s::StdError; use time::OffsetDateTime; -use tokio::fs::{self, File}; -use tokio::io::ErrorKind; +use tokio::io::{self, AsyncRead, AsyncReadExt, AsyncSeekExt}; +use tokio::io::{AsyncWrite, BufWriter, ErrorKind}; +use tokio::{ + fs::{self, File}, + io::DuplexStream, +}; use uuid::Uuid; use crate::{ @@ -43,10 +48,7 @@ pub async fn new_disk(ep: &Endpoint, opt: &DiskOption) -> Result { } } -pub async fn init_disks( - eps: &Endpoints, - opt: &DiskOption, -) -> (Vec>, Vec>) { +pub async fn init_disks(eps: &Endpoints, opt: &DiskOption) -> (Vec>, Vec>) { let mut futures = Vec::with_capacity(eps.len()); for ep in eps.iter() { @@ -141,12 +143,7 @@ impl LocalDisk { let multipart = format!("{}/{}", RUSTFS_META_BUCKET, "multipart"); let config = format!("{}/{}", RUSTFS_META_BUCKET, "config"); let tmp = format!("{}/{}", RUSTFS_META_BUCKET, "tmp"); - let defaults = vec![ - buckets.as_str(), - multipart.as_str(), - config.as_str(), - tmp.as_str(), - ]; + let defaults = vec![buckets.as_str(), multipart.as_str(), config.as_str(), tmp.as_str()]; self.make_volumes(defaults).await } @@ -166,12 +163,19 @@ impl LocalDisk { self.resolve_abs_path(dir) } - // pub async fn load_format(&self) -> Result> { - // let p = self.get_object_path(RUSTFS_META_BUCKET, FORMAT_CONFIG_FILE)?; - // let content = fs::read(&p).await?; - - // unimplemented!() - // } + /// Write to the filesystem atomically. + /// This is done by first writing to a temporary location and then moving the file. 
+ pub(crate) async fn prepare_file_write<'a>(&self, path: &'a PathBuf) -> Result> { + let tmp_path = self.get_object_path(RUSTFS_META_TMP_BUCKET, Uuid::new_v4().to_string().as_str())?; + let file = File::create(&tmp_path).await?; + let writer = BufWriter::new(file); + Ok(FileWriter { + tmp_path, + dest_path: path, + writer, + clean_tmp: true, + }) + } } // 过滤 std::io::ErrorKind::NotFound @@ -265,13 +269,7 @@ impl DiskAPI for LocalDisk { Ok(()) } - async fn rename_file( - &self, - src_volume: &str, - src_path: &str, - dst_volume: &str, - dst_path: &str, - ) -> Result<()> { + async fn rename_file(&self, src_volume: &str, src_path: &str, dst_volume: &str, dst_path: &str) -> Result<()> { if !skip_access_checks(&src_volume) { check_volume_exists(&src_volume).await?; } @@ -310,6 +308,18 @@ impl DiskAPI for LocalDisk { Ok(()) } + async fn CreateFile(&self, origvolume: &str, volume: &str, path: &str, fileSize: usize, mut r: DuplexStream) -> Result<()> { + let fpath = self.get_object_path(volume, path)?; + + let mut writer = self.prepare_file_write(&fpath).await?; + + io::copy(&mut r, writer.writer()).await?; + + writer.done().await?; + + Ok(()) + } + async fn make_volumes(&self, volumes: Vec<&str>) -> Result<()> { for vol in volumes { if let Err(e) = self.make_volume(vol).await { @@ -340,6 +350,24 @@ impl DiskAPI for LocalDisk { Ok(()) } } } +// pub async fn copy_bytes(mut stream: S, writer: &mut W) -> Result +// where +// S: Stream> + Unpin, +// W: AsyncWrite + Unpin, +// { +// let mut nwritten: u64 = 0; +// while let Some(result) = stream.next().await { +// let bytes = match result { +// Ok(x) => x, +// Err(e) => return Err(Error::new(e)), +// }; +// writer.write_all(&bytes).await?; +// nwritten += bytes.len() as u64; +// } +// writer.flush().await?; +// Ok(nwritten) +// } + // pub struct RemoteDisk {} // impl RemoteDisk { @@ -462,6 +490,45 @@ impl PartialEq for DiskError { } } +pub(crate) struct FileWriter<'a> { + tmp_path: PathBuf, + dest_path: &'a Path, + writer: BufWriter, + 
clean_tmp: bool, +} + +impl<'a> FileWriter<'a> { + pub(crate) fn tmp_path(&self) -> &Path { + &self.tmp_path + } + + pub(crate) fn dest_path(&self) -> &'a Path { + self.dest_path + } + + pub(crate) fn writer(&mut self) -> &mut BufWriter { + &mut self.writer + } + + pub(crate) async fn done(mut self) -> Result<()> { + if let Some(final_dir_path) = self.dest_path().parent() { + fs::create_dir_all(&final_dir_path).await?; + } + tokio::io::AsyncWriteExt::flush(&mut self.writer).await?; + fs::rename(&self.tmp_path, self.dest_path()).await?; + self.clean_tmp = false; + Ok(()) + } +} + +impl<'a> Drop for FileWriter<'a> { + fn drop(&mut self) { + if self.clean_tmp { + let _ = std::fs::remove_file(&self.tmp_path); + } + } +} + #[cfg(test)] mod test { @@ -500,9 +567,7 @@ mod test { let disk = LocalDisk::new(&ep, false).await.unwrap(); - let tmpp = disk - .resolve_abs_path(Path::new(RUSTFS_META_TMP_DELETED_BUCKET)) - .unwrap(); + let tmpp = disk.resolve_abs_path(Path::new(RUSTFS_META_TMP_DELETED_BUCKET)).unwrap(); println!("ppp :{:?}", &tmpp); diff --git a/ecstore/src/disk_api.rs b/ecstore/src/disk_api.rs index b993b980..ebb6fe35 100644 --- a/ecstore/src/disk_api.rs +++ b/ecstore/src/disk_api.rs @@ -2,6 +2,7 @@ use std::fmt::Debug; use anyhow::Result; use bytes::Bytes; +use tokio::io::DuplexStream; #[async_trait::async_trait] pub trait DiskAPI: Debug + Send + Sync + 'static { @@ -9,13 +10,8 @@ pub trait DiskAPI: Debug + Send + Sync + 'static { async fn read_all(&self, volume: &str, path: &str) -> Result; async fn write_all(&self, volume: &str, path: &str, data: Bytes) -> Result<()>; - async fn rename_file( - &self, - src_volume: &str, - src_path: &str, - dst_volume: &str, - dst_path: &str, - ) -> Result<()>; + async fn rename_file(&self, src_volume: &str, src_path: &str, dst_volume: &str, dst_path: &str) -> Result<()>; + async fn CreateFile(&self, origvolume: &str, volume: &str, path: &str, fileSize: usize, r: DuplexStream) -> Result<()>; async fn make_volumes(&self, volume: Vec<&str>) -> Result<()>; async fn make_volume(&self, 
volume: &str) -> Result<()>; diff --git a/ecstore/src/erasure.rs b/ecstore/src/erasure.rs index 4fc73823..2d9cffbe 100644 --- a/ecstore/src/erasure.rs +++ b/ecstore/src/erasure.rs @@ -6,6 +6,7 @@ use reed_solomon_erasure::galois_8::ReedSolomon; use s3s::StdError; use tokio::io::AsyncWrite; use tokio::io::AsyncWriteExt; +use tracing::debug; use crate::chunk_stream::ChunkedStream; @@ -51,6 +52,7 @@ impl Erasure { } } + debug!("encode_data write errs:{:?}", errs); // TODO: reduceWriteQuorumErrs } Err(e) => return Err(anyhow!(e)), diff --git a/ecstore/src/sets.rs b/ecstore/src/sets.rs index 8929be8d..2cdd3536 100644 --- a/ecstore/src/sets.rs +++ b/ecstore/src/sets.rs @@ -2,8 +2,9 @@ use std::sync::Arc; use anyhow::Result; -use futures::AsyncWrite; +use futures::{AsyncWrite, StreamExt}; use time::OffsetDateTime; +use tracing::debug; use uuid::Uuid; use crate::{ @@ -107,7 +108,7 @@ impl StorageAPI for Sets { unimplemented!() } - async fn put_object(&self, bucket: &str, object: &str, data: &PutObjReader, opts: &ObjectOptions) -> Result<()> { + async fn put_object(&self, bucket: &str, object: &str, data: PutObjReader, opts: ObjectOptions) -> Result<()> { let disks = self.get_disks_by_key(object); let mut parity_drives = self.partiy_count; @@ -131,15 +132,31 @@ impl StorageAPI for Sets { let mut writers = Vec::with_capacity(disks.len()); - for disk in disks.iter() { + for disk in shuffle_disks.iter() { let (reader, writer) = tokio::io::duplex(fi.erasure.block_size); + let disk = disk.as_ref().unwrap().clone(); + let bucket = bucket.to_string(); + let object = object.to_string(); + tokio::spawn(async move { + debug!("do createfile"); + match disk + .CreateFile("", bucket.as_str(), object.as_str(), data.content_length, reader) + .await + { + Ok(_) => (), + Err(e) => debug!("creatfile err :{:?}", e), + } + }); + writers.push(writer); } let erasure = Erasure::new(fi.erasure.data_blocks, fi.erasure.parity_blocks); - erasure.encode(data.stream, &mut writers, 
fi.erasure.block_size, data.content_length, write_quorum); + erasure + .encode(data.stream, &mut writers, fi.erasure.block_size, data.content_length, write_quorum) + .await?; unimplemented!() } diff --git a/ecstore/src/store.rs b/ecstore/src/store.rs index 5e0baa6c..7f90928a 100644 --- a/ecstore/src/store.rs +++ b/ecstore/src/store.rs @@ -126,14 +126,14 @@ impl StorageAPI for ECStore { let reader = PutObjReader::new(StreamingBlob::from(body), content_len); - self.put_object(RUSTFS_META_BUCKET, &file_path, &reader, &ObjectOptions { max_parity: true }) + self.put_object(RUSTFS_META_BUCKET, &file_path, reader, ObjectOptions { max_parity: true }) .await?; // TODO: toObjectErr Ok(()) } - async fn put_object(&self, bucket: &str, object: &str, data: &PutObjReader, opts: &ObjectOptions) -> Result<()> { + async fn put_object(&self, bucket: &str, object: &str, data: PutObjReader, opts: ObjectOptions) -> Result<()> { // checkPutObjectArgs let object = utils::path::encode_dir_object(object); diff --git a/ecstore/src/store_api.rs b/ecstore/src/store_api.rs index bba172b2..8c25e1ee 100644 --- a/ecstore/src/store_api.rs +++ b/ecstore/src/store_api.rs @@ -139,5 +139,5 @@ pub struct ObjectOptions { pub trait StorageAPI { async fn make_bucket(&self, bucket: &str, opts: &MakeBucketOptions) -> Result<()>; - async fn put_object(&self, bucket: &str, object: &str, data: &PutObjReader, opts: &ObjectOptions) -> Result<()>; + async fn put_object(&self, bucket: &str, object: &str, data: PutObjReader, opts: ObjectOptions) -> Result<()>; } diff --git a/rustfs/src/storage/ecfs.rs b/rustfs/src/storage/ecfs.rs index 0dcc1821..ed34c81e 100644 --- a/rustfs/src/storage/ecfs.rs +++ b/rustfs/src/storage/ecfs.rs @@ -169,7 +169,7 @@ impl S3 for FS { let reader = PutObjReader::new(body.into(), content_length as usize); - try_!(self.store.put_object(&bucket, &key, &reader, &ObjectOptions::default()).await); + try_!(self.store.put_object(&bucket, &key, reader, ObjectOptions::default()).await); // 
self.store.put_object(bucket, object, data, opts); diff --git a/scripts/run.sh b/scripts/run.sh index d27936ab..16df7c98 100755 --- a/scripts/run.sh +++ b/scripts/run.sh @@ -9,12 +9,12 @@ if [ -n "$1" ]; then fi if [ -z "$RUST_LOG" ]; then - export RUST_LOG="rustfs=debug" + export RUST_LOG="s3s_rustfs=debug,s3s=debug" fi cargo run \ -- --access-key AKEXAMPLERUSTFS \ --secret-key SKEXAMPLERUSTFS \ --address 0.0.0.0:9010 \ - --domain-name localhost:9010 \ + --domain-name 127.0.0.1:9010 \ "$DATA_DIR"