mirror of
https://github.com/leanprover/lean4.git
synced 2026-03-29 16:24:08 +00:00
Compare commits
7 Commits
sofia/fix-
...
master
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
ccc7157c08 | ||
|
|
05046dc3d7 | ||
|
|
43f18fd502 | ||
|
|
b06eb981a3 | ||
|
|
f72137f53a | ||
|
|
96dbc324f3 | ||
|
|
d6e69649b6 |
@@ -30,13 +30,13 @@ simpMatchDiscrsOnly (match 0 with | 0 => true | _ => false) = true
|
||||
```
|
||||
using `eq_self`.
|
||||
-/
|
||||
def simpMatchDiscrsOnly {α : Sort u} (a : α) : α := a
|
||||
@[expose] def simpMatchDiscrsOnly {α : Sort u} (a : α) : α := a
|
||||
|
||||
/--
|
||||
Gadget for protecting lambda abstractions created by `abstractGroundMismatches?`
|
||||
from beta reduction during preprocessing. See `ProveEq.lean` for details.
|
||||
-/
|
||||
def abstractFn {α : Sort u} (a : α) : α := a
|
||||
@[expose] def abstractFn {α : Sort u} (a : α) : α := a
|
||||
|
||||
/-- Gadget for representing offsets `t+k` in patterns. -/
|
||||
def offset (a b : Nat) : Nat := a + b
|
||||
|
||||
@@ -14,6 +14,7 @@ public import Std.Internal.Http.Data.Status
|
||||
public import Std.Internal.Http.Data.Chunk
|
||||
public import Std.Internal.Http.Data.Headers
|
||||
public import Std.Internal.Http.Data.URI
|
||||
public import Std.Internal.Http.Data.Body
|
||||
|
||||
/-!
|
||||
# HTTP Data Types
|
||||
|
||||
24
src/Std/Internal/Http/Data/Body.lean
Normal file
24
src/Std/Internal/Http/Data/Body.lean
Normal file
@@ -0,0 +1,24 @@
|
||||
/-
|
||||
Copyright (c) 2025 Lean FRO, LLC. All rights reserved.
|
||||
Released under Apache 2.0 license as described in the file LICENSE.
|
||||
Authors: Sofia Rodrigues
|
||||
-/
|
||||
module
|
||||
|
||||
prelude
|
||||
public import Std.Internal.Http.Data.Body.Basic
|
||||
public import Std.Internal.Http.Data.Body.Length
|
||||
public import Std.Internal.Http.Data.Body.Any
|
||||
public import Std.Internal.Http.Data.Body.Stream
|
||||
public import Std.Internal.Http.Data.Body.Empty
|
||||
public import Std.Internal.Http.Data.Body.Full
|
||||
|
||||
public section
|
||||
|
||||
/-!
|
||||
# Body
|
||||
|
||||
This module re-exports all HTTP body types: `Body.Empty`, `Body.Full`, `Body.Stream`,
|
||||
`Body.Any`, and `Body.Length`, along with the `Http.Body` typeclass and conversion
|
||||
utilities (`ToByteArray`, `FromByteArray`).
|
||||
-/
|
||||
83
src/Std/Internal/Http/Data/Body/Any.lean
Normal file
83
src/Std/Internal/Http/Data/Body/Any.lean
Normal file
@@ -0,0 +1,83 @@
|
||||
/-
|
||||
Copyright (c) 2025 Lean FRO, LLC. All rights reserved.
|
||||
Released under Apache 2.0 license as described in the file LICENSE.
|
||||
Authors: Sofia Rodrigues
|
||||
-/
|
||||
module
|
||||
|
||||
prelude
|
||||
public import Std.Internal.Http.Data.Body.Basic
|
||||
|
||||
public section
|
||||
|
||||
/-!
|
||||
# Body.Any
|
||||
|
||||
A type-erased body backed by closures. Implements `Http.Body` and can be constructed from any
|
||||
type that also implements `Http.Body`. Used as the default handler response body type.
|
||||
-/
|
||||
|
||||
namespace Std.Http.Body
|
||||
open Std Internal IO Async
|
||||
|
||||
set_option linter.all true
|
||||
|
||||
/--
|
||||
A type-erased body handle. Operations are stored as closures, making it open to any body type
|
||||
that implements `Http.Body`.
|
||||
-/
|
||||
structure Any where
|
||||
|
||||
/--
|
||||
Receives the next body chunk. Returns `none` at end-of-stream.
|
||||
-/
|
||||
recv : Async (Option Chunk)
|
||||
|
||||
/--
|
||||
Closes the body stream.
|
||||
-/
|
||||
close : Async Unit
|
||||
|
||||
/--
|
||||
Returns `true` when the body stream is closed.
|
||||
-/
|
||||
isClosed : Async Bool
|
||||
|
||||
/--
|
||||
Selector that resolves when a chunk is available or EOF is reached.
|
||||
-/
|
||||
recvSelector : Selector (Option Chunk)
|
||||
|
||||
/--
|
||||
Returns the declared size.
|
||||
-/
|
||||
getKnownSize : Async (Option Body.Length)
|
||||
|
||||
/--
|
||||
Sets the size of the body.
|
||||
-/
|
||||
setKnownSize : Option Body.Length → Async Unit
|
||||
namespace Any
|
||||
|
||||
/--
|
||||
Erases a body of any `Http.Body` instance into a `Body.Any`.
|
||||
-/
|
||||
def ofBody [Http.Body α] (body : α) : Any where
|
||||
recv := Http.Body.recv body
|
||||
close := Http.Body.close body
|
||||
isClosed := Http.Body.isClosed body
|
||||
recvSelector := Http.Body.recvSelector body
|
||||
getKnownSize := Http.Body.getKnownSize body
|
||||
setKnownSize := Http.Body.setKnownSize body
|
||||
|
||||
end Any
|
||||
|
||||
instance : Http.Body Any where
|
||||
recv := Any.recv
|
||||
close := Any.close
|
||||
isClosed := Any.isClosed
|
||||
recvSelector := Any.recvSelector
|
||||
getKnownSize := Any.getKnownSize
|
||||
setKnownSize := Any.setKnownSize
|
||||
|
||||
end Std.Http.Body
|
||||
102
src/Std/Internal/Http/Data/Body/Basic.lean
Normal file
102
src/Std/Internal/Http/Data/Body/Basic.lean
Normal file
@@ -0,0 +1,102 @@
|
||||
/-
|
||||
Copyright (c) 2025 Lean FRO, LLC. All rights reserved.
|
||||
Released under Apache 2.0 license as described in the file LICENSE.
|
||||
Authors: Sofia Rodrigues
|
||||
-/
|
||||
module
|
||||
|
||||
prelude
|
||||
public import Std.Internal.Async
|
||||
public import Std.Internal.Async.ContextAsync
|
||||
public import Std.Internal.Http.Data.Chunk
|
||||
public import Std.Internal.Http.Data.Headers
|
||||
public import Std.Internal.Http.Data.Body.Length
|
||||
|
||||
public section
|
||||
|
||||
/-!
|
||||
# Body.Basic
|
||||
|
||||
This module defines the `Body` typeclass for HTTP body streams, and shared conversion types
|
||||
`ToByteArray` and `FromByteArray` used for encoding and decoding body content.
|
||||
-/
|
||||
|
||||
namespace Std.Http
|
||||
open Std Internal IO Async
|
||||
|
||||
set_option linter.all true
|
||||
|
||||
/--
|
||||
Typeclass for values that can be read as HTTP body streams.
|
||||
-/
|
||||
class Body (α : Type) where
|
||||
/--
|
||||
Receives the next body chunk. Returns `none` at end-of-stream.
|
||||
-/
|
||||
recv : α → Async (Option Chunk)
|
||||
|
||||
/--
|
||||
Closes the body stream.
|
||||
-/
|
||||
close : α → Async Unit
|
||||
|
||||
/--
|
||||
Returns `true` when the body stream is closed.
|
||||
-/
|
||||
isClosed : α → Async Bool
|
||||
|
||||
/--
|
||||
Selector that resolves when a chunk is available or EOF is reached.
|
||||
-/
|
||||
recvSelector : α → Selector (Option Chunk)
|
||||
|
||||
/--
|
||||
Gets the declared size of the body.
|
||||
-/
|
||||
getKnownSize : α → Async (Option Body.Length)
|
||||
|
||||
/--
|
||||
Sets the declared size of a body.
|
||||
-/
|
||||
setKnownSize : α → Option Body.Length → Async Unit
|
||||
end Std.Http
|
||||
|
||||
namespace Std.Http.Body
|
||||
|
||||
/--
|
||||
Typeclass for types that can be converted to a `ByteArray`.
|
||||
-/
|
||||
class ToByteArray (α : Type) where
|
||||
|
||||
/--
|
||||
Transforms into a `ByteArray`.
|
||||
-/
|
||||
toByteArray : α → ByteArray
|
||||
|
||||
instance : ToByteArray ByteArray where
|
||||
toByteArray := id
|
||||
|
||||
instance : ToByteArray String where
|
||||
toByteArray := String.toUTF8
|
||||
|
||||
/--
|
||||
Typeclass for types that can be decoded from a `ByteArray`. The conversion may fail with an error
|
||||
message if the bytes are not valid for the target type.
|
||||
-/
|
||||
class FromByteArray (α : Type) where
|
||||
|
||||
/--
|
||||
Attempts to decode a `ByteArray` into the target type, returning an error message on failure.
|
||||
-/
|
||||
fromByteArray : ByteArray → Except String α
|
||||
|
||||
instance : FromByteArray ByteArray where
|
||||
fromByteArray := .ok
|
||||
|
||||
instance : FromByteArray String where
|
||||
fromByteArray bs :=
|
||||
match String.fromUTF8? bs with
|
||||
| some s => .ok s
|
||||
| none => .error "invalid UTF-8 encoding"
|
||||
|
||||
end Std.Http.Body
|
||||
116
src/Std/Internal/Http/Data/Body/Empty.lean
Normal file
116
src/Std/Internal/Http/Data/Body/Empty.lean
Normal file
@@ -0,0 +1,116 @@
|
||||
/-
|
||||
Copyright (c) 2025 Lean FRO, LLC. All rights reserved.
|
||||
Released under Apache 2.0 license as described in the file LICENSE.
|
||||
Authors: Sofia Rodrigues
|
||||
-/
|
||||
module
|
||||
|
||||
prelude
|
||||
public import Std.Internal.Http.Data.Request
|
||||
public import Std.Internal.Http.Data.Response
|
||||
public import Std.Internal.Http.Data.Body.Any
|
||||
|
||||
public section
|
||||
|
||||
/-!
|
||||
# Body.Empty
|
||||
|
||||
Represents an always-empty, already-closed body handle.
|
||||
-/
|
||||
|
||||
namespace Std.Http.Body
|
||||
open Std Internal IO Async
|
||||
|
||||
set_option linter.all true
|
||||
|
||||
/--
|
||||
An empty body handle.
|
||||
-/
|
||||
structure Empty where
|
||||
deriving Inhabited, BEq
|
||||
|
||||
namespace Empty
|
||||
|
||||
/--
|
||||
Receives from an empty body, always returning end-of-stream.
|
||||
-/
|
||||
@[inline]
|
||||
def recv (_ : Empty) : Async (Option Chunk) :=
|
||||
pure none
|
||||
|
||||
/--
|
||||
Closes an empty body (no-op).
|
||||
-/
|
||||
@[inline]
|
||||
def close (_ : Empty) : Async Unit :=
|
||||
pure ()
|
||||
|
||||
/--
|
||||
Empty bodies are always closed for reading.
|
||||
-/
|
||||
@[inline]
|
||||
def isClosed (_ : Empty) : Async Bool :=
|
||||
pure true
|
||||
|
||||
/--
|
||||
Selector that immediately resolves with end-of-stream for an empty body.
|
||||
-/
|
||||
@[inline]
|
||||
def recvSelector (_ : Empty) : Selector (Option Chunk) where
|
||||
tryFn := pure (some none)
|
||||
registerFn waiter := do
|
||||
let lose := pure ()
|
||||
let win promise := do
|
||||
promise.resolve (.ok none)
|
||||
waiter.race lose win
|
||||
unregisterFn := pure ()
|
||||
|
||||
end Empty
|
||||
|
||||
instance : Http.Body Empty where
|
||||
recv := Empty.recv
|
||||
close := Empty.close
|
||||
isClosed := Empty.isClosed
|
||||
recvSelector := Empty.recvSelector
|
||||
getKnownSize _ := pure (some <| .fixed 0)
|
||||
setKnownSize _ _ := pure ()
|
||||
|
||||
|
||||
instance : Coe Empty Any := ⟨Any.ofBody⟩
|
||||
|
||||
instance : Coe (Response Empty) (Response Any) where
|
||||
coe f := { f with }
|
||||
|
||||
instance : Coe (ContextAsync (Response Empty)) (ContextAsync (Response Any)) where
|
||||
coe action := do
|
||||
let response ← action
|
||||
pure (response : Response Any)
|
||||
|
||||
instance : Coe (Async (Response Empty)) (ContextAsync (Response Any)) where
|
||||
coe action := do
|
||||
let response ← action
|
||||
pure (response : Response Any)
|
||||
|
||||
end Body
|
||||
|
||||
namespace Request.Builder
|
||||
open Internal.IO.Async
|
||||
|
||||
/--
|
||||
Builds a request with no body.
|
||||
-/
|
||||
def empty (builder : Builder) : Async (Request Body.Empty) :=
|
||||
pure <| builder.body {}
|
||||
|
||||
end Request.Builder
|
||||
|
||||
namespace Response.Builder
|
||||
open Internal.IO.Async
|
||||
|
||||
/--
|
||||
Builds a response with no body.
|
||||
-/
|
||||
def empty (builder : Builder) : Async (Response Body.Empty) :=
|
||||
pure <| builder.body {}
|
||||
|
||||
end Response.Builder
|
||||
232
src/Std/Internal/Http/Data/Body/Full.lean
Normal file
232
src/Std/Internal/Http/Data/Body/Full.lean
Normal file
@@ -0,0 +1,232 @@
|
||||
/-
|
||||
Copyright (c) 2025 Lean FRO, LLC. All rights reserved.
|
||||
Released under Apache 2.0 license as described in the file LICENSE.
|
||||
Authors: Sofia Rodrigues
|
||||
-/
|
||||
module
|
||||
|
||||
prelude
|
||||
public import Std.Sync
|
||||
public import Std.Internal.Http.Data.Request
|
||||
public import Std.Internal.Http.Data.Response
|
||||
public import Std.Internal.Http.Data.Body.Any
|
||||
public import Init.Data.ByteArray
|
||||
|
||||
public section
|
||||
|
||||
/-!
|
||||
# Body.Full
|
||||
|
||||
A body backed by a fixed `ByteArray` held in a `Mutex`.
|
||||
|
||||
The byte array is consumed at most once: the first call to `recv` atomically takes the data
|
||||
and returns it as a single chunk; subsequent calls return `none` (end-of-stream).
|
||||
Closing the body discards any unconsumed data.
|
||||
-/
|
||||
|
||||
namespace Std.Http.Body
|
||||
open Std Internal IO Async
|
||||
|
||||
set_option linter.all true
|
||||
|
||||
/--
|
||||
A body backed by a fixed, mutex-protected `ByteArray`.
|
||||
|
||||
The data is consumed on the first read. Once consumed (or explicitly closed), the body
|
||||
behaves as a closed, empty channel.
|
||||
-/
|
||||
structure Full where
|
||||
private mk ::
|
||||
private state : Mutex (Option ByteArray)
|
||||
deriving Nonempty
|
||||
|
||||
namespace Full
|
||||
|
||||
private def takeChunk : AtomicT (Option ByteArray) Async (Option Chunk) := do
|
||||
match ← get with
|
||||
| none =>
|
||||
pure none
|
||||
| some data =>
|
||||
set (none : Option ByteArray)
|
||||
if data.isEmpty then
|
||||
pure none
|
||||
else
|
||||
pure (some (Chunk.ofByteArray data))
|
||||
|
||||
/--
|
||||
Creates a `Full` body from a `ByteArray`.
|
||||
-/
|
||||
def ofByteArray (data : ByteArray) : Async Full := do
|
||||
let state ← Mutex.new (some data)
|
||||
return { state }
|
||||
|
||||
/--
|
||||
Creates a `Full` body from a `String`.
|
||||
-/
|
||||
def ofString (data : String) : Async Full := do
|
||||
let state ← Mutex.new (some data.toUTF8)
|
||||
return { state }
|
||||
|
||||
/--
|
||||
Receives the body data. Returns the full byte array on the first call as a single chunk,
|
||||
then `none` on all subsequent calls.
|
||||
-/
|
||||
def recv (full : Full) : Async (Option Chunk) :=
|
||||
full.state.atomically do
|
||||
takeChunk
|
||||
|
||||
/--
|
||||
Closes the body, discarding any unconsumed data.
|
||||
-/
|
||||
def close (full : Full) : Async Unit :=
|
||||
full.state.atomically do
|
||||
set (none : Option ByteArray)
|
||||
|
||||
/--
|
||||
Returns `true` when the data has been consumed or the body has been closed.
|
||||
-/
|
||||
def isClosed (full : Full) : Async Bool :=
|
||||
full.state.atomically do
|
||||
return (← get).isNone
|
||||
|
||||
/--
|
||||
Returns the known size of the remaining data.
|
||||
Returns `some (.fixed n)` with the current byte count, or `some (.fixed 0)` if the body has
|
||||
already been consumed or closed.
|
||||
-/
|
||||
def getKnownSize (full : Full) : Async (Option Body.Length) :=
|
||||
full.state.atomically do
|
||||
match ← get with
|
||||
| none => pure (some (.fixed 0))
|
||||
| some data => pure (some (.fixed data.size))
|
||||
|
||||
/--
|
||||
Selector that immediately resolves to the remaining chunk (or EOF).
|
||||
-/
|
||||
def recvSelector (full : Full) : Selector (Option Chunk) where
|
||||
tryFn := do
|
||||
let chunk ← full.state.atomically do
|
||||
takeChunk
|
||||
pure (some chunk)
|
||||
|
||||
registerFn waiter := do
|
||||
full.state.atomically do
|
||||
let lose := pure ()
|
||||
|
||||
let win promise := do
|
||||
let chunk ← takeChunk
|
||||
promise.resolve (.ok chunk)
|
||||
|
||||
waiter.race lose win
|
||||
|
||||
unregisterFn := pure ()
|
||||
|
||||
end Full
|
||||
|
||||
instance : Http.Body Full where
|
||||
recv := Full.recv
|
||||
close := Full.close
|
||||
isClosed := Full.isClosed
|
||||
recvSelector := Full.recvSelector
|
||||
getKnownSize := Full.getKnownSize
|
||||
setKnownSize _ _ := pure ()
|
||||
|
||||
instance : Coe Full Any := ⟨Any.ofBody⟩
|
||||
|
||||
instance : Coe (Response Full) (Response Any) where
|
||||
coe f := { f with }
|
||||
|
||||
instance : Coe (ContextAsync (Response Full)) (ContextAsync (Response Any)) where
|
||||
coe action := do
|
||||
let response ← action
|
||||
pure (response : Response Any)
|
||||
|
||||
instance : Coe (Async (Response Full)) (ContextAsync (Response Any)) where
|
||||
coe action := do
|
||||
let response ← action
|
||||
pure (response : Response Any)
|
||||
|
||||
end Body
|
||||
|
||||
namespace Request.Builder
|
||||
|
||||
open Internal.IO.Async
|
||||
|
||||
/--
|
||||
Builds a request body from raw bytes without setting any headers.
|
||||
Use `bytes` instead if you want `Content-Type: application/octet-stream` set automatically.
|
||||
-/
|
||||
def fromBytes (builder : Builder) (content : ByteArray) : Async (Request Body.Full) := do
|
||||
return builder.body (← Body.Full.ofByteArray content)
|
||||
|
||||
/--
|
||||
Builds a request with a binary body.
|
||||
Sets `Content-Type: application/octet-stream`.
|
||||
Use `fromBytes` instead if you need to set a different `Content-Type` or none at all.
|
||||
-/
|
||||
def bytes (builder : Builder) (content : ByteArray) : Async (Request Body.Full) :=
|
||||
fromBytes (builder.header Header.Name.contentType (Header.Value.ofString! "application/octet-stream")) content
|
||||
|
||||
/--
|
||||
Builds a request with a text body.
|
||||
Sets `Content-Type: text/plain; charset=utf-8`.
|
||||
-/
|
||||
def text (builder : Builder) (content : String) : Async (Request Body.Full) :=
|
||||
fromBytes (builder.header Header.Name.contentType (Header.Value.ofString! "text/plain; charset=utf-8")) content.toUTF8
|
||||
|
||||
/--
|
||||
Builds a request with a JSON body.
|
||||
Sets `Content-Type: application/json`.
|
||||
-/
|
||||
def json (builder : Builder) (content : String) : Async (Request Body.Full) :=
|
||||
fromBytes (builder.header Header.Name.contentType (Header.Value.ofString! "application/json")) content.toUTF8
|
||||
|
||||
/--
|
||||
Builds a request with an HTML body.
|
||||
Sets `Content-Type: text/html; charset=utf-8`.
|
||||
-/
|
||||
def html (builder : Builder) (content : String) : Async (Request Body.Full) :=
|
||||
fromBytes (builder.header Header.Name.contentType (Header.Value.ofString! "text/html; charset=utf-8")) content.toUTF8
|
||||
|
||||
end Request.Builder
|
||||
|
||||
namespace Response.Builder
|
||||
open Internal.IO.Async
|
||||
|
||||
/--
|
||||
Builds a response body from raw bytes without setting any headers.
|
||||
Use `bytes` instead if you want `Content-Type: application/octet-stream` set automatically.
|
||||
-/
|
||||
def fromBytes (builder : Builder) (content : ByteArray) : Async (Response Body.Full) := do
|
||||
return builder.body (← Body.Full.ofByteArray content)
|
||||
|
||||
/--
|
||||
Builds a response with a binary body.
|
||||
Sets `Content-Type: application/octet-stream`.
|
||||
Use `fromBytes` instead if you need to set a different `Content-Type` or none at all.
|
||||
-/
|
||||
def bytes (builder : Builder) (content : ByteArray) : Async (Response Body.Full) :=
|
||||
fromBytes (builder.header Header.Name.contentType (Header.Value.ofString! "application/octet-stream")) content
|
||||
|
||||
/--
|
||||
Builds a response with a text body.
|
||||
Sets `Content-Type: text/plain; charset=utf-8`.
|
||||
-/
|
||||
def text (builder : Builder) (content : String) : Async (Response Body.Full) :=
|
||||
fromBytes (builder.header Header.Name.contentType (Header.Value.ofString! "text/plain; charset=utf-8")) content.toUTF8
|
||||
|
||||
/--
|
||||
Builds a response with a JSON body.
|
||||
Sets `Content-Type: application/json`.
|
||||
-/
|
||||
def json (builder : Builder) (content : String) : Async (Response Body.Full) :=
|
||||
fromBytes (builder.header Header.Name.contentType (Header.Value.ofString! "application/json")) content.toUTF8
|
||||
|
||||
/--
|
||||
Builds a response with an HTML body.
|
||||
Sets `Content-Type: text/html; charset=utf-8`.
|
||||
-/
|
||||
def html (builder : Builder) (content : String) : Async (Response Body.Full) :=
|
||||
fromBytes (builder.header Header.Name.contentType (Header.Value.ofString! "text/html; charset=utf-8")) content.toUTF8
|
||||
|
||||
end Response.Builder
|
||||
60
src/Std/Internal/Http/Data/Body/Length.lean
Normal file
60
src/Std/Internal/Http/Data/Body/Length.lean
Normal file
@@ -0,0 +1,60 @@
|
||||
/-
|
||||
Copyright (c) 2025 Lean FRO, LLC. All rights reserved.
|
||||
Released under Apache 2.0 license as described in the file LICENSE.
|
||||
Authors: Sofia Rodrigues
|
||||
-/
|
||||
module
|
||||
|
||||
prelude
|
||||
public import Init.Data.Repr
|
||||
|
||||
public section
|
||||
|
||||
/-!
|
||||
# Body.Length
|
||||
|
||||
This module defines the `Length` type, that represents the Content-Length or Transfer-Encoding
|
||||
of an HTTP request or response.
|
||||
-/
|
||||
|
||||
namespace Std.Http.Body
|
||||
|
||||
set_option linter.all true
|
||||
|
||||
/--
|
||||
Size of the body of a response or request.
|
||||
-/
|
||||
inductive Length
|
||||
/--
|
||||
Indicates that the HTTP message body uses **chunked transfer encoding**.
|
||||
-/
|
||||
| chunked
|
||||
|
||||
/--
|
||||
Indicates that the HTTP message body has a **fixed, known length**, as specified by the
|
||||
`Content-Length` header.
|
||||
-/
|
||||
| fixed (n : Nat)
|
||||
deriving Repr, BEq
|
||||
|
||||
namespace Length
|
||||
|
||||
/--
|
||||
Checks if the `Length` is chunked.
|
||||
-/
|
||||
@[inline]
|
||||
def isChunked : Length → Bool
|
||||
| .chunked => true
|
||||
| _ => false
|
||||
|
||||
/--
|
||||
Checks if the `Length` is a fixed size.
|
||||
-/
|
||||
@[inline]
|
||||
def isFixed : Length → Bool
|
||||
| .fixed _ => true
|
||||
| _ => false
|
||||
|
||||
end Length
|
||||
|
||||
end Std.Http.Body
|
||||
650
src/Std/Internal/Http/Data/Body/Stream.lean
Normal file
650
src/Std/Internal/Http/Data/Body/Stream.lean
Normal file
@@ -0,0 +1,650 @@
|
||||
/-
|
||||
Copyright (c) 2025 Lean FRO, LLC. All rights reserved.
|
||||
Released under Apache 2.0 license as described in the file LICENSE.
|
||||
Authors: Sofia Rodrigues
|
||||
-/
|
||||
module
|
||||
|
||||
prelude
|
||||
public import Std.Sync
|
||||
public import Std.Internal.Async
|
||||
public import Std.Internal.Http.Data.Request
|
||||
public import Std.Internal.Http.Data.Response
|
||||
public import Std.Internal.Http.Data.Chunk
|
||||
public import Std.Internal.Http.Data.Body.Basic
|
||||
public import Std.Internal.Http.Data.Body.Any
|
||||
public import Init.Data.ByteArray
|
||||
|
||||
public section
|
||||
|
||||
/-!
|
||||
# Body.Stream
|
||||
|
||||
This module defines a zero-buffer rendezvous body channel (`Body.Stream`) that supports
|
||||
both sending and receiving chunks.
|
||||
|
||||
There is no queue and no capacity. A send waits for a receiver and a receive waits for a sender.
|
||||
At most one blocked producer and one blocked consumer are supported.
|
||||
-/
|
||||
|
||||
namespace Std.Http
|
||||
|
||||
namespace Body
|
||||
|
||||
open Std Internal IO Async
|
||||
|
||||
set_option linter.all true
|
||||
|
||||
namespace Channel
|
||||
|
||||
open Internal.IO.Async in
|
||||
private inductive Consumer where
|
||||
| normal (promise : IO.Promise (Option Chunk))
|
||||
| select (finished : Waiter (Option Chunk))
|
||||
|
||||
private def Consumer.resolve (c : Consumer) (x : Option Chunk) : BaseIO Bool := do
|
||||
match c with
|
||||
| .normal promise =>
|
||||
promise.resolve x
|
||||
return true
|
||||
| .select waiter =>
|
||||
let lose := return false
|
||||
let win promise := do
|
||||
promise.resolve (.ok x)
|
||||
return true
|
||||
waiter.race lose win
|
||||
|
||||
private structure Producer where
|
||||
chunk : Chunk
|
||||
|
||||
/--
|
||||
Resolved with `true` when consumed by a receiver, `false` when the channel closes.
|
||||
-/
|
||||
done : IO.Promise Bool
|
||||
|
||||
open Internal.IO.Async in
|
||||
private def resolveInterestWaiter (waiter : Waiter Bool) (x : Bool) : BaseIO Bool := do
|
||||
let lose := return false
|
||||
let win promise := do
|
||||
promise.resolve (.ok x)
|
||||
return true
|
||||
waiter.race lose win
|
||||
|
||||
private structure State where
|
||||
/--
|
||||
A single blocked producer waiting for a receiver.
|
||||
-/
|
||||
pendingProducer : Option Producer
|
||||
|
||||
/--
|
||||
A single blocked consumer waiting for a producer.
|
||||
-/
|
||||
pendingConsumer : Option Consumer
|
||||
|
||||
/--
|
||||
A waiter for `Stream.interestSelector`.
|
||||
-/
|
||||
interestWaiter : Option (Internal.IO.Async.Waiter Bool)
|
||||
|
||||
/--
|
||||
Whether the channel is closed.
|
||||
-/
|
||||
closed : Bool
|
||||
|
||||
/--
|
||||
Known size of the stream if available.
|
||||
-/
|
||||
knownSize : Option Body.Length
|
||||
|
||||
/--
|
||||
Buffered partial chunk data accumulated from `Stream.send ... (incomplete := true)`.
|
||||
These partial pieces are collapsed and emitted as a single chunk on the next complete send.
|
||||
-/
|
||||
pendingIncompleteChunk : Option Chunk := none
|
||||
deriving Nonempty
|
||||
|
||||
end Channel
|
||||
|
||||
/--
|
||||
A zero-buffer rendezvous body channel that supports both sending and receiving chunks.
|
||||
-/
|
||||
structure Stream where
|
||||
private mk ::
|
||||
private state : Mutex Channel.State
|
||||
deriving Nonempty, TypeName
|
||||
|
||||
/--
|
||||
Creates a rendezvous body stream.
|
||||
-/
|
||||
def mkStream : Async Stream := do
|
||||
let state ← Mutex.new {
|
||||
pendingProducer := none
|
||||
pendingConsumer := none
|
||||
interestWaiter := none
|
||||
closed := false
|
||||
knownSize := none
|
||||
}
|
||||
return { state }
|
||||
|
||||
namespace Channel
|
||||
|
||||
private def decreaseKnownSize (knownSize : Option Body.Length) (chunk : Chunk) : Option Body.Length :=
|
||||
match knownSize with
|
||||
| some (.fixed res) => some (Body.Length.fixed (res - chunk.data.size))
|
||||
| _ => knownSize
|
||||
|
||||
private def pruneFinishedWaiters [Monad m] [MonadLiftT (ST IO.RealWorld) m] :
|
||||
AtomicT State m Unit := do
|
||||
let st ← get
|
||||
|
||||
let pendingConsumer ←
|
||||
match st.pendingConsumer with
|
||||
| some (.select waiter) =>
|
||||
if ← waiter.checkFinished then
|
||||
pure none
|
||||
else
|
||||
pure st.pendingConsumer
|
||||
| _ =>
|
||||
pure st.pendingConsumer
|
||||
|
||||
let interestWaiter ←
|
||||
match st.interestWaiter with
|
||||
| some waiter =>
|
||||
if ← waiter.checkFinished then
|
||||
pure none
|
||||
else
|
||||
pure st.interestWaiter
|
||||
| none =>
|
||||
pure none
|
||||
|
||||
set { st with pendingConsumer, interestWaiter }
|
||||
|
||||
private def signalInterest [Monad m] [MonadLiftT (ST IO.RealWorld) m] [MonadLiftT BaseIO m] :
|
||||
AtomicT State m Unit := do
|
||||
let st ← get
|
||||
if let some waiter := st.interestWaiter then
|
||||
discard <| resolveInterestWaiter waiter true
|
||||
set { st with interestWaiter := none }
|
||||
|
||||
private def recvReady' [Monad m] [MonadLiftT (ST IO.RealWorld) m] :
|
||||
AtomicT State m Bool := do
|
||||
let st ← get
|
||||
return st.pendingProducer.isSome || st.closed
|
||||
|
||||
private def hasInterest' [Monad m] [MonadLiftT (ST IO.RealWorld) m] :
|
||||
AtomicT State m Bool := do
|
||||
let st ← get
|
||||
return st.pendingConsumer.isSome
|
||||
|
||||
private def tryRecv' [Monad m] [MonadLiftT (ST IO.RealWorld) m] [MonadLiftT BaseIO m] :
|
||||
AtomicT State m (Option Chunk) := do
|
||||
let st ← get
|
||||
if let some producer := st.pendingProducer then
|
||||
set {
|
||||
st with
|
||||
pendingProducer := none
|
||||
knownSize := decreaseKnownSize st.knownSize producer.chunk
|
||||
}
|
||||
discard <| producer.done.resolve true
|
||||
return some producer.chunk
|
||||
else
|
||||
return none
|
||||
|
||||
private def close' [Monad m] [MonadLiftT (ST IO.RealWorld) m] [MonadLiftT BaseIO m] :
|
||||
AtomicT State m Unit := do
|
||||
let st ← get
|
||||
if st.closed then
|
||||
return ()
|
||||
|
||||
if let some consumer := st.pendingConsumer then
|
||||
discard <| consumer.resolve none
|
||||
|
||||
if let some waiter := st.interestWaiter then
|
||||
discard <| resolveInterestWaiter waiter false
|
||||
|
||||
if let some producer := st.pendingProducer then
|
||||
discard <| producer.done.resolve false
|
||||
|
||||
set {
|
||||
st with
|
||||
pendingProducer := none
|
||||
pendingConsumer := none
|
||||
interestWaiter := none
|
||||
pendingIncompleteChunk := none
|
||||
closed := true
|
||||
}
|
||||
|
||||
end Channel
|
||||
|
||||
namespace Stream
|
||||
|
||||
/--
|
||||
Attempts to receive a chunk from the channel without blocking.
|
||||
Returns `some chunk` only when a producer is already waiting.
|
||||
-/
|
||||
def tryRecv (stream : Stream) : Async (Option Chunk) :=
|
||||
stream.state.atomically do
|
||||
Channel.pruneFinishedWaiters
|
||||
Channel.tryRecv'
|
||||
|
||||
private def recv' (stream : Stream) : BaseIO (AsyncTask (Option Chunk)) := do
|
||||
stream.state.atomically do
|
||||
Channel.pruneFinishedWaiters
|
||||
|
||||
if let some chunk ← Channel.tryRecv' then
|
||||
return AsyncTask.pure (some chunk)
|
||||
|
||||
let st ← get
|
||||
if st.closed then
|
||||
return AsyncTask.pure none
|
||||
|
||||
if st.pendingConsumer.isSome then
|
||||
return Task.pure (.error (IO.Error.userError "only one blocked consumer is allowed"))
|
||||
|
||||
let promise ← IO.Promise.new
|
||||
set { st with pendingConsumer := some (.normal promise) }
|
||||
Channel.signalInterest
|
||||
return promise.result?.map (sync := true) fun
|
||||
| none => .error (IO.Error.userError "the promise linked to the consumer was dropped")
|
||||
| some res => .ok res
|
||||
|
||||
/--
|
||||
Receives a chunk from the channel. Blocks until a producer sends one.
|
||||
Returns `none` if the channel is closed and no producer is waiting.
|
||||
-/
|
||||
def recv (stream : Stream) : Async (Option Chunk) := do
|
||||
Async.ofAsyncTask (← recv' stream)
|
||||
|
||||
/--
|
||||
Closes the channel.
|
||||
-/
|
||||
def close (stream : Stream) : Async Unit :=
|
||||
stream.state.atomically do
|
||||
Channel.close'
|
||||
|
||||
/--
|
||||
Checks whether the channel is closed.
|
||||
-/
|
||||
@[always_inline, inline]
|
||||
def isClosed (stream : Stream) : Async Bool :=
|
||||
stream.state.atomically do
|
||||
return (← get).closed
|
||||
|
||||
/--
|
||||
Gets the known size if available.
|
||||
-/
|
||||
@[always_inline, inline]
|
||||
def getKnownSize (stream : Stream) : Async (Option Body.Length) :=
|
||||
stream.state.atomically do
|
||||
return (← get).knownSize
|
||||
|
||||
/--
|
||||
Sets known size metadata.
|
||||
-/
|
||||
@[always_inline, inline]
|
||||
def setKnownSize (stream : Stream) (size : Option Body.Length) : Async Unit :=
|
||||
stream.state.atomically do
|
||||
modify fun st => { st with knownSize := size }
|
||||
|
||||
open Internal.IO.Async in
|
||||
|
||||
/--
|
||||
Creates a selector that resolves when a producer is waiting (or the channel closes).
|
||||
-/
|
||||
def recvSelector (stream : Stream) : Selector (Option Chunk) where
|
||||
tryFn := do
|
||||
stream.state.atomically do
|
||||
Channel.pruneFinishedWaiters
|
||||
if ← Channel.recvReady' then
|
||||
return some (← Channel.tryRecv')
|
||||
else
|
||||
return none
|
||||
|
||||
registerFn waiter := do
|
||||
stream.state.atomically do
|
||||
Channel.pruneFinishedWaiters
|
||||
if ← Channel.recvReady' then
|
||||
let lose := return ()
|
||||
let win promise := do
|
||||
promise.resolve (.ok (← Channel.tryRecv'))
|
||||
waiter.race lose win
|
||||
else
|
||||
let st ← get
|
||||
if st.pendingConsumer.isSome then
|
||||
throw (.userError "only one blocked consumer is allowed")
|
||||
|
||||
set { st with pendingConsumer := some (.select waiter) }
|
||||
Channel.signalInterest
|
||||
|
||||
unregisterFn := do
|
||||
stream.state.atomically do
|
||||
Channel.pruneFinishedWaiters
|
||||
|
||||
/--
|
||||
Iterates over chunks until the channel closes.
|
||||
-/
|
||||
@[inline]
|
||||
protected partial def forIn
|
||||
{β : Type} (stream : Stream) (acc : β)
|
||||
(step : Chunk → β → Async (ForInStep β)) : Async β := do
|
||||
|
||||
let rec @[specialize] loop (stream : Stream) (acc : β) : Async β := do
|
||||
if let some chunk ← stream.recv then
|
||||
match ← step chunk acc with
|
||||
| .done res => return res
|
||||
| .yield res => loop stream res
|
||||
else
|
||||
return acc
|
||||
|
||||
loop stream acc
|
||||
|
||||
/--
|
||||
Context-aware iteration over chunks until the channel closes.
|
||||
-/
|
||||
@[inline]
|
||||
protected partial def forIn'
|
||||
{β : Type} (stream : Stream) (acc : β)
|
||||
(step : Chunk → β → ContextAsync (ForInStep β)) : ContextAsync β := do
|
||||
|
||||
let rec @[specialize] loop (stream : Stream) (acc : β) : ContextAsync β := do
|
||||
let data ← Selectable.one #[
|
||||
.case stream.recvSelector pure,
|
||||
.case (← ContextAsync.doneSelector) (fun _ => pure none),
|
||||
]
|
||||
|
||||
if let some chunk := data then
|
||||
match ← step chunk acc with
|
||||
| .done res => return res
|
||||
| .yield res => loop stream res
|
||||
else
|
||||
return acc
|
||||
|
||||
loop stream acc
|
||||
|
||||
/--
|
||||
Abstracts over how the next chunk is received, allowing `readAll` to work in both `Async`
|
||||
(no cancellation) and `ContextAsync` (races with cancellation via `doneSelector`).
|
||||
-/
|
||||
class NextChunk (m : Type → Type) where
|
||||
/--
|
||||
Receives the next chunk, stopping at EOF or (in `ContextAsync`) when the context is cancelled.
|
||||
-/
|
||||
nextChunk : Stream → m (Option Chunk)
|
||||
|
||||
instance : NextChunk Async where
|
||||
nextChunk := Stream.recv
|
||||
|
||||
instance : NextChunk ContextAsync where
|
||||
nextChunk stream := do
|
||||
Selectable.one #[
|
||||
.case stream.recvSelector pure,
|
||||
.case (← ContextAsync.doneSelector) (fun _ => pure none),
|
||||
]
|
||||
|
||||
/--
|
||||
Reads all remaining chunks and decodes them into `α`.
|
||||
|
||||
Works in both `Async` (reads until EOF, no cancellation) and `ContextAsync` (also stops if the
|
||||
context is cancelled).
|
||||
-/
|
||||
partial def readAll
|
||||
[FromByteArray α]
|
||||
[Monad m] [MonadExceptOf IO.Error m] [NextChunk m]
|
||||
(stream : Stream)
|
||||
(maximumSize : Option UInt64 := none) :
|
||||
m α := do
|
||||
|
||||
let rec loop (result : ByteArray) : m ByteArray := do
|
||||
match ← NextChunk.nextChunk stream with
|
||||
| none => return result
|
||||
| some chunk =>
|
||||
let result := result ++ chunk.data
|
||||
if let some max := maximumSize then
|
||||
if result.size.toUInt64 > max then
|
||||
throw (.userError s!"body exceeded maximum size of {max} bytes")
|
||||
loop result
|
||||
|
||||
let result ← loop ByteArray.empty
|
||||
|
||||
match FromByteArray.fromByteArray result with
|
||||
| .ok a => return a
|
||||
| .error msg => throw (.userError msg)
|
||||
|
||||
private def collapseForSend
|
||||
(stream : Stream)
|
||||
(chunk : Chunk)
|
||||
(incomplete : Bool) : BaseIO (Except IO.Error (Option Chunk)) := do
|
||||
|
||||
stream.state.atomically do
|
||||
Channel.pruneFinishedWaiters
|
||||
let st ← get
|
||||
|
||||
if st.closed then
|
||||
return .error (.userError "channel closed")
|
||||
|
||||
let merged := match st.pendingIncompleteChunk with
|
||||
| some pending =>
|
||||
{
|
||||
data := pending.data ++ chunk.data
|
||||
extensions := if pending.extensions.isEmpty then chunk.extensions else pending.extensions
|
||||
}
|
||||
| none => chunk
|
||||
|
||||
if incomplete then
|
||||
set { st with pendingIncompleteChunk := some merged }
|
||||
return .ok none
|
||||
else
|
||||
set { st with pendingIncompleteChunk := none }
|
||||
return .ok (some merged)
|
||||
|
||||
/--
|
||||
Sends a chunk, retrying if a select-mode consumer races and loses. If no consumer is ready,
|
||||
installs the chunk as a pending producer and awaits acknowledgement from the receiver.
|
||||
-/
|
||||
private partial def send' (stream : Stream) (chunk : Chunk) : Async Unit := do
|
||||
let done ← IO.Promise.new
|
||||
let result : Except IO.Error (Option Bool) ← stream.state.atomically do
|
||||
Channel.pruneFinishedWaiters
|
||||
let st ← get
|
||||
|
||||
if st.closed then
|
||||
return .error (IO.Error.userError "channel closed")
|
||||
|
||||
if let some consumer := st.pendingConsumer then
|
||||
let success ← consumer.resolve (some chunk)
|
||||
|
||||
if success then
|
||||
set {
|
||||
st with
|
||||
pendingConsumer := none
|
||||
knownSize := Channel.decreaseKnownSize st.knownSize chunk
|
||||
}
|
||||
return .ok (some true)
|
||||
else
|
||||
set { st with pendingConsumer := none }
|
||||
return .ok (some false)
|
||||
else if st.pendingProducer.isSome then
|
||||
return .error (IO.Error.userError "only one blocked producer is allowed")
|
||||
else
|
||||
set { st with pendingProducer := some { chunk, done } }
|
||||
return .ok none
|
||||
|
||||
match result with
|
||||
| .error err =>
|
||||
throw err
|
||||
| .ok (some true) =>
|
||||
return ()
|
||||
| .ok (some false) =>
|
||||
-- The select-mode consumer raced and lost; recurse to allocate a fresh `done` promise.
|
||||
send' stream chunk
|
||||
| .ok none =>
|
||||
match ← await done.result? with
|
||||
| some true => return ()
|
||||
| _ => throw (IO.Error.userError "channel closed")
|
||||
|
||||
/--
|
||||
Sends a chunk.
|
||||
|
||||
If `incomplete := true`, the chunk is buffered and collapsed with subsequent chunks, and is not
|
||||
delivered to the receiver yet.
|
||||
If `incomplete := false`, any buffered incomplete pieces are collapsed with this chunk and the
|
||||
single merged chunk is sent.
|
||||
-/
|
||||
def send (stream : Stream) (chunk : Chunk) (incomplete : Bool := false) : Async Unit := do
|
||||
match (← collapseForSend stream chunk incomplete) with
|
||||
| .error err => throw err
|
||||
| .ok none => pure ()
|
||||
| .ok (some toSend) =>
|
||||
if toSend.data.isEmpty ∧ toSend.extensions.isEmpty then
|
||||
return ()
|
||||
|
||||
send' stream toSend
|
||||
|
||||
/--
|
||||
Returns `true` when a consumer is currently blocked waiting for data.
|
||||
-/
|
||||
def hasInterest (stream : Stream) : Async Bool :=
|
||||
stream.state.atomically do
|
||||
Channel.pruneFinishedWaiters
|
||||
Channel.hasInterest'
|
||||
|
||||
open Internal.IO.Async in
|
||||
/--
|
||||
Creates a selector that resolves when consumer interest is present.
|
||||
Returns `true` when a consumer is waiting, `false` when the channel closes first.
|
||||
-/
|
||||
def interestSelector (stream : Stream) : Selector Bool where
|
||||
tryFn := do
|
||||
stream.state.atomically do
|
||||
Channel.pruneFinishedWaiters
|
||||
let st ← get
|
||||
if st.pendingConsumer.isSome then
|
||||
return some true
|
||||
else if st.closed then
|
||||
return some false
|
||||
else
|
||||
return none
|
||||
|
||||
registerFn waiter := do
|
||||
stream.state.atomically do
|
||||
Channel.pruneFinishedWaiters
|
||||
let st ← get
|
||||
|
||||
if st.pendingConsumer.isSome then
|
||||
let lose := return ()
|
||||
let win promise := do
|
||||
promise.resolve (.ok true)
|
||||
waiter.race lose win
|
||||
else if st.closed then
|
||||
let lose := return ()
|
||||
let win promise := do
|
||||
promise.resolve (.ok false)
|
||||
waiter.race lose win
|
||||
else if st.interestWaiter.isSome then
|
||||
throw (.userError "only one blocked interest selector is allowed")
|
||||
else
|
||||
set { st with interestWaiter := some waiter }
|
||||
|
||||
unregisterFn := do
|
||||
stream.state.atomically do
|
||||
Channel.pruneFinishedWaiters
|
||||
|
||||
end Stream
|
||||
|
||||
/--
|
||||
Creates a body from a producer function.
|
||||
Returns the stream immediately and runs `gen` in a detached task.
|
||||
The channel is always closed when `gen` returns or throws.
|
||||
Errors from `gen` are not rethrown here; consumers observe end-of-stream via `recv = none`.
|
||||
-/
|
||||
def stream (gen : Stream → Async Unit) : Async Stream := do
|
||||
let s ← mkStream
|
||||
background <| do
|
||||
try
|
||||
gen s
|
||||
finally
|
||||
s.close
|
||||
return s
|
||||
|
||||
/--
|
||||
Creates a body from a fixed byte array.
|
||||
-/
|
||||
def fromBytes (content : ByteArray) : Async Stream := do
|
||||
stream fun s => do
|
||||
s.setKnownSize (some (.fixed content.size))
|
||||
if content.size > 0 then
|
||||
s.send (Chunk.ofByteArray content)
|
||||
|
||||
/--
|
||||
Creates an empty `Stream` body channel (already closed, no data).
|
||||
|
||||
Prefer `Body.Empty` when you need a concrete zero-cost type. Use this when the calling
|
||||
context requires a `Stream` specifically.
|
||||
-/
|
||||
def empty : Async Stream := do
|
||||
let s ← mkStream
|
||||
s.setKnownSize (some (.fixed 0))
|
||||
s.close
|
||||
return s
|
||||
|
||||
instance : ForIn Async Stream Chunk where
|
||||
forIn := Stream.forIn
|
||||
|
||||
instance : ForIn ContextAsync Stream Chunk where
|
||||
forIn := Stream.forIn'
|
||||
|
||||
instance : Http.Body Stream where
|
||||
recv := Stream.recv
|
||||
close := Stream.close
|
||||
isClosed := Stream.isClosed
|
||||
recvSelector := Stream.recvSelector
|
||||
getKnownSize := Stream.getKnownSize
|
||||
setKnownSize := Stream.setKnownSize
|
||||
|
||||
instance : Coe Stream Any := ⟨Any.ofBody⟩
|
||||
|
||||
instance : Coe (Response Stream) (Response Any) where
|
||||
coe f := { f with }
|
||||
|
||||
instance : Coe (ContextAsync (Response Stream)) (ContextAsync (Response Any)) where
|
||||
coe action := do
|
||||
let response ← action
|
||||
pure (response : Response Any)
|
||||
|
||||
instance : Coe (Async (Response Stream)) (ContextAsync (Response Any)) where
|
||||
coe action := do
|
||||
let response ← action
|
||||
pure (response : Response Any)
|
||||
|
||||
end Body
|
||||
|
||||
namespace Request.Builder
|
||||
|
||||
open Internal.IO.Async
|
||||
|
||||
/--
|
||||
Builds a request with a streaming body generator.
|
||||
-/
|
||||
def stream
|
||||
(builder : Builder)
|
||||
(gen : Body.Stream → Async Unit) :
|
||||
Async (Request Body.Stream) := do
|
||||
let s ← Body.stream gen
|
||||
return Request.Builder.body builder s
|
||||
|
||||
end Request.Builder
|
||||
|
||||
namespace Response.Builder
|
||||
open Internal.IO.Async
|
||||
|
||||
/--
|
||||
Builds a response with a streaming body generator.
|
||||
-/
|
||||
def stream
|
||||
(builder : Builder)
|
||||
(gen : Body.Stream → Async Unit) :
|
||||
Async (Response Body.Stream) := do
|
||||
let s ← Body.stream gen
|
||||
return Response.Builder.body builder s
|
||||
|
||||
end Response.Builder
|
||||
@@ -124,12 +124,6 @@ def new : Builder := { }
|
||||
|
||||
namespace Builder
|
||||
|
||||
/--
|
||||
Creates a new HTTP request builder with the default head
|
||||
(method: GET, version: HTTP/1.1, target: `*`).
|
||||
-/
|
||||
def empty : Builder := { }
|
||||
|
||||
/--
|
||||
Sets the HTTP method for the request being built.
|
||||
-/
|
||||
|
||||
@@ -111,7 +111,7 @@ namespace Builder
|
||||
/--
|
||||
Creates a new HTTP Response builder with default head (status: 200 OK, version: HTTP/1.1).
|
||||
-/
|
||||
def empty : Builder := { }
|
||||
def new : Builder := { }
|
||||
|
||||
/--
|
||||
Sets the HTTP status code for the response being built.
|
||||
@@ -173,66 +173,66 @@ end Builder
|
||||
Creates a new HTTP Response builder with the 200 status code.
|
||||
-/
|
||||
def ok : Builder :=
|
||||
.empty |>.status .ok
|
||||
.new |>.status .ok
|
||||
|
||||
/--
|
||||
Creates a new HTTP Response builder with the provided status.
|
||||
-/
|
||||
def withStatus (status : Status) : Builder :=
|
||||
.empty |>.status status
|
||||
.new |>.status status
|
||||
|
||||
/--
|
||||
Creates a new HTTP Response builder with the 404 status code.
|
||||
-/
|
||||
def notFound : Builder :=
|
||||
.empty |>.status .notFound
|
||||
.new |>.status .notFound
|
||||
|
||||
/--
|
||||
Creates a new HTTP Response builder with the 500 status code.
|
||||
-/
|
||||
def internalServerError : Builder :=
|
||||
.empty |>.status .internalServerError
|
||||
.new |>.status .internalServerError
|
||||
|
||||
/--
|
||||
Creates a new HTTP Response builder with the 400 status code.
|
||||
-/
|
||||
def badRequest : Builder :=
|
||||
.empty |>.status .badRequest
|
||||
.new |>.status .badRequest
|
||||
|
||||
/--
|
||||
Creates a new HTTP Response builder with the 201 status code.
|
||||
-/
|
||||
def created : Builder :=
|
||||
.empty |>.status .created
|
||||
.new |>.status .created
|
||||
|
||||
/--
|
||||
Creates a new HTTP Response builder with the 202 status code.
|
||||
-/
|
||||
def accepted : Builder :=
|
||||
.empty |>.status .accepted
|
||||
.new |>.status .accepted
|
||||
|
||||
/--
|
||||
Creates a new HTTP Response builder with the 401 status code.
|
||||
-/
|
||||
def unauthorized : Builder :=
|
||||
.empty |>.status .unauthorized
|
||||
.new |>.status .unauthorized
|
||||
|
||||
/--
|
||||
Creates a new HTTP Response builder with the 403 status code.
|
||||
-/
|
||||
def forbidden : Builder :=
|
||||
.empty |>.status .forbidden
|
||||
.new |>.status .forbidden
|
||||
|
||||
/--
|
||||
Creates a new HTTP Response builder with the 409 status code.
|
||||
-/
|
||||
def conflict : Builder :=
|
||||
.empty |>.status .conflict
|
||||
.new |>.status .conflict
|
||||
|
||||
/--
|
||||
Creates a new HTTP Response builder with the 503 status code.
|
||||
-/
|
||||
def serviceUnavailable : Builder :=
|
||||
.empty |>.status .serviceUnavailable
|
||||
.new |>.status .serviceUnavailable
|
||||
|
||||
end Response
|
||||
|
||||
@@ -94,4 +94,3 @@ def parseOrRoot (s : String) : Std.Http.URI.Path :=
|
||||
parse? s |>.getD { segments := #[], absolute := true }
|
||||
|
||||
end Std.Http.URI.Path
|
||||
|
||||
|
||||
@@ -174,19 +174,19 @@ opaque osEnviron : IO (Array (String × String))
|
||||
Gets the value of an environment variable.
|
||||
-/
|
||||
@[extern "lean_uv_os_getenv"]
|
||||
opaque osGetenv : @& String → IO (Option String)
|
||||
opaque osGetenv : String → IO (Option String)
|
||||
|
||||
/--
|
||||
Sets the value of an environment variable.
|
||||
-/
|
||||
@[extern "lean_uv_os_setenv"]
|
||||
opaque osSetenv : @& String → @& String → IO Unit
|
||||
opaque osSetenv : String → String → IO Unit
|
||||
|
||||
/--
|
||||
Unsets an environment variable.
|
||||
-/
|
||||
@[extern "lean_uv_os_unsetenv"]
|
||||
opaque osUnsetenv : @& String → IO Unit
|
||||
opaque osUnsetenv : String → IO Unit
|
||||
|
||||
/--
|
||||
Gets the hostname of the machine.
|
||||
|
||||
@@ -765,12 +765,13 @@ where
|
||||
\n remote URL: {info.url}"
|
||||
match cfg.kind with
|
||||
| .get =>
|
||||
if let .ok size := out.getAs Nat "size_download" then
|
||||
if size > 0 then
|
||||
if let .ok contentType := out.getAs String "content_type" then
|
||||
if contentType != artifactContentType then
|
||||
if let .ok resp ← IO.FS.readFile info.path |>.toBaseIO then
|
||||
msg := s!"{msg}\nunexpected response:\n{resp}"
|
||||
unless code? matches .ok 404 do -- ignore response bodies on 404s
|
||||
if let .ok size := out.getAs Nat "size_download" then
|
||||
if size > 0 then
|
||||
if let .ok contentType := out.getAs String "content_type" then
|
||||
if contentType != artifactContentType then
|
||||
if let .ok resp ← IO.FS.readFile info.path |>.toBaseIO then
|
||||
msg := s!"{msg}\nunexpected response:\n{resp}"
|
||||
removeFileIfExists info.path
|
||||
| .put =>
|
||||
if let .ok size := out.getAs Nat "size_download" then
|
||||
@@ -787,7 +788,7 @@ private def transferArtifacts
|
||||
match cfg.kind with
|
||||
| .get =>
|
||||
cfg.infos.forM fun info => do
|
||||
h.putStrLn s!"url = {info.url}"
|
||||
h.putStrLn s!"url = {info.url.quote}"
|
||||
h.putStrLn s!"-o {info.path.toString.quote}"
|
||||
h.flush
|
||||
return #[
|
||||
@@ -798,7 +799,7 @@ private def transferArtifacts
|
||||
| .put =>
|
||||
cfg.infos.forM fun info => do
|
||||
h.putStrLn s!"-T {info.path.toString.quote}"
|
||||
h.putStrLn s!"url = {info.url}"
|
||||
h.putStrLn s!"url = {info.url.quote}"
|
||||
h.flush
|
||||
return #[
|
||||
"-Z", "-X", "PUT", "-L",
|
||||
@@ -827,6 +828,13 @@ private def transferArtifacts
|
||||
if s.didError then
|
||||
failure
|
||||
|
||||
private def reservoirArtifactsUrl (service : CacheService) (scope : CacheServiceScope) : String :=
|
||||
let endpoint :=
|
||||
match scope.impl with
|
||||
| .repo scope => appendScope s!"{service.impl.apiEndpoint}/repositories" scope
|
||||
| .str scope => appendScope s!"{service.impl.apiEndpoint}/packages" scope
|
||||
s!"{endpoint}/artifacts"
|
||||
|
||||
public def downloadArtifacts
|
||||
(descrs : Array ArtifactDescr) (cache : Cache)
|
||||
(service : CacheService) (scope : CacheServiceScope) (force := false)
|
||||
@@ -844,8 +852,68 @@ public def downloadArtifacts
|
||||
return s.push {url, path, descr}
|
||||
if infos.isEmpty then
|
||||
return
|
||||
let infos ← id do
|
||||
if service.isReservoir then
|
||||
-- Artifact cloud storage URLs are fetched in a single request
|
||||
-- to avoid hammering the Reservoir web host
|
||||
fetchUrls (service.reservoirArtifactsUrl scope) infos
|
||||
else return infos
|
||||
IO.FS.createDirAll cache.artifactDir
|
||||
transferArtifacts {scope, infos, kind := .get}
|
||||
where
|
||||
fetchUrls url infos := IO.FS.withTempFile fun h path => do
|
||||
let body := Json.arr <| infos.map (toJson ·.descr.hash)
|
||||
h.putStr body.compress
|
||||
h.flush
|
||||
let args := #[
|
||||
"-X", "POST", "-L", "-d", s!"@{path}",
|
||||
"--retry", "3", -- intermittent network errors can occur
|
||||
"-s", "-w", "%{stderr}%{json}\n",
|
||||
"-H", "Content-Type: application/json",
|
||||
]
|
||||
let args := Reservoir.lakeHeaders.foldl (· ++ #["-H", ·]) args
|
||||
let spawnArgs := {
|
||||
cmd := "curl", args := args.push url
|
||||
stdout := .piped, stderr := .piped
|
||||
}
|
||||
logVerbose (mkCmdLog spawnArgs)
|
||||
let {stdout, stderr, exitCode} ← IO.Process.output spawnArgs
|
||||
match Json.parse stdout >>= fromJson? with
|
||||
| .ok (resp : ReservoirResp (Array String)) =>
|
||||
match resp with
|
||||
| .data urls =>
|
||||
if h : infos.size = urls.size then
|
||||
let s := infos.size.fold (init := infos.toVector) fun i hi s =>
|
||||
s.set i {s[i] with url := urls[i]'(h ▸ hi)}
|
||||
return s.toArray
|
||||
else
|
||||
error s!"failed to fetch artifact URLs\
|
||||
\n POST {url}\
|
||||
\nIncorrect number of results: expected {infos.size}, got {urls.size}"
|
||||
| .error status message =>
|
||||
error s!"failed to fetch artifact URLs (status code: {status})\
|
||||
\n POST {url}\
|
||||
\nReservoir error: {message}"
|
||||
| .error _ =>
|
||||
match Json.parse stderr >>= fromJson? with
|
||||
| .ok (out : JsonObject) =>
|
||||
let mut msg := "failed to fetch artifact URLs"
|
||||
if let .ok code := out.getAs Nat "http_code" then
|
||||
msg := s!"{msg} (status code: {code})"
|
||||
msg := s!"{msg}\n POST {url}"
|
||||
if let .ok errMsg := out.getAs String "errormsg" then
|
||||
msg := s!"{msg}\n Transfer error: {errMsg}"
|
||||
unless stdout.isEmpty do
|
||||
msg := s!"{msg}\nstdout:\n{stdout.trimAsciiEnd}"
|
||||
logError msg
|
||||
logVerbose s!"curl JSON:\n{stderr.trimAsciiEnd}"
|
||||
| .error e =>
|
||||
logError s!"failed to fetch artifact URLs\
|
||||
\n POST {url}
|
||||
\nInvalid curl JSON: {e}; received: {stderr.trimAscii}"
|
||||
unless stdout.isEmpty do
|
||||
logWarning s!"curl produced unexpected output:\n{stdout.trimAsciiEnd}"
|
||||
error s!"curl exited with code {exitCode}"
|
||||
|
||||
@[deprecated "Deprecated without replacement." (since := "2026-02-27")]
|
||||
public def downloadOutputArtifacts
|
||||
|
||||
@@ -103,24 +103,6 @@ public instance : FromJson RegistryPkg := ⟨RegistryPkg.fromJson?⟩
|
||||
|
||||
end RegistryPkg
|
||||
|
||||
/-- A Reservoir API response object. -/
|
||||
public inductive ReservoirResp (α : Type u)
|
||||
| data (a : α)
|
||||
| error (status : Nat) (message : String)
|
||||
|
||||
public protected def ReservoirResp.fromJson? [FromJson α] (val : Json) : Except String (ReservoirResp α) := do
|
||||
let obj ← JsonObject.fromJson? val
|
||||
if let some (err : JsonObject) ← obj.get? "error" then
|
||||
let status ← err.get "status"
|
||||
let message ← err.get "message"
|
||||
return .error status message
|
||||
else if let some (val : Json) ← obj.get? "data" then
|
||||
.data <$> fromJson? val
|
||||
else
|
||||
.data <$> fromJson? val
|
||||
|
||||
public instance [FromJson α] : FromJson (ReservoirResp α) := ⟨ReservoirResp.fromJson?⟩
|
||||
|
||||
public def Reservoir.pkgApiUrl (lakeEnv : Lake.Env) (owner pkg : String) :=
|
||||
s!"{lakeEnv.reservoirApiUrl}/packages/{uriEncode owner}/{uriEncode pkg}"
|
||||
|
||||
|
||||
@@ -6,8 +6,9 @@ Authors: Mac Malone
|
||||
module
|
||||
|
||||
prelude
|
||||
public import Init.Prelude
|
||||
import Init.Data.Array.Basic
|
||||
public import Lake.Util.JsonObject
|
||||
|
||||
open Lean
|
||||
|
||||
namespace Lake
|
||||
|
||||
@@ -15,3 +16,23 @@ public def Reservoir.lakeHeaders : Array String := #[
|
||||
"X-Reservoir-Api-Version:1.0.0",
|
||||
"X-Lake-Registry-Api-Version:0.1.0"
|
||||
]
|
||||
|
||||
/-- A Reservoir API response object. -/
|
||||
public inductive ReservoirResp (α : Type u)
|
||||
| data (a : α)
|
||||
| error (status : Nat) (message : String)
|
||||
|
||||
public protected def ReservoirResp.fromJson? [FromJson α] (val : Json) : Except String (ReservoirResp α) := do
|
||||
if let .ok obj := JsonObject.fromJson? val then
|
||||
if let some (err : JsonObject) ← obj.get? "error" then
|
||||
let status ← err.get "status"
|
||||
let message ← err.get "message"
|
||||
return .error status message
|
||||
else if let some (val : Json) ← obj.get? "data" then
|
||||
.data <$> fromJson? val
|
||||
else
|
||||
.data <$> fromJson? val
|
||||
else
|
||||
.data <$> fromJson? val
|
||||
|
||||
public instance [FromJson α] : FromJson (ReservoirResp α) := ⟨ReservoirResp.fromJson?⟩
|
||||
|
||||
@@ -31,7 +31,7 @@ extern "C" LEAN_EXPORT lean_obj_res lean_uv_get_process_title() {
|
||||
return lean_io_result_mk_ok(lean_title);
|
||||
}
|
||||
|
||||
// Std.Internal.UV.System.setProcessTitle : @& String → IO Unit
|
||||
// Std.Internal.UV.System.setProcessTitle : String → IO Unit
|
||||
extern "C" LEAN_EXPORT lean_obj_res lean_uv_set_process_title(b_obj_arg title) {
|
||||
const char* title_str = lean_string_cstr(title);
|
||||
if (strlen(title_str) != lean_string_size(title) - 1) {
|
||||
@@ -124,7 +124,7 @@ extern "C" LEAN_EXPORT lean_obj_res lean_uv_cwd() {
|
||||
return lean_io_result_mk_ok(lean_cwd);
|
||||
}
|
||||
|
||||
// Std.Internal.UV.System.chdir : @& String → IO Unit
|
||||
// Std.Internal.UV.System.chdir : String → IO Unit
|
||||
extern "C" LEAN_EXPORT lean_obj_res lean_uv_chdir(b_obj_arg path) {
|
||||
const char* path_str = lean_string_cstr(path);
|
||||
if (strlen(path_str) != lean_string_size(path) - 1) {
|
||||
@@ -271,7 +271,7 @@ extern "C" LEAN_EXPORT lean_obj_res lean_uv_os_environ() {
|
||||
return lean_io_result_mk_ok(env_array);
|
||||
}
|
||||
|
||||
// Std.Internal.UV.System.osGetenv : @& String → IO (Option String)
|
||||
// Std.Internal.UV.System.osGetenv : String → IO (Option String)
|
||||
extern "C" LEAN_EXPORT lean_obj_res lean_uv_os_getenv(b_obj_arg name) {
|
||||
const char* name_str = lean_string_cstr(name);
|
||||
if (strlen(name_str) != lean_string_size(name) - 1) {
|
||||
@@ -313,7 +313,7 @@ extern "C" LEAN_EXPORT lean_obj_res lean_uv_os_getenv(b_obj_arg name) {
|
||||
}
|
||||
|
||||
|
||||
// Std.Internal.UV.System.osSetenv : @& String → @& String → IO Unit
|
||||
// Std.Internal.UV.System.osSetenv : String → String → IO Unit
|
||||
extern "C" LEAN_EXPORT lean_obj_res lean_uv_os_setenv(b_obj_arg name, b_obj_arg value) {
|
||||
const char* name_str = lean_string_cstr(name);
|
||||
const char* value_str = lean_string_cstr(value);
|
||||
@@ -333,7 +333,7 @@ extern "C" LEAN_EXPORT lean_obj_res lean_uv_os_setenv(b_obj_arg name, b_obj_arg
|
||||
return lean_io_result_mk_ok(lean_box(0));
|
||||
}
|
||||
|
||||
// Std.Internal.UV.System.osUnsetenv : @& String → IO Unit
|
||||
// Std.Internal.UV.System.osUnsetenv : String → IO Unit
|
||||
extern "C" LEAN_EXPORT lean_obj_res lean_uv_os_unsetenv(b_obj_arg name) {
|
||||
const char* name_str = lean_string_cstr(name);
|
||||
if (strlen(name_str) != lean_string_size(name) - 1) {
|
||||
@@ -641,21 +641,21 @@ extern "C" LEAN_EXPORT lean_obj_res lean_uv_os_environ() {
|
||||
);
|
||||
}
|
||||
|
||||
// Std.Internal.UV.System.osGetenv : @& String → IO (Option String)
|
||||
// Std.Internal.UV.System.osGetenv : String → IO (Option String)
|
||||
extern "C" LEAN_EXPORT lean_obj_res lean_uv_os_getenv(b_obj_arg name) {
|
||||
lean_always_assert(
|
||||
false && ("Please build a version of Lean4 with libuv to invoke this.")
|
||||
);
|
||||
}
|
||||
|
||||
// Std.Internal.UV.System.osSetenv : @& String → @& String → IO Unit
|
||||
// Std.Internal.UV.System.osSetenv : String → String → IO Unit
|
||||
extern "C" LEAN_EXPORT lean_obj_res lean_uv_os_setenv(b_obj_arg name, b_obj_arg value) {
|
||||
lean_always_assert(
|
||||
false && ("Please build a version of Lean4 with libuv to invoke this.")
|
||||
);
|
||||
}
|
||||
|
||||
// Std.Internal.UV.System.osUnsetenv : @& String → IO Unit
|
||||
// Std.Internal.UV.System.osUnsetenv : String → IO Unit
|
||||
extern "C" LEAN_EXPORT lean_obj_res lean_uv_os_unsetenv(b_obj_arg name) {
|
||||
lean_always_assert(
|
||||
false && ("Please build a version of Lean4 with libuv to invoke this.")
|
||||
|
||||
BIN
stage0/stdlib/Lake/Config/Cache.c
generated
BIN
stage0/stdlib/Lake/Config/Cache.c
generated
Binary file not shown.
BIN
stage0/stdlib/Lake/Reservoir.c
generated
BIN
stage0/stdlib/Lake/Reservoir.c
generated
Binary file not shown.
BIN
stage0/stdlib/Lake/Util/Reservoir.c
generated
BIN
stage0/stdlib/Lake/Util/Reservoir.c
generated
Binary file not shown.
BIN
stage0/stdlib/Std/Internal/Http/Data.c
generated
BIN
stage0/stdlib/Std/Internal/Http/Data.c
generated
Binary file not shown.
BIN
stage0/stdlib/Std/Internal/Http/Data/Body.c
generated
Normal file
BIN
stage0/stdlib/Std/Internal/Http/Data/Body.c
generated
Normal file
Binary file not shown.
BIN
stage0/stdlib/Std/Internal/Http/Data/Body/Any.c
generated
Normal file
BIN
stage0/stdlib/Std/Internal/Http/Data/Body/Any.c
generated
Normal file
Binary file not shown.
BIN
stage0/stdlib/Std/Internal/Http/Data/Body/Basic.c
generated
Normal file
BIN
stage0/stdlib/Std/Internal/Http/Data/Body/Basic.c
generated
Normal file
Binary file not shown.
BIN
stage0/stdlib/Std/Internal/Http/Data/Body/Empty.c
generated
Normal file
BIN
stage0/stdlib/Std/Internal/Http/Data/Body/Empty.c
generated
Normal file
Binary file not shown.
BIN
stage0/stdlib/Std/Internal/Http/Data/Body/Full.c
generated
Normal file
BIN
stage0/stdlib/Std/Internal/Http/Data/Body/Full.c
generated
Normal file
Binary file not shown.
BIN
stage0/stdlib/Std/Internal/Http/Data/Body/Length.c
generated
Normal file
BIN
stage0/stdlib/Std/Internal/Http/Data/Body/Length.c
generated
Normal file
Binary file not shown.
BIN
stage0/stdlib/Std/Internal/Http/Data/Body/Stream.c
generated
Normal file
BIN
stage0/stdlib/Std/Internal/Http/Data/Body/Stream.c
generated
Normal file
Binary file not shown.
BIN
stage0/stdlib/Std/Internal/Http/Data/Request.c
generated
BIN
stage0/stdlib/Std/Internal/Http/Data/Request.c
generated
Binary file not shown.
BIN
stage0/stdlib/Std/Internal/Http/Data/Response.c
generated
BIN
stage0/stdlib/Std/Internal/Http/Data/Response.c
generated
Binary file not shown.
725
tests/elab/async_http_body.lean
Normal file
725
tests/elab/async_http_body.lean
Normal file
@@ -0,0 +1,725 @@
|
||||
import Std.Internal.Http.Data.Body
|
||||
|
||||
open Std.Internal.IO Async
|
||||
open Std.Http
|
||||
open Std.Http.Body
|
||||
|
||||
/-! ## Stream tests -/
|
||||
|
||||
-- Test send and recv on stream
|
||||
|
||||
def channelSendRecv : Async Unit := do
|
||||
let stream ← Body.mkStream
|
||||
let chunk := Chunk.ofByteArray "hello".toUTF8
|
||||
|
||||
let sendTask ← async (t := AsyncTask) <| stream.send chunk
|
||||
let result ← stream.recv
|
||||
|
||||
assert! result.isSome
|
||||
assert! result.get!.data == "hello".toUTF8
|
||||
await sendTask
|
||||
|
||||
#eval channelSendRecv.block
|
||||
|
||||
|
||||
-- Test tryRecv on empty stream returns none
|
||||
|
||||
def channelTryRecvEmpty : Async Unit := do
|
||||
let stream ← Body.mkStream
|
||||
let result ← stream.tryRecv
|
||||
assert! result.isNone
|
||||
|
||||
#eval channelTryRecvEmpty.block
|
||||
|
||||
-- Test tryRecv consumes a waiting producer
|
||||
|
||||
def channelTryRecvWithPendingSend : Async Unit := do
|
||||
let stream ← Body.mkStream
|
||||
|
||||
let sendTask ← async (t := AsyncTask) <| stream.send (Chunk.ofByteArray "data".toUTF8)
|
||||
let mut result := none
|
||||
let mut fuel := 100
|
||||
while result.isNone && fuel > 0 do
|
||||
result ← stream.tryRecv
|
||||
if result.isNone then
|
||||
let _ ← Selectable.one #[
|
||||
.case (← Selector.sleep 1) pure
|
||||
]
|
||||
fuel := fuel - 1
|
||||
|
||||
assert! result.isSome
|
||||
assert! result.get!.data == "data".toUTF8
|
||||
await sendTask
|
||||
|
||||
#eval channelTryRecvWithPendingSend.block
|
||||
|
||||
-- Test close sets closed flag
|
||||
|
||||
def channelClose : Async Unit := do
|
||||
let stream ← Body.mkStream
|
||||
assert! !(← stream.isClosed)
|
||||
stream.close
|
||||
assert! (← stream.isClosed)
|
||||
|
||||
#eval channelClose.block
|
||||
|
||||
-- Test recv on closed stream returns none
|
||||
|
||||
def channelRecvAfterClose : Async Unit := do
|
||||
let stream ← Body.mkStream
|
||||
stream.close
|
||||
let result ← stream.recv
|
||||
assert! result.isNone
|
||||
|
||||
#eval channelRecvAfterClose.block
|
||||
|
||||
-- Test for-in iteration collects chunks until close
|
||||
|
||||
def channelForIn : Async Unit := do
|
||||
let stream ← Body.mkStream
|
||||
|
||||
let producer ← async (t := AsyncTask) <| do
|
||||
stream.send (Chunk.ofByteArray "a".toUTF8)
|
||||
stream.send (Chunk.ofByteArray "b".toUTF8)
|
||||
stream.close
|
||||
|
||||
let mut acc : ByteArray := .empty
|
||||
for chunk in stream do
|
||||
acc := acc ++ chunk.data
|
||||
|
||||
assert! acc == "ab".toUTF8
|
||||
await producer
|
||||
|
||||
#eval channelForIn.block
|
||||
|
||||
-- Test chunk extensions are preserved
|
||||
|
||||
def channelExtensions : Async Unit := do
|
||||
let stream ← Body.mkStream
|
||||
let chunk := { data := "hello".toUTF8, extensions := #[(.mk "key", some (Chunk.ExtensionValue.ofString! "value"))] : Chunk }
|
||||
|
||||
let sendTask ← async (t := AsyncTask) <| stream.send chunk
|
||||
let result ← stream.recv
|
||||
|
||||
assert! result.isSome
|
||||
assert! result.get!.extensions.size == 1
|
||||
assert! result.get!.extensions[0]! == (Chunk.ExtensionName.mk "key", some <| .ofString! "value")
|
||||
await sendTask
|
||||
|
||||
#eval channelExtensions.block
|
||||
|
||||
-- Test known size metadata
|
||||
|
||||
def channelKnownSize : Async Unit := do
|
||||
let stream ← Body.mkStream
|
||||
stream.setKnownSize (some (.fixed 100))
|
||||
let size ← stream.getKnownSize
|
||||
assert! size == some (.fixed 100)
|
||||
|
||||
#eval channelKnownSize.block
|
||||
|
||||
-- Test known size decreases when a chunk is consumed
|
||||
|
||||
def channelKnownSizeDecreases : Async Unit := do
|
||||
let stream ← Body.mkStream
|
||||
stream.setKnownSize (some (.fixed 5))
|
||||
|
||||
let sendTask ← async (t := AsyncTask) <| stream.send (Chunk.ofByteArray "hello".toUTF8)
|
||||
let _ ← stream.recv
|
||||
await sendTask
|
||||
|
||||
let size ← stream.getKnownSize
|
||||
assert! size == some (.fixed 0)
|
||||
|
||||
#eval channelKnownSizeDecreases.block
|
||||
|
||||
-- Test only one blocked producer is allowed
|
||||
|
||||
def channelSingleProducerRule : Async Unit := do
|
||||
let stream ← Body.mkStream
|
||||
let send1 ← async (t := AsyncTask) <| stream.send (Chunk.ofByteArray "one".toUTF8)
|
||||
|
||||
-- Yield so `send1` can occupy the single pending-producer slot.
|
||||
let _ ← Selectable.one #[
|
||||
.case (← Selector.sleep 5) pure
|
||||
]
|
||||
|
||||
let send2Failed ←
|
||||
try
|
||||
stream.send (Chunk.ofByteArray "two".toUTF8)
|
||||
pure false
|
||||
catch _ =>
|
||||
pure true
|
||||
assert! send2Failed
|
||||
|
||||
let first ← stream.recv
|
||||
assert! first.isSome
|
||||
assert! first.get!.data == "one".toUTF8
|
||||
|
||||
await send1
|
||||
|
||||
#eval channelSingleProducerRule.block
|
||||
|
||||
-- Test only one blocked consumer is allowed
|
||||
|
||||
def channelSingleConsumerRule : Async Unit := do
|
||||
let stream ← Body.mkStream
|
||||
|
||||
let recv1 ← async (t := AsyncTask) <| stream.recv
|
||||
|
||||
let hasInterest ← Selectable.one #[
|
||||
.case stream.interestSelector pure
|
||||
]
|
||||
assert! hasInterest
|
||||
|
||||
let recv2Failed ←
|
||||
try
|
||||
let _ ← stream.recv
|
||||
pure false
|
||||
catch _ =>
|
||||
pure true
|
||||
|
||||
assert! recv2Failed
|
||||
|
||||
let sendTask ← async (t := AsyncTask) <| stream.send (Chunk.ofByteArray "ok".toUTF8)
|
||||
let r1 ← await recv1
|
||||
|
||||
assert! r1.isSome
|
||||
assert! r1.get!.data == "ok".toUTF8
|
||||
await sendTask
|
||||
|
||||
#eval channelSingleConsumerRule.block
|
||||
|
||||
-- Test hasInterest reflects blocked receiver state
|
||||
|
||||
def channelHasInterest : Async Unit := do
|
||||
let stream ← Body.mkStream
|
||||
assert! !(← stream.hasInterest)
|
||||
|
||||
let recvTask ← async (t := AsyncTask) <| stream.recv
|
||||
|
||||
let hasInterest ← Selectable.one #[
|
||||
.case stream.interestSelector pure
|
||||
]
|
||||
assert! hasInterest
|
||||
assert! (← stream.hasInterest)
|
||||
|
||||
let sendTask ← async (t := AsyncTask) <| stream.send (Chunk.ofByteArray "x".toUTF8)
|
||||
let _ ← await recvTask
|
||||
await sendTask
|
||||
|
||||
assert! !(← stream.hasInterest)
|
||||
|
||||
#eval channelHasInterest.block
|
||||
|
||||
-- Test interestSelector resolves false when stream closes first
|
||||
|
||||
def channelInterestSelectorClose : Async Unit := do
|
||||
let stream ← Body.mkStream
|
||||
|
||||
let waitInterest ← async (t := AsyncTask) <|
|
||||
Selectable.one #[
|
||||
.case stream.interestSelector pure
|
||||
]
|
||||
|
||||
stream.close
|
||||
let interested ← await waitInterest
|
||||
assert! interested == false
|
||||
|
||||
#eval channelInterestSelectorClose.block
|
||||
|
||||
-- Test incomplete sends are buffered and merged into one chunk on the final send
|
||||
|
||||
def channelIncompleteChunks : Async Unit := do
|
||||
let stream ← Body.mkStream
|
||||
|
||||
let sendTask ← async (t := AsyncTask) <| do
|
||||
stream.send (Chunk.ofByteArray "hel".toUTF8) (incomplete := true)
|
||||
stream.send (Chunk.ofByteArray "lo".toUTF8)
|
||||
|
||||
let result ← stream.recv
|
||||
|
||||
assert! result.isSome
|
||||
assert! result.get!.data == "hello".toUTF8
|
||||
await sendTask
|
||||
|
||||
#eval channelIncompleteChunks.block
|
||||
|
||||
-- Test sending to a closed stream raises an error
|
||||
|
||||
def channelSendAfterClose : Async Unit := do
|
||||
let stream ← Body.mkStream
|
||||
stream.close
|
||||
|
||||
let failed ←
|
||||
try
|
||||
stream.send (Chunk.ofByteArray "test".toUTF8)
|
||||
pure false
|
||||
catch _ =>
|
||||
pure true
|
||||
assert! failed
|
||||
|
||||
#eval channelSendAfterClose.block
|
||||
|
||||
-- Test Body.stream runs producer and returns the stream handle
|
||||
|
||||
def channelStreamHelper : Async Unit := do
|
||||
let stream ← Body.stream fun s => do
|
||||
s.send (Chunk.ofByteArray "hello".toUTF8)
|
||||
|
||||
let result ← stream.recv
|
||||
assert! result.isSome
|
||||
assert! result.get!.data == "hello".toUTF8
|
||||
|
||||
let eof ← stream.recv
|
||||
assert! eof.isNone
|
||||
|
||||
#eval channelStreamHelper.block
|
||||
|
||||
-- Test Body.empty creates an already-closed Stream
|
||||
|
||||
def channelEmptyHelper : Async Unit := do
|
||||
let stream ← Body.empty
|
||||
assert! (← stream.isClosed)
|
||||
|
||||
let result ← stream.recv
|
||||
assert! result.isNone
|
||||
|
||||
#eval channelEmptyHelper.block
|
||||
|
||||
-- Test Stream.readAll concatenates all chunks
|
||||
|
||||
def channelReadAll : Async Unit := do
|
||||
let stream ← Body.mkStream
|
||||
|
||||
let sendTask ← async (t := AsyncTask) <| do
|
||||
stream.send (Chunk.ofByteArray "foo".toUTF8)
|
||||
stream.send (Chunk.ofByteArray "bar".toUTF8)
|
||||
stream.close
|
||||
|
||||
let result : ByteArray ← stream.readAll
|
||||
assert! result == "foobar".toUTF8
|
||||
await sendTask
|
||||
|
||||
#eval channelReadAll.block
|
||||
|
||||
-- Test Stream.readAll enforces a maximum size limit
|
||||
|
||||
def channelReadAllMaxSize : Async Unit := do
|
||||
let stream ← Body.mkStream
|
||||
|
||||
let sendTask ← async (t := AsyncTask) <| do
|
||||
stream.send (Chunk.ofByteArray "abcdefgh".toUTF8)
|
||||
stream.close
|
||||
|
||||
let failed ←
|
||||
try
|
||||
let _ : ByteArray ← stream.readAll (maximumSize := some 4)
|
||||
pure false
|
||||
catch _ =>
|
||||
pure true
|
||||
assert! failed
|
||||
await sendTask
|
||||
|
||||
#eval channelReadAllMaxSize.block
|
||||
|
||||
-- Test Stream.getKnownSize reflects the value set via setKnownSize
|
||||
|
||||
def channelKnownSizeRoundtrip : Async Unit := do
|
||||
let stream ← Body.mkStream
|
||||
stream.setKnownSize (some (.fixed 42))
|
||||
|
||||
let size ← stream.getKnownSize
|
||||
assert! size == some (.fixed 42)
|
||||
|
||||
#eval channelKnownSizeRoundtrip.block
|
||||
|
||||
/-! ## Full tests -/
|
||||
|
||||
-- Test Full.recv returns content once then EOF
|
||||
|
||||
def fullRecvConsumesOnce : Async Unit := do
|
||||
let full ← Body.Full.ofString "hello"
|
||||
let first ← full.recv
|
||||
let second ← full.recv
|
||||
|
||||
assert! first.isSome
|
||||
assert! first.get!.data == "hello".toUTF8
|
||||
assert! second.isNone
|
||||
|
||||
#eval fullRecvConsumesOnce.block
|
||||
|
||||
-- Test Full known-size metadata tracks consumption
|
||||
|
||||
def fullKnownSizeLifecycle : Async Unit := do
|
||||
let data := ByteArray.mk #[0x01, 0x02, 0x03, 0x04]
|
||||
let full ← Body.Full.ofByteArray data
|
||||
|
||||
assert! (← full.getKnownSize) == some (.fixed 4)
|
||||
let chunk ← full.recv
|
||||
assert! chunk.isSome
|
||||
assert! chunk.get!.data == data
|
||||
assert! (← full.getKnownSize) == some (.fixed 0)
|
||||
|
||||
#eval fullKnownSizeLifecycle.block
|
||||
|
||||
-- Test Full.close discards remaining content
|
||||
|
||||
def fullClose : Async Unit := do
|
||||
let full ← Body.Full.ofString "bye"
|
||||
assert! !(← full.isClosed)
|
||||
full.close
|
||||
assert! (← full.isClosed)
|
||||
assert! (← full.recv).isNone
|
||||
|
||||
#eval fullClose.block
|
||||
|
||||
-- Test Full from an empty ByteArray returns none on the first recv
|
||||
|
||||
def fullEmptyBytes : Async Unit := do
|
||||
let full ← Body.Full.ofByteArray ByteArray.empty
|
||||
let result ← full.recv
|
||||
assert! result.isNone
|
||||
|
||||
#eval fullEmptyBytes.block
|
||||
|
||||
-- Test Full.recvSelector resolves immediately with the stored chunk
|
||||
|
||||
def fullRecvSelectorResolves : Async Unit := do
|
||||
let full ← Body.Full.ofString "world"
|
||||
let result ← Selectable.one #[
|
||||
.case full.recvSelector pure
|
||||
]
|
||||
assert! result.isSome
|
||||
assert! result.get!.data == "world".toUTF8
|
||||
|
||||
#eval fullRecvSelectorResolves.block
|
||||
|
||||
-- Test Full.getKnownSize returns 0 after close
|
||||
|
||||
def fullKnownSizeAfterClose : Async Unit := do
|
||||
let full ← Body.Full.ofString "data"
|
||||
assert! (← full.getKnownSize) == some (.fixed 4)
|
||||
full.close
|
||||
assert! (← full.getKnownSize) == some (.fixed 0)
|
||||
|
||||
#eval fullKnownSizeAfterClose.block
|
||||
|
||||
-- Test Full.tryRecv succeeds once and returns none thereafter
|
||||
|
||||
def fullTryRecvIdempotent : Async Unit := do
|
||||
let full ← Body.Full.ofString "once"
|
||||
let first ← full.recv
|
||||
let second ← full.recv
|
||||
assert! first.isSome
|
||||
assert! first.get!.data == "once".toUTF8
|
||||
assert! second.isNone
|
||||
|
||||
#eval fullTryRecvIdempotent.block
|
||||
|
||||
/-! ## Empty tests -/
|
||||
|
||||
-- Test Empty.recv always returns none
|
||||
|
||||
def emptyBodyRecv : Async Unit := do
|
||||
let body : Body.Empty := {}
|
||||
let result ← body.recv
|
||||
assert! result.isNone
|
||||
|
||||
#eval emptyBodyRecv.block
|
||||
|
||||
-- Test Empty.isClosed is always true
|
||||
|
||||
def emptyBodyIsClosed : Async Unit := do
|
||||
let body : Body.Empty := {}
|
||||
assert! (← body.isClosed)
|
||||
|
||||
#eval emptyBodyIsClosed.block
|
||||
|
||||
-- Test Empty.close is a no-op: still closed and recv still returns none
|
||||
|
||||
def emptyBodyClose : Async Unit := do
|
||||
let body : Body.Empty := {}
|
||||
body.close
|
||||
assert! (← body.isClosed)
|
||||
let result ← body.recv
|
||||
assert! result.isNone
|
||||
|
||||
#eval emptyBodyClose.block
|
||||
|
||||
-- Test Empty.recvSelector resolves immediately with none
|
||||
|
||||
def emptyBodyRecvSelector : Async Unit := do
|
||||
let body : Body.Empty := {}
|
||||
let result ← Selectable.one #[
|
||||
.case body.recvSelector pure
|
||||
]
|
||||
assert! result.isNone
|
||||
|
||||
#eval emptyBodyRecvSelector.block
|
||||
|
||||
/-! ## Any tests -/
|
||||
|
||||
-- Test Any wrapping a Full body forwards recv correctly
|
||||
|
||||
def anyFromFull : Async Unit := do
|
||||
let full ← Body.Full.ofString "hello"
|
||||
let any : Body.Any := full
|
||||
let result ← any.recv
|
||||
assert! result.isSome
|
||||
assert! result.get!.data == "hello".toUTF8
|
||||
|
||||
#eval anyFromFull.block
|
||||
|
||||
-- Test Any wrapping an Empty body returns none and reports closed
|
||||
|
||||
def anyFromEmpty : Async Unit := do
|
||||
let empty : Body.Empty := {}
|
||||
let any : Body.Any := empty
|
||||
let result ← any.recv
|
||||
assert! result.isNone
|
||||
assert! (← any.isClosed)
|
||||
|
||||
#eval anyFromEmpty.block
|
||||
|
||||
-- Test Any.close closes the underlying body
|
||||
|
||||
def anyCloseForwards : Async Unit := do
|
||||
let full ← Body.Full.ofString "test"
|
||||
let any : Body.Any := full
|
||||
any.close
|
||||
assert! (← any.isClosed)
|
||||
let result ← any.recv
|
||||
assert! result.isNone
|
||||
|
||||
#eval anyCloseForwards.block
|
||||
|
||||
-- Test Any.recvSelector resolves immediately for a Full body
|
||||
|
||||
def anyRecvSelectorFromFull : Async Unit := do
|
||||
let full ← Body.Full.ofString "sel"
|
||||
let any : Body.Any := full
|
||||
let result ← Selectable.one #[
|
||||
.case any.recvSelector pure
|
||||
]
|
||||
assert! result.isSome
|
||||
assert! result.get!.data == "sel".toUTF8
|
||||
|
||||
#eval anyRecvSelectorFromFull.block
|
||||
|
||||
/-! ## Request.Builder body tests -/
|
||||
|
||||
private def recvBuiltBody (body : Body.Full) : Async (Option Chunk) :=
|
||||
body.recv
|
||||
|
||||
-- Test Request.Builder.text sets correct headers
|
||||
|
||||
def requestBuilderText : Async Unit := do
|
||||
let req ← Request.post (.originForm! "/api")
|
||||
|>.text "Hello, World!"
|
||||
|
||||
assert! req.line.headers.get? Header.Name.contentType == some (Header.Value.ofString! "text/plain; charset=utf-8")
|
||||
assert! req.line.headers.get? Header.Name.contentLength == none
|
||||
|
||||
let body ← recvBuiltBody req.body
|
||||
assert! body.isSome
|
||||
assert! body.get!.data == "Hello, World!".toUTF8
|
||||
|
||||
#eval requestBuilderText.block
|
||||
|
||||
-- Test Request.Builder.json sets correct headers
|
||||
|
||||
def requestBuilderJson : Async Unit := do
|
||||
let req ← Request.post (.originForm! "/api")
|
||||
|>.json "{\"key\": \"value\"}"
|
||||
|
||||
assert! req.line.headers.get? Header.Name.contentType == some (Header.Value.ofString! "application/json")
|
||||
assert! req.line.headers.get? Header.Name.contentLength == none
|
||||
let body ← recvBuiltBody req.body
|
||||
assert! body.isSome
|
||||
assert! body.get!.data == "{\"key\": \"value\"}".toUTF8
|
||||
|
||||
#eval requestBuilderJson.block
|
||||
|
||||
-- Test Request.Builder.fromBytes sets body
|
||||
|
||||
def requestBuilderFromBytes : Async Unit := do
|
||||
let data := ByteArray.mk #[0x01, 0x02, 0x03]
|
||||
let req ← Request.post (.originForm! "/api")
|
||||
|>.fromBytes data
|
||||
|
||||
assert! req.line.headers.get? Header.Name.contentLength == none
|
||||
let body ← recvBuiltBody req.body
|
||||
assert! body.isSome
|
||||
assert! body.get!.data == data
|
||||
|
||||
#eval requestBuilderFromBytes.block
|
||||
|
||||
-- Test Request.Builder.noBody creates empty body
|
||||
|
||||
def requestBuilderNoBody : Async Unit := do
|
||||
let req ← Request.get (.originForm! "/api")
|
||||
|>.empty
|
||||
|
||||
assert! req.body == {}
|
||||
|
||||
#eval requestBuilderNoBody.block
|
||||
|
||||
-- Test Request.Builder.bytes sets application/octet-stream content type
|
||||
|
||||
def requestBuilderBytes : Async Unit := do
|
||||
let data := ByteArray.mk #[0xde, 0xad, 0xbe, 0xef]
|
||||
let req ← Request.post (.originForm! "/api")
|
||||
|>.bytes data
|
||||
|
||||
assert! req.line.headers.get? Header.Name.contentType == some (Header.Value.ofString! "application/octet-stream")
|
||||
let body ← recvBuiltBody req.body
|
||||
assert! body.isSome
|
||||
assert! body.get!.data == data
|
||||
|
||||
#eval requestBuilderBytes.block
|
||||
|
||||
-- Test Request.Builder.html sets text/html content type
|
||||
|
||||
def requestBuilderHtml : Async Unit := do
|
||||
let req ← Request.post (.originForm! "/api")
|
||||
|>.html "<h1>Hello</h1>"
|
||||
|
||||
assert! req.line.headers.get? Header.Name.contentType == some (Header.Value.ofString! "text/html; charset=utf-8")
|
||||
let body ← recvBuiltBody req.body
|
||||
assert! body.isSome
|
||||
assert! body.get!.data == "<h1>Hello</h1>".toUTF8
|
||||
|
||||
#eval requestBuilderHtml.block
|
||||
|
||||
-- Test Request.Builder.stream creates a streaming body
|
||||
|
||||
def requestBuilderStream : Async Unit := do
|
||||
let req ← Request.post (.originForm! "/api")
|
||||
|>.stream fun s => do
|
||||
s.send (Chunk.ofByteArray "streamed".toUTF8)
|
||||
|
||||
let result ← req.body.recv
|
||||
assert! result.isSome
|
||||
assert! result.get!.data == "streamed".toUTF8
|
||||
|
||||
#eval requestBuilderStream.block
|
||||
|
||||
-- Test Request.Builder.noBody body is always closed and returns none
|
||||
|
||||
def requestBuilderNoBodyAlwaysClosed : Async Unit := do
|
||||
let req ← Request.get (.originForm! "/api")
|
||||
|>.empty
|
||||
|
||||
assert! (← req.body.isClosed)
|
||||
let result ← req.body.recv
|
||||
assert! result.isNone
|
||||
|
||||
#eval requestBuilderNoBodyAlwaysClosed.block
|
||||
|
||||
/-! ## Response.Builder body tests -/
|
||||
|
||||
-- Test Response.Builder.text sets correct headers
|
||||
|
||||
def responseBuilderText : Async Unit := do
|
||||
let res ← Response.ok
|
||||
|>.text "Hello, World!"
|
||||
|
||||
assert! res.line.headers.get? Header.Name.contentType == some (Header.Value.ofString! "text/plain; charset=utf-8")
|
||||
assert! res.line.headers.get? Header.Name.contentLength == none
|
||||
|
||||
let body ← recvBuiltBody res.body
|
||||
assert! body.isSome
|
||||
assert! body.get!.data == "Hello, World!".toUTF8
|
||||
|
||||
#eval responseBuilderText.block
|
||||
|
||||
-- Test Response.Builder.json sets correct headers
|
||||
|
||||
def responseBuilderJson : Async Unit := do
|
||||
let res ← Response.ok
|
||||
|>.json "{\"status\": \"ok\"}"
|
||||
|
||||
assert! res.line.headers.get? Header.Name.contentType == some (Header.Value.ofString! "application/json")
|
||||
assert! res.line.headers.get? Header.Name.contentLength == none
|
||||
let body ← recvBuiltBody res.body
|
||||
assert! body.isSome
|
||||
assert! body.get!.data == "{\"status\": \"ok\"}".toUTF8
|
||||
|
||||
#eval responseBuilderJson.block
|
||||
|
||||
-- Test Response.Builder.fromBytes sets body
|
||||
|
||||
def responseBuilderFromBytes : Async Unit := do
|
||||
let data := ByteArray.mk #[0xaa, 0xbb]
|
||||
let res ← Response.ok
|
||||
|>.fromBytes data
|
||||
|
||||
assert! res.line.headers.get? Header.Name.contentLength == none
|
||||
let body ← recvBuiltBody res.body
|
||||
assert! body.isSome
|
||||
assert! body.get!.data == data
|
||||
|
||||
#eval responseBuilderFromBytes.block
|
||||
|
||||
-- Test Response.Builder.noBody creates empty body
|
||||
|
||||
def responseBuilderNoBody : Async Unit := do
|
||||
let res ← Response.ok
|
||||
|>.empty
|
||||
|
||||
assert! res.body == {}
|
||||
|
||||
#eval responseBuilderNoBody.block
|
||||
|
||||
-- Test Response.Builder.bytes sets application/octet-stream content type
|
||||
|
||||
def responseBuilderBytes : Async Unit := do
|
||||
let data := ByteArray.mk #[0xca, 0xfe]
|
||||
let res ← Response.ok
|
||||
|>.bytes data
|
||||
|
||||
assert! res.line.headers.get? Header.Name.contentType == some (Header.Value.ofString! "application/octet-stream")
|
||||
let body ← recvBuiltBody res.body
|
||||
assert! body.isSome
|
||||
assert! body.get!.data == data
|
||||
|
||||
#eval responseBuilderBytes.block
|
||||
|
||||
-- Test Response.Builder.html sets text/html content type
|
||||
|
||||
def responseBuilderHtml : Async Unit := do
|
||||
let res ← Response.ok
|
||||
|>.html "<p>OK</p>"
|
||||
|
||||
assert! res.line.headers.get? Header.Name.contentType == some (Header.Value.ofString! "text/html; charset=utf-8")
|
||||
let body ← recvBuiltBody res.body
|
||||
assert! body.isSome
|
||||
assert! body.get!.data == "<p>OK</p>".toUTF8
|
||||
|
||||
#eval responseBuilderHtml.block
|
||||
|
||||
-- Test Response.Builder.stream creates a streaming body
|
||||
|
||||
def responseBuilderStream : Async Unit := do
|
||||
let res ← Response.ok
|
||||
|>.stream fun s => do
|
||||
s.send (Chunk.ofByteArray "streamed".toUTF8)
|
||||
|
||||
let result ← res.body.recv
|
||||
assert! result.isSome
|
||||
assert! result.get!.data == "streamed".toUTF8
|
||||
|
||||
#eval responseBuilderStream.block
|
||||
|
||||
-- Test Response.Builder.noBody body is always closed and returns none
|
||||
|
||||
def responseBuilderNoBodyAlwaysClosed : Async Unit := do
|
||||
let res ← Response.ok
|
||||
|>.empty
|
||||
|
||||
assert! (← res.body.isClosed)
|
||||
let result ← res.body.recv
|
||||
assert! result.isNone
|
||||
|
||||
#eval responseBuilderNoBodyAlwaysClosed.block
|
||||
6
tests/elab/grind_13167.lean
Normal file
6
tests/elab/grind_13167.lean
Normal file
@@ -0,0 +1,6 @@
|
||||
module
|
||||
|
||||
-- https://github.com/leanprover/lean4/issues/13167
|
||||
theorem Option.bind_pmap {α β γ} {p : α → Prop} (f : ∀ a, p a → β) (x : Option α) (g : β → Option γ) (H) :
|
||||
pmap f x H >>= g = x.pbind fun a h ↦ g (f a (H _ h)) := by
|
||||
grind [cases Option, pmap]
|
||||
Reference in New Issue
Block a user