webhook add auth_token

This commit is contained in:
houseme
2025-03-27 23:18:09 +08:00
parent d12817a772
commit 2c8b9a8323
3 changed files with 22 additions and 10 deletions

View File

@@ -18,7 +18,8 @@ batch_timeout_ms = 1000 # Default is 1000ms if not specified
[sinks.webhook]
enabled = false
url = "http://localhost:8080/webhook"
endpoint = "http://localhost:8080/webhook"
auth_token = ""
batch_size = 100 # Default is 3 if not specified
batch_timeout_ms = 1000 # Default is 100ms if not specified

View File

@@ -51,7 +51,8 @@ pub struct KafkaSinkConfig {
#[derive(Debug, Deserialize, Clone, Default)]
pub struct WebhookSinkConfig {
pub enabled: bool,
pub url: String,
pub endpoint: String,
pub auth_token: String,
pub max_retries: Option<usize>, // Maximum number of retry times, default 3
pub retry_delay_ms: Option<u64>, // Retry the delay cardinality, default 100ms
}

View File

@@ -179,7 +179,8 @@ impl Drop for KafkaSink {
#[cfg(feature = "webhook")]
/// Webhook Sink Implementation
pub struct WebhookSink {
url: String,
endpoint: String,
auth_token: String,
client: reqwest::Client,
max_retries: usize,
retry_delay_ms: u64,
@@ -187,9 +188,10 @@ pub struct WebhookSink {
#[cfg(feature = "webhook")]
impl WebhookSink {
pub fn new(url: String, max_retries: usize, retry_delay_ms: u64) -> Self {
pub fn new(endpoint: String, auth_token: String, max_retries: usize, retry_delay_ms: u64) -> Self {
WebhookSink {
url,
endpoint,
auth_token,
client: reqwest::Client::builder()
.timeout(std::time::Duration::from_secs(10))
.build()
@@ -205,11 +207,18 @@ impl WebhookSink {
impl Sink for WebhookSink {
async fn write(&self, entry: &UnifiedLogEntry) {
let mut retries = 0;
let url = self.url.clone();
let url = self.endpoint.clone();
let entry_clone = entry.clone();
let auth_value = reqwest::header::HeaderValue::from_str(format!("Bearer {}", self.auth_token.clone()).as_str()).unwrap();
while retries < self.max_retries {
match self.client.post(&url).json(&entry_clone).send().await {
match self
.client
.post(&url)
.header(reqwest::header::AUTHORIZATION, auth_value.clone())
.json(&entry_clone)
.send()
.await
{
Ok(response) if response.status().is_success() => {
return;
}
@@ -234,7 +243,7 @@ impl Drop for WebhookSink {
fn drop(&mut self) {
// Perform any necessary cleanup here
// For example, you might want to log that the sink is being dropped
eprintln!("Dropping WebhookSink with URL: {}", self.url);
eprintln!("Dropping WebhookSink with URL: {}", self.endpoint);
}
}
@@ -410,7 +419,8 @@ pub fn create_sinks(config: &AppConfig) -> Vec<Arc<dyn Sink>> {
#[cfg(feature = "webhook")]
if config.sinks.webhook.enabled {
sinks.push(Arc::new(WebhookSink::new(
config.sinks.webhook.url.clone(),
config.sinks.webhook.endpoint.clone(),
config.sinks.webhook.auth_token.clone(),
config.sinks.webhook.max_retries.unwrap_or(3),
config.sinks.webhook.retry_delay_ms.unwrap_or(100),
)));