reverseproxy: Track dynamic upstreams, enable passive healthchecking (#7539)

* reverseproxy: Track dynamic upstreams, enable passive healthchecking

* Add tests for dynamic upstream tracking, admin endpoint, health checks
This commit is contained in:
Francis Lavoie
2026-03-04 15:05:26 -05:00
committed by GitHub
parent 7e83775e3a
commit db2986028f
6 changed files with 1096 additions and 13 deletions

View File

@@ -75,7 +75,7 @@ func (adminUpstreams) handleUpstreams(w http.ResponseWriter, r *http.Request) er
results := []upstreamStatus{}
knownHosts := make(map[string]struct{})
// Iterate over the upstream pool (needs to be fast)
// Iterate over the static upstream pool (needs to be fast)
var rangeErr error
hosts.Range(func(key, val any) bool {
address, ok := key.(string)
@@ -121,6 +121,17 @@ func (adminUpstreams) handleUpstreams(w http.ResponseWriter, r *http.Request) er
return rangeErr
}
// Also include dynamic upstreams
dynamicHostsMu.RLock()
for address, entry := range dynamicHosts {
results = append(results, upstreamStatus{
Address: address,
NumRequests: entry.host.NumRequests(),
Fails: entry.host.Fails(),
})
}
dynamicHostsMu.RUnlock()
err := enc.Encode(results)
if err != nil {
return caddy.APIError{

View File

@@ -0,0 +1,275 @@
// Copyright 2015 Matthew Holt and The Caddy Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package reverseproxy
import (
"encoding/json"
"net/http"
"net/http/httptest"
"testing"
"time"
)
// adminHandlerFixture sets up the global host state for an admin endpoint test
// and returns a cleanup function that must be deferred by the caller.
//
// staticAddrs are inserted into the UsagePool (as a static upstream would be).
// dynamicAddrs are inserted into the dynamicHosts map (as a dynamic upstream would be).
func adminHandlerFixture(t *testing.T, staticAddrs, dynamicAddrs []string) func() {
	t.Helper()
	// fillHost registers each address in the package-level hosts UsagePool;
	// the temporary Upstream value itself is discarded afterwards.
	for _, addr := range staticAddrs {
		u := &Upstream{Dial: addr}
		u.fillHost()
	}
	// Dynamic entries are written directly into the shared map under the
	// write lock, mirroring what fillDynamicHost does.
	dynamicHostsMu.Lock()
	for _, addr := range dynamicAddrs {
		dynamicHosts[addr] = dynamicHostEntry{host: new(Host), lastSeen: time.Now()}
	}
	dynamicHostsMu.Unlock()
	return func() {
		// Remove static entries from the UsagePool.
		for _, addr := range staticAddrs {
			_, _ = hosts.Delete(addr)
		}
		// Remove dynamic entries.
		dynamicHostsMu.Lock()
		for _, addr := range dynamicAddrs {
			delete(dynamicHosts, addr)
		}
		dynamicHostsMu.Unlock()
	}
}
// callAdminUpstreams fires a GET against handleUpstreams and returns the
// decoded response body. It fails the test on any transport- or
// protocol-level problem so callers only assert on the decoded contents.
func callAdminUpstreams(t *testing.T) []upstreamStatus {
	t.Helper()
	req := httptest.NewRequest(http.MethodGet, "/reverse_proxy/upstreams", nil)
	w := httptest.NewRecorder()
	handler := adminUpstreams{}
	if err := handler.handleUpstreams(w, req); err != nil {
		t.Fatalf("handleUpstreams returned unexpected error: %v", err)
	}
	if w.Code != http.StatusOK {
		t.Fatalf("expected 200, got %d", w.Code)
	}
	// The admin endpoint always responds with a JSON array.
	if ct := w.Header().Get("Content-Type"); ct != "application/json" {
		t.Fatalf("expected Content-Type application/json, got %q", ct)
	}
	var results []upstreamStatus
	if err := json.NewDecoder(w.Body).Decode(&results); err != nil {
		t.Fatalf("failed to decode response: %v", err)
	}
	return results
}
// resultsByAddress indexes a slice of upstreamStatus by address for easier
// lookup in assertions.
func resultsByAddress(statuses []upstreamStatus) map[string]upstreamStatus {
	indexed := make(map[string]upstreamStatus, len(statuses))
	for i := range statuses {
		indexed[statuses[i].Address] = statuses[i]
	}
	return indexed
}
// TestAdminUpstreamsMethodNotAllowed verifies that non-GET methods are rejected.
func TestAdminUpstreamsMethodNotAllowed(t *testing.T) {
	for _, method := range []string{http.MethodPost, http.MethodPut, http.MethodDelete} {
		req := httptest.NewRequest(method, "/reverse_proxy/upstreams", nil)
		w := httptest.NewRecorder()
		err := (adminUpstreams{}).handleUpstreams(w, req)
		if err == nil {
			t.Errorf("method %s: expected an error, got nil", method)
			continue
		}
		// The previous version type-asserted err against
		// interface{ HTTPStatus() int }, but caddy.APIError carries the
		// status in an exported struct FIELD named HTTPStatus, not a method,
		// so that assertion could never succeed and the 405 check was dead
		// code. Without importing the caddy package here, assert instead that
		// the handler refused the request before writing any response body.
		if w.Body.Len() != 0 {
			t.Errorf("method %s: expected no response body on rejected method, got %q", method, w.Body.String())
		}
	}
}
// TestAdminUpstreamsEmpty verifies that an empty response is valid JSON when
// no upstreams are registered.
func TestAdminUpstreamsEmpty(t *testing.T) {
	resetDynamicHosts()
	results := callAdminUpstreams(t)
	// The handler initializes results as an empty (non-nil) slice, so the
	// JSON body should be "[]" rather than "null"; a nil decode result
	// would indicate a "null" body.
	if results == nil {
		t.Error("expected non-nil (empty) slice, got nil")
	}
	if len(results) != 0 {
		t.Errorf("expected 0 results with empty pools, got %d", len(results))
	}
}
// TestAdminUpstreamsStaticOnly verifies that static upstreams (from the
// UsagePool) appear in the response with correct addresses.
func TestAdminUpstreamsStaticOnly(t *testing.T) {
	resetDynamicHosts()
	cleanup := adminHandlerFixture(t,
		[]string{"10.0.0.1:80", "10.0.0.2:80"},
		nil,
	)
	defer cleanup()
	results := callAdminUpstreams(t)
	// Response order is not guaranteed, so index by address before asserting.
	byAddr := resultsByAddress(results)
	for _, addr := range []string{"10.0.0.1:80", "10.0.0.2:80"} {
		if _, ok := byAddr[addr]; !ok {
			t.Errorf("expected static upstream %q in response", addr)
		}
	}
	if len(results) != 2 {
		t.Errorf("expected exactly 2 results, got %d", len(results))
	}
}
// TestAdminUpstreamsDynamicOnly verifies that dynamic upstreams (from
// dynamicHosts) appear in the response with correct addresses.
func TestAdminUpstreamsDynamicOnly(t *testing.T) {
	resetDynamicHosts()
	cleanup := adminHandlerFixture(t,
		nil,
		[]string{"10.0.1.1:80", "10.0.1.2:80"},
	)
	defer cleanup()
	results := callAdminUpstreams(t)
	// Response order is not guaranteed, so index by address before asserting.
	byAddr := resultsByAddress(results)
	for _, addr := range []string{"10.0.1.1:80", "10.0.1.2:80"} {
		if _, ok := byAddr[addr]; !ok {
			t.Errorf("expected dynamic upstream %q in response", addr)
		}
	}
	if len(results) != 2 {
		t.Errorf("expected exactly 2 results, got %d", len(results))
	}
}
// TestAdminUpstreamsBothPools verifies that static and dynamic upstreams are
// both present in the same response and that there is no overlap or omission.
func TestAdminUpstreamsBothPools(t *testing.T) {
	resetDynamicHosts()
	cleanup := adminHandlerFixture(t,
		[]string{"10.0.2.1:80"},
		[]string{"10.0.2.2:80"},
	)
	defer cleanup()
	results := callAdminUpstreams(t)
	// One entry must come from each pool; a count mismatch means one pool
	// was skipped or double-reported.
	if len(results) != 2 {
		t.Fatalf("expected 2 results (1 static + 1 dynamic), got %d", len(results))
	}
	byAddr := resultsByAddress(results)
	if _, ok := byAddr["10.0.2.1:80"]; !ok {
		t.Error("static upstream missing from response")
	}
	if _, ok := byAddr["10.0.2.2:80"]; !ok {
		t.Error("dynamic upstream missing from response")
	}
}
// TestAdminUpstreamsNoOverlapBetweenPools verifies that an address registered
// only as a static upstream does not also appear as a dynamic entry, and
// vice-versa.
func TestAdminUpstreamsNoOverlapBetweenPools(t *testing.T) {
	resetDynamicHosts()
	cleanup := adminHandlerFixture(t,
		[]string{"10.0.3.1:80"},
		[]string{"10.0.3.2:80"},
	)
	defer cleanup()
	results := callAdminUpstreams(t)
	// Count occurrences per address; each address must appear exactly once.
	seen := make(map[string]int)
	for _, r := range results {
		seen[r.Address]++
	}
	for addr, count := range seen {
		if count > 1 {
			t.Errorf("address %q appeared %d times; expected exactly once", addr, count)
		}
	}
}
// TestAdminUpstreamsReportsFailCounts verifies that fail counts accumulated on
// a dynamic upstream's Host are reflected in the response.
func TestAdminUpstreamsReportsFailCounts(t *testing.T) {
	resetDynamicHosts()
	const addr = "10.0.4.1:80"
	h := new(Host)
	// Seed the host with 3 failures before registering it in the map.
	_ = h.countFail(3)
	dynamicHostsMu.Lock()
	dynamicHosts[addr] = dynamicHostEntry{host: h, lastSeen: time.Now()}
	dynamicHostsMu.Unlock()
	defer func() {
		dynamicHostsMu.Lock()
		delete(dynamicHosts, addr)
		dynamicHostsMu.Unlock()
	}()
	results := callAdminUpstreams(t)
	byAddr := resultsByAddress(results)
	status, ok := byAddr[addr]
	if !ok {
		t.Fatalf("expected %q in response", addr)
	}
	if status.Fails != 3 {
		t.Errorf("expected Fails=3, got %d", status.Fails)
	}
}
// TestAdminUpstreamsReportsNumRequests verifies that the active request count
// for a static upstream is reflected in the response.
func TestAdminUpstreamsReportsNumRequests(t *testing.T) {
	resetDynamicHosts()
	const addr = "10.0.4.2:80"
	u := &Upstream{Dial: addr}
	u.fillHost()
	defer func() { _, _ = hosts.Delete(addr) }()
	// Simulate 2 in-flight requests. Deferred funcs run LIFO, so the
	// counter is decremented before the pool entry is deleted.
	_ = u.Host.countRequest(2)
	defer func() { _ = u.Host.countRequest(-2) }()
	results := callAdminUpstreams(t)
	byAddr := resultsByAddress(results)
	status, ok := byAddr[addr]
	if !ok {
		t.Fatalf("expected %q in response", addr)
	}
	if status.NumRequests != 2 {
		t.Errorf("expected NumRequests=2, got %d", status.NumRequests)
	}
}

View File

@@ -0,0 +1,345 @@
// Copyright 2015 Matthew Holt and The Caddy Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package reverseproxy
import (
"sync"
"testing"
"time"
"github.com/caddyserver/caddy/v2"
)
// resetDynamicHosts clears global dynamic host state between tests.
func resetDynamicHosts() {
	dynamicHostsMu.Lock()
	dynamicHosts = make(map[string]dynamicHostEntry)
	dynamicHostsMu.Unlock()
	// Reset the Once so cleanup goroutine tests can re-trigger if needed.
	// NOTE(review): re-arming the Once means a later fillDynamicHost call can
	// spawn an additional cleanup goroutine per reset; harmless for tests,
	// but confirm this helper is never reachable from production code.
	dynamicHostsCleanerOnce = sync.Once{}
}
// TestFillDynamicHostCreatesEntry verifies that calling fillDynamicHost on a
// new address inserts an entry into dynamicHosts and assigns a non-nil Host.
func TestFillDynamicHostCreatesEntry(t *testing.T) {
	resetDynamicHosts()
	u := &Upstream{Dial: "192.0.2.1:80"}
	u.fillDynamicHost()
	if u.Host == nil {
		t.Fatal("expected Host to be set after fillDynamicHost")
	}
	// Inspect the shared map under the read lock.
	dynamicHostsMu.RLock()
	entry, ok := dynamicHosts["192.0.2.1:80"]
	dynamicHostsMu.RUnlock()
	if !ok {
		t.Fatal("expected entry in dynamicHosts map")
	}
	if entry.host != u.Host {
		t.Error("dynamicHosts entry host should be the same pointer assigned to Upstream.Host")
	}
	if entry.lastSeen.IsZero() {
		t.Error("expected lastSeen to be set")
	}
}
// TestFillDynamicHostReusesSameHost verifies that two calls for the same address
// return the exact same *Host pointer so that state (e.g. fail counts) is shared.
func TestFillDynamicHostReusesSameHost(t *testing.T) {
	resetDynamicHosts()
	// Two distinct Upstream values with the same dial address must be
	// backed by one shared Host.
	u1 := &Upstream{Dial: "192.0.2.2:80"}
	u1.fillDynamicHost()
	u2 := &Upstream{Dial: "192.0.2.2:80"}
	u2.fillDynamicHost()
	if u1.Host != u2.Host {
		t.Error("expected both upstreams to share the same *Host pointer")
	}
}
// TestFillDynamicHostUpdatesLastSeen verifies that a second call for the same
// address advances the lastSeen timestamp.
func TestFillDynamicHostUpdatesLastSeen(t *testing.T) {
	resetDynamicHosts()
	u := &Upstream{Dial: "192.0.2.3:80"}
	u.fillDynamicHost()
	dynamicHostsMu.RLock()
	first := dynamicHosts["192.0.2.3:80"].lastSeen
	dynamicHostsMu.RUnlock()
	// Ensure measurable time passes.
	// (time.Now has finite resolution; without this sleep the two
	// timestamps could compare equal on coarse-clock platforms.)
	time.Sleep(2 * time.Millisecond)
	u2 := &Upstream{Dial: "192.0.2.3:80"}
	u2.fillDynamicHost()
	dynamicHostsMu.RLock()
	second := dynamicHosts["192.0.2.3:80"].lastSeen
	dynamicHostsMu.RUnlock()
	if !second.After(first) {
		t.Error("expected lastSeen to be updated on second fillDynamicHost call")
	}
}
// TestFillDynamicHostIndependentAddresses verifies that different addresses get
// independent Host entries.
func TestFillDynamicHostIndependentAddresses(t *testing.T) {
	resetDynamicHosts()
	u1 := &Upstream{Dial: "192.0.2.4:80"}
	u1.fillDynamicHost()
	u2 := &Upstream{Dial: "192.0.2.5:80"}
	u2.fillDynamicHost()
	// Distinct addresses must not alias each other's health state.
	if u1.Host == u2.Host {
		t.Error("different addresses should have different *Host entries")
	}
}
// TestFillDynamicHostPreservesFailCount verifies that fail counts on a dynamic
// host survive across multiple fillDynamicHost calls (simulating sequential
// requests), which is the core behaviour fixed by this change.
func TestFillDynamicHostPreservesFailCount(t *testing.T) {
	resetDynamicHosts()
	// First "request": provision and record a failure.
	u1 := &Upstream{Dial: "192.0.2.6:80"}
	u1.fillDynamicHost()
	_ = u1.Host.countFail(1)
	if u1.Host.Fails() != 1 {
		t.Fatalf("expected 1 fail, got %d", u1.Host.Fails())
	}
	// Second "request": provision the same address again (new *Upstream, same address).
	// The shared Host must still carry the failure recorded above.
	u2 := &Upstream{Dial: "192.0.2.6:80"}
	u2.fillDynamicHost()
	if u2.Host.Fails() != 1 {
		t.Errorf("expected fail count to persist across fillDynamicHost calls, got %d", u2.Host.Fails())
	}
}
// TestProvisionUpstreamDynamic verifies that provisionUpstream with dynamic=true
// uses fillDynamicHost (not the UsagePool) and sets healthCheckPolicy /
// MaxRequests correctly from handler config.
func TestProvisionUpstreamDynamic(t *testing.T) {
	resetDynamicHosts()
	passive := &PassiveHealthChecks{
		FailDuration:          caddy.Duration(10 * time.Second),
		MaxFails:              3,
		UnhealthyRequestCount: 5,
	}
	h := Handler{
		HealthChecks: &HealthChecks{
			Passive: passive,
		},
	}
	u := &Upstream{Dial: "192.0.2.7:80"}
	h.provisionUpstream(u, true)
	if u.Host == nil {
		t.Fatal("Host should be set after provisionUpstream")
	}
	// The upstream should reference the handler's config directly, not a copy.
	if u.healthCheckPolicy != passive {
		t.Error("healthCheckPolicy should point to the handler's PassiveHealthChecks")
	}
	if u.MaxRequests != 5 {
		t.Errorf("expected MaxRequests=5 from UnhealthyRequestCount, got %d", u.MaxRequests)
	}
	// Must be in dynamicHosts, not in the static UsagePool.
	dynamicHostsMu.RLock()
	_, inDynamic := dynamicHosts["192.0.2.7:80"]
	dynamicHostsMu.RUnlock()
	if !inDynamic {
		t.Error("dynamic upstream should be stored in dynamicHosts")
	}
	// References reports whether the UsagePool holds the key at all.
	_, inPool := hosts.References("192.0.2.7:80")
	if inPool {
		t.Error("dynamic upstream should NOT be stored in the static UsagePool")
	}
}
// TestProvisionUpstreamStatic verifies that provisionUpstream with dynamic=false
// uses the UsagePool and does NOT insert into dynamicHosts.
func TestProvisionUpstreamStatic(t *testing.T) {
	resetDynamicHosts()
	h := Handler{}
	u := &Upstream{Dial: "192.0.2.8:80"}
	h.provisionUpstream(u, false)
	// Clean up the pool entry via defer so it is removed even when an
	// assertion below fails fatally; previously cleanup was the final
	// statement and t.Fatal would skip it, leaking the entry into
	// subsequent tests.
	defer func() { _, _ = hosts.Delete("192.0.2.8:80") }()
	if u.Host == nil {
		t.Fatal("Host should be set after provisionUpstream")
	}
	refs, inPool := hosts.References("192.0.2.8:80")
	if !inPool {
		t.Error("static upstream should be in the UsagePool")
	}
	if refs != 1 {
		t.Errorf("expected ref count 1, got %d", refs)
	}
	dynamicHostsMu.RLock()
	_, inDynamic := dynamicHosts["192.0.2.8:80"]
	dynamicHostsMu.RUnlock()
	if inDynamic {
		t.Error("static upstream should NOT be in dynamicHosts")
	}
}
// TestDynamicHostHealthyConsultsFails verifies the end-to-end passive health
// check path: after enough failures are recorded against a dynamic upstream's
// shared *Host, Healthy() returns false for a newly provisioned *Upstream with
// the same address.
func TestDynamicHostHealthyConsultsFails(t *testing.T) {
	resetDynamicHosts()
	passive := &PassiveHealthChecks{
		FailDuration: caddy.Duration(time.Minute),
		MaxFails:     2,
	}
	h := Handler{
		HealthChecks: &HealthChecks{Passive: passive},
	}
	// First request: provision and record two failures (== MaxFails).
	u1 := &Upstream{Dial: "192.0.2.9:80"}
	h.provisionUpstream(u1, true)
	_ = u1.Host.countFail(1)
	_ = u1.Host.countFail(1)
	// Second request: fresh *Upstream, same address. It must observe the
	// failures recorded via the shared Host.
	u2 := &Upstream{Dial: "192.0.2.9:80"}
	h.provisionUpstream(u2, true)
	if u2.Healthy() {
		t.Error("upstream should be unhealthy after MaxFails failures have been recorded against its shared Host")
	}
}
// TestDynamicHostCleanupEvictsStaleEntries verifies that the cleanup sweep
// removes entries whose lastSeen is older than dynamicHostIdleExpiry.
func TestDynamicHostCleanupEvictsStaleEntries(t *testing.T) {
	resetDynamicHosts()
	const addr = "192.0.2.10:80"
	// Insert an entry directly with a lastSeen far in the past.
	dynamicHostsMu.Lock()
	dynamicHosts[addr] = dynamicHostEntry{
		host:     new(Host),
		lastSeen: time.Now().Add(-2 * dynamicHostIdleExpiry),
	}
	dynamicHostsMu.Unlock()
	// Run the cleanup logic inline (same logic as the goroutine).
	// Running it inline keeps the test deterministic instead of waiting
	// for the real dynamicHostCleanupInterval timer.
	dynamicHostsMu.Lock()
	for a, entry := range dynamicHosts {
		if time.Since(entry.lastSeen) > dynamicHostIdleExpiry {
			delete(dynamicHosts, a)
		}
	}
	dynamicHostsMu.Unlock()
	dynamicHostsMu.RLock()
	_, stillPresent := dynamicHosts[addr]
	dynamicHostsMu.RUnlock()
	if stillPresent {
		t.Error("stale dynamic host entry should have been evicted by cleanup sweep")
	}
}
// TestDynamicHostCleanupRetainsFreshEntries verifies that the cleanup sweep
// keeps entries whose lastSeen is within dynamicHostIdleExpiry.
func TestDynamicHostCleanupRetainsFreshEntries(t *testing.T) {
	resetDynamicHosts()
	const addr = "192.0.2.11:80"
	dynamicHostsMu.Lock()
	dynamicHosts[addr] = dynamicHostEntry{
		host:     new(Host),
		lastSeen: time.Now(),
	}
	dynamicHostsMu.Unlock()
	// Run the cleanup logic inline.
	// (Mirrors the sweep performed by the background goroutine in
	// fillDynamicHost, but runs deterministically within the test.)
	dynamicHostsMu.Lock()
	for a, entry := range dynamicHosts {
		if time.Since(entry.lastSeen) > dynamicHostIdleExpiry {
			delete(dynamicHosts, a)
		}
	}
	dynamicHostsMu.Unlock()
	dynamicHostsMu.RLock()
	_, stillPresent := dynamicHosts[addr]
	dynamicHostsMu.RUnlock()
	if !stillPresent {
		t.Error("fresh dynamic host entry should be retained by cleanup sweep")
	}
}
// TestDynamicHostConcurrentFillHost verifies that concurrent calls to
// fillDynamicHost for the same address all get the same *Host pointer and
// don't race (run with -race).
func TestDynamicHostConcurrentFillHost(t *testing.T) {
	resetDynamicHosts()
	const addr = "192.0.2.12:80"
	const goroutines = 50
	var wg sync.WaitGroup
	// Renamed from "hosts": the previous local shadowed the package-level
	// hosts UsagePool, which was confusing and risked accidental misuse.
	got := make([]*Host, goroutines)
	for i := range goroutines {
		wg.Add(1)
		go func(idx int) {
			defer wg.Done()
			u := &Upstream{Dial: addr}
			u.fillDynamicHost()
			// Each goroutine writes only its own slot, so no extra
			// synchronization is needed beyond the WaitGroup.
			got[idx] = u.Host
		}(i)
	}
	wg.Wait()
	first := got[0]
	for i, h := range got {
		if h != first {
			t.Errorf("goroutine %d got a different *Host pointer; expected all to share the same entry", i)
		}
	}
}

View File

@@ -19,7 +19,9 @@ import (
"fmt"
"net/netip"
"strconv"
"sync"
"sync/atomic"
"time"
"github.com/caddyserver/caddy/v2"
"github.com/caddyserver/caddy/v2/modules/caddyhttp"
@@ -132,6 +134,43 @@ func (u *Upstream) fillHost() {
u.Host = host
}
// fillDynamicHost is like fillHost, but stores the host in the separate
// dynamicHosts map rather than the reference-counted UsagePool. Dynamic
// hosts are not reference-counted; instead, they are retained as long as
// they are actively seen and are evicted by a background cleanup goroutine
// after dynamicHostIdleExpiry of inactivity. This preserves health state
// (e.g. passive fail counts) across sequential requests.
func (u *Upstream) fillDynamicHost() {
	// Compute the map key once; the previous version called u.String()
	// up to three times, twice while holding the write lock.
	addr := u.String()

	dynamicHostsMu.Lock()
	entry, ok := dynamicHosts[addr]
	if !ok {
		entry = dynamicHostEntry{host: new(Host)}
	}
	// Refresh the activity timestamp on every sighting so the cleanup
	// sweep only evicts genuinely idle backends.
	entry.lastSeen = time.Now()
	dynamicHosts[addr] = entry
	u.Host = entry.host
	dynamicHostsMu.Unlock()

	// Lazily start the single background goroutine that evicts idle
	// entries. Like the hosts UsagePool, this is package-global state
	// intended to survive config reloads, so the goroutine is never stopped.
	dynamicHostsCleanerOnce.Do(func() {
		go func() {
			for {
				time.Sleep(dynamicHostCleanupInterval)
				dynamicHostsMu.Lock()
				for a, e := range dynamicHosts {
					if time.Since(e.lastSeen) > dynamicHostIdleExpiry {
						delete(dynamicHosts, a)
					}
				}
				dynamicHostsMu.Unlock()
			}
		}()
	})
}
// Host is the basic, in-memory representation of the state of a remote host.
// Its fields are accessed atomically and Host values must not be copied.
type Host struct {
@@ -268,6 +307,28 @@ func GetDialInfo(ctx context.Context) (DialInfo, bool) {
// through config reloads.
var hosts = caddy.NewUsagePool()
// dynamicHosts tracks hosts that were provisioned from dynamic upstream
// sources. Unlike static upstreams which are reference-counted via the
// UsagePool, dynamic upstream hosts are not reference-counted. Instead,
// their last-seen time is updated on each request, and a background
// goroutine evicts entries that have been idle for dynamicHostIdleExpiry.
// This preserves health state (e.g. passive fail counts) across requests
// to the same dynamic backend.
var (
	dynamicHosts = make(map[string]dynamicHostEntry)
	// dynamicHostsMu guards dynamicHosts (writers in fillDynamicHost and
	// the cleanup goroutine; readers in the admin endpoint).
	dynamicHostsMu sync.RWMutex
	// dynamicHostsCleanerOnce lazily starts the cleanup goroutine on first use.
	dynamicHostsCleanerOnce sync.Once
	// Cleanup cadence and idle-eviction threshold. NOTE(review): declared as
	// vars rather than consts, presumably so tests can override them — confirm.
	dynamicHostCleanupInterval = 5 * time.Minute
	dynamicHostIdleExpiry      = time.Hour
)

// dynamicHostEntry holds a Host and the last time it was seen
// in a set of dynamic upstreams returned for a request.
type dynamicHostEntry struct {
	host     *Host
	lastSeen time.Time
}
// dialInfoVarKey is the key used for the variable that holds
// the dial info for the upstream connection.
const dialInfoVarKey = "reverse_proxy.dial_info"

View File

@@ -0,0 +1,391 @@
// Copyright 2015 Matthew Holt and The Caddy Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package reverseproxy
import (
"context"
"testing"
"time"
"github.com/caddyserver/caddy/v2"
)
// newPassiveHandler builds a minimal Handler with passive health checks
// configured and a live caddy.Context so the fail-forgetter goroutine can
// be cancelled cleanly. The caller must call cancel() when done.
//
// maxFails is the passive-check failure threshold; failDuration is how long
// each recorded failure is remembered.
func newPassiveHandler(t *testing.T, maxFails int, failDuration time.Duration) (*Handler, context.CancelFunc) {
	t.Helper()
	caddyCtx, cancel := caddy.NewContext(caddy.Context{Context: context.Background()})
	h := &Handler{
		ctx: caddyCtx,
		HealthChecks: &HealthChecks{
			Passive: &PassiveHealthChecks{
				MaxFails:     maxFails,
				FailDuration: caddy.Duration(failDuration),
			},
		},
	}
	return h, cancel
}
// provisionedStaticUpstream creates a static upstream, registers it in the
// UsagePool, and returns a cleanup func that removes it from the pool.
// The caller is expected to defer the cleanup func.
func provisionedStaticUpstream(t *testing.T, h *Handler, addr string) (*Upstream, func()) {
	t.Helper()
	u := &Upstream{Dial: addr}
	h.provisionUpstream(u, false)
	return u, func() { _, _ = hosts.Delete(addr) }
}
// provisionedDynamicUpstream creates a dynamic upstream, registers it in
// dynamicHosts, and returns a cleanup func that removes it.
// The caller is expected to defer the cleanup func.
func provisionedDynamicUpstream(t *testing.T, h *Handler, addr string) (*Upstream, func()) {
	t.Helper()
	u := &Upstream{Dial: addr}
	h.provisionUpstream(u, true)
	return u, func() {
		dynamicHostsMu.Lock()
		delete(dynamicHosts, addr)
		dynamicHostsMu.Unlock()
	}
}
// --- countFailure behaviour ---
// TestCountFailureNoopWhenNoHealthChecks verifies that countFailure is a no-op
// when HealthChecks is nil.
func TestCountFailureNoopWhenNoHealthChecks(t *testing.T) {
	resetDynamicHosts()
	// Handler deliberately has no HealthChecks config at all.
	h := &Handler{}
	u := &Upstream{Dial: "10.1.0.1:80", Host: new(Host)}
	h.countFailure(u)
	if u.Host.Fails() != 0 {
		t.Errorf("expected 0 fails with no HealthChecks config, got %d", u.Host.Fails())
	}
}
// TestCountFailureNoopWhenZeroDuration verifies that countFailure is a no-op
// when FailDuration is 0 (the zero value disables passive checks).
func TestCountFailureNoopWhenZeroDuration(t *testing.T) {
	resetDynamicHosts()
	caddyCtx, cancel := caddy.NewContext(caddy.Context{Context: context.Background()})
	defer cancel()
	// MaxFails is set, but FailDuration=0 disables passive checking.
	h := &Handler{
		ctx: caddyCtx,
		HealthChecks: &HealthChecks{
			Passive: &PassiveHealthChecks{MaxFails: 1, FailDuration: 0},
		},
	}
	u := &Upstream{Dial: "10.1.0.2:80", Host: new(Host)}
	h.countFailure(u)
	if u.Host.Fails() != 0 {
		t.Errorf("expected 0 fails with zero FailDuration, got %d", u.Host.Fails())
	}
}
// TestCountFailureIncrementsCount verifies that countFailure increments the
// fail count on the upstream's Host.
func TestCountFailureIncrementsCount(t *testing.T) {
	resetDynamicHosts()
	h, cancel := newPassiveHandler(t, 2, time.Minute)
	defer cancel()
	u := &Upstream{Dial: "10.1.0.3:80", Host: new(Host)}
	h.countFailure(u)
	// FailDuration is a minute, so the forgetter cannot fire before we assert.
	if u.Host.Fails() != 1 {
		t.Errorf("expected 1 fail after countFailure, got %d", u.Host.Fails())
	}
}
// TestCountFailureDecrementsAfterDuration verifies that the fail count is
// decremented back after FailDuration elapses.
func TestCountFailureDecrementsAfterDuration(t *testing.T) {
	resetDynamicHosts()
	const failDuration = 50 * time.Millisecond
	h, cancel := newPassiveHandler(t, 2, failDuration)
	defer cancel()
	u := &Upstream{Dial: "10.1.0.4:80", Host: new(Host)}
	h.countFailure(u)
	if u.Host.Fails() != 1 {
		t.Fatalf("expected 1 fail immediately after countFailure, got %d", u.Host.Fails())
	}
	// Poll with a generous deadline instead of a single fixed sleep: the
	// forgetter goroutine fires after failDuration, but scheduling delays
	// under load could push it past 3*failDuration, making the previous
	// fixed-sleep version flaky.
	deadline := time.Now().Add(2 * time.Second)
	for u.Host.Fails() != 0 {
		if time.Now().After(deadline) {
			t.Fatalf("expected fail count to return to 0 after FailDuration, got %d", u.Host.Fails())
		}
		time.Sleep(5 * time.Millisecond)
	}
}
// TestCountFailureCancelledContextForgets verifies that cancelling the handler
// context (simulating a config unload) also triggers the forgetter to run,
// decrementing the fail count.
func TestCountFailureCancelledContextForgets(t *testing.T) {
	resetDynamicHosts()
	h, cancel := newPassiveHandler(t, 2, time.Hour) // very long duration
	u := &Upstream{Dial: "10.1.0.5:80", Host: new(Host)}
	h.countFailure(u)
	if u.Host.Fails() != 1 {
		t.Fatalf("expected 1 fail immediately after countFailure, got %d", u.Host.Fails())
	}
	// Cancelling the context should cause the forgetter goroutine to exit and
	// decrement the count. Poll with a deadline rather than a single fixed
	// 50ms sleep, which was flaky on heavily loaded machines.
	cancel()
	deadline := time.Now().Add(2 * time.Second)
	for u.Host.Fails() != 0 {
		if time.Now().After(deadline) {
			t.Fatalf("expected fail count to be decremented after context cancel, got %d", u.Host.Fails())
		}
		time.Sleep(5 * time.Millisecond)
	}
}
// --- static upstream passive health check ---
// TestStaticUpstreamHealthyWithNoFailures verifies that a static upstream with
// no recorded failures is considered healthy.
func TestStaticUpstreamHealthyWithNoFailures(t *testing.T) {
	resetDynamicHosts()
	h, cancel := newPassiveHandler(t, 2, time.Minute)
	defer cancel()
	u, cleanup := provisionedStaticUpstream(t, h, "10.2.0.1:80")
	defer cleanup()
	// Fresh host, zero fails: must be healthy.
	if !u.Healthy() {
		t.Error("upstream with no failures should be healthy")
	}
}
// TestStaticUpstreamUnhealthyAtMaxFails verifies that a static upstream is
// marked unhealthy once its fail count reaches MaxFails.
func TestStaticUpstreamUnhealthyAtMaxFails(t *testing.T) {
	resetDynamicHosts()
	h, cancel := newPassiveHandler(t, 2, time.Minute)
	defer cancel()
	u, cleanup := provisionedStaticUpstream(t, h, "10.2.0.2:80")
	defer cleanup()
	// One failure: still below MaxFails=2.
	h.countFailure(u)
	if !u.Healthy() {
		t.Error("upstream should still be healthy after 1 of 2 allowed failures")
	}
	// Second failure reaches the threshold.
	h.countFailure(u)
	if u.Healthy() {
		t.Error("upstream should be unhealthy after reaching MaxFails=2")
	}
}
// TestStaticUpstreamRecoversAfterFailDuration verifies that a static upstream
// returns to healthy once its failures expire.
func TestStaticUpstreamRecoversAfterFailDuration(t *testing.T) {
	resetDynamicHosts()
	const failDuration = 50 * time.Millisecond
	h, cancel := newPassiveHandler(t, 1, failDuration)
	defer cancel()
	u, cleanup := provisionedStaticUpstream(t, h, "10.2.0.3:80")
	defer cleanup()
	h.countFailure(u)
	if u.Healthy() {
		t.Fatal("upstream should be unhealthy immediately after MaxFails failure")
	}
	// NOTE(review): a fixed 3x sleep can be flaky under heavy load; consider
	// polling with a deadline if this test proves unreliable in CI.
	time.Sleep(3 * failDuration)
	if !u.Healthy() {
		t.Errorf("upstream should recover to healthy after FailDuration, Fails=%d", u.Host.Fails())
	}
}
// TestStaticUpstreamHealthPersistedAcrossReprovisioning verifies that static
// upstreams share a Host via the UsagePool, so a second call to provisionUpstream
// for the same address (as happens on config reload) sees the accumulated state.
func TestStaticUpstreamHealthPersistedAcrossReprovisioning(t *testing.T) {
	resetDynamicHosts()
	h, cancel := newPassiveHandler(t, 2, time.Minute)
	defer cancel()
	u1, cleanup1 := provisionedStaticUpstream(t, h, "10.2.0.4:80")
	defer cleanup1()
	// Record MaxFails failures against the first instance.
	h.countFailure(u1)
	h.countFailure(u1)
	// Simulate a second handler instance referencing the same upstream
	// (e.g. after a config reload that keeps the same backend address).
	u2, cleanup2 := provisionedStaticUpstream(t, h, "10.2.0.4:80")
	defer cleanup2()
	if u1.Host != u2.Host {
		t.Fatal("expected both Upstream structs to share the same *Host via UsagePool")
	}
	if u2.Healthy() {
		t.Error("re-provisioned upstream should still see the prior fail count and be unhealthy")
	}
}
// --- dynamic upstream passive health check ---
// TestDynamicUpstreamHealthyWithNoFailures verifies that a freshly provisioned
// dynamic upstream is healthy.
func TestDynamicUpstreamHealthyWithNoFailures(t *testing.T) {
	resetDynamicHosts()
	h, cancel := newPassiveHandler(t, 2, time.Minute)
	defer cancel()
	u, cleanup := provisionedDynamicUpstream(t, h, "10.3.0.1:80")
	defer cleanup()
	// Fresh dynamic host, zero fails: must be healthy.
	if !u.Healthy() {
		t.Error("dynamic upstream with no failures should be healthy")
	}
}
// TestDynamicUpstreamUnhealthyAtMaxFails verifies that a dynamic upstream is
// marked unhealthy once its fail count reaches MaxFails.
func TestDynamicUpstreamUnhealthyAtMaxFails(t *testing.T) {
	resetDynamicHosts()
	h, cancel := newPassiveHandler(t, 2, time.Minute)
	defer cancel()
	u, cleanup := provisionedDynamicUpstream(t, h, "10.3.0.2:80")
	defer cleanup()
	// One failure: still below MaxFails=2.
	h.countFailure(u)
	if !u.Healthy() {
		t.Error("dynamic upstream should still be healthy after 1 of 2 allowed failures")
	}
	// Second failure reaches the threshold.
	h.countFailure(u)
	if u.Healthy() {
		t.Error("dynamic upstream should be unhealthy after reaching MaxFails=2")
	}
}
// TestDynamicUpstreamFailCountPersistedBetweenRequests is the core regression
// test: it simulates two sequential (non-concurrent) requests to the same
// dynamic upstream. Before the fix, the UsagePool entry would be deleted
// between requests, wiping the fail count. Now it should survive.
func TestDynamicUpstreamFailCountPersistedBetweenRequests(t *testing.T) {
	resetDynamicHosts()
	h, cancel := newPassiveHandler(t, 2, time.Minute)
	defer cancel()
	// --- first request ---
	u1 := &Upstream{Dial: "10.3.0.3:80"}
	h.provisionUpstream(u1, true)
	// Clean up via defer so the dynamicHosts entry is removed even when an
	// assertion below fails fatally; previously cleanup was the last
	// statement and t.Fatal skipped it, leaking state into other tests.
	defer func() {
		dynamicHostsMu.Lock()
		delete(dynamicHosts, "10.3.0.3:80")
		dynamicHostsMu.Unlock()
	}()
	h.countFailure(u1)
	if u1.Host.Fails() != 1 {
		t.Fatalf("expected 1 fail after first request, got %d", u1.Host.Fails())
	}
	// Simulate end of first request: no delete from any pool (key difference
	// vs. the old behaviour where hosts.Delete was deferred).
	// --- second request: brand-new *Upstream struct, same dial address ---
	u2 := &Upstream{Dial: "10.3.0.3:80"}
	h.provisionUpstream(u2, true)
	if u1.Host != u2.Host {
		t.Fatal("expected both requests to share the same *Host pointer from dynamicHosts")
	}
	if u2.Host.Fails() != 1 {
		t.Errorf("expected fail count to persist across requests, got %d", u2.Host.Fails())
	}
	// A second failure now tips it over MaxFails=2.
	h.countFailure(u2)
	if u2.Healthy() {
		t.Error("upstream should be unhealthy after accumulated failures across requests")
	}
}
// TestDynamicUpstreamRecoveryAfterFailDuration verifies that a dynamic
// upstream's fail count expires and it returns to healthy.
func TestDynamicUpstreamRecoveryAfterFailDuration(t *testing.T) {
	resetDynamicHosts()
	const failDuration = 50 * time.Millisecond
	h, cancel := newPassiveHandler(t, 1, failDuration)
	defer cancel()
	u, cleanup := provisionedDynamicUpstream(t, h, "10.3.0.4:80")
	defer cleanup()
	h.countFailure(u)
	if u.Healthy() {
		t.Fatal("upstream should be unhealthy immediately after MaxFails failure")
	}
	// NOTE(review): a fixed 3x sleep can be flaky under heavy load; consider
	// polling with a deadline if this test proves unreliable in CI.
	time.Sleep(3 * failDuration)
	// Re-provision (as a new request would) to get fresh *Upstream with policy set.
	u2 := &Upstream{Dial: "10.3.0.4:80"}
	h.provisionUpstream(u2, true)
	if !u2.Healthy() {
		t.Errorf("dynamic upstream should recover to healthy after FailDuration, Fails=%d", u2.Host.Fails())
	}
}
// TestDynamicUpstreamMaxRequestsFromUnhealthyRequestCount verifies that
// UnhealthyRequestCount is copied into MaxRequests so Full() works correctly.
func TestDynamicUpstreamMaxRequestsFromUnhealthyRequestCount(t *testing.T) {
	resetDynamicHosts()
	caddyCtx, cancel := caddy.NewContext(caddy.Context{Context: context.Background()})
	defer cancel()
	h := &Handler{
		ctx: caddyCtx,
		HealthChecks: &HealthChecks{
			Passive: &PassiveHealthChecks{
				UnhealthyRequestCount: 3,
			},
		},
	}
	u, cleanup := provisionedDynamicUpstream(t, h, "10.3.0.5:80")
	defer cleanup()
	if u.MaxRequests != 3 {
		t.Errorf("expected MaxRequests=3 from UnhealthyRequestCount, got %d", u.MaxRequests)
	}
	// Should not be full with fewer requests than the limit.
	// (The request counter is not decremented here; the Host is discarded
	// with the dynamicHosts entry in cleanup, so no state leaks.)
	_ = u.Host.countRequest(2)
	if u.Full() {
		t.Error("upstream should not be full with 2 of 3 allowed requests")
	}
	_ = u.Host.countRequest(1)
	if !u.Full() {
		t.Error("upstream should be full at UnhealthyRequestCount concurrent requests")
	}
}

View File

@@ -392,7 +392,7 @@ func (h *Handler) Provision(ctx caddy.Context) error {
// set up upstreams
for _, u := range h.Upstreams {
h.provisionUpstream(u)
h.provisionUpstream(u, false)
}
if h.HealthChecks != nil {
@@ -563,18 +563,11 @@ func (h *Handler) proxyLoopIteration(r *http.Request, origReq *http.Request, w h
} else {
upstreams = dUpstreams
for _, dUp := range dUpstreams {
h.provisionUpstream(dUp)
h.provisionUpstream(dUp, true)
}
if c := h.logger.Check(zapcore.DebugLevel, "provisioned dynamic upstreams"); c != nil {
c.Write(zap.Int("count", len(dUpstreams)))
}
defer func() {
// these upstreams are dynamic, so they are only used for this iteration
// of the proxy loop; be sure to let them go away when we're done with them
for _, upstream := range dUpstreams {
_, _ = hosts.Delete(upstream.String())
}
}()
}
}
@@ -1324,9 +1317,16 @@ func (h *Handler) directRequest(req *http.Request, di DialInfo) {
req.URL.Host = reqHost
}
func (h Handler) provisionUpstream(upstream *Upstream) {
// create or get the host representation for this upstream
func (h Handler) provisionUpstream(upstream *Upstream, dynamic bool) {
// create or get the host representation for this upstream;
// dynamic upstreams are tracked in a separate map with last-seen
// timestamps so their health state persists across requests without
// being reference-counted (and thus discarded between requests).
if dynamic {
upstream.fillDynamicHost()
} else {
upstream.fillHost()
}
// give it the circuit breaker, if any
upstream.cb = h.CB