mirror of
https://github.com/rustfs/rustfs.git
synced 2026-01-16 17:20:33 +00:00
* feat: improve code for notify * upgrade starshard version * upgrade version * Fix ETag format to comply with HTTP standards by wrapping with quotes (#592) * Initial plan * Fix ETag format to comply with HTTP standards by wrapping with quotes Co-authored-by: overtrue <1472352+overtrue@users.noreply.github.com> * bufigx --------- Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com> Co-authored-by: overtrue <1472352+overtrue@users.noreply.github.com> Co-authored-by: overtrue <anzhengchao@gmail.com> * Improve lock (#596) * improve lock Signed-off-by: Mu junxiang <1948535941@qq.com> * feat(tests): add wait_for_object_absence helper and improve lifecycle test reliability Signed-off-by: Mu junxiang <1948535941@qq.com> * chore: remove dirty docs Signed-off-by: Mu junxiang <1948535941@qq.com> --------- Signed-off-by: Mu junxiang <1948535941@qq.com> * feat(append): implement object append operations with state tracking (#599) * feat(append): implement object append operations with state tracking Signed-off-by: junxiang Mu <1948535941@qq.com> * chore: rebase Signed-off-by: junxiang Mu <1948535941@qq.com> --------- Signed-off-by: junxiang Mu <1948535941@qq.com> * build(deps): upgrade s3s (#595) Co-authored-by: loverustfs <155562731+loverustfs@users.noreply.github.com> * fix: validate mqtt broker * improve code for `import` * fix * improve * remove logger from `rustfs-obs` crate * remove code for config Observability * fix * improve code * fix comment * up * up * upgrade version * fix * fmt * upgrade tokio version to 1.48.0 * upgrade `datafusion` and `reed-solomon-simd` version * fix * fmt * improve code for notify webhook example * improve code * fix * fix * fmt --------- Signed-off-by: Mu junxiang <1948535941@qq.com> Signed-off-by: junxiang Mu <1948535941@qq.com> Co-authored-by: Copilot <198982749+Copilot@users.noreply.github.com> Co-authored-by: overtrue <1472352+overtrue@users.noreply.github.com> Co-authored-by: overtrue 
<anzhengchao@gmail.com> Co-authored-by: guojidan <63799833+guojidan@users.noreply.github.com> Co-authored-by: Nugine <nugine@foxmail.com> Co-authored-by: loverustfs <155562731+loverustfs@users.noreply.github.com>
223 lines
7.4 KiB
Rust
223 lines
7.4 KiB
Rust
// Copyright 2024 RustFS Team
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
use axum::{
|
|
Router,
|
|
extract::Json,
|
|
extract::Query,
|
|
http::{HeaderMap, Response, StatusCode},
|
|
routing::{get, post},
|
|
};
|
|
use rustfs_utils::parse_and_resolve_address;
|
|
use serde::Deserialize;
|
|
use serde_json::Value;
|
|
use std::net::SocketAddr;
|
|
use std::sync::atomic::{AtomicU64, Ordering};
|
|
use std::time::{SystemTime, UNIX_EPOCH};
|
|
use tokio::net::TcpListener;
|
|
|
|
#[derive(Deserialize)]
|
|
struct ResetParams {
|
|
reason: Option<String>,
|
|
}
|
|
|
|
/// Global tally of webhook requests received; starts at zero and is shared by
/// every handler in this file.
static WEBHOOK_COUNT: AtomicU64 = AtomicU64::new(0);
|
|
|
|
#[tokio::main]
|
|
async fn main() {
|
|
// Build an application
|
|
let app = Router::new()
|
|
.route("/webhook", post(receive_webhook))
|
|
.route("/webhook/reset/{reason}", get(reset_webhook_count_with_path))
|
|
.route("/webhook/reset", get(reset_webhook_count))
|
|
.route("/webhook", get(receive_webhook));
|
|
// Start the server
|
|
// let addr = "[0.0.0.0.0.0.0.0]:3020";
|
|
let server_addr = match parse_and_resolve_address(":3020") {
|
|
Ok(addr) => addr,
|
|
Err(e) => {
|
|
eprintln!("Failed to parse address: {e}");
|
|
return;
|
|
}
|
|
};
|
|
let listener = TcpListener::bind(server_addr).await.unwrap();
|
|
println!("Server running on {server_addr}");
|
|
|
|
// Self-checking after the service is started
|
|
tokio::spawn(async move {
|
|
// Give the server some time to start
|
|
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
|
|
|
|
match is_service_active(server_addr).await {
|
|
Ok(true) => println!("Service health check: Successful - Service is running normally"),
|
|
Ok(false) => eprintln!("Service Health Check: Failed - Service Not Responded"),
|
|
Err(e) => eprintln!("Service health check errors:{e}"),
|
|
}
|
|
});
|
|
|
|
// Create a shutdown signal processing
|
|
tokio::select! {
|
|
result = axum::serve(listener, app) => {
|
|
if let Err(e) = result {
|
|
eprintln!("Server error: {e}");
|
|
}
|
|
}
|
|
_ = tokio::signal::ctrl_c() => {
|
|
println!("Shutting down server...");
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Create a method to reset the value of WEBHOOK_COUNT
|
|
async fn reset_webhook_count_with_path(axum::extract::Path(reason): axum::extract::Path<String>) -> Response<String> {
|
|
// Output the value of the current counter
|
|
let current_count = WEBHOOK_COUNT.load(Ordering::SeqCst);
|
|
println!("Current webhook count: {current_count}");
|
|
|
|
println!("Reset webhook count, reason: {reason}");
|
|
// Reset the counter to 0
|
|
WEBHOOK_COUNT.store(0, Ordering::SeqCst);
|
|
println!("Webhook count has been reset to 0.");
|
|
|
|
Response::builder()
|
|
.header("Foo", "Bar")
|
|
.status(StatusCode::OK)
|
|
.body(format!(
|
|
"Webhook count reset successfully. Previous count: {current_count}. Reason: {reason}"
|
|
))
|
|
.unwrap()
|
|
}
|
|
|
|
/// Create a method to reset the value of WEBHOOK_COUNT
|
|
/// You can reset the counter by calling this method
|
|
async fn reset_webhook_count(Query(params): Query<ResetParams>, headers: HeaderMap) -> Response<String> {
|
|
// Output the value of the current counter
|
|
let current_count = WEBHOOK_COUNT.load(Ordering::SeqCst);
|
|
println!("Current webhook count: {current_count}");
|
|
|
|
let reason = params.reason.unwrap_or_else(|| "Reason not provided".to_string());
|
|
println!("Reset webhook count, reason: {reason}");
|
|
|
|
for header in headers {
|
|
let (key, value) = header;
|
|
println!("Header: {key:?}: {value:?}");
|
|
}
|
|
|
|
println!("Reset webhook count printed headers");
|
|
// Reset the counter to 0
|
|
WEBHOOK_COUNT.store(0, Ordering::SeqCst);
|
|
println!("Webhook count has been reset to 0.");
|
|
Response::builder()
|
|
.header("Foo", "Bar")
|
|
.status(StatusCode::OK)
|
|
.body(format!("Webhook count reset successfully current_count:{current_count}"))
|
|
.unwrap()
|
|
}
|
|
|
|
async fn is_service_active(addr: SocketAddr) -> Result<bool, String> {
|
|
let socket_addr = tokio::net::lookup_host(addr)
|
|
.await
|
|
.map_err(|e| format!("Unable to resolve host:{e}"))?
|
|
.next()
|
|
.ok_or_else(|| "Address not found".to_string())?;
|
|
|
|
println!("Checking service status:{socket_addr}");
|
|
|
|
match tokio::time::timeout(std::time::Duration::from_secs(5), tokio::net::TcpStream::connect(socket_addr)).await {
|
|
Ok(Ok(_)) => Ok(true),
|
|
Ok(Err(e)) => {
|
|
if e.kind() == std::io::ErrorKind::ConnectionRefused {
|
|
Ok(false)
|
|
} else {
|
|
Err(format!("Connection failed:{e}"))
|
|
}
|
|
}
|
|
Err(_) => Err("Connection timeout".to_string()),
|
|
}
|
|
}
|
|
|
|
async fn receive_webhook(Json(payload): Json<Value>) -> StatusCode {
|
|
let start = SystemTime::now();
|
|
let since_the_epoch = start.duration_since(UNIX_EPOCH).expect("Time went backwards");
|
|
|
|
// get the number of seconds since the unix era
|
|
let seconds = since_the_epoch.as_secs();
|
|
|
|
// Manually calculate year, month, day, hour, minute, and second
|
|
let (year, month, day, hour, minute, second) = convert_seconds_to_date(seconds);
|
|
|
|
// output result
|
|
println!("current time:{year:04}-{month:02}-{day:02} {hour:02}:{minute:02}:{second:02}");
|
|
println!(
|
|
"received a webhook request time:{} content:\n {}",
|
|
seconds,
|
|
serde_json::to_string_pretty(&payload).unwrap()
|
|
);
|
|
WEBHOOK_COUNT.fetch_add(1, Ordering::SeqCst);
|
|
println!("Total webhook requests received: {}", WEBHOOK_COUNT.load(Ordering::SeqCst));
|
|
StatusCode::OK
|
|
}
|
|
|
|
/// Converts seconds since the Unix epoch into UTC calendar fields.
///
/// Returns `(year, month, day, hour, minute, second)` in the proleptic
/// Gregorian calendar, with `month` and `day` starting at 1. Leap years are
/// handled exactly. A year that does not fit in `u32` (only reachable with
/// absurdly large inputs) saturates at `u32::MAX`.
fn convert_seconds_to_date(seconds: u64) -> (u32, u32, u32, u32, u32, u32) {
    const SECONDS_PER_MINUTE: u64 = 60;
    const SECONDS_PER_HOUR: u64 = 3_600;
    const SECONDS_PER_DAY: u64 = 86_400;

    // Split into whole days since 1970-01-01 and the time within that day.
    let days = seconds / SECONDS_PER_DAY;
    let secs_of_day = seconds % SECONDS_PER_DAY;

    // Days-to-civil conversion. Shifting the origin to 0000-03-01 makes each
    // year start in March, so the leap day is the last day of the shifted
    // year and every 400-year era has exactly 146_097 days.
    let shifted = days + 719_468;
    let era = shifted / 146_097;
    let day_of_era = shifted % 146_097; // 0..=146_096
    let year_of_era = (day_of_era - day_of_era / 1_460 + day_of_era / 36_524 - day_of_era / 146_096) / 365; // 0..=399
    let day_of_year = day_of_era - (365 * year_of_era + year_of_era / 4 - year_of_era / 100); // 0..=365, March-based
    let shifted_month = (5 * day_of_year + 2) / 153; // 0 = March .. 11 = February
    let day = day_of_year - (153 * shifted_month + 2) / 5 + 1; // 1..=31
    let month = if shifted_month < 10 { shifted_month + 3 } else { shifted_month - 9 }; // 1..=12
    // January and February belong to the next civil year in the shifted calendar.
    let year = year_of_era + era * 400 + u64::from(month <= 2);

    let hour = secs_of_day / SECONDS_PER_HOUR;
    let minute = secs_of_day % SECONDS_PER_HOUR / SECONDS_PER_MINUTE;
    let second = secs_of_day % SECONDS_PER_MINUTE;

    // Every field except the year is bounded well below `u32::MAX`, so those
    // casts are lossless; the year is clamped before narrowing.
    (
        year.min(u64::from(u32::MAX)) as u32,
        month as u32,
        day as u32,
        hour as u32,
        minute as u32,
        second as u32,
    )
}
|