mirror of
https://github.com/leanprover/lean4.git
synced 2026-03-17 18:34:06 +00:00
feat: client
This commit is contained in:
@@ -7,6 +7,7 @@ module
|
||||
|
||||
prelude
|
||||
public import Std.Internal.Http.Server
|
||||
public import Std.Internal.Http.Client
|
||||
|
||||
public section
|
||||
|
||||
|
||||
292
src/Std/Internal/Http/Client.lean
Normal file
292
src/Std/Internal/Http/Client.lean
Normal file
@@ -0,0 +1,292 @@
|
||||
/-
|
||||
Copyright (c) 2025 Lean FRO, LLC. All rights reserved.
|
||||
Released under Apache 2.0 license as described in the file LICENSE.
|
||||
Authors: Sofia Rodrigues
|
||||
-/
|
||||
module
|
||||
|
||||
prelude
|
||||
public import Std.Internal.Http.Client.Pool
|
||||
|
||||
public section
|
||||
|
||||
namespace Std
|
||||
namespace Http
|
||||
namespace Client
|
||||
|
||||
set_option linter.all true
|
||||
|
||||
open Std Internal IO Async TCP Protocol
|
||||
open Time
|
||||
|
||||
/-!
|
||||
# Client
|
||||
|
||||
A top-level HTTP client backed by a connection pool, similar to `reqwest::Client`.
|
||||
Use `Client.builder` to construct, then `client.get "https://..."` etc.
|
||||
|
||||
```lean
|
||||
let client ← Client.builder
|
||||
|>.proxy! "http://proxy.example.com:8080"
|
||||
|>.build
|
||||
|
||||
let res ← client.get "https://api.example.com/data"
|
||||
|>.header! "Accept" "application/json"
|
||||
|>.send
|
||||
```
|
||||
-/
|
||||
|
||||
/--
|
||||
A top-level HTTP client backed by a connection pool.
|
||||
-/
|
||||
abbrev Client := Agent.Pool
|
||||
|
||||
/--
|
||||
Builder for `Client`. Chain configuration setters then call `.build`.
|
||||
-/
|
||||
public structure Client.Builder where
|
||||
/-- Configuration applied to all sessions created by this client. -/
|
||||
config : Config := {}
|
||||
/-- Maximum number of pooled connections per host. -/
|
||||
maxPerHost : Nat := 4
|
||||
|
||||
namespace Client.Builder
|
||||
|
||||
/--
|
||||
Routes all connections through a proxy.
|
||||
`host` is the proxy hostname, `port` is the proxy port.
|
||||
Only HTTP proxies are supported.
|
||||
-/
|
||||
def proxy (b : Client.Builder) (host : String) (port : UInt16) : Client.Builder :=
|
||||
{ b with config := { b.config with proxy := some (host, port) } }
|
||||
|
||||
/--
|
||||
Routes all connections through a proxy specified as a URL string.
|
||||
Returns `none` if the URL is invalid or has no authority component.
|
||||
-/
|
||||
def proxy? (b : Client.Builder) (url : String) : Option Client.Builder := do
|
||||
let uri ← URI.parse? url
|
||||
let auth ← uri.authority
|
||||
let host := toString auth.host
|
||||
let defaultPort : UInt16 := if uri.scheme.val == "https" then 443 else 80
|
||||
let port : UInt16 := match auth.port with
|
||||
| .value p => p
|
||||
| _ => defaultPort
|
||||
pure { b with config := { b.config with proxy := some (host, port) } }
|
||||
|
||||
/--
|
||||
Sets the total request timeout (connect + send + receive).
|
||||
-/
|
||||
def timeout (b : Client.Builder) (ms : Time.Millisecond.Offset) : Client.Builder :=
|
||||
if h : 0 < ms then
|
||||
{ b with config := { b.config with requestTimeout := ⟨ms, h⟩ } }
|
||||
else b
|
||||
|
||||
/--
|
||||
Sets the `User-Agent` header sent with every request.
|
||||
-/
|
||||
def userAgent (b : Client.Builder) (ua : String) : Client.Builder :=
|
||||
{ b with config := { b.config with userAgent := Header.Value.ofString? ua } }
|
||||
|
||||
/--
|
||||
Sets the maximum number of pooled connections per host.
|
||||
-/
|
||||
def maxConnectionsPerHost (b : Client.Builder) (n : Nat) : Client.Builder :=
|
||||
{ b with maxPerHost := n }
|
||||
|
||||
/--
|
||||
Sets the maximum number of redirects to follow automatically.
|
||||
-/
|
||||
def maxRedirects (b : Client.Builder) (n : Nat) : Client.Builder :=
|
||||
{ b with config := { b.config with maxRedirects := n } }
|
||||
|
||||
/--
|
||||
Sets the predicate that decides whether a response status is acceptable.
|
||||
When set, the final response status is passed to `f`; if `f` returns `false`
|
||||
an `IO.Error` is thrown with the numeric status code.
|
||||
-/
|
||||
def validateStatus (b : Client.Builder) (f : Status → Bool) : Client.Builder :=
|
||||
{ b with config := { b.config with validateStatus := some f } }
|
||||
|
||||
/--
|
||||
Builds the `Client`.
|
||||
-/
|
||||
def build (b : Client.Builder) : Async Client := do
|
||||
Agent.Pool.new b.config b.maxPerHost
|
||||
|
||||
end Client.Builder
|
||||
|
||||
/--
|
||||
A request builder bound to a `Client`. Build up headers, query parameters, and body,
|
||||
then dispatch with one of the `send*` methods.
|
||||
-/
|
||||
public structure Client.RequestBuilder where
|
||||
/--
|
||||
The client that will dispatch this request.
|
||||
-/
|
||||
client : Client
|
||||
/--
|
||||
Resolved hostname for this request.
|
||||
-/
|
||||
host : URI.Host
|
||||
/--
|
||||
Target port.
|
||||
-/
|
||||
port : UInt16
|
||||
/--
|
||||
The underlying request builder.
|
||||
-/
|
||||
builder : Request.Builder
|
||||
|
||||
namespace Client.RequestBuilder
|
||||
|
||||
/--
|
||||
Injects a `Host` header if not already present.
|
||||
-/
|
||||
private def withHostHeader (rb : Client.RequestBuilder) : Client.RequestBuilder :=
|
||||
if rb.builder.line.headers.contains Header.Name.host then rb
|
||||
else
|
||||
-- Use the scheme derived from the port to pick the correct default.
|
||||
let scheme := URI.Scheme.ofPort rb.port
|
||||
let defaultPort := URI.Scheme.defaultPort scheme
|
||||
let hostValue :=
|
||||
if rb.port == defaultPort then toString rb.host
|
||||
else s!"{rb.host}:{rb.port}"
|
||||
{ rb with builder := rb.builder.header! "Host" hostValue }
|
||||
|
||||
/--
|
||||
Adds a typed header to the request.
|
||||
-/
|
||||
def header (rb : Client.RequestBuilder) (key : Header.Name) (value : Header.Value) : Client.RequestBuilder :=
|
||||
{ rb with builder := rb.builder.header key value }
|
||||
|
||||
/--
|
||||
Adds a header to the request. Panics if the name or value is invalid.
|
||||
-/
|
||||
def header! (rb : Client.RequestBuilder) (key : String) (value : String) : Client.RequestBuilder :=
|
||||
{ rb with builder := rb.builder.header! key value }
|
||||
|
||||
/--
|
||||
Adds a header to the request. Returns `none` if the name or value is invalid.
|
||||
-/
|
||||
def header? (rb : Client.RequestBuilder) (key : String) (value : String) : Option Client.RequestBuilder := do
|
||||
let builder ← rb.builder.header? key value
|
||||
pure { rb with builder }
|
||||
|
||||
/--
|
||||
Sets the request URI from a string. Panics if the string is not a valid request target.
|
||||
-/
|
||||
def uri! (rb : Client.RequestBuilder) (u : String) : Client.RequestBuilder :=
|
||||
{ rb with builder := rb.builder.uri! u }
|
||||
|
||||
/--
|
||||
Adds a query parameter to the request URI.
|
||||
-/
|
||||
def queryParam (rb : Client.RequestBuilder) (key : String) (value : String) : Client.RequestBuilder :=
|
||||
let newTarget := match rb.builder.line.uri with
|
||||
| .originForm o =>
|
||||
.originForm { o with query := some ((o.query.getD URI.Query.empty).insert key value) }
|
||||
| .absoluteForm af =>
|
||||
.absoluteForm { af with uri := { af.uri with query := af.uri.query.insert key value } }
|
||||
| other => other
|
||||
{ rb with builder := { rb.builder with line := { rb.builder.line with uri := newTarget } } }
|
||||
|
||||
/--
|
||||
Sends the request with an empty body.
|
||||
-/
|
||||
def send (rb : Client.RequestBuilder) : Async (Response Body.Incoming) := do
|
||||
let rb := rb.withHostHeader
|
||||
rb.client.send rb.host rb.port (← rb.builder.blank)
|
||||
|
||||
/--
|
||||
Sends the request with a plain-text body. Sets `Content-Type: text/plain; charset=utf-8`.
|
||||
-/
|
||||
def text (rb : Client.RequestBuilder) (content : String) : Async (Response Body.Incoming) := do
|
||||
let rb := rb.withHostHeader
|
||||
rb.client.send rb.host rb.port (← rb.builder.text content)
|
||||
|
||||
/--
|
||||
Sends the request with a JSON body. Sets `Content-Type: application/json`.
|
||||
-/
|
||||
def json (rb : Client.RequestBuilder) (content : String) : Async (Response Body.Incoming) := do
|
||||
let rb := rb.withHostHeader
|
||||
rb.client.send rb.host rb.port (← rb.builder.json content)
|
||||
|
||||
/--
|
||||
Sends the request with a raw binary body. Sets `Content-Type: application/octet-stream`.
|
||||
-/
|
||||
def bytes (rb : Client.RequestBuilder) (content : ByteArray) : Async (Response Body.Incoming) := do
|
||||
let rb := rb.withHostHeader
|
||||
rb.client.send rb.host rb.port (← rb.builder.bytes content)
|
||||
|
||||
/--
|
||||
Sends the request with a streaming body produced by `gen`.
|
||||
-/
|
||||
def sendStream (rb : Client.RequestBuilder) (gen : Body.Outgoing → Async Unit) : Async (Response Body.Incoming) := do
|
||||
let rb := rb.withHostHeader
|
||||
rb.client.send rb.host rb.port (← rb.builder.stream gen)
|
||||
|
||||
end Client.RequestBuilder
|
||||
|
||||
/--
|
||||
Returns a `Client.Builder` with default configuration.
|
||||
-/
|
||||
def builder : Client.Builder := {}
|
||||
|
||||
/--
|
||||
Parses `url` into `(host, port, origin-form target)`.
|
||||
Returns `none` if the URL is invalid or has no authority component.
|
||||
-/
|
||||
private def mkRequest
|
||||
(method : Request.Builder → Request.Builder)
|
||||
(client : Client) (url : URI.AuthorityForm) : Client.RequestBuilder :=
|
||||
let target : RequestTarget :=
|
||||
.originForm (RequestTarget.OriginForm.mk url.path (if url.query.isEmpty then none else some url.query))
|
||||
{ client, host := url.host, port := url.port,
|
||||
builder := method (Request.new |>.uri target) }
|
||||
|
||||
/--
|
||||
Creates a GET request builder for `url`.
|
||||
-/
|
||||
def get (client : Client) (url : URI.AuthorityForm) : Client.RequestBuilder :=
|
||||
mkRequest (·.method .get) client url
|
||||
|
||||
/--
|
||||
Creates a POST request builder for `url`.
|
||||
-/
|
||||
def post (client : Client) (url : URI.AuthorityForm) : Client.RequestBuilder :=
|
||||
mkRequest (·.method .post) client url
|
||||
|
||||
/--
|
||||
Creates a PUT request builder for `url`.
|
||||
-/
|
||||
def put (client : Client) (url : URI.AuthorityForm) : Client.RequestBuilder :=
|
||||
mkRequest (·.method .put) client url
|
||||
|
||||
/--
|
||||
Creates a DELETE request builder for `url`.
|
||||
-/
|
||||
def delete (client : Client) (url : URI.AuthorityForm) : Client.RequestBuilder :=
|
||||
mkRequest (·.method .delete) client url
|
||||
|
||||
/--
|
||||
Creates a PATCH request builder for `url`.
|
||||
-/
|
||||
def patch (client : Client) (url : URI.AuthorityForm) : Client.RequestBuilder :=
|
||||
mkRequest (·.method .patch) client url
|
||||
|
||||
/--
|
||||
Creates a HEAD request builder for `url`.
|
||||
-/
|
||||
def head (client : Client) (url : URI.AuthorityForm) : Client.RequestBuilder :=
|
||||
mkRequest (·.method .head) client url
|
||||
|
||||
/--
|
||||
Creates an OPTIONS request builder for `url`.
|
||||
-/
|
||||
def options (client : Client) (url : URI.AuthorityForm) : Client.RequestBuilder :=
|
||||
mkRequest (·.method .options) client url
|
||||
|
||||
end Client
|
||||
end Http
|
||||
end Std
|
||||
506
src/Std/Internal/Http/Client/Agent.lean
Normal file
506
src/Std/Internal/Http/Client/Agent.lean
Normal file
@@ -0,0 +1,506 @@
|
||||
/-
|
||||
Copyright (c) 2026 Lean FRO, LLC. All rights reserved.
|
||||
Released under Apache 2.0 license as described in the file LICENSE.
|
||||
Authors: Sofia Rodrigues
|
||||
-/
|
||||
module
|
||||
|
||||
prelude
|
||||
public import Std.Internal.Http.Client.Session
|
||||
public import Std.Internal.Http.Data.Cookie
|
||||
import Init.Data.Array
|
||||
|
||||
public section
|
||||
|
||||
namespace Std
|
||||
namespace Http
|
||||
namespace Client
|
||||
|
||||
set_option linter.all true
|
||||
|
||||
open Std Internal IO Async TCP Protocol
|
||||
open Time
|
||||
|
||||
/-!
|
||||
# Agent
|
||||
|
||||
This module defines `Client.Agent`, a transport-agnostic HTTP user-agent that wraps a `Session`
|
||||
and adds automatic redirect following, cookie jar support, response interceptors, and configurable
|
||||
retries.
|
||||
|
||||
`Agent` is parameterized by the transport type `α` and contains no TCP-specific code.
|
||||
Use `Agent.ofTransport` to create an `Agent` from any connected transport. Pass a `connectTo`
|
||||
factory to enable cross-host redirect following and automatic reconnection on error.
|
||||
|
||||
On each redirect the `Location` header is parsed as a URI. If the redirect targets a different
|
||||
host or port the agent closes the current session and opens a new one using `connectTo` (when
|
||||
available). A `Array URI` tracks every URI visited in the current redirect chain so that cycles
|
||||
are detected and broken immediately.
|
||||
|
||||
When crossing to a different host the `Authorization` header is stripped from the redirected
|
||||
request to prevent credential leakage.
|
||||
-/
|
||||
|
||||
/--
|
||||
An HTTP user-agent that manages a connection to a host. It follows redirects, maintains a cookie
|
||||
jar for automatic cookie handling, applies response interceptors, and retries on connection errors.
|
||||
-/
|
||||
public structure Agent (α : Type) where
|
||||
/--
|
||||
The underlying HTTP session over the transport.
|
||||
-/
|
||||
session : Session α
|
||||
|
||||
/--
|
||||
URI scheme for this connection (e.g., `"http"` or `"https"`).
|
||||
Used when constructing absolute-form request URIs for proxy requests and some redirects.
|
||||
-/
|
||||
scheme : URI.Scheme
|
||||
|
||||
/--
|
||||
The hostname this agent is currently connected to.
|
||||
-/
|
||||
host : URI.Host
|
||||
|
||||
/--
|
||||
The port this agent is currently connected to.
|
||||
-/
|
||||
port : UInt16
|
||||
|
||||
/--
|
||||
URIs visited in the current redirect chain, used to detect cycles.
|
||||
-/
|
||||
redirectHistory : Array URI
|
||||
|
||||
/--
|
||||
Cookie jar shared across all requests and redirects through this agent.
|
||||
-/
|
||||
cookieJar : Cookie.Jar
|
||||
|
||||
/--
|
||||
Response interceptors applied (in order) after every response, including intermediate
|
||||
redirect responses. Each interceptor receives the response and returns a (possibly
|
||||
modified) response. Interceptors run before cookie processing and redirect evaluation
|
||||
so they can, e.g., unwrap envelopes or transparently decompress bodies.
|
||||
-/
|
||||
interceptors : Array (Response Body.Incoming → Async (Response Body.Incoming)) := #[]
|
||||
|
||||
/--
|
||||
Optional factory for opening a new session to `(host, port)`. Used for:
|
||||
* Automatic retry after connection errors (`maxRetries`): reconnects to the same host.
|
||||
* Cross-host redirects: connects to the new host.
|
||||
`none` for agents created via `Agent.ofTransport` without a factory; cross-host redirects
|
||||
are not followed and connection errors are not retried automatically for such agents.
|
||||
-/
|
||||
connectTo : Option (URI.Host → UInt16 → Async (Session α)) := none
|
||||
|
||||
namespace Agent
|
||||
|
||||
/--
|
||||
Rewrites an origin-form request target to absolute-form for proxy forwarding.
|
||||
`GET /path?q=1 HTTP/1.1` becomes `GET http://host:port/path?q=1 HTTP/1.1`.
|
||||
No-op for targets that are already in absolute-form or do not carry a path.
|
||||
-/
|
||||
def toAbsoluteFormRequest
|
||||
(request : Request Body.AnyBody)
|
||||
(scheme : URI.Scheme) (host : URI.Host) (port : UInt16) : Request Body.AnyBody :=
|
||||
match request.line.uri with
|
||||
| .originForm o =>
|
||||
let pathStr := toString o.path
|
||||
let queryStr := match o.query with | none => "" | some q => toString q
|
||||
let portStr := if port == URI.Scheme.defaultPort scheme then "" else s!":{port}"
|
||||
let urlStr := s!"{scheme}://{host}{portStr}{pathStr}{queryStr}"
|
||||
match RequestTarget.parse? urlStr with
|
||||
| some target => { request with line := { request.line with uri := target } }
|
||||
| none => request
|
||||
| _ => request
|
||||
|
||||
/--
|
||||
Creates an `Agent` from an already-connected transport `socket`.
|
||||
Pass a `connectTo` factory to enable automatic reconnection on error and cross-host redirect
|
||||
following; omit it (or pass `none`) to disable both.
|
||||
-/
|
||||
def ofTransport [Transport α] (socket : α) (scheme : URI.Scheme)
|
||||
(host : URI.Host) (port : UInt16)
|
||||
(connectTo : Option (URI.Host → UInt16 → Async (Session α)) := none)
|
||||
(config : Config := {}) : Async (Agent α) := do
|
||||
let session ← Session.new socket config
|
||||
let cookieJar ← Cookie.Jar.new
|
||||
pure { session, scheme, host, port, redirectHistory := #[], cookieJar, connectTo }
|
||||
|
||||
/--
|
||||
Injects matching cookies from `cookieJar` into the request headers for `host`.
|
||||
Does nothing if the jar contains no matching cookies.
|
||||
-/
|
||||
def injectCookies (cookieJar : Cookie.Jar) (host : URI.Host)
|
||||
(request : Request Body.AnyBody) : Async (Request Body.AnyBody) := do
|
||||
let path := match request.line.uri with
|
||||
| .originForm o => toString o.path
|
||||
| .absoluteForm af => toString af.uri.path
|
||||
| _ => "/"
|
||||
let cookies ← cookieJar.cookiesFor host path
|
||||
match Cookie.Jar.toCookieHeader cookies with
|
||||
| none => return request
|
||||
| some cookieValue =>
|
||||
let newHeaders :=
|
||||
request.line.headers.insert Header.Name.cookie (Header.Value.ofString! cookieValue)
|
||||
return { request with line := { request.line with headers := newHeaders } }
|
||||
|
||||
/--
|
||||
Reads all `Set-Cookie` headers from `responseHeaders` and stores the cookies in `cookieJar`.
|
||||
-/
|
||||
def processCookies (cookieJar : Cookie.Jar) (host : URI.Host)
|
||||
(responseHeaders : Headers) : BaseIO Unit := do
|
||||
if let some values := responseHeaders.getAll? Header.Name.setCookie then
|
||||
for v in values do
|
||||
cookieJar.processSetCookie host v.value
|
||||
|
||||
/--
|
||||
Applies all response interceptors to `response` in order, returning the final result.
|
||||
-/
|
||||
def applyInterceptors
|
||||
(interceptors : Array (Response Body.Incoming → Async (Response Body.Incoming)))
|
||||
(response : Response Body.Incoming) : Async (Response Body.Incoming) :=
|
||||
interceptors.foldlM (init := response) (fun r f => f r)
|
||||
|
||||
/--
|
||||
Outcome of evaluating whether a response should trigger an automatic redirect.
|
||||
-/
|
||||
inductive RedirectDecision where
|
||||
/--
|
||||
Response is final, should validate status and return it.
|
||||
-/
|
||||
| done
|
||||
/-- Follow a redirect to `(host, port, scheme)` with `request`, updating `history`. -/
|
||||
| follow (host : URI.Host) (port : UInt16) (scheme : URI.Scheme)
|
||||
(request : Request Body.AnyBody) (history : Array URI)
|
||||
|
||||
/--
|
||||
Inspects `response` and decides whether to follow a redirect.
|
||||
|
||||
Returns `.done` when:
|
||||
- `remaining` is 0 or the response is not a redirection,
|
||||
- the `Location` header is absent,
|
||||
- a redirect cycle is detected, or
|
||||
- the `Location` value cannot be parsed.
|
||||
|
||||
Returns `.follow` with the rewritten request (method, body, and headers adjusted per
|
||||
RFC 9110 §15.4, including `Authorization` stripped on cross-origin hops) when a valid
|
||||
redirect target is found. The response body is drained before returning `.follow`.
|
||||
-/
|
||||
def decideRedirect
|
||||
(remaining : Nat) (history : Array URI)
|
||||
(currentHost : URI.Host) (currentPort : UInt16) (currentScheme : URI.Scheme)
|
||||
(request : Request Body.AnyBody) (response : Response Body.Incoming)
|
||||
: Async RedirectDecision := do
|
||||
if remaining == 0 || !response.line.status.isRedirection then
|
||||
return .done
|
||||
|
||||
let some locationValue := response.line.headers.get? Header.Name.location
|
||||
| return .done
|
||||
|
||||
let locationStr := locationValue.value
|
||||
let locationURI := URI.parse? locationStr
|
||||
|
||||
discard <| ContextAsync.run (response.body.readAll (α := ByteArray))
|
||||
|
||||
if let some uri := locationURI then
|
||||
if history.contains uri then return .done
|
||||
|
||||
let some target := RequestTarget.parse? locationStr
|
||||
| return .done
|
||||
|
||||
let newMethod := match response.line.status with
|
||||
| .seeOther => .get
|
||||
| .movedPermanently | .found =>
|
||||
if request.line.method == .post then .get else request.line.method
|
||||
| _ => request.line.method
|
||||
|
||||
let newBody : Body.AnyBody :=
|
||||
if newMethod == .get || newMethod == .head || newMethod != request.line.method then
|
||||
.empty {}
|
||||
else
|
||||
request.body
|
||||
|
||||
let newHistory := match locationURI with
|
||||
| some uri => history.push uri
|
||||
| none => history
|
||||
|
||||
let (newHost, newPort, newScheme) := match locationURI with
|
||||
| some uri =>
|
||||
if let some auth := uri.authority then
|
||||
let h := auth.host
|
||||
let p : UInt16 := match auth.port with
|
||||
| .value p => p
|
||||
| _ => URI.Scheme.defaultPort uri.scheme
|
||||
(h, p, uri.scheme)
|
||||
else (currentHost, currentPort, currentScheme)
|
||||
| none => (currentHost, currentPort, currentScheme)
|
||||
|
||||
-- Strip Authorization on cross-origin redirects to prevent credential leakage (RFC 9110 §15.4).
|
||||
let crossOrigin := newHost != currentHost || newPort != currentPort
|
||||
let newHeaders :=
|
||||
if crossOrigin then request.line.headers.erase Header.Name.authorization
|
||||
else request.line.headers
|
||||
|
||||
return .follow newHost newPort newScheme
|
||||
{ line := { request.line with uri := target, method := newMethod, headers := newHeaders }
|
||||
body := newBody
|
||||
extensions := request.extensions }
|
||||
newHistory
|
||||
|
||||
private partial def sendWithRedirects [Transport α]
|
||||
(agent : Agent α) (request : Request Body.AnyBody)
|
||||
(remaining : Nat) (retriesLeft : Nat) : Async (Response Body.Incoming) := do
|
||||
-- Rewrite to absolute-form when a proxy is configured.
|
||||
let request :=
|
||||
if agent.session.config.proxy.isSome then
|
||||
toAbsoluteFormRequest request agent.scheme agent.host agent.port
|
||||
else
|
||||
request
|
||||
|
||||
let request ← injectCookies agent.cookieJar agent.host request
|
||||
|
||||
let response ← try agent.session.send request
|
||||
catch err => do
|
||||
-- Connection error: retry with a fresh session if budget and factory allow.
|
||||
if retriesLeft > 0 then
|
||||
if let some factory := agent.connectTo then
|
||||
sleep agent.session.config.retryDelay
|
||||
let newSession ← factory agent.host agent.port
|
||||
return ← sendWithRedirects { agent with session := newSession } request remaining (retriesLeft - 1)
|
||||
throw err
|
||||
|
||||
let response ← applyInterceptors agent.interceptors response
|
||||
processCookies agent.cookieJar agent.host response.line.headers
|
||||
|
||||
match ← decideRedirect remaining agent.redirectHistory agent.host agent.port agent.scheme request response with
|
||||
| .done =>
|
||||
if let some validate := agent.session.config.validateStatus then
|
||||
if !validate response.line.status then
|
||||
throw (.userError s!"unexpected HTTP status: {response.line.status.toCode}")
|
||||
return response
|
||||
| .follow newHost newPort newScheme newRequest newHistory =>
|
||||
if newHost != agent.host || newPort != agent.port then
|
||||
-- For custom transports without a connectTo factory we cannot open a new
|
||||
-- connection to a different host; return the redirect response as-is.
|
||||
let some factory := agent.connectTo | return response
|
||||
let _ ← agent.session.close
|
||||
let newSession ← factory newHost newPort
|
||||
let response ← sendWithRedirects
|
||||
{ session := newSession
|
||||
scheme := newScheme
|
||||
host := newHost
|
||||
port := newPort
|
||||
redirectHistory := newHistory
|
||||
cookieJar := agent.cookieJar
|
||||
interceptors := agent.interceptors
|
||||
connectTo := some factory }
|
||||
newRequest (remaining - 1) retriesLeft
|
||||
-- Close the cross-host session: signals the background loop to shut down after
|
||||
-- the response body is consumed (safe because the channel is only polled once
|
||||
-- the current response is fully delivered and waitingForRequest becomes true).
|
||||
let _ ← newSession.close
|
||||
return response
|
||||
else
|
||||
sendWithRedirects { agent with redirectHistory := newHistory } newRequest (remaining - 1) retriesLeft
|
||||
|
||||
/--
|
||||
Send a request, automatically following redirects up to `config.maxRedirects` hops and
|
||||
retrying on connection errors up to `config.maxRetries` times.
|
||||
For cross-host redirects the agent reconnects using its `connectTo` factory (if set).
|
||||
Cookies are automatically injected from the jar and `Set-Cookie` responses are stored.
|
||||
Response interceptors are applied after every response.
|
||||
-/
|
||||
def send {β : Type} [Coe β Body.AnyBody] [Transport α]
|
||||
(agent : Agent α) (request : Request β) : Async (Response Body.Incoming) :=
|
||||
sendWithRedirects agent
|
||||
{ line := request.line, body := (request.body : Body.AnyBody), extensions := request.extensions }
|
||||
agent.session.config.maxRedirects
|
||||
agent.session.config.maxRetries
|
||||
|
||||
end Agent
|
||||
|
||||
/-!
|
||||
# Agent.RequestBuilder
|
||||
|
||||
A fluent builder that attaches an `Agent` to a `Request.Builder`, letting callers chain header
|
||||
and query-parameter setters before dispatching with a typed `send*` terminal.
|
||||
|
||||
```lean
|
||||
let response ←
|
||||
agent.get "/api/items"
|
||||
|>.header! "Accept" "application/json"
|
||||
|>.queryParam "page" "2"
|
||||
|>.send
|
||||
```
|
||||
-/
|
||||
|
||||
/--
|
||||
A `Request.Builder` bound to a specific `Agent`. Build up headers, query parameters, and body,
|
||||
then call one of the `send*` methods to dispatch the request.
|
||||
-/
|
||||
public structure Agent.RequestBuilder (α : Type) where
|
||||
/--
|
||||
The agent that will send this request.
|
||||
-/
|
||||
agent : Agent α
|
||||
|
||||
/--
|
||||
The underlying request builder.
|
||||
-/
|
||||
builder : Request.Builder
|
||||
|
||||
|
||||
namespace Agent.RequestBuilder
|
||||
|
||||
/--
|
||||
Injects a `Host` header derived from the agent's `host` and `port` if no `Host` header
|
||||
is already present.
|
||||
-/
|
||||
private def withHostHeader [Transport α] (rb : Agent.RequestBuilder α) : Agent.RequestBuilder α :=
|
||||
if rb.builder.line.headers.contains Header.Name.host then
|
||||
rb
|
||||
else
|
||||
let defaultPort := URI.Scheme.defaultPort rb.agent.scheme
|
||||
let hostValue :=
|
||||
if rb.agent.port == defaultPort then toString rb.agent.host
|
||||
else s!"{rb.agent.host}:{rb.agent.port}"
|
||||
{ rb with builder := rb.builder.header! "Host" hostValue }
|
||||
|
||||
/--
|
||||
Injects matching cookies from the agent's jar if no `Cookie` header is already present.
|
||||
-/
|
||||
private def withCookies [Transport α] (rb : Agent.RequestBuilder α) : Async (Agent.RequestBuilder α) := do
|
||||
if rb.builder.line.headers.contains Header.Name.cookie then
|
||||
return rb
|
||||
let path := match rb.builder.line.uri with
|
||||
| .originForm o => toString o.path
|
||||
| .absoluteForm af => toString af.uri.path
|
||||
| _ => "/"
|
||||
let cookies ← rb.agent.cookieJar.cookiesFor rb.agent.host path
|
||||
match Cookie.Jar.toCookieHeader cookies with
|
||||
| none => return rb
|
||||
| some cookieValue =>
|
||||
return { rb with builder := rb.builder.header! "Cookie" cookieValue }
|
||||
|
||||
/--
|
||||
Prepares the builder by injecting Host and Cookie headers, then calls `f` to build
|
||||
and send the request.
|
||||
-/
|
||||
private def prepare [Transport α] (rb : Agent.RequestBuilder α)
|
||||
(f : Agent.RequestBuilder α → Async (Response Body.Incoming)) : Async (Response Body.Incoming) := do
|
||||
let rb := rb.withHostHeader
|
||||
let rb ← rb.withCookies
|
||||
f rb
|
||||
|
||||
/--
|
||||
Adds a typed header to the request.
|
||||
-/
|
||||
def header [Transport α] (rb : Agent.RequestBuilder α) (key : Header.Name) (value : Header.Value) : Agent.RequestBuilder α :=
|
||||
{ rb with builder := rb.builder.header key value }
|
||||
|
||||
/--
|
||||
Adds a header to the request. Panics if the name or value is invalid.
|
||||
-/
|
||||
def header! [Transport α] (rb : Agent.RequestBuilder α) (key : String) (value : String) : Agent.RequestBuilder α :=
|
||||
{ rb with builder := rb.builder.header! key value }
|
||||
|
||||
/--
|
||||
Adds a header to the request. Returns `none` if the name or value is invalid.
|
||||
-/
|
||||
def header? [Transport α] (rb : Agent.RequestBuilder α) (key : String) (value : String) : Option (Agent.RequestBuilder α) := do
|
||||
let builder ← rb.builder.header? key value
|
||||
pure { rb with builder }
|
||||
|
||||
/--
|
||||
Sets the request URI from a string. Panics if the string is not a valid request target.
|
||||
-/
|
||||
def uri! [Transport α] (rb : Agent.RequestBuilder α) (u : String) : Agent.RequestBuilder α :=
|
||||
{ rb with builder := rb.builder.uri! u }
|
||||
|
||||
/--
|
||||
Adds a query parameter to the request URI.
|
||||
Works for both origin-form (e.g. set by `agent.get "/path"`) and absolute-form targets.
|
||||
-/
|
||||
def queryParam [Transport α] (rb : Agent.RequestBuilder α) (key : String) (value : String) : Agent.RequestBuilder α :=
|
||||
let newTarget := match rb.builder.line.uri with
|
||||
| .originForm o =>
|
||||
.originForm { o with query := some ((o.query.getD URI.Query.empty).insert key value) }
|
||||
| .absoluteForm af =>
|
||||
.absoluteForm { af with uri := { af.uri with query := af.uri.query.insert key value } }
|
||||
| other => other
|
||||
{ rb with builder := { rb.builder with line := { rb.builder.line with uri := newTarget } } }
|
||||
|
||||
/--
|
||||
Sends the request with an empty body.
|
||||
-/
|
||||
def send [Transport α] (rb : Agent.RequestBuilder α) : Async (Response Body.Incoming) :=
|
||||
rb.prepare fun rb => do rb.agent.send (← rb.builder.blank)
|
||||
|
||||
/--
|
||||
Sends the request with a plain-text body.
|
||||
Sets `Content-Type: text/plain; charset=utf-8`.
|
||||
-/
|
||||
def text [Transport α] (rb : Agent.RequestBuilder α) (content : String) : Async (Response Body.Incoming) :=
|
||||
rb.prepare fun rb => do rb.agent.send (← rb.builder.text content)
|
||||
|
||||
/--
|
||||
Sends the request with a JSON body.
|
||||
Sets `Content-Type: application/json`.
|
||||
-/
|
||||
def json [Transport α] (rb : Agent.RequestBuilder α) (content : String) : Async (Response Body.Incoming) :=
|
||||
rb.prepare fun rb => do rb.agent.send (← rb.builder.json content)
|
||||
|
||||
/--
|
||||
Sends the request with a raw binary body.
|
||||
Sets `Content-Type: application/octet-stream`.
|
||||
-/
|
||||
def bytes [Transport α] (rb : Agent.RequestBuilder α) (content : ByteArray) : Async (Response Body.Incoming) :=
|
||||
rb.prepare fun rb => do rb.agent.send (← rb.builder.bytes content)
|
||||
|
||||
/--
|
||||
Sends the request with a streaming body produced by `gen`.
|
||||
-/
|
||||
def sendStream [Transport α]
|
||||
(rb : Agent.RequestBuilder α)
|
||||
(gen : Body.Outgoing → Async Unit) : Async (Response Body.Incoming) :=
|
||||
rb.prepare fun rb => do rb.agent.send (← rb.builder.stream gen)
|
||||
|
||||
end Agent.RequestBuilder
|
||||
|
||||
namespace Agent
|
||||
|
||||
/-- Creates a GET request builder for the given path or URL. -/
|
||||
def get [Transport α] (agent : Agent α) (path : String) : Agent.RequestBuilder α :=
|
||||
{ agent, builder := Request.get (RequestTarget.parse! path) }
|
||||
|
||||
/-- Creates a POST request builder for the given path or URL. -/
|
||||
def post [Transport α] (agent : Agent α) (path : String) : Agent.RequestBuilder α :=
|
||||
{ agent, builder := Request.post (RequestTarget.parse! path) }
|
||||
|
||||
/-- Creates a PUT request builder for the given path or URL. -/
|
||||
def put [Transport α] (agent : Agent α) (path : String) : Agent.RequestBuilder α :=
|
||||
{ agent, builder := Request.put (RequestTarget.parse! path) }
|
||||
|
||||
/-- Creates a DELETE request builder for the given path or URL. -/
|
||||
def delete [Transport α] (agent : Agent α) (path : String) : Agent.RequestBuilder α :=
|
||||
{ agent, builder := Request.delete (RequestTarget.parse! path) }
|
||||
|
||||
/-- Creates a PATCH request builder for the given path or URL. -/
|
||||
def patch [Transport α] (agent : Agent α) (path : String) : Agent.RequestBuilder α :=
|
||||
{ agent, builder := Request.patch (RequestTarget.parse! path) }
|
||||
|
||||
/-- Creates a HEAD request builder for the given path or URL. -/
|
||||
def headReq [Transport α] (agent : Agent α) (path : String) : Agent.RequestBuilder α :=
|
||||
{ agent, builder := Request.head (RequestTarget.parse! path) }
|
||||
|
||||
/-- Creates an OPTIONS request builder for the given path or URL. -/
|
||||
def options [Transport α] (agent : Agent α) (path : String) : Agent.RequestBuilder α :=
|
||||
{ agent, builder := Request.options (RequestTarget.parse! path) }
|
||||
|
||||
end Agent
|
||||
|
||||
end Client
|
||||
end Http
|
||||
end Std
|
||||
137
src/Std/Internal/Http/Client/Config.lean
Normal file
137
src/Std/Internal/Http/Client/Config.lean
Normal file
@@ -0,0 +1,137 @@
|
||||
/-
|
||||
Copyright (c) 2025 Lean FRO, LLC. All rights reserved.
|
||||
Released under Apache 2.0 license as described in the file LICENSE.
|
||||
Authors: Sofia Rodrigues
|
||||
-/
|
||||
module
|
||||
|
||||
prelude
|
||||
public import Std.Time
|
||||
public import Std.Internal.Http.Protocol.H1
|
||||
|
||||
public section
|
||||
|
||||
/-!
|
||||
# Config
|
||||
|
||||
This module exposes the `Config` structure describing timeouts, connection,
|
||||
and header configurations for an HTTP client.
|
||||
-/
|
||||
|
||||
namespace Std.Http.Client
|
||||
|
||||
set_option linter.all true
|
||||
|
||||
/--
|
||||
Client connection configuration with validation.
|
||||
-/
|
||||
structure Config where
|
||||
/--
|
||||
Maximum number of requests per connection (for keep-alive).
|
||||
-/
|
||||
maxRequestsPerConnection : Nat := 1000
|
||||
|
||||
/--
|
||||
Maximum number of headers allowed per response.
|
||||
-/
|
||||
maxResponseHeaders : Nat := 200
|
||||
|
||||
/--
|
||||
Maximum size of a single header name in bytes.
|
||||
-/
|
||||
maxHeaderNameSize : Nat := 256
|
||||
|
||||
/--
|
||||
Maximum size of a single header value in bytes.
|
||||
-/
|
||||
maxHeaderValueSize : Nat := 16384
|
||||
|
||||
/--
|
||||
Maximum waiting time for additional data before timing out.
|
||||
-/
|
||||
readTimeout : Time.Millisecond.Offset := 30000
|
||||
|
||||
/--
|
||||
Timeout duration for keep-alive connections.
|
||||
-/
|
||||
keepAliveTimeout : { x : Time.Millisecond.Offset // 0 < x } := ⟨60000, by decide⟩
|
||||
|
||||
/--
|
||||
Timeout for the entire request lifecycle (connect + read + write).
|
||||
-/
|
||||
requestTimeout : { x : Time.Millisecond.Offset // 0 < x } := ⟨120000, by decide⟩
|
||||
|
||||
/--
|
||||
Whether to enable keep-alive connections.
|
||||
-/
|
||||
enableKeepAlive : Bool := true
|
||||
|
||||
/--
|
||||
Maximum number of bytes to receive in a single read call.
|
||||
-/
|
||||
maxRecvChunkSize : Nat := 16384
|
||||
|
||||
/--
|
||||
Default buffer size for request payloads.
|
||||
-/
|
||||
defaultRequestBufferSize : Nat := 16384
|
||||
|
||||
/--
|
||||
The user-agent string to send by default.
|
||||
-/
|
||||
userAgent : Option Header.Value := some (.mk "lean-http/1.1")
|
||||
|
||||
/--
|
||||
Maximum number of redirects to follow automatically.
|
||||
Set to 0 to disable automatic redirect following.
|
||||
-/
|
||||
maxRedirects : Nat := 10
|
||||
|
||||
/--
|
||||
Maximum number of times to retry a request after a connection error.
|
||||
Set to 0 to disable automatic retries.
|
||||
-/
|
||||
maxRetries : Nat := 3
|
||||
|
||||
/--
|
||||
Delay in milliseconds between successive retry attempts.
|
||||
-/
|
||||
retryDelay : Time.Millisecond.Offset := 1000
|
||||
|
||||
/--
|
||||
Optional HTTP proxy address as `(host, port)`.
|
||||
When set, all TCP connections are routed through this proxy and
|
||||
request URIs are rewritten to absolute-form (`GET http://host/path HTTP/1.1`).
|
||||
-/
|
||||
proxy : Option (String × UInt16) := none
|
||||
|
||||
/--
|
||||
Optional predicate that decides whether a response status is acceptable.
|
||||
When `none`, all status codes are accepted (no error is thrown).
|
||||
When `some f`, the final response status is passed to `f`; if `f` returns `false`
|
||||
an `IO.Error` is thrown with the numeric status code.
|
||||
Only applied to the final (non-redirect) response, not intermediate `3xx` responses.
|
||||
|
||||
Example — reject anything outside 2xx:
|
||||
```lean
|
||||
validateStatus := some (fun s => s.toCode / 100 == 2)
|
||||
```
|
||||
-/
|
||||
validateStatus : Option (Status → Bool) := none
|
||||
|
||||
namespace Config
|
||||
|
||||
/--
|
||||
Convert this client config into an HTTP/1.1 protocol configuration.
|
||||
-/
|
||||
def toH1Config (config : Config) : Std.Http.Protocol.H1.Config :=
|
||||
{ maxMessages := config.maxRequestsPerConnection
|
||||
maxHeaders := config.maxResponseHeaders
|
||||
maxHeaderNameLength := config.maxHeaderNameSize
|
||||
maxHeaderValueLength := config.maxHeaderValueSize
|
||||
enableKeepAlive := config.enableKeepAlive
|
||||
agentName := config.userAgent
|
||||
}
|
||||
|
||||
end Config
|
||||
end Std.Http.Client
|
||||
581
src/Std/Internal/Http/Client/Connection.lean
Normal file
581
src/Std/Internal/Http/Client/Connection.lean
Normal file
@@ -0,0 +1,581 @@
|
||||
/-
|
||||
Copyright (c) 2025 Lean FRO, LLC. All rights reserved.
|
||||
Released under Apache 2.0 license as described in the file LICENSE.
|
||||
Authors: Sofia Rodrigues
|
||||
-/
|
||||
module
|
||||
|
||||
prelude
|
||||
public import Std.Internal.Async.TCP
|
||||
public import Std.Internal.Async.ContextAsync
|
||||
public import Std.Internal.Http.Transport
|
||||
public import Std.Internal.Http.Protocol.H1
|
||||
public import Std.Internal.Http.Client.Config
|
||||
public import Std.Sync.Watch
|
||||
|
||||
public section
|
||||
|
||||
namespace Std.Http.Client
|
||||
|
||||
open Std Internal IO Async TCP Protocol
|
||||
open Time
|
||||
|
||||
/--
|
||||
Type-erased body operations for use in the request pipeline.
|
||||
Captures `Reader` and `Writer` methods as closures so the connection state
|
||||
is not parameterized by the body type.
|
||||
-/
|
||||
structure Body.Operations where
|
||||
/--
|
||||
Selector that resolves when a chunk is available or the body reaches EOF.
|
||||
-/
|
||||
recvSelector : Selector (Option Chunk)
|
||||
|
||||
/--
|
||||
Returns `true` when the body is closed for reading.
|
||||
-/
|
||||
isClosed : Async Bool
|
||||
|
||||
/--
|
||||
Closes the body for reading.
|
||||
-/
|
||||
close : Async Unit
|
||||
|
||||
/--
|
||||
Returns the known content length if available.
|
||||
-/
|
||||
getKnownSize : Async (Option Body.Length)
|
||||
|
||||
namespace Body.Operations
|
||||
|
||||
/--
|
||||
Creates a `Body.Operations` from any type with `Body.Reader` and `Body.Writer` instances.
|
||||
-/
|
||||
def of [Body.Reader β] [Body.Writer β] (body : β) : Body.Operations where
|
||||
recvSelector := Body.Reader.recvSelector body
|
||||
isClosed := Body.Reader.isClosed body
|
||||
close := Body.Reader.close body
|
||||
getKnownSize := Body.Writer.getKnownSize body
|
||||
|
||||
end Body.Operations
|
||||
|
||||
/-!
|
||||
# Connection
|
||||
|
||||
This module defines the `Connection.handle` loop, used to manage one persistent HTTP/1.1 client
|
||||
connection and handle sequential request/response exchanges over it.
|
||||
-/
|
||||
|
||||
set_option linter.all true
|
||||
|
||||
/--
|
||||
A request packet queued to the background connection loop.
|
||||
-/
|
||||
structure RequestPacket where
|
||||
/--
|
||||
The request to send.
|
||||
-/
|
||||
request : Request Body.Operations
|
||||
|
||||
/--
|
||||
Promise resolved with the eventual response.
|
||||
-/
|
||||
responsePromise : IO.Promise (Except Error (Response Body.Incoming))
|
||||
|
||||
/--
|
||||
Watch channel updated with the cumulative number of request-body bytes sent.
|
||||
`none` when the caller does not need upload-progress tracking.
|
||||
-/
|
||||
uploadProgress : Option (Watch UInt64) := none
|
||||
|
||||
/--
|
||||
Watch channel updated with the cumulative number of response bytes received.
|
||||
`none` when the caller does not need download-progress tracking.
|
||||
-/
|
||||
downloadProgress : Option (Watch UInt64) := none
|
||||
|
||||
namespace RequestPacket
|
||||
|
||||
/--
|
||||
Resolve the packet with an error.
|
||||
-/
|
||||
def onError (packet : RequestPacket) (error : Error) : BaseIO Unit :=
|
||||
discard <| packet.responsePromise.resolve (.error error)
|
||||
|
||||
/--
|
||||
Resolve the packet with a response.
|
||||
-/
|
||||
def onResponse (packet : RequestPacket) (response : Response Body.Incoming) : BaseIO Unit :=
|
||||
discard <| packet.responsePromise.resolve (.ok response)
|
||||
|
||||
end RequestPacket
|
||||
|
||||
namespace Connection
|
||||
|
||||
/--
|
||||
Events produced by the async select loop in `pollNextEvent`.
|
||||
Each variant corresponds to one possible outcome of waiting for I/O.
|
||||
-/
|
||||
private inductive Recv
|
||||
| bytes (x : Option ByteArray)
|
||||
| requestBody (x : Option Chunk)
|
||||
| bodyInterest (x : Bool)
|
||||
| packet (x : Option RequestPacket)
|
||||
| timeout
|
||||
| shutdown
|
||||
| close
|
||||
|
||||
/--
|
||||
The set of I/O sources to wait on during a single poll iteration.
|
||||
Each `Option` field is `none` when that source is not currently active.
|
||||
-/
|
||||
private structure PollSources (α : Type) where
|
||||
socket : Option α
|
||||
expect : Option Nat
|
||||
requestBody : Option Body.Operations
|
||||
requestChannel : Option (Std.CloseableChannel RequestPacket)
|
||||
responseBody : Option Body.Outgoing
|
||||
timeout : Millisecond.Offset
|
||||
keepAliveTimeout : Option Millisecond.Offset
|
||||
connectionContext : CancellationContext
|
||||
|
||||
/--
|
||||
All mutable state carried through the connection processing loop.
|
||||
Bundled into a struct so it can be passed to and returned from helper functions.
|
||||
-/
|
||||
private structure ConnectionState where
|
||||
machine : H1.Machine .sending
|
||||
currentTimeout : Millisecond.Offset
|
||||
keepAliveTimeout : Option Millisecond.Offset
|
||||
currentRequest : Option RequestPacket
|
||||
requestBody : Option Body.Operations
|
||||
responseOutgoing : Option Body.Outgoing
|
||||
responseIncoming : Option Body.Incoming
|
||||
requiresData : Bool
|
||||
expectData : Option Nat
|
||||
waitingForRequest : Bool
|
||||
isInformationalResponse : Bool
|
||||
waitingForContinue : Bool
|
||||
pendingRequestBody : Option Body.Operations
|
||||
uploadProgress : Option (Watch UInt64) := none
|
||||
uploadBytes : UInt64 := 0
|
||||
downloadProgress : Option (Watch UInt64) := none
|
||||
downloadBytes : UInt64 := 0
|
||||
|
||||
@[inline]
|
||||
private def requestHasExpectContinue (request : Request Body.Operations) : Bool :=
|
||||
match request.line.headers.getAll? Header.Name.expect with
|
||||
| some #[value] =>
|
||||
match Header.Expect.parse value with
|
||||
| some res => res.expect
|
||||
| none => false
|
||||
| _ => false
|
||||
|
||||
/--
|
||||
Waits for the next I/O event across all active sources described by `sources`.
|
||||
Computes the socket recv size from `config`, then races all active selectables.
|
||||
Returns `.close` on transport errors.
|
||||
-/
|
||||
private def pollNextEvent
|
||||
[Transport α]
|
||||
(config : Config) (sources : PollSources α) : Async Recv := do
|
||||
|
||||
let expectedBytes := sources.expect
|
||||
|>.getD config.defaultRequestBufferSize
|
||||
|>.min config.maxRecvChunkSize
|
||||
|>.toUInt64
|
||||
|
||||
let mut selectables : Array (Selectable Recv) := #[
|
||||
.case sources.connectionContext.doneSelector (fun _ => do
|
||||
let reason ← sources.connectionContext.getCancellationReason
|
||||
match reason with
|
||||
| some .deadline => pure .timeout
|
||||
| _ => pure .shutdown)
|
||||
]
|
||||
|
||||
if let some socket := sources.socket then
|
||||
selectables := selectables.push (.case (Transport.recvSelector socket expectedBytes) (Recv.bytes · |> pure))
|
||||
|
||||
if let some keepAliveTimeout := sources.keepAliveTimeout then
|
||||
selectables := selectables.push (.case (← Selector.sleep keepAliveTimeout) (fun _ => pure .timeout))
|
||||
else
|
||||
selectables := selectables.push (.case (← Selector.sleep sources.timeout) (fun _ => pure .timeout))
|
||||
|
||||
if let some requestBody := sources.requestBody then
|
||||
selectables := selectables.push (.case requestBody.recvSelector (Recv.requestBody · |> pure))
|
||||
|
||||
if let some requestChannel := sources.requestChannel then
|
||||
selectables := selectables.push (.case requestChannel.recvSelector (Recv.packet · |> pure))
|
||||
|
||||
if let some responseBody := sources.responseBody then
|
||||
selectables := selectables.push (.case (Body.Writer.interestSelector responseBody) (Recv.bodyInterest · |> pure))
|
||||
|
||||
try Selectable.one selectables catch _ => pure .close
|
||||
|
||||
/--
|
||||
Processes all H1 events from a single machine step, updating the connection state.
|
||||
Handles keep-alive resets, body-size tracking, `Expect: 100-continue`, and parse errors.
|
||||
Returns the updated state and `true` if a parse failure was encountered.
|
||||
-/
|
||||
private def processH1Events
|
||||
(config : Config)
|
||||
(events : Array (H1.Event .sending))
|
||||
(state : ConnectionState) : Async (ConnectionState × Bool) := do
|
||||
|
||||
let mut st := state
|
||||
let mut sawFailure := false
|
||||
|
||||
for event in events do
|
||||
match event with
|
||||
| .needMoreData expect =>
|
||||
st := { st with requiresData := true, expectData := expect }
|
||||
|
||||
| .needAnswer => pure ()
|
||||
|
||||
| .endHeaders head =>
|
||||
if head.status.isInformational then
|
||||
-- Informational (1xx) responses are interim; do not resolve the caller's
|
||||
-- promise. The machine loops back to read the real response.
|
||||
st := { st with isInformationalResponse := true }
|
||||
|
||||
-- A `100 Continue` response authorises the body: move it from the
|
||||
-- pending slot into `requestBody` so the pump loop starts sending.
|
||||
if head.status == .continue && st.waitingForContinue then
|
||||
st := { st with
|
||||
requestBody := st.pendingRequestBody
|
||||
pendingRequestBody := none
|
||||
waitingForContinue := false
|
||||
}
|
||||
else
|
||||
st := { st with
|
||||
isInformationalResponse := false
|
||||
currentTimeout := config.readTimeout
|
||||
keepAliveTimeout := none
|
||||
}
|
||||
|
||||
-- A non-informational response while we were still waiting for
|
||||
-- `100 Continue`: the server rejected (or bypassed) the expectation.
|
||||
-- Discard the pending body — it must not be sent.
|
||||
if st.waitingForContinue then
|
||||
if let some body := st.pendingRequestBody then
|
||||
if !(← body.isClosed) then body.close
|
||||
st := { st with pendingRequestBody := none, waitingForContinue := false }
|
||||
|
||||
if let some body := st.responseOutgoing then
|
||||
if let some length := head.getSize true then
|
||||
Body.Writer.setKnownSize body (some length)
|
||||
|
||||
if let some packet := st.currentRequest then
|
||||
if let some incoming := st.responseIncoming then
|
||||
packet.onResponse { line := head, body := incoming }
|
||||
|
||||
| .closeBody =>
|
||||
-- Skip closing for informational (1xx) responses; the channel stays
|
||||
-- open for the real response body that follows.
|
||||
if !st.isInformationalResponse then
|
||||
if let some body := st.responseOutgoing then
|
||||
if ¬(← Body.Writer.isClosed body) then Body.Writer.close body
|
||||
|
||||
| .next =>
|
||||
-- Reset all per-request state for the next pipelined request.
|
||||
if let some body := st.requestBody then
|
||||
if ¬(← body.isClosed) then body.close
|
||||
|
||||
if let some body := st.pendingRequestBody then
|
||||
if ¬(← body.isClosed) then body.close
|
||||
|
||||
if let some body := st.responseOutgoing then
|
||||
if ¬(← Body.Writer.isClosed body) then Body.Writer.close body
|
||||
|
||||
if let some w := st.uploadProgress then Watch.close w
|
||||
if let some w := st.downloadProgress then Watch.close w
|
||||
|
||||
st := { st with
|
||||
requestBody := none
|
||||
pendingRequestBody := none
|
||||
waitingForContinue := false
|
||||
responseOutgoing := none
|
||||
responseIncoming := none
|
||||
currentRequest := none
|
||||
isInformationalResponse := false
|
||||
waitingForRequest := true
|
||||
keepAliveTimeout := some config.keepAliveTimeout.val
|
||||
currentTimeout := config.keepAliveTimeout.val
|
||||
uploadProgress := none
|
||||
uploadBytes := 0
|
||||
downloadProgress := none
|
||||
downloadBytes := 0
|
||||
}
|
||||
|
||||
| .failed err =>
|
||||
if let some packet := st.currentRequest then
|
||||
packet.onError (.userError (toString err))
|
||||
sawFailure := true
|
||||
|
||||
| .«continue» => pure ()
|
||||
|
||||
| .close => pure ()
|
||||
|
||||
return (st, sawFailure)
|
||||
|
||||
/--
|
||||
Computes the active `PollSources` for the current connection state.
|
||||
Determines which I/O sources need attention and whether to include the socket.
|
||||
-/
|
||||
private def buildPollSources
|
||||
[Transport α]
|
||||
(socket : α) (requestChannel : Std.CloseableChannel RequestPacket)
|
||||
(connectionContext : CancellationContext) (state : ConnectionState)
|
||||
: Async (PollSources α) := do
|
||||
let requestBodySource ←
|
||||
if let some body := state.requestBody then
|
||||
if ¬(← body.isClosed) then pure (some body) else pure none
|
||||
else
|
||||
pure none
|
||||
|
||||
let responseBodySource ←
|
||||
if state.machine.canPullBodyNow then
|
||||
if let some body := state.responseOutgoing then
|
||||
if ¬(← Body.Writer.isClosed body) then pure (some body) else pure none
|
||||
else
|
||||
pure none
|
||||
else
|
||||
pure none
|
||||
|
||||
let pollSocket :=
|
||||
state.requiresData ∨
|
||||
state.machine.writer.sentMessage ∨
|
||||
!state.waitingForRequest ∨
|
||||
requestBodySource.isSome ∨
|
||||
state.machine.canPullBody
|
||||
|
||||
return {
|
||||
socket := if pollSocket then some socket else none
|
||||
expect := state.expectData
|
||||
requestBody := requestBodySource
|
||||
requestChannel := if state.waitingForRequest then some requestChannel else none
|
||||
responseBody := responseBodySource
|
||||
timeout := state.currentTimeout
|
||||
keepAliveTimeout := state.keepAliveTimeout
|
||||
connectionContext := connectionContext
|
||||
}
|
||||
|
||||
/--
|
||||
Processes a single async I/O event and updates the connection state.
|
||||
Returns the updated state and `true` if the connection should be closed immediately.
|
||||
-/
|
||||
private def handleRecvEvent
|
||||
(config : Config)
|
||||
(event : Recv) (state : ConnectionState) : Async (ConnectionState × Bool) := do
|
||||
|
||||
match event with
|
||||
| .bytes (some bytes) =>
|
||||
let newDownloadBytes := state.downloadBytes + bytes.size.toUInt64
|
||||
if let some w := state.downloadProgress then
|
||||
Watch.send w newDownloadBytes
|
||||
return ({ state with machine := state.machine.feed bytes, downloadBytes := newDownloadBytes }, false)
|
||||
|
||||
| .bytes none =>
|
||||
return ({ state with machine := state.machine.noMoreInput }, false)
|
||||
|
||||
| .requestBody (some chunk) =>
|
||||
let newUploadBytes := state.uploadBytes + chunk.data.size.toUInt64
|
||||
if let some w := state.uploadProgress then
|
||||
Watch.send w newUploadBytes
|
||||
return ({ state with machine := state.machine.sendData #[chunk], uploadBytes := newUploadBytes }, false)
|
||||
|
||||
| .requestBody none =>
|
||||
if let some body := state.requestBody then
|
||||
if ¬(← body.isClosed) then body.close
|
||||
return ({ state with machine := state.machine.userClosedBody, requestBody := none }, false)
|
||||
|
||||
| .bodyInterest interested =>
|
||||
if interested then
|
||||
let (newMachine, pulledChunk) := state.machine.pullBody
|
||||
let mut st := { state with machine := newMachine }
|
||||
|
||||
if let some pulled := pulledChunk then
|
||||
if let some body := st.responseOutgoing then
|
||||
try Body.Writer.send body pulled.chunk pulled.incomplete
|
||||
catch _ => pure ()
|
||||
|
||||
if pulled.final then
|
||||
if ¬(← Body.Writer.isClosed body) then Body.Writer.close body
|
||||
st := { st with responseOutgoing := none }
|
||||
|
||||
return (st, false)
|
||||
else
|
||||
return (state, false)
|
||||
|
||||
| .packet (some packet) =>
|
||||
let mut machine := state.machine.send packet.request.line
|
||||
let mut requestBody : Option Body.Operations := none
|
||||
let mut pendingRequestBody : Option Body.Operations := none
|
||||
let mut waitingForContinue := false
|
||||
|
||||
if requestHasExpectContinue packet.request then
|
||||
-- Defer body pumping until the server sends `100 Continue`, but still
|
||||
-- set the known size so that `Content-Length` is included in the request
|
||||
-- headers (required by RFC 9112; servers need it to fire checkContinue).
|
||||
if let some size ← packet.request.body.getKnownSize then
|
||||
machine := machine.setKnownSize size
|
||||
waitingForContinue := true
|
||||
pendingRequestBody := some packet.request.body
|
||||
else
|
||||
if let some size ← packet.request.body.getKnownSize then
|
||||
machine := machine.setKnownSize size
|
||||
requestBody := some packet.request.body
|
||||
|
||||
let (responseOutgoing, responseIncoming) ← Body.mkChannel
|
||||
|
||||
return ({ state with
|
||||
machine := machine
|
||||
currentRequest := some packet
|
||||
waitingForRequest := false
|
||||
currentTimeout := config.requestTimeout.val
|
||||
keepAliveTimeout := none
|
||||
requestBody := requestBody
|
||||
pendingRequestBody := pendingRequestBody
|
||||
waitingForContinue := waitingForContinue
|
||||
responseOutgoing := some responseOutgoing
|
||||
responseIncoming := some responseIncoming
|
||||
uploadProgress := packet.uploadProgress
|
||||
uploadBytes := 0
|
||||
downloadProgress := packet.downloadProgress
|
||||
downloadBytes := 0
|
||||
}, false)
|
||||
|
||||
| .packet none => return (state, true)
|
||||
|
||||
| .close => return (state, true)
|
||||
|
||||
| .timeout =>
|
||||
if let some packet := state.currentRequest then
|
||||
packet.onError (.userError "request timeout")
|
||||
if let some body := state.responseOutgoing then
|
||||
if ¬(← Body.Writer.isClosed body) then Body.Writer.close body
|
||||
if let some w := state.uploadProgress then Watch.close w
|
||||
if let some w := state.downloadProgress then Watch.close w
|
||||
return ({ state with
|
||||
machine := state.machine.closeWriter.closeReader.noMoreInput
|
||||
currentRequest := none
|
||||
responseOutgoing := none
|
||||
uploadProgress := none
|
||||
downloadProgress := none
|
||||
}, false)
|
||||
|
||||
| .shutdown =>
|
||||
if let some packet := state.currentRequest then
|
||||
packet.onError (.userError "connection shutdown")
|
||||
if let some body := state.responseOutgoing then
|
||||
if ¬(← Body.Writer.isClosed body) then Body.Writer.close body
|
||||
if let some w := state.uploadProgress then Watch.close w
|
||||
if let some w := state.downloadProgress then Watch.close w
|
||||
return ({ state with
|
||||
machine := state.machine.closeWriter.closeReader.noMoreInput
|
||||
currentRequest := none
|
||||
responseOutgoing := none
|
||||
uploadProgress := none
|
||||
downloadProgress := none
|
||||
}, false)
|
||||
|
||||
/--
|
||||
Runs the main request/response processing loop for a single connection.
|
||||
Drives the HTTP/1.1 state machine through four phases each iteration:
|
||||
close finished readers, send buffered output, process H1 events, poll for I/O.
|
||||
-/
|
||||
protected def handle
|
||||
[Transport α]
|
||||
(socket : α)
|
||||
(machine : H1.Machine .sending)
|
||||
(config : Config)
|
||||
(connectionContext : CancellationContext)
|
||||
(requestChannel : Std.CloseableChannel RequestPacket) : Async Unit := do
|
||||
|
||||
let mut state : ConnectionState := {
|
||||
machine := machine
|
||||
currentTimeout := config.keepAliveTimeout.val
|
||||
keepAliveTimeout := some config.keepAliveTimeout.val
|
||||
currentRequest := none
|
||||
requestBody := none
|
||||
responseOutgoing := none
|
||||
responseIncoming := none
|
||||
requiresData := false
|
||||
expectData := none
|
||||
waitingForRequest := true
|
||||
isInformationalResponse := false
|
||||
waitingForContinue := false
|
||||
pendingRequestBody := none
|
||||
}
|
||||
|
||||
while ¬state.machine.halted do
|
||||
|
||||
-- Phase 1: close any reader that the user has signalled is done.
|
||||
if let some body := state.requestBody then
|
||||
if ← body.isClosed then
|
||||
state := { state with machine := state.machine.userClosedBody, requestBody := none }
|
||||
|
||||
-- Phase 2: advance the state machine and flush any output.
|
||||
let (newMachine, step) := state.machine.step
|
||||
state := { state with machine := newMachine }
|
||||
|
||||
if step.output.size > 0 then
|
||||
try Transport.sendAll socket #[step.output.toByteArray]
|
||||
catch _ =>
|
||||
if let some packet := state.currentRequest then
|
||||
packet.onError (.userError "connection write failed")
|
||||
if let some body := state.responseOutgoing then
|
||||
if ¬(← Body.Writer.isClosed body) then Body.Writer.close body
|
||||
state := { state with
|
||||
machine := state.machine.closeWriter.closeReader.noMoreInput
|
||||
currentRequest := none
|
||||
responseOutgoing := none
|
||||
}
|
||||
break
|
||||
|
||||
-- Phase 3: process all events emitted by this step.
|
||||
let (newState, sawFailure) ← processH1Events config step.events state
|
||||
state := newState
|
||||
if sawFailure then break
|
||||
|
||||
-- Phase 4: wait for the next IO event when any source needs attention.
|
||||
if state.requiresData ∨ state.waitingForRequest ∨ state.currentRequest.isSome ∨ state.requestBody.isSome ∨ state.machine.canPullBody then
|
||||
let sources ← buildPollSources socket requestChannel connectionContext state
|
||||
state := { state with requiresData := false }
|
||||
let event ← pollNextEvent config sources
|
||||
let (newState, shouldClose) ← handleRecvEvent config event state
|
||||
state := newState
|
||||
if shouldClose then break
|
||||
|
||||
-- Clean up: notify any in-flight request and close all open streams.
|
||||
if let some packet := state.currentRequest then
|
||||
packet.onError (.userError "connection closed")
|
||||
|
||||
if let some w := state.uploadProgress then
|
||||
Watch.close w
|
||||
|
||||
if let some w := state.downloadProgress then
|
||||
Watch.close w
|
||||
|
||||
if let some body := state.responseOutgoing then
|
||||
if ¬(← Body.Writer.isClosed body) then Body.Writer.close body
|
||||
|
||||
if let some body := state.requestBody then
|
||||
if ¬(← body.isClosed) then body.close
|
||||
|
||||
if let some body := state.pendingRequestBody then
|
||||
if ¬(← body.isClosed) then body.close
|
||||
|
||||
discard <| EIO.toBaseIO requestChannel.close
|
||||
|
||||
-- Drain any remaining queued packets.
|
||||
repeat do
|
||||
match ← requestChannel.tryRecv with
|
||||
| some packet => packet.onError (.userError "connection closed")
|
||||
| none => break
|
||||
|
||||
Transport.close socket
|
||||
|
||||
end Connection
|
||||
|
||||
end Std.Http.Client
|
||||
221
src/Std/Internal/Http/Client/Pool.lean
Normal file
221
src/Std/Internal/Http/Client/Pool.lean
Normal file
@@ -0,0 +1,221 @@
|
||||
/-
|
||||
Copyright (c) 2026 Lean FRO, LLC. All rights reserved.
|
||||
Released under Apache 2.0 license as described in the file LICENSE.
|
||||
Authors: Sofia Rodrigues
|
||||
-/
|
||||
module
|
||||
|
||||
prelude
|
||||
public import Std.Internal.Http.Client.Agent
|
||||
import Std.Internal.Async.DNS
|
||||
import Std.Data.HashMap
|
||||
import Init.Data.Array
|
||||
|
||||
public section
|
||||
|
||||
namespace Std
|
||||
namespace Http
|
||||
namespace Client
|
||||
|
||||
set_option linter.all true
|
||||
|
||||
open Std Internal IO Async TCP Protocol
|
||||
open Time
|
||||
|
||||
/-!
|
||||
# Agent.Pool
|
||||
|
||||
A connection pool that maintains multiple reusable sessions per `(host, port)` pair,
|
||||
enabling parallel request pipelines to the same host.
|
||||
|
||||
Use `Pool.new` to create a pool with a shared configuration and cookie jar, then call
|
||||
`pool.send` to dispatch requests through managed sessions.
|
||||
|
||||
```lean
|
||||
let pool ← Agent.Pool.new (maxPerHost := 4)
|
||||
|
||||
-- requests are distributed across up to 4 connections per host
|
||||
let r1 ← pool.send "api.example.com" 80
|
||||
(Request.get (.originForm! "/a") |>.header! "Host" "api.example.com" |>.blank)
|
||||
```
|
||||
-/
|
||||
|
||||
/--
|
||||
Resolves `host` via DNS, opens a TCP socket to `port`, and creates an HTTP session.
|
||||
When `config.proxy` is set the TCP connection is made to the proxy address instead.
|
||||
-/
|
||||
private def createTcpSession (host : URI.Host) (port : UInt16) (config : Config) : Async (Session Socket.Client) := do
|
||||
let (connectHost, connectPort) := config.proxy.getD (toString host, port)
|
||||
let addrs ← DNS.getAddrInfo connectHost (toString connectPort)
|
||||
|
||||
if addrs.isEmpty then
|
||||
throw (IO.userError s!"could not resolve host: {connectHost.quote}")
|
||||
|
||||
-- Try each resolved address in order; return on first successful connect.
|
||||
-- This handles hosts that resolve to both IPv6 (::1) and IPv4 (127.0.0.1).
|
||||
let mut lastErr : IO.Error := IO.userError s!"could not connect to {connectHost.quote}:{connectPort}"
|
||||
for ipAddr in addrs do
|
||||
let socketAddr : Std.Net.SocketAddress := match ipAddr with
|
||||
| .v4 ip => .v4 ⟨ip, connectPort⟩
|
||||
| .v6 ip => .v6 ⟨ip, connectPort⟩
|
||||
try
|
||||
let socket ← Socket.Client.mk
|
||||
let _ ← socket.connect socketAddr
|
||||
return ← Session.new socket config
|
||||
catch err =>
|
||||
lastErr := err
|
||||
|
||||
throw lastErr
|
||||
|
||||
/--
|
||||
A connection pool that manages multiple sessions per `(host, port)` pair.
|
||||
Each value in the map is an array of live sessions paired with a round-robin counter.
|
||||
-/
|
||||
public structure Agent.Pool where
|
||||
/--
|
||||
Per-host session lists and round-robin counters, guarded by a mutex.
|
||||
-/
|
||||
state : Mutex (Std.HashMap (String × UInt16) (Array (Session Socket.Client) × Nat))
|
||||
|
||||
/--
|
||||
Maximum number of sessions (connections) per host.
|
||||
-/
|
||||
maxPerHost : Nat
|
||||
|
||||
/--
|
||||
Configuration used when creating new sessions.
|
||||
-/
|
||||
config : Config
|
||||
|
||||
/--
|
||||
Cookie jar shared across all sessions in the pool.
|
||||
-/
|
||||
cookieJar : Cookie.Jar
|
||||
|
||||
/--
|
||||
Response interceptors applied (in order) after every response from any session in the pool.
|
||||
-/
|
||||
interceptors : Array (Response Body.Incoming → Async (Response Body.Incoming)) := #[]
|
||||
|
||||
namespace Agent.Pool
|
||||
|
||||
/--
|
||||
Creates a new, empty connection pool.
|
||||
-/
|
||||
def new (config : Config := {}) (maxPerHost : Nat := 4) : Async Agent.Pool := do
|
||||
let state ← Mutex.new (∅ : Std.HashMap (String × UInt16) (Array (Session Socket.Client) × Nat))
|
||||
let cookieJar ← Cookie.Jar.new
|
||||
pure { state, maxPerHost, config, cookieJar }
|
||||
|
||||
/--
|
||||
Returns a session for `(host, port)`, reusing an existing one when available or
|
||||
creating a new one when the pool has room. Uses round-robin scheduling.
|
||||
-/
|
||||
def getOrCreateSession (pool : Agent.Pool) (host : URI.Host) (port : UInt16) : Async (Session Socket.Client) := do
|
||||
-- Fast path: pick an existing session round-robin.
|
||||
let maybeSession ← pool.state.atomically do
|
||||
let st ← MonadState.get
|
||||
let (sessions, idx) := (st.get? (toString host, port)).getD (#[], 0)
|
||||
match sessions[idx % sessions.size]? with
|
||||
| none => return none
|
||||
| some selected =>
|
||||
MonadState.set (st.insert (toString host, port) (sessions, idx + 1))
|
||||
return some selected
|
||||
|
||||
if let some session := maybeSession then
|
||||
return session
|
||||
|
||||
-- Slow path: create a new session and register it.
|
||||
let session ← createTcpSession host port pool.config
|
||||
pool.state.atomically do
|
||||
let st ← MonadState.get
|
||||
let (sessions, idx) := (st.get? (toString host, port)).getD (#[], 0)
|
||||
-- Respect maxPerHost: only register if we are still under the limit.
|
||||
if sessions.size < pool.maxPerHost then
|
||||
MonadState.set (st.insert (toString host, port) (sessions.push session, idx))
|
||||
-- If over the limit (concurrent creation race), this session is still
|
||||
-- returned for the current request but not stored for future reuse.
|
||||
return session
|
||||
|
||||
/--
|
||||
Removes all sessions for `(host, port)` from the pool.
|
||||
Called when a connection error is detected so the next request gets a fresh connection.
|
||||
-/
|
||||
private def evictSessions (pool : Agent.Pool) (host : URI.Host) (port : UInt16) : Async Unit := do
|
||||
pool.state.atomically do
|
||||
let st ← MonadState.get
|
||||
MonadState.set (st.erase (toString host, port))
|
||||
|
||||
private partial def sendPoolWithRedirects
|
||||
(pool : Agent.Pool) (host : URI.Host) (port : UInt16)
|
||||
(request : Request Body.AnyBody)
|
||||
(remaining : Nat)
|
||||
(retriesLeft : Nat)
|
||||
(redirectHistory : Array URI) : Async (Response Body.Incoming) := do
|
||||
let request :=
|
||||
if pool.config.proxy.isSome then
|
||||
Agent.toAbsoluteFormRequest request (URI.Scheme.ofPort port) host port
|
||||
else
|
||||
request
|
||||
|
||||
let request ← Agent.injectCookies pool.cookieJar host request
|
||||
|
||||
let session ← pool.getOrCreateSession host port
|
||||
let response ← try session.send request
|
||||
catch err => do
|
||||
evictSessions pool host port
|
||||
if retriesLeft > 0 then
|
||||
sleep pool.config.retryDelay
|
||||
return ← sendPoolWithRedirects pool host port request remaining (retriesLeft - 1) redirectHistory
|
||||
throw err
|
||||
|
||||
let response ← Agent.applyInterceptors pool.interceptors response
|
||||
Agent.processCookies pool.cookieJar host response.line.headers
|
||||
|
||||
match ← Agent.decideRedirect remaining redirectHistory host port (URI.Scheme.ofPort port) request response with
|
||||
| .done =>
|
||||
if let some validate := pool.config.validateStatus then
|
||||
if !validate response.line.status then
|
||||
throw (.userError s!"unexpected HTTP status: {response.line.status.toCode}")
|
||||
return response
|
||||
| .follow newHost newPort _ newRequest newHistory =>
|
||||
sendPoolWithRedirects pool newHost newPort newRequest (remaining - 1) retriesLeft newHistory
|
||||
|
||||
/--
|
||||
Sends a request through a pooled session for `(host, port)`, injecting cookies from the
|
||||
shared jar, applying response interceptors, storing any `Set-Cookie` responses, following
|
||||
redirects up to `config.maxRedirects` hops, and evicting dead sessions on connection
|
||||
failure (retrying up to `config.maxRetries` times).
|
||||
-/
|
||||
def send {β : Type} [Coe β Body.AnyBody]
|
||||
(pool : Agent.Pool) (host : URI.Host) (port : UInt16)
|
||||
(request : Request β) : Async (Response Body.Incoming) :=
|
||||
sendPoolWithRedirects pool host port
|
||||
{ line := request.line, body := (request.body : Body.AnyBody), extensions := request.extensions }
|
||||
pool.config.maxRedirects
|
||||
pool.config.maxRetries
|
||||
#[]
|
||||
|
||||
end Agent.Pool
|
||||
|
||||
namespace Agent
|
||||
|
||||
/--
|
||||
Resolves `host` via DNS and establishes a TCP connection on `port`, returning a new
|
||||
`Agent Socket.Client`. Throws if DNS resolution returns no addresses.
|
||||
|
||||
When `config.proxy` is set every connection (including cross-host redirects) is routed
|
||||
through the proxy.
|
||||
-/
|
||||
def connect (host : URI.Host) (port : UInt16) (config : Config := {}) : Async (Agent Socket.Client) := do
|
||||
let session ← createTcpSession host port config
|
||||
let cookieJar ← Cookie.Jar.new
|
||||
let scheme := URI.Scheme.ofPort port
|
||||
pure { session, scheme, host, port, redirectHistory := #[], cookieJar,
|
||||
connectTo := some (fun h p => createTcpSession h p config) }
|
||||
|
||||
end Agent
|
||||
|
||||
end Client
|
||||
end Http
|
||||
end Std
|
||||
101
src/Std/Internal/Http/Client/Session.lean
Normal file
101
src/Std/Internal/Http/Client/Session.lean
Normal file
@@ -0,0 +1,101 @@
|
||||
/-
|
||||
Copyright (c) 2026 Lean FRO, LLC. All rights reserved.
|
||||
Released under Apache 2.0 license as described in the file LICENSE.
|
||||
Authors: Sofia Rodrigues
|
||||
-/
|
||||
module
|
||||
|
||||
prelude
|
||||
public import Std.Internal.Http.Client.Connection
|
||||
|
||||
public section
|
||||
|
||||
namespace Std.Http.Client
|
||||
|
||||
open Std Internal IO Async TCP Protocol
|
||||
open Time
|
||||
|
||||
set_option linter.all true
|
||||
|
||||
/-!
|
||||
# Session
|
||||
|
||||
This module defines `Client.Session`, an HTTP/1.1 client session that manages a single
|
||||
persistent connection and dispatches sequential request/response exchanges over it.
|
||||
A background task drives the `Connection` loop; callers interact through a channel.
|
||||
-/
|
||||
|
||||
/--
|
||||
An HTTP client session that sends sequential requests over a persistent connection.
|
||||
-/
|
||||
public structure Session (α : Type) where
|
||||
/--
|
||||
Queue of requests sent by users.
|
||||
-/
|
||||
requestChannel : Std.CloseableChannel RequestPacket
|
||||
|
||||
/--
|
||||
Resolves when the background loop exits.
|
||||
-/
|
||||
shutdown : IO.Promise Unit
|
||||
|
||||
/--
|
||||
Configuration for this session.
|
||||
-/
|
||||
config : Config
|
||||
|
||||
namespace Session
|
||||
|
||||
/--
|
||||
Queue a request and await its response.
|
||||
-/
|
||||
def send [Transport α] {β : Type} [Body.Reader β] [Body.Writer β]
|
||||
(session : Session α) (request : Request β) : Async (Response Body.Incoming) := do
|
||||
let responsePromise ← IO.Promise.new
|
||||
|
||||
let task ← session.requestChannel.send {
|
||||
request := { line := request.line, body := Body.Operations.of request.body, extensions := request.extensions }
|
||||
responsePromise
|
||||
}
|
||||
|
||||
let .ok _ ← await task
|
||||
| throw (.userError "connection closed, cannot send more requests")
|
||||
|
||||
match ← await responsePromise.result! with
|
||||
| .ok response => pure response
|
||||
| .error error => throw error
|
||||
|
||||
/--
|
||||
Wait for background loop shutdown.
|
||||
-/
|
||||
def waitShutdown (session : Session α) : Async Unit := do
|
||||
await session.shutdown.result!
|
||||
|
||||
/--
|
||||
Close the session's request channel.
|
||||
-/
|
||||
def close (session : Session α) : Async Unit := do
|
||||
discard <| EIO.toBaseIO session.requestChannel.close
|
||||
|
||||
/--
|
||||
Creates an HTTP client session over the given transport and starts its background loop.
|
||||
-/
|
||||
def new [Transport t] (client : t) (config : Config := {}) : Async (Session t) := do
|
||||
let requestChannel ← Std.CloseableChannel.new
|
||||
let shutdown ← IO.Promise.new
|
||||
|
||||
let context ← CancellationContext.new
|
||||
|
||||
background do
|
||||
try
|
||||
Std.Http.Client.Connection.handle client
|
||||
({ config := config.toH1Config } : H1.Machine .sending)
|
||||
config context requestChannel
|
||||
finally
|
||||
discard <| shutdown.resolve ()
|
||||
|
||||
pure { requestChannel, shutdown, config }
|
||||
|
||||
end Session
|
||||
|
||||
end Std.Http.Client
|
||||
@@ -15,6 +15,7 @@ public import Std.Internal.Http.Data.Chunk
|
||||
public import Std.Internal.Http.Data.Headers
|
||||
public import Std.Internal.Http.Data.URI
|
||||
public import Std.Internal.Http.Data.Body
|
||||
public import Std.Internal.Http.Data.Cookie
|
||||
|
||||
/-!
|
||||
# HTTP Data Types
|
||||
|
||||
342
src/Std/Internal/Http/Data/Cookie.lean
Normal file
342
src/Std/Internal/Http/Data/Cookie.lean
Normal file
@@ -0,0 +1,342 @@
|
||||
/-
|
||||
Copyright (c) 2026 Lean FRO, LLC. All rights reserved.
|
||||
Released under Apache 2.0 license as described in the file LICENSE.
|
||||
Authors: Sofia Rodrigues
|
||||
-/
|
||||
module
|
||||
|
||||
prelude
|
||||
public import Std.Sync.Mutex
|
||||
public import Std.Internal.Http.Data.URI
|
||||
public import Init.Data.String
|
||||
public import Init.Data.Array.Basic
|
||||
public import Init.Data.List.Basic
|
||||
|
||||
public section
|
||||
|
||||
/-!
|
||||
# Cookie
|
||||
|
||||
This module defines the `Cookie` and `Jar` types, a minimal RFC 6265-compliant
|
||||
implementation for managing HTTP cookies.
|
||||
|
||||
Cookies are parsed from `Set-Cookie` response headers, stored in a thread-safe jar, and
|
||||
injected as a `Cookie` request header on outgoing requests.
|
||||
|
||||
Supported `Set-Cookie` attributes: `Domain`, `Path`, `Secure`.
|
||||
|
||||
Unsupported: `Expires`, `Max-Age`, `HttpOnly`, `SameSite`. All cookies persist for the
|
||||
lifetime of the jar regardless of any expiry directives.
|
||||
|
||||
Reference: https://www.rfc-editor.org/rfc/rfc6265
|
||||
-/
|
||||
|
||||
namespace Std.Http
|
||||
|
||||
set_option linter.all true
|
||||
|
||||
open Internal Char
|
||||
|
||||
namespace Cookie
|
||||
|
||||
/--
|
||||
Proposition asserting that a string is a valid cookie name: a non-empty HTTP token.
|
||||
Cookie names are case-sensitive.
|
||||
|
||||
Reference: https://www.rfc-editor.org/rfc/rfc6265#section-4.1.1
|
||||
-/
|
||||
abbrev IsValidCookieName (s : String) : Prop :=
|
||||
isToken s
|
||||
|
||||
/--
|
||||
A validated HTTP cookie name. Cookie names are case-sensitive HTTP tokens.
|
||||
|
||||
Reference: https://www.rfc-editor.org/rfc/rfc6265#section-4.1.1
|
||||
-/
|
||||
@[ext]
|
||||
structure Name where
|
||||
/--
|
||||
The cookie name string.
|
||||
-/
|
||||
value : String
|
||||
|
||||
/--
|
||||
Proof that the name is a valid HTTP token.
|
||||
-/
|
||||
isValidCookieName : IsValidCookieName value := by decide
|
||||
deriving BEq, DecidableEq, Repr
|
||||
|
||||
namespace Name
|
||||
|
||||
instance : Inhabited Name where
|
||||
default := ⟨"_", by decide⟩
|
||||
|
||||
/--
|
||||
Attempts to create a `Cookie.Name` from a `String`, returning `none` if the string is
|
||||
not a valid HTTP token or is empty.
|
||||
-/
|
||||
def ofString? (s : String) : Option Name :=
|
||||
let val := s.trimAscii.toString
|
||||
if h : IsValidCookieName val then
|
||||
some ⟨val, h⟩
|
||||
else
|
||||
none
|
||||
|
||||
/--
|
||||
Creates a `Cookie.Name` from a string, panicking if the string is not a valid HTTP token.
|
||||
-/
|
||||
def ofString! (s : String) : Name :=
|
||||
match ofString? s with
|
||||
| some res => res
|
||||
| none => panic! s!"invalid cookie name: {s.quote}"
|
||||
|
||||
instance : ToString Name where
|
||||
toString n := n.value
|
||||
|
||||
end Name
|
||||
|
||||
/--
|
||||
`cookie-octet = %x21 / %x23-2B / %x2D-3A / %x3C-5B / %x5D-7E`
|
||||
|
||||
US-ASCII visible characters excluding SP, DQUOTE, comma, semicolon, and backslash.
|
||||
|
||||
Reference: https://www.rfc-editor.org/rfc/rfc6265#section-4.1.1
|
||||
-/
|
||||
def isCookieOctet (c : Char) : Bool :=
|
||||
c = '!' ∨
|
||||
('#' ≤ c ∧ c ≤ '+') ∨
|
||||
('-' ≤ c ∧ c ≤ ':') ∨
|
||||
('<' ≤ c ∧ c ≤ '[') ∨
|
||||
(']' ≤ c ∧ c ≤ '~')
|
||||
|
||||
/--
|
||||
Proposition asserting that a string is a valid cookie value: all characters are
|
||||
`cookie-octet` characters. Empty values are permitted.
|
||||
|
||||
Reference: https://www.rfc-editor.org/rfc/rfc6265#section-4.1.1
|
||||
-/
|
||||
abbrev IsValidCookieValue (s : String) : Prop :=
|
||||
s.toList.all isCookieOctet
|
||||
|
||||
/--
|
||||
A validated HTTP cookie value. Empty values are permitted.
|
||||
|
||||
Reference: https://www.rfc-editor.org/rfc/rfc6265#section-4.1.1
|
||||
-/
|
||||
@[ext]
|
||||
structure Value where
|
||||
/--
|
||||
The cookie value string.
|
||||
-/
|
||||
value : String
|
||||
|
||||
/--
|
||||
Proof that the value contains only valid cookie-octet characters.
|
||||
-/
|
||||
isValidCookieValue : IsValidCookieValue value := by decide
|
||||
deriving BEq, DecidableEq, Repr
|
||||
|
||||
namespace Value
|
||||
|
||||
instance : Inhabited Value where
|
||||
default := ⟨"", by decide⟩
|
||||
|
||||
/--
|
||||
Attempts to create a `Cookie.Value` from a `String`, returning `none` if the string
|
||||
contains characters not permitted in cookie values.
|
||||
-/
|
||||
def ofString? (s : String) : Option Value :=
|
||||
let val := s.trimAscii.toString
|
||||
if h : IsValidCookieValue val then
|
||||
some ⟨val, h⟩
|
||||
else
|
||||
none
|
||||
|
||||
/--
|
||||
Creates a `Cookie.Value` from a string, panicking if the string contains characters not
|
||||
permitted in cookie values.
|
||||
-/
|
||||
def ofString! (s : String) : Value :=
|
||||
match ofString? s with
|
||||
| some res => res
|
||||
| none => panic! s!"invalid cookie value: {s.quote}"
|
||||
|
||||
instance : ToString Value where
|
||||
toString v := v.value
|
||||
|
||||
end Value
|
||||
|
||||
end Cookie
|
||||
|
||||
/--
|
||||
An HTTP cookie with its matching attributes.
|
||||
|
||||
Reference: https://www.rfc-editor.org/rfc/rfc6265#section-4.1
|
||||
-/
|
||||
structure Cookie where
|
||||
/--
|
||||
The cookie name.
|
||||
-/
|
||||
name : Cookie.Name
|
||||
|
||||
/--
|
||||
The cookie value.
|
||||
-/
|
||||
value : Cookie.Value
|
||||
|
||||
/--
|
||||
The effective domain for this cookie. When `Set-Cookie` carries no `Domain` attribute this
|
||||
equals the origin host and `hostOnly` is `true` — only that exact host will receive the
|
||||
cookie. When `Domain` is set, `hostOnly` is `false` and subdomains also match.
|
||||
-/
|
||||
domain : URI.Host
|
||||
|
||||
/--
|
||||
`true` when the cookie must only be sent to the exact origin host (no subdomain matching).
|
||||
-/
|
||||
hostOnly : Bool
|
||||
|
||||
/--
|
||||
Path prefix for which the cookie is valid. Defaults to `"/"`.
|
||||
-/
|
||||
path : URI.Path
|
||||
|
||||
/--
|
||||
When `true` the cookie must only be sent over a secure (HTTPS) channel.
|
||||
-/
|
||||
secure : Bool
|
||||
|
||||
/--
|
||||
A thread-safe HTTP cookie jar.
|
||||
|
||||
Reference: https://www.rfc-editor.org/rfc/rfc6265#section-5
|
||||
-/
|
||||
structure Cookie.Jar where
|
||||
private mk ::
|
||||
private cookies : Mutex (Array Cookie)
|
||||
|
||||
namespace Cookie.Jar
|
||||
|
||||
/--
|
||||
Creates an empty cookie jar.
|
||||
-/
|
||||
def new : BaseIO Jar := do
|
||||
let cookies ← Mutex.new #[]
|
||||
return .mk cookies
|
||||
|
||||
/--
|
||||
Domain matching per RFC 6265 §5.1.3.
|
||||
-/
|
||||
private def domainMatches (cookieDomain : URI.Host) (hostOnly : Bool) (host : URI.Host) : Bool :=
|
||||
if hostOnly then
|
||||
host == cookieDomain
|
||||
else
|
||||
let d := cookieDomain
|
||||
host == d || (toString host).endsWith ("." ++ toString d)
|
||||
|
||||
/--
|
||||
Path matching per RFC 6265 §5.1.4.
|
||||
-/
|
||||
private def pathMatches (cookiePath : URI.Path) (requestPath : String) : Bool :=
|
||||
let s := toString cookiePath
|
||||
requestPath == s ||
|
||||
(requestPath.startsWith s &&
|
||||
(s.endsWith "/" || requestPath.startsWith (s ++ "/")))
|
||||
|
||||
/--
|
||||
Splits `s` at the first occurrence of `sep`, returning `(before, after)`.
|
||||
Returns `(s, "")` when `sep` does not appear in `s`.
|
||||
-/
|
||||
private def splitFirst (s : String) (sep : String) : String × String :=
|
||||
match s.splitOn sep with
|
||||
| [] | [_] => (s, "")
|
||||
| first :: rest => (first, String.intercalate sep rest)
|
||||
|
||||
/--
|
||||
Attempts to parse a host string into a `URI.Host`, trying IPv4, bracketed IPv6, and
|
||||
domain name forms in order.
|
||||
-/
|
||||
private def parseHostStr (s : String) : Option URI.Host :=
|
||||
if let some ip := Net.IPv4Addr.ofString s then
|
||||
some (.ipv4 ip)
|
||||
else if s.startsWith "[" && s.endsWith "]" && s.length > 2 then
|
||||
let inner := s.dropEnd 1 |>.drop 1 |>.toString
|
||||
(Net.IPv6Addr.ofString inner).map .ipv6
|
||||
else
|
||||
(URI.DomainName.ofString? s).map .name
|
||||
|
||||
/--
|
||||
Parses a single `Set-Cookie` header value and stores the resulting cookie.
|
||||
`host` is the origin host of the response (used as the effective domain when no
|
||||
`Domain` attribute is present).
|
||||
|
||||
Reference: https://www.rfc-editor.org/rfc/rfc6265#section-5.2
|
||||
-/
|
||||
def processSetCookie (jar : Jar) (host : URI.Host) (headerValue : String) : BaseIO Unit := do
|
||||
let rawParts := (headerValue.splitOn ";").map String.trimAscii
|
||||
|
||||
let some rawNameValue := rawParts[0]?
|
||||
| return ()
|
||||
|
||||
let (rawName, rawValue) := splitFirst rawNameValue.toString "="
|
||||
|
||||
let some cookieName := Cookie.Name.ofString? rawName | return ()
|
||||
let some cookieValue := Cookie.Value.ofString? rawValue | return ()
|
||||
|
||||
let mut cookieDomain : Option URI.Host := none
|
||||
let mut cookiePath : URI.Path := URI.Path.parseOrRoot "/"
|
||||
let mut secure := false
|
||||
|
||||
for attr in rawParts.drop 1 do
|
||||
let (attrName, attrVal) := splitFirst attr.toString "="
|
||||
match attrName.trimAscii.toString.toLower with
|
||||
| "domain" =>
|
||||
let d := if attrVal.startsWith "." then attrVal.drop 1 else attrVal
|
||||
let d := d.trimAscii.toString
|
||||
if !d.isEmpty then
|
||||
cookieDomain := (URI.DomainName.ofString? d).map URI.Host.name
|
||||
| "path" =>
|
||||
let p := attrVal.trimAscii.toString
|
||||
if !p.isEmpty then cookiePath := URI.Path.parseOrRoot p
|
||||
| "secure" => secure := true
|
||||
| _ => pure ()
|
||||
|
||||
let (domain, hostOnly) ← match cookieDomain with
|
||||
| some d => pure (d, false)
|
||||
| none => pure (host, true)
|
||||
|
||||
let cookie : Cookie := { name := cookieName, value := cookieValue, domain, hostOnly, path := cookiePath, secure }
|
||||
|
||||
jar.cookies.atomically do
|
||||
let cs ← get
|
||||
let cs := cs.filter fun c =>
|
||||
!(c.name == cookie.name && c.domain == cookie.domain && c.path == cookie.path)
|
||||
set (cs.push cookie)
|
||||
|
||||
/--
|
||||
Returns all cookies that should be sent for a request to `host` at `path`.
|
||||
Pass `secure := true` when the request channel is HTTPS.
|
||||
|
||||
Reference: https://www.rfc-editor.org/rfc/rfc6265#section-5.4
|
||||
-/
|
||||
def cookiesFor
|
||||
(jar : Jar) (host : URI.Host) (path : String)
|
||||
(secure : Bool := false) : BaseIO (Array Cookie) :=
|
||||
jar.cookies.atomically do
|
||||
let cs ← get
|
||||
return cs.filter fun c =>
|
||||
domainMatches c.domain c.hostOnly host &&
|
||||
pathMatches c.path path &&
|
||||
(!c.secure || secure)
|
||||
|
||||
/--
|
||||
Formats an array of cookies into a `Cookie` header value string.
|
||||
Returns `none` when the array is empty.
|
||||
-/
|
||||
def toCookieHeader (cookies : Array Cookie) : Option String :=
|
||||
if cookies.isEmpty then
|
||||
none
|
||||
else
|
||||
some (String.intercalate "; " (cookies.map (fun c => c.name.value ++ "=" ++ c.value.value)).toList)
|
||||
|
||||
end Std.Http.Cookie.Jar
|
||||
Reference in New Issue
Block a user