mirror of
https://github.com/rustfs/rustfs.git
synced 2026-03-17 14:24:08 +00:00
test: cover replication resync file compatibility
This commit is contained in:
@@ -498,10 +498,12 @@ mod tests {
|
||||
fn test_credentials_expiration_serialize_as_rfc3339() {
|
||||
use time::OffsetDateTime;
|
||||
|
||||
let mut c = Credentials::default();
|
||||
c.access_key = "ak".to_string();
|
||||
c.secret_key = "sk12345678".to_string();
|
||||
c.expiration = Some(OffsetDateTime::now_utc());
|
||||
let c = Credentials {
|
||||
access_key: "ak".to_string(),
|
||||
secret_key: "sk12345678".to_string(),
|
||||
expiration: Some(OffsetDateTime::now_utc()),
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
let json = serde_json::to_string(&c).expect("serialize");
|
||||
assert!(
|
||||
|
||||
@@ -24,6 +24,8 @@ use tracing::{debug, info, warn};
|
||||
|
||||
/// IAM config prefix under meta bucket (e.g. config/iam/).
|
||||
const IAM_CONFIG_PREFIX: &str = "config/iam";
|
||||
const REPLICATION_META_DIR: &str = ".replication";
|
||||
const RESYNC_META_FILE: &str = "resync.bin";
|
||||
|
||||
/// Migrates bucket metadata from legacy format to RustFS.
|
||||
/// Uses list_bucket (from disk volumes) to get bucket names, since list_objects_v2 on the legacy
|
||||
@@ -62,46 +64,62 @@ pub async fn try_migrate_bucket_metadata<S: StorageAPI>(store: Arc<S>) {
|
||||
|
||||
for bucket in buckets {
|
||||
let meta_path = format!("{BUCKET_META_PREFIX}{SLASH_SEPARATOR}{bucket}{SLASH_SEPARATOR}{BUCKET_METADATA_FILE}");
|
||||
if store
|
||||
.get_object_info(RUSTFS_META_BUCKET, &meta_path, &ObjectOptions::default())
|
||||
.await
|
||||
.is_ok()
|
||||
{
|
||||
debug!("Bucket metadata already exists in RustFS, skip: {bucket}");
|
||||
continue;
|
||||
}
|
||||
let mut rd = match store
|
||||
.get_object_reader(MIGRATING_META_BUCKET, &meta_path, None, h.clone(), &opts)
|
||||
.await
|
||||
{
|
||||
Ok(r) => r,
|
||||
Err(e) => {
|
||||
debug!("read migrating bucket metadata {bucket}: {e}");
|
||||
continue;
|
||||
}
|
||||
};
|
||||
migrate_one_if_missing(store.clone(), &opts, &h, &meta_path, &format!("bucket metadata: {bucket}")).await;
|
||||
|
||||
let data = match rd.read_all().await {
|
||||
Ok(d) if !d.is_empty() => d,
|
||||
Ok(_) => continue,
|
||||
Err(e) => {
|
||||
debug!("read migrating bucket metadata {bucket} body: {e}");
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
if let Err(e) = store
|
||||
.put_object(RUSTFS_META_BUCKET, &meta_path, &mut PutObjReader::from_vec(data), &opts)
|
||||
.await
|
||||
{
|
||||
warn!("write bucket metadata {bucket}: {e}");
|
||||
} else {
|
||||
info!("Migrated bucket metadata: {bucket}");
|
||||
}
|
||||
let resync_path = format!(
|
||||
"{BUCKET_META_PREFIX}{SLASH_SEPARATOR}{bucket}{SLASH_SEPARATOR}{REPLICATION_META_DIR}{SLASH_SEPARATOR}{RESYNC_META_FILE}"
|
||||
);
|
||||
migrate_one_if_missing(store.clone(), &opts, &h, &resync_path, &format!("bucket replication resync: {bucket}")).await;
|
||||
}
|
||||
}
|
||||
|
||||
/// Migrates IAM config from legacy `.minio.sys/config/iam/` to `.rustfs.sys/config/iam/`.
|
||||
async fn migrate_one_if_missing<S: StorageAPI>(
|
||||
store: Arc<S>,
|
||||
opts: &ObjectOptions,
|
||||
headers: &HeaderMap,
|
||||
path: &str,
|
||||
label: &str,
|
||||
) {
|
||||
if store
|
||||
.get_object_info(RUSTFS_META_BUCKET, path, &ObjectOptions::default())
|
||||
.await
|
||||
.is_ok()
|
||||
{
|
||||
debug!("{label} already exists in RustFS, skip");
|
||||
return;
|
||||
}
|
||||
|
||||
let mut rd = match store
|
||||
.get_object_reader(MIGRATING_META_BUCKET, path, None, headers.clone(), opts)
|
||||
.await
|
||||
{
|
||||
Ok(r) => r,
|
||||
Err(e) => {
|
||||
debug!("read migrating {label}: {e}");
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
let data = match rd.read_all().await {
|
||||
Ok(d) if !d.is_empty() => d,
|
||||
Ok(_) => return,
|
||||
Err(e) => {
|
||||
debug!("read migrating {label} body: {e}");
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
if let Err(e) = store
|
||||
.put_object(RUSTFS_META_BUCKET, path, &mut PutObjReader::from_vec(data), opts)
|
||||
.await
|
||||
{
|
||||
warn!("write {label}: {e}");
|
||||
} else {
|
||||
info!("Migrated {label}");
|
||||
}
|
||||
}
|
||||
|
||||
/// Migrates IAM config from legacy meta bucket `config/iam/` to RustFS meta bucket.
|
||||
/// Lists all objects under the IAM prefix in the source, copies each to the target if not present.
|
||||
/// Skips objects that already exist in RustFS (idempotent).
|
||||
/// If list_objects_v2 on the legacy bucket fails (e.g. format differs), migration is skipped.
|
||||
|
||||
@@ -20,8 +20,8 @@ use crate::bucket::replication::ResyncStatusType;
|
||||
use crate::bucket::replication::replicate_delete;
|
||||
use crate::bucket::replication::replicate_object;
|
||||
use crate::bucket::replication::replication_resyncer::{
|
||||
BucketReplicationResyncStatus, DeletedObjectReplicationInfo, ReplicationConfig, ReplicationResyncer,
|
||||
get_heal_replicate_object_info,
|
||||
BucketReplicationResyncStatus, DeletedObjectReplicationInfo, REPLICATION_DIR, RESYNC_FILE_NAME, ReplicationConfig,
|
||||
ReplicationResyncer, decode_resync_file, get_heal_replicate_object_info,
|
||||
};
|
||||
use crate::bucket::replication::replication_state::ReplicationStats;
|
||||
use crate::config::com::read_config;
|
||||
@@ -861,17 +861,8 @@ async fn load_bucket_resync_metadata<S: StorageAPI>(
|
||||
bucket: &str,
|
||||
obj_api: Arc<S>,
|
||||
) -> Result<BucketReplicationResyncStatus, EcstoreError> {
|
||||
use std::convert::TryInto;
|
||||
|
||||
let mut brs = BucketReplicationResyncStatus::new();
|
||||
|
||||
// Constants that would be defined elsewhere
|
||||
const REPLICATION_DIR: &str = "replication";
|
||||
const RESYNC_FILE_NAME: &str = "resync.bin";
|
||||
const RESYNC_META_FORMAT: u16 = 1;
|
||||
const RESYNC_META_VERSION: u16 = 1;
|
||||
const RESYNC_META_VERSION_V1: u16 = 1;
|
||||
|
||||
let resync_dir_path = format!("{BUCKET_META_PREFIX}/{bucket}/{REPLICATION_DIR}");
|
||||
let resync_file_path = format!("{resync_dir_path}/{RESYNC_FILE_NAME}");
|
||||
|
||||
@@ -886,27 +877,7 @@ async fn load_bucket_resync_metadata<S: StorageAPI>(
|
||||
return Ok(brs);
|
||||
}
|
||||
|
||||
if data.len() <= 4 {
|
||||
return Err(EcstoreError::CorruptedFormat);
|
||||
}
|
||||
|
||||
// Read resync meta header
|
||||
let format = u16::from_le_bytes(data[0..2].try_into().unwrap());
|
||||
if format != RESYNC_META_FORMAT {
|
||||
return Err(EcstoreError::CorruptedFormat);
|
||||
}
|
||||
|
||||
let version = u16::from_le_bytes(data[2..4].try_into().unwrap());
|
||||
if version != RESYNC_META_VERSION {
|
||||
return Err(EcstoreError::CorruptedFormat);
|
||||
}
|
||||
|
||||
// Parse data
|
||||
brs = BucketReplicationResyncStatus::unmarshal_msg(&data[4..])?;
|
||||
|
||||
if brs.version != RESYNC_META_VERSION_V1 {
|
||||
return Err(EcstoreError::CorruptedFormat);
|
||||
}
|
||||
brs = decode_resync_file(&data)?;
|
||||
|
||||
Ok(brs)
|
||||
}
|
||||
|
||||
@@ -17,6 +17,7 @@ use crate::bucket::bucket_target_sys::{
|
||||
AdvancedPutOptions, BucketTargetSys, PutObjectOptions, PutObjectPartOptions, RemoveObjectOptions, TargetClient,
|
||||
};
|
||||
use crate::bucket::metadata_sys;
|
||||
use crate::bucket::msgp_decode::{read_msgp_ext8_time, skip_msgp_value, write_msgp_time};
|
||||
use crate::bucket::replication::ResyncStatusType;
|
||||
use crate::bucket::replication::{ObjectOpts, ReplicationConfigurationExt as _};
|
||||
use crate::bucket::tagging::decode_tags_to_map;
|
||||
@@ -69,6 +70,7 @@ use serde::Deserialize;
|
||||
use serde::Serialize;
|
||||
use std::any::Any;
|
||||
use std::collections::HashMap;
|
||||
use std::io::{Cursor, Read};
|
||||
use std::sync::Arc;
|
||||
use time::OffsetDateTime;
|
||||
use time::format_description::well_known::Rfc3339;
|
||||
@@ -80,10 +82,10 @@ use tokio_util::io::ReaderStream;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{error, info, warn};
|
||||
|
||||
const REPLICATION_DIR: &str = ".replication";
|
||||
const RESYNC_FILE_NAME: &str = "resync.bin";
|
||||
const RESYNC_META_FORMAT: u16 = 1;
|
||||
const RESYNC_META_VERSION: u16 = 1;
|
||||
pub(crate) const REPLICATION_DIR: &str = ".replication";
|
||||
pub(crate) const RESYNC_FILE_NAME: &str = "resync.bin";
|
||||
pub(crate) const RESYNC_META_FORMAT: u16 = 1;
|
||||
pub(crate) const RESYNC_META_VERSION: u16 = 1;
|
||||
const RESYNC_TIME_INTERVAL: TokioDuration = TokioDuration::from_secs(60);
|
||||
|
||||
static WARNED_MONITOR_UNINIT: std::sync::Once = std::sync::Once::new();
|
||||
@@ -139,14 +141,199 @@ impl BucketReplicationResyncStatus {
|
||||
}
|
||||
|
||||
pub fn marshal_msg(&self) -> Result<Vec<u8>> {
|
||||
Ok(rmp_serde::to_vec(&self)?)
|
||||
let mut wr = Vec::new();
|
||||
rmp::encode::write_map_len(&mut wr, 4)?;
|
||||
rmp::encode::write_str(&mut wr, "v")?;
|
||||
rmp::encode::write_i32(&mut wr, i32::from(self.version))?;
|
||||
rmp::encode::write_str(&mut wr, "brs")?;
|
||||
rmp::encode::write_map_len(&mut wr, self.targets_map.len() as u32)?;
|
||||
for (arn, status) in &self.targets_map {
|
||||
rmp::encode::write_str(&mut wr, arn)?;
|
||||
status.marshal_wire_msg(&mut wr)?;
|
||||
}
|
||||
rmp::encode::write_str(&mut wr, "id")?;
|
||||
rmp::encode::write_i32(&mut wr, self.id)?;
|
||||
rmp::encode::write_str(&mut wr, "lu")?;
|
||||
write_msgp_time(&mut wr, self.last_update.unwrap_or(OffsetDateTime::UNIX_EPOCH))?;
|
||||
Ok(wr)
|
||||
}
|
||||
|
||||
pub fn unmarshal_msg(data: &[u8]) -> Result<Self> {
|
||||
let mut rd = Cursor::new(data);
|
||||
let mut out = Self::new();
|
||||
let mut fields = rmp::decode::read_map_len(&mut rd)?;
|
||||
|
||||
while fields > 0 {
|
||||
fields -= 1;
|
||||
let key = read_msgp_str(&mut rd)?;
|
||||
match key.as_str() {
|
||||
"v" => {
|
||||
let v: i32 = rmp::decode::read_int(&mut rd)?;
|
||||
out.version = u16::try_from(v).map_err(|_| Error::other("invalid resync version"))?;
|
||||
}
|
||||
"brs" => {
|
||||
let map_len = rmp::decode::read_map_len(&mut rd)?;
|
||||
let mut targets = HashMap::with_capacity(map_len as usize);
|
||||
for _ in 0..map_len {
|
||||
let arn = read_msgp_str(&mut rd)?;
|
||||
let status = TargetReplicationResyncStatus::unmarshal_wire_msg(&mut rd)?;
|
||||
targets.insert(arn, status);
|
||||
}
|
||||
out.targets_map = targets;
|
||||
}
|
||||
"id" => {
|
||||
out.id = rmp::decode::read_int::<i32, _>(&mut rd)?;
|
||||
}
|
||||
"lu" => {
|
||||
out.last_update = read_msgp_time_or_nil(&mut rd)?;
|
||||
}
|
||||
_ => skip_msgp_value(&mut rd)?,
|
||||
}
|
||||
}
|
||||
Ok(out)
|
||||
}
|
||||
|
||||
pub fn unmarshal_legacy_msg(data: &[u8]) -> Result<Self> {
|
||||
Ok(rmp_serde::from_slice(data)?)
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn encode_resync_file(status: &BucketReplicationResyncStatus) -> Result<Vec<u8>> {
|
||||
let payload = status.marshal_msg()?;
|
||||
let mut data = Vec::with_capacity(4 + payload.len());
|
||||
let mut major = [0u8; 2];
|
||||
byteorder::LittleEndian::write_u16(&mut major, RESYNC_META_FORMAT);
|
||||
data.extend_from_slice(&major);
|
||||
let mut minor = [0u8; 2];
|
||||
byteorder::LittleEndian::write_u16(&mut minor, RESYNC_META_VERSION);
|
||||
data.extend_from_slice(&minor);
|
||||
data.extend_from_slice(&payload);
|
||||
Ok(data)
|
||||
}
|
||||
|
||||
pub(crate) fn decode_resync_file(data: &[u8]) -> Result<BucketReplicationResyncStatus> {
|
||||
if data.len() <= 4 {
|
||||
return Err(Error::CorruptedFormat);
|
||||
}
|
||||
|
||||
let mut major = [0u8; 2];
|
||||
major.copy_from_slice(&data[0..2]);
|
||||
if byteorder::LittleEndian::read_u16(&major) != RESYNC_META_FORMAT {
|
||||
return Err(Error::CorruptedFormat);
|
||||
}
|
||||
|
||||
let mut minor = [0u8; 2];
|
||||
minor.copy_from_slice(&data[2..4]);
|
||||
if byteorder::LittleEndian::read_u16(&minor) != RESYNC_META_VERSION {
|
||||
return Err(Error::CorruptedFormat);
|
||||
}
|
||||
|
||||
let status = match BucketReplicationResyncStatus::unmarshal_msg(&data[4..]) {
|
||||
Ok(v) => v,
|
||||
Err(_) => BucketReplicationResyncStatus::unmarshal_legacy_msg(&data[4..])?,
|
||||
};
|
||||
if status.version != RESYNC_META_VERSION {
|
||||
return Err(Error::CorruptedFormat);
|
||||
}
|
||||
Ok(status)
|
||||
}
|
||||
|
||||
impl TargetReplicationResyncStatus {
|
||||
fn marshal_wire_msg(&self, wr: &mut Vec<u8>) -> Result<()> {
|
||||
rmp::encode::write_map_len(wr, 11)?;
|
||||
rmp::encode::write_str(wr, "st")?;
|
||||
write_msgp_time(wr, self.start_time.unwrap_or(OffsetDateTime::UNIX_EPOCH))?;
|
||||
rmp::encode::write_str(wr, "lst")?;
|
||||
write_msgp_time(wr, self.last_update.unwrap_or(OffsetDateTime::UNIX_EPOCH))?;
|
||||
rmp::encode::write_str(wr, "id")?;
|
||||
rmp::encode::write_str(wr, &self.resync_id)?;
|
||||
rmp::encode::write_str(wr, "rdt")?;
|
||||
write_msgp_time(wr, self.resync_before_date.unwrap_or(OffsetDateTime::UNIX_EPOCH))?;
|
||||
rmp::encode::write_str(wr, "rst")?;
|
||||
rmp::encode::write_i32(wr, resync_status_to_i32(self.resync_status))?;
|
||||
rmp::encode::write_str(wr, "fs")?;
|
||||
rmp::encode::write_i64(wr, self.failed_size)?;
|
||||
rmp::encode::write_str(wr, "frc")?;
|
||||
rmp::encode::write_i64(wr, self.failed_count)?;
|
||||
rmp::encode::write_str(wr, "rs")?;
|
||||
rmp::encode::write_i64(wr, self.replicated_size)?;
|
||||
rmp::encode::write_str(wr, "rrc")?;
|
||||
rmp::encode::write_i64(wr, self.replicated_count)?;
|
||||
rmp::encode::write_str(wr, "bkt")?;
|
||||
rmp::encode::write_str(wr, &self.bucket)?;
|
||||
rmp::encode::write_str(wr, "obj")?;
|
||||
rmp::encode::write_str(wr, &self.object)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn unmarshal_wire_msg<R: Read>(rd: &mut R) -> Result<Self> {
|
||||
let mut out = Self::new();
|
||||
let mut fields = rmp::decode::read_map_len(rd)?;
|
||||
|
||||
while fields > 0 {
|
||||
fields -= 1;
|
||||
let key = read_msgp_str(rd)?;
|
||||
match key.as_str() {
|
||||
"st" => out.start_time = read_msgp_time_or_nil(rd)?,
|
||||
"lst" => out.last_update = read_msgp_time_or_nil(rd)?,
|
||||
"id" => out.resync_id = read_msgp_str(rd)?,
|
||||
"rdt" => out.resync_before_date = read_msgp_time_or_nil(rd)?,
|
||||
"rst" => {
|
||||
let v: i32 = rmp::decode::read_int(rd)?;
|
||||
out.resync_status = resync_status_from_i32(v)?;
|
||||
}
|
||||
"fs" => out.failed_size = rmp::decode::read_int(rd)?,
|
||||
"frc" => out.failed_count = rmp::decode::read_int(rd)?,
|
||||
"rs" => out.replicated_size = rmp::decode::read_int(rd)?,
|
||||
"rrc" => out.replicated_count = rmp::decode::read_int(rd)?,
|
||||
"bkt" => out.bucket = read_msgp_str(rd)?,
|
||||
"obj" => out.object = read_msgp_str(rd)?,
|
||||
_ => skip_msgp_value(rd)?,
|
||||
}
|
||||
}
|
||||
Ok(out)
|
||||
}
|
||||
}
|
||||
|
||||
fn read_msgp_str<R: Read>(rd: &mut R) -> Result<String> {
|
||||
let len = rmp::decode::read_str_len(rd)? as usize;
|
||||
let mut buf = vec![0u8; len];
|
||||
rd.read_exact(&mut buf)?;
|
||||
Ok(String::from_utf8(buf)?)
|
||||
}
|
||||
|
||||
fn read_msgp_time_or_nil<R: Read>(rd: &mut R) -> Result<Option<OffsetDateTime>> {
|
||||
let marker = rmp::decode::read_marker(rd).map_err(|e| Error::other(format!("{e:?}")))?;
|
||||
match marker {
|
||||
rmp::Marker::Null => Ok(None),
|
||||
rmp::Marker::Ext8 => Ok(Some(read_msgp_ext8_time(rd)?)),
|
||||
other => Err(Error::other(format!("expected time ext or nil, got marker: {other:?}"))),
|
||||
}
|
||||
}
|
||||
|
||||
fn resync_status_to_i32(status: ResyncStatusType) -> i32 {
|
||||
match status {
|
||||
ResyncStatusType::NoResync => 0,
|
||||
ResyncStatusType::ResyncPending => 1,
|
||||
ResyncStatusType::ResyncCanceled => 2,
|
||||
ResyncStatusType::ResyncStarted => 3,
|
||||
ResyncStatusType::ResyncCompleted => 4,
|
||||
ResyncStatusType::ResyncFailed => 5,
|
||||
}
|
||||
}
|
||||
|
||||
fn resync_status_from_i32(code: i32) -> Result<ResyncStatusType> {
|
||||
match code {
|
||||
0 => Ok(ResyncStatusType::NoResync),
|
||||
1 => Ok(ResyncStatusType::ResyncPending),
|
||||
2 => Ok(ResyncStatusType::ResyncCanceled),
|
||||
3 => Ok(ResyncStatusType::ResyncStarted),
|
||||
4 => Ok(ResyncStatusType::ResyncCompleted),
|
||||
5 => Ok(ResyncStatusType::ResyncFailed),
|
||||
_ => Err(Error::other(format!("invalid resync status code: {code}"))),
|
||||
}
|
||||
}
|
||||
|
||||
static RESYNC_WORKER_COUNT: usize = 10;
|
||||
|
||||
#[derive(Debug)]
|
||||
@@ -690,19 +877,7 @@ pub async fn get_heal_replicate_object_info(oi: &ObjectInfo, rcfg: &ReplicationC
|
||||
}
|
||||
|
||||
async fn save_resync_status<S: StorageAPI>(bucket: &str, status: &BucketReplicationResyncStatus, api: Arc<S>) -> Result<()> {
|
||||
let buf = status.marshal_msg()?;
|
||||
|
||||
let mut data = Vec::new();
|
||||
|
||||
let mut major = [0u8; 2];
|
||||
byteorder::LittleEndian::write_u16(&mut major, RESYNC_META_FORMAT);
|
||||
data.extend_from_slice(&major);
|
||||
|
||||
let mut minor = [0u8; 2];
|
||||
byteorder::LittleEndian::write_u16(&mut minor, RESYNC_META_VERSION);
|
||||
data.extend_from_slice(&minor);
|
||||
|
||||
data.extend_from_slice(&buf);
|
||||
let data = encode_resync_file(status)?;
|
||||
|
||||
let config_file = path_join_buf(&[BUCKET_META_PREFIX, bucket, REPLICATION_DIR, RESYNC_FILE_NAME]);
|
||||
save_config(api, &config_file, data).await?;
|
||||
@@ -2695,7 +2870,7 @@ fn put_replication_opts(sc: &str, object_info: &ObjectInfo) -> Result<(PutObject
|
||||
// If KMS key ID replication is enabled (as by default)
|
||||
// we include the object's KMS key ID. In any case, we
|
||||
// always set the SSE-KMS header. If no KMS key ID is
|
||||
// specified, MinIO is supposed to use whatever default
|
||||
// specified, the server uses the default applicable
|
||||
// config applies on the site or bucket.
|
||||
// TODO: Implement SSE-KMS support with key ID replication
|
||||
// let key_id = if kms::replicate_key_id() {
|
||||
@@ -2950,6 +3125,9 @@ fn get_replication_action(oi1: &ObjectInfo, oi2: &HeadObjectOutput, op_type: Rep
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::bucket::msgp_decode::write_msgp_time;
|
||||
use std::collections::HashMap;
|
||||
use time::OffsetDateTime;
|
||||
|
||||
#[test]
|
||||
fn test_part_range_spec_from_actual_size() {
|
||||
@@ -2964,4 +3142,135 @@ mod tests {
|
||||
assert!(part_range_spec_from_actual_size(0, 0).is_err());
|
||||
assert!(part_range_spec_from_actual_size(0, -1).is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_unmarshal_resync_payload() {
|
||||
let start = OffsetDateTime::from_unix_timestamp(1_700_000_000).expect("valid ts");
|
||||
let last = OffsetDateTime::from_unix_timestamp(1_700_000_123).expect("valid ts");
|
||||
let before = OffsetDateTime::from_unix_timestamp(1_699_000_000).expect("valid ts");
|
||||
let bucket_last = OffsetDateTime::from_unix_timestamp(1_700_111_111).expect("valid ts");
|
||||
|
||||
let mut payload = Vec::new();
|
||||
rmp::encode::write_map_len(&mut payload, 4).expect("write map");
|
||||
rmp::encode::write_str(&mut payload, "v").expect("write key");
|
||||
rmp::encode::write_i32(&mut payload, 1).expect("write version");
|
||||
rmp::encode::write_str(&mut payload, "brs").expect("write key");
|
||||
rmp::encode::write_map_len(&mut payload, 1).expect("write target map");
|
||||
rmp::encode::write_str(&mut payload, "arn:replication::1:dest").expect("write arn");
|
||||
rmp::encode::write_map_len(&mut payload, 11).expect("write target");
|
||||
rmp::encode::write_str(&mut payload, "st").expect("write key");
|
||||
write_msgp_time(&mut payload, start).expect("write time");
|
||||
rmp::encode::write_str(&mut payload, "lst").expect("write key");
|
||||
write_msgp_time(&mut payload, last).expect("write time");
|
||||
rmp::encode::write_str(&mut payload, "id").expect("write key");
|
||||
rmp::encode::write_str(&mut payload, "resync-1").expect("write id");
|
||||
rmp::encode::write_str(&mut payload, "rdt").expect("write key");
|
||||
write_msgp_time(&mut payload, before).expect("write time");
|
||||
rmp::encode::write_str(&mut payload, "rst").expect("write key");
|
||||
rmp::encode::write_i32(&mut payload, 3).expect("write status");
|
||||
rmp::encode::write_str(&mut payload, "fs").expect("write key");
|
||||
rmp::encode::write_i64(&mut payload, 11).expect("write fs");
|
||||
rmp::encode::write_str(&mut payload, "frc").expect("write key");
|
||||
rmp::encode::write_i64(&mut payload, 2).expect("write frc");
|
||||
rmp::encode::write_str(&mut payload, "rs").expect("write key");
|
||||
rmp::encode::write_i64(&mut payload, 101).expect("write rs");
|
||||
rmp::encode::write_str(&mut payload, "rrc").expect("write key");
|
||||
rmp::encode::write_i64(&mut payload, 9).expect("write rrc");
|
||||
rmp::encode::write_str(&mut payload, "bkt").expect("write key");
|
||||
rmp::encode::write_str(&mut payload, "bucket-a").expect("write bucket");
|
||||
rmp::encode::write_str(&mut payload, "obj").expect("write key");
|
||||
rmp::encode::write_str(&mut payload, "object-a").expect("write obj");
|
||||
rmp::encode::write_str(&mut payload, "id").expect("write key");
|
||||
rmp::encode::write_i32(&mut payload, 42).expect("write id");
|
||||
rmp::encode::write_str(&mut payload, "lu").expect("write key");
|
||||
write_msgp_time(&mut payload, bucket_last).expect("write lu");
|
||||
|
||||
let got = BucketReplicationResyncStatus::unmarshal_msg(&payload).expect("decode");
|
||||
assert_eq!(got.version, 1);
|
||||
assert_eq!(got.id, 42);
|
||||
assert_eq!(got.last_update, Some(bucket_last));
|
||||
let tgt = got.targets_map.get("arn:replication::1:dest").expect("target exists");
|
||||
assert_eq!(tgt.resync_id, "resync-1");
|
||||
assert_eq!(tgt.resync_status, ResyncStatusType::ResyncStarted);
|
||||
assert_eq!(tgt.bucket, "bucket-a");
|
||||
assert_eq!(tgt.object, "object-a");
|
||||
assert_eq!(tgt.start_time, Some(start));
|
||||
assert_eq!(tgt.last_update, Some(last));
|
||||
assert_eq!(tgt.resync_before_date, Some(before));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_unmarshal_legacy_resync_payload() {
|
||||
let mut status = BucketReplicationResyncStatus::new();
|
||||
status.id = 7;
|
||||
status.version = 1;
|
||||
status.last_update = Some(OffsetDateTime::from_unix_timestamp(1_700_222_222).expect("valid ts"));
|
||||
status.targets_map = HashMap::from([(
|
||||
"legacy-arn".to_string(),
|
||||
TargetReplicationResyncStatus {
|
||||
resync_id: "legacy-1".to_string(),
|
||||
resync_status: ResyncStatusType::ResyncCompleted,
|
||||
..Default::default()
|
||||
},
|
||||
)]);
|
||||
|
||||
let old_payload = rmp_serde::to_vec(&status).expect("legacy encode");
|
||||
let got = BucketReplicationResyncStatus::unmarshal_legacy_msg(&old_payload).expect("legacy decode");
|
||||
assert_eq!(got.id, 7);
|
||||
assert_eq!(got.version, 1);
|
||||
assert_eq!(got.targets_map["legacy-arn"].resync_id, "legacy-1");
|
||||
assert_eq!(got.targets_map["legacy-arn"].resync_status, ResyncStatusType::ResyncCompleted);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_resync_file_roundtrip_wire_format() {
|
||||
let mut status = BucketReplicationResyncStatus::new();
|
||||
status.id = 19;
|
||||
status.last_update = Some(OffsetDateTime::from_unix_timestamp(1_700_333_333).expect("valid ts"));
|
||||
status.targets_map = HashMap::from([(
|
||||
"arn:replication::1:dest".to_string(),
|
||||
TargetReplicationResyncStatus {
|
||||
resync_id: "wire-1".to_string(),
|
||||
resync_status: ResyncStatusType::ResyncStarted,
|
||||
replicated_count: 5,
|
||||
..Default::default()
|
||||
},
|
||||
)]);
|
||||
|
||||
let bytes = encode_resync_file(&status).expect("encode file");
|
||||
assert_eq!(&bytes[0..2], &RESYNC_META_FORMAT.to_le_bytes());
|
||||
assert_eq!(&bytes[2..4], &RESYNC_META_VERSION.to_le_bytes());
|
||||
|
||||
let got = decode_resync_file(&bytes).expect("decode file");
|
||||
assert_eq!(got.version, RESYNC_META_VERSION);
|
||||
assert_eq!(got.id, 19);
|
||||
assert_eq!(got.targets_map["arn:replication::1:dest"].resync_id, "wire-1");
|
||||
assert_eq!(got.targets_map["arn:replication::1:dest"].replicated_count, 5);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_resync_file_decodes_legacy_payload() {
|
||||
let mut status = BucketReplicationResyncStatus::new();
|
||||
status.id = 7;
|
||||
status.version = RESYNC_META_VERSION;
|
||||
status.targets_map = HashMap::from([(
|
||||
"legacy-arn".to_string(),
|
||||
TargetReplicationResyncStatus {
|
||||
resync_id: "legacy-v1".to_string(),
|
||||
resync_status: ResyncStatusType::ResyncCompleted,
|
||||
..Default::default()
|
||||
},
|
||||
)]);
|
||||
|
||||
let legacy_payload = rmp_serde::to_vec(&status).expect("legacy encode");
|
||||
let mut file_bytes = Vec::new();
|
||||
file_bytes.extend_from_slice(&RESYNC_META_FORMAT.to_le_bytes());
|
||||
file_bytes.extend_from_slice(&RESYNC_META_VERSION.to_le_bytes());
|
||||
file_bytes.extend_from_slice(&legacy_payload);
|
||||
|
||||
let got = decode_resync_file(&file_bytes).expect("decode legacy");
|
||||
assert_eq!(got.id, 7);
|
||||
assert_eq!(got.targets_map["legacy-arn"].resync_id, "legacy-v1");
|
||||
assert_eq!(got.targets_map["legacy-arn"].resync_status, ResyncStatusType::ResyncCompleted);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user