Fix/addtier (#454)

* fix retry

* fmt

* fix

* fix

* fix

---------

Co-authored-by: loverustfs <155562731+loverustfs@users.noreply.github.com>
This commit is contained in:
likewu
2025-08-25 10:24:48 +08:00
committed by GitHub
parent e23297f695
commit e00f5be746
3 changed files with 70 additions and 20 deletions

View File

@@ -317,7 +317,7 @@ impl TransitionClient {
//}
let mut retry_timer = RetryTimer::new(req_retry, DEFAULT_RETRY_UNIT, DEFAULT_RETRY_CAP, MAX_JITTER, self.random);
while let Some(v) = retry_timer.next().await {
while retry_timer.next().await.is_some() {
let req = self.new_request(&method, metadata).await?;
resp = self.doit(req).await?;

View File

@@ -15,8 +15,6 @@
use bytes::Bytes;
use futures::pin_mut;
use futures::{Stream, StreamExt};
use hyper::client::conn::http2::Builder;
use hyper_util::rt::TokioExecutor;
use std::net::Ipv6Addr;
use std::sync::LazyLock;
use std::{
@@ -144,10 +142,6 @@ pub fn get_endpoint_url(endpoint: &str, secure: bool) -> Result<Url, std::io::Er
pub const DEFAULT_DIAL_TIMEOUT: i64 = 5;
pub fn new_remotetarget_http_transport(_insecure: bool) -> Builder<TokioExecutor> {
todo!();
}
const ALLOWED_CUSTOM_QUERY_PREFIX: &str = "x-";
pub fn is_custom_query_value(qs_key: &str) -> bool {

View File

@@ -20,7 +20,7 @@ use std::{
task::{Context, Poll},
time::Duration,
};
use tokio::time::interval;
use tokio::time::{Interval, MissedTickBehavior, interval};
pub const MAX_RETRY: i64 = 10;
pub const MAX_JITTER: f64 = 1.0;
@@ -29,22 +29,28 @@ pub const NO_JITTER: f64 = 0.0;
pub const DEFAULT_RETRY_UNIT: Duration = Duration::from_millis(200);
pub const DEFAULT_RETRY_CAP: Duration = Duration::from_secs(1);
#[derive(Debug)]
pub struct RetryTimer {
base_sleep: Duration,
max_sleep: Duration,
jitter: f64,
random: u64,
max_retry: i64,
rem: i64,
timer: Option<Interval>,
}
impl RetryTimer {
pub fn new(max_retry: i64, base_sleep: Duration, max_sleep: Duration, jitter: f64, random: u64) -> Self {
//println!("time1: {:?}", std::time::SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis());
Self {
base_sleep,
max_sleep,
jitter,
random,
max_retry,
rem: max_retry,
timer: None,
}
}
}
@@ -53,26 +59,52 @@ impl Stream for RetryTimer {
type Item = ();
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<()>> {
let jitter = self.jitter.clamp(NO_JITTER, MAX_JITTER);
if self.rem == 0 {
return Poll::Ready(None);
}
let attempt = MAX_RETRY - self.rem;
let jitter = self.jitter.clamp(NO_JITTER, MAX_JITTER);
let attempt = self.max_retry - self.rem;
let mut sleep = self.base_sleep * (1 << attempt);
if sleep > self.max_sleep {
sleep = self.max_sleep;
}
if (jitter - NO_JITTER).abs() > 1e-9 {
sleep -= sleep * self.random as u32 * jitter as u32;
//println!("\njitter: {:?}", jitter);
//println!("sleep: {sleep:?}");
//println!("0000: {:?}", self.random as f64 * jitter / 100_f64);
let sleep_ms = sleep.as_millis() as u64;
sleep = Duration::from_millis(sleep_ms - (sleep_ms as f64 * (self.random as f64 * jitter / 100_f64)) as u64);
}
//println!("sleep: {sleep:?}");
if self.timer.is_none() {
let mut timer = interval(sleep);
timer.set_missed_tick_behavior(MissedTickBehavior::Delay);
self.timer = Some(timer);
//println!("time1: {:?}", std::time::SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis());
}
if self.rem == 0 {
return Poll::Ready(None);
}
self.rem -= 1;
let mut t = interval(sleep);
match t.poll_tick(cx) {
Poll::Ready(_) => Poll::Ready(Some(())),
Poll::Pending => Poll::Pending,
let mut timer = self.timer.as_mut().unwrap();
match Pin::new(&mut timer).poll_tick(cx) {
Poll::Ready(_) => {
//println!("ready");
//println!("time2: {:?}", std::time::SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis());
self.rem -= 1;
if self.rem > 0 {
let mut new_timer = interval(sleep);
new_timer.set_missed_tick_behavior(MissedTickBehavior::Delay);
new_timer.reset();
self.timer = Some(new_timer);
//println!("time1: {:?}", std::time::SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis());
}
Poll::Ready(Some(()))
}
Poll::Pending => {
//println!("pending");
//println!("time2: {:?}", std::time::SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis());
Poll::Pending
}
}
}
}
@@ -136,3 +168,27 @@ pub fn is_request_error_retryable(_err: std::io::Error) -> bool {
true*/
todo!();
}
#[cfg(test)]
#[allow(unused_imports)]
mod tests {
use super::*;
use futures::{Future, StreamExt};
use rand::Rng;
use std::time::UNIX_EPOCH;
#[tokio::test]
async fn test_retry() {
let req_retry = 10;
let random = rand::rng().random_range(0..=100);
let mut retry_timer = RetryTimer::new(req_retry, DEFAULT_RETRY_UNIT, DEFAULT_RETRY_CAP, MAX_JITTER, random);
println!("retry_timer: {retry_timer:?}");
while retry_timer.next().await.is_some() {
println!(
"\ntime: {:?}",
std::time::SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis()
);
}
}
}