Feature up/ilm (#61)

* fix delete-marker expiration. add api_restore.
This commit is contained in:
likewu
2025-07-06 12:31:08 +08:00
committed by GitHub
parent d41ccc1551
commit 2b079ae065
19 changed files with 1072 additions and 30 deletions

View File

@@ -1,4 +1,3 @@
#![allow(unused_imports)]
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
@@ -12,6 +11,7 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#![allow(unused_imports)]
#![allow(unused_variables)]
#![allow(unused_mut)]
#![allow(unused_assignments)]
@@ -41,7 +41,7 @@ const ERR_LIFECYCLE_DUPLICATE_ID: &str = "Rule ID must be unique. Found same ID
const _ERR_XML_NOT_WELL_FORMED: &str =
"The XML you provided was not well-formed or did not validate against our published schema";
const ERR_LIFECYCLE_BUCKET_LOCKED: &str =
"ExpiredObjectAllVersions element and DelMarkerExpiration action cannot be used on an object locked bucket";
"ExpiredObjectAllVersions element and DelMarkerExpiration action cannot be used on a retention bucket";
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum IlmAction {
@@ -102,30 +102,30 @@ impl RuleValidate for LifecycleRule {
}
fn validate_status(&self) -> Result<()> {
if self.Status.len() == 0 {
return errEmptyRuleStatus;
if self.status.len() == 0 {
return ErrEmptyRuleStatus;
}
if self.Status != Enabled && self.Status != Disabled {
return errInvalidRuleStatus;
if self.status != Enabled && self.status != Disabled {
return ErrInvalidRuleStatus;
}
Ok(())
}
fn validate_expiration(&self) -> Result<()> {
self.Expiration.Validate();
self.expiration.validate();
}
fn validate_noncurrent_expiration(&self) -> Result<()> {
self.NoncurrentVersionExpiration.Validate()
self.noncurrent_version_expiration.validate()
}
fn validate_prefix_and_filter(&self) -> Result<()> {
if !self.Prefix.set && self.Filter.IsEmpty() || self.Prefix.set && !self.Filter.IsEmpty() {
return errXMLNotWellFormed;
if !self.prefix.set && self.Filter.isempty() || self.prefix.set && !self.filter.isempty() {
return ErrXMLNotWellFormed;
}
if !self.Prefix.set {
return self.Filter.Validate();
if !self.prefix.set {
return self.filter.validate();
}
Ok(())
}
@@ -267,7 +267,7 @@ impl Lifecycle for BucketLifecycleConfiguration {
r.validate()?;
if let Some(expiration) = r.expiration.as_ref() {
if let Some(expired_object_delete_marker) = expiration.expired_object_delete_marker {
if lr_retention && (!expired_object_delete_marker) {
if lr_retention && (expired_object_delete_marker) {
return Err(std::io::Error::other(ERR_LIFECYCLE_BUCKET_LOCKED));
}
}

View File

@@ -1,4 +1,3 @@
#![allow(clippy::map_entry)]
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");

View File

@@ -1,4 +1,3 @@
#![allow(clippy::map_entry)]
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");

View File

@@ -0,0 +1,180 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#![allow(unused_imports)]
#![allow(unused_variables)]
#![allow(unused_mut)]
#![allow(unused_assignments)]
#![allow(unused_must_use)]
#![allow(clippy::all)]
use bytes::Bytes;
use http::{HeaderMap, HeaderValue};
use s3s::dto::Owner;
use std::io::Cursor;
use std::collections::HashMap;
use tokio::io::BufReader;
use rustfs_utils::EMPTY_STRING_SHA256_HASH;
use crate::client::{
api_error_response::{err_invalid_argument, http_resp_to_error_response},
api_get_options::GetObjectOptions,
transition_api::{to_object_info, ObjectInfo, ReadCloser, ReaderImpl, RequestMetadata, TransitionClient},
};
/// One grantee entry from an S3 ACL XML document (a user or a group).
#[derive(Clone, Debug, Default, serde::Serialize, serde::Deserialize)]
struct Grantee {
    // Canonical user id from the ACL XML; compared against the bucket owner id in `get_canned_acl`.
    id: String,
    display_name: String,
    // Group URI (e.g. the AllUsers / AuthenticatedUsers group URIs matched in `get_canned_acl`).
    uri: String,
}

/// A single ACL grant: who (`grantee`) gets which `permission` (e.g. "READ", "FULL_CONTROL").
#[derive(Clone, Debug, Default, serde::Serialize, serde::Deserialize)]
struct Grant {
    grantee: Grantee,
    permission: String,
}

/// The `<AccessControlList>` element: the list of grants plus a permission field.
#[derive(Debug, Default, serde::Serialize, serde::Deserialize)]
pub struct AccessControlList {
    pub grant: Vec<Grant>,
    pub permission: String,
}

/// The top-level `<AccessControlPolicy>` response of GetObjectAcl.
#[derive(Debug, Default, serde::Deserialize)]
pub struct AccessControlPolicy {
    // NOTE(review): `owner` is #[serde(skip)], so it is always `Owner::default()` after
    // deserializing the XML response — confirm whether the owner should be parsed instead.
    #[serde(skip)]
    owner: Owner,
    pub access_control_list: AccessControlList,
}
impl TransitionClient {
    /// Fetches the object's ACL (`GET /{object}?acl`), then stats the object and returns its
    /// `ObjectInfo` with the owner filled in and, when the ACL maps to a canned ACL,
    /// an `X-Amz-Acl` metadata entry.
    pub async fn get_object_acl(&self, bucket_name: &str, object_name: &str) -> Result<ObjectInfo, std::io::Error> {
        // The `acl` sub-resource selects the GetObjectAcl operation.
        let mut url_values = HashMap::new();
        url_values.insert("acl".to_string(), "".to_string());

        let mut resp = self
            .execute_method(
                http::Method::GET,
                &mut RequestMetadata {
                    bucket_name: bucket_name.to_string(),
                    object_name: object_name.to_string(),
                    query_values: url_values,
                    custom_header: HeaderMap::new(),
                    // Empty body, so the precomputed SHA-256 of "" is used.
                    content_sha256_hex: EMPTY_STRING_SHA256_HASH.to_string(),
                    content_body: ReaderImpl::Body(Bytes::new()),
                    content_length: 0,
                    content_md5_base64: "".to_string(),
                    stream_sha256: false,
                    trailer: HeaderMap::new(),
                    pre_sign_url: Default::default(),
                    add_crc: Default::default(),
                    extra_pre_sign_header: Default::default(),
                    bucket_location: Default::default(),
                    expires: Default::default(),
                },
            )
            .await?;

        if resp.status() != http::StatusCode::OK {
            // NOTE(review): `bytes()` + expect("err") panics when the body is unavailable —
            // consider the buffered read used on the success path below.
            let b = resp.body().bytes().expect("err").to_vec();
            return Err(std::io::Error::other(http_resp_to_error_response(resp, b, bucket_name, object_name)));
        }

        // Buffer the whole XML response and deserialize the <AccessControlPolicy>.
        let b = resp.body_mut().store_all_unlimited().await.unwrap().to_vec();
        let mut res = match serde_xml_rs::from_str::<AccessControlPolicy>(&String::from_utf8(b).unwrap()) {
            Ok(result) => result,
            Err(err) => {
                return Err(std::io::Error::other(err.to_string()));
            }
        };

        // Use a HEAD request for the rest of the object metadata.
        let mut obj_info = self.stat_object(bucket_name, object_name, &GetObjectOptions::default()).await?;
        // NOTE(review): `res.owner` is #[serde(skip)] on AccessControlPolicy, so these are
        // default values, not the owner from the ACL XML — verify intent.
        obj_info.owner.display_name = res.owner.display_name.clone();
        obj_info.owner.id = res.owner.id.clone();
        //obj_info.grant.extend(res.access_control_list.grant);

        // If the grants correspond to a canned ACL, expose it via the X-Amz-Acl metadata key.
        let canned_acl = get_canned_acl(&res);
        if canned_acl != "" {
            obj_info.metadata.insert("X-Amz-Acl", HeaderValue::from_str(&canned_acl).unwrap());
            return Ok(obj_info);
        }

        // Otherwise compute per-permission grant headers (propagation currently disabled).
        let grant_acl = get_amz_grant_acl(&res);
        /*for (k, v) in grant_acl {
            obj_info.metadata.insert(HeaderName::from_bytes(k.as_bytes()).unwrap(), HeaderValue::from_str(&v.to_string()).unwrap());
        }*/

        Ok(obj_info)
    }
}
/// Maps an explicit grant list to the canned ACL it corresponds to
/// ("private", "public-read", "public-read-write", "authenticated-read",
/// "bucket-owner-read"), or an empty string when the grants match no canned ACL.
fn get_canned_acl(ac_policy: &AccessControlPolicy) -> String {
    // Borrow instead of cloning the whole grant list — we only read it.
    let grants = &ac_policy.access_control_list.grant;
    match grants.len() {
        1 => {
            // A single FULL_CONTROL grant to a user (no group URI) is "private".
            if grants[0].grantee.uri.is_empty() && grants[0].permission == "FULL_CONTROL" {
                return "private".to_string();
            }
        }
        2 => {
            for g in grants {
                if g.grantee.uri == "http://acs.amazonaws.com/groups/global/AuthenticatedUsers" && g.permission == "READ" {
                    return "authenticated-read".to_string();
                }
                if g.grantee.uri == "http://acs.amazonaws.com/groups/global/AllUsers" && g.permission == "READ" {
                    return "public-read".to_string();
                }
                // BUGFIX: `owner.id` is an Option and was unwrap()ed, panicking whenever the
                // owner id is absent (it always is, since `owner` is #[serde(skip)]).
                // A missing owner id now simply fails the comparison.
                if g.permission == "READ" && ac_policy.owner.id.as_deref() == Some(g.grantee.id.as_str()) {
                    return "bucket-owner-read".to_string();
                }
            }
        }
        3 => {
            for g in grants {
                if g.grantee.uri == "http://acs.amazonaws.com/groups/global/AllUsers" && g.permission == "WRITE" {
                    return "public-read-write".to_string();
                }
            }
        }
        _ => {}
    }
    // No canned ACL matched.
    String::new()
}
/// Groups the policy's grants into `X-Amz-Grant-*` header values: for each known
/// permission, collects `id=<grantee id>` strings under the matching header name.
/// Unknown permissions are ignored.
pub fn get_amz_grant_acl(ac_policy: &AccessControlPolicy) -> HashMap<String, Vec<String>> {
    let mut acl_headers = HashMap::<String, Vec<String>>::new();
    for grant in ac_policy.access_control_list.grant.clone() {
        // Translate the permission into its grant header; skip anything unrecognized.
        let header_name = match grant.permission.as_str() {
            "READ" => "X-Amz-Grant-Read",
            "WRITE" => "X-Amz-Grant-Write",
            "READ_ACP" => "X-Amz-Grant-Read-Acp",
            "WRITE_ACP" => "X-Amz-Grant-Write-Acp",
            "FULL_CONTROL" => "X-Amz-Grant-Full-Control",
            _ => continue,
        };
        let mut grantee_id = "id=".to_string();
        grantee_id.push_str(&grant.grantee.id);
        acl_headers
            .entry(header_name.to_string())
            .or_default()
            .push(grantee_id);
    }
    acl_headers
}

View File

@@ -0,0 +1,229 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#![allow(unused_imports)]
#![allow(unused_variables)]
#![allow(unused_mut)]
#![allow(unused_assignments)]
#![allow(unused_must_use)]
#![allow(clippy::all)]
use bytes::Bytes;
use http::{HeaderMap, HeaderValue};
use time::OffsetDateTime;
use std::io::Cursor;
use std::collections::HashMap;
use tokio::io::BufReader;
use s3s::{Body, dto::Owner};
use s3s::header::{X_AMZ_OBJECT_ATTRIBUTES, X_AMZ_DELETE_MARKER, X_AMZ_METADATA_DIRECTIVE, X_AMZ_VERSION_ID,
X_AMZ_REQUEST_CHARGED, X_AMZ_RESTORE, X_AMZ_PART_NUMBER_MARKER, X_AMZ_MAX_PARTS,};
use rustfs_utils::EMPTY_STRING_SHA256_HASH;
use crate::client::constants::{GET_OBJECT_ATTRIBUTES_MAX_PARTS, GET_OBJECT_ATTRIBUTES_TAGS, ISO8601_DATEFORMAT};
use crate::client::{
api_error_response::err_invalid_argument,
api_get_options::GetObjectOptions,
transition_api::{ObjectInfo, ReadCloser, ReaderImpl, RequestMetadata, TransitionClient, to_object_info},
api_get_object_acl::AccessControlPolicy,
};
/// Options for `TransitionClient::get_object_attributes`.
struct ObjectAttributesOptions {
    // Upper bound on parts returned; when <= 0 the GET_OBJECT_ATTRIBUTES_MAX_PARTS default is sent.
    max_parts: i64,
    // Specific object version to query; empty string means the latest version.
    version_id: String,
    // Part number to start listing from (sent only when > 0).
    part_number_marker: i64,
    //server_side_encryption: encrypt::ServerSide,
}

/// Result of a GetObjectAttributes call: header-derived fields plus the parsed XML body.
struct ObjectAttributes {
    version_id: String,
    last_modified: OffsetDateTime,
    object_attributes_response: ObjectAttributesResponse,
}

impl ObjectAttributes {
    // Empty attributes; `last_modified` is a placeholder until parse_response overwrites it.
    fn new() -> Self {
        Self {
            version_id: "".to_string(),
            last_modified: OffsetDateTime::now_utc(),
            object_attributes_response: ObjectAttributesResponse::new(),
        }
    }
}

/// Per-algorithm checksums as reported in the GetObjectAttributes XML body.
#[derive(Debug, Default, serde::Deserialize)]
pub struct Checksum {
    checksum_crc32: String,
    checksum_crc32c: String,
    checksum_sha1: String,
    checksum_sha256: String,
}

impl Checksum {
    // All-empty checksums (no algorithm reported).
    fn new() -> Self {
        Self {
            checksum_crc32: "".to_string(),
            checksum_crc32c: "".to_string(),
            checksum_sha1: "".to_string(),
            checksum_sha256: "".to_string(),
        }
    }
}

/// The `<ObjectParts>` element: multipart-upload part listing with pagination markers.
#[derive(Debug, Default, serde::Deserialize)]
struct ObjectParts {
    parts_count: i64,
    part_number_marker: i64,
    next_part_number_marker: i64,
    max_parts: i64,
    // True when more parts exist beyond `next_part_number_marker`.
    is_truncated: bool,
    parts: Vec<ObjectAttributePart>,
}

impl ObjectParts {
    // Empty part listing.
    fn new() -> Self {
        Self {
            parts_count: 0,
            part_number_marker: 0,
            next_part_number_marker: 0,
            max_parts: 0,
            is_truncated: false,
            parts: Vec::new(),
        }
    }
}

/// The XML body of a GetObjectAttributes response.
#[derive(Debug, Default, serde::Deserialize)]
struct ObjectAttributesResponse {
    etag: String,
    storage_class: String,
    object_size: i64,
    checksum: Checksum,
    object_parts: ObjectParts,
}

impl ObjectAttributesResponse {
    // Empty response body (used before parsing).
    fn new() -> Self {
        Self {
            etag: "".to_string(),
            storage_class: "".to_string(),
            object_size: 0,
            checksum: Checksum::new(),
            object_parts: ObjectParts::new(),
        }
    }
}

/// One part entry inside `<ObjectParts>`: its checksums, number and size.
#[derive(Debug, Default, serde::Deserialize)]
struct ObjectAttributePart {
    checksum_crc32: String,
    checksum_crc32c: String,
    checksum_sha1: String,
    checksum_sha256: String,
    part_number: i64,
    size: i64,
}
impl ObjectAttributes {
    /// Populates `self` from a GetObjectAttributes HTTP response: `last_modified` and
    /// `version_id` come from the headers, everything else from the XML body.
    ///
    /// # Errors
    /// Returns an error when the `Last-Modified` header is missing/unparsable or the
    /// XML body does not deserialize into `ObjectAttributesResponse`.
    pub async fn parse_response(&mut self, resp: &mut http::Response<Body>) -> Result<(), std::io::Error> {
        let h = resp.headers();
        // BUGFIX: the headers were unwrap()ed, panicking on any response that omitted
        // them even though this function already returns Result. Missing/invalid
        // Last-Modified is now a proper error.
        // NOTE(review): ISO8601_DATEFORMAT is used although Last-Modified is normally
        // RFC 7231 formatted (the original carried the same "//RFC7231Time" remark) — confirm.
        let last_modified_raw = h
            .get("Last-Modified")
            .and_then(|v| v.to_str().ok())
            .ok_or_else(|| std::io::Error::other("missing or invalid Last-Modified header"))?;
        self.last_modified = OffsetDateTime::parse(last_modified_raw, ISO8601_DATEFORMAT)
            .map_err(|e| std::io::Error::other(e.to_string()))?;
        // A missing version id (unversioned bucket) now yields an empty string instead of a panic.
        self.version_id = h
            .get(X_AMZ_VERSION_ID)
            .and_then(|v| v.to_str().ok())
            .unwrap_or_default()
            .to_string();

        // Buffer and deserialize the XML body.
        let b = resp.body_mut().store_all_unlimited().await.unwrap().to_vec();
        let text = String::from_utf8(b).map_err(|e| std::io::Error::other(e.to_string()))?;
        self.object_attributes_response = serde_xml_rs::from_str::<ObjectAttributesResponse>(&text)
            .map_err(|err| std::io::Error::other(err.to_string()))?;
        Ok(())
    }
}
impl TransitionClient {
pub async fn get_object_attributes(&self, bucket_name: &str, object_name: &str, opts: ObjectAttributesOptions) -> Result<ObjectAttributes, std::io::Error> {
let mut url_values = HashMap::new();
url_values.insert("attributes".to_string(), "".to_string());
if opts.version_id != "" {
url_values.insert("versionId".to_string(), opts.version_id);
}
let mut headers = HeaderMap::new();
headers.insert(X_AMZ_OBJECT_ATTRIBUTES, HeaderValue::from_str(GET_OBJECT_ATTRIBUTES_TAGS).unwrap());
if opts.part_number_marker > 0 {
headers.insert(X_AMZ_PART_NUMBER_MARKER, HeaderValue::from_str(&opts.part_number_marker.to_string()).unwrap());
}
if opts.max_parts > 0 {
headers.insert(X_AMZ_MAX_PARTS, HeaderValue::from_str(&opts.max_parts.to_string()).unwrap());
} else {
headers.insert(X_AMZ_MAX_PARTS, HeaderValue::from_str(&GET_OBJECT_ATTRIBUTES_MAX_PARTS.to_string()).unwrap());
}
/*if opts.server_side_encryption.is_some() {
opts.server_side_encryption.Marshal(headers);
}*/
let mut resp = self
.execute_method(
http::Method::HEAD,
&mut RequestMetadata {
bucket_name: bucket_name.to_string(),
object_name: object_name.to_string(),
query_values: url_values,
custom_header: headers,
content_sha256_hex: EMPTY_STRING_SHA256_HASH.to_string(),
content_md5_base64: "".to_string(),
content_body: ReaderImpl::Body(Bytes::new()),
content_length: 0,
stream_sha256: false,
trailer: HeaderMap::new(),
pre_sign_url: Default::default(),
add_crc: Default::default(),
extra_pre_sign_header: Default::default(),
bucket_location: Default::default(),
expires: Default::default(),
},
)
.await?;
let h = resp.headers();
let has_etag = h.get("ETag").unwrap().to_str().unwrap();
if !has_etag.is_empty() {
return Err(std::io::Error::other("get_object_attributes is not supported by the current endpoint version"));
}
if resp.status() != http::StatusCode::OK {
let b = resp.body_mut().store_all_unlimited().await.unwrap().to_vec();
let err_body = String::from_utf8(b).unwrap();
let mut er = match serde_xml_rs::from_str::<AccessControlPolicy>(&err_body) {
Ok(result) => result,
Err(err) => {
return Err(std::io::Error::other(err.to_string()));
}
};
return Err(std::io::Error::other(er.access_control_list.permission));
}
let mut oa = ObjectAttributes::new();
oa.parse_response(&mut resp).await?;
Ok(oa)
}
}

View File

@@ -0,0 +1,141 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#![allow(unused_imports)]
#![allow(unused_variables)]
#![allow(unused_mut)]
#![allow(unused_assignments)]
#![allow(unused_must_use)]
#![allow(clippy::all)]
use bytes::Bytes;
use http::HeaderMap;
use std::io::Cursor;
use tokio::io::BufReader;
#[cfg(not(windows))]
use std::os::unix::fs::PermissionsExt;
#[cfg(not(windows))]
use std::os::unix::fs::OpenOptionsExt;
#[cfg(not(windows))]
use std::os::unix::fs::MetadataExt;
#[cfg(windows)]
use std::os::windows::fs::MetadataExt;
use crate::client::{
api_error_response::err_invalid_argument,
api_get_options::GetObjectOptions,
transition_api::{ObjectInfo, ReadCloser, ReaderImpl, RequestMetadata, TransitionClient, to_object_info},
};
impl TransitionClient {
pub async fn fget_object(&self, bucket_name: &str, object_name: &str, file_path: &str, opts: GetObjectOptions) -> Result<(), std::io::Error> {
match std::fs::metadata(file_path) {
Ok(file_path_stat) => {
let ft = file_path_stat.file_type();
if ft.is_dir() {
return Err(std::io::Error::other(err_invalid_argument("filename is a directory.")));
}
},
Err(err) => {
return Err(std::io::Error::other(err));
}
}
let path = std::path::Path::new(file_path);
if let Some(parent) = path.parent() {
if let Some(object_dir) = parent.file_name() {
match std::fs::create_dir_all(object_dir) {
Ok(_) => {
let dir = std::path::Path::new(object_dir);
if let Ok(dir_stat) = dir.metadata() {
#[cfg(not(windows))]
dir_stat.permissions().set_mode(0o700);
}
}
Err(err) => {
return Err(std::io::Error::other(err));
}
}
}
}
let object_stat = match self.stat_object(bucket_name, object_name, &opts).await {
Ok(object_stat) => object_stat,
Err(err) => {
return Err(std::io::Error::other(err));
}
};
let mut file_part_path = file_path.to_string();
file_part_path.push_str(""/*sum_sha256_hex(object_stat.etag.as_bytes())*/);
file_part_path.push_str(".part.rustfs");
#[cfg(not(windows))]
let file_part = match std::fs::OpenOptions::new().mode(0o600).open(file_part_path.clone()) {
Ok(file_part) => file_part,
Err(err) => {
return Err(std::io::Error::other(err));
}
};
#[cfg(windows)]
let file_part = match std::fs::OpenOptions::new().open(file_part_path.clone()) {
Ok(file_part) => file_part,
Err(err) => {
return Err(std::io::Error::other(err));
}
};
let mut close_and_remove = true;
/*defer(|| {
if close_and_remove {
_ = file_part.close();
let _ = std::fs::remove(file_part_path);
}
});*/
let st = match file_part.metadata() {
Ok(st) => st,
Err(err) => {
return Err(std::io::Error::other(err));
}
};
let mut opts = opts;
#[cfg(windows)]
if st.file_size() > 0 {
opts.set_range(st.file_size() as i64, 0);
}
let object_reader = match self.get_object(bucket_name, object_name, &opts) {
Ok(object_reader) => object_reader,
Err(err) => {
return Err(std::io::Error::other(err));
}
};
/*if let Err(err) = std::fs::copy(file_part, object_reader) {
return Err(std::io::Error::other(err));
}*/
close_and_remove = false;
/*if let Err(err) = file_part.close() {
return Err(std::io::Error::other(err));
}*/
if let Err(err) = std::fs::rename(file_part_path, file_path) {
return Err(std::io::Error::other(err));
}
Ok(())
}
}

View File

@@ -29,9 +29,9 @@ use crate::client::api_error_response::err_invalid_argument;
#[derive(Default)]
#[allow(dead_code)]
pub struct AdvancedGetOptions {
replication_deletemarker: bool,
is_replication_ready_for_deletemarker: bool,
replication_proxy_request: String,
pub replication_delete_marker: bool,
pub is_replication_ready_for_delete_marker: bool,
pub replication_proxy_request: String,
}
pub struct GetObjectOptions {

View File

@@ -1,4 +1,3 @@
#![allow(clippy::map_entry)]
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");

View File

@@ -1,4 +1,3 @@
#![allow(clippy::map_entry)]
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");

View File

@@ -1,4 +1,3 @@
#![allow(clippy::map_entry)]
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");

View File

@@ -1,4 +1,3 @@
#![allow(clippy::map_entry)]
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");

View File

@@ -1,4 +1,3 @@
#![allow(clippy::map_entry)]
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");

View File

@@ -0,0 +1,163 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#![allow(unused_imports)]
#![allow(unused_variables)]
#![allow(unused_mut)]
#![allow(unused_assignments)]
#![allow(unused_must_use)]
#![allow(clippy::all)]
use bytes::Bytes;
use http::HeaderMap;
use std::io::Cursor;
use tokio::io::BufReader;
use std::collections::HashMap;
use crate::client::{
api_error_response::{err_invalid_argument, http_resp_to_error_response}, api_get_object_acl::AccessControlList, api_get_options::GetObjectOptions, transition_api::{to_object_info, ObjectInfo, ReadCloser, ReaderImpl, RequestMetadata, TransitionClient}
};
// Glacier retrieval tiers accepted in <GlacierJobParameters><Tier>.
const TIER_STANDARD: &str = "Standard";
const TIER_BULK: &str = "Bulk";
const TIER_EXPEDITED: &str = "Expedited";

/// `<GlacierJobParameters>`: the retrieval tier for a restore job.
#[derive(Debug, Default, serde::Serialize)]
struct GlacierJobParameters {
    tier: String,
}

/// `<Encryption>` settings for restore output written back to S3.
#[derive(Debug, Default, serde::Serialize, serde::Deserialize)]
struct Encryption {
    encryption_type: String,
    kms_context: String,
    kms_key_id: String,
}

/// One user-metadata name/value pair for the restore output object.
#[derive(Debug, Default, serde::Serialize, serde::Deserialize)]
struct MetadataEntry {
    name: String,
    value: String,
}

/// `<S3>` output-location description: where and how SELECT restore results are stored.
#[derive(Debug, Default, serde::Serialize)]
struct S3 {
    access_control_list: AccessControlList,
    bucket_name: String,
    prefix: String,
    canned_acl: String,
    encryption: Encryption,
    storage_class: String,
    //tagging: Tags,
    user_metadata: MetadataEntry,
}

/// `<SelectParameters>`: the S3 Select expression applied during a SELECT-type restore.
#[derive(Debug, Default, serde::Serialize)]
struct SelectParameters {
    expression_type: String,
    expression: String,
    //input_serialization: SelectObjectInputSerialization,
    //output_serialization: SelectObjectOutputSerialization,
}

/// Newtype wrapper for the `<OutputLocation>` element (currently S3 only).
#[derive(Debug, Default, serde::Serialize)]
struct OutputLocation(S3);

/// The `<RestoreRequest>` body sent by `TransitionClient::restore_object`.
#[derive(Debug, Default, serde::Serialize)]
struct RestoreRequest {
    // Restore type, e.g. "SELECT"; empty for a plain archive restore.
    restore_type: String,
    tier: String,
    // Lifetime in days of the restored (temporary) copy.
    days: i64,
    glacier_job_parameters: GlacierJobParameters,
    description: String,
    select_parameters: SelectParameters,
    output_location: OutputLocation,
}
impl RestoreRequest {
    // Sets the lifetime (in days) of the restored copy.
    fn set_days(&mut self, v: i64) {
        self.days = v;
    }

    // Sets the Glacier retrieval tier parameters.
    fn set_glacier_job_parameters(&mut self, v: GlacierJobParameters) {
        self.glacier_job_parameters = v;
    }

    // Sets the restore type (e.g. "SELECT").
    fn set_type(&mut self, v: &str) {
        self.restore_type = v.to_string();
    }

    // Sets the top-level retrieval tier.
    fn set_tier(&mut self, v: &str) {
        self.tier = v.to_string();
    }

    // Sets the free-form job description.
    fn set_description(&mut self, v: &str) {
        self.description = v.to_string();
    }

    // Sets the S3 Select parameters for SELECT-type restores.
    fn set_select_parameters(&mut self, v: SelectParameters) {
        self.select_parameters = v;
    }

    // Sets where the restore output should be written.
    fn set_output_location(&mut self, v: OutputLocation) {
        self.output_location = v;
    }
}
impl TransitionClient {
    /// Sends an S3 RestoreObject request (`POST /{object}?restore`) with the given
    /// `RestoreRequest` XML body, optionally targeting a specific `version_id`.
    ///
    /// # Errors
    /// Fails when the request body cannot be serialized or the server answers with a
    /// status other than 200 OK / 202 Accepted.
    pub async fn restore_object(&self, bucket_name: &str, object_name: &str, version_id: &str, restore_req: &RestoreRequest) -> Result<(), std::io::Error> {
        // Serialize the restore request into the XML request body.
        let restore_request = match serde_xml_rs::to_string(restore_req) {
            Ok(buf) => buf,
            Err(e) => {
                return Err(std::io::Error::other(e));
            }
        };
        let restore_request_bytes = restore_request.into_bytes();
        let content_length = restore_request_bytes.len() as i64;

        // The `restore` sub-resource selects the RestoreObject operation.
        let mut url_values = HashMap::new();
        url_values.insert("restore".to_string(), "".to_string());
        if !version_id.is_empty() {
            url_values.insert("versionId".to_string(), version_id.to_string());
        }

        let restore_request_buffer = Bytes::from(restore_request_bytes);
        // BUGFIX: RestoreObject is defined as a POST request; it was issued as HEAD,
        // which carries no request body, so the restore parameters never reached the server.
        let resp = self
            .execute_method(
                http::Method::POST,
                &mut RequestMetadata {
                    bucket_name: bucket_name.to_string(),
                    object_name: object_name.to_string(),
                    query_values: url_values,
                    custom_header: HeaderMap::new(),
                    content_sha256_hex: "".to_string(), //sum_sha256_hex(&restore_request_bytes),
                    content_md5_base64: "".to_string(), //sum_md5_base64(&restore_request_bytes),
                    content_body: ReaderImpl::Body(restore_request_buffer),
                    content_length,
                    stream_sha256: false,
                    trailer: HeaderMap::new(),
                    pre_sign_url: Default::default(),
                    add_crc: Default::default(),
                    extra_pre_sign_header: Default::default(),
                    bucket_location: Default::default(),
                    expires: Default::default(),
                },
            )
            .await?;

        // RestoreObject legitimately answers 200 (already restored) or 202 (accepted).
        let b = resp.body().bytes().expect("err").to_vec();
        if resp.status() != http::StatusCode::ACCEPTED && resp.status() != http::StatusCode::OK {
            return Err(std::io::Error::other(http_resp_to_error_response(resp, b, bucket_name, "")));
        }
        Ok(())
    }
}

View File

@@ -1,4 +1,3 @@
#![allow(clippy::map_entry)]
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");

View File

@@ -0,0 +1,153 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#![allow(unused_imports)]
#![allow(unused_variables)]
#![allow(unused_mut)]
#![allow(unused_assignments)]
#![allow(unused_must_use)]
#![allow(clippy::all)]
use bytes::Bytes;
use http::{HeaderMap, HeaderValue};
use rustfs_utils::EMPTY_STRING_SHA256_HASH;
use uuid::Uuid;
use std::{collections::HashMap, str::FromStr};
use tokio::io::BufReader;
use s3s::header::{X_AMZ_DELETE_MARKER, X_AMZ_VERSION_ID};
use crate::client::{
api_error_response::{err_invalid_argument, http_resp_to_error_response, ErrorResponse},
api_get_options::GetObjectOptions,
transition_api::{to_object_info, ObjectInfo, ReadCloser, ReaderImpl, RequestMetadata, TransitionClient},
};
impl TransitionClient {
    /// Checks whether `bucket_name` exists via a HEAD-bucket request.
    ///
    /// NOTE(review): the error analysis is commented out, so this currently returns
    /// `Ok(true)` even when the request fails or the bucket is missing — confirm
    /// before relying on the result.
    pub async fn bucket_exists(&self, bucket_name: &str) -> Result<bool, std::io::Error> {
        let resp = self
            .execute_method(
                http::Method::HEAD,
                &mut RequestMetadata {
                    bucket_name: bucket_name.to_string(),
                    object_name: "".to_string(),
                    query_values: HashMap::new(),
                    custom_header: HeaderMap::new(),
                    // Empty body, so the precomputed SHA-256 of "" is used.
                    content_sha256_hex: EMPTY_STRING_SHA256_HASH.to_string(),
                    content_md5_base64: "".to_string(),
                    content_body: ReaderImpl::Body(Bytes::new()),
                    content_length: 0,
                    stream_sha256: false,
                    trailer: HeaderMap::new(),
                    pre_sign_url: Default::default(),
                    add_crc: Default::default(),
                    extra_pre_sign_header: Default::default(),
                    bucket_location: Default::default(),
                    expires: Default::default(),
                },
            )
            .await;
        if let Ok(resp) = resp {
            let b = resp.body().bytes().expect("err").to_vec();
            let resperr = http_resp_to_error_response(resp, b, bucket_name, "");
            /*if to_error_response(resperr).code == "NoSuchBucket" {
                return Ok(false);
            }
            if resp.status_code() != http::StatusCode::OK {
                return Ok(false);
            }*/
        }
        Ok(true)
    }

    /// HEADs an object and converts the response headers into an `ObjectInfo`.
    ///
    /// Replication-related flags in `opts.internal` are forwarded as custom headers so the
    /// server can answer for delete markers / replication readiness.
    ///
    /// NOTE(review): on non-OK statuses this returns `Ok(ObjectInfo { .. })` built from the
    /// version-id/delete-marker headers rather than an error (the error conversions are
    /// commented out) — callers cannot currently distinguish failure from success.
    pub async fn stat_object(&self, bucket_name: &str, object_name: &str, opts: &GetObjectOptions) -> Result<ObjectInfo, std::io::Error> {
        let mut headers = opts.header();
        // Ask the server to include delete-marker information for replication.
        if opts.internal.replication_delete_marker {
            headers.insert("X-Source-DeleteMarker", HeaderValue::from_str("true").unwrap());
        }
        // Ask the server whether the target is ready for delete-marker replication.
        if opts.internal.is_replication_ready_for_delete_marker {
            headers.insert("X-Check-Replication-Ready", HeaderValue::from_str("true").unwrap());
        }
        let resp = self
            .execute_method(
                http::Method::HEAD,
                &mut RequestMetadata {
                    bucket_name: bucket_name.to_string(),
                    object_name: object_name.to_string(),
                    query_values: opts.to_query_values(),
                    custom_header: headers,
                    content_sha256_hex: EMPTY_STRING_SHA256_HASH.to_string(),
                    content_md5_base64: "".to_string(),
                    content_body: ReaderImpl::Body(Bytes::new()),
                    content_length: 0,
                    stream_sha256: false,
                    trailer: HeaderMap::new(),
                    pre_sign_url: Default::default(),
                    add_crc: Default::default(),
                    extra_pre_sign_header: Default::default(),
                    bucket_location: Default::default(),
                    expires: Default::default(),
                },
            )
            .await;
        match resp {
            Ok(resp) => {
                let h = resp.headers();
                // "true" in x-amz-delete-marker means this version is a delete marker.
                let delete_marker = if let Some(x_amz_delete_marker) = h.get(X_AMZ_DELETE_MARKER.as_str()) {
                    x_amz_delete_marker.to_str().unwrap() == "true"
                } else { false };
                // Server's answer to the X-Check-Replication-Ready probe above.
                let replication_ready = if let Some(x_amz_delete_marker) = h.get("X-Replication-Ready") {
                    x_amz_delete_marker.to_str().unwrap() == "true"
                } else { false };
                if resp.status() != http::StatusCode::OK && resp.status() != http::StatusCode::PARTIAL_CONTENT {
                    // 405 on a versioned delete marker: surface the marker info instead of failing.
                    if resp.status() == http::StatusCode::METHOD_NOT_ALLOWED && opts.version_id != "" && delete_marker {
                        // NOTE(review): `err_resp` is built but never used (see `//err_resp` below).
                        let err_resp = ErrorResponse {
                            status_code: resp.status(),
                            code: s3s::S3ErrorCode::MethodNotAllowed,
                            message: "the specified method is not allowed against this resource.".to_string(),
                            bucket_name: bucket_name.to_string(),
                            key: object_name.to_string(),
                            ..Default::default()
                        };
                        return Ok(ObjectInfo {
                            // NOTE(review): unwrap() panics when x-amz-version-id is absent — confirm
                            // the server always sends it on this path.
                            version_id: match Uuid::from_str(h.get(X_AMZ_VERSION_ID).unwrap().to_str().unwrap()) {
                                Ok(v) => v,
                                Err(e) => { return Err(std::io::Error::other(e)); }
                            },
                            is_delete_marker: delete_marker,
                            ..Default::default()
                        });
                        //err_resp
                    }
                    return Ok(ObjectInfo {
                        version_id: match Uuid::from_str(h.get(X_AMZ_VERSION_ID).unwrap().to_str().unwrap()) {
                            Ok(v) => v,
                            Err(e) => { return Err(std::io::Error::other(e)); }
                        },
                        is_delete_marker: delete_marker,
                        replication_ready: replication_ready,
                        ..Default::default()
                    });
                    //http_resp_to_error_response(resp, bucket_name, object_name)
                }
                // Success: derive the full ObjectInfo from the response headers.
                Ok(to_object_info(bucket_name, object_name, h).unwrap())
            }
            Err(err) => {
                return Err(std::io::Error::other(err));
            }
        }
    }
}

View File

@@ -23,6 +23,11 @@ pub mod api_put_object_common;
pub mod api_put_object_multipart;
pub mod api_put_object_streaming;
pub mod api_remove;
pub mod api_restore;
pub mod api_stat;
pub mod api_get_object_acl;
pub mod api_get_object_attributes;
pub mod api_get_object_file;
pub mod api_s3_datatypes;
pub mod bucket_cache;
pub mod constants;

View File

@@ -1,4 +1,3 @@
#![allow(unused_imports)]
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
@@ -12,6 +11,7 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#![allow(unused_imports)]
#![allow(unused_variables)]
#![allow(unused_mut)]
#![allow(unused_assignments)]

View File

@@ -1,4 +1,3 @@
#![allow(unused_variables, unused_mut, unused_must_use)]
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
@@ -12,6 +11,7 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#![allow(unused_variables, unused_mut, unused_must_use)]
use http::{HeaderMap, StatusCode};
//use iam::get_global_action_cred;
@@ -461,3 +461,182 @@ impl Operation for ClearTier {
Ok(S3Response::with_headers((StatusCode::OK, Body::empty()), header))
}
}
/*pub struct PostRestoreObject {}
#[async_trait::async_trait]
impl Operation for PostRestoreObject {
async fn call(&self, req: S3Request<Body>, params: Params<'_, '_>) -> S3Result<S3Response<(StatusCode, Body)>> {
let query = {
if let Some(query) = req.uri.query() {
let input: PostRestoreObject =
from_bytes(query.as_bytes()).map_err(|_e| s3_error!(InvalidArgument, "get query failed"))?;
input
} else {
PostRestoreObject::default()
}
};
let bucket = params.bucket;
if let Err(e) = un_escape_path(params.object) {
warn!("post restore object failed, e: {:?}", e);
return Err(S3Error::with_message(S3ErrorCode::Custom("PostRestoreObjectFailed".into()), "post restore object failed"));
}
let Some(store) = new_object_layer_fn() else {
return Err(S3Error::with_message(S3ErrorCode::InternalError, "Not init".to_string()));
};
let get_object_info = store.get_object_info();
if Err(err) = check_request_auth_type(req, policy::RestoreObjectAction, bucket, object) {
return Err(S3Error::with_message(S3ErrorCode::Custom("PostRestoreObjectFailed".into()), "post restore object failed"));
}
if req.content_length <= 0 {
return Err(S3Error::with_message(S3ErrorCode::Custom("ErrEmptyRequestBody".into()), "post restore object failed"));
}
let Some(opts) = post_restore_opts(req, bucket, object) else {
return Err(S3Error::with_message(S3ErrorCode::Custom("ErrEmptyRequestBody".into()), "post restore object failed"));
};
let Some(obj_info) = getObjectInfo(ctx, bucket, object, opts) else {
return Err(S3Error::with_message(S3ErrorCode::Custom("ErrEmptyRequestBody".into()), "post restore object failed"));
};
if obj_info.transitioned_object.status != lifecycle::TRANSITION_COMPLETE {
return Err(S3Error::with_message(S3ErrorCode::Custom("ErrEmptyRequestBody".into()), "post restore object failed"));
}
let mut api_err;
let Some(rreq) = parsere_store_request(req.body(), req.content_length) else {
let api_err = errorCodes.ToAPIErr(ErrMalformedXML);
api_err.description = err.Error()
return Err(S3Error::with_message(S3ErrorCode::Custom("ErrEmptyRequestBody".into()), "post restore object failed"));
};
let mut status_code = http::StatusCode::OK;
let mut already_restored = false;
        if let Err(err) = rreq.validate(store) {
api_err = errorCodes.ToAPIErr(ErrMalformedXML)
api_err.description = err.Error()
return Err(S3Error::with_message(S3ErrorCode::Custom("ErrEmptyRequestBody".into()), "post restore object failed"));
} else {
if obj_info.restore_ongoing && rreq.Type != "SELECT" {
return Err(S3Error::with_message(S3ErrorCode::Custom("ErrObjectRestoreAlreadyInProgress".into()), "post restore object failed"));
}
            if !obj_info.restore_ongoing && obj_info.restore_expires.unix_timestamp() != 0 {
                status_code = http::StatusCode::ACCEPTED;
already_restored = true;
}
}
let restore_expiry = lifecycle::expected_expiry_time(OffsetDateTime::now_utc(), rreq.days);
let mut metadata = clone_mss(obj_info.user_defined);
if rreq.type != "SELECT" {
obj_info.metadataOnly = true;
metadata[xhttp.AmzRestoreExpiryDays] = rreq.days;
metadata[xhttp.AmzRestoreRequestDate] = OffsetDateTime::now_utc().format(http::TimeFormat);
if already_restored {
metadata[xhttp.AmzRestore] = completedRestoreObj(restore_expiry).String()
} else {
metadata[xhttp.AmzRestore] = ongoingRestoreObj().String()
}
obj_info.user_defined = metadata;
if let Err(err) = store.copy_object(bucket, object, bucket, object, obj_info, ObjectOptions {
version_id: obj_info.version_id,
}, ObjectOptions {
version_id: obj_info.version_id,
m_time: obj_info.mod_time,
}) {
return Err(S3Error::with_message(S3ErrorCode::Custom("ErrInvalidObjectState".into()), "post restore object failed"));
}
if already_restored {
return Ok(());
}
}
let restore_object = must_get_uuid();
if rreq.output_location.s3.bucket_name != "" {
w.Header()[xhttp.AmzRestoreOutputPath] = []string{pathJoin(rreq.OutputLocation.S3.BucketName, rreq.OutputLocation.S3.Prefix, restoreObject)}
}
w.WriteHeader(status_code)
send_event(EventArgs {
event_name: event::ObjectRestorePost,
bucket_name: bucket,
object: obj_info,
req_params: extract_req_params(r),
user_agent: req.user_agent(),
host: handlers::get_source_ip(r),
});
tokio::spawn(async move {
if !rreq.SelectParameters.IsEmpty() {
let actual_size = obj_info.get_actual_size();
if actual_size.is_err() {
return Err(S3Error::with_message(S3ErrorCode::Custom("ErrInvalidObjectState".into()), "post restore object failed"));
}
let object_rsc = s3select.NewObjectReadSeekCloser(
|offset int64| -> (io.ReadCloser, error) {
rs := &HTTPRangeSpec{
IsSuffixLength: false,
Start: offset,
End: -1,
}
return getTransitionedObjectReader(bucket, object, rs, r.Header,
obj_info, ObjectOptions {version_id: obj_info.version_id});
},
actual_size.unwrap(),
);
if err = rreq.SelectParameters.Open(objectRSC); err != nil {
if serr, ok := err.(s3select.SelectError); ok {
let encoded_error_response = encodeResponse(APIErrorResponse {
code: serr.ErrorCode(),
message: serr.ErrorMessage(),
bucket_name: bucket,
key: object,
resource: r.URL.Path,
request_id: w.Header().Get(xhttp.AmzRequestID),
host_id: globalDeploymentID(),
});
//writeResponse(w, serr.HTTPStatusCode(), encodedErrorResponse, mimeXML)
Ok(S3Response::with_headers((StatusCode::OK, Body::empty()), header));
} else {
return Err(S3Error::with_message(S3ErrorCode::Custom("ErrInvalidObjectState".into()), "post restore object failed"));
}
return Ok(());
}
let nr = httptest.NewRecorder();
let rw = xhttp.NewResponseRecorder(nr);
rw.log_err_body = true;
rw.log_all_body = true;
rreq.select_parameters.evaluate(rw);
rreq.select_parameters.Close();
return Ok(S3Response::with_headers((StatusCode::OK, Body::empty()), header));
}
let opts = ObjectOptions {
transition: TransitionOptions {
restore_request: rreq,
restore_expiry: restore_expiry,
},
version_id: objInfo.version_id,
}
        if let Err(err) = store.restore_transitioned_object(bucket, object, opts) {
            warn!("unable to restore transitioned bucket/object {}/{}: {}", bucket, object, err);
return Ok(S3Response::with_headers((StatusCode::OK, Body::empty()), header));
}
send_event(EventArgs {
EventName: event.ObjectRestoreCompleted,
BucketName: bucket,
Object: objInfo,
ReqParams: extractReqParams(r),
UserAgent: r.UserAgent(),
Host: handlers.GetSourceIP(r),
});
});
let mut header = HeaderMap::new();
header.insert(CONTENT_TYPE, "application/json".parse().unwrap());
Ok(S3Response::with_headers((StatusCode::OK, Body::empty()), header))
}
}*/

View File

@@ -1977,18 +1977,18 @@ impl S3 for FS {
} = req.input;
let mut lr_retention = false;
let rcfg = metadata_sys::get_object_lock_config(&bucket).await;
/*let rcfg = metadata_sys::get_object_lock_config(&bucket).await;
if let Ok(rcfg) = rcfg {
if let Some(rule) = rcfg.0.rule {
if let Some(retention) = rule.default_retention {
if let Some(mode) = retention.mode {
if mode == ObjectLockRetentionMode::from_static(ObjectLockRetentionMode::GOVERNANCE) {
//if mode == ObjectLockRetentionMode::from_static(ObjectLockRetentionMode::GOVERNANCE) {
lr_retention = true;
}
//}
}
}
}
}
}*/
//info!("lifecycle_configuration: {:?}", &lifecycle_configuration);