Merge pull request #120 from rustfs/fix/content_type

add content-type; fix list_objects
This commit is contained in:
weisd
2024-11-09 02:58:19 +08:00
committed by GitHub
8 changed files with 254 additions and 47 deletions

View File

@@ -14,7 +14,7 @@ use crate::disk::error::{
convert_access_error, is_sys_err_handle_invalid, is_sys_err_invalid_arg, is_sys_err_is_dir, is_sys_err_not_dir,
map_err_not_exists, os_err_to_file_err,
};
use crate::disk::os::check_path_length;
use crate::disk::os::{check_path_length, is_empty_dir};
use crate::disk::{LocalFileReader, LocalFileWriter, STORAGE_FORMAT_FILE};
use crate::error::{Error, Result};
use crate::global::{GLOBAL_IsErasureSD, GLOBAL_RootDiskThreshold};
@@ -530,7 +530,7 @@ impl LocalDisk {
for fi in fis {
let data_dir = fm.delete_version(fi)?;
warn!("删除版本号 对应data_dir {:?}", &data_dir);
if data_dir.is_some() {
let dir_path = self.get_object_path(volume, format!("{}/{}", path, data_dir.unwrap()).as_str())?;
self.move_to_trash(&dir_path, true, false).await?;
@@ -539,8 +539,6 @@ impl LocalDisk {
// no versions left; delete xl.meta
if fm.versions.is_empty() {
warn!("no versions left, deleting xl.meta");
self.delete_file(&volume_dir, &xlpath, true, false).await?;
return Ok(());
}
@@ -1219,6 +1217,7 @@ impl DiskAPI for LocalDisk {
Ok(FileReader::Local(LocalFileReader::new(f)))
}
#[tracing::instrument(level = "debug", skip(self))]
async fn list_dir(&self, origvolume: &str, volume: &str, dir_path: &str, count: i32) -> Result<Vec<String>> {
if !origvolume.is_empty() {
let origvolume_dir = self.get_bucket_path(origvolume)?;
@@ -1249,6 +1248,26 @@ impl DiskAPI for LocalDisk {
// TODO: io.writer
async fn walk_dir(&self, opts: WalkDirOptions) -> Result<Vec<MetaCacheEntry>> {
// warn!("walk_dir opts {:?}", &opts);
let mut metas = Vec::new();
if opts.base_dir.ends_with(SLASH_SEPARATOR) {
let fpath = self.get_object_path(
&opts.bucket,
format!("{}/{}", opts.base_dir.trim_end_matches(SLASH_SEPARATOR), STORAGE_FORMAT_FILE).as_str(),
)?;
if let Ok(data) = self.read_metadata(fpath).await {
let meta = MetaCacheEntry {
name: opts.base_dir.clone(),
metadata: data,
..Default::default()
};
metas.push(meta);
return Ok(metas);
}
}
let mut entries = match self.list_dir("", &opts.bucket, &opts.base_dir, -1).await {
Ok(res) => res,
Err(e) => {
@@ -1274,12 +1293,12 @@ impl DiskAPI for LocalDisk {
let bucket = opts.bucket.as_str();
let mut metas = Vec::new();
let mut dir_objes = HashSet::new();
// first-pass filtering
for entry in entries.iter() {
// warn!("walk_dir get entry {:?}", &entry);
// check limit
if opts.limit > 0 && objs_returned >= opts.limit {
return Ok(metas);
@@ -1289,17 +1308,16 @@ impl DiskAPI for LocalDisk {
continue;
}
// warn!("walk_dir entry {}", entry);
let mut meta = MetaCacheEntry { ..Default::default() };
let fpath = self.get_object_path(bucket, format!("{}/{}", &entry, STORAGE_FORMAT_FILE).as_str())?;
let mut name = {
if opts.base_dir.is_empty() {
entry.clone()
} else {
format!("{}{}{}", opts.base_dir.trim_end_matches(SLASH_SEPARATOR), SLASH_SEPARATOR, entry)
}
};
if let Ok(data) = self.read_metadata(&fpath).await {
meta.metadata = data;
}
let mut name = entry.clone();
if name.ends_with(SLASH_SEPARATOR) {
if name.ends_with(GLOBAL_DIR_SUFFIX_WITH_SLASH) {
name = format!("{}{}", name.as_str().trim_end_matches(GLOBAL_DIR_SUFFIX_WITH_SLASH), SLASH_SEPARATOR);
@@ -1310,6 +1328,18 @@ impl DiskAPI for LocalDisk {
}
meta.name = name;
let fpath = self.get_object_path(bucket, format!("{}/{}", &meta.name, STORAGE_FORMAT_FILE).as_str())?;
if let Ok(data) = self.read_metadata(&fpath).await {
meta.metadata = data;
} else {
let fpath = self.get_object_path(bucket, &meta.name)?;
if !is_empty_dir(fpath).await {
meta.name = format!("{}{}", &meta.name, SLASH_SEPARATOR);
}
}
metas.push(meta);
}
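
A minimal sketch of the directory-name normalization in this loop. Assumptions: GLOBAL_DIR_SUFFIX_WITH_SLASH is GLOBAL_DIR_SUFFIX ("__XLDIR__", defined in utils/path.rs below) plus a trailing slash, and the helper name is hypothetical:

const GLOBAL_DIR_SUFFIX_WITH_SLASH: &str = "__XLDIR__/"; // assumption: suffix + slash
const SLASH_SEPARATOR: &str = "/";

// Directory objects are stored as "name__XLDIR__/"; walk_dir rewrites them back to "name/".
fn normalize_dir_entry(name: &str) -> String {
    if name.ends_with(GLOBAL_DIR_SUFFIX_WITH_SLASH) {
        format!("{}{}", name.trim_end_matches(GLOBAL_DIR_SUFFIX_WITH_SLASH), SLASH_SEPARATOR)
    } else {
        name.to_owned()
    }
}

// normalize_dir_entry("photos__XLDIR__/") == "photos/"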
@@ -1661,8 +1691,6 @@ impl DiskAPI for LocalDisk {
async fn write_metadata(&self, _org_volume: &str, volume: &str, path: &str, fi: FileInfo) -> Result<()> {
let p = self.get_object_path(volume, format!("{}/{}", path, super::STORAGE_FORMAT_FILE).as_str())?;
warn!("write_metadata {:?} {:?}", &p, &fi);
let mut meta = FileMeta::new();
if !fi.fresh {
let (buf, _) = read_file_exists(&p).await?;

View File

@@ -73,6 +73,10 @@ pub async fn make_dir_all(path: impl AsRef<Path>, base_dir: impl AsRef<Path>) ->
Ok(())
}
pub async fn is_empty_dir(path: impl AsRef<Path>) -> bool {
read_dir(path.as_ref(), 1).await.is_ok_and(|v| v.is_empty())
}
// read_dir reads at most count entries; when count == 0, there is no limit.
pub async fn read_dir(path: impl AsRef<Path>, count: i32) -> Result<Vec<String>> {
let mut entries = fs::read_dir(path.as_ref()).await?;
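
A short behavioral sketch of is_empty_dir, assuming a tokio test runtime; the probe directory name is illustrative:

#[tokio::test]
async fn is_empty_dir_sketch() {
    // a read_dir failure (e.g. a missing path) counts as "not empty"
    assert!(!is_empty_dir("/definitely/not/a/real/path").await);

    let tmp = std::env::temp_dir().join("rustfs_is_empty_dir_probe");
    tokio::fs::create_dir_all(&tmp).await.unwrap();
    assert!(is_empty_dir(&tmp).await); // no entries -> empty
    tokio::fs::remove_dir(&tmp).await.unwrap();
}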

View File

@@ -1,12 +1,12 @@
use http::{HeaderMap, HeaderValue};
use uuid::Uuid;
use crate::bucket::versioning_sys::BucketVersioningSys;
use crate::error::{Error, Result};
use crate::store_api::ObjectOptions;
use crate::store_err::StorageError;
use crate::utils::path::is_dir_object;
use http::{HeaderMap, HeaderValue};
use lazy_static::lazy_static;
use std::collections::HashMap;
use uuid::Uuid;
pub async fn put_opts(
bucket: &str,
@@ -58,14 +58,64 @@ pub fn put_opts_from_headers(
headers: &HeaderMap<HeaderValue>,
metadata: Option<HashMap<String, String>>,
) -> Result<ObjectOptions> {
// TODO custom headers
let metadata = metadata.unwrap_or_default();
get_default_opts(headers, metadata, false)
}
fn get_default_opts(
_headers: &HeaderMap<HeaderValue>,
_metadata: Option<HashMap<String, String>>,
headers: &HeaderMap<HeaderValue>,
metadata: HashMap<String, String>,
_copy_source: bool,
) -> Result<ObjectOptions> {
Ok(ObjectOptions::default())
Ok(ObjectOptions {
user_defined: metadata.clone(),
..Default::default()
})
}
pub fn extract_metadata(headers: &HeaderMap<HeaderValue>) -> HashMap<String, String> {
let mut metadata = HashMap::new();
extract_metadata_from_mime(&headers, &mut metadata);
metadata
}
fn extract_metadata_from_mime(headers: &HeaderMap<HeaderValue>, metadata: &mut HashMap<String, String>) {
for (k, v) in headers.iter() {
if k.as_str().starts_with("x-amz-meta-") {
metadata.insert(k.to_string(), String::from_utf8_lossy(v.as_bytes()).to_string());
continue;
}
if k.as_str().starts_with("x-rustfs-meta-") {
metadata.insert(k.to_string(), String::from_utf8_lossy(v.as_bytes()).to_string());
continue;
}
for hd in SUPPORTED_HEADERS.iter() {
if k.as_str() == *hd {
metadata.insert(k.to_string(), String::from_utf8_lossy(v.as_bytes()).to_string());
break;
}
}
}
if !metadata.contains_key("content-type") {
metadata.insert("content-type".to_owned(), "binary/octet-stream".to_owned());
}
}
lazy_static! {
static ref SUPPORTED_HEADERS: Vec<&'static str> = vec![
"content-type",
"cache-control",
"content-language",
"content-encoding",
"content-disposition",
"x-amz-storage-class",
"x-amz-tagging",
"expires",
"x-amz-replication-status"
];
}
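
A usage sketch for extract_metadata; header names and values are illustrative:

#[cfg(test)]
mod extract_metadata_sketch {
    use super::*;
    use http::{HeaderMap, HeaderValue};

    #[test]
    fn fills_content_type_default() {
        let mut headers = HeaderMap::new();
        headers.insert("x-amz-meta-owner", HeaderValue::from_static("weisd"));
        headers.insert("cache-control", HeaderValue::from_static("no-cache"));
        headers.insert("x-unrelated", HeaderValue::from_static("dropped"));

        let meta = extract_metadata(&headers);
        assert_eq!(meta.get("x-amz-meta-owner").map(String::as_str), Some("weisd"));
        assert_eq!(meta.get("cache-control").map(String::as_str), Some("no-cache"));
        assert!(!meta.contains_key("x-unrelated"));
        // content-type was absent, so the default kicks in
        assert_eq!(meta.get("content-type").map(String::as_str), Some("binary/octet-stream"));
    }
}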

View File

@@ -19,7 +19,7 @@ use crate::store_err::{
};
use crate::store_init::ec_drives_no_config;
use crate::utils::crypto::base64_decode;
use crate::utils::path::{decode_dir_object, encode_dir_object, SLASH_SEPARATOR};
use crate::utils::path::{base_dir_from_prefix, decode_dir_object, encode_dir_object, SLASH_SEPARATOR};
use crate::{
bucket::metadata::BucketMetadata,
disk::{error::DiskError, new_disk, DiskOption, DiskStore, WalkDirOptions, BUCKET_META_PREFIX, RUSTFS_META_BUCKET},
@@ -53,7 +53,7 @@ use time::OffsetDateTime;
use tokio::fs;
use tokio::sync::Semaphore;
use tracing::{debug, info};
use tracing::{debug, info, warn};
use uuid::Uuid;
const MAX_UPLOADS_LIST: usize = 10000;
@@ -212,8 +212,18 @@ impl ECStore {
self.pools.len() == 1
}
async fn list_path(&self, opts: &ListPathOptions) -> Result<ListObjectsInfo> {
let objects = self.list_merged(opts).await?;
async fn list_path(&self, opts: &ListPathOptions, delimiter: &str) -> Result<ListObjectsInfo> {
// if opts.prefix.ends_with(SLASH_SEPARATOR) {
// return Err(Error::msg("eof"));
// }
let mut opts = opts.clone();
if opts.base_dir.is_empty() {
opts.base_dir = base_dir_from_prefix(&opts.prefix);
}
let objects = self.list_merged(&opts, delimiter).await?;
let info = ListObjectsInfo {
objects,
@@ -223,9 +233,10 @@ impl ECStore {
}
// read everything and merge
async fn list_merged(&self, opts: &ListPathOptions) -> Result<Vec<ObjectInfo>> {
let opts = WalkDirOptions {
async fn list_merged(&self, opts: &ListPathOptions, delimiter: &str) -> Result<Vec<ObjectInfo>> {
let walk_opts = WalkDirOptions {
bucket: opts.bucket.clone(),
base_dir: opts.base_dir.clone(),
..Default::default()
};
@@ -235,7 +246,7 @@ impl ECStore {
for sets in self.pools.iter() {
for set in sets.disk_set.iter() {
futures.push(set.walk_dir(&opts));
futures.push(set.walk_dir(&walk_opts));
}
}
@@ -254,14 +265,25 @@ impl ECStore {
let entries = disks_res.as_ref().unwrap();
for entry in entries {
// warn!("lst_merged entry---- {}", &entry.name);
if !opts.prefix.is_empty() && !entry.name.starts_with(&opts.prefix) {
continue;
}
if !uniq.contains(&entry.name) {
uniq.insert(entry.name.clone());
// TODO: filtering
if opts.limit > 0 && ress.len() as i32 >= opts.limit {
return Ok(ress);
}
if entry.is_object() {
if !delimiter.is_empty() {
// entry.name.trim_start_matches(pat)
}
let fi = entry.to_fileinfo(&opts.bucket)?;
if let Some(f) = fi {
ress.push(f.to_object_info(&opts.bucket, &entry.name, false));
@@ -699,7 +721,7 @@ pub struct PoolObjInfo {
pub err: Option<Error>,
}
#[derive(Debug, Default)]
#[derive(Debug, Default, Clone)]
pub struct ListPathOptions {
pub id: String,
@@ -1039,9 +1061,9 @@ impl StorageAPI for ECStore {
async fn list_objects_v2(
&self,
bucket: &str,
_prefix: &str,
prefix: &str,
continuation_token: &str,
_delimiter: &str,
delimiter: &str,
max_keys: i32,
_fetch_owner: bool,
_start_after: &str,
@@ -1049,10 +1071,11 @@ impl StorageAPI for ECStore {
let opts = ListPathOptions {
bucket: bucket.to_string(),
limit: max_keys,
prefix: prefix.to_owned(),
..Default::default()
};
let info = self.list_path(&opts).await?;
let info = self.list_path(&opts, delimiter).await?;
// warn!("list_objects_v2 info {:?}", info);

View File

@@ -537,6 +537,8 @@ pub struct ObjectOptions {
pub data_movement: bool,
pub src_pool_idx: usize,
pub user_defined: HashMap<String, String>,
pub preserve_etag: Option<String>,
}
// impl Default for ObjectOptions {

View File

@@ -1,4 +1,4 @@
use std::path::PathBuf;
use std::path::{Path, PathBuf};
const GLOBAL_DIR_SUFFIX: &str = "__XLDIR__";
@@ -69,6 +69,20 @@ pub fn path_join(elem: &[PathBuf]) -> PathBuf {
joined_path
}
pub fn base_dir_from_prefix(prefix: &str) -> String {
let mut base_dir = dir(prefix).to_owned();
if base_dir == "." || base_dir == "./" || base_dir == "/" {
base_dir = "".to_owned();
}
if !prefix.contains('/') {
base_dir = "".to_owned();
}
if !base_dir.is_empty() && !base_dir.ends_with(SLASH_SEPARATOR) {
base_dir.push_str(SLASH_SEPARATOR);
}
base_dir
}
pub struct LazyBuf {
s: String,
buf: Option<Vec<u8>>,
@@ -184,10 +198,33 @@ pub fn clean(path: &str) -> String {
out.string()
}
pub fn split(path: &str) -> (&str, &str) {
// Find the last occurrence of the '/' character
if let Some(i) = path.rfind('/') {
// Return the directory (up to and including the last '/') and the file name
return (&path[..i + 1], &path[i + 1..]);
}
// If no '/' is found, return an empty string for the directory and the whole path as the file name
(path, "")
}
pub fn dir(path: &str) -> &str {
let (a, _) = split(path);
a
}
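
A few worked cases for these Go-style helpers:

// split("a/b/c") == ("a/b/", "c")
// split("a/")    == ("a/", "")
// split("abc")   == ("", "abc")   // no '/': empty directory part
// dir("a/b/c")   == "a/b/"
// dir("abc")     == ""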
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_base_dir_from_prefix() {
let a = "da/";
println!("---- in {}", a);
let a = base_dir_from_prefix(a);
println!("---- out {}", a);
}
#[test]
fn test_clean() {
assert_eq!(clean(""), ".");

View File

@@ -1 +1,2 @@
pub(crate) const AMZ_OBJECT_TAGGING: &str = "X-Amz-Tagging";
pub const AMZ_OBJECT_TAGGING: &str = "X-Amz-Tagging";
pub const AMZ_STORAGE_CLASS: &str = "x-amz-storage-class";

View File

@@ -16,8 +16,11 @@ use ecstore::bucket::policy_sys::PolicySys;
use ecstore::bucket::tagging::decode_tags;
use ecstore::bucket::tagging::encode_tags;
use ecstore::bucket::versioning_sys::BucketVersioningSys;
use ecstore::disk::error::is_err_file_not_found;
use ecstore::disk::error::DiskError;
use ecstore::error::Error as EcError;
use ecstore::new_object_layer_fn;
use ecstore::options::extract_metadata;
use ecstore::options::put_opts;
use ecstore::store_api::BucketOptions;
use ecstore::store_api::CompletePart;
@@ -30,6 +33,7 @@ use ecstore::store_api::ObjectOptions;
use ecstore::store_api::ObjectToDelete;
use ecstore::store_api::PutObjReader;
use ecstore::store_api::StorageAPI;
use ecstore::xhttp;
use futures::pin_mut;
use futures::{Stream, StreamExt};
use http::HeaderMap;
@@ -45,6 +49,7 @@ use s3s::{S3Request, S3Response};
use std::fmt::Debug;
use std::str::FromStr;
use tracing::debug;
use tracing::error;
use tracing::info;
use transform_stream::AsyncTryStream;
use uuid::Uuid;
@@ -66,6 +71,15 @@ lazy_static! {
id: Some("c19050dbcee97fda828689dda99097a6321af2248fa760517237346e5d9c8a66".to_owned()),
};
}
fn to_s3_error(err: EcError) -> S3Error {
if is_err_file_not_found(&err) {
return S3Error::with_message(S3ErrorCode::NoSuchKey, format!("ec err {}", err));
}
S3Error::with_message(S3ErrorCode::InternalError, format!("ec err {}", err))
}
#[derive(Debug, Clone)]
pub struct FS {
// pub store: ECStore,
@@ -325,14 +339,28 @@ impl S3 for FS {
let info = reader.object_info;
let content_type = try_!(ContentType::from_str("application/x-msdownload"));
let content_type = {
if let Some(content_type) = info.content_type {
match ContentType::from_str(&content_type) {
Ok(res) => Some(res),
Err(err) => {
error!("parse content-type err {} {:?}", &content_type, err);
None
}
}
} else {
None
}
};
let last_modified = info.mod_time.map(Timestamp::from);
let output = GetObjectOutput {
body: Some(reader.stream),
content_length: Some(info.size as i64),
last_modified,
content_type: Some(content_type),
content_type,
..Default::default()
};
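
The same fallback parse recurs in head_object below; a hypothetical helper (not part of this commit) that captures it:

// Parse a stored content-type, logging and omitting the header when invalid.
fn parse_content_type(raw: Option<String>) -> Option<ContentType> {
    raw.and_then(|s| match ContentType::from_str(&s) {
        Ok(ct) => Some(ct),
        Err(err) => {
            error!("parse content-type err {} {:?}", &s, err);
            None
        }
    })
}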
@@ -375,15 +403,32 @@ impl S3 for FS {
None => return Err(S3Error::with_message(S3ErrorCode::InternalError, "Not init".to_string())),
};
let info = try_!(store.get_object_info(&bucket, &key, &ObjectOptions::default()).await);
let info = store
.get_object_info(&bucket, &key, &ObjectOptions::default())
.await
.map_err(to_s3_error)?;
debug!("info {:?}", info);
let content_type = try_!(ContentType::from_str("application/x-msdownload"));
let content_type = {
if let Some(content_type) = info.content_type {
match ContentType::from_str(&content_type) {
Ok(res) => Some(res),
Err(err) => {
error!("parse content-type err {} {:?}", &content_type, err);
None
}
}
} else {
None
}
};
let last_modified = info.mod_time.map(Timestamp::from);
let output = HeadObjectOutput {
content_length: Some(try_!(i64::try_from(info.size))),
content_type: Some(content_type),
content_type,
last_modified,
// metadata: object_metadata,
..Default::default()
@@ -437,6 +482,8 @@ impl S3 for FS {
#[tracing::instrument(level = "debug", skip(self, req))]
async fn list_objects_v2(&self, req: S3Request<ListObjectsV2Input>) -> S3Result<S3Response<ListObjectsV2Output>> {
// warn!("list_objects_v2 input {:?}", &req.input);
let ListObjectsV2Input {
bucket,
continuation_token,
@@ -534,8 +581,8 @@ impl S3 for FS {
body,
bucket,
key,
metadata,
content_length,
tagging,
..
} = input;
@@ -552,7 +599,12 @@ impl S3 for FS {
None => return Err(S3Error::with_message(S3ErrorCode::InternalError, "Not init".to_string())),
};
let opts: ObjectOptions = try_!(put_opts(&bucket, &key, None, &req.headers, metadata).await);
let mut metadata = extract_metadata(&req.headers);
if let Some(tags) = tagging {
metadata.insert(xhttp::AMZ_OBJECT_TAGGING.to_owned(), tags);
}
let opts: ObjectOptions = try_!(put_opts(&bucket, &key, None, &req.headers, Some(metadata)).await);
let obj_info = try_!(store.put_object(&bucket, &key, &mut reader, &opts).await);
@@ -573,12 +625,12 @@ impl S3 for FS {
req: S3Request<CreateMultipartUploadInput>,
) -> S3Result<S3Response<CreateMultipartUploadOutput>> {
let CreateMultipartUploadInput {
bucket, key, metadata, ..
bucket, key, tagging, ..
} = req.input;
// mc cp step 3
debug!("create_multipart_upload meta {:?}", &metadata);
// debug!("create_multipart_upload meta {:?}", &metadata);
let layer = new_object_layer_fn();
let lock = layer.read().await;
@@ -587,8 +639,15 @@ impl S3 for FS {
None => return Err(S3Error::with_message(S3ErrorCode::InternalError, "Not init".to_string())),
};
let MultipartUploadResult { upload_id, .. } =
try_!(store.new_multipart_upload(&bucket, &key, &ObjectOptions::default()).await);
let mut metadata = extract_metadata(&req.headers);
if let Some(tags) = tagging {
metadata.insert(xhttp::AMZ_OBJECT_TAGGING.to_owned(), tags);
}
let opts: ObjectOptions = try_!(put_opts(&bucket, &key, None, &req.headers, Some(metadata)).await);
let MultipartUploadResult { upload_id, .. } = try_!(store.new_multipart_upload(&bucket, &key, &opts).await);
let output = CreateMultipartUploadOutput {
bucket: Some(bucket),
@@ -609,6 +668,7 @@ impl S3 for FS {
upload_id,
part_number,
content_length,
// content_md5,
..
} = req.input;
@@ -630,6 +690,8 @@ impl S3 for FS {
None => return Err(S3Error::with_message(S3ErrorCode::InternalError, "Not init".to_string())),
};
// TODO: hash_reader
let info = try_!(
store
.put_object_part(&bucket, &key, &upload_id, part_id, &mut data, &opts)