From 2c8b9a83231947df829c89b6fdfafdfd9e37f9fc Mon Sep 17 00:00:00 2001 From: houseme Date: Thu, 27 Mar 2025 23:18:09 +0800 Subject: [PATCH] webhook add auth_token --- config/obs.example.toml | 3 ++- packages/obs/src/config.rs | 3 ++- packages/obs/src/sink.rs | 26 ++++++++++++++++++-------- 3 files changed, 22 insertions(+), 10 deletions(-) diff --git a/config/obs.example.toml b/config/obs.example.toml index 57f45444..4b107e15 100644 --- a/config/obs.example.toml +++ b/config/obs.example.toml @@ -18,7 +18,8 @@ batch_timeout_ms = 1000 # Default is 1000ms if not specified [sinks.webhook] enabled = false -url = "http://localhost:8080/webhook" +endpoint = "http://localhost:8080/webhook" +auth_token = "" batch_size = 100 # Default is 3 if not specified batch_timeout_ms = 1000 # Default is 100ms if not specified diff --git a/packages/obs/src/config.rs b/packages/obs/src/config.rs index 3acc2efd..88078fe4 100644 --- a/packages/obs/src/config.rs +++ b/packages/obs/src/config.rs @@ -51,7 +51,8 @@ pub struct KafkaSinkConfig { #[derive(Debug, Deserialize, Clone, Default)] pub struct WebhookSinkConfig { pub enabled: bool, - pub url: String, + pub endpoint: String, + pub auth_token: String, pub max_retries: Option, // Maximum number of retry times, default 3 pub retry_delay_ms: Option, // Retry the delay cardinality, default 100ms } diff --git a/packages/obs/src/sink.rs b/packages/obs/src/sink.rs index 8eb1830d..f362d815 100644 --- a/packages/obs/src/sink.rs +++ b/packages/obs/src/sink.rs @@ -179,7 +179,8 @@ impl Drop for KafkaSink { #[cfg(feature = "webhook")] /// Webhook Sink Implementation pub struct WebhookSink { - url: String, + endpoint: String, + auth_token: String, client: reqwest::Client, max_retries: usize, retry_delay_ms: u64, @@ -187,9 +188,10 @@ pub struct WebhookSink { #[cfg(feature = "webhook")] impl WebhookSink { - pub fn new(url: String, max_retries: usize, retry_delay_ms: u64) -> Self { + pub fn new(endpoint: String, auth_token: String, max_retries: usize, retry_delay_ms: u64) -> Self { WebhookSink { - url, + endpoint, + auth_token, client: reqwest::Client::builder() .timeout(std::time::Duration::from_secs(10)) .build() @@ -205,11 +207,18 @@ impl WebhookSink { impl Sink for WebhookSink { async fn write(&self, entry: &UnifiedLogEntry) { let mut retries = 0; - let url = self.url.clone(); + let url = self.endpoint.clone(); let entry_clone = entry.clone(); - + let auth_value = reqwest::header::HeaderValue::from_str(format!("Bearer {}", self.auth_token.clone()).as_str()).unwrap(); while retries < self.max_retries { - match self.client.post(&url).json(&entry_clone).send().await { + match self + .client + .post(&url) + .header(reqwest::header::AUTHORIZATION, auth_value.clone()) + .json(&entry_clone) + .send() + .await + { Ok(response) if response.status().is_success() => { return; } @@ -234,7 +243,7 @@ impl Drop for WebhookSink { fn drop(&mut self) { // Perform any necessary cleanup here // For example, you might want to log that the sink is being dropped - eprintln!("Dropping WebhookSink with URL: {}", self.url); + eprintln!("Dropping WebhookSink with URL: {}", self.endpoint); } } @@ -410,7 +419,8 @@ pub fn create_sinks(config: &AppConfig) -> Vec> { #[cfg(feature = "webhook")] if config.sinks.webhook.enabled { sinks.push(Arc::new(WebhookSink::new( - config.sinks.webhook.url.clone(), + config.sinks.webhook.endpoint.clone(), + config.sinks.webhook.auth_token.clone(), config.sinks.webhook.max_retries.unwrap_or(3), config.sinks.webhook.retry_delay_ms.unwrap_or(100), )));