diff --git a/crates/ecstore/src/bucket/lifecycle/lifecycle.rs b/crates/ecstore/src/bucket/lifecycle/lifecycle.rs index a0ac8c1b..2431d6ba 100644 --- a/crates/ecstore/src/bucket/lifecycle/lifecycle.rs +++ b/crates/ecstore/src/bucket/lifecycle/lifecycle.rs @@ -1,4 +1,3 @@ -#![allow(unused_imports)] // Copyright 2024 RustFS Team // // Licensed under the Apache License, Version 2.0 (the "License"); @@ -12,6 +11,7 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. +#![allow(unused_imports)] #![allow(unused_variables)] #![allow(unused_mut)] #![allow(unused_assignments)] @@ -41,7 +41,7 @@ const ERR_LIFECYCLE_DUPLICATE_ID: &str = "Rule ID must be unique. Found same ID const _ERR_XML_NOT_WELL_FORMED: &str = "The XML you provided was not well-formed or did not validate against our published schema"; const ERR_LIFECYCLE_BUCKET_LOCKED: &str = - "ExpiredObjectAllVersions element and DelMarkerExpiration action cannot be used on an object locked bucket"; + "ExpiredObjectAllVersions element and DelMarkerExpiration action cannot be used on an retention bucket"; #[derive(Debug, Clone, PartialEq, Eq)] pub enum IlmAction { @@ -102,30 +102,30 @@ impl RuleValidate for LifecycleRule { } fn validate_status(&self) -> Result<()> { - if self.Status.len() == 0 { - return errEmptyRuleStatus; + if self.status.len() == 0 { + return ErrEmptyRuleStatus; } - if self.Status != Enabled && self.Status != Disabled { - return errInvalidRuleStatus; + if self.status != Enabled && self.status != Disabled { + return ErrInvalidRuleStatus; } Ok(()) } fn validate_expiration(&self) -> Result<()> { - self.Expiration.Validate(); + self.expiration.validate(); } fn validate_noncurrent_expiration(&self) -> Result<()> { - self.NoncurrentVersionExpiration.Validate() + self.noncurrent_version_expiration.validate() } fn validate_prefix_and_filter(&self) -> Result<()> { - if !self.Prefix.set && self.Filter.IsEmpty() || self.Prefix.set && !self.Filter.IsEmpty() { - return errXMLNotWellFormed; + if !self.prefix.set && self.Filter.isempty() || self.prefix.set && !self.filter.isempty() { + return ErrXMLNotWellFormed; } - if !self.Prefix.set { - return self.Filter.Validate(); + if !self.prefix.set { + return self.filter.validate(); } Ok(()) } @@ -267,7 +267,7 @@ impl Lifecycle for BucketLifecycleConfiguration { r.validate()?; if let Some(expiration) = r.expiration.as_ref() { if let Some(expired_object_delete_marker) = expiration.expired_object_delete_marker { - if lr_retention && (!expired_object_delete_marker) { + if lr_retention && (expired_object_delete_marker) { return Err(std::io::Error::other(ERR_LIFECYCLE_BUCKET_LOCKED)); } } diff --git a/crates/ecstore/src/client/api_bucket_policy.rs b/crates/ecstore/src/client/api_bucket_policy.rs index d9adbfe0..8ed9c606 100644 --- a/crates/ecstore/src/client/api_bucket_policy.rs +++ b/crates/ecstore/src/client/api_bucket_policy.rs @@ -1,4 +1,3 @@ -#![allow(clippy::map_entry)] // Copyright 2024 RustFS Team // // Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/crates/ecstore/src/client/api_error_response.rs b/crates/ecstore/src/client/api_error_response.rs index 6dbcbfb0..31ca0645 100644 --- a/crates/ecstore/src/client/api_error_response.rs +++ b/crates/ecstore/src/client/api_error_response.rs @@ -1,4 +1,3 @@ -#![allow(clippy::map_entry)] // Copyright 2024 RustFS Team // // Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/crates/ecstore/src/client/api_get_object_acl.rs b/crates/ecstore/src/client/api_get_object_acl.rs new file mode 100644 index 00000000..1a9167a3 --- /dev/null +++ b/crates/ecstore/src/client/api_get_object_acl.rs @@ -0,0 +1,180 @@ +// Copyright 2024 RustFS Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +#![allow(unused_imports)] +#![allow(unused_variables)] +#![allow(unused_mut)] +#![allow(unused_assignments)] +#![allow(unused_must_use)] +#![allow(clippy::all)] + +use bytes::Bytes; +use http::{HeaderMap, HeaderValue}; +use s3s::dto::Owner; +use std::io::Cursor; +use std::collections::HashMap; +use tokio::io::BufReader; + +use rustfs_utils::EMPTY_STRING_SHA256_HASH; +use crate::client::{ + api_error_response::{err_invalid_argument, http_resp_to_error_response}, + api_get_options::GetObjectOptions, + transition_api::{to_object_info, ObjectInfo, ReadCloser, ReaderImpl, RequestMetadata, TransitionClient}, +}; + +#[derive(Clone, Debug, Default, serde::Serialize, serde::Deserialize)] +struct Grantee { + id: String, + display_name: String, + uri: String, +} + +#[derive(Clone, Debug, Default, serde::Serialize, serde::Deserialize)] +struct Grant { + grantee: Grantee, + permission: String, +} + +#[derive(Debug, Default, serde::Serialize, serde::Deserialize)] +pub struct AccessControlList { + pub grant: Vec, + pub permission: String, +} + +#[derive(Debug, Default, serde::Deserialize)] +pub struct AccessControlPolicy { + #[serde(skip)] + owner: Owner, + pub access_control_list: AccessControlList, +} + +impl TransitionClient { + pub async fn get_object_acl(&self, bucket_name: &str, object_name: &str) -> Result { + let mut url_values = HashMap::new(); + url_values.insert("acl".to_string(), "".to_string()); + let mut resp = self + .execute_method( + http::Method::GET, + &mut RequestMetadata { + bucket_name: bucket_name.to_string(), + object_name: object_name.to_string(), + query_values: url_values, + custom_header: HeaderMap::new(), + content_sha256_hex: EMPTY_STRING_SHA256_HASH.to_string(), + content_body: ReaderImpl::Body(Bytes::new()), + content_length: 0, + content_md5_base64: "".to_string(), + stream_sha256: false, + trailer: HeaderMap::new(), + pre_sign_url: Default::default(), + add_crc: Default::default(), + extra_pre_sign_header: Default::default(), + bucket_location: Default::default(), + expires: Default::default(), + }, + ) + .await?; + + if resp.status() != http::StatusCode::OK { + let b = resp.body().bytes().expect("err").to_vec(); + return Err(std::io::Error::other(http_resp_to_error_response(resp, b, bucket_name, object_name))); + } + + let b = resp.body_mut().store_all_unlimited().await.unwrap().to_vec(); + let mut res = match serde_xml_rs::from_str::(&String::from_utf8(b).unwrap()) { + Ok(result) => result, + Err(err) => { + return Err(std::io::Error::other(err.to_string())); + } + }; + + let mut obj_info = self.stat_object(bucket_name, object_name, &GetObjectOptions::default()).await?; + + obj_info.owner.display_name = res.owner.display_name.clone(); + obj_info.owner.id = res.owner.id.clone(); + + //obj_info.grant.extend(res.access_control_list.grant); + + let canned_acl = get_canned_acl(&res); + if canned_acl != "" { + obj_info.metadata.insert("X-Amz-Acl", HeaderValue::from_str(&canned_acl).unwrap()); + return Ok(obj_info); + } + + let grant_acl = get_amz_grant_acl(&res); + /*for (k, v) in grant_acl { + obj_info.metadata.insert(HeaderName::from_bytes(k.as_bytes()).unwrap(), HeaderValue::from_str(&v.to_string()).unwrap()); + }*/ + + Ok(obj_info) + } +} + +fn get_canned_acl(ac_policy: &AccessControlPolicy) -> String { + let grants = ac_policy.access_control_list.grant.clone(); + + if grants.len() == 1 { + if grants[0].grantee.uri == "" && grants[0].permission == "FULL_CONTROL" { + return "private".to_string(); + } + } else if grants.len() == 2 { + for g in grants { + if g.grantee.uri == "http://acs.amazonaws.com/groups/global/AuthenticatedUsers" && &g.permission == "READ" { + return "authenticated-read".to_string(); + } + if g.grantee.uri == "http://acs.amazonaws.com/groups/global/AllUsers" && &g.permission == "READ" { + return "public-read".to_string(); + } + if g.permission == "READ" && g.grantee.id == ac_policy.owner.id.clone().unwrap() { + return "bucket-owner-read".to_string(); + } + } + } else if grants.len() == 3 { + for g in grants { + if g.grantee.uri == "http://acs.amazonaws.com/groups/global/AllUsers" && g.permission == "WRITE" { + return "public-read-write".to_string(); + } + } + } + "".to_string() +} + +pub fn get_amz_grant_acl(ac_policy: &AccessControlPolicy) -> HashMap> { + let grants = ac_policy.access_control_list.grant.clone(); + let mut res = HashMap::>::new(); + + for g in grants { + let mut id = "id=".to_string(); + id.push_str(&g.grantee.id); + let permission: &str = &g.permission; + match permission { + "READ" => { + res.entry("X-Amz-Grant-Read".to_string()).or_insert(vec![]).push(id); + } + "WRITE" => { + res.entry("X-Amz-Grant-Write".to_string()).or_insert(vec![]).push(id); + } + "READ_ACP" => { + res.entry("X-Amz-Grant-Read-Acp".to_string()).or_insert(vec![]).push(id); + } + "WRITE_ACP" => { + res.entry("X-Amz-Grant-Write-Acp".to_string()).or_insert(vec![]).push(id); + } + "FULL_CONTROL" => { + res.entry("X-Amz-Grant-Full-Control".to_string()).or_insert(vec![]).push(id); + } + _ => (), + } + } + res +} diff --git a/crates/ecstore/src/client/api_get_object_attributes.rs b/crates/ecstore/src/client/api_get_object_attributes.rs new file mode 100644 index 00000000..9204e7dd --- /dev/null +++ b/crates/ecstore/src/client/api_get_object_attributes.rs @@ -0,0 +1,229 @@ +// Copyright 2024 RustFS Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +#![allow(unused_imports)] +#![allow(unused_variables)] +#![allow(unused_mut)] +#![allow(unused_assignments)] +#![allow(unused_must_use)] +#![allow(clippy::all)] + +use bytes::Bytes; +use http::{HeaderMap, HeaderValue}; +use time::OffsetDateTime; +use std::io::Cursor; +use std::collections::HashMap; +use tokio::io::BufReader; + +use s3s::{Body, dto::Owner}; +use s3s::header::{X_AMZ_OBJECT_ATTRIBUTES, X_AMZ_DELETE_MARKER, X_AMZ_METADATA_DIRECTIVE, X_AMZ_VERSION_ID, + X_AMZ_REQUEST_CHARGED, X_AMZ_RESTORE, X_AMZ_PART_NUMBER_MARKER, X_AMZ_MAX_PARTS,}; +use rustfs_utils::EMPTY_STRING_SHA256_HASH; +use crate::client::constants::{GET_OBJECT_ATTRIBUTES_MAX_PARTS, GET_OBJECT_ATTRIBUTES_TAGS, ISO8601_DATEFORMAT}; + +use crate::client::{ + api_error_response::err_invalid_argument, + api_get_options::GetObjectOptions, + transition_api::{ObjectInfo, ReadCloser, ReaderImpl, RequestMetadata, TransitionClient, to_object_info}, + api_get_object_acl::AccessControlPolicy, +}; + +struct ObjectAttributesOptions { + max_parts: i64, + version_id: String, + part_number_marker: i64, + //server_side_encryption: encrypt::ServerSide, +} + +struct ObjectAttributes { + version_id: String, + last_modified: OffsetDateTime, + object_attributes_response: ObjectAttributesResponse, +} + +impl ObjectAttributes { + fn new() -> Self { + Self { + version_id: "".to_string(), + last_modified: OffsetDateTime::now_utc(), + object_attributes_response: ObjectAttributesResponse::new(), + } + } +} + +#[derive(Debug, Default, serde::Deserialize)] +pub struct Checksum { + checksum_crc32: String, + checksum_crc32c: String, + checksum_sha1: String, + checksum_sha256: String, +} + +impl Checksum { + fn new() -> Self { + Self { + checksum_crc32: "".to_string(), + checksum_crc32c: "".to_string(), + checksum_sha1: "".to_string(), + checksum_sha256: "".to_string(), + } + } +} + +#[derive(Debug, Default, serde::Deserialize)] +struct ObjectParts { + parts_count: i64, + part_number_marker: i64, + next_part_number_marker: i64, + max_parts: i64, + is_truncated: bool, + parts: Vec, +} + +impl ObjectParts { + fn new() -> Self { + Self { + parts_count: 0, + part_number_marker: 0, + next_part_number_marker: 0, + max_parts: 0, + is_truncated: false, + parts: Vec::new(), + } + } +} + +#[derive(Debug, Default, serde::Deserialize)] +struct ObjectAttributesResponse { + etag: String, + storage_class: String, + object_size: i64, + checksum: Checksum, + object_parts: ObjectParts, +} + +impl ObjectAttributesResponse { + fn new() -> Self { + Self { + etag: "".to_string(), + storage_class: "".to_string(), + object_size: 0, + checksum: Checksum::new(), + object_parts: ObjectParts::new(), + } + } +} + +#[derive(Debug, Default, serde::Deserialize)] +struct ObjectAttributePart { + checksum_crc32: String, + checksum_crc32c: String, + checksum_sha1: String, + checksum_sha256: String, + part_number: i64, + size: i64, +} + +impl ObjectAttributes { + pub async fn parse_response(&mut self, resp: &mut http::Response) -> Result<(), std::io::Error> { + let h = resp.headers(); + let mod_time = OffsetDateTime::parse(h.get("Last-Modified").unwrap().to_str().unwrap(), ISO8601_DATEFORMAT).unwrap(); //RFC7231Time + self.last_modified = mod_time; + self.version_id = h.get(X_AMZ_VERSION_ID).unwrap().to_str().unwrap().to_string(); + + let b = resp.body_mut().store_all_unlimited().await.unwrap().to_vec(); + let mut response = match serde_xml_rs::from_str::(&String::from_utf8(b).unwrap()) { + Ok(result) => result, + Err(err) => { + return Err(std::io::Error::other(err.to_string())); + } + }; + self.object_attributes_response = response; + + Ok(()) + } +} + +impl TransitionClient { + pub async fn get_object_attributes(&self, bucket_name: &str, object_name: &str, opts: ObjectAttributesOptions) -> Result { + let mut url_values = HashMap::new(); + url_values.insert("attributes".to_string(), "".to_string()); + if opts.version_id != "" { + url_values.insert("versionId".to_string(), opts.version_id); + } + + let mut headers = HeaderMap::new(); + headers.insert(X_AMZ_OBJECT_ATTRIBUTES, HeaderValue::from_str(GET_OBJECT_ATTRIBUTES_TAGS).unwrap()); + + if opts.part_number_marker > 0 { + headers.insert(X_AMZ_PART_NUMBER_MARKER, HeaderValue::from_str(&opts.part_number_marker.to_string()).unwrap()); + } + + if opts.max_parts > 0 { + headers.insert(X_AMZ_MAX_PARTS, HeaderValue::from_str(&opts.max_parts.to_string()).unwrap()); + } else { + headers.insert(X_AMZ_MAX_PARTS, HeaderValue::from_str(&GET_OBJECT_ATTRIBUTES_MAX_PARTS.to_string()).unwrap()); + } + + /*if opts.server_side_encryption.is_some() { + opts.server_side_encryption.Marshal(headers); + }*/ + + let mut resp = self + .execute_method( + http::Method::HEAD, + &mut RequestMetadata { + bucket_name: bucket_name.to_string(), + object_name: object_name.to_string(), + query_values: url_values, + custom_header: headers, + content_sha256_hex: EMPTY_STRING_SHA256_HASH.to_string(), + content_md5_base64: "".to_string(), + content_body: ReaderImpl::Body(Bytes::new()), + content_length: 0, + stream_sha256: false, + trailer: HeaderMap::new(), + pre_sign_url: Default::default(), + add_crc: Default::default(), + extra_pre_sign_header: Default::default(), + bucket_location: Default::default(), + expires: Default::default(), + }, + ) + .await?; + + let h = resp.headers(); + let has_etag = h.get("ETag").unwrap().to_str().unwrap(); + if !has_etag.is_empty() { + return Err(std::io::Error::other("get_object_attributes is not supported by the current endpoint version")); + } + + if resp.status() != http::StatusCode::OK { + let b = resp.body_mut().store_all_unlimited().await.unwrap().to_vec(); + let err_body = String::from_utf8(b).unwrap(); + let mut er = match serde_xml_rs::from_str::(&err_body) { + Ok(result) => result, + Err(err) => { + return Err(std::io::Error::other(err.to_string())); + } + }; + + return Err(std::io::Error::other(er.access_control_list.permission)); + } + + let mut oa = ObjectAttributes::new(); + oa.parse_response(&mut resp).await?; + + Ok(oa) + } +} \ No newline at end of file diff --git a/crates/ecstore/src/client/api_get_object_file.rs b/crates/ecstore/src/client/api_get_object_file.rs new file mode 100644 index 00000000..6bda2069 --- /dev/null +++ b/crates/ecstore/src/client/api_get_object_file.rs @@ -0,0 +1,141 @@ +// Copyright 2024 RustFS Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +#![allow(unused_imports)] +#![allow(unused_variables)] +#![allow(unused_mut)] +#![allow(unused_assignments)] +#![allow(unused_must_use)] +#![allow(clippy::all)] + +use bytes::Bytes; +use http::HeaderMap; +use std::io::Cursor; +use tokio::io::BufReader; +#[cfg(not(windows))] +use std::os::unix::fs::PermissionsExt; +#[cfg(not(windows))] +use std::os::unix::fs::OpenOptionsExt; +#[cfg(not(windows))] +use std::os::unix::fs::MetadataExt; +#[cfg(windows)] +use std::os::windows::fs::MetadataExt; + +use crate::client::{ + api_error_response::err_invalid_argument, + api_get_options::GetObjectOptions, + transition_api::{ObjectInfo, ReadCloser, ReaderImpl, RequestMetadata, TransitionClient, to_object_info}, +}; + +impl TransitionClient { + pub async fn fget_object(&self, bucket_name: &str, object_name: &str, file_path: &str, opts: GetObjectOptions) -> Result<(), std::io::Error> { + match std::fs::metadata(file_path) { + Ok(file_path_stat) => { + let ft = file_path_stat.file_type(); + if ft.is_dir() { + return Err(std::io::Error::other(err_invalid_argument("filename is a directory."))); + } + }, + Err(err) => { + return Err(std::io::Error::other(err)); + } + } + + let path = std::path::Path::new(file_path); + if let Some(parent) = path.parent() { + if let Some(object_dir) = parent.file_name() { + match std::fs::create_dir_all(object_dir) { + Ok(_) => { + let dir = std::path::Path::new(object_dir); + if let Ok(dir_stat) = dir.metadata() { + #[cfg(not(windows))] + dir_stat.permissions().set_mode(0o700); + } + } + Err(err) => { + return Err(std::io::Error::other(err)); + } + } + } + } + + let object_stat = match self.stat_object(bucket_name, object_name, &opts).await { + Ok(object_stat) => object_stat, + Err(err) => { + return Err(std::io::Error::other(err)); + } + }; + + let mut file_part_path = file_path.to_string(); + file_part_path.push_str(""/*sum_sha256_hex(object_stat.etag.as_bytes())*/); + file_part_path.push_str(".part.rustfs"); + + #[cfg(not(windows))] + let file_part = match std::fs::OpenOptions::new().mode(0o600).open(file_part_path.clone()) { + Ok(file_part) => file_part, + Err(err) => { + return Err(std::io::Error::other(err)); + } + }; + #[cfg(windows)] + let file_part = match std::fs::OpenOptions::new().open(file_part_path.clone()) { + Ok(file_part) => file_part, + Err(err) => { + return Err(std::io::Error::other(err)); + } + }; + + let mut close_and_remove = true; + /*defer(|| { + if close_and_remove { + _ = file_part.close(); + let _ = std::fs::remove(file_part_path); + } + });*/ + + let st = match file_part.metadata() { + Ok(st) => st, + Err(err) => { + return Err(std::io::Error::other(err)); + } + }; + + let mut opts = opts; + #[cfg(windows)] + if st.file_size() > 0 { + opts.set_range(st.file_size() as i64, 0); + } + + let object_reader = match self.get_object(bucket_name, object_name, &opts) { + Ok(object_reader) => object_reader, + Err(err) => { + return Err(std::io::Error::other(err)); + } + }; + + /*if let Err(err) = std::fs::copy(file_part, object_reader) { + return Err(std::io::Error::other(err)); + }*/ + + close_and_remove = false; + /*if let Err(err) = file_part.close() { + return Err(std::io::Error::other(err)); + }*/ + + if let Err(err) = std::fs::rename(file_part_path, file_path) { + return Err(std::io::Error::other(err)); + } + + Ok(()) + } +} \ No newline at end of file diff --git a/crates/ecstore/src/client/api_get_options.rs b/crates/ecstore/src/client/api_get_options.rs index 324d3006..3692b29b 100644 --- a/crates/ecstore/src/client/api_get_options.rs +++ b/crates/ecstore/src/client/api_get_options.rs @@ -29,9 +29,9 @@ use crate::client::api_error_response::err_invalid_argument; #[derive(Default)] #[allow(dead_code)] pub struct AdvancedGetOptions { - replication_deletemarker: bool, - is_replication_ready_for_deletemarker: bool, - replication_proxy_request: String, + pub replication_delete_marker: bool, + pub is_replication_ready_for_delete_marker: bool, + pub replication_proxy_request: String, } pub struct GetObjectOptions { diff --git a/crates/ecstore/src/client/api_list.rs b/crates/ecstore/src/client/api_list.rs index bd5259e9..f978f063 100644 --- a/crates/ecstore/src/client/api_list.rs +++ b/crates/ecstore/src/client/api_list.rs @@ -1,4 +1,3 @@ -#![allow(clippy::map_entry)] // Copyright 2024 RustFS Team // // Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/crates/ecstore/src/client/api_put_object.rs b/crates/ecstore/src/client/api_put_object.rs index 0cfa0378..b0d81c6e 100644 --- a/crates/ecstore/src/client/api_put_object.rs +++ b/crates/ecstore/src/client/api_put_object.rs @@ -1,4 +1,3 @@ -#![allow(clippy::map_entry)] // Copyright 2024 RustFS Team // // Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/crates/ecstore/src/client/api_put_object_multipart.rs b/crates/ecstore/src/client/api_put_object_multipart.rs index 4bc68529..67854c72 100644 --- a/crates/ecstore/src/client/api_put_object_multipart.rs +++ b/crates/ecstore/src/client/api_put_object_multipart.rs @@ -1,4 +1,3 @@ -#![allow(clippy::map_entry)] // Copyright 2024 RustFS Team // // Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/crates/ecstore/src/client/api_put_object_streaming.rs b/crates/ecstore/src/client/api_put_object_streaming.rs index fa6d1c89..2e56dcac 100644 --- a/crates/ecstore/src/client/api_put_object_streaming.rs +++ b/crates/ecstore/src/client/api_put_object_streaming.rs @@ -1,4 +1,3 @@ -#![allow(clippy::map_entry)] // Copyright 2024 RustFS Team // // Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/crates/ecstore/src/client/api_remove.rs b/crates/ecstore/src/client/api_remove.rs index 853ee641..a6845229 100644 --- a/crates/ecstore/src/client/api_remove.rs +++ b/crates/ecstore/src/client/api_remove.rs @@ -1,4 +1,3 @@ -#![allow(clippy::map_entry)] // Copyright 2024 RustFS Team // // Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/crates/ecstore/src/client/api_restore.rs b/crates/ecstore/src/client/api_restore.rs new file mode 100644 index 00000000..ad11fcf3 --- /dev/null +++ b/crates/ecstore/src/client/api_restore.rs @@ -0,0 +1,163 @@ +// Copyright 2024 RustFS Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +#![allow(unused_imports)] +#![allow(unused_variables)] +#![allow(unused_mut)] +#![allow(unused_assignments)] +#![allow(unused_must_use)] +#![allow(clippy::all)] + +use bytes::Bytes; +use http::HeaderMap; +use std::io::Cursor; +use tokio::io::BufReader; +use std::collections::HashMap; + +use crate::client::{ + api_error_response::{err_invalid_argument, http_resp_to_error_response}, api_get_object_acl::AccessControlList, api_get_options::GetObjectOptions, transition_api::{to_object_info, ObjectInfo, ReadCloser, ReaderImpl, RequestMetadata, TransitionClient} +}; + +const TIER_STANDARD: &str = "Standard"; +const TIER_BULK: &str = "Bulk"; +const TIER_EXPEDITED: &str = "Expedited"; + +#[derive(Debug, Default, serde::Serialize)] +struct GlacierJobParameters { + tier: String, +} + +#[derive(Debug, Default, serde::Serialize, serde::Deserialize)] +struct Encryption { + encryption_type: String, + kms_context: String, + kms_key_id: String, +} + +#[derive(Debug, Default, serde::Serialize, serde::Deserialize)] +struct MetadataEntry { + name: String, + value: String, +} + +#[derive(Debug, Default, serde::Serialize)] +struct S3 { + access_control_list: AccessControlList, + bucket_name: String, + prefix: String, + canned_acl: String, + encryption: Encryption, + storage_class: String, + //tagging: Tags, + user_metadata: MetadataEntry, +} + +#[derive(Debug, Default, serde::Serialize)] +struct SelectParameters { + expression_type: String, + expression: String, + //input_serialization: SelectObjectInputSerialization, + //output_serialization: SelectObjectOutputSerialization, +} + +#[derive(Debug, Default, serde::Serialize)] +struct OutputLocation(S3); + +#[derive(Debug, Default, serde::Serialize)] +struct RestoreRequest { + restore_type: String, + tier: String, + days: i64, + glacier_job_parameters: GlacierJobParameters, + description: String, + select_parameters: SelectParameters, + output_location: OutputLocation, +} + +impl RestoreRequest { + fn set_days(&mut self, v: i64) { + self.days = v; + } + + fn set_glacier_job_parameters(&mut self, v: GlacierJobParameters) { + self.glacier_job_parameters = v; + } + + fn set_type(&mut self, v: &str) { + self.restore_type = v.to_string(); + } + + fn set_tier(&mut self, v: &str) { + self.tier = v.to_string(); + } + + fn set_description(&mut self, v: &str) { + self.description = v.to_string(); + } + + fn set_select_parameters(&mut self, v: SelectParameters) { + self.select_parameters = v; + } + + fn set_output_location(&mut self, v: OutputLocation) { + self.output_location = v; + } +} + +impl TransitionClient { + pub async fn restore_object(&self, bucket_name: &str, object_name: &str, version_id: &str, restore_req: &RestoreRequest) -> Result<(), std::io::Error> { + let restore_request = match serde_xml_rs::to_string(restore_req) { + Ok(buf) => buf, + Err(e) => { + return Err(std::io::Error::other(e)); + } + }; + let restore_request_bytes = restore_request.as_bytes().to_vec(); + + let mut url_values = HashMap::new(); + url_values.insert("restore".to_string(), "".to_string()); + if version_id != "" { + url_values.insert("versionId".to_string(), version_id.to_string()); + } + + let restore_request_buffer = Bytes::from(restore_request_bytes.clone()); + let resp = self + .execute_method( + http::Method::HEAD, + &mut RequestMetadata { + bucket_name: bucket_name.to_string(), + object_name: object_name.to_string(), + query_values: url_values, + custom_header: HeaderMap::new(), + content_sha256_hex: "".to_string(), //sum_sha256_hex(&restore_request_bytes), + content_md5_base64: "".to_string(), //sum_md5_base64(&restore_request_bytes), + content_body: ReaderImpl::Body(restore_request_buffer), + content_length: restore_request_bytes.len() as i64, + stream_sha256: false, + trailer: HeaderMap::new(), + pre_sign_url: Default::default(), + add_crc: Default::default(), + extra_pre_sign_header: Default::default(), + bucket_location: Default::default(), + expires: Default::default(), + }, + ) + .await?; + + let b = resp.body().bytes().expect("err").to_vec(); + if resp.status() != http::StatusCode::ACCEPTED && resp.status() != http::StatusCode::OK { + return Err(std::io::Error::other(http_resp_to_error_response(resp, b, bucket_name, ""))); + } + Ok(()) + } +} \ No newline at end of file diff --git a/crates/ecstore/src/client/api_s3_datatypes.rs b/crates/ecstore/src/client/api_s3_datatypes.rs index a026c731..ba26325e 100644 --- a/crates/ecstore/src/client/api_s3_datatypes.rs +++ b/crates/ecstore/src/client/api_s3_datatypes.rs @@ -1,4 +1,3 @@ -#![allow(clippy::map_entry)] // Copyright 2024 RustFS Team // // Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/crates/ecstore/src/client/api_stat.rs b/crates/ecstore/src/client/api_stat.rs new file mode 100644 index 00000000..b9f209f3 --- /dev/null +++ b/crates/ecstore/src/client/api_stat.rs @@ -0,0 +1,153 @@ +// Copyright 2024 RustFS Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +#![allow(unused_imports)] +#![allow(unused_variables)] +#![allow(unused_mut)] +#![allow(unused_assignments)] +#![allow(unused_must_use)] +#![allow(clippy::all)] + +use bytes::Bytes; +use http::{HeaderMap, HeaderValue}; +use rustfs_utils::EMPTY_STRING_SHA256_HASH; +use uuid::Uuid; +use std::{collections::HashMap, str::FromStr}; +use tokio::io::BufReader; + +use s3s::header::{X_AMZ_DELETE_MARKER, X_AMZ_VERSION_ID}; +use crate::client::{ + api_error_response::{err_invalid_argument, http_resp_to_error_response, ErrorResponse}, + api_get_options::GetObjectOptions, + transition_api::{to_object_info, ObjectInfo, ReadCloser, ReaderImpl, RequestMetadata, TransitionClient}, +}; + +impl TransitionClient { + pub async fn bucket_exists(&self, bucket_name: &str) -> Result { + let resp = self + .execute_method( + http::Method::HEAD, + &mut RequestMetadata { + bucket_name: bucket_name.to_string(), + object_name: "".to_string(), + query_values: HashMap::new(), + custom_header: HeaderMap::new(), + content_sha256_hex: EMPTY_STRING_SHA256_HASH.to_string(), + content_md5_base64: "".to_string(), + content_body: ReaderImpl::Body(Bytes::new()), + content_length: 0, + stream_sha256: false, + trailer: HeaderMap::new(), + pre_sign_url: Default::default(), + add_crc: Default::default(), + extra_pre_sign_header: Default::default(), + bucket_location: Default::default(), + expires: Default::default(), + }, + ) + .await; + + if let Ok(resp) = resp { + let b = resp.body().bytes().expect("err").to_vec(); + let resperr = http_resp_to_error_response(resp, b, bucket_name, ""); + /*if to_error_response(resperr).code == "NoSuchBucket" { + return Ok(false); + } + if resp.status_code() != http::StatusCode::OK { + return Ok(false); + }*/ + } + Ok(true) + } + + pub async fn stat_object(&self, bucket_name: &str, object_name: &str, opts: &GetObjectOptions) -> Result { + let mut headers = opts.header(); + if opts.internal.replication_delete_marker { + headers.insert("X-Source-DeleteMarker", HeaderValue::from_str("true").unwrap()); + } + if opts.internal.is_replication_ready_for_delete_marker { + headers.insert("X-Check-Replication-Ready", HeaderValue::from_str("true").unwrap()); + } + + let resp = self + .execute_method( + http::Method::HEAD, + &mut RequestMetadata { + bucket_name: bucket_name.to_string(), + object_name: object_name.to_string(), + query_values: opts.to_query_values(), + custom_header: headers, + content_sha256_hex: EMPTY_STRING_SHA256_HASH.to_string(), + content_md5_base64: "".to_string(), + content_body: ReaderImpl::Body(Bytes::new()), + content_length: 0, + stream_sha256: false, + trailer: HeaderMap::new(), + pre_sign_url: Default::default(), + add_crc: Default::default(), + extra_pre_sign_header: Default::default(), + bucket_location: Default::default(), + expires: Default::default(), + }, + ) + .await; + + match resp { + Ok(resp) => { + let h = resp.headers(); + let delete_marker = if let Some(x_amz_delete_marker) = h.get(X_AMZ_DELETE_MARKER.as_str()) { + x_amz_delete_marker.to_str().unwrap() == "true" + } else { false }; + let replication_ready = if let Some(x_amz_delete_marker) = h.get("X-Replication-Ready") { + x_amz_delete_marker.to_str().unwrap() == "true" + } else { false }; + if resp.status() != http::StatusCode::OK && resp.status() != http::StatusCode::PARTIAL_CONTENT { + if resp.status() == http::StatusCode::METHOD_NOT_ALLOWED && opts.version_id != "" && delete_marker { + let err_resp = ErrorResponse { + status_code: resp.status(), + code: s3s::S3ErrorCode::MethodNotAllowed, + message: "the specified method is not allowed against this resource.".to_string(), + bucket_name: bucket_name.to_string(), + key: object_name.to_string(), + ..Default::default() + }; + return Ok(ObjectInfo { + version_id: match Uuid::from_str(h.get(X_AMZ_VERSION_ID).unwrap().to_str().unwrap()) { + Ok(v) => v, + Err(e) => { return Err(std::io::Error::other(e)); } + }, + is_delete_marker: delete_marker, + ..Default::default() + }); + //err_resp + } + return Ok(ObjectInfo { + version_id: match Uuid::from_str(h.get(X_AMZ_VERSION_ID).unwrap().to_str().unwrap()) { + Ok(v) => v, + Err(e) => { return Err(std::io::Error::other(e)); } + }, + is_delete_marker: delete_marker, + replication_ready: replication_ready, + ..Default::default() + }); + //http_resp_to_error_response(resp, bucket_name, object_name) + } + + Ok(to_object_info(bucket_name, object_name, h).unwrap()) + } + Err(err) => { + return Err(std::io::Error::other(err)); + } + } + } +} diff --git a/crates/ecstore/src/client/mod.rs b/crates/ecstore/src/client/mod.rs index 25892d2f..7cc781a6 100644 --- a/crates/ecstore/src/client/mod.rs +++ b/crates/ecstore/src/client/mod.rs @@ -23,6 +23,11 @@ pub mod api_put_object_common; pub mod api_put_object_multipart; pub mod api_put_object_streaming; pub mod api_remove; +pub mod api_restore; +pub mod api_stat; +pub mod api_get_object_acl; +pub mod api_get_object_attributes; +pub mod api_get_object_file; pub mod api_s3_datatypes; pub mod bucket_cache; pub mod constants; diff --git a/crates/ecstore/src/tier/tier.rs b/crates/ecstore/src/tier/tier.rs index 3d034f69..fdfed498 100644 --- a/crates/ecstore/src/tier/tier.rs +++ b/crates/ecstore/src/tier/tier.rs @@ -1,4 +1,3 @@ -#![allow(unused_imports)] // Copyright 2024 RustFS Team // // Licensed under the Apache License, Version 2.0 (the "License"); @@ -12,6 +11,7 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. +#![allow(unused_imports)] #![allow(unused_variables)] #![allow(unused_mut)] #![allow(unused_assignments)] diff --git a/rustfs/src/admin/handlers/tier.rs b/rustfs/src/admin/handlers/tier.rs index ff1efa10..35c8b4c1 100644 --- a/rustfs/src/admin/handlers/tier.rs +++ b/rustfs/src/admin/handlers/tier.rs @@ -1,4 +1,3 @@ -#![allow(unused_variables, unused_mut, unused_must_use)] // Copyright 2024 RustFS Team // // Licensed under the Apache License, Version 2.0 (the "License"); @@ -12,6 +11,7 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. +#![allow(unused_variables, unused_mut, unused_must_use)] use http::{HeaderMap, StatusCode}; //use iam::get_global_action_cred; @@ -461,3 +461,182 @@ impl Operation for ClearTier { Ok(S3Response::with_headers((StatusCode::OK, Body::empty()), header)) } } + +/*pub struct PostRestoreObject {} +#[async_trait::async_trait] +impl Operation for PostRestoreObject { + async fn call(&self, req: S3Request, params: Params<'_, '_>) -> S3Result> { + let query = { + if let Some(query) = req.uri.query() { + let input: PostRestoreObject = + from_bytes(query.as_bytes()).map_err(|_e| s3_error!(InvalidArgument, "get query failed"))?; + input + } else { + PostRestoreObject::default() + } + }; + + let bucket = params.bucket; + if let Err(e) = un_escape_path(params.object) { + warn!("post restore object failed, e: {:?}", e); + return Err(S3Error::with_message(S3ErrorCode::Custom("PostRestoreObjectFailed".into()), "post restore object failed")); + } + + let Some(store) = new_object_layer_fn() else { + return Err(S3Error::with_message(S3ErrorCode::InternalError, "Not init".to_string())); + }; + + let get_object_info = store.get_object_info(); + + if Err(err) = check_request_auth_type(req, policy::RestoreObjectAction, bucket, object) { + return Err(S3Error::with_message(S3ErrorCode::Custom("PostRestoreObjectFailed".into()), "post restore object failed")); + } + + if req.content_length <= 0 { + return Err(S3Error::with_message(S3ErrorCode::Custom("ErrEmptyRequestBody".into()), "post restore object failed")); + } + let Some(opts) = post_restore_opts(req, bucket, object) else { + return Err(S3Error::with_message(S3ErrorCode::Custom("ErrEmptyRequestBody".into()), "post restore object failed")); + }; + + let Some(obj_info) = getObjectInfo(ctx, bucket, object, opts) else { + return Err(S3Error::with_message(S3ErrorCode::Custom("ErrEmptyRequestBody".into()), "post restore object failed")); + }; + + if obj_info.transitioned_object.status != lifecycle::TRANSITION_COMPLETE { + return Err(S3Error::with_message(S3ErrorCode::Custom("ErrEmptyRequestBody".into()), "post restore object failed")); + } + + let mut api_err; + let Some(rreq) = parsere_store_request(req.body(), req.content_length) else { + let api_err = errorCodes.ToAPIErr(ErrMalformedXML); + api_err.description = err.Error() + return Err(S3Error::with_message(S3ErrorCode::Custom("ErrEmptyRequestBody".into()), "post restore object failed")); + }; + let mut status_code = http::StatusCode::OK; + let mut already_restored = false; + if Err(err) = rreq.validate(store) { + api_err = errorCodes.ToAPIErr(ErrMalformedXML) + api_err.description = err.Error() + return Err(S3Error::with_message(S3ErrorCode::Custom("ErrEmptyRequestBody".into()), "post restore object failed")); + } else { + if obj_info.restore_ongoing && rreq.Type != "SELECT" { + return Err(S3Error::with_message(S3ErrorCode::Custom("ErrObjectRestoreAlreadyInProgress".into()), "post restore object failed")); + } + if !obj_info.restore_ongoing && !obj_info.restore_expires.unix_timestamp() == 0 { + status_code = http::StatusCode::Accepted; + already_restored = true; + } + } + let restore_expiry = lifecycle::expected_expiry_time(OffsetDateTime::now_utc(), rreq.days); + let mut metadata = clone_mss(obj_info.user_defined); + + if rreq.type != "SELECT" { + obj_info.metadataOnly = true; + metadata[xhttp.AmzRestoreExpiryDays] = rreq.days; + metadata[xhttp.AmzRestoreRequestDate] = OffsetDateTime::now_utc().format(http::TimeFormat); + if already_restored { + metadata[xhttp.AmzRestore] = completedRestoreObj(restore_expiry).String() + } else { + metadata[xhttp.AmzRestore] = ongoingRestoreObj().String() + } + obj_info.user_defined = metadata; + if let Err(err) = store.copy_object(bucket, object, bucket, object, obj_info, ObjectOptions { + version_id: obj_info.version_id, + }, ObjectOptions { + version_id: obj_info.version_id, + m_time: obj_info.mod_time, + }) { + return Err(S3Error::with_message(S3ErrorCode::Custom("ErrInvalidObjectState".into()), "post restore object failed")); + } + if already_restored { + return Ok(()); + } + } + + let restore_object = must_get_uuid(); + if rreq.output_location.s3.bucket_name != "" { + w.Header()[xhttp.AmzRestoreOutputPath] = []string{pathJoin(rreq.OutputLocation.S3.BucketName, rreq.OutputLocation.S3.Prefix, restoreObject)} + } + w.WriteHeader(status_code) + send_event(EventArgs { + event_name: event::ObjectRestorePost, + bucket_name: bucket, + object: obj_info, + req_params: extract_req_params(r), + user_agent: req.user_agent(), + host: handlers::get_source_ip(r), + }); + tokio::spawn(async move { + if !rreq.SelectParameters.IsEmpty() { + let actual_size = obj_info.get_actual_size(); + if actual_size.is_err() { + return Err(S3Error::with_message(S3ErrorCode::Custom("ErrInvalidObjectState".into()), "post restore object failed")); + } + + let object_rsc = s3select.NewObjectReadSeekCloser( + |offset int64| -> (io.ReadCloser, error) { + rs := &HTTPRangeSpec{ + IsSuffixLength: false, + Start: offset, + End: -1, + } + return getTransitionedObjectReader(bucket, object, rs, r.Header, + obj_info, ObjectOptions {version_id: obj_info.version_id}); + }, + actual_size.unwrap(), + ); + if err = rreq.SelectParameters.Open(objectRSC); err != nil { + if serr, ok := err.(s3select.SelectError); ok { + let encoded_error_response = encodeResponse(APIErrorResponse { + code: serr.ErrorCode(), + message: serr.ErrorMessage(), + bucket_name: bucket, + key: object, + resource: r.URL.Path, + request_id: w.Header().Get(xhttp.AmzRequestID), + host_id: globalDeploymentID(), + }); + //writeResponse(w, serr.HTTPStatusCode(), encodedErrorResponse, mimeXML) + Ok(S3Response::with_headers((StatusCode::OK, Body::empty()), header)); + } else { + return Err(S3Error::with_message(S3ErrorCode::Custom("ErrInvalidObjectState".into()), "post restore object failed")); + } + return Ok(()); + } + let nr = httptest.NewRecorder(); + let rw = xhttp.NewResponseRecorder(nr); + rw.log_err_body = true; + rw.log_all_body = true; + rreq.select_parameters.evaluate(rw); + rreq.select_parameters.Close(); + return Ok(S3Response::with_headers((StatusCode::OK, Body::empty()), header)); + } + let opts = ObjectOptions { + transition: TransitionOptions { + restore_request: rreq, + restore_expiry: restore_expiry, + }, + version_id: objInfo.version_id, + } + if Err(err) = store.restore_transitioned_object(bucket, object, opts) { + format!(format!("unable to restore transitioned bucket/object {}/{}: {}", bucket, object, err.to_string())); + return Ok(S3Response::with_headers((StatusCode::OK, Body::empty()), header)); + } + + send_event(EventArgs { + EventName: event.ObjectRestoreCompleted, + BucketName: bucket, + Object: objInfo, + ReqParams: extractReqParams(r), + UserAgent: r.UserAgent(), + Host: handlers.GetSourceIP(r), + }); + }); + + let mut header = HeaderMap::new(); + header.insert(CONTENT_TYPE, "application/json".parse().unwrap()); + + Ok(S3Response::with_headers((StatusCode::OK, Body::empty()), header)) + } +}*/ diff --git a/rustfs/src/storage/ecfs.rs b/rustfs/src/storage/ecfs.rs index f33dd426..60561264 100644 --- a/rustfs/src/storage/ecfs.rs +++ b/rustfs/src/storage/ecfs.rs @@ -1977,18 +1977,18 @@ impl S3 for FS { } = req.input; let mut lr_retention = false; - let rcfg = metadata_sys::get_object_lock_config(&bucket).await; + /*let rcfg = metadata_sys::get_object_lock_config(&bucket).await; if let Ok(rcfg) = rcfg { if let Some(rule) = rcfg.0.rule { if let Some(retention) = rule.default_retention { if let Some(mode) = retention.mode { - if mode == ObjectLockRetentionMode::from_static(ObjectLockRetentionMode::GOVERNANCE) { + //if mode == ObjectLockRetentionMode::from_static(ObjectLockRetentionMode::GOVERNANCE) { lr_retention = true; - } + //} } } } - } + }*/ //info!("lifecycle_configuration: {:?}", &lifecycle_configuration);