Mirror of https://github.com/rustfs/rustfs.git, synced 2026-01-17 09:40:32 +00:00

Compare commits
3 Commits
copilot/ad... ... feat/metac...

| Author | SHA1 | Date |
|---|---|---|
|  | 7f63f54850 |  |
|  | efe37bcc12 |  |
|  | e823922654 |  |
crates/ecstore/src/cache_value/metacache_manager.rs (new file, 478 lines)
@@ -0,0 +1,478 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use crate::disk::RUSTFS_META_BUCKET;
use crate::error::{Error, Result};
use crate::store::ECStore;
use crate::store_api::{ObjectIO, ObjectOptions};
use crate::store_list_objects::ListPathOptions;
use rustfs_filemeta::{MetaCacheEntriesSorted, MetaCacheEntry, MetacacheReader, MetacacheWriter};
use std::collections::HashMap;
use std::sync::{Arc, OnceLock};
use std::time::{Duration, SystemTime};
use tokio::sync::RwLock;
use tracing::{debug, warn};
use uuid::Uuid;

/// Scan status for metacache entries
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ScanStatus {
    None = 0,
    Started = 1,
    Success = 2,
    Error = 3,
}

impl Default for ScanStatus {
    fn default() -> Self {
        Self::None
    }
}

/// Metacache entry representing a list operation
#[derive(Debug, Clone)]
pub struct Metacache {
    pub id: String,
    pub bucket: String,
    pub root: String,
    pub filter: Option<String>,
    pub status: ScanStatus,
    pub started: SystemTime,
    pub ended: Option<SystemTime>,
    pub last_handout: SystemTime,
    pub last_update: SystemTime,
    pub error: Option<String>,
    pub file_not_found: bool,
    pub recursive: bool,
    pub data_version: u8,
}

impl Metacache {
    pub fn new(opts: &ListPathOptions) -> Self {
        let now = SystemTime::now();
        Self {
            id: opts.id.clone().unwrap_or_else(|| Uuid::new_v4().to_string()),
            bucket: opts.bucket.clone(),
            root: opts.base_dir.clone(),
            filter: opts.filter_prefix.clone(),
            status: ScanStatus::Started,
            started: now,
            ended: None,
            last_handout: now,
            last_update: now,
            error: None,
            file_not_found: false,
            recursive: opts.recursive,
            data_version: 2,
        }
    }

    pub fn finished(&self) -> bool {
        self.ended.is_some()
    }

    /// Check if the cache is worth keeping
    pub fn worth_keeping(&self) -> bool {
        const MAX_RUNNING_AGE: Duration = Duration::from_secs(3600); // 1 hour
        const MAX_CLIENT_WAIT: Duration = Duration::from_secs(180); // 3 minutes
        const MAX_FINISHED_WAIT: Duration = Duration::from_secs(900); // 15 minutes
        const MAX_ERROR_WAIT: Duration = Duration::from_secs(300); // 5 minutes

        let now = SystemTime::now();

        match self.status {
            ScanStatus::Started => {
                // Not finished and update for MAX_RUNNING_AGE, discard it
                if let Ok(elapsed) = now.duration_since(self.last_update) {
                    elapsed < MAX_RUNNING_AGE
                } else {
                    false
                }
            }
            ScanStatus::Success => {
                // Keep for MAX_FINISHED_WAIT after we last saw the client
                if let Ok(elapsed) = now.duration_since(self.last_handout) {
                    elapsed < MAX_FINISHED_WAIT
                } else {
                    false
                }
            }
            ScanStatus::Error | ScanStatus::None => {
                // Remove failed listings after MAX_ERROR_WAIT
                if let Ok(elapsed) = now.duration_since(self.last_update) {
                    elapsed < MAX_ERROR_WAIT
                } else {
                    false
                }
            }
        }
    }

    /// Update cache with new status
    pub fn update(&mut self, update: &Metacache) {
        let now = SystemTime::now();
        self.last_update = now;

        if update.last_handout > self.last_handout {
            self.last_handout = update.last_update;
            if self.last_handout > now {
                self.last_handout = now;
            }
        }

        if self.status == ScanStatus::Started && update.status == ScanStatus::Success {
            self.ended = Some(now);
        }

        if self.status == ScanStatus::Started && update.status != ScanStatus::Started {
            self.status = update.status;
        }

        if self.status == ScanStatus::Started {
            if let Ok(elapsed) = now.duration_since(self.last_handout) {
                if elapsed > Duration::from_secs(180) {
                    // Drop if client hasn't been seen for 3 minutes
                    self.status = ScanStatus::Error;
                    self.error = Some("client not seen".to_string());
                }
            }
        }

        if self.error.is_none() && update.error.is_some() {
            self.error = update.error.clone();
            self.status = ScanStatus::Error;
            self.ended = Some(now);
        }

        self.file_not_found = self.file_not_found || update.file_not_found;
    }
}
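
Taken together, `new`, `update`, and `worth_keeping` form a small per-scan state machine: a scan starts as `Started`, workers merge progress in through `update`, and the retention windows above decide when the record can be dropped. A minimal sketch of driving that state machine (the field values are illustrative, not taken from the diff):

```rust
// Illustrative only: walk a Metacache from Started to Success.
use std::time::SystemTime;

fn example_lifecycle() {
    let now = SystemTime::now();
    let mut cache = Metacache {
        id: "example-scan".to_string(),
        bucket: "my-bucket".to_string(),
        root: "photos/2024/".to_string(),
        filter: None,
        status: ScanStatus::Started,
        started: now,
        ended: None,
        last_handout: now,
        last_update: now,
        error: None,
        file_not_found: false,
        recursive: true,
        data_version: 2,
    };

    // A scanner reports completion by sending back an updated copy.
    let mut done = cache.clone();
    done.status = ScanStatus::Success;
    cache.update(&done);

    assert!(cache.finished()); // `ended` was stamped by update()
    assert!(cache.worth_keeping()); // kept up to 15 minutes after the last handout
}
```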

/// Bucket-level metacache manager
#[derive(Debug)]
pub struct BucketMetacache {
    bucket: String,
    caches: HashMap<String, Metacache>,
    caches_root: HashMap<String, Vec<String>>,
    updated: bool,
}

impl BucketMetacache {
    pub fn new(bucket: String) -> Self {
        Self {
            bucket,
            caches: HashMap::new(),
            caches_root: HashMap::new(),
            updated: false,
        }
    }

    /// Find or create a cache entry
    pub fn find_cache(&mut self, opts: &ListPathOptions) -> Metacache {
        // Check if exists already
        if let Some(mut cache) = self.caches.get(&opts.id.clone().unwrap_or_default()).cloned() {
            cache.last_handout = SystemTime::now();
            self.caches.insert(cache.id.clone(), cache.clone());
            debug!("returning existing cache {}", cache.id);
            return cache;
        }

        if !opts.create {
            return Metacache {
                id: opts.id.clone().unwrap_or_default(),
                bucket: opts.bucket.clone(),
                root: opts.base_dir.clone(),
                filter: opts.filter_prefix.clone(),
                status: ScanStatus::None,
                started: SystemTime::now(),
                ended: None,
                last_handout: SystemTime::now(),
                last_update: SystemTime::now(),
                error: None,
                file_not_found: false,
                recursive: opts.recursive,
                data_version: 2,
            };
        }

        // Create new cache
        let cache = Metacache::new(opts);
        let root = cache.root.clone();
        let id = cache.id.clone();
        self.caches.insert(id.clone(), cache.clone());
        self.caches_root.entry(root).or_default().push(id);
        self.updated = true;
        debug!("returning new cache {}, bucket: {}", cache.id, cache.bucket);
        cache
    }

    /// Update cache entry
    pub fn update_cache_entry(&mut self, update: Metacache) -> Result<Metacache> {
        if let Some(cache) = self.caches.get_mut(&update.id) {
            cache.update(&update);
            self.updated = true;
            Ok(cache.clone())
        } else {
            // Create new entry
            let root = update.root.clone();
            let id = update.id.clone();
            self.caches.insert(id.clone(), update.clone());
            self.caches_root.entry(root).or_default().push(id);
            self.updated = true;
            Ok(update)
        }
    }

    /// Get cache by ID
    pub fn get_cache(&self, id: &str) -> Option<&Metacache> {
        self.caches.get(id)
    }

    /// Cleanup outdated entries
    pub fn cleanup(&mut self) {
        let mut to_remove = Vec::new();
        for (id, cache) in &self.caches {
            if !cache.worth_keeping() {
                to_remove.push(id.clone());
            }
        }

        for id in to_remove {
            if let Some(cache) = self.caches.remove(&id) {
                // Remove from root index
                if let Some(ids) = self.caches_root.get_mut(&cache.root) {
                    ids.retain(|x| x != &id);
                    if ids.is_empty() {
                        self.caches_root.remove(&cache.root);
                    }
                }
                debug!("removed outdated cache {}", id);
            }
        }
    }
}
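
A sketch of how a scanning task might report its state into a `BucketMetacache` and let the retention rules evict stale entries; the flow is illustrative, not part of the diff:

```rust
// Illustrative only: report a finished scan, then run housekeeping.
fn example_progress(bm: &mut BucketMetacache, mut cache: Metacache) -> Result<()> {
    // Unknown ids are inserted as new entries; known ids are merged via Metacache::update.
    cache.status = ScanStatus::Success;
    let merged = bm.update_cache_entry(cache)?;
    debug!("scan {} recorded, finished: {}", merged.id, merged.finished());

    // Drops every entry whose worth_keeping() window has expired
    // and prunes the per-root index as entries disappear.
    bm.cleanup();
    Ok(())
}
```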

/// Global metacache manager
pub struct MetacacheManager {
    buckets: HashMap<String, Arc<RwLock<BucketMetacache>>>,
    trash: HashMap<String, Metacache>,
}

impl Default for MetacacheManager {
    fn default() -> Self {
        Self::new()
    }
}

impl MetacacheManager {
    pub fn new() -> Self {
        Self {
            buckets: HashMap::new(),
            trash: HashMap::new(),
        }
    }

    /// Get or create bucket metacache
    pub fn get_bucket(&mut self, bucket: &str) -> Arc<RwLock<BucketMetacache>> {
        if let Some(bm) = self.buckets.get(bucket) {
            return bm.clone();
        }

        let bm = Arc::new(RwLock::new(BucketMetacache::new(bucket.to_string())));
        self.buckets.insert(bucket.to_string(), bm.clone());
        bm
    }

    /// Find cache for given options
    pub async fn find_cache(&self, opts: &ListPathOptions) -> Metacache {
        if let Some(bm) = self.buckets.get(&opts.bucket) {
            let mut bm = bm.write().await;
            bm.find_cache(opts)
        } else {
            // Return empty cache if bucket not found
            Metacache {
                id: opts.id.clone().unwrap_or_default(),
                bucket: opts.bucket.clone(),
                root: opts.base_dir.clone(),
                filter: opts.filter_prefix.clone(),
                status: ScanStatus::None,
                started: SystemTime::now(),
                ended: None,
                last_handout: SystemTime::now(),
                last_update: SystemTime::now(),
                error: None,
                file_not_found: false,
                recursive: opts.recursive,
                data_version: 2,
            }
        }
    }

    /// Update cache entry
    pub async fn update_cache_entry(&mut self, update: Metacache) -> Result<Metacache> {
        // Check trash first
        if let Some(mut meta) = self.trash.get(&update.id).cloned() {
            meta.update(&update);
            return Ok(meta);
        }

        // Get or create bucket metacache
        let bm = self.get_bucket(&update.bucket);
        let mut bm = bm.write().await;
        bm.update_cache_entry(update)
    }

    /// Cleanup outdated entries
    pub async fn cleanup(&mut self) {
        const MAX_RUNNING_AGE: Duration = Duration::from_secs(3600);

        // Cleanup buckets
        for bm in self.buckets.values() {
            let mut bm = bm.write().await;
            bm.cleanup();
        }

        // Cleanup trash
        let mut to_remove = Vec::new();
        for (id, cache) in &self.trash {
            if let Ok(elapsed) = SystemTime::now().duration_since(cache.last_update) {
                if elapsed > MAX_RUNNING_AGE {
                    to_remove.push(id.clone());
                }
            }
        }

        for id in to_remove {
            self.trash.remove(&id);
        }
    }

    /// Get cache path for storage
    fn get_cache_path(bucket: &str, id: &str) -> String {
        format!("buckets/{}/.metacache/{}", bucket, id)
    }

    /// Save cache entries to storage
    pub async fn save_cache_entries(&self, store: Arc<ECStore>, cache: &Metacache, entries: &[MetaCacheEntry]) -> Result<()> {
        let path = Self::get_cache_path(&cache.bucket, &cache.id);

        // Create a writer that writes to store
        let mut writer = Vec::new();
        let mut cache_writer = MetacacheWriter::new(&mut writer);

        for entry in entries {
            cache_writer.write_obj(entry).await?;
        }
        cache_writer.close().await?;

        // Write to store
        use crate::store_api::PutObjReader;
        let mut reader = PutObjReader::from_vec(writer);
        store
            .put_object(
                RUSTFS_META_BUCKET,
                &path,
                &mut reader,
                &ObjectOptions {
                    no_lock: true,
                    ..Default::default()
                },
            )
            .await?;

        debug!("saved cache entries for {}: {} entries", cache.id, entries.len());
        Ok(())
    }

    /// Stream cache entries from storage
    pub async fn stream_cache_entries(
        &self,
        store: Arc<ECStore>,
        cache: &Metacache,
        marker: Option<String>,
        limit: usize,
    ) -> Result<MetaCacheEntriesSorted> {
        let path = Self::get_cache_path(&cache.bucket, &cache.id);

        // Read from store
        use crate::store_api::ObjectIO;
        use http::HeaderMap;
        let mut reader = store
            .get_object_reader(
                RUSTFS_META_BUCKET,
                &path,
                None,
                HeaderMap::new(),
                &ObjectOptions {
                    no_lock: true,
                    ..Default::default()
                },
            )
            .await?;

        let mut cache_reader = MetacacheReader::new(&mut reader);
        let mut entries = Vec::new();
        let mut last_skipped: Option<String> = None;

        while entries.len() < limit {
            match cache_reader.peek().await {
                Ok(Some(entry)) => {
                    // Skip entries before marker (not equal)
                    // Marker is the last object from previous page, so we should start from the next object (> marker)
                    // This matches the behavior of gather_results which uses < marker
                    if let Some(ref m) = marker {
                        if entry.name <= *m {
                            last_skipped = Some(entry.name.clone());
                            // peek() already consumed the entry, so we just continue
                            continue;
                        }
                    }

                    // Add the entry (peek already read it)
                    entries.push(Some(entry));
                }
                Ok(None) => break,
                Err(e) => {
                    warn!("error reading cache entry: {:?}", e);
                    break;
                }
            }
        }

        Ok(MetaCacheEntriesSorted {
            o: rustfs_filemeta::MetaCacheEntries(entries),
            list_id: Some(cache.id.clone()),
            reuse: true,
            last_skipped_entry: last_skipped,
        })
    }
}

/// Global metacache manager instance
static GLOBAL_METACACHE_MANAGER: OnceLock<Arc<RwLock<MetacacheManager>>> = OnceLock::new();

/// Initialize global metacache manager
pub fn init_metacache_manager() -> Arc<RwLock<MetacacheManager>> {
    GLOBAL_METACACHE_MANAGER
        .get_or_init(|| Arc::new(RwLock::new(MetacacheManager::new())))
        .clone()
}

/// Get global metacache manager
pub fn get_metacache_manager() -> Result<Arc<RwLock<MetacacheManager>>> {
    GLOBAL_METACACHE_MANAGER
        .get()
        .cloned()
        .ok_or_else(|| Error::other("metacache manager not initialized"))
}
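
For orientation, a hedged sketch of how listing code might tie these pieces together once the manager exists; the function name and the 1000-entry page size are illustrative, not from the diff:

```rust
// Illustrative only: resolve the scan for a listing request and stream one page.
async fn example_list_page(
    store: std::sync::Arc<ECStore>,
    opts: &ListPathOptions,
    marker: Option<String>,
) -> Result<MetaCacheEntriesSorted> {
    // init_metacache_manager() runs during ECStore startup (see the hunk further down).
    let manager = get_metacache_manager()?;
    let mgr = manager.read().await;

    // Reuse an existing scan if opts.id matches one, otherwise get a placeholder or a new scan.
    let cache = mgr.find_cache(opts).await;

    // Stream at most 1000 entries sorting strictly after `marker`
    // from the persisted .metacache object for this scan.
    mgr.stream_cache_entries(store, &cache, marker, 1000).await
}
```

The read lock is enough here because `find_cache` and `stream_cache_entries` take `&self`; `update_cache_entry` and `cleanup` need the write lock.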

@@ -18,6 +18,7 @@ use lazy_static::lazy_static;
use tokio_util::sync::CancellationToken;

pub mod metacache_set;
pub mod metacache_manager;

lazy_static! {
    pub static ref LIST_PATH_RAW_CANCEL_TOKEN: Arc<CancellationToken> = Arc::new(CancellationToken::new());

@@ -271,6 +271,9 @@ impl ECStore {

        set_object_layer(ec.clone()).await;

        // Initialize metacache manager
        crate::cache_value::metacache_manager::init_metacache_manager();

        Ok(ec)
    }

File diff suppressed because it is too large
@@ -41,6 +41,143 @@ impl ApiError {
            source: Some(error.into()),
        }
    }

    pub fn error_code_to_message(code: &S3ErrorCode) -> String {
        match code {
            S3ErrorCode::InvalidRequest => "Invalid Request".to_string(),
            S3ErrorCode::InvalidArgument => "Invalid argument".to_string(),
            S3ErrorCode::InvalidStorageClass => "Invalid storage class.".to_string(),
            S3ErrorCode::AccessDenied => "Access Denied.".to_string(),
            S3ErrorCode::BadDigest => "The Content-Md5 you specified did not match what we received.".to_string(),
            S3ErrorCode::EntityTooSmall => "Your proposed upload is smaller than the minimum allowed object size.".to_string(),
            S3ErrorCode::EntityTooLarge => "Your proposed upload exceeds the maximum allowed object size.".to_string(),
            S3ErrorCode::InternalError => "We encountered an internal error, please try again.".to_string(),
            S3ErrorCode::InvalidAccessKeyId => "The Access Key Id you provided does not exist in our records.".to_string(),
            S3ErrorCode::InvalidBucketName => "The specified bucket is not valid.".to_string(),
            S3ErrorCode::InvalidDigest => "The Content-Md5 you specified is not valid.".to_string(),
            S3ErrorCode::InvalidRange => "The requested range is not satisfiable".to_string(),
            S3ErrorCode::MalformedXML => "The XML you provided was not well-formed or did not validate against our published schema.".to_string(),
            S3ErrorCode::MissingContentLength => "You must provide the Content-Length HTTP header.".to_string(),
            S3ErrorCode::MissingSecurityHeader => "Your request was missing a required header".to_string(),
            S3ErrorCode::MissingRequestBodyError => "Request body is empty.".to_string(),
            S3ErrorCode::NoSuchBucket => "The specified bucket does not exist".to_string(),
            S3ErrorCode::NoSuchBucketPolicy => "The bucket policy does not exist".to_string(),
            S3ErrorCode::NoSuchLifecycleConfiguration => "The lifecycle configuration does not exist".to_string(),
            S3ErrorCode::NoSuchKey => "The specified key does not exist.".to_string(),
            S3ErrorCode::NoSuchUpload => "The specified multipart upload does not exist. The upload ID may be invalid, or the upload may have been aborted or completed.".to_string(),
            S3ErrorCode::NoSuchVersion => "The specified version does not exist.".to_string(),
            S3ErrorCode::NotImplemented => "A header you provided implies functionality that is not implemented".to_string(),
            S3ErrorCode::PreconditionFailed => "At least one of the pre-conditions you specified did not hold".to_string(),
            S3ErrorCode::SignatureDoesNotMatch => "The request signature we calculated does not match the signature you provided. Check your key and signing method.".to_string(),
            S3ErrorCode::MethodNotAllowed => "The specified method is not allowed against this resource.".to_string(),
            S3ErrorCode::InvalidPart => "One or more of the specified parts could not be found. The part may not have been uploaded, or the specified entity tag may not match the part's entity tag.".to_string(),
            S3ErrorCode::InvalidPartOrder => "The list of parts was not in ascending order. The parts list must be specified in order by part number.".to_string(),
            S3ErrorCode::InvalidObjectState => "The operation is not valid for the current state of the object.".to_string(),
            S3ErrorCode::AuthorizationHeaderMalformed => "The authorization header is malformed; the region is wrong; expecting 'us-east-1'.".to_string(),
            S3ErrorCode::MalformedPOSTRequest => "The body of your POST request is not well-formed multipart/form-data.".to_string(),
            S3ErrorCode::BucketNotEmpty => "The bucket you tried to delete is not empty".to_string(),
            S3ErrorCode::BucketAlreadyExists => "The requested bucket name is not available. The bucket namespace is shared by all users of the system. Please select a different name and try again.".to_string(),
            S3ErrorCode::BucketAlreadyOwnedByYou => "Your previous request to create the named bucket succeeded and you already own it.".to_string(),
            S3ErrorCode::AllAccessDisabled => "All access to this resource has been disabled.".to_string(),
            S3ErrorCode::InvalidPolicyDocument => "The content of the form does not meet the conditions specified in the policy document.".to_string(),
            S3ErrorCode::IncompleteBody => "You did not provide the number of bytes specified by the Content-Length HTTP header.".to_string(),
            S3ErrorCode::RequestTimeTooSkewed => "The difference between the request time and the server's time is too large.".to_string(),
            S3ErrorCode::InvalidRegion => "Region does not match.".to_string(),
            S3ErrorCode::SlowDown => "Resource requested is unreadable, please reduce your request rate".to_string(),
            S3ErrorCode::KeyTooLongError => "Your key is too long".to_string(),
            S3ErrorCode::NoSuchTagSet => "The TagSet does not exist".to_string(),
            S3ErrorCode::ObjectLockConfigurationNotFoundError => "Object Lock configuration does not exist for this bucket".to_string(),
            S3ErrorCode::InvalidBucketState => "Object Lock configuration cannot be enabled on existing buckets".to_string(),
            S3ErrorCode::NoSuchCORSConfiguration => "The CORS configuration does not exist".to_string(),
            S3ErrorCode::NoSuchWebsiteConfiguration => "The specified bucket does not have a website configuration".to_string(),
            S3ErrorCode::NoSuchObjectLockConfiguration => "The specified object does not have a ObjectLock configuration".to_string(),
            S3ErrorCode::MetadataTooLarge => "Your metadata headers exceed the maximum allowed metadata size.".to_string(),
            S3ErrorCode::ServiceUnavailable => "The service is unavailable. Please retry.".to_string(),
            S3ErrorCode::Busy => "The service is unavailable. Please retry.".to_string(),
            S3ErrorCode::EmptyRequestBody => "Request body cannot be empty.".to_string(),
            S3ErrorCode::UnauthorizedAccess => "You are not authorized to perform this operation".to_string(),
            S3ErrorCode::ExpressionTooLong => "The SQL expression is too long: The maximum byte-length for the SQL expression is 256 KB.".to_string(),
            S3ErrorCode::IllegalSqlFunctionArgument => "Illegal argument was used in the SQL function.".to_string(),
            S3ErrorCode::InvalidKeyPath => "Key path in the SQL expression is invalid.".to_string(),
            S3ErrorCode::InvalidCompressionFormat => "The file is not in a supported compression format. Only GZIP is supported at this time.".to_string(),
            S3ErrorCode::InvalidFileHeaderInfo => "The FileHeaderInfo is invalid. Only NONE, USE, and IGNORE are supported.".to_string(),
            S3ErrorCode::InvalidJsonType => "The JsonType is invalid. Only DOCUMENT and LINES are supported at this time.".to_string(),
            S3ErrorCode::InvalidQuoteFields => "The QuoteFields is invalid. Only ALWAYS and ASNEEDED are supported.".to_string(),
            S3ErrorCode::InvalidRequestParameter => "The value of a parameter in SelectRequest element is invalid. Check the service API documentation and try again.".to_string(),
            S3ErrorCode::InvalidDataSource => "Invalid data source type. Only CSV and JSON are supported at this time.".to_string(),
            S3ErrorCode::InvalidExpressionType => "The ExpressionType is invalid. Only SQL expressions are supported at this time.".to_string(),
            S3ErrorCode::InvalidDataType => "The SQL expression contains an invalid data type.".to_string(),
            S3ErrorCode::InvalidTextEncoding => "Invalid encoding type. Only UTF-8 encoding is supported at this time.".to_string(),
            S3ErrorCode::InvalidTableAlias => "The SQL expression contains an invalid table alias.".to_string(),
            S3ErrorCode::MissingRequiredParameter => "The SelectRequest entity is missing a required parameter. Check the service documentation and try again.".to_string(),
            S3ErrorCode::ObjectSerializationConflict => "The SelectRequest entity can only contain one of CSV or JSON. Check the service documentation and try again.".to_string(),
            S3ErrorCode::UnsupportedSqlOperation => "Encountered an unsupported SQL operation.".to_string(),
            S3ErrorCode::UnsupportedSqlStructure => "Encountered an unsupported SQL structure. Check the SQL Reference.".to_string(),
            S3ErrorCode::UnsupportedSyntax => "Encountered invalid syntax.".to_string(),
            S3ErrorCode::UnsupportedRangeHeader => "Range header is not supported for this operation.".to_string(),
            S3ErrorCode::LexerInvalidChar => "The SQL expression contains an invalid character.".to_string(),
            S3ErrorCode::LexerInvalidOperator => "The SQL expression contains an invalid literal.".to_string(),
            S3ErrorCode::LexerInvalidLiteral => "The SQL expression contains an invalid operator.".to_string(),
            S3ErrorCode::LexerInvalidIONLiteral => "The SQL expression contains an invalid operator.".to_string(),
            S3ErrorCode::ParseExpectedDatePart => "Did not find the expected date part in the SQL expression.".to_string(),
            S3ErrorCode::ParseExpectedKeyword => "Did not find the expected keyword in the SQL expression.".to_string(),
            S3ErrorCode::ParseExpectedTokenType => "Did not find the expected token in the SQL expression.".to_string(),
            S3ErrorCode::ParseExpected2TokenTypes => "Did not find the expected token in the SQL expression.".to_string(),
            S3ErrorCode::ParseExpectedNumber => "Did not find the expected number in the SQL expression.".to_string(),
            S3ErrorCode::ParseExpectedRightParenBuiltinFunctionCall => "Did not find the expected right parenthesis character in the SQL expression.".to_string(),
            S3ErrorCode::ParseExpectedTypeName => "Did not find the expected type name in the SQL expression.".to_string(),
            S3ErrorCode::ParseExpectedWhenClause => "Did not find the expected WHEN clause in the SQL expression. CASE is not supported.".to_string(),
            S3ErrorCode::ParseUnsupportedToken => "The SQL expression contains an unsupported token.".to_string(),
            S3ErrorCode::ParseUnsupportedLiteralsGroupBy => "The SQL expression contains an unsupported use of GROUP BY.".to_string(),
            S3ErrorCode::ParseExpectedMember => "The SQL expression contains an unsupported use of MEMBER.".to_string(),
            S3ErrorCode::ParseUnsupportedSelect => "The SQL expression contains an unsupported use of SELECT.".to_string(),
            S3ErrorCode::ParseUnsupportedCase => "The SQL expression contains an unsupported use of CASE.".to_string(),
            S3ErrorCode::ParseUnsupportedCaseClause => "The SQL expression contains an unsupported use of CASE.".to_string(),
            S3ErrorCode::ParseUnsupportedAlias => "The SQL expression contains an unsupported use of ALIAS.".to_string(),
            S3ErrorCode::ParseUnsupportedSyntax => "The SQL expression contains unsupported syntax.".to_string(),
            S3ErrorCode::ParseUnknownOperator => "The SQL expression contains an invalid operator.".to_string(),
            S3ErrorCode::ParseMissingIdentAfterAt => "Did not find the expected identifier after the @ symbol in the SQL expression.".to_string(),
            S3ErrorCode::ParseUnexpectedOperator => "The SQL expression contains an unexpected operator.".to_string(),
            S3ErrorCode::ParseUnexpectedTerm => "The SQL expression contains an unexpected term.".to_string(),
            S3ErrorCode::ParseUnexpectedToken => "The SQL expression contains an unexpected token.".to_string(),
            S3ErrorCode::ParseExpectedExpression => "Did not find the expected SQL expression.".to_string(),
            S3ErrorCode::ParseExpectedLeftParenAfterCast => "Did not find expected the left parenthesis in the SQL expression.".to_string(),
            S3ErrorCode::ParseExpectedLeftParenValueConstructor => "Did not find expected the left parenthesis in the SQL expression.".to_string(),
            S3ErrorCode::ParseExpectedLeftParenBuiltinFunctionCall => "Did not find the expected left parenthesis in the SQL expression.".to_string(),
            S3ErrorCode::ParseExpectedArgumentDelimiter => "Did not find the expected argument delimiter in the SQL expression.".to_string(),
            S3ErrorCode::ParseCastArity => "The SQL expression CAST has incorrect arity.".to_string(),
            S3ErrorCode::ParseInvalidTypeParam => "The SQL expression contains an invalid parameter value.".to_string(),
            S3ErrorCode::ParseEmptySelect => "The SQL expression contains an empty SELECT.".to_string(),
            S3ErrorCode::ParseSelectMissingFrom => "GROUP is not supported in the SQL expression.".to_string(),
            S3ErrorCode::ParseExpectedIdentForGroupName => "GROUP is not supported in the SQL expression.".to_string(),
            S3ErrorCode::ParseExpectedIdentForAlias => "Did not find the expected identifier for the alias in the SQL expression.".to_string(),
            S3ErrorCode::ParseUnsupportedCallWithStar => "Only COUNT with (*) as a parameter is supported in the SQL expression.".to_string(),
            S3ErrorCode::ParseNonUnaryAgregateFunctionCall => "Only one argument is supported for aggregate functions in the SQL expression.".to_string(),
            S3ErrorCode::ParseMalformedJoin => "JOIN is not supported in the SQL expression.".to_string(),
            S3ErrorCode::ParseExpectedIdentForAt => "Did not find the expected identifier for AT name in the SQL expression.".to_string(),
            S3ErrorCode::ParseAsteriskIsNotAloneInSelectList => "Other expressions are not allowed in the SELECT list when '*' is used without dot notation in the SQL expression.".to_string(),
            S3ErrorCode::ParseCannotMixSqbAndWildcardInSelectList => "Cannot mix [] and * in the same expression in a SELECT list in SQL expression.".to_string(),
            S3ErrorCode::ParseInvalidContextForWildcardInSelectList => "Invalid use of * in SELECT list in the SQL expression.".to_string(),
            S3ErrorCode::IncorrectSqlFunctionArgumentType => "Incorrect type of arguments in function call in the SQL expression.".to_string(),
            S3ErrorCode::ValueParseFailure => "Time stamp parse failure in the SQL expression.".to_string(),
            S3ErrorCode::EvaluatorInvalidArguments => "Incorrect number of arguments in the function call in the SQL expression.".to_string(),
            S3ErrorCode::IntegerOverflow => "Int overflow or underflow in the SQL expression.".to_string(),
            S3ErrorCode::LikeInvalidInputs => "Invalid argument given to the LIKE clause in the SQL expression.".to_string(),
            S3ErrorCode::CastFailed => "Attempt to convert from one data type to another using CAST failed in the SQL expression.".to_string(),
            S3ErrorCode::InvalidCast => "Attempt to convert from one data type to another using CAST failed in the SQL expression.".to_string(),
            S3ErrorCode::EvaluatorInvalidTimestampFormatPattern => "Time stamp format pattern requires additional fields in the SQL expression.".to_string(),
            S3ErrorCode::EvaluatorInvalidTimestampFormatPatternSymbolForParsing => "Time stamp format pattern contains a valid format symbol that cannot be applied to time stamp parsing in the SQL expression.".to_string(),
            S3ErrorCode::EvaluatorTimestampFormatPatternDuplicateFields => "Time stamp format pattern contains multiple format specifiers representing the time stamp field in the SQL expression.".to_string(),
            S3ErrorCode::EvaluatorTimestampFormatPatternHourClockAmPmMismatch => "Time stamp format pattern contains unterminated token in the SQL expression.".to_string(),
            S3ErrorCode::EvaluatorUnterminatedTimestampFormatPatternToken => "Time stamp format pattern contains an invalid token in the SQL expression.".to_string(),
            S3ErrorCode::EvaluatorInvalidTimestampFormatPatternToken => "Time stamp format pattern contains an invalid token in the SQL expression.".to_string(),
            S3ErrorCode::EvaluatorInvalidTimestampFormatPatternSymbol => "Time stamp format pattern contains an invalid symbol in the SQL expression.".to_string(),
            S3ErrorCode::EvaluatorBindingDoesNotExist => "A column name or a path provided does not exist in the SQL expression".to_string(),
            S3ErrorCode::InvalidColumnIndex => "The column index is invalid. Please check the service documentation and try again.".to_string(),
            S3ErrorCode::UnsupportedFunction => "Encountered an unsupported SQL function.".to_string(),
            _ => code.as_str().to_string(),
        }
    }
}

impl From<ApiError> for S3Error {
@@ -87,9 +224,14 @@ impl From<StorageError> for ApiError {
            _ => S3ErrorCode::InternalError,
        };

        let message = if code == S3ErrorCode::InternalError {
            err.to_string()
        } else {
            ApiError::error_code_to_message(&code)
        };
        ApiError {
            code,
            message: err.to_string(),
            message,
            source: Some(Box::new(err)),
        }
    }
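
The net effect of the two hunks above, sketched as a tiny check (illustrative, not a test from the diff): non-internal errors now carry the canned S3 message from `error_code_to_message`, while `InternalError` keeps the underlying error text.

```rust
// Illustrative only.
fn example_messages() {
    assert_eq!(
        ApiError::error_code_to_message(&S3ErrorCode::NoSuchBucket),
        "The specified bucket does not exist"
    );
    // For S3ErrorCode::InternalError the From<StorageError> impl keeps
    // err.to_string(), so the real cause is not hidden behind the generic text.
}
```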

@@ -186,7 +328,6 @@ mod tests {
        let api_error: ApiError = storage_error.into();

        assert_eq!(api_error.code, S3ErrorCode::NoSuchBucket);
        assert!(api_error.message.contains("test-bucket"));
        assert!(api_error.source.is_some());

        // Test that source can be downcast back to StorageError

@@ -2165,6 +2165,9 @@ impl S3 for FS {

        let prefix = prefix.unwrap_or_default();
        let max_keys = max_keys.unwrap_or(1000);
        if max_keys < 0 {
            return Err(S3Error::with_message(S3ErrorCode::InvalidArgument, "Invalid max keys".to_string()));
        }

        let delimiter = delimiter.filter(|v| !v.is_empty());
        let start_after = start_after.filter(|v| !v.is_empty());

@@ -2363,7 +2366,7 @@ impl S3 for FS {
            bucket,
            key,
            content_length,
            content_type,
            // content_type,
            tagging,
            metadata,
            version_id,

@@ -2392,6 +2395,10 @@ impl S3 for FS {
            }
        };

        if size == -1 {
            return Err(s3_error!(UnexpectedContent));
        }

        let body = StreamReader::new(body.map(|f| f.map_err(|e| std::io::Error::other(e.to_string()))));

        // let body = Box::new(StreamReader::new(body.map(|f| f.map_err(|e| std::io::Error::other(e.to_string())))));

@@ -2436,9 +2443,9 @@ impl S3 for FS {

        let mut metadata = metadata.unwrap_or_default();

        if let Some(content_type) = content_type {
            metadata.insert("content-type".to_string(), content_type.to_string());
        }
        // if let Some(content_type) = content_type {
        //     metadata.insert("content-type".to_string(), content_type.to_string());
        // }

        extract_metadata_from_mime_with_object_name(&req.headers, &mut metadata, true, Some(&key));

@@ -3659,11 +3666,6 @@ impl S3 for FS {
            return Err(S3Error::with_message(S3ErrorCode::InternalError, "Not init".to_string()));
        };

        // Validate tag_set length doesn't exceed 10
        if tagging.tag_set.len() > 10 {
            return Err(s3_error!(InvalidArgument, "Object tags cannot be greater than 10"));
        }

        let mut tag_keys = std::collections::HashSet::with_capacity(tagging.tag_set.len());
        for tag in &tagging.tag_set {
            let key = tag
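
The hunk is truncated at this point. Without guessing the exact continuation, the pattern being set up with `tag_keys` is a HashSet-based duplicate-key check; a self-contained sketch of that idea (hypothetical, not the actual code):

```rust
// Hypothetical sketch of the duplicate-detection idea behind `tag_keys`,
// shown on plain strings rather than the s3s tag type used above.
fn has_duplicate_keys<'a>(keys: impl IntoIterator<Item = &'a str>) -> bool {
    let mut seen = std::collections::HashSet::new();
    // insert() returns false when the key was already present.
    keys.into_iter().any(|k| !seen.insert(k))
}

// has_duplicate_keys(["env", "team", "env"]) == true
// has_duplicate_keys(["env", "team"])        == false
```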