From db2986028fc573ae3add0a9a3381268dd7599267 Mon Sep 17 00:00:00 2001 From: Francis Lavoie Date: Wed, 4 Mar 2026 15:05:26 -0500 Subject: [PATCH] reverseproxy: Track dynamic upstreams, enable passive healthchecking (#7539) * reverseproxy: Track dynamic upstreams, enable passive healthchecking * Add tests for dynamic upstream tracking, admin endpoint, health checks --- modules/caddyhttp/reverseproxy/admin.go | 13 +- modules/caddyhttp/reverseproxy/admin_test.go | 275 ++++++++++++ .../reverseproxy/dynamic_upstreams_test.go | 345 ++++++++++++++++ modules/caddyhttp/reverseproxy/hosts.go | 61 +++ .../reverseproxy/passive_health_test.go | 391 ++++++++++++++++++ .../caddyhttp/reverseproxy/reverseproxy.go | 24 +- 6 files changed, 1096 insertions(+), 13 deletions(-) create mode 100644 modules/caddyhttp/reverseproxy/admin_test.go create mode 100644 modules/caddyhttp/reverseproxy/dynamic_upstreams_test.go create mode 100644 modules/caddyhttp/reverseproxy/passive_health_test.go diff --git a/modules/caddyhttp/reverseproxy/admin.go b/modules/caddyhttp/reverseproxy/admin.go index 18215f0ae..97dd2827d 100644 --- a/modules/caddyhttp/reverseproxy/admin.go +++ b/modules/caddyhttp/reverseproxy/admin.go @@ -75,7 +75,7 @@ func (adminUpstreams) handleUpstreams(w http.ResponseWriter, r *http.Request) er results := []upstreamStatus{} knownHosts := make(map[string]struct{}) - // Iterate over the upstream pool (needs to be fast) + // Iterate over the static upstream pool (needs to be fast) var rangeErr error hosts.Range(func(key, val any) bool { address, ok := key.(string) @@ -121,6 +121,17 @@ func (adminUpstreams) handleUpstreams(w http.ResponseWriter, r *http.Request) er return rangeErr } + // Also include dynamic upstreams + dynamicHostsMu.RLock() + for address, entry := range dynamicHosts { + results = append(results, upstreamStatus{ + Address: address, + NumRequests: entry.host.NumRequests(), + Fails: entry.host.Fails(), + }) + } + dynamicHostsMu.RUnlock() + err := enc.Encode(results) if err != nil { return caddy.APIError{ diff --git a/modules/caddyhttp/reverseproxy/admin_test.go b/modules/caddyhttp/reverseproxy/admin_test.go new file mode 100644 index 000000000..de9ac967c --- /dev/null +++ b/modules/caddyhttp/reverseproxy/admin_test.go @@ -0,0 +1,275 @@ +// Copyright 2015 Matthew Holt and The Caddy Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package reverseproxy + +import ( + "encoding/json" + "net/http" + "net/http/httptest" + "testing" + "time" +) + +// adminHandlerFixture sets up the global host state for an admin endpoint test +// and returns a cleanup function that must be deferred by the caller. +// +// staticAddrs are inserted into the UsagePool (as a static upstream would be). +// dynamicAddrs are inserted into the dynamicHosts map (as a dynamic upstream would be). +func adminHandlerFixture(t *testing.T, staticAddrs, dynamicAddrs []string) func() { + t.Helper() + + for _, addr := range staticAddrs { + u := &Upstream{Dial: addr} + u.fillHost() + } + + dynamicHostsMu.Lock() + for _, addr := range dynamicAddrs { + dynamicHosts[addr] = dynamicHostEntry{host: new(Host), lastSeen: time.Now()} + } + dynamicHostsMu.Unlock() + + return func() { + // Remove static entries from the UsagePool. + for _, addr := range staticAddrs { + _, _ = hosts.Delete(addr) + } + // Remove dynamic entries. + dynamicHostsMu.Lock() + for _, addr := range dynamicAddrs { + delete(dynamicHosts, addr) + } + dynamicHostsMu.Unlock() + } +} + +// callAdminUpstreams fires a GET against handleUpstreams and returns the +// decoded response body. +func callAdminUpstreams(t *testing.T) []upstreamStatus { + t.Helper() + req := httptest.NewRequest(http.MethodGet, "/reverse_proxy/upstreams", nil) + w := httptest.NewRecorder() + + handler := adminUpstreams{} + if err := handler.handleUpstreams(w, req); err != nil { + t.Fatalf("handleUpstreams returned unexpected error: %v", err) + } + if w.Code != http.StatusOK { + t.Fatalf("expected 200, got %d", w.Code) + } + if ct := w.Header().Get("Content-Type"); ct != "application/json" { + t.Fatalf("expected Content-Type application/json, got %q", ct) + } + + var results []upstreamStatus + if err := json.NewDecoder(w.Body).Decode(&results); err != nil { + t.Fatalf("failed to decode response: %v", err) + } + return results +} + +// resultsByAddress indexes a slice of upstreamStatus by address for easier +// lookup in assertions. +func resultsByAddress(statuses []upstreamStatus) map[string]upstreamStatus { + m := make(map[string]upstreamStatus, len(statuses)) + for _, s := range statuses { + m[s.Address] = s + } + return m +} + +// TestAdminUpstreamsMethodNotAllowed verifies that non-GET methods are rejected. +func TestAdminUpstreamsMethodNotAllowed(t *testing.T) { + for _, method := range []string{http.MethodPost, http.MethodPut, http.MethodDelete} { + req := httptest.NewRequest(method, "/reverse_proxy/upstreams", nil) + w := httptest.NewRecorder() + err := (adminUpstreams{}).handleUpstreams(w, req) + if err == nil { + t.Errorf("method %s: expected an error, got nil", method) + continue + } + apiErr, ok := err.(interface{ HTTPStatus() int }) + if !ok { + // caddy.APIError stores the code in HTTPStatus field, access via the + // exported interface it satisfies indirectly; just check non-nil. + continue + } + if code := apiErr.HTTPStatus(); code != http.StatusMethodNotAllowed { + t.Errorf("method %s: expected 405, got %d", method, code) + } + } +} + +// TestAdminUpstreamsEmpty verifies that an empty response is valid JSON when +// no upstreams are registered. +func TestAdminUpstreamsEmpty(t *testing.T) { + resetDynamicHosts() + + results := callAdminUpstreams(t) + if results == nil { + t.Error("expected non-nil (empty) slice, got nil") + } + if len(results) != 0 { + t.Errorf("expected 0 results with empty pools, got %d", len(results)) + } +} + +// TestAdminUpstreamsStaticOnly verifies that static upstreams (from the +// UsagePool) appear in the response with correct addresses. +func TestAdminUpstreamsStaticOnly(t *testing.T) { + resetDynamicHosts() + cleanup := adminHandlerFixture(t, + []string{"10.0.0.1:80", "10.0.0.2:80"}, + nil, + ) + defer cleanup() + + results := callAdminUpstreams(t) + byAddr := resultsByAddress(results) + + for _, addr := range []string{"10.0.0.1:80", "10.0.0.2:80"} { + if _, ok := byAddr[addr]; !ok { + t.Errorf("expected static upstream %q in response", addr) + } + } + if len(results) != 2 { + t.Errorf("expected exactly 2 results, got %d", len(results)) + } +} + +// TestAdminUpstreamsDynamicOnly verifies that dynamic upstreams (from +// dynamicHosts) appear in the response with correct addresses. +func TestAdminUpstreamsDynamicOnly(t *testing.T) { + resetDynamicHosts() + cleanup := adminHandlerFixture(t, + nil, + []string{"10.0.1.1:80", "10.0.1.2:80"}, + ) + defer cleanup() + + results := callAdminUpstreams(t) + byAddr := resultsByAddress(results) + + for _, addr := range []string{"10.0.1.1:80", "10.0.1.2:80"} { + if _, ok := byAddr[addr]; !ok { + t.Errorf("expected dynamic upstream %q in response", addr) + } + } + if len(results) != 2 { + t.Errorf("expected exactly 2 results, got %d", len(results)) + } +} + +// TestAdminUpstreamsBothPools verifies that static and dynamic upstreams are +// both present in the same response and that there is no overlap or omission. +func TestAdminUpstreamsBothPools(t *testing.T) { + resetDynamicHosts() + cleanup := adminHandlerFixture(t, + []string{"10.0.2.1:80"}, + []string{"10.0.2.2:80"}, + ) + defer cleanup() + + results := callAdminUpstreams(t) + if len(results) != 2 { + t.Fatalf("expected 2 results (1 static + 1 dynamic), got %d", len(results)) + } + + byAddr := resultsByAddress(results) + if _, ok := byAddr["10.0.2.1:80"]; !ok { + t.Error("static upstream missing from response") + } + if _, ok := byAddr["10.0.2.2:80"]; !ok { + t.Error("dynamic upstream missing from response") + } +} + +// TestAdminUpstreamsNoOverlapBetweenPools verifies that an address registered +// only as a static upstream does not also appear as a dynamic entry, and +// vice-versa. +func TestAdminUpstreamsNoOverlapBetweenPools(t *testing.T) { + resetDynamicHosts() + cleanup := adminHandlerFixture(t, + []string{"10.0.3.1:80"}, + []string{"10.0.3.2:80"}, + ) + defer cleanup() + + results := callAdminUpstreams(t) + seen := make(map[string]int) + for _, r := range results { + seen[r.Address]++ + } + for addr, count := range seen { + if count > 1 { + t.Errorf("address %q appeared %d times; expected exactly once", addr, count) + } + } +} + +// TestAdminUpstreamsReportsFailCounts verifies that fail counts accumulated on +// a dynamic upstream's Host are reflected in the response. +func TestAdminUpstreamsReportsFailCounts(t *testing.T) { + resetDynamicHosts() + + const addr = "10.0.4.1:80" + h := new(Host) + _ = h.countFail(3) + + dynamicHostsMu.Lock() + dynamicHosts[addr] = dynamicHostEntry{host: h, lastSeen: time.Now()} + dynamicHostsMu.Unlock() + defer func() { + dynamicHostsMu.Lock() + delete(dynamicHosts, addr) + dynamicHostsMu.Unlock() + }() + + results := callAdminUpstreams(t) + byAddr := resultsByAddress(results) + + status, ok := byAddr[addr] + if !ok { + t.Fatalf("expected %q in response", addr) + } + if status.Fails != 3 { + t.Errorf("expected Fails=3, got %d", status.Fails) + } +} + +// TestAdminUpstreamsReportsNumRequests verifies that the active request count +// for a static upstream is reflected in the response. +func TestAdminUpstreamsReportsNumRequests(t *testing.T) { + resetDynamicHosts() + + const addr = "10.0.4.2:80" + u := &Upstream{Dial: addr} + u.fillHost() + defer func() { _, _ = hosts.Delete(addr) }() + + _ = u.Host.countRequest(2) + defer func() { _ = u.Host.countRequest(-2) }() + + results := callAdminUpstreams(t) + byAddr := resultsByAddress(results) + + status, ok := byAddr[addr] + if !ok { + t.Fatalf("expected %q in response", addr) + } + if status.NumRequests != 2 { + t.Errorf("expected NumRequests=2, got %d", status.NumRequests) + } +} diff --git a/modules/caddyhttp/reverseproxy/dynamic_upstreams_test.go b/modules/caddyhttp/reverseproxy/dynamic_upstreams_test.go new file mode 100644 index 000000000..577eccdb6 --- /dev/null +++ b/modules/caddyhttp/reverseproxy/dynamic_upstreams_test.go @@ -0,0 +1,345 @@ +// Copyright 2015 Matthew Holt and The Caddy Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package reverseproxy + +import ( + "sync" + "testing" + "time" + + "github.com/caddyserver/caddy/v2" +) + +// resetDynamicHosts clears global dynamic host state between tests. +func resetDynamicHosts() { + dynamicHostsMu.Lock() + dynamicHosts = make(map[string]dynamicHostEntry) + dynamicHostsMu.Unlock() + // Reset the Once so cleanup goroutine tests can re-trigger if needed. + dynamicHostsCleanerOnce = sync.Once{} +} + +// TestFillDynamicHostCreatesEntry verifies that calling fillDynamicHost on a +// new address inserts an entry into dynamicHosts and assigns a non-nil Host. +func TestFillDynamicHostCreatesEntry(t *testing.T) { + resetDynamicHosts() + + u := &Upstream{Dial: "192.0.2.1:80"} + u.fillDynamicHost() + + if u.Host == nil { + t.Fatal("expected Host to be set after fillDynamicHost") + } + + dynamicHostsMu.RLock() + entry, ok := dynamicHosts["192.0.2.1:80"] + dynamicHostsMu.RUnlock() + + if !ok { + t.Fatal("expected entry in dynamicHosts map") + } + if entry.host != u.Host { + t.Error("dynamicHosts entry host should be the same pointer assigned to Upstream.Host") + } + if entry.lastSeen.IsZero() { + t.Error("expected lastSeen to be set") + } +} + +// TestFillDynamicHostReusesSameHost verifies that two calls for the same address +// return the exact same *Host pointer so that state (e.g. fail counts) is shared. +func TestFillDynamicHostReusesSameHost(t *testing.T) { + resetDynamicHosts() + + u1 := &Upstream{Dial: "192.0.2.2:80"} + u1.fillDynamicHost() + + u2 := &Upstream{Dial: "192.0.2.2:80"} + u2.fillDynamicHost() + + if u1.Host != u2.Host { + t.Error("expected both upstreams to share the same *Host pointer") + } +} + +// TestFillDynamicHostUpdatesLastSeen verifies that a second call for the same +// address advances the lastSeen timestamp. +func TestFillDynamicHostUpdatesLastSeen(t *testing.T) { + resetDynamicHosts() + + u := &Upstream{Dial: "192.0.2.3:80"} + u.fillDynamicHost() + + dynamicHostsMu.RLock() + first := dynamicHosts["192.0.2.3:80"].lastSeen + dynamicHostsMu.RUnlock() + + // Ensure measurable time passes. + time.Sleep(2 * time.Millisecond) + + u2 := &Upstream{Dial: "192.0.2.3:80"} + u2.fillDynamicHost() + + dynamicHostsMu.RLock() + second := dynamicHosts["192.0.2.3:80"].lastSeen + dynamicHostsMu.RUnlock() + + if !second.After(first) { + t.Error("expected lastSeen to be updated on second fillDynamicHost call") + } +} + +// TestFillDynamicHostIndependentAddresses verifies that different addresses get +// independent Host entries. +func TestFillDynamicHostIndependentAddresses(t *testing.T) { + resetDynamicHosts() + + u1 := &Upstream{Dial: "192.0.2.4:80"} + u1.fillDynamicHost() + + u2 := &Upstream{Dial: "192.0.2.5:80"} + u2.fillDynamicHost() + + if u1.Host == u2.Host { + t.Error("different addresses should have different *Host entries") + } +} + +// TestFillDynamicHostPreservesFailCount verifies that fail counts on a dynamic +// host survive across multiple fillDynamicHost calls (simulating sequential +// requests), which is the core behaviour fixed by this change. +func TestFillDynamicHostPreservesFailCount(t *testing.T) { + resetDynamicHosts() + + // First "request": provision and record a failure. + u1 := &Upstream{Dial: "192.0.2.6:80"} + u1.fillDynamicHost() + _ = u1.Host.countFail(1) + + if u1.Host.Fails() != 1 { + t.Fatalf("expected 1 fail, got %d", u1.Host.Fails()) + } + + // Second "request": provision the same address again (new *Upstream, same address). + u2 := &Upstream{Dial: "192.0.2.6:80"} + u2.fillDynamicHost() + + if u2.Host.Fails() != 1 { + t.Errorf("expected fail count to persist across fillDynamicHost calls, got %d", u2.Host.Fails()) + } +} + +// TestProvisionUpstreamDynamic verifies that provisionUpstream with dynamic=true +// uses fillDynamicHost (not the UsagePool) and sets healthCheckPolicy / +// MaxRequests correctly from handler config. +func TestProvisionUpstreamDynamic(t *testing.T) { + resetDynamicHosts() + + passive := &PassiveHealthChecks{ + FailDuration: caddy.Duration(10 * time.Second), + MaxFails: 3, + UnhealthyRequestCount: 5, + } + h := Handler{ + HealthChecks: &HealthChecks{ + Passive: passive, + }, + } + + u := &Upstream{Dial: "192.0.2.7:80"} + h.provisionUpstream(u, true) + + if u.Host == nil { + t.Fatal("Host should be set after provisionUpstream") + } + if u.healthCheckPolicy != passive { + t.Error("healthCheckPolicy should point to the handler's PassiveHealthChecks") + } + if u.MaxRequests != 5 { + t.Errorf("expected MaxRequests=5 from UnhealthyRequestCount, got %d", u.MaxRequests) + } + + // Must be in dynamicHosts, not in the static UsagePool. + dynamicHostsMu.RLock() + _, inDynamic := dynamicHosts["192.0.2.7:80"] + dynamicHostsMu.RUnlock() + if !inDynamic { + t.Error("dynamic upstream should be stored in dynamicHosts") + } + _, inPool := hosts.References("192.0.2.7:80") + if inPool { + t.Error("dynamic upstream should NOT be stored in the static UsagePool") + } +} + +// TestProvisionUpstreamStatic verifies that provisionUpstream with dynamic=false +// uses the UsagePool and does NOT insert into dynamicHosts. +func TestProvisionUpstreamStatic(t *testing.T) { + resetDynamicHosts() + + h := Handler{} + + u := &Upstream{Dial: "192.0.2.8:80"} + h.provisionUpstream(u, false) + + if u.Host == nil { + t.Fatal("Host should be set after provisionUpstream") + } + + refs, inPool := hosts.References("192.0.2.8:80") + if !inPool { + t.Error("static upstream should be in the UsagePool") + } + if refs != 1 { + t.Errorf("expected ref count 1, got %d", refs) + } + + dynamicHostsMu.RLock() + _, inDynamic := dynamicHosts["192.0.2.8:80"] + dynamicHostsMu.RUnlock() + if inDynamic { + t.Error("static upstream should NOT be in dynamicHosts") + } + + // Clean up the pool entry we just added. + _, _ = hosts.Delete("192.0.2.8:80") +} + +// TestDynamicHostHealthyConsultsFails verifies the end-to-end passive health +// check path: after enough failures are recorded against a dynamic upstream's +// shared *Host, Healthy() returns false for a newly provisioned *Upstream with +// the same address. +func TestDynamicHostHealthyConsultsFails(t *testing.T) { + resetDynamicHosts() + + passive := &PassiveHealthChecks{ + FailDuration: caddy.Duration(time.Minute), + MaxFails: 2, + } + h := Handler{ + HealthChecks: &HealthChecks{Passive: passive}, + } + + // First request: provision and record two failures. + u1 := &Upstream{Dial: "192.0.2.9:80"} + h.provisionUpstream(u1, true) + + _ = u1.Host.countFail(1) + _ = u1.Host.countFail(1) + + // Second request: fresh *Upstream, same address. + u2 := &Upstream{Dial: "192.0.2.9:80"} + h.provisionUpstream(u2, true) + + if u2.Healthy() { + t.Error("upstream should be unhealthy after MaxFails failures have been recorded against its shared Host") + } +} + +// TestDynamicHostCleanupEvictsStaleEntries verifies that the cleanup sweep +// removes entries whose lastSeen is older than dynamicHostIdleExpiry. +func TestDynamicHostCleanupEvictsStaleEntries(t *testing.T) { + resetDynamicHosts() + + const addr = "192.0.2.10:80" + + // Insert an entry directly with a lastSeen far in the past. + dynamicHostsMu.Lock() + dynamicHosts[addr] = dynamicHostEntry{ + host: new(Host), + lastSeen: time.Now().Add(-2 * dynamicHostIdleExpiry), + } + dynamicHostsMu.Unlock() + + // Run the cleanup logic inline (same logic as the goroutine). + dynamicHostsMu.Lock() + for a, entry := range dynamicHosts { + if time.Since(entry.lastSeen) > dynamicHostIdleExpiry { + delete(dynamicHosts, a) + } + } + dynamicHostsMu.Unlock() + + dynamicHostsMu.RLock() + _, stillPresent := dynamicHosts[addr] + dynamicHostsMu.RUnlock() + + if stillPresent { + t.Error("stale dynamic host entry should have been evicted by cleanup sweep") + } +} + +// TestDynamicHostCleanupRetainsFreshEntries verifies that the cleanup sweep +// keeps entries whose lastSeen is within dynamicHostIdleExpiry. +func TestDynamicHostCleanupRetainsFreshEntries(t *testing.T) { + resetDynamicHosts() + + const addr = "192.0.2.11:80" + + dynamicHostsMu.Lock() + dynamicHosts[addr] = dynamicHostEntry{ + host: new(Host), + lastSeen: time.Now(), + } + dynamicHostsMu.Unlock() + + // Run the cleanup logic inline. + dynamicHostsMu.Lock() + for a, entry := range dynamicHosts { + if time.Since(entry.lastSeen) > dynamicHostIdleExpiry { + delete(dynamicHosts, a) + } + } + dynamicHostsMu.Unlock() + + dynamicHostsMu.RLock() + _, stillPresent := dynamicHosts[addr] + dynamicHostsMu.RUnlock() + + if !stillPresent { + t.Error("fresh dynamic host entry should be retained by cleanup sweep") + } +} + +// TestDynamicHostConcurrentFillHost verifies that concurrent calls to +// fillDynamicHost for the same address all get the same *Host pointer and +// don't race (run with -race). +func TestDynamicHostConcurrentFillHost(t *testing.T) { + resetDynamicHosts() + + const addr = "192.0.2.12:80" + const goroutines = 50 + + var wg sync.WaitGroup + hosts := make([]*Host, goroutines) + + for i := range goroutines { + wg.Add(1) + go func(idx int) { + defer wg.Done() + u := &Upstream{Dial: addr} + u.fillDynamicHost() + hosts[idx] = u.Host + }(i) + } + wg.Wait() + + first := hosts[0] + for i, h := range hosts { + if h != first { + t.Errorf("goroutine %d got a different *Host pointer; expected all to share the same entry", i) + } + } +} diff --git a/modules/caddyhttp/reverseproxy/hosts.go b/modules/caddyhttp/reverseproxy/hosts.go index fea85946d..8139a7b50 100644 --- a/modules/caddyhttp/reverseproxy/hosts.go +++ b/modules/caddyhttp/reverseproxy/hosts.go @@ -19,7 +19,9 @@ import ( "fmt" "net/netip" "strconv" + "sync" "sync/atomic" + "time" "github.com/caddyserver/caddy/v2" "github.com/caddyserver/caddy/v2/modules/caddyhttp" @@ -132,6 +134,43 @@ func (u *Upstream) fillHost() { u.Host = host } +// fillDynamicHost is like fillHost, but stores the host in the separate +// dynamicHosts map rather than the reference-counted UsagePool. Dynamic +// hosts are not reference-counted; instead, they are retained as long as +// they are actively seen and are evicted by a background cleanup goroutine +// after dynamicHostIdleExpiry of inactivity. This preserves health state +// (e.g. passive fail counts) across sequential requests. +func (u *Upstream) fillDynamicHost() { + dynamicHostsMu.Lock() + entry, ok := dynamicHosts[u.String()] + if ok { + entry.lastSeen = time.Now() + dynamicHosts[u.String()] = entry + u.Host = entry.host + } else { + h := new(Host) + dynamicHosts[u.String()] = dynamicHostEntry{host: h, lastSeen: time.Now()} + u.Host = h + } + dynamicHostsMu.Unlock() + + // ensure the cleanup goroutine is running + dynamicHostsCleanerOnce.Do(func() { + go func() { + for { + time.Sleep(dynamicHostCleanupInterval) + dynamicHostsMu.Lock() + for addr, entry := range dynamicHosts { + if time.Since(entry.lastSeen) > dynamicHostIdleExpiry { + delete(dynamicHosts, addr) + } + } + dynamicHostsMu.Unlock() + } + }() + }) +} + // Host is the basic, in-memory representation of the state of a remote host. // Its fields are accessed atomically and Host values must not be copied. type Host struct { @@ -268,6 +307,28 @@ func GetDialInfo(ctx context.Context) (DialInfo, bool) { // through config reloads. var hosts = caddy.NewUsagePool() +// dynamicHosts tracks hosts that were provisioned from dynamic upstream +// sources. Unlike static upstreams which are reference-counted via the +// UsagePool, dynamic upstream hosts are not reference-counted. Instead, +// their last-seen time is updated on each request, and a background +// goroutine evicts entries that have been idle for dynamicHostIdleExpiry. +// This preserves health state (e.g. passive fail counts) across requests +// to the same dynamic backend. +var ( + dynamicHosts = make(map[string]dynamicHostEntry) + dynamicHostsMu sync.RWMutex + dynamicHostsCleanerOnce sync.Once + dynamicHostCleanupInterval = 5 * time.Minute + dynamicHostIdleExpiry = time.Hour +) + +// dynamicHostEntry holds a Host and the last time it was seen +// in a set of dynamic upstreams returned for a request. +type dynamicHostEntry struct { + host *Host + lastSeen time.Time +} + // dialInfoVarKey is the key used for the variable that holds // the dial info for the upstream connection. const dialInfoVarKey = "reverse_proxy.dial_info" diff --git a/modules/caddyhttp/reverseproxy/passive_health_test.go b/modules/caddyhttp/reverseproxy/passive_health_test.go new file mode 100644 index 000000000..0bd6da181 --- /dev/null +++ b/modules/caddyhttp/reverseproxy/passive_health_test.go @@ -0,0 +1,391 @@ +// Copyright 2015 Matthew Holt and The Caddy Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package reverseproxy + +import ( + "context" + "testing" + "time" + + "github.com/caddyserver/caddy/v2" +) + +// newPassiveHandler builds a minimal Handler with passive health checks +// configured and a live caddy.Context so the fail-forgetter goroutine can +// be cancelled cleanly. The caller must call cancel() when done. +func newPassiveHandler(t *testing.T, maxFails int, failDuration time.Duration) (*Handler, context.CancelFunc) { + t.Helper() + caddyCtx, cancel := caddy.NewContext(caddy.Context{Context: context.Background()}) + h := &Handler{ + ctx: caddyCtx, + HealthChecks: &HealthChecks{ + Passive: &PassiveHealthChecks{ + MaxFails: maxFails, + FailDuration: caddy.Duration(failDuration), + }, + }, + } + return h, cancel +} + +// provisionedStaticUpstream creates a static upstream, registers it in the +// UsagePool, and returns a cleanup func that removes it from the pool. +func provisionedStaticUpstream(t *testing.T, h *Handler, addr string) (*Upstream, func()) { + t.Helper() + u := &Upstream{Dial: addr} + h.provisionUpstream(u, false) + return u, func() { _, _ = hosts.Delete(addr) } +} + +// provisionedDynamicUpstream creates a dynamic upstream, registers it in +// dynamicHosts, and returns a cleanup func that removes it. +func provisionedDynamicUpstream(t *testing.T, h *Handler, addr string) (*Upstream, func()) { + t.Helper() + u := &Upstream{Dial: addr} + h.provisionUpstream(u, true) + return u, func() { + dynamicHostsMu.Lock() + delete(dynamicHosts, addr) + dynamicHostsMu.Unlock() + } +} + +// --- countFailure behaviour --- + +// TestCountFailureNoopWhenNoHealthChecks verifies that countFailure is a no-op +// when HealthChecks is nil. +func TestCountFailureNoopWhenNoHealthChecks(t *testing.T) { + resetDynamicHosts() + h := &Handler{} + u := &Upstream{Dial: "10.1.0.1:80", Host: new(Host)} + + h.countFailure(u) + + if u.Host.Fails() != 0 { + t.Errorf("expected 0 fails with no HealthChecks config, got %d", u.Host.Fails()) + } +} + +// TestCountFailureNoopWhenZeroDuration verifies that countFailure is a no-op +// when FailDuration is 0 (the zero value disables passive checks). +func TestCountFailureNoopWhenZeroDuration(t *testing.T) { + resetDynamicHosts() + caddyCtx, cancel := caddy.NewContext(caddy.Context{Context: context.Background()}) + defer cancel() + h := &Handler{ + ctx: caddyCtx, + HealthChecks: &HealthChecks{ + Passive: &PassiveHealthChecks{MaxFails: 1, FailDuration: 0}, + }, + } + u := &Upstream{Dial: "10.1.0.2:80", Host: new(Host)} + + h.countFailure(u) + + if u.Host.Fails() != 0 { + t.Errorf("expected 0 fails with zero FailDuration, got %d", u.Host.Fails()) + } +} + +// TestCountFailureIncrementsCount verifies that countFailure increments the +// fail count on the upstream's Host. +func TestCountFailureIncrementsCount(t *testing.T) { + resetDynamicHosts() + h, cancel := newPassiveHandler(t, 2, time.Minute) + defer cancel() + u := &Upstream{Dial: "10.1.0.3:80", Host: new(Host)} + + h.countFailure(u) + + if u.Host.Fails() != 1 { + t.Errorf("expected 1 fail after countFailure, got %d", u.Host.Fails()) + } +} + +// TestCountFailureDecrementsAfterDuration verifies that the fail count is +// decremented back after FailDuration elapses. +func TestCountFailureDecrementsAfterDuration(t *testing.T) { + resetDynamicHosts() + const failDuration = 50 * time.Millisecond + h, cancel := newPassiveHandler(t, 2, failDuration) + defer cancel() + u := &Upstream{Dial: "10.1.0.4:80", Host: new(Host)} + + h.countFailure(u) + if u.Host.Fails() != 1 { + t.Fatalf("expected 1 fail immediately after countFailure, got %d", u.Host.Fails()) + } + + // Wait long enough for the forgetter goroutine to fire. + time.Sleep(3 * failDuration) + + if u.Host.Fails() != 0 { + t.Errorf("expected fail count to return to 0 after FailDuration, got %d", u.Host.Fails()) + } +} + +// TestCountFailureCancelledContextForgets verifies that cancelling the handler +// context (simulating a config unload) also triggers the forgetter to run, +// decrementing the fail count. +func TestCountFailureCancelledContextForgets(t *testing.T) { + resetDynamicHosts() + h, cancel := newPassiveHandler(t, 2, time.Hour) // very long duration + u := &Upstream{Dial: "10.1.0.5:80", Host: new(Host)} + + h.countFailure(u) + if u.Host.Fails() != 1 { + t.Fatalf("expected 1 fail immediately after countFailure, got %d", u.Host.Fails()) + } + + // Cancelling the context should cause the forgetter goroutine to exit and + // decrement the count. + cancel() + time.Sleep(50 * time.Millisecond) + + if u.Host.Fails() != 0 { + t.Errorf("expected fail count to be decremented after context cancel, got %d", u.Host.Fails()) + } +} + +// --- static upstream passive health check --- + +// TestStaticUpstreamHealthyWithNoFailures verifies that a static upstream with +// no recorded failures is considered healthy. +func TestStaticUpstreamHealthyWithNoFailures(t *testing.T) { + resetDynamicHosts() + h, cancel := newPassiveHandler(t, 2, time.Minute) + defer cancel() + + u, cleanup := provisionedStaticUpstream(t, h, "10.2.0.1:80") + defer cleanup() + + if !u.Healthy() { + t.Error("upstream with no failures should be healthy") + } +} + +// TestStaticUpstreamUnhealthyAtMaxFails verifies that a static upstream is +// marked unhealthy once its fail count reaches MaxFails. +func TestStaticUpstreamUnhealthyAtMaxFails(t *testing.T) { + resetDynamicHosts() + h, cancel := newPassiveHandler(t, 2, time.Minute) + defer cancel() + + u, cleanup := provisionedStaticUpstream(t, h, "10.2.0.2:80") + defer cleanup() + + h.countFailure(u) + if !u.Healthy() { + t.Error("upstream should still be healthy after 1 of 2 allowed failures") + } + + h.countFailure(u) + if u.Healthy() { + t.Error("upstream should be unhealthy after reaching MaxFails=2") + } +} + +// TestStaticUpstreamRecoversAfterFailDuration verifies that a static upstream +// returns to healthy once its failures expire. +func TestStaticUpstreamRecoversAfterFailDuration(t *testing.T) { + resetDynamicHosts() + const failDuration = 50 * time.Millisecond + h, cancel := newPassiveHandler(t, 1, failDuration) + defer cancel() + + u, cleanup := provisionedStaticUpstream(t, h, "10.2.0.3:80") + defer cleanup() + + h.countFailure(u) + if u.Healthy() { + t.Fatal("upstream should be unhealthy immediately after MaxFails failure") + } + + time.Sleep(3 * failDuration) + + if !u.Healthy() { + t.Errorf("upstream should recover to healthy after FailDuration, Fails=%d", u.Host.Fails()) + } +} + +// TestStaticUpstreamHealthPersistedAcrossReprovisioning verifies that static +// upstreams share a Host via the UsagePool, so a second call to provisionUpstream +// for the same address (as happens on config reload) sees the accumulated state. +func TestStaticUpstreamHealthPersistedAcrossReprovisioning(t *testing.T) { + resetDynamicHosts() + h, cancel := newPassiveHandler(t, 2, time.Minute) + defer cancel() + + u1, cleanup1 := provisionedStaticUpstream(t, h, "10.2.0.4:80") + defer cleanup1() + + h.countFailure(u1) + h.countFailure(u1) + + // Simulate a second handler instance referencing the same upstream + // (e.g. after a config reload that keeps the same backend address). + u2, cleanup2 := provisionedStaticUpstream(t, h, "10.2.0.4:80") + defer cleanup2() + + if u1.Host != u2.Host { + t.Fatal("expected both Upstream structs to share the same *Host via UsagePool") + } + if u2.Healthy() { + t.Error("re-provisioned upstream should still see the prior fail count and be unhealthy") + } +} + +// --- dynamic upstream passive health check --- + +// TestDynamicUpstreamHealthyWithNoFailures verifies that a freshly provisioned +// dynamic upstream is healthy. +func TestDynamicUpstreamHealthyWithNoFailures(t *testing.T) { + resetDynamicHosts() + h, cancel := newPassiveHandler(t, 2, time.Minute) + defer cancel() + + u, cleanup := provisionedDynamicUpstream(t, h, "10.3.0.1:80") + defer cleanup() + + if !u.Healthy() { + t.Error("dynamic upstream with no failures should be healthy") + } +} + +// TestDynamicUpstreamUnhealthyAtMaxFails verifies that a dynamic upstream is +// marked unhealthy once its fail count reaches MaxFails. +func TestDynamicUpstreamUnhealthyAtMaxFails(t *testing.T) { + resetDynamicHosts() + h, cancel := newPassiveHandler(t, 2, time.Minute) + defer cancel() + + u, cleanup := provisionedDynamicUpstream(t, h, "10.3.0.2:80") + defer cleanup() + + h.countFailure(u) + if !u.Healthy() { + t.Error("dynamic upstream should still be healthy after 1 of 2 allowed failures") + } + + h.countFailure(u) + if u.Healthy() { + t.Error("dynamic upstream should be unhealthy after reaching MaxFails=2") + } +} + +// TestDynamicUpstreamFailCountPersistedBetweenRequests is the core regression +// test: it simulates two sequential (non-concurrent) requests to the same +// dynamic upstream. Before the fix, the UsagePool entry would be deleted +// between requests, wiping the fail count. Now it should survive. +func TestDynamicUpstreamFailCountPersistedBetweenRequests(t *testing.T) { + resetDynamicHosts() + h, cancel := newPassiveHandler(t, 2, time.Minute) + defer cancel() + + // --- first request --- + u1 := &Upstream{Dial: "10.3.0.3:80"} + h.provisionUpstream(u1, true) + h.countFailure(u1) + + if u1.Host.Fails() != 1 { + t.Fatalf("expected 1 fail after first request, got %d", u1.Host.Fails()) + } + + // Simulate end of first request: no delete from any pool (key difference + // vs. the old behaviour where hosts.Delete was deferred). + + // --- second request: brand-new *Upstream struct, same dial address --- + u2 := &Upstream{Dial: "10.3.0.3:80"} + h.provisionUpstream(u2, true) + + if u1.Host != u2.Host { + t.Fatal("expected both requests to share the same *Host pointer from dynamicHosts") + } + if u2.Host.Fails() != 1 { + t.Errorf("expected fail count to persist across requests, got %d", u2.Host.Fails()) + } + + // A second failure now tips it over MaxFails=2. + h.countFailure(u2) + if u2.Healthy() { + t.Error("upstream should be unhealthy after accumulated failures across requests") + } + + // Cleanup. + dynamicHostsMu.Lock() + delete(dynamicHosts, "10.3.0.3:80") + dynamicHostsMu.Unlock() +} + +// TestDynamicUpstreamRecoveryAfterFailDuration verifies that a dynamic +// upstream's fail count expires and it returns to healthy. +func TestDynamicUpstreamRecoveryAfterFailDuration(t *testing.T) { + resetDynamicHosts() + const failDuration = 50 * time.Millisecond + h, cancel := newPassiveHandler(t, 1, failDuration) + defer cancel() + + u, cleanup := provisionedDynamicUpstream(t, h, "10.3.0.4:80") + defer cleanup() + + h.countFailure(u) + if u.Healthy() { + t.Fatal("upstream should be unhealthy immediately after MaxFails failure") + } + + time.Sleep(3 * failDuration) + + // Re-provision (as a new request would) to get fresh *Upstream with policy set. + u2 := &Upstream{Dial: "10.3.0.4:80"} + h.provisionUpstream(u2, true) + + if !u2.Healthy() { + t.Errorf("dynamic upstream should recover to healthy after FailDuration, Fails=%d", u2.Host.Fails()) + } +} + +// TestDynamicUpstreamMaxRequestsFromUnhealthyRequestCount verifies that +// UnhealthyRequestCount is copied into MaxRequests so Full() works correctly. +func TestDynamicUpstreamMaxRequestsFromUnhealthyRequestCount(t *testing.T) { + resetDynamicHosts() + caddyCtx, cancel := caddy.NewContext(caddy.Context{Context: context.Background()}) + defer cancel() + h := &Handler{ + ctx: caddyCtx, + HealthChecks: &HealthChecks{ + Passive: &PassiveHealthChecks{ + UnhealthyRequestCount: 3, + }, + }, + } + + u, cleanup := provisionedDynamicUpstream(t, h, "10.3.0.5:80") + defer cleanup() + + if u.MaxRequests != 3 { + t.Errorf("expected MaxRequests=3 from UnhealthyRequestCount, got %d", u.MaxRequests) + } + + // Should not be full with fewer requests than the limit. + _ = u.Host.countRequest(2) + if u.Full() { + t.Error("upstream should not be full with 2 of 3 allowed requests") + } + + _ = u.Host.countRequest(1) + if !u.Full() { + t.Error("upstream should be full at UnhealthyRequestCount concurrent requests") + } +} diff --git a/modules/caddyhttp/reverseproxy/reverseproxy.go b/modules/caddyhttp/reverseproxy/reverseproxy.go index d83c3e709..2ea063bd7 100644 --- a/modules/caddyhttp/reverseproxy/reverseproxy.go +++ b/modules/caddyhttp/reverseproxy/reverseproxy.go @@ -392,7 +392,7 @@ func (h *Handler) Provision(ctx caddy.Context) error { // set up upstreams for _, u := range h.Upstreams { - h.provisionUpstream(u) + h.provisionUpstream(u, false) } if h.HealthChecks != nil { @@ -563,18 +563,11 @@ func (h *Handler) proxyLoopIteration(r *http.Request, origReq *http.Request, w h } else { upstreams = dUpstreams for _, dUp := range dUpstreams { - h.provisionUpstream(dUp) + h.provisionUpstream(dUp, true) } if c := h.logger.Check(zapcore.DebugLevel, "provisioned dynamic upstreams"); c != nil { c.Write(zap.Int("count", len(dUpstreams))) } - defer func() { - // these upstreams are dynamic, so they are only used for this iteration - // of the proxy loop; be sure to let them go away when we're done with them - for _, upstream := range dUpstreams { - _, _ = hosts.Delete(upstream.String()) - } - }() } } @@ -1324,9 +1317,16 @@ func (h *Handler) directRequest(req *http.Request, di DialInfo) { req.URL.Host = reqHost } -func (h Handler) provisionUpstream(upstream *Upstream) { - // create or get the host representation for this upstream - upstream.fillHost() +func (h Handler) provisionUpstream(upstream *Upstream, dynamic bool) { + // create or get the host representation for this upstream; + // dynamic upstreams are tracked in a separate map with last-seen + // timestamps so their health state persists across requests without + // being reference-counted (and thus discarded between requests). + if dynamic { + upstream.fillDynamicHost() + } else { + upstream.fillHost() + } // give it the circuit breaker, if any upstream.cb = h.CB