diff --git a/crates/ecstore/src/client/transition_api.rs b/crates/ecstore/src/client/transition_api.rs index 0dcc7a3e..58806beb 100644 --- a/crates/ecstore/src/client/transition_api.rs +++ b/crates/ecstore/src/client/transition_api.rs @@ -317,7 +317,7 @@ impl TransitionClient { //} let mut retry_timer = RetryTimer::new(req_retry, DEFAULT_RETRY_UNIT, DEFAULT_RETRY_CAP, MAX_JITTER, self.random); - while let Some(v) = retry_timer.next().await { + while retry_timer.next().await.is_some() { let req = self.new_request(&method, metadata).await?; resp = self.doit(req).await?; diff --git a/crates/utils/src/net.rs b/crates/utils/src/net.rs index 1bc0adcc..e1e2955c 100644 --- a/crates/utils/src/net.rs +++ b/crates/utils/src/net.rs @@ -15,8 +15,6 @@ use bytes::Bytes; use futures::pin_mut; use futures::{Stream, StreamExt}; -use hyper::client::conn::http2::Builder; -use hyper_util::rt::TokioExecutor; use std::net::Ipv6Addr; use std::sync::LazyLock; use std::{ @@ -144,10 +142,6 @@ pub fn get_endpoint_url(endpoint: &str, secure: bool) -> Result Builder { - todo!(); -} - const ALLOWED_CUSTOM_QUERY_PREFIX: &str = "x-"; pub fn is_custom_query_value(qs_key: &str) -> bool { diff --git a/crates/utils/src/retry.rs b/crates/utils/src/retry.rs index d8d69e03..cd653582 100644 --- a/crates/utils/src/retry.rs +++ b/crates/utils/src/retry.rs @@ -20,7 +20,7 @@ use std::{ task::{Context, Poll}, time::Duration, }; -use tokio::time::interval; +use tokio::time::{Interval, MissedTickBehavior, interval}; pub const MAX_RETRY: i64 = 10; pub const MAX_JITTER: f64 = 1.0; @@ -29,22 +29,28 @@ pub const NO_JITTER: f64 = 0.0; pub const DEFAULT_RETRY_UNIT: Duration = Duration::from_millis(200); pub const DEFAULT_RETRY_CAP: Duration = Duration::from_secs(1); +#[derive(Debug)] pub struct RetryTimer { base_sleep: Duration, max_sleep: Duration, jitter: f64, random: u64, + max_retry: i64, rem: i64, + timer: Option, } impl RetryTimer { pub fn new(max_retry: i64, base_sleep: Duration, max_sleep: Duration, jitter: f64, random: u64) -> Self { + //println!("time1: {:?}", std::time::SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis()); Self { base_sleep, max_sleep, jitter, random, + max_retry, rem: max_retry, + timer: None, } } } @@ -53,26 +59,52 @@ impl Stream for RetryTimer { type Item = (); fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let jitter = self.jitter.clamp(NO_JITTER, MAX_JITTER); + if self.rem == 0 { + return Poll::Ready(None); + } - let attempt = MAX_RETRY - self.rem; + let jitter = self.jitter.clamp(NO_JITTER, MAX_JITTER); + let attempt = self.max_retry - self.rem; let mut sleep = self.base_sleep * (1 << attempt); if sleep > self.max_sleep { sleep = self.max_sleep; } if (jitter - NO_JITTER).abs() > 1e-9 { - sleep -= sleep * self.random as u32 * jitter as u32; + //println!("\njitter: {:?}", jitter); + //println!("sleep: {sleep:?}"); + //println!("0000: {:?}", self.random as f64 * jitter / 100_f64); + let sleep_ms = sleep.as_millis() as u64; + sleep = Duration::from_millis(sleep_ms - (sleep_ms as f64 * (self.random as f64 * jitter / 100_f64)) as u64); + } + //println!("sleep: {sleep:?}"); + + if self.timer.is_none() { + let mut timer = interval(sleep); + timer.set_missed_tick_behavior(MissedTickBehavior::Delay); + self.timer = Some(timer); + //println!("time1: {:?}", std::time::SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis()); } - if self.rem == 0 { - return Poll::Ready(None); - } - - self.rem -= 1; - let mut t = interval(sleep); - match t.poll_tick(cx) { - Poll::Ready(_) => Poll::Ready(Some(())), - Poll::Pending => Poll::Pending, + let mut timer = self.timer.as_mut().unwrap(); + match Pin::new(&mut timer).poll_tick(cx) { + Poll::Ready(_) => { + //println!("ready"); + //println!("time2: {:?}", std::time::SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis()); + self.rem -= 1; + if self.rem > 0 { + let mut new_timer = interval(sleep); + new_timer.set_missed_tick_behavior(MissedTickBehavior::Delay); + new_timer.reset(); + self.timer = Some(new_timer); + //println!("time1: {:?}", std::time::SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis()); + } + Poll::Ready(Some(())) + } + Poll::Pending => { + //println!("pending"); + //println!("time2: {:?}", std::time::SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis()); + Poll::Pending + } } } } @@ -136,3 +168,27 @@ pub fn is_request_error_retryable(_err: std::io::Error) -> bool { true*/ todo!(); } + +#[cfg(test)] +#[allow(unused_imports)] +mod tests { + use super::*; + use futures::{Future, StreamExt}; + use rand::Rng; + use std::time::UNIX_EPOCH; + + #[tokio::test] + async fn test_retry() { + let req_retry = 10; + let random = rand::rng().random_range(0..=100); + + let mut retry_timer = RetryTimer::new(req_retry, DEFAULT_RETRY_UNIT, DEFAULT_RETRY_CAP, MAX_JITTER, random); + println!("retry_timer: {retry_timer:?}"); + while retry_timer.next().await.is_some() { + println!( + "\ntime: {:?}", + std::time::SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis() + ); + } + } +}