mirror of
https://github.com/rustfs/rustfs.git
synced 2026-01-17 01:30:33 +00:00
improve code
This commit is contained in:
@@ -103,8 +103,7 @@ impl MQTTTarget {
|
||||
let target_id = TargetID::new(id.clone(), ChannelTargetType::Mqtt.as_str().to_string());
|
||||
let queue_store = if !args.queue_dir.is_empty() {
|
||||
let base_path = PathBuf::from(&args.queue_dir);
|
||||
let unique_dir_name =
|
||||
format!("rustfs-{}-{}-{}", ChannelTargetType::Mqtt.as_str(), target_id.name, target_id.id).replace(":", "_");
|
||||
let unique_dir_name = format!("rustfs-{}-{}", ChannelTargetType::Mqtt.as_str(), target_id.id).replace(":", "_");
|
||||
// Ensure the directory name is valid for filesystem
|
||||
let specific_queue_path = base_path.join(unique_dir_name);
|
||||
debug!(target_id = %target_id, path = %specific_queue_path.display(), "Initializing queue store for MQTT target");
|
||||
|
||||
@@ -128,12 +128,8 @@ impl WebhookTarget {
|
||||
|
||||
// Build storage
|
||||
let queue_store = if !args.queue_dir.is_empty() {
|
||||
let queue_dir = PathBuf::from(&args.queue_dir).join(format!(
|
||||
"rustfs-{}-{}-{}",
|
||||
ChannelTargetType::Webhook.as_str(),
|
||||
target_id.name,
|
||||
target_id.id
|
||||
));
|
||||
let queue_dir =
|
||||
PathBuf::from(&args.queue_dir).join(format!("rustfs-{}-{}", ChannelTargetType::Webhook.as_str(), target_id.id));
|
||||
let store = crate::store::QueueStore::<Event>::new(queue_dir, args.queue_limit, STORE_EXTENSION);
|
||||
|
||||
if let Err(e) = store.open() {
|
||||
|
||||
@@ -90,6 +90,33 @@ impl KVS {
|
||||
|
||||
None
|
||||
}
|
||||
|
||||
///Check if KVS is empty.
|
||||
pub fn is_empty(&self) -> bool {
|
||||
self.0.is_empty()
|
||||
}
|
||||
|
||||
/// Returns a list of all keys for the current KVS.
|
||||
/// If the "comment" key does not exist, it will be added.
|
||||
pub fn keys(&self) -> Vec<String> {
|
||||
let mut found_comment = false;
|
||||
let mut keys: Vec<String> = self
|
||||
.0
|
||||
.iter()
|
||||
.map(|kv| {
|
||||
if kv.key == COMMENT_KEY {
|
||||
found_comment = true;
|
||||
}
|
||||
kv.key.clone()
|
||||
})
|
||||
.collect();
|
||||
|
||||
if !found_comment {
|
||||
keys.push(COMMENT_KEY.to_owned());
|
||||
}
|
||||
|
||||
keys
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
|
||||
@@ -112,8 +112,18 @@ impl Operation for ListNotificationTargets {
|
||||
// 3. Get the list of activity targets
|
||||
let active_targets = ns.get_active_targets().await;
|
||||
|
||||
let region = match req.region.clone() {
|
||||
Some(region) => region,
|
||||
None => return Err(s3_error!(InvalidRequest, "region not found")),
|
||||
};
|
||||
let mut data_target_arn_list = Vec::new();
|
||||
for target_id in active_targets.iter() {
|
||||
let target_arn = target_id.to_arn(®ion);
|
||||
data_target_arn_list.push(target_arn.to_string());
|
||||
}
|
||||
|
||||
// 4. Serialize and return the result
|
||||
let data = serde_json::to_vec(&active_targets)
|
||||
let data = serde_json::to_vec(&data_target_arn_list)
|
||||
.map_err(|e| S3Error::with_message(S3ErrorCode::InternalError, format!("failed to serialize targets: {}", e)))?;
|
||||
debug!("ListNotificationTargets call end, response data length: {}", data.len(),);
|
||||
let mut header = HeaderMap::new();
|
||||
|
||||
Reference in New Issue
Block a user