Compare commits

..

7 Commits

Author SHA1 Message Date
Sebastian Ullrich
ccc7157c08 fix: expose grind gadgets abstractFn and simpMatchDiscrsOnly (#13177)
This PR adds `@[expose]` to `Lean.Grind.abstractFn` and
`Lean.Grind.simpMatchDiscrsOnly` so that the kernel can unfold them when
type-checking `grind`-produced proofs inside `module` blocks. Other
similar gadgets (`nestedDecidable`, `PreMatchCond`, `alreadyNorm`) were
already exposed; these two were simply missed.

Closes https://github.com/leanprover/lean4/issues/13167
2026-03-29 10:37:54 +00:00
Lean stage0 autoupdater
05046dc3d7 chore: update stage0 2026-03-29 01:00:22 +00:00
Mac Malone
43f18fd502 fix: lake: large cache get bulk fetch (#13173)
This PR fixes a bug in #13164 where the bulk request would hang if the
response was large.
2026-03-28 22:58:56 +00:00
Sofia Rodrigues
b06eb981a3 fix: remove non-deterministic http-body test (#13175)
This PR fixes the wrong behavior of a stream in http_body.
2026-03-28 20:36:35 +00:00
Sofia Rodrigues
f72137f53a feat: introduce Body type class and some Body types for HTTP (#12144)
This PR introduces the `Body` type class, the `ChunkStream` and `Full`
types that are used to represent streaming bodies of Requests and
Responses.

This contains the same code as #10478, divided into separate pieces to
facilitate easier review.

The pieces of this feature are:
- Core data structures: #12126
- Headers: #12127
- URI:  #12128
- Body: #12144
- H1: #12146
- Server: #12151
- Client:

---------

Co-authored-by: Rob23oba <152706811+Rob23oba@users.noreply.github.com>
2026-03-28 17:14:53 +00:00
Lean stage0 autoupdater
96dbc324f3 chore: update stage0 2026-03-28 05:24:36 +00:00
Mac Malone
d6e69649b6 refactor: lake: fetch artifact URLs in a single Reservoir request (#13164)
This PR changes `lake cache get` to fetch artifact cloud storage URLs
from Reservoir in a single bulk POST request rather than relying on
per-artifact HTTP redirects. When downloading many artifacts, the
redirect-based approach sends one request per artifact to the Reservoir
web host (Netlify), which can be slow and risks hitting rate limits. The
bulk endpoint returns all URLs at once, so curl only talks to the CDN
after that.

Non-Reservoir cache services are unaffected and continue using direct
URLs as before.

🤖 Prepared with Claude Code
2026-03-28 04:46:43 +00:00
32 changed files with 2123 additions and 60 deletions

View File

@@ -30,13 +30,13 @@ simpMatchDiscrsOnly (match 0 with | 0 => true | _ => false) = true
```
using `eq_self`.
-/
def simpMatchDiscrsOnly {α : Sort u} (a : α) : α := a
@[expose] def simpMatchDiscrsOnly {α : Sort u} (a : α) : α := a
/--
Gadget for protecting lambda abstractions created by `abstractGroundMismatches?`
from beta reduction during preprocessing. See `ProveEq.lean` for details.
-/
def abstractFn {α : Sort u} (a : α) : α := a
@[expose] def abstractFn {α : Sort u} (a : α) : α := a
/-- Gadget for representing offsets `t+k` in patterns. -/
def offset (a b : Nat) : Nat := a + b

View File

@@ -14,6 +14,7 @@ public import Std.Internal.Http.Data.Status
public import Std.Internal.Http.Data.Chunk
public import Std.Internal.Http.Data.Headers
public import Std.Internal.Http.Data.URI
public import Std.Internal.Http.Data.Body
/-!
# HTTP Data Types

View File

@@ -0,0 +1,24 @@
/-
Copyright (c) 2025 Lean FRO, LLC. All rights reserved.
Released under Apache 2.0 license as described in the file LICENSE.
Authors: Sofia Rodrigues
-/
module
prelude
public import Std.Internal.Http.Data.Body.Basic
public import Std.Internal.Http.Data.Body.Length
public import Std.Internal.Http.Data.Body.Any
public import Std.Internal.Http.Data.Body.Stream
public import Std.Internal.Http.Data.Body.Empty
public import Std.Internal.Http.Data.Body.Full
public section
/-!
# Body
This module re-exports all HTTP body types: `Body.Empty`, `Body.Full`, `Body.Stream`,
`Body.Any`, and `Body.Length`, along with the `Http.Body` typeclass and conversion
utilities (`ToByteArray`, `FromByteArray`).
-/

View File

@@ -0,0 +1,83 @@
/-
Copyright (c) 2025 Lean FRO, LLC. All rights reserved.
Released under Apache 2.0 license as described in the file LICENSE.
Authors: Sofia Rodrigues
-/
module
prelude
public import Std.Internal.Http.Data.Body.Basic
public section
/-!
# Body.Any
A type-erased body backed by closures. Implements `Http.Body` and can be constructed from any
type that also implements `Http.Body`. Used as the default handler response body type.
-/
namespace Std.Http.Body
open Std Internal IO Async
set_option linter.all true
/--
A type-erased body handle. Operations are stored as closures, making it open to any body type
that implements `Http.Body`.
-/
structure Any where
/--
Receives the next body chunk. Returns `none` at end-of-stream.
-/
recv : Async (Option Chunk)
/--
Closes the body stream.
-/
close : Async Unit
/--
Returns `true` when the body stream is closed.
-/
isClosed : Async Bool
/--
Selector that resolves when a chunk is available or EOF is reached.
-/
recvSelector : Selector (Option Chunk)
/--
Returns the declared size.
-/
getKnownSize : Async (Option Body.Length)
/--
Sets the size of the body.
-/
setKnownSize : Option Body.Length Async Unit
namespace Any
/--
Erases a body of any `Http.Body` instance into a `Body.Any`.
-/
def ofBody [Http.Body α] (body : α) : Any where
recv := Http.Body.recv body
close := Http.Body.close body
isClosed := Http.Body.isClosed body
recvSelector := Http.Body.recvSelector body
getKnownSize := Http.Body.getKnownSize body
setKnownSize := Http.Body.setKnownSize body
end Any
instance : Http.Body Any where
recv := Any.recv
close := Any.close
isClosed := Any.isClosed
recvSelector := Any.recvSelector
getKnownSize := Any.getKnownSize
setKnownSize := Any.setKnownSize
end Std.Http.Body

View File

@@ -0,0 +1,102 @@
/-
Copyright (c) 2025 Lean FRO, LLC. All rights reserved.
Released under Apache 2.0 license as described in the file LICENSE.
Authors: Sofia Rodrigues
-/
module
prelude
public import Std.Internal.Async
public import Std.Internal.Async.ContextAsync
public import Std.Internal.Http.Data.Chunk
public import Std.Internal.Http.Data.Headers
public import Std.Internal.Http.Data.Body.Length
public section
/-!
# Body.Basic
This module defines the `Body` typeclass for HTTP body streams, and shared conversion types
`ToByteArray` and `FromByteArray` used for encoding and decoding body content.
-/
namespace Std.Http
open Std Internal IO Async
set_option linter.all true
/--
Typeclass for values that can be read as HTTP body streams.
-/
class Body (α : Type) where
/--
Receives the next body chunk. Returns `none` at end-of-stream.
-/
recv : α Async (Option Chunk)
/--
Closes the body stream.
-/
close : α Async Unit
/--
Returns `true` when the body stream is closed.
-/
isClosed : α Async Bool
/--
Selector that resolves when a chunk is available or EOF is reached.
-/
recvSelector : α Selector (Option Chunk)
/--
Gets the declared size of the body.
-/
getKnownSize : α Async (Option Body.Length)
/--
Sets the declared size of a body.
-/
setKnownSize : α Option Body.Length Async Unit
end Std.Http
namespace Std.Http.Body
/--
Typeclass for types that can be converted to a `ByteArray`.
-/
class ToByteArray (α : Type) where
/--
Transforms into a `ByteArray`.
-/
toByteArray : α ByteArray
instance : ToByteArray ByteArray where
toByteArray := id
instance : ToByteArray String where
toByteArray := String.toUTF8
/--
Typeclass for types that can be decoded from a `ByteArray`. The conversion may fail with an error
message if the bytes are not valid for the target type.
-/
class FromByteArray (α : Type) where
/--
Attempts to decode a `ByteArray` into the target type, returning an error message on failure.
-/
fromByteArray : ByteArray Except String α
instance : FromByteArray ByteArray where
fromByteArray := .ok
instance : FromByteArray String where
fromByteArray bs :=
match String.fromUTF8? bs with
| some s => .ok s
| none => .error "invalid UTF-8 encoding"
end Std.Http.Body

View File

@@ -0,0 +1,116 @@
/-
Copyright (c) 2025 Lean FRO, LLC. All rights reserved.
Released under Apache 2.0 license as described in the file LICENSE.
Authors: Sofia Rodrigues
-/
module
prelude
public import Std.Internal.Http.Data.Request
public import Std.Internal.Http.Data.Response
public import Std.Internal.Http.Data.Body.Any
public section
/-!
# Body.Empty
Represents an always-empty, already-closed body handle.
-/
namespace Std.Http.Body
open Std Internal IO Async
set_option linter.all true
/--
An empty body handle.
-/
structure Empty where
deriving Inhabited, BEq
namespace Empty
/--
Receives from an empty body, always returning end-of-stream.
-/
@[inline]
def recv (_ : Empty) : Async (Option Chunk) :=
pure none
/--
Closes an empty body (no-op).
-/
@[inline]
def close (_ : Empty) : Async Unit :=
pure ()
/--
Empty bodies are always closed for reading.
-/
@[inline]
def isClosed (_ : Empty) : Async Bool :=
pure true
/--
Selector that immediately resolves with end-of-stream for an empty body.
-/
@[inline]
def recvSelector (_ : Empty) : Selector (Option Chunk) where
tryFn := pure (some none)
registerFn waiter := do
let lose := pure ()
let win promise := do
promise.resolve (.ok none)
waiter.race lose win
unregisterFn := pure ()
end Empty
instance : Http.Body Empty where
recv := Empty.recv
close := Empty.close
isClosed := Empty.isClosed
recvSelector := Empty.recvSelector
getKnownSize _ := pure (some <| .fixed 0)
setKnownSize _ _ := pure ()
instance : Coe Empty Any := Any.ofBody
instance : Coe (Response Empty) (Response Any) where
coe f := { f with }
instance : Coe (ContextAsync (Response Empty)) (ContextAsync (Response Any)) where
coe action := do
let response action
pure (response : Response Any)
instance : Coe (Async (Response Empty)) (ContextAsync (Response Any)) where
coe action := do
let response action
pure (response : Response Any)
end Body
namespace Request.Builder
open Internal.IO.Async
/--
Builds a request with no body.
-/
def empty (builder : Builder) : Async (Request Body.Empty) :=
pure <| builder.body {}
end Request.Builder
namespace Response.Builder
open Internal.IO.Async
/--
Builds a response with no body.
-/
def empty (builder : Builder) : Async (Response Body.Empty) :=
pure <| builder.body {}
end Response.Builder

View File

@@ -0,0 +1,232 @@
/-
Copyright (c) 2025 Lean FRO, LLC. All rights reserved.
Released under Apache 2.0 license as described in the file LICENSE.
Authors: Sofia Rodrigues
-/
module
prelude
public import Std.Sync
public import Std.Internal.Http.Data.Request
public import Std.Internal.Http.Data.Response
public import Std.Internal.Http.Data.Body.Any
public import Init.Data.ByteArray
public section
/-!
# Body.Full
A body backed by a fixed `ByteArray` held in a `Mutex`.
The byte array is consumed at most once: the first call to `recv` atomically takes the data
and returns it as a single chunk; subsequent calls return `none` (end-of-stream).
Closing the body discards any unconsumed data.
-/
namespace Std.Http.Body
open Std Internal IO Async
set_option linter.all true
/--
A body backed by a fixed, mutex-protected `ByteArray`.
The data is consumed on the first read. Once consumed (or explicitly closed), the body
behaves as a closed, empty channel.
-/
structure Full where
private mk ::
private state : Mutex (Option ByteArray)
deriving Nonempty
namespace Full
private def takeChunk : AtomicT (Option ByteArray) Async (Option Chunk) := do
match get with
| none =>
pure none
| some data =>
set (none : Option ByteArray)
if data.isEmpty then
pure none
else
pure (some (Chunk.ofByteArray data))
/--
Creates a `Full` body from a `ByteArray`.
-/
def ofByteArray (data : ByteArray) : Async Full := do
let state Mutex.new (some data)
return { state }
/--
Creates a `Full` body from a `String`.
-/
def ofString (data : String) : Async Full := do
let state Mutex.new (some data.toUTF8)
return { state }
/--
Receives the body data. Returns the full byte array on the first call as a single chunk,
then `none` on all subsequent calls.
-/
def recv (full : Full) : Async (Option Chunk) :=
full.state.atomically do
takeChunk
/--
Closes the body, discarding any unconsumed data.
-/
def close (full : Full) : Async Unit :=
full.state.atomically do
set (none : Option ByteArray)
/--
Returns `true` when the data has been consumed or the body has been closed.
-/
def isClosed (full : Full) : Async Bool :=
full.state.atomically do
return ( get).isNone
/--
Returns the known size of the remaining data.
Returns `some (.fixed n)` with the current byte count, or `some (.fixed 0)` if the body has
already been consumed or closed.
-/
def getKnownSize (full : Full) : Async (Option Body.Length) :=
full.state.atomically do
match get with
| none => pure (some (.fixed 0))
| some data => pure (some (.fixed data.size))
/--
Selector that immediately resolves to the remaining chunk (or EOF).
-/
def recvSelector (full : Full) : Selector (Option Chunk) where
tryFn := do
let chunk full.state.atomically do
takeChunk
pure (some chunk)
registerFn waiter := do
full.state.atomically do
let lose := pure ()
let win promise := do
let chunk takeChunk
promise.resolve (.ok chunk)
waiter.race lose win
unregisterFn := pure ()
end Full
instance : Http.Body Full where
recv := Full.recv
close := Full.close
isClosed := Full.isClosed
recvSelector := Full.recvSelector
getKnownSize := Full.getKnownSize
setKnownSize _ _ := pure ()
instance : Coe Full Any := Any.ofBody
instance : Coe (Response Full) (Response Any) where
coe f := { f with }
instance : Coe (ContextAsync (Response Full)) (ContextAsync (Response Any)) where
coe action := do
let response action
pure (response : Response Any)
instance : Coe (Async (Response Full)) (ContextAsync (Response Any)) where
coe action := do
let response action
pure (response : Response Any)
end Body
namespace Request.Builder
open Internal.IO.Async
/--
Builds a request body from raw bytes without setting any headers.
Use `bytes` instead if you want `Content-Type: application/octet-stream` set automatically.
-/
def fromBytes (builder : Builder) (content : ByteArray) : Async (Request Body.Full) := do
return builder.body ( Body.Full.ofByteArray content)
/--
Builds a request with a binary body.
Sets `Content-Type: application/octet-stream`.
Use `fromBytes` instead if you need to set a different `Content-Type` or none at all.
-/
def bytes (builder : Builder) (content : ByteArray) : Async (Request Body.Full) :=
fromBytes (builder.header Header.Name.contentType (Header.Value.ofString! "application/octet-stream")) content
/--
Builds a request with a text body.
Sets `Content-Type: text/plain; charset=utf-8`.
-/
def text (builder : Builder) (content : String) : Async (Request Body.Full) :=
fromBytes (builder.header Header.Name.contentType (Header.Value.ofString! "text/plain; charset=utf-8")) content.toUTF8
/--
Builds a request with a JSON body.
Sets `Content-Type: application/json`.
-/
def json (builder : Builder) (content : String) : Async (Request Body.Full) :=
fromBytes (builder.header Header.Name.contentType (Header.Value.ofString! "application/json")) content.toUTF8
/--
Builds a request with an HTML body.
Sets `Content-Type: text/html; charset=utf-8`.
-/
def html (builder : Builder) (content : String) : Async (Request Body.Full) :=
fromBytes (builder.header Header.Name.contentType (Header.Value.ofString! "text/html; charset=utf-8")) content.toUTF8
end Request.Builder
namespace Response.Builder
open Internal.IO.Async
/--
Builds a response body from raw bytes without setting any headers.
Use `bytes` instead if you want `Content-Type: application/octet-stream` set automatically.
-/
def fromBytes (builder : Builder) (content : ByteArray) : Async (Response Body.Full) := do
return builder.body ( Body.Full.ofByteArray content)
/--
Builds a response with a binary body.
Sets `Content-Type: application/octet-stream`.
Use `fromBytes` instead if you need to set a different `Content-Type` or none at all.
-/
def bytes (builder : Builder) (content : ByteArray) : Async (Response Body.Full) :=
fromBytes (builder.header Header.Name.contentType (Header.Value.ofString! "application/octet-stream")) content
/--
Builds a response with a text body.
Sets `Content-Type: text/plain; charset=utf-8`.
-/
def text (builder : Builder) (content : String) : Async (Response Body.Full) :=
fromBytes (builder.header Header.Name.contentType (Header.Value.ofString! "text/plain; charset=utf-8")) content.toUTF8
/--
Builds a response with a JSON body.
Sets `Content-Type: application/json`.
-/
def json (builder : Builder) (content : String) : Async (Response Body.Full) :=
fromBytes (builder.header Header.Name.contentType (Header.Value.ofString! "application/json")) content.toUTF8
/--
Builds a response with an HTML body.
Sets `Content-Type: text/html; charset=utf-8`.
-/
def html (builder : Builder) (content : String) : Async (Response Body.Full) :=
fromBytes (builder.header Header.Name.contentType (Header.Value.ofString! "text/html; charset=utf-8")) content.toUTF8
end Response.Builder

View File

@@ -0,0 +1,60 @@
/-
Copyright (c) 2025 Lean FRO, LLC. All rights reserved.
Released under Apache 2.0 license as described in the file LICENSE.
Authors: Sofia Rodrigues
-/
module
prelude
public import Init.Data.Repr
public section
/-!
# Body.Length
This module defines the `Length` type, that represents the Content-Length or Transfer-Encoding
of an HTTP request or response.
-/
namespace Std.Http.Body
set_option linter.all true
/--
Size of the body of a response or request.
-/
inductive Length
/--
Indicates that the HTTP message body uses **chunked transfer encoding**.
-/
| chunked
/--
Indicates that the HTTP message body has a **fixed, known length**, as specified by the
`Content-Length` header.
-/
| fixed (n : Nat)
deriving Repr, BEq
namespace Length
/--
Checks if the `Length` is chunked.
-/
@[inline]
def isChunked : Length Bool
| .chunked => true
| _ => false
/--
Checks if the `Length` is a fixed size.
-/
@[inline]
def isFixed : Length Bool
| .fixed _ => true
| _ => false
end Length
end Std.Http.Body

View File

@@ -0,0 +1,650 @@
/-
Copyright (c) 2025 Lean FRO, LLC. All rights reserved.
Released under Apache 2.0 license as described in the file LICENSE.
Authors: Sofia Rodrigues
-/
module
prelude
public import Std.Sync
public import Std.Internal.Async
public import Std.Internal.Http.Data.Request
public import Std.Internal.Http.Data.Response
public import Std.Internal.Http.Data.Chunk
public import Std.Internal.Http.Data.Body.Basic
public import Std.Internal.Http.Data.Body.Any
public import Init.Data.ByteArray
public section
/-!
# Body.Stream
This module defines a zero-buffer rendezvous body channel (`Body.Stream`) that supports
both sending and receiving chunks.
There is no queue and no capacity. A send waits for a receiver and a receive waits for a sender.
At most one blocked producer and one blocked consumer are supported.
-/
namespace Std.Http
namespace Body
open Std Internal IO Async
set_option linter.all true
namespace Channel
open Internal.IO.Async in
private inductive Consumer where
| normal (promise : IO.Promise (Option Chunk))
| select (finished : Waiter (Option Chunk))
private def Consumer.resolve (c : Consumer) (x : Option Chunk) : BaseIO Bool := do
match c with
| .normal promise =>
promise.resolve x
return true
| .select waiter =>
let lose := return false
let win promise := do
promise.resolve (.ok x)
return true
waiter.race lose win
private structure Producer where
chunk : Chunk
/--
Resolved with `true` when consumed by a receiver, `false` when the channel closes.
-/
done : IO.Promise Bool
open Internal.IO.Async in
private def resolveInterestWaiter (waiter : Waiter Bool) (x : Bool) : BaseIO Bool := do
let lose := return false
let win promise := do
promise.resolve (.ok x)
return true
waiter.race lose win
private structure State where
/--
A single blocked producer waiting for a receiver.
-/
pendingProducer : Option Producer
/--
A single blocked consumer waiting for a producer.
-/
pendingConsumer : Option Consumer
/--
A waiter for `Stream.interestSelector`.
-/
interestWaiter : Option (Internal.IO.Async.Waiter Bool)
/--
Whether the channel is closed.
-/
closed : Bool
/--
Known size of the stream if available.
-/
knownSize : Option Body.Length
/--
Buffered partial chunk data accumulated from `Stream.send ... (incomplete := true)`.
These partial pieces are collapsed and emitted as a single chunk on the next complete send.
-/
pendingIncompleteChunk : Option Chunk := none
deriving Nonempty
end Channel
/--
A zero-buffer rendezvous body channel that supports both sending and receiving chunks.
-/
structure Stream where
private mk ::
private state : Mutex Channel.State
deriving Nonempty, TypeName
/--
Creates a rendezvous body stream.
-/
def mkStream : Async Stream := do
let state Mutex.new {
pendingProducer := none
pendingConsumer := none
interestWaiter := none
closed := false
knownSize := none
}
return { state }
namespace Channel
private def decreaseKnownSize (knownSize : Option Body.Length) (chunk : Chunk) : Option Body.Length :=
match knownSize with
| some (.fixed res) => some (Body.Length.fixed (res - chunk.data.size))
| _ => knownSize
private def pruneFinishedWaiters [Monad m] [MonadLiftT (ST IO.RealWorld) m] :
AtomicT State m Unit := do
let st get
let pendingConsumer
match st.pendingConsumer with
| some (.select waiter) =>
if waiter.checkFinished then
pure none
else
pure st.pendingConsumer
| _ =>
pure st.pendingConsumer
let interestWaiter
match st.interestWaiter with
| some waiter =>
if waiter.checkFinished then
pure none
else
pure st.interestWaiter
| none =>
pure none
set { st with pendingConsumer, interestWaiter }
private def signalInterest [Monad m] [MonadLiftT (ST IO.RealWorld) m] [MonadLiftT BaseIO m] :
AtomicT State m Unit := do
let st get
if let some waiter := st.interestWaiter then
discard <| resolveInterestWaiter waiter true
set { st with interestWaiter := none }
private def recvReady' [Monad m] [MonadLiftT (ST IO.RealWorld) m] :
AtomicT State m Bool := do
let st get
return st.pendingProducer.isSome || st.closed
private def hasInterest' [Monad m] [MonadLiftT (ST IO.RealWorld) m] :
AtomicT State m Bool := do
let st get
return st.pendingConsumer.isSome
private def tryRecv' [Monad m] [MonadLiftT (ST IO.RealWorld) m] [MonadLiftT BaseIO m] :
AtomicT State m (Option Chunk) := do
let st get
if let some producer := st.pendingProducer then
set {
st with
pendingProducer := none
knownSize := decreaseKnownSize st.knownSize producer.chunk
}
discard <| producer.done.resolve true
return some producer.chunk
else
return none
private def close' [Monad m] [MonadLiftT (ST IO.RealWorld) m] [MonadLiftT BaseIO m] :
AtomicT State m Unit := do
let st get
if st.closed then
return ()
if let some consumer := st.pendingConsumer then
discard <| consumer.resolve none
if let some waiter := st.interestWaiter then
discard <| resolveInterestWaiter waiter false
if let some producer := st.pendingProducer then
discard <| producer.done.resolve false
set {
st with
pendingProducer := none
pendingConsumer := none
interestWaiter := none
pendingIncompleteChunk := none
closed := true
}
end Channel
namespace Stream
/--
Attempts to receive a chunk from the channel without blocking.
Returns `some chunk` only when a producer is already waiting.
-/
def tryRecv (stream : Stream) : Async (Option Chunk) :=
stream.state.atomically do
Channel.pruneFinishedWaiters
Channel.tryRecv'
private def recv' (stream : Stream) : BaseIO (AsyncTask (Option Chunk)) := do
stream.state.atomically do
Channel.pruneFinishedWaiters
if let some chunk Channel.tryRecv' then
return AsyncTask.pure (some chunk)
let st get
if st.closed then
return AsyncTask.pure none
if st.pendingConsumer.isSome then
return Task.pure (.error (IO.Error.userError "only one blocked consumer is allowed"))
let promise IO.Promise.new
set { st with pendingConsumer := some (.normal promise) }
Channel.signalInterest
return promise.result?.map (sync := true) fun
| none => .error (IO.Error.userError "the promise linked to the consumer was dropped")
| some res => .ok res
/--
Receives a chunk from the channel. Blocks until a producer sends one.
Returns `none` if the channel is closed and no producer is waiting.
-/
def recv (stream : Stream) : Async (Option Chunk) := do
Async.ofAsyncTask ( recv' stream)
/--
Closes the channel.
-/
def close (stream : Stream) : Async Unit :=
stream.state.atomically do
Channel.close'
/--
Checks whether the channel is closed.
-/
@[always_inline, inline]
def isClosed (stream : Stream) : Async Bool :=
stream.state.atomically do
return ( get).closed
/--
Gets the known size if available.
-/
@[always_inline, inline]
def getKnownSize (stream : Stream) : Async (Option Body.Length) :=
stream.state.atomically do
return ( get).knownSize
/--
Sets known size metadata.
-/
@[always_inline, inline]
def setKnownSize (stream : Stream) (size : Option Body.Length) : Async Unit :=
stream.state.atomically do
modify fun st => { st with knownSize := size }
open Internal.IO.Async in
/--
Creates a selector that resolves when a producer is waiting (or the channel closes).
-/
def recvSelector (stream : Stream) : Selector (Option Chunk) where
tryFn := do
stream.state.atomically do
Channel.pruneFinishedWaiters
if Channel.recvReady' then
return some ( Channel.tryRecv')
else
return none
registerFn waiter := do
stream.state.atomically do
Channel.pruneFinishedWaiters
if Channel.recvReady' then
let lose := return ()
let win promise := do
promise.resolve (.ok ( Channel.tryRecv'))
waiter.race lose win
else
let st get
if st.pendingConsumer.isSome then
throw (.userError "only one blocked consumer is allowed")
set { st with pendingConsumer := some (.select waiter) }
Channel.signalInterest
unregisterFn := do
stream.state.atomically do
Channel.pruneFinishedWaiters
/--
Iterates over chunks until the channel closes.
-/
@[inline]
protected partial def forIn
{β : Type} (stream : Stream) (acc : β)
(step : Chunk β Async (ForInStep β)) : Async β := do
let rec @[specialize] loop (stream : Stream) (acc : β) : Async β := do
if let some chunk stream.recv then
match step chunk acc with
| .done res => return res
| .yield res => loop stream res
else
return acc
loop stream acc
/--
Context-aware iteration over chunks until the channel closes.
-/
@[inline]
protected partial def forIn'
{β : Type} (stream : Stream) (acc : β)
(step : Chunk β ContextAsync (ForInStep β)) : ContextAsync β := do
let rec @[specialize] loop (stream : Stream) (acc : β) : ContextAsync β := do
let data Selectable.one #[
.case stream.recvSelector pure,
.case ( ContextAsync.doneSelector) (fun _ => pure none),
]
if let some chunk := data then
match step chunk acc with
| .done res => return res
| .yield res => loop stream res
else
return acc
loop stream acc
/--
Abstracts over how the next chunk is received, allowing `readAll` to work in both `Async`
(no cancellation) and `ContextAsync` (races with cancellation via `doneSelector`).
-/
class NextChunk (m : Type Type) where
/--
Receives the next chunk, stopping at EOF or (in `ContextAsync`) when the context is cancelled.
-/
nextChunk : Stream m (Option Chunk)
instance : NextChunk Async where
nextChunk := Stream.recv
instance : NextChunk ContextAsync where
nextChunk stream := do
Selectable.one #[
.case stream.recvSelector pure,
.case ( ContextAsync.doneSelector) (fun _ => pure none),
]
/--
Reads all remaining chunks and decodes them into `α`.
Works in both `Async` (reads until EOF, no cancellation) and `ContextAsync` (also stops if the
context is cancelled).
-/
partial def readAll
[FromByteArray α]
[Monad m] [MonadExceptOf IO.Error m] [NextChunk m]
(stream : Stream)
(maximumSize : Option UInt64 := none) :
m α := do
let rec loop (result : ByteArray) : m ByteArray := do
match NextChunk.nextChunk stream with
| none => return result
| some chunk =>
let result := result ++ chunk.data
if let some max := maximumSize then
if result.size.toUInt64 > max then
throw (.userError s!"body exceeded maximum size of {max} bytes")
loop result
let result loop ByteArray.empty
match FromByteArray.fromByteArray result with
| .ok a => return a
| .error msg => throw (.userError msg)
private def collapseForSend
(stream : Stream)
(chunk : Chunk)
(incomplete : Bool) : BaseIO (Except IO.Error (Option Chunk)) := do
stream.state.atomically do
Channel.pruneFinishedWaiters
let st get
if st.closed then
return .error (.userError "channel closed")
let merged := match st.pendingIncompleteChunk with
| some pending =>
{
data := pending.data ++ chunk.data
extensions := if pending.extensions.isEmpty then chunk.extensions else pending.extensions
}
| none => chunk
if incomplete then
set { st with pendingIncompleteChunk := some merged }
return .ok none
else
set { st with pendingIncompleteChunk := none }
return .ok (some merged)
/--
Sends a chunk, retrying if a select-mode consumer races and loses. If no consumer is ready,
installs the chunk as a pending producer and awaits acknowledgement from the receiver.
-/
private partial def send' (stream : Stream) (chunk : Chunk) : Async Unit := do
let done IO.Promise.new
let result : Except IO.Error (Option Bool) stream.state.atomically do
Channel.pruneFinishedWaiters
let st get
if st.closed then
return .error (IO.Error.userError "channel closed")
if let some consumer := st.pendingConsumer then
let success consumer.resolve (some chunk)
if success then
set {
st with
pendingConsumer := none
knownSize := Channel.decreaseKnownSize st.knownSize chunk
}
return .ok (some true)
else
set { st with pendingConsumer := none }
return .ok (some false)
else if st.pendingProducer.isSome then
return .error (IO.Error.userError "only one blocked producer is allowed")
else
set { st with pendingProducer := some { chunk, done } }
return .ok none
match result with
| .error err =>
throw err
| .ok (some true) =>
return ()
| .ok (some false) =>
-- The select-mode consumer raced and lost; recurse to allocate a fresh `done` promise.
send' stream chunk
| .ok none =>
match await done.result? with
| some true => return ()
| _ => throw (IO.Error.userError "channel closed")
/--
Sends a chunk.
If `incomplete := true`, the chunk is buffered and collapsed with subsequent chunks, and is not
delivered to the receiver yet.
If `incomplete := false`, any buffered incomplete pieces are collapsed with this chunk and the
single merged chunk is sent.
-/
def send (stream : Stream) (chunk : Chunk) (incomplete : Bool := false) : Async Unit := do
match ( collapseForSend stream chunk incomplete) with
| .error err => throw err
| .ok none => pure ()
| .ok (some toSend) =>
if toSend.data.isEmpty toSend.extensions.isEmpty then
return ()
send' stream toSend
/--
Returns `true` when a consumer is currently blocked waiting for data.
-/
def hasInterest (stream : Stream) : Async Bool :=
stream.state.atomically do
Channel.pruneFinishedWaiters
Channel.hasInterest'
open Internal.IO.Async in
/--
Creates a selector that resolves when consumer interest is present.
Returns `true` when a consumer is waiting, `false` when the channel closes first.
-/
def interestSelector (stream : Stream) : Selector Bool where
tryFn := do
stream.state.atomically do
Channel.pruneFinishedWaiters
let st get
if st.pendingConsumer.isSome then
return some true
else if st.closed then
return some false
else
return none
registerFn waiter := do
stream.state.atomically do
Channel.pruneFinishedWaiters
let st get
if st.pendingConsumer.isSome then
let lose := return ()
let win promise := do
promise.resolve (.ok true)
waiter.race lose win
else if st.closed then
let lose := return ()
let win promise := do
promise.resolve (.ok false)
waiter.race lose win
else if st.interestWaiter.isSome then
throw (.userError "only one blocked interest selector is allowed")
else
set { st with interestWaiter := some waiter }
unregisterFn := do
stream.state.atomically do
Channel.pruneFinishedWaiters
end Stream
/--
Creates a body from a producer function.
Returns the stream immediately and runs `gen` in a detached task.
The channel is always closed when `gen` returns or throws.
Errors from `gen` are not rethrown here; consumers observe end-of-stream via `recv = none`.
-/
def stream (gen : Stream Async Unit) : Async Stream := do
let s mkStream
background <| do
try
gen s
finally
s.close
return s
/--
Creates a body from a fixed byte array.
-/
def fromBytes (content : ByteArray) : Async Stream := do
stream fun s => do
s.setKnownSize (some (.fixed content.size))
if content.size > 0 then
s.send (Chunk.ofByteArray content)
/--
Creates an empty `Stream` body channel (already closed, no data).
Prefer `Body.Empty` when you need a concrete zero-cost type. Use this when the calling
context requires a `Stream` specifically.
-/
def empty : Async Stream := do
let s mkStream
s.setKnownSize (some (.fixed 0))
s.close
return s
instance : ForIn Async Stream Chunk where
forIn := Stream.forIn
instance : ForIn ContextAsync Stream Chunk where
forIn := Stream.forIn'
instance : Http.Body Stream where
recv := Stream.recv
close := Stream.close
isClosed := Stream.isClosed
recvSelector := Stream.recvSelector
getKnownSize := Stream.getKnownSize
setKnownSize := Stream.setKnownSize
instance : Coe Stream Any := Any.ofBody
instance : Coe (Response Stream) (Response Any) where
coe f := { f with }
instance : Coe (ContextAsync (Response Stream)) (ContextAsync (Response Any)) where
coe action := do
let response action
pure (response : Response Any)
instance : Coe (Async (Response Stream)) (ContextAsync (Response Any)) where
coe action := do
let response action
pure (response : Response Any)
end Body
namespace Request.Builder
open Internal.IO.Async
/--
Builds a request with a streaming body generator.
-/
def stream
(builder : Builder)
(gen : Body.Stream Async Unit) :
Async (Request Body.Stream) := do
let s Body.stream gen
return Request.Builder.body builder s
end Request.Builder
namespace Response.Builder
open Internal.IO.Async
/--
Builds a response with a streaming body generator.
-/
def stream
(builder : Builder)
(gen : Body.Stream Async Unit) :
Async (Response Body.Stream) := do
let s Body.stream gen
return Response.Builder.body builder s
end Response.Builder

View File

@@ -124,12 +124,6 @@ def new : Builder := { }
namespace Builder
/--
Creates a new HTTP request builder with the default head
(method: GET, version: HTTP/1.1, target: `*`).
-/
def empty : Builder := { }
/--
Sets the HTTP method for the request being built.
-/

View File

@@ -111,7 +111,7 @@ namespace Builder
/--
Creates a new HTTP Response builder with default head (status: 200 OK, version: HTTP/1.1).
-/
def empty : Builder := { }
def new : Builder := { }
/--
Sets the HTTP status code for the response being built.
@@ -173,66 +173,66 @@ end Builder
Creates a new HTTP Response builder with the 200 status code.
-/
def ok : Builder :=
.empty |>.status .ok
.new |>.status .ok
/--
Creates a new HTTP Response builder with the provided status.
-/
def withStatus (status : Status) : Builder :=
.empty |>.status status
.new |>.status status
/--
Creates a new HTTP Response builder with the 404 status code.
-/
def notFound : Builder :=
.empty |>.status .notFound
.new |>.status .notFound
/--
Creates a new HTTP Response builder with the 500 status code.
-/
def internalServerError : Builder :=
.empty |>.status .internalServerError
.new |>.status .internalServerError
/--
Creates a new HTTP Response builder with the 400 status code.
-/
def badRequest : Builder :=
.empty |>.status .badRequest
.new |>.status .badRequest
/--
Creates a new HTTP Response builder with the 201 status code.
-/
def created : Builder :=
.empty |>.status .created
.new |>.status .created
/--
Creates a new HTTP Response builder with the 202 status code.
-/
def accepted : Builder :=
.empty |>.status .accepted
.new |>.status .accepted
/--
Creates a new HTTP Response builder with the 401 status code.
-/
def unauthorized : Builder :=
.empty |>.status .unauthorized
.new |>.status .unauthorized
/--
Creates a new HTTP Response builder with the 403 status code.
-/
def forbidden : Builder :=
.empty |>.status .forbidden
.new |>.status .forbidden
/--
Creates a new HTTP Response builder with the 409 status code.
-/
def conflict : Builder :=
.empty |>.status .conflict
.new |>.status .conflict
/--
Creates a new HTTP Response builder with the 503 status code.
-/
def serviceUnavailable : Builder :=
.empty |>.status .serviceUnavailable
.new |>.status .serviceUnavailable
end Response

View File

@@ -94,4 +94,3 @@ def parseOrRoot (s : String) : Std.Http.URI.Path :=
parse? s |>.getD { segments := #[], absolute := true }
end Std.Http.URI.Path

View File

@@ -174,19 +174,19 @@ opaque osEnviron : IO (Array (String × String))
Gets the value of an environment variable.
-/
@[extern "lean_uv_os_getenv"]
opaque osGetenv : @& String IO (Option String)
opaque osGetenv : String IO (Option String)
/--
Sets the value of an environment variable.
-/
@[extern "lean_uv_os_setenv"]
opaque osSetenv : @& String @& String IO Unit
opaque osSetenv : String String IO Unit
/--
Unsets an environment variable.
-/
@[extern "lean_uv_os_unsetenv"]
opaque osUnsetenv : @& String IO Unit
opaque osUnsetenv : String IO Unit
/--
Gets the hostname of the machine.

View File

@@ -765,12 +765,13 @@ where
\n remote URL: {info.url}"
match cfg.kind with
| .get =>
if let .ok size := out.getAs Nat "size_download" then
if size > 0 then
if let .ok contentType := out.getAs String "content_type" then
if contentType != artifactContentType then
if let .ok resp IO.FS.readFile info.path |>.toBaseIO then
msg := s!"{msg}\nunexpected response:\n{resp}"
unless code? matches .ok 404 do -- ignore response bodies on 404s
if let .ok size := out.getAs Nat "size_download" then
if size > 0 then
if let .ok contentType := out.getAs String "content_type" then
if contentType != artifactContentType then
if let .ok resp IO.FS.readFile info.path |>.toBaseIO then
msg := s!"{msg}\nunexpected response:\n{resp}"
removeFileIfExists info.path
| .put =>
if let .ok size := out.getAs Nat "size_download" then
@@ -787,7 +788,7 @@ private def transferArtifacts
match cfg.kind with
| .get =>
cfg.infos.forM fun info => do
h.putStrLn s!"url = {info.url}"
h.putStrLn s!"url = {info.url.quote}"
h.putStrLn s!"-o {info.path.toString.quote}"
h.flush
return #[
@@ -798,7 +799,7 @@ private def transferArtifacts
| .put =>
cfg.infos.forM fun info => do
h.putStrLn s!"-T {info.path.toString.quote}"
h.putStrLn s!"url = {info.url}"
h.putStrLn s!"url = {info.url.quote}"
h.flush
return #[
"-Z", "-X", "PUT", "-L",
@@ -827,6 +828,13 @@ private def transferArtifacts
if s.didError then
failure
private def reservoirArtifactsUrl (service : CacheService) (scope : CacheServiceScope) : String :=
let endpoint :=
match scope.impl with
| .repo scope => appendScope s!"{service.impl.apiEndpoint}/repositories" scope
| .str scope => appendScope s!"{service.impl.apiEndpoint}/packages" scope
s!"{endpoint}/artifacts"
public def downloadArtifacts
(descrs : Array ArtifactDescr) (cache : Cache)
(service : CacheService) (scope : CacheServiceScope) (force := false)
@@ -844,8 +852,68 @@ public def downloadArtifacts
return s.push {url, path, descr}
if infos.isEmpty then
return
let infos id do
if service.isReservoir then
-- Artifact cloud storage URLs are fetched in a single request
-- to avoid hammering the Reservoir web host
fetchUrls (service.reservoirArtifactsUrl scope) infos
else return infos
IO.FS.createDirAll cache.artifactDir
transferArtifacts {scope, infos, kind := .get}
where
fetchUrls url infos := IO.FS.withTempFile fun h path => do
let body := Json.arr <| infos.map (toJson ·.descr.hash)
h.putStr body.compress
h.flush
let args := #[
"-X", "POST", "-L", "-d", s!"@{path}",
"--retry", "3", -- intermittent network errors can occur
"-s", "-w", "%{stderr}%{json}\n",
"-H", "Content-Type: application/json",
]
let args := Reservoir.lakeHeaders.foldl (· ++ #["-H", ·]) args
let spawnArgs := {
cmd := "curl", args := args.push url
stdout := .piped, stderr := .piped
}
logVerbose (mkCmdLog spawnArgs)
let {stdout, stderr, exitCode} IO.Process.output spawnArgs
match Json.parse stdout >>= fromJson? with
| .ok (resp : ReservoirResp (Array String)) =>
match resp with
| .data urls =>
if h : infos.size = urls.size then
let s := infos.size.fold (init := infos.toVector) fun i hi s =>
s.set i {s[i] with url := urls[i]'(h hi)}
return s.toArray
else
error s!"failed to fetch artifact URLs\
\n POST {url}\
\nIncorrect number of results: expected {infos.size}, got {urls.size}"
| .error status message =>
error s!"failed to fetch artifact URLs (status code: {status})\
\n POST {url}\
\nReservoir error: {message}"
| .error _ =>
match Json.parse stderr >>= fromJson? with
| .ok (out : JsonObject) =>
let mut msg := "failed to fetch artifact URLs"
if let .ok code := out.getAs Nat "http_code" then
msg := s!"{msg} (status code: {code})"
msg := s!"{msg}\n POST {url}"
if let .ok errMsg := out.getAs String "errormsg" then
msg := s!"{msg}\n Transfer error: {errMsg}"
unless stdout.isEmpty do
msg := s!"{msg}\nstdout:\n{stdout.trimAsciiEnd}"
logError msg
logVerbose s!"curl JSON:\n{stderr.trimAsciiEnd}"
| .error e =>
logError s!"failed to fetch artifact URLs\
\n POST {url}
\nInvalid curl JSON: {e}; received: {stderr.trimAscii}"
unless stdout.isEmpty do
logWarning s!"curl produced unexpected output:\n{stdout.trimAsciiEnd}"
error s!"curl exited with code {exitCode}"
@[deprecated "Deprecated without replacement." (since := "2026-02-27")]
public def downloadOutputArtifacts

View File

@@ -103,24 +103,6 @@ public instance : FromJson RegistryPkg := ⟨RegistryPkg.fromJson?⟩
end RegistryPkg
/-- A Reservoir API response object. -/
public inductive ReservoirResp (α : Type u)
| data (a : α)
| error (status : Nat) (message : String)
public protected def ReservoirResp.fromJson? [FromJson α] (val : Json) : Except String (ReservoirResp α) := do
let obj JsonObject.fromJson? val
if let some (err : JsonObject) obj.get? "error" then
let status err.get "status"
let message err.get "message"
return .error status message
else if let some (val : Json) obj.get? "data" then
.data <$> fromJson? val
else
.data <$> fromJson? val
public instance [FromJson α] : FromJson (ReservoirResp α) := ReservoirResp.fromJson?
public def Reservoir.pkgApiUrl (lakeEnv : Lake.Env) (owner pkg : String) :=
s!"{lakeEnv.reservoirApiUrl}/packages/{uriEncode owner}/{uriEncode pkg}"

View File

@@ -6,8 +6,9 @@ Authors: Mac Malone
module
prelude
public import Init.Prelude
import Init.Data.Array.Basic
public import Lake.Util.JsonObject
open Lean
namespace Lake
@@ -15,3 +16,23 @@ public def Reservoir.lakeHeaders : Array String := #[
"X-Reservoir-Api-Version:1.0.0",
"X-Lake-Registry-Api-Version:0.1.0"
]
/-- A Reservoir API response object. -/
public inductive ReservoirResp (α : Type u)
| data (a : α)
| error (status : Nat) (message : String)
public protected def ReservoirResp.fromJson? [FromJson α] (val : Json) : Except String (ReservoirResp α) := do
if let .ok obj := JsonObject.fromJson? val then
if let some (err : JsonObject) obj.get? "error" then
let status err.get "status"
let message err.get "message"
return .error status message
else if let some (val : Json) obj.get? "data" then
.data <$> fromJson? val
else
.data <$> fromJson? val
else
.data <$> fromJson? val
public instance [FromJson α] : FromJson (ReservoirResp α) := ReservoirResp.fromJson?

View File

@@ -31,7 +31,7 @@ extern "C" LEAN_EXPORT lean_obj_res lean_uv_get_process_title() {
return lean_io_result_mk_ok(lean_title);
}
// Std.Internal.UV.System.setProcessTitle : @& String → IO Unit
// Std.Internal.UV.System.setProcessTitle : String → IO Unit
extern "C" LEAN_EXPORT lean_obj_res lean_uv_set_process_title(b_obj_arg title) {
const char* title_str = lean_string_cstr(title);
if (strlen(title_str) != lean_string_size(title) - 1) {
@@ -124,7 +124,7 @@ extern "C" LEAN_EXPORT lean_obj_res lean_uv_cwd() {
return lean_io_result_mk_ok(lean_cwd);
}
// Std.Internal.UV.System.chdir : @& String → IO Unit
// Std.Internal.UV.System.chdir : String → IO Unit
extern "C" LEAN_EXPORT lean_obj_res lean_uv_chdir(b_obj_arg path) {
const char* path_str = lean_string_cstr(path);
if (strlen(path_str) != lean_string_size(path) - 1) {
@@ -271,7 +271,7 @@ extern "C" LEAN_EXPORT lean_obj_res lean_uv_os_environ() {
return lean_io_result_mk_ok(env_array);
}
// Std.Internal.UV.System.osGetenv : @& String → IO (Option String)
// Std.Internal.UV.System.osGetenv : String → IO (Option String)
extern "C" LEAN_EXPORT lean_obj_res lean_uv_os_getenv(b_obj_arg name) {
const char* name_str = lean_string_cstr(name);
if (strlen(name_str) != lean_string_size(name) - 1) {
@@ -313,7 +313,7 @@ extern "C" LEAN_EXPORT lean_obj_res lean_uv_os_getenv(b_obj_arg name) {
}
// Std.Internal.UV.System.osSetenv : @& String → @& String → IO Unit
// Std.Internal.UV.System.osSetenv : String → String → IO Unit
extern "C" LEAN_EXPORT lean_obj_res lean_uv_os_setenv(b_obj_arg name, b_obj_arg value) {
const char* name_str = lean_string_cstr(name);
const char* value_str = lean_string_cstr(value);
@@ -333,7 +333,7 @@ extern "C" LEAN_EXPORT lean_obj_res lean_uv_os_setenv(b_obj_arg name, b_obj_arg
return lean_io_result_mk_ok(lean_box(0));
}
// Std.Internal.UV.System.osUnsetenv : @& String → IO Unit
// Std.Internal.UV.System.osUnsetenv : String → IO Unit
extern "C" LEAN_EXPORT lean_obj_res lean_uv_os_unsetenv(b_obj_arg name) {
const char* name_str = lean_string_cstr(name);
if (strlen(name_str) != lean_string_size(name) - 1) {
@@ -641,21 +641,21 @@ extern "C" LEAN_EXPORT lean_obj_res lean_uv_os_environ() {
);
}
// Std.Internal.UV.System.osGetenv : @& String → IO (Option String)
// Std.Internal.UV.System.osGetenv : String → IO (Option String)
extern "C" LEAN_EXPORT lean_obj_res lean_uv_os_getenv(b_obj_arg name) {
lean_always_assert(
false && ("Please build a version of Lean4 with libuv to invoke this.")
);
}
// Std.Internal.UV.System.osSetenv : @& String → @& String → IO Unit
// Std.Internal.UV.System.osSetenv : String → String → IO Unit
extern "C" LEAN_EXPORT lean_obj_res lean_uv_os_setenv(b_obj_arg name, b_obj_arg value) {
lean_always_assert(
false && ("Please build a version of Lean4 with libuv to invoke this.")
);
}
// Std.Internal.UV.System.osUnsetenv : @& String → IO Unit
// Std.Internal.UV.System.osUnsetenv : String → IO Unit
extern "C" LEAN_EXPORT lean_obj_res lean_uv_os_unsetenv(b_obj_arg name) {
lean_always_assert(
false && ("Please build a version of Lean4 with libuv to invoke this.")

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@@ -0,0 +1,725 @@
import Std.Internal.Http.Data.Body
open Std.Internal.IO Async
open Std.Http
open Std.Http.Body
/-! ## Stream tests -/
-- Test send and recv on stream
def channelSendRecv : Async Unit := do
let stream Body.mkStream
let chunk := Chunk.ofByteArray "hello".toUTF8
let sendTask async (t := AsyncTask) <| stream.send chunk
let result stream.recv
assert! result.isSome
assert! result.get!.data == "hello".toUTF8
await sendTask
#eval channelSendRecv.block
-- Test tryRecv on empty stream returns none
def channelTryRecvEmpty : Async Unit := do
let stream Body.mkStream
let result stream.tryRecv
assert! result.isNone
#eval channelTryRecvEmpty.block
-- Test tryRecv consumes a waiting producer
def channelTryRecvWithPendingSend : Async Unit := do
let stream Body.mkStream
let sendTask async (t := AsyncTask) <| stream.send (Chunk.ofByteArray "data".toUTF8)
let mut result := none
let mut fuel := 100
while result.isNone && fuel > 0 do
result stream.tryRecv
if result.isNone then
let _ Selectable.one #[
.case ( Selector.sleep 1) pure
]
fuel := fuel - 1
assert! result.isSome
assert! result.get!.data == "data".toUTF8
await sendTask
#eval channelTryRecvWithPendingSend.block
-- Test close sets closed flag
def channelClose : Async Unit := do
let stream Body.mkStream
assert! !( stream.isClosed)
stream.close
assert! ( stream.isClosed)
#eval channelClose.block
-- Test recv on closed stream returns none
def channelRecvAfterClose : Async Unit := do
let stream Body.mkStream
stream.close
let result stream.recv
assert! result.isNone
#eval channelRecvAfterClose.block
-- Test for-in iteration collects chunks until close
def channelForIn : Async Unit := do
let stream Body.mkStream
let producer async (t := AsyncTask) <| do
stream.send (Chunk.ofByteArray "a".toUTF8)
stream.send (Chunk.ofByteArray "b".toUTF8)
stream.close
let mut acc : ByteArray := .empty
for chunk in stream do
acc := acc ++ chunk.data
assert! acc == "ab".toUTF8
await producer
#eval channelForIn.block
-- Test chunk extensions are preserved
def channelExtensions : Async Unit := do
let stream Body.mkStream
let chunk := { data := "hello".toUTF8, extensions := #[(.mk "key", some (Chunk.ExtensionValue.ofString! "value"))] : Chunk }
let sendTask async (t := AsyncTask) <| stream.send chunk
let result stream.recv
assert! result.isSome
assert! result.get!.extensions.size == 1
assert! result.get!.extensions[0]! == (Chunk.ExtensionName.mk "key", some <| .ofString! "value")
await sendTask
#eval channelExtensions.block
-- Test known size metadata
def channelKnownSize : Async Unit := do
let stream Body.mkStream
stream.setKnownSize (some (.fixed 100))
let size stream.getKnownSize
assert! size == some (.fixed 100)
#eval channelKnownSize.block
-- Test known size decreases when a chunk is consumed
def channelKnownSizeDecreases : Async Unit := do
let stream Body.mkStream
stream.setKnownSize (some (.fixed 5))
let sendTask async (t := AsyncTask) <| stream.send (Chunk.ofByteArray "hello".toUTF8)
let _ stream.recv
await sendTask
let size stream.getKnownSize
assert! size == some (.fixed 0)
#eval channelKnownSizeDecreases.block
-- Test only one blocked producer is allowed
def channelSingleProducerRule : Async Unit := do
let stream Body.mkStream
let send1 async (t := AsyncTask) <| stream.send (Chunk.ofByteArray "one".toUTF8)
-- Yield so `send1` can occupy the single pending-producer slot.
let _ Selectable.one #[
.case ( Selector.sleep 5) pure
]
let send2Failed
try
stream.send (Chunk.ofByteArray "two".toUTF8)
pure false
catch _ =>
pure true
assert! send2Failed
let first stream.recv
assert! first.isSome
assert! first.get!.data == "one".toUTF8
await send1
#eval channelSingleProducerRule.block
-- Test only one blocked consumer is allowed
def channelSingleConsumerRule : Async Unit := do
let stream Body.mkStream
let recv1 async (t := AsyncTask) <| stream.recv
let hasInterest Selectable.one #[
.case stream.interestSelector pure
]
assert! hasInterest
let recv2Failed
try
let _ stream.recv
pure false
catch _ =>
pure true
assert! recv2Failed
let sendTask async (t := AsyncTask) <| stream.send (Chunk.ofByteArray "ok".toUTF8)
let r1 await recv1
assert! r1.isSome
assert! r1.get!.data == "ok".toUTF8
await sendTask
#eval channelSingleConsumerRule.block
-- Test hasInterest reflects blocked receiver state
def channelHasInterest : Async Unit := do
let stream Body.mkStream
assert! !( stream.hasInterest)
let recvTask async (t := AsyncTask) <| stream.recv
let hasInterest Selectable.one #[
.case stream.interestSelector pure
]
assert! hasInterest
assert! ( stream.hasInterest)
let sendTask async (t := AsyncTask) <| stream.send (Chunk.ofByteArray "x".toUTF8)
let _ await recvTask
await sendTask
assert! !( stream.hasInterest)
#eval channelHasInterest.block
-- Test interestSelector resolves false when stream closes first
def channelInterestSelectorClose : Async Unit := do
let stream Body.mkStream
let waitInterest async (t := AsyncTask) <|
Selectable.one #[
.case stream.interestSelector pure
]
stream.close
let interested await waitInterest
assert! interested == false
#eval channelInterestSelectorClose.block
-- Test incomplete sends are buffered and merged into one chunk on the final send
def channelIncompleteChunks : Async Unit := do
let stream Body.mkStream
let sendTask async (t := AsyncTask) <| do
stream.send (Chunk.ofByteArray "hel".toUTF8) (incomplete := true)
stream.send (Chunk.ofByteArray "lo".toUTF8)
let result stream.recv
assert! result.isSome
assert! result.get!.data == "hello".toUTF8
await sendTask
#eval channelIncompleteChunks.block
-- Test sending to a closed stream raises an error
def channelSendAfterClose : Async Unit := do
let stream Body.mkStream
stream.close
let failed
try
stream.send (Chunk.ofByteArray "test".toUTF8)
pure false
catch _ =>
pure true
assert! failed
#eval channelSendAfterClose.block
-- Test Body.stream runs producer and returns the stream handle
def channelStreamHelper : Async Unit := do
let stream Body.stream fun s => do
s.send (Chunk.ofByteArray "hello".toUTF8)
let result stream.recv
assert! result.isSome
assert! result.get!.data == "hello".toUTF8
let eof stream.recv
assert! eof.isNone
#eval channelStreamHelper.block
-- Test Body.empty creates an already-closed Stream
def channelEmptyHelper : Async Unit := do
let stream Body.empty
assert! ( stream.isClosed)
let result stream.recv
assert! result.isNone
#eval channelEmptyHelper.block
-- Test Stream.readAll concatenates all chunks
def channelReadAll : Async Unit := do
let stream Body.mkStream
let sendTask async (t := AsyncTask) <| do
stream.send (Chunk.ofByteArray "foo".toUTF8)
stream.send (Chunk.ofByteArray "bar".toUTF8)
stream.close
let result : ByteArray stream.readAll
assert! result == "foobar".toUTF8
await sendTask
#eval channelReadAll.block
-- Test Stream.readAll enforces a maximum size limit
def channelReadAllMaxSize : Async Unit := do
let stream Body.mkStream
let sendTask async (t := AsyncTask) <| do
stream.send (Chunk.ofByteArray "abcdefgh".toUTF8)
stream.close
let failed
try
let _ : ByteArray stream.readAll (maximumSize := some 4)
pure false
catch _ =>
pure true
assert! failed
await sendTask
#eval channelReadAllMaxSize.block
-- Test Stream.getKnownSize reflects the value set via setKnownSize
def channelKnownSizeRoundtrip : Async Unit := do
let stream Body.mkStream
stream.setKnownSize (some (.fixed 42))
let size stream.getKnownSize
assert! size == some (.fixed 42)
#eval channelKnownSizeRoundtrip.block
/-! ## Full tests -/
-- Test Full.recv returns content once then EOF
def fullRecvConsumesOnce : Async Unit := do
let full Body.Full.ofString "hello"
let first full.recv
let second full.recv
assert! first.isSome
assert! first.get!.data == "hello".toUTF8
assert! second.isNone
#eval fullRecvConsumesOnce.block
-- Test Full known-size metadata tracks consumption
def fullKnownSizeLifecycle : Async Unit := do
let data := ByteArray.mk #[0x01, 0x02, 0x03, 0x04]
let full Body.Full.ofByteArray data
assert! ( full.getKnownSize) == some (.fixed 4)
let chunk full.recv
assert! chunk.isSome
assert! chunk.get!.data == data
assert! ( full.getKnownSize) == some (.fixed 0)
#eval fullKnownSizeLifecycle.block
-- Test Full.close discards remaining content
def fullClose : Async Unit := do
let full Body.Full.ofString "bye"
assert! !( full.isClosed)
full.close
assert! ( full.isClosed)
assert! ( full.recv).isNone
#eval fullClose.block
-- Test Full from an empty ByteArray returns none on the first recv
def fullEmptyBytes : Async Unit := do
let full Body.Full.ofByteArray ByteArray.empty
let result full.recv
assert! result.isNone
#eval fullEmptyBytes.block
-- Test Full.recvSelector resolves immediately with the stored chunk
def fullRecvSelectorResolves : Async Unit := do
let full Body.Full.ofString "world"
let result Selectable.one #[
.case full.recvSelector pure
]
assert! result.isSome
assert! result.get!.data == "world".toUTF8
#eval fullRecvSelectorResolves.block
-- Test Full.getKnownSize returns 0 after close
def fullKnownSizeAfterClose : Async Unit := do
let full Body.Full.ofString "data"
assert! ( full.getKnownSize) == some (.fixed 4)
full.close
assert! ( full.getKnownSize) == some (.fixed 0)
#eval fullKnownSizeAfterClose.block
-- Test Full.tryRecv succeeds once and returns none thereafter
def fullTryRecvIdempotent : Async Unit := do
let full Body.Full.ofString "once"
let first full.recv
let second full.recv
assert! first.isSome
assert! first.get!.data == "once".toUTF8
assert! second.isNone
#eval fullTryRecvIdempotent.block
/-! ## Empty tests -/
-- Test Empty.recv always returns none
def emptyBodyRecv : Async Unit := do
let body : Body.Empty := {}
let result body.recv
assert! result.isNone
#eval emptyBodyRecv.block
-- Test Empty.isClosed is always true
def emptyBodyIsClosed : Async Unit := do
let body : Body.Empty := {}
assert! ( body.isClosed)
#eval emptyBodyIsClosed.block
-- Test Empty.close is a no-op: still closed and recv still returns none
def emptyBodyClose : Async Unit := do
let body : Body.Empty := {}
body.close
assert! ( body.isClosed)
let result body.recv
assert! result.isNone
#eval emptyBodyClose.block
-- Test Empty.recvSelector resolves immediately with none
def emptyBodyRecvSelector : Async Unit := do
let body : Body.Empty := {}
let result Selectable.one #[
.case body.recvSelector pure
]
assert! result.isNone
#eval emptyBodyRecvSelector.block
/-! ## Any tests -/
-- Test Any wrapping a Full body forwards recv correctly
def anyFromFull : Async Unit := do
let full Body.Full.ofString "hello"
let any : Body.Any := full
let result any.recv
assert! result.isSome
assert! result.get!.data == "hello".toUTF8
#eval anyFromFull.block
-- Test Any wrapping an Empty body returns none and reports closed
def anyFromEmpty : Async Unit := do
let empty : Body.Empty := {}
let any : Body.Any := empty
let result any.recv
assert! result.isNone
assert! ( any.isClosed)
#eval anyFromEmpty.block
-- Test Any.close closes the underlying body
def anyCloseForwards : Async Unit := do
let full Body.Full.ofString "test"
let any : Body.Any := full
any.close
assert! ( any.isClosed)
let result any.recv
assert! result.isNone
#eval anyCloseForwards.block
-- Test Any.recvSelector resolves immediately for a Full body
def anyRecvSelectorFromFull : Async Unit := do
let full Body.Full.ofString "sel"
let any : Body.Any := full
let result Selectable.one #[
.case any.recvSelector pure
]
assert! result.isSome
assert! result.get!.data == "sel".toUTF8
#eval anyRecvSelectorFromFull.block
/-! ## Request.Builder body tests -/
private def recvBuiltBody (body : Body.Full) : Async (Option Chunk) :=
body.recv
-- Test Request.Builder.text sets correct headers
def requestBuilderText : Async Unit := do
let req Request.post (.originForm! "/api")
|>.text "Hello, World!"
assert! req.line.headers.get? Header.Name.contentType == some (Header.Value.ofString! "text/plain; charset=utf-8")
assert! req.line.headers.get? Header.Name.contentLength == none
let body recvBuiltBody req.body
assert! body.isSome
assert! body.get!.data == "Hello, World!".toUTF8
#eval requestBuilderText.block
-- Test Request.Builder.json sets correct headers
def requestBuilderJson : Async Unit := do
let req Request.post (.originForm! "/api")
|>.json "{\"key\": \"value\"}"
assert! req.line.headers.get? Header.Name.contentType == some (Header.Value.ofString! "application/json")
assert! req.line.headers.get? Header.Name.contentLength == none
let body recvBuiltBody req.body
assert! body.isSome
assert! body.get!.data == "{\"key\": \"value\"}".toUTF8
#eval requestBuilderJson.block
-- Test Request.Builder.fromBytes sets body
def requestBuilderFromBytes : Async Unit := do
let data := ByteArray.mk #[0x01, 0x02, 0x03]
let req Request.post (.originForm! "/api")
|>.fromBytes data
assert! req.line.headers.get? Header.Name.contentLength == none
let body recvBuiltBody req.body
assert! body.isSome
assert! body.get!.data == data
#eval requestBuilderFromBytes.block
-- Test Request.Builder.noBody creates empty body
def requestBuilderNoBody : Async Unit := do
let req Request.get (.originForm! "/api")
|>.empty
assert! req.body == {}
#eval requestBuilderNoBody.block
-- Test Request.Builder.bytes sets application/octet-stream content type
def requestBuilderBytes : Async Unit := do
let data := ByteArray.mk #[0xde, 0xad, 0xbe, 0xef]
let req Request.post (.originForm! "/api")
|>.bytes data
assert! req.line.headers.get? Header.Name.contentType == some (Header.Value.ofString! "application/octet-stream")
let body recvBuiltBody req.body
assert! body.isSome
assert! body.get!.data == data
#eval requestBuilderBytes.block
-- Test Request.Builder.html sets text/html content type
def requestBuilderHtml : Async Unit := do
let req Request.post (.originForm! "/api")
|>.html "<h1>Hello</h1>"
assert! req.line.headers.get? Header.Name.contentType == some (Header.Value.ofString! "text/html; charset=utf-8")
let body recvBuiltBody req.body
assert! body.isSome
assert! body.get!.data == "<h1>Hello</h1>".toUTF8
#eval requestBuilderHtml.block
-- Test Request.Builder.stream creates a streaming body
def requestBuilderStream : Async Unit := do
let req Request.post (.originForm! "/api")
|>.stream fun s => do
s.send (Chunk.ofByteArray "streamed".toUTF8)
let result req.body.recv
assert! result.isSome
assert! result.get!.data == "streamed".toUTF8
#eval requestBuilderStream.block
-- Test Request.Builder.noBody body is always closed and returns none
def requestBuilderNoBodyAlwaysClosed : Async Unit := do
let req Request.get (.originForm! "/api")
|>.empty
assert! ( req.body.isClosed)
let result req.body.recv
assert! result.isNone
#eval requestBuilderNoBodyAlwaysClosed.block
/-! ## Response.Builder body tests -/
-- Test Response.Builder.text sets correct headers
def responseBuilderText : Async Unit := do
let res Response.ok
|>.text "Hello, World!"
assert! res.line.headers.get? Header.Name.contentType == some (Header.Value.ofString! "text/plain; charset=utf-8")
assert! res.line.headers.get? Header.Name.contentLength == none
let body recvBuiltBody res.body
assert! body.isSome
assert! body.get!.data == "Hello, World!".toUTF8
#eval responseBuilderText.block
-- Test Response.Builder.json sets correct headers
def responseBuilderJson : Async Unit := do
let res Response.ok
|>.json "{\"status\": \"ok\"}"
assert! res.line.headers.get? Header.Name.contentType == some (Header.Value.ofString! "application/json")
assert! res.line.headers.get? Header.Name.contentLength == none
let body recvBuiltBody res.body
assert! body.isSome
assert! body.get!.data == "{\"status\": \"ok\"}".toUTF8
#eval responseBuilderJson.block
-- Test Response.Builder.fromBytes sets body
def responseBuilderFromBytes : Async Unit := do
let data := ByteArray.mk #[0xaa, 0xbb]
let res Response.ok
|>.fromBytes data
assert! res.line.headers.get? Header.Name.contentLength == none
let body recvBuiltBody res.body
assert! body.isSome
assert! body.get!.data == data
#eval responseBuilderFromBytes.block
-- Test Response.Builder.noBody creates empty body
def responseBuilderNoBody : Async Unit := do
let res Response.ok
|>.empty
assert! res.body == {}
#eval responseBuilderNoBody.block
-- Test Response.Builder.bytes sets application/octet-stream content type
def responseBuilderBytes : Async Unit := do
let data := ByteArray.mk #[0xca, 0xfe]
let res Response.ok
|>.bytes data
assert! res.line.headers.get? Header.Name.contentType == some (Header.Value.ofString! "application/octet-stream")
let body recvBuiltBody res.body
assert! body.isSome
assert! body.get!.data == data
#eval responseBuilderBytes.block
-- Test Response.Builder.html sets text/html content type
def responseBuilderHtml : Async Unit := do
let res Response.ok
|>.html "<p>OK</p>"
assert! res.line.headers.get? Header.Name.contentType == some (Header.Value.ofString! "text/html; charset=utf-8")
let body recvBuiltBody res.body
assert! body.isSome
assert! body.get!.data == "<p>OK</p>".toUTF8
#eval responseBuilderHtml.block
-- Test Response.Builder.stream creates a streaming body
def responseBuilderStream : Async Unit := do
let res Response.ok
|>.stream fun s => do
s.send (Chunk.ofByteArray "streamed".toUTF8)
let result res.body.recv
assert! result.isSome
assert! result.get!.data == "streamed".toUTF8
#eval responseBuilderStream.block
-- Test Response.Builder.noBody body is always closed and returns none
def responseBuilderNoBodyAlwaysClosed : Async Unit := do
let res Response.ok
|>.empty
assert! ( res.body.isClosed)
let result res.body.recv
assert! result.isNone
#eval responseBuilderNoBodyAlwaysClosed.block

View File

@@ -0,0 +1,6 @@
module
-- https://github.com/leanprover/lean4/issues/13167
theorem Option.bind_pmap {α β γ} {p : α Prop} (f : a, p a β) (x : Option α) (g : β Option γ) (H) :
pmap f x H >>= g = x.pbind fun a h g (f a (H _ h)) := by
grind [cases Option, pmap]