mirror of
https://github.com/leanprover/lean4.git
synced 2026-04-22 03:54:08 +00:00
Compare commits
371 Commits
sg/control
...
sofia/asyn
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
0ac6746e3a | ||
|
|
a5551e3291 | ||
|
|
286182df24 | ||
|
|
bb90f72a40 | ||
|
|
c485824d11 | ||
|
|
afe1676e4a | ||
|
|
64889857b2 | ||
|
|
0ac5d75bac | ||
|
|
e4f2f5717c | ||
|
|
abbe36c0d2 | ||
|
|
7ef652911e | ||
|
|
9ef386d7c3 | ||
|
|
b9b2e08181 | ||
|
|
33caa4e82f | ||
|
|
8c292c70ee | ||
|
|
4f4ee7c789 | ||
|
|
d7ea3a5984 | ||
|
|
33c36c7466 | ||
|
|
e86dbf3992 | ||
|
|
d71f0bdae7 | ||
|
|
232d173af3 | ||
|
|
3a4a309aed | ||
|
|
e359001026 | ||
|
|
72244398dc | ||
|
|
c0e60b797c | ||
|
|
400908a2f4 | ||
|
|
394c999c2a | ||
|
|
b7e88dadeb | ||
|
|
a39a0575a0 | ||
|
|
5815f33342 | ||
|
|
7a852aedb6 | ||
|
|
1554f57525 | ||
|
|
1fa01cdadb | ||
|
|
758e5afb07 | ||
|
|
11516bbf09 | ||
|
|
f76dca5bba | ||
|
|
fe6ac812af | ||
|
|
51a00843ea | ||
|
|
eab144bbb2 | ||
|
|
cfe282f024 | ||
|
|
debafcf0ef | ||
|
|
2668f07808 | ||
|
|
0315d56389 | ||
|
|
b9e489cc8f | ||
|
|
135b049080 | ||
|
|
4005bd027b | ||
|
|
fbf03e31f9 | ||
|
|
39ab2b289c | ||
|
|
6c6f9a5d83 | ||
|
|
a7aea9a12d | ||
|
|
74af777707 | ||
|
|
3dfb5e002a | ||
|
|
3075e5091b | ||
|
|
af12f7e9be | ||
|
|
53c9277209 | ||
|
|
f14977f495 | ||
|
|
cfa5cf76fc | ||
|
|
238925a681 | ||
|
|
8cb236e9eb | ||
|
|
3d039f8dba | ||
|
|
203d5362d4 | ||
|
|
6189d4c130 | ||
|
|
58f14d34d7 | ||
|
|
710eee2b49 | ||
|
|
bd4af50d04 | ||
|
|
8cb30347b6 | ||
|
|
d8e6b09b90 | ||
|
|
df8abc2b3f | ||
|
|
5a852bdffd | ||
|
|
11d3860c69 | ||
|
|
5a253001b3 | ||
|
|
083fec29c8 | ||
|
|
d41753a5f9 | ||
|
|
a086a817e0 | ||
|
|
5e1204e70d | ||
|
|
61d7c151da | ||
|
|
bd10d0193e | ||
|
|
67822f4c42 | ||
|
|
d99485dd79 | ||
|
|
f85b9b8d09 | ||
|
|
5fb254b7ef | ||
|
|
6e202e34a4 | ||
|
|
843c814778 | ||
|
|
c7d4d8d799 | ||
|
|
91c60f801c | ||
|
|
ae30f55728 | ||
|
|
63b0cc17c4 | ||
|
|
33393a7c00 | ||
|
|
7434b97511 | ||
|
|
29c8f8cfa1 | ||
|
|
36b2d99e3d | ||
|
|
3de1d21c86 | ||
|
|
83a0756b05 | ||
|
|
b8f2cd94aa | ||
|
|
64ff045559 | ||
|
|
109ab8eb68 | ||
|
|
bf09ea8ff5 | ||
|
|
7ce9fe9f97 | ||
|
|
aff9e0c459 | ||
|
|
a74df33feb | ||
|
|
dd63b614eb | ||
|
|
515e6e20c0 | ||
|
|
cc45fc9cc2 | ||
|
|
bc9c18f0b0 | ||
|
|
8ee21a7176 | ||
|
|
92aa9f2b8a | ||
|
|
c2243a0ea5 | ||
|
|
efbd23a6d9 | ||
|
|
26440fcf6a | ||
|
|
ac4c5451e4 | ||
|
|
c94c5cb7e4 | ||
|
|
78ca6edc99 | ||
|
|
d92dc22df3 | ||
|
|
48ab74f044 | ||
|
|
da68a63902 | ||
|
|
db99fd2d7d | ||
|
|
a61712c962 | ||
|
|
ea36555588 | ||
|
|
b02bc4d6d2 | ||
|
|
c836fe8723 | ||
|
|
8068ed317c | ||
|
|
0bd44ab745 | ||
|
|
172d12c75c | ||
|
|
6b6b9fffff | ||
|
|
f3fa5c8242 | ||
|
|
b0c5667f06 | ||
|
|
2d262c9755 | ||
|
|
571898bf63 | ||
|
|
0570277a2e | ||
|
|
557709d9bb | ||
|
|
0229508ca7 | ||
|
|
ace10ee42b | ||
|
|
4e36dcc98f | ||
|
|
a93ea184fe | ||
|
|
c309a3c07e | ||
|
|
30641c617f | ||
|
|
1aa23cd92b | ||
|
|
0bb4ba72d4 | ||
|
|
57a4d9ad4b | ||
|
|
bfc6617c12 | ||
|
|
aa09ab0cd9 | ||
|
|
8affe05767 | ||
|
|
3aa02eede3 | ||
|
|
c86f926d1b | ||
|
|
ff4419357c | ||
|
|
3c131da050 | ||
|
|
bae251d15a | ||
|
|
6edc0c7427 | ||
|
|
563189fec9 | ||
|
|
25d7db2e62 | ||
|
|
beedfa1e4e | ||
|
|
f68c2420e7 | ||
|
|
cdfd24171a | ||
|
|
718e549de3 | ||
|
|
81f76a24d8 | ||
|
|
292f297006 | ||
|
|
b7be57272a | ||
|
|
a0dc1dbbc0 | ||
|
|
2e604884dd | ||
|
|
2049542833 | ||
|
|
caf19b8458 | ||
|
|
c5180b2dfc | ||
|
|
91c5b717f0 | ||
|
|
cb6f540efb | ||
|
|
ec833b52ee | ||
|
|
ba36c1dee2 | ||
|
|
5cb510cdf7 | ||
|
|
a72de461cd | ||
|
|
228f0d24a7 | ||
|
|
73cf41d7e5 | ||
|
|
819d4c6c1f | ||
|
|
4de3e40349 | ||
|
|
03f1d47462 | ||
|
|
a88908572c | ||
|
|
55d357dbb4 | ||
|
|
49d00ae056 | ||
|
|
e9eed5cbe4 | ||
|
|
2652ae0fb8 | ||
|
|
3f48ef4af9 | ||
|
|
a9de308aea | ||
|
|
c266649454 | ||
|
|
74dc55152f | ||
|
|
bf2471b8f1 | ||
|
|
21821ef062 | ||
|
|
5ba3a6d4fc | ||
|
|
8492e58a82 | ||
|
|
e65e20e1cb | ||
|
|
de7c029c9f | ||
|
|
89c992a3c9 | ||
|
|
0b76c3de69 | ||
|
|
ff99979855 | ||
|
|
9ddbb59fe1 | ||
|
|
36f87f98f8 | ||
|
|
5914fe3a4a | ||
|
|
29f651a89c | ||
|
|
2e1bdd922e | ||
|
|
ab5d50cbc3 | ||
|
|
7902db17c2 | ||
|
|
5626ee369c | ||
|
|
e62f8d608d | ||
|
|
0fb57a405f | ||
|
|
999ce40ca6 | ||
|
|
bfa18ef30c | ||
|
|
81492aa5b2 | ||
|
|
e0efb8aec9 | ||
|
|
530f6865f9 | ||
|
|
f97d86cf4b | ||
|
|
781b9f561e | ||
|
|
a9ac33d994 | ||
|
|
c457a98d6a | ||
|
|
8d8439bf0b | ||
|
|
54ac93fb32 | ||
|
|
eddb5e139d | ||
|
|
5a53207723 | ||
|
|
0d3f6e5481 | ||
|
|
96a017262c | ||
|
|
549e16f069 | ||
|
|
93a6ecbbbc | ||
|
|
8bcc838f47 | ||
|
|
462e3d02dd | ||
|
|
541f9b2dc9 | ||
|
|
c5db47444e | ||
|
|
42800e4037 | ||
|
|
eaa1390a36 | ||
|
|
b38f01ef51 | ||
|
|
73bf2b5e04 | ||
|
|
c8c92fcf92 | ||
|
|
cf6b159da5 | ||
|
|
b1ff312ef5 | ||
|
|
319214cfb3 | ||
|
|
e1225efa03 | ||
|
|
37c7b1e22c | ||
|
|
c4234961bc | ||
|
|
42cfda23f3 | ||
|
|
78316b9ade | ||
|
|
dd09289d2b | ||
|
|
10a66e9f9a | ||
|
|
ad4719399d | ||
|
|
b8eac648ab | ||
|
|
53fb1a25b3 | ||
|
|
3fdaf2df0c | ||
|
|
4ba722f51c | ||
|
|
a89a69e7da | ||
|
|
3646590506 | ||
|
|
193bbddb4e | ||
|
|
6821bb82db | ||
|
|
ea5a986693 | ||
|
|
37ec94e2f0 | ||
|
|
157e3b032d | ||
|
|
910c71954e | ||
|
|
dd2ab67d2b | ||
|
|
9dd5634759 | ||
|
|
a521ba3abd | ||
|
|
6b0f05d075 | ||
|
|
61d6c02ecd | ||
|
|
df738acaa4 | ||
|
|
8722e50897 | ||
|
|
96ffa3e354 | ||
|
|
1c564ed5f7 | ||
|
|
9dd5f62e0e | ||
|
|
c4737fb66a | ||
|
|
43d3b2df91 | ||
|
|
87c5488c20 | ||
|
|
e0d5596e63 | ||
|
|
1f2671db3d | ||
|
|
940ab9bdb5 | ||
|
|
9acca40aaf | ||
|
|
bf2ed2c87a | ||
|
|
61c93a7f57 | ||
|
|
b042b8efbd | ||
|
|
8c00ba48ae | ||
|
|
d07f5c502f | ||
|
|
4a641fc498 | ||
|
|
4f20a815ec | ||
|
|
c9296c7371 | ||
|
|
4db36b214b | ||
|
|
a6d94c7504 | ||
|
|
045abb48bb | ||
|
|
10337c620b | ||
|
|
698f557aa3 | ||
|
|
692c7c1a09 | ||
|
|
1bdfdcdb38 | ||
|
|
cacfe00c1d | ||
|
|
0fd0fa9c73 | ||
|
|
52fdc0f734 | ||
|
|
451c11d5a1 | ||
|
|
e92fcf6d46 | ||
|
|
07140aceb8 | ||
|
|
2cc32928a4 | ||
|
|
153513d5e2 | ||
|
|
94308408a9 | ||
|
|
1ae6970b77 | ||
|
|
976cc79b0c | ||
|
|
9ce1821be0 | ||
|
|
eeff4847fe | ||
|
|
2956f88050 | ||
|
|
26d9c1c07b | ||
|
|
d099586632 | ||
|
|
058d95e441 | ||
|
|
43aa88e5a6 | ||
|
|
8fe2d519d2 | ||
|
|
07ed645f45 | ||
|
|
9485e8f5eb | ||
|
|
dc96616781 | ||
|
|
0c44b4ae05 | ||
|
|
3568464ca7 | ||
|
|
8e5296c71a | ||
|
|
eee971e3ef | ||
|
|
7a1f8b2d30 | ||
|
|
b12ab7eae4 | ||
|
|
10c8a923e6 | ||
|
|
3e9674eaa9 | ||
|
|
2bc2080fbe | ||
|
|
f02139f7ce | ||
|
|
d004e175e2 | ||
|
|
9ad4ee304b | ||
|
|
5bd280553d | ||
|
|
7e215c8220 | ||
|
|
2c23680163 | ||
|
|
c4f179daa0 | ||
|
|
c2f657a15a | ||
|
|
9332081875 | ||
|
|
1cec97568b | ||
|
|
b567713641 | ||
|
|
de776c1f32 | ||
|
|
c498ea74ec | ||
|
|
f4aad3a494 | ||
|
|
1cebf576c3 | ||
|
|
dd125c7999 | ||
|
|
5e3dce8088 | ||
|
|
4c64f2c2e8 | ||
|
|
aa6e11dfc0 | ||
|
|
e7d1e7dd54 | ||
|
|
03843fd3f0 | ||
|
|
294e9900ea | ||
|
|
f13651979e | ||
|
|
3d8ba4d09b | ||
|
|
63984c8dda | ||
|
|
e2fd8a5835 | ||
|
|
a0263870b9 | ||
|
|
3c4ae58aff | ||
|
|
5965707575 | ||
|
|
dbe0140578 | ||
|
|
6ffd5ad2a4 | ||
|
|
7ce8cbc01c | ||
|
|
12a7603c77 | ||
|
|
53a6355074 | ||
|
|
f8ad249e42 | ||
|
|
3c41d3961e | ||
|
|
18bc715bad | ||
|
|
3349d20663 | ||
|
|
bad70e3eab | ||
|
|
21286eb163 | ||
|
|
0e5f07558c | ||
|
|
6e26b901e4 | ||
|
|
81c67c8f12 | ||
|
|
990e21eefc | ||
|
|
7141144a2f | ||
|
|
8c343501c1 | ||
|
|
44f08686cd | ||
|
|
65883f8c2a | ||
|
|
bd28a8fad5 | ||
|
|
8ba86c2c67 | ||
|
|
d3cddf9e44 | ||
|
|
5f3babee5c | ||
|
|
26dfc9a872 | ||
|
|
e47439e8be | ||
|
|
1ef53758be | ||
|
|
8544042789 | ||
|
|
f564d43d98 | ||
|
|
32fa0666c9 |
@@ -14,6 +14,7 @@ public import Std.Internal.Http.Data.Status
|
||||
public import Std.Internal.Http.Data.Chunk
|
||||
public import Std.Internal.Http.Data.Headers
|
||||
public import Std.Internal.Http.Data.URI
|
||||
public import Std.Internal.Http.Data.Body
|
||||
|
||||
/-!
|
||||
# HTTP Data Types
|
||||
|
||||
24
src/Std/Internal/Http/Data/Body.lean
Normal file
24
src/Std/Internal/Http/Data/Body.lean
Normal file
@@ -0,0 +1,24 @@
|
||||
/-
|
||||
Copyright (c) 2025 Lean FRO, LLC. All rights reserved.
|
||||
Released under Apache 2.0 license as described in the file LICENSE.
|
||||
Authors: Sofia Rodrigues
|
||||
-/
|
||||
module
|
||||
|
||||
prelude
|
||||
public import Std.Internal.Http.Data.Body.Basic
|
||||
public import Std.Internal.Http.Data.Body.Length
|
||||
public import Std.Internal.Http.Data.Body.Any
|
||||
public import Std.Internal.Http.Data.Body.Stream
|
||||
public import Std.Internal.Http.Data.Body.Empty
|
||||
public import Std.Internal.Http.Data.Body.Full
|
||||
|
||||
public section
|
||||
|
||||
/-!
|
||||
# Body
|
||||
|
||||
This module re-exports all HTTP body types: `Body.Empty`, `Body.Full`, `Body.Stream`,
|
||||
`Body.Any`, and `Body.Length`, along with the `Http.Body` typeclass and conversion
|
||||
utilities (`ToByteArray`, `FromByteArray`).
|
||||
-/
|
||||
83
src/Std/Internal/Http/Data/Body/Any.lean
Normal file
83
src/Std/Internal/Http/Data/Body/Any.lean
Normal file
@@ -0,0 +1,83 @@
|
||||
/-
|
||||
Copyright (c) 2025 Lean FRO, LLC. All rights reserved.
|
||||
Released under Apache 2.0 license as described in the file LICENSE.
|
||||
Authors: Sofia Rodrigues
|
||||
-/
|
||||
module
|
||||
|
||||
prelude
|
||||
public import Std.Internal.Http.Data.Body.Basic
|
||||
|
||||
public section
|
||||
|
||||
/-!
|
||||
# Body.Any
|
||||
|
||||
A type-erased body backed by closures. Implements `Http.Body` and can be constructed from any
|
||||
type that also implements `Http.Body`. Used as the default handler response body type.
|
||||
-/
|
||||
|
||||
namespace Std.Http.Body
|
||||
open Std Internal IO Async
|
||||
|
||||
set_option linter.all true
|
||||
|
||||
/--
|
||||
A type-erased body handle. Operations are stored as closures, making it open to any body type
|
||||
that implements `Http.Body`.
|
||||
-/
|
||||
structure Any where
|
||||
|
||||
/--
|
||||
Receives the next body chunk. Returns `none` at end-of-stream.
|
||||
-/
|
||||
recv : Async (Option Chunk)
|
||||
|
||||
/--
|
||||
Closes the body stream.
|
||||
-/
|
||||
close : Async Unit
|
||||
|
||||
/--
|
||||
Returns `true` when the body stream is closed.
|
||||
-/
|
||||
isClosed : Async Bool
|
||||
|
||||
/--
|
||||
Selector that resolves when a chunk is available or EOF is reached.
|
||||
-/
|
||||
recvSelector : Selector (Option Chunk)
|
||||
|
||||
/--
|
||||
Returns the declared size.
|
||||
-/
|
||||
getKnownSize : Async (Option Body.Length)
|
||||
|
||||
/--
|
||||
Sets the size of the body.
|
||||
-/
|
||||
setKnownSize : Option Body.Length → Async Unit
|
||||
namespace Any
|
||||
|
||||
/--
|
||||
Erases a body of any `Http.Body` instance into a `Body.Any`.
|
||||
-/
|
||||
def ofBody [Http.Body α] (body : α) : Any where
|
||||
recv := Http.Body.recv body
|
||||
close := Http.Body.close body
|
||||
isClosed := Http.Body.isClosed body
|
||||
recvSelector := Http.Body.recvSelector body
|
||||
getKnownSize := Http.Body.getKnownSize body
|
||||
setKnownSize := Http.Body.setKnownSize body
|
||||
|
||||
end Any
|
||||
|
||||
instance : Http.Body Any where
|
||||
recv := Any.recv
|
||||
close := Any.close
|
||||
isClosed := Any.isClosed
|
||||
recvSelector := Any.recvSelector
|
||||
getKnownSize := Any.getKnownSize
|
||||
setKnownSize := Any.setKnownSize
|
||||
|
||||
end Std.Http.Body
|
||||
102
src/Std/Internal/Http/Data/Body/Basic.lean
Normal file
102
src/Std/Internal/Http/Data/Body/Basic.lean
Normal file
@@ -0,0 +1,102 @@
|
||||
/-
|
||||
Copyright (c) 2025 Lean FRO, LLC. All rights reserved.
|
||||
Released under Apache 2.0 license as described in the file LICENSE.
|
||||
Authors: Sofia Rodrigues
|
||||
-/
|
||||
module
|
||||
|
||||
prelude
|
||||
public import Std.Internal.Async
|
||||
public import Std.Internal.Async.ContextAsync
|
||||
public import Std.Internal.Http.Data.Chunk
|
||||
public import Std.Internal.Http.Data.Headers
|
||||
public import Std.Internal.Http.Data.Body.Length
|
||||
|
||||
public section
|
||||
|
||||
/-!
|
||||
# Body.Basic
|
||||
|
||||
This module defines the `Body` typeclass for HTTP body streams, and shared conversion types
|
||||
`ToByteArray` and `FromByteArray` used for encoding and decoding body content.
|
||||
-/
|
||||
|
||||
namespace Std.Http
|
||||
open Std Internal IO Async
|
||||
|
||||
set_option linter.all true
|
||||
|
||||
/--
|
||||
Typeclass for values that can be read as HTTP body streams.
|
||||
-/
|
||||
class Body (α : Type) where
|
||||
/--
|
||||
Receives the next body chunk. Returns `none` at end-of-stream.
|
||||
-/
|
||||
recv : α → Async (Option Chunk)
|
||||
|
||||
/--
|
||||
Closes the body stream.
|
||||
-/
|
||||
close : α → Async Unit
|
||||
|
||||
/--
|
||||
Returns `true` when the body stream is closed.
|
||||
-/
|
||||
isClosed : α → Async Bool
|
||||
|
||||
/--
|
||||
Selector that resolves when a chunk is available or EOF is reached.
|
||||
-/
|
||||
recvSelector : α → Selector (Option Chunk)
|
||||
|
||||
/--
|
||||
Gets the declared size of the body.
|
||||
-/
|
||||
getKnownSize : α → Async (Option Body.Length)
|
||||
|
||||
/--
|
||||
Sets the declared size of a body.
|
||||
-/
|
||||
setKnownSize : α → Option Body.Length → Async Unit
|
||||
end Std.Http
|
||||
|
||||
namespace Std.Http.Body
|
||||
|
||||
/--
|
||||
Typeclass for types that can be converted to a `ByteArray`.
|
||||
-/
|
||||
class ToByteArray (α : Type) where
|
||||
|
||||
/--
|
||||
Transforms into a `ByteArray`.
|
||||
-/
|
||||
toByteArray : α → ByteArray
|
||||
|
||||
instance : ToByteArray ByteArray where
|
||||
toByteArray := id
|
||||
|
||||
instance : ToByteArray String where
|
||||
toByteArray := String.toUTF8
|
||||
|
||||
/--
|
||||
Typeclass for types that can be decoded from a `ByteArray`. The conversion may fail with an error
|
||||
message if the bytes are not valid for the target type.
|
||||
-/
|
||||
class FromByteArray (α : Type) where
|
||||
|
||||
/--
|
||||
Attempts to decode a `ByteArray` into the target type, returning an error message on failure.
|
||||
-/
|
||||
fromByteArray : ByteArray → Except String α
|
||||
|
||||
instance : FromByteArray ByteArray where
|
||||
fromByteArray := .ok
|
||||
|
||||
instance : FromByteArray String where
|
||||
fromByteArray bs :=
|
||||
match String.fromUTF8? bs with
|
||||
| some s => .ok s
|
||||
| none => .error "invalid UTF-8 encoding"
|
||||
|
||||
end Std.Http.Body
|
||||
116
src/Std/Internal/Http/Data/Body/Empty.lean
Normal file
116
src/Std/Internal/Http/Data/Body/Empty.lean
Normal file
@@ -0,0 +1,116 @@
|
||||
/-
|
||||
Copyright (c) 2025 Lean FRO, LLC. All rights reserved.
|
||||
Released under Apache 2.0 license as described in the file LICENSE.
|
||||
Authors: Sofia Rodrigues
|
||||
-/
|
||||
module
|
||||
|
||||
prelude
|
||||
public import Std.Internal.Http.Data.Request
|
||||
public import Std.Internal.Http.Data.Response
|
||||
public import Std.Internal.Http.Data.Body.Any
|
||||
|
||||
public section
|
||||
|
||||
/-!
|
||||
# Body.Empty
|
||||
|
||||
Represents an always-empty, already-closed body handle.
|
||||
-/
|
||||
|
||||
namespace Std.Http.Body
|
||||
open Std Internal IO Async
|
||||
|
||||
set_option linter.all true
|
||||
|
||||
/--
|
||||
An empty body handle.
|
||||
-/
|
||||
structure Empty where
|
||||
deriving Inhabited, BEq
|
||||
|
||||
namespace Empty
|
||||
|
||||
/--
|
||||
Receives from an empty body, always returning end-of-stream.
|
||||
-/
|
||||
@[inline]
|
||||
def recv (_ : Empty) : Async (Option Chunk) :=
|
||||
pure none
|
||||
|
||||
/--
|
||||
Closes an empty body (no-op).
|
||||
-/
|
||||
@[inline]
|
||||
def close (_ : Empty) : Async Unit :=
|
||||
pure ()
|
||||
|
||||
/--
|
||||
Empty bodies are always closed for reading.
|
||||
-/
|
||||
@[inline]
|
||||
def isClosed (_ : Empty) : Async Bool :=
|
||||
pure true
|
||||
|
||||
/--
|
||||
Selector that immediately resolves with end-of-stream for an empty body.
|
||||
-/
|
||||
@[inline]
|
||||
def recvSelector (_ : Empty) : Selector (Option Chunk) where
|
||||
tryFn := pure (some none)
|
||||
registerFn waiter := do
|
||||
let lose := pure ()
|
||||
let win promise := do
|
||||
promise.resolve (.ok none)
|
||||
waiter.race lose win
|
||||
unregisterFn := pure ()
|
||||
|
||||
end Empty
|
||||
|
||||
instance : Http.Body Empty where
|
||||
recv := Empty.recv
|
||||
close := Empty.close
|
||||
isClosed := Empty.isClosed
|
||||
recvSelector := Empty.recvSelector
|
||||
getKnownSize _ := pure (some <| .fixed 0)
|
||||
setKnownSize _ _ := pure ()
|
||||
|
||||
|
||||
instance : Coe Empty Any := ⟨Any.ofBody⟩
|
||||
|
||||
instance : Coe (Response Empty) (Response Any) where
|
||||
coe f := { f with }
|
||||
|
||||
instance : Coe (ContextAsync (Response Empty)) (ContextAsync (Response Any)) where
|
||||
coe action := do
|
||||
let response ← action
|
||||
pure (response : Response Any)
|
||||
|
||||
instance : Coe (Async (Response Empty)) (ContextAsync (Response Any)) where
|
||||
coe action := do
|
||||
let response ← action
|
||||
pure (response : Response Any)
|
||||
|
||||
end Body
|
||||
|
||||
namespace Request.Builder
|
||||
open Internal.IO.Async
|
||||
|
||||
/--
|
||||
Builds a request with no body.
|
||||
-/
|
||||
def empty (builder : Builder) : Async (Request Body.Empty) :=
|
||||
pure <| builder.body {}
|
||||
|
||||
end Request.Builder
|
||||
|
||||
namespace Response.Builder
|
||||
open Internal.IO.Async
|
||||
|
||||
/--
|
||||
Builds a response with no body.
|
||||
-/
|
||||
def empty (builder : Builder) : Async (Response Body.Empty) :=
|
||||
pure <| builder.body {}
|
||||
|
||||
end Response.Builder
|
||||
232
src/Std/Internal/Http/Data/Body/Full.lean
Normal file
232
src/Std/Internal/Http/Data/Body/Full.lean
Normal file
@@ -0,0 +1,232 @@
|
||||
/-
|
||||
Copyright (c) 2025 Lean FRO, LLC. All rights reserved.
|
||||
Released under Apache 2.0 license as described in the file LICENSE.
|
||||
Authors: Sofia Rodrigues
|
||||
-/
|
||||
module
|
||||
|
||||
prelude
|
||||
public import Std.Sync
|
||||
public import Std.Internal.Http.Data.Request
|
||||
public import Std.Internal.Http.Data.Response
|
||||
public import Std.Internal.Http.Data.Body.Any
|
||||
public import Init.Data.ByteArray
|
||||
|
||||
public section
|
||||
|
||||
/-!
|
||||
# Body.Full
|
||||
|
||||
A body backed by a fixed `ByteArray` held in a `Mutex`.
|
||||
|
||||
The byte array is consumed at most once: the first call to `recv` atomically takes the data
|
||||
and returns it as a single chunk; subsequent calls return `none` (end-of-stream).
|
||||
Closing the body discards any unconsumed data.
|
||||
-/
|
||||
|
||||
namespace Std.Http.Body
|
||||
open Std Internal IO Async
|
||||
|
||||
set_option linter.all true
|
||||
|
||||
/--
|
||||
A body backed by a fixed, mutex-protected `ByteArray`.
|
||||
|
||||
The data is consumed on the first read. Once consumed (or explicitly closed), the body
|
||||
behaves as a closed, empty channel.
|
||||
-/
|
||||
structure Full where
|
||||
private mk ::
|
||||
private state : Mutex (Option ByteArray)
|
||||
deriving Nonempty
|
||||
|
||||
namespace Full
|
||||
|
||||
private def takeChunk : AtomicT (Option ByteArray) Async (Option Chunk) := do
|
||||
match ← get with
|
||||
| none =>
|
||||
pure none
|
||||
| some data =>
|
||||
set (none : Option ByteArray)
|
||||
if data.isEmpty then
|
||||
pure none
|
||||
else
|
||||
pure (some (Chunk.ofByteArray data))
|
||||
|
||||
/--
|
||||
Creates a `Full` body from a `ByteArray`.
|
||||
-/
|
||||
def ofByteArray (data : ByteArray) : Async Full := do
|
||||
let state ← Mutex.new (some data)
|
||||
return { state }
|
||||
|
||||
/--
|
||||
Creates a `Full` body from a `String`.
|
||||
-/
|
||||
def ofString (data : String) : Async Full := do
|
||||
let state ← Mutex.new (some data.toUTF8)
|
||||
return { state }
|
||||
|
||||
/--
|
||||
Receives the body data. Returns the full byte array on the first call as a single chunk,
|
||||
then `none` on all subsequent calls.
|
||||
-/
|
||||
def recv (full : Full) : Async (Option Chunk) :=
|
||||
full.state.atomically do
|
||||
takeChunk
|
||||
|
||||
/--
|
||||
Closes the body, discarding any unconsumed data.
|
||||
-/
|
||||
def close (full : Full) : Async Unit :=
|
||||
full.state.atomically do
|
||||
set (none : Option ByteArray)
|
||||
|
||||
/--
|
||||
Returns `true` when the data has been consumed or the body has been closed.
|
||||
-/
|
||||
def isClosed (full : Full) : Async Bool :=
|
||||
full.state.atomically do
|
||||
return (← get).isNone
|
||||
|
||||
/--
|
||||
Returns the known size of the remaining data.
|
||||
Returns `some (.fixed n)` with the current byte count, or `some (.fixed 0)` if the body has
|
||||
already been consumed or closed.
|
||||
-/
|
||||
def getKnownSize (full : Full) : Async (Option Body.Length) :=
|
||||
full.state.atomically do
|
||||
match ← get with
|
||||
| none => pure (some (.fixed 0))
|
||||
| some data => pure (some (.fixed data.size))
|
||||
|
||||
/--
|
||||
Selector that immediately resolves to the remaining chunk (or EOF).
|
||||
-/
|
||||
def recvSelector (full : Full) : Selector (Option Chunk) where
|
||||
tryFn := do
|
||||
let chunk ← full.state.atomically do
|
||||
takeChunk
|
||||
pure (some chunk)
|
||||
|
||||
registerFn waiter := do
|
||||
full.state.atomically do
|
||||
let lose := pure ()
|
||||
|
||||
let win promise := do
|
||||
let chunk ← takeChunk
|
||||
promise.resolve (.ok chunk)
|
||||
|
||||
waiter.race lose win
|
||||
|
||||
unregisterFn := pure ()
|
||||
|
||||
end Full
|
||||
|
||||
instance : Http.Body Full where
|
||||
recv := Full.recv
|
||||
close := Full.close
|
||||
isClosed := Full.isClosed
|
||||
recvSelector := Full.recvSelector
|
||||
getKnownSize := Full.getKnownSize
|
||||
setKnownSize _ _ := pure ()
|
||||
|
||||
instance : Coe Full Any := ⟨Any.ofBody⟩
|
||||
|
||||
instance : Coe (Response Full) (Response Any) where
|
||||
coe f := { f with }
|
||||
|
||||
instance : Coe (ContextAsync (Response Full)) (ContextAsync (Response Any)) where
|
||||
coe action := do
|
||||
let response ← action
|
||||
pure (response : Response Any)
|
||||
|
||||
instance : Coe (Async (Response Full)) (ContextAsync (Response Any)) where
|
||||
coe action := do
|
||||
let response ← action
|
||||
pure (response : Response Any)
|
||||
|
||||
end Body
|
||||
|
||||
namespace Request.Builder
|
||||
|
||||
open Internal.IO.Async
|
||||
|
||||
/--
|
||||
Builds a request body from raw bytes without setting any headers.
|
||||
Use `bytes` instead if you want `Content-Type: application/octet-stream` set automatically.
|
||||
-/
|
||||
def fromBytes (builder : Builder) (content : ByteArray) : Async (Request Body.Full) := do
|
||||
return builder.body (← Body.Full.ofByteArray content)
|
||||
|
||||
/--
|
||||
Builds a request with a binary body.
|
||||
Sets `Content-Type: application/octet-stream`.
|
||||
Use `fromBytes` instead if you need to set a different `Content-Type` or none at all.
|
||||
-/
|
||||
def bytes (builder : Builder) (content : ByteArray) : Async (Request Body.Full) :=
|
||||
fromBytes (builder.header Header.Name.contentType (Header.Value.ofString! "application/octet-stream")) content
|
||||
|
||||
/--
|
||||
Builds a request with a text body.
|
||||
Sets `Content-Type: text/plain; charset=utf-8`.
|
||||
-/
|
||||
def text (builder : Builder) (content : String) : Async (Request Body.Full) :=
|
||||
fromBytes (builder.header Header.Name.contentType (Header.Value.ofString! "text/plain; charset=utf-8")) content.toUTF8
|
||||
|
||||
/--
|
||||
Builds a request with a JSON body.
|
||||
Sets `Content-Type: application/json`.
|
||||
-/
|
||||
def json (builder : Builder) (content : String) : Async (Request Body.Full) :=
|
||||
fromBytes (builder.header Header.Name.contentType (Header.Value.ofString! "application/json")) content.toUTF8
|
||||
|
||||
/--
|
||||
Builds a request with an HTML body.
|
||||
Sets `Content-Type: text/html; charset=utf-8`.
|
||||
-/
|
||||
def html (builder : Builder) (content : String) : Async (Request Body.Full) :=
|
||||
fromBytes (builder.header Header.Name.contentType (Header.Value.ofString! "text/html; charset=utf-8")) content.toUTF8
|
||||
|
||||
end Request.Builder
|
||||
|
||||
namespace Response.Builder
|
||||
open Internal.IO.Async
|
||||
|
||||
/--
|
||||
Builds a response body from raw bytes without setting any headers.
|
||||
Use `bytes` instead if you want `Content-Type: application/octet-stream` set automatically.
|
||||
-/
|
||||
def fromBytes (builder : Builder) (content : ByteArray) : Async (Response Body.Full) := do
|
||||
return builder.body (← Body.Full.ofByteArray content)
|
||||
|
||||
/--
|
||||
Builds a response with a binary body.
|
||||
Sets `Content-Type: application/octet-stream`.
|
||||
Use `fromBytes` instead if you need to set a different `Content-Type` or none at all.
|
||||
-/
|
||||
def bytes (builder : Builder) (content : ByteArray) : Async (Response Body.Full) :=
|
||||
fromBytes (builder.header Header.Name.contentType (Header.Value.ofString! "application/octet-stream")) content
|
||||
|
||||
/--
|
||||
Builds a response with a text body.
|
||||
Sets `Content-Type: text/plain; charset=utf-8`.
|
||||
-/
|
||||
def text (builder : Builder) (content : String) : Async (Response Body.Full) :=
|
||||
fromBytes (builder.header Header.Name.contentType (Header.Value.ofString! "text/plain; charset=utf-8")) content.toUTF8
|
||||
|
||||
/--
|
||||
Builds a response with a JSON body.
|
||||
Sets `Content-Type: application/json`.
|
||||
-/
|
||||
def json (builder : Builder) (content : String) : Async (Response Body.Full) :=
|
||||
fromBytes (builder.header Header.Name.contentType (Header.Value.ofString! "application/json")) content.toUTF8
|
||||
|
||||
/--
|
||||
Builds a response with an HTML body.
|
||||
Sets `Content-Type: text/html; charset=utf-8`.
|
||||
-/
|
||||
def html (builder : Builder) (content : String) : Async (Response Body.Full) :=
|
||||
fromBytes (builder.header Header.Name.contentType (Header.Value.ofString! "text/html; charset=utf-8")) content.toUTF8
|
||||
|
||||
end Response.Builder
|
||||
60
src/Std/Internal/Http/Data/Body/Length.lean
Normal file
60
src/Std/Internal/Http/Data/Body/Length.lean
Normal file
@@ -0,0 +1,60 @@
|
||||
/-
|
||||
Copyright (c) 2025 Lean FRO, LLC. All rights reserved.
|
||||
Released under Apache 2.0 license as described in the file LICENSE.
|
||||
Authors: Sofia Rodrigues
|
||||
-/
|
||||
module
|
||||
|
||||
prelude
|
||||
public import Init.Data.Repr
|
||||
|
||||
public section
|
||||
|
||||
/-!
|
||||
# Body.Length
|
||||
|
||||
This module defines the `Length` type, that represents the Content-Length or Transfer-Encoding
|
||||
of an HTTP request or response.
|
||||
-/
|
||||
|
||||
namespace Std.Http.Body
|
||||
|
||||
set_option linter.all true
|
||||
|
||||
/--
|
||||
Size of the body of a response or request.
|
||||
-/
|
||||
inductive Length
|
||||
/--
|
||||
Indicates that the HTTP message body uses **chunked transfer encoding**.
|
||||
-/
|
||||
| chunked
|
||||
|
||||
/--
|
||||
Indicates that the HTTP message body has a **fixed, known length**, as specified by the
|
||||
`Content-Length` header.
|
||||
-/
|
||||
| fixed (n : Nat)
|
||||
deriving Repr, BEq
|
||||
|
||||
namespace Length
|
||||
|
||||
/--
|
||||
Checks if the `Length` is chunked.
|
||||
-/
|
||||
@[inline]
|
||||
def isChunked : Length → Bool
|
||||
| .chunked => true
|
||||
| _ => false
|
||||
|
||||
/--
|
||||
Checks if the `Length` is a fixed size.
|
||||
-/
|
||||
@[inline]
|
||||
def isFixed : Length → Bool
|
||||
| .fixed _ => true
|
||||
| _ => false
|
||||
|
||||
end Length
|
||||
|
||||
end Std.Http.Body
|
||||
650
src/Std/Internal/Http/Data/Body/Stream.lean
Normal file
650
src/Std/Internal/Http/Data/Body/Stream.lean
Normal file
@@ -0,0 +1,650 @@
|
||||
/-
|
||||
Copyright (c) 2025 Lean FRO, LLC. All rights reserved.
|
||||
Released under Apache 2.0 license as described in the file LICENSE.
|
||||
Authors: Sofia Rodrigues
|
||||
-/
|
||||
module
|
||||
|
||||
prelude
|
||||
public import Std.Sync
|
||||
public import Std.Internal.Async
|
||||
public import Std.Internal.Http.Data.Request
|
||||
public import Std.Internal.Http.Data.Response
|
||||
public import Std.Internal.Http.Data.Chunk
|
||||
public import Std.Internal.Http.Data.Body.Basic
|
||||
public import Std.Internal.Http.Data.Body.Any
|
||||
public import Init.Data.ByteArray
|
||||
|
||||
public section
|
||||
|
||||
/-!
|
||||
# Body.Stream
|
||||
|
||||
This module defines a zero-buffer rendezvous body channel (`Body.Stream`) that supports
|
||||
both sending and receiving chunks.
|
||||
|
||||
There is no queue and no capacity. A send waits for a receiver and a receive waits for a sender.
|
||||
At most one blocked producer and one blocked consumer are supported.
|
||||
-/
|
||||
|
||||
namespace Std.Http
|
||||
|
||||
namespace Body
|
||||
|
||||
open Std Internal IO Async
|
||||
|
||||
set_option linter.all true
|
||||
|
||||
namespace Channel
|
||||
|
||||
open Internal.IO.Async in
|
||||
private inductive Consumer where
|
||||
| normal (promise : IO.Promise (Option Chunk))
|
||||
| select (finished : Waiter (Option Chunk))
|
||||
|
||||
private def Consumer.resolve (c : Consumer) (x : Option Chunk) : BaseIO Bool := do
|
||||
match c with
|
||||
| .normal promise =>
|
||||
promise.resolve x
|
||||
return true
|
||||
| .select waiter =>
|
||||
let lose := return false
|
||||
let win promise := do
|
||||
promise.resolve (.ok x)
|
||||
return true
|
||||
waiter.race lose win
|
||||
|
||||
private structure Producer where
|
||||
chunk : Chunk
|
||||
|
||||
/--
|
||||
Resolved with `true` when consumed by a receiver, `false` when the channel closes.
|
||||
-/
|
||||
done : IO.Promise Bool
|
||||
|
||||
open Internal.IO.Async in
|
||||
private def resolveInterestWaiter (waiter : Waiter Bool) (x : Bool) : BaseIO Bool := do
|
||||
let lose := return false
|
||||
let win promise := do
|
||||
promise.resolve (.ok x)
|
||||
return true
|
||||
waiter.race lose win
|
||||
|
||||
private structure State where
|
||||
/--
|
||||
A single blocked producer waiting for a receiver.
|
||||
-/
|
||||
pendingProducer : Option Producer
|
||||
|
||||
/--
|
||||
A single blocked consumer waiting for a producer.
|
||||
-/
|
||||
pendingConsumer : Option Consumer
|
||||
|
||||
/--
|
||||
A waiter for `Stream.interestSelector`.
|
||||
-/
|
||||
interestWaiter : Option (Internal.IO.Async.Waiter Bool)
|
||||
|
||||
/--
|
||||
Whether the channel is closed.
|
||||
-/
|
||||
closed : Bool
|
||||
|
||||
/--
|
||||
Known size of the stream if available.
|
||||
-/
|
||||
knownSize : Option Body.Length
|
||||
|
||||
/--
|
||||
Buffered partial chunk data accumulated from `Stream.send ... (incomplete := true)`.
|
||||
These partial pieces are collapsed and emitted as a single chunk on the next complete send.
|
||||
-/
|
||||
pendingIncompleteChunk : Option Chunk := none
|
||||
deriving Nonempty
|
||||
|
||||
end Channel
|
||||
|
||||
/--
|
||||
A zero-buffer rendezvous body channel that supports both sending and receiving chunks.
|
||||
-/
|
||||
structure Stream where
|
||||
private mk ::
|
||||
private state : Mutex Channel.State
|
||||
deriving Nonempty, TypeName
|
||||
|
||||
/--
|
||||
Creates a rendezvous body stream.
|
||||
-/
|
||||
def mkStream : Async Stream := do
|
||||
let state ← Mutex.new {
|
||||
pendingProducer := none
|
||||
pendingConsumer := none
|
||||
interestWaiter := none
|
||||
closed := false
|
||||
knownSize := none
|
||||
}
|
||||
return { state }
|
||||
|
||||
namespace Channel
|
||||
|
||||
private def decreaseKnownSize (knownSize : Option Body.Length) (chunk : Chunk) : Option Body.Length :=
|
||||
match knownSize with
|
||||
| some (.fixed res) => some (Body.Length.fixed (res - chunk.data.size))
|
||||
| _ => knownSize
|
||||
|
||||
private def pruneFinishedWaiters [Monad m] [MonadLiftT (ST IO.RealWorld) m] :
|
||||
AtomicT State m Unit := do
|
||||
let st ← get
|
||||
|
||||
let pendingConsumer ←
|
||||
match st.pendingConsumer with
|
||||
| some (.select waiter) =>
|
||||
if ← waiter.checkFinished then
|
||||
pure none
|
||||
else
|
||||
pure st.pendingConsumer
|
||||
| _ =>
|
||||
pure st.pendingConsumer
|
||||
|
||||
let interestWaiter ←
|
||||
match st.interestWaiter with
|
||||
| some waiter =>
|
||||
if ← waiter.checkFinished then
|
||||
pure none
|
||||
else
|
||||
pure st.interestWaiter
|
||||
| none =>
|
||||
pure none
|
||||
|
||||
set { st with pendingConsumer, interestWaiter }
|
||||
|
||||
private def signalInterest [Monad m] [MonadLiftT (ST IO.RealWorld) m] [MonadLiftT BaseIO m] :
|
||||
AtomicT State m Unit := do
|
||||
let st ← get
|
||||
if let some waiter := st.interestWaiter then
|
||||
discard <| resolveInterestWaiter waiter true
|
||||
set { st with interestWaiter := none }
|
||||
|
||||
private def recvReady' [Monad m] [MonadLiftT (ST IO.RealWorld) m] :
|
||||
AtomicT State m Bool := do
|
||||
let st ← get
|
||||
return st.pendingProducer.isSome || st.closed
|
||||
|
||||
private def hasInterest' [Monad m] [MonadLiftT (ST IO.RealWorld) m] :
|
||||
AtomicT State m Bool := do
|
||||
let st ← get
|
||||
return st.pendingConsumer.isSome
|
||||
|
||||
private def tryRecv' [Monad m] [MonadLiftT (ST IO.RealWorld) m] [MonadLiftT BaseIO m] :
|
||||
AtomicT State m (Option Chunk) := do
|
||||
let st ← get
|
||||
if let some producer := st.pendingProducer then
|
||||
set {
|
||||
st with
|
||||
pendingProducer := none
|
||||
knownSize := decreaseKnownSize st.knownSize producer.chunk
|
||||
}
|
||||
discard <| producer.done.resolve true
|
||||
return some producer.chunk
|
||||
else
|
||||
return none
|
||||
|
||||
private def close' [Monad m] [MonadLiftT (ST IO.RealWorld) m] [MonadLiftT BaseIO m] :
|
||||
AtomicT State m Unit := do
|
||||
let st ← get
|
||||
if st.closed then
|
||||
return ()
|
||||
|
||||
if let some consumer := st.pendingConsumer then
|
||||
discard <| consumer.resolve none
|
||||
|
||||
if let some waiter := st.interestWaiter then
|
||||
discard <| resolveInterestWaiter waiter false
|
||||
|
||||
if let some producer := st.pendingProducer then
|
||||
discard <| producer.done.resolve false
|
||||
|
||||
set {
|
||||
st with
|
||||
pendingProducer := none
|
||||
pendingConsumer := none
|
||||
interestWaiter := none
|
||||
pendingIncompleteChunk := none
|
||||
closed := true
|
||||
}
|
||||
|
||||
end Channel
|
||||
|
||||
namespace Stream
|
||||
|
||||
/--
|
||||
Attempts to receive a chunk from the channel without blocking.
|
||||
Returns `some chunk` only when a producer is already waiting.
|
||||
-/
|
||||
def tryRecv (stream : Stream) : Async (Option Chunk) :=
|
||||
stream.state.atomically do
|
||||
Channel.pruneFinishedWaiters
|
||||
Channel.tryRecv'
|
||||
|
||||
private def recv' (stream : Stream) : BaseIO (AsyncTask (Option Chunk)) := do
|
||||
stream.state.atomically do
|
||||
Channel.pruneFinishedWaiters
|
||||
|
||||
if let some chunk ← Channel.tryRecv' then
|
||||
return AsyncTask.pure (some chunk)
|
||||
|
||||
let st ← get
|
||||
if st.closed then
|
||||
return AsyncTask.pure none
|
||||
|
||||
if st.pendingConsumer.isSome then
|
||||
return Task.pure (.error (IO.Error.userError "only one blocked consumer is allowed"))
|
||||
|
||||
let promise ← IO.Promise.new
|
||||
set { st with pendingConsumer := some (.normal promise) }
|
||||
Channel.signalInterest
|
||||
return promise.result?.map (sync := true) fun
|
||||
| none => .error (IO.Error.userError "the promise linked to the consumer was dropped")
|
||||
| some res => .ok res
|
||||
|
||||
/--
|
||||
Receives a chunk from the channel. Blocks until a producer sends one.
|
||||
Returns `none` if the channel is closed and no producer is waiting.
|
||||
-/
|
||||
def recv (stream : Stream) : Async (Option Chunk) := do
|
||||
Async.ofAsyncTask (← recv' stream)
|
||||
|
||||
/--
|
||||
Closes the channel.
|
||||
-/
|
||||
def close (stream : Stream) : Async Unit :=
|
||||
stream.state.atomically do
|
||||
Channel.close'
|
||||
|
||||
/--
|
||||
Checks whether the channel is closed.
|
||||
-/
|
||||
@[always_inline, inline]
|
||||
def isClosed (stream : Stream) : Async Bool :=
|
||||
stream.state.atomically do
|
||||
return (← get).closed
|
||||
|
||||
/--
|
||||
Gets the known size if available.
|
||||
-/
|
||||
@[always_inline, inline]
|
||||
def getKnownSize (stream : Stream) : Async (Option Body.Length) :=
|
||||
stream.state.atomically do
|
||||
return (← get).knownSize
|
||||
|
||||
/--
|
||||
Sets known size metadata.
|
||||
-/
|
||||
@[always_inline, inline]
|
||||
def setKnownSize (stream : Stream) (size : Option Body.Length) : Async Unit :=
|
||||
stream.state.atomically do
|
||||
modify fun st => { st with knownSize := size }
|
||||
|
||||
open Internal.IO.Async in
|
||||
|
||||
/--
|
||||
Creates a selector that resolves when a producer is waiting (or the channel closes).
|
||||
-/
|
||||
def recvSelector (stream : Stream) : Selector (Option Chunk) where
|
||||
tryFn := do
|
||||
stream.state.atomically do
|
||||
Channel.pruneFinishedWaiters
|
||||
if ← Channel.recvReady' then
|
||||
return some (← Channel.tryRecv')
|
||||
else
|
||||
return none
|
||||
|
||||
registerFn waiter := do
|
||||
stream.state.atomically do
|
||||
Channel.pruneFinishedWaiters
|
||||
if ← Channel.recvReady' then
|
||||
let lose := return ()
|
||||
let win promise := do
|
||||
promise.resolve (.ok (← Channel.tryRecv'))
|
||||
waiter.race lose win
|
||||
else
|
||||
let st ← get
|
||||
if st.pendingConsumer.isSome then
|
||||
throw (.userError "only one blocked consumer is allowed")
|
||||
|
||||
set { st with pendingConsumer := some (.select waiter) }
|
||||
Channel.signalInterest
|
||||
|
||||
unregisterFn := do
|
||||
stream.state.atomically do
|
||||
Channel.pruneFinishedWaiters
|
||||
|
||||
/--
|
||||
Iterates over chunks until the channel closes.
|
||||
-/
|
||||
@[inline]
|
||||
protected partial def forIn
|
||||
{β : Type} (stream : Stream) (acc : β)
|
||||
(step : Chunk → β → Async (ForInStep β)) : Async β := do
|
||||
|
||||
let rec @[specialize] loop (stream : Stream) (acc : β) : Async β := do
|
||||
if let some chunk ← stream.recv then
|
||||
match ← step chunk acc with
|
||||
| .done res => return res
|
||||
| .yield res => loop stream res
|
||||
else
|
||||
return acc
|
||||
|
||||
loop stream acc
|
||||
|
||||
/--
|
||||
Context-aware iteration over chunks until the channel closes.
|
||||
-/
|
||||
@[inline]
|
||||
protected partial def forIn'
|
||||
{β : Type} (stream : Stream) (acc : β)
|
||||
(step : Chunk → β → ContextAsync (ForInStep β)) : ContextAsync β := do
|
||||
|
||||
let rec @[specialize] loop (stream : Stream) (acc : β) : ContextAsync β := do
|
||||
let data ← Selectable.one #[
|
||||
.case stream.recvSelector pure,
|
||||
.case (← ContextAsync.doneSelector) (fun _ => pure none),
|
||||
]
|
||||
|
||||
if let some chunk := data then
|
||||
match ← step chunk acc with
|
||||
| .done res => return res
|
||||
| .yield res => loop stream res
|
||||
else
|
||||
return acc
|
||||
|
||||
loop stream acc
|
||||
|
||||
/--
|
||||
Abstracts over how the next chunk is received, allowing `readAll` to work in both `Async`
|
||||
(no cancellation) and `ContextAsync` (races with cancellation via `doneSelector`).
|
||||
-/
|
||||
class NextChunk (m : Type → Type) where
|
||||
/--
|
||||
Receives the next chunk, stopping at EOF or (in `ContextAsync`) when the context is cancelled.
|
||||
-/
|
||||
nextChunk : Stream → m (Option Chunk)
|
||||
|
||||
instance : NextChunk Async where
|
||||
nextChunk := Stream.recv
|
||||
|
||||
instance : NextChunk ContextAsync where
|
||||
nextChunk stream := do
|
||||
Selectable.one #[
|
||||
.case stream.recvSelector pure,
|
||||
.case (← ContextAsync.doneSelector) (fun _ => pure none),
|
||||
]
|
||||
|
||||
/--
|
||||
Reads all remaining chunks and decodes them into `α`.
|
||||
|
||||
Works in both `Async` (reads until EOF, no cancellation) and `ContextAsync` (also stops if the
|
||||
context is cancelled).
|
||||
-/
|
||||
partial def readAll
|
||||
[FromByteArray α]
|
||||
[Monad m] [MonadExceptOf IO.Error m] [NextChunk m]
|
||||
(stream : Stream)
|
||||
(maximumSize : Option UInt64 := none) :
|
||||
m α := do
|
||||
|
||||
let rec loop (result : ByteArray) : m ByteArray := do
|
||||
match ← NextChunk.nextChunk stream with
|
||||
| none => return result
|
||||
| some chunk =>
|
||||
let result := result ++ chunk.data
|
||||
if let some max := maximumSize then
|
||||
if result.size.toUInt64 > max then
|
||||
throw (.userError s!"body exceeded maximum size of {max} bytes")
|
||||
loop result
|
||||
|
||||
let result ← loop ByteArray.empty
|
||||
|
||||
match FromByteArray.fromByteArray result with
|
||||
| .ok a => return a
|
||||
| .error msg => throw (.userError msg)
|
||||
|
||||
private def collapseForSend
|
||||
(stream : Stream)
|
||||
(chunk : Chunk)
|
||||
(incomplete : Bool) : BaseIO (Except IO.Error (Option Chunk)) := do
|
||||
|
||||
stream.state.atomically do
|
||||
Channel.pruneFinishedWaiters
|
||||
let st ← get
|
||||
|
||||
if st.closed then
|
||||
return .error (.userError "channel closed")
|
||||
|
||||
let merged := match st.pendingIncompleteChunk with
|
||||
| some pending =>
|
||||
{
|
||||
data := pending.data ++ chunk.data
|
||||
extensions := if pending.extensions.isEmpty then chunk.extensions else pending.extensions
|
||||
}
|
||||
| none => chunk
|
||||
|
||||
if incomplete then
|
||||
set { st with pendingIncompleteChunk := some merged }
|
||||
return .ok none
|
||||
else
|
||||
set { st with pendingIncompleteChunk := none }
|
||||
return .ok (some merged)
|
||||
|
||||
/--
|
||||
Sends a chunk, retrying if a select-mode consumer races and loses. If no consumer is ready,
|
||||
installs the chunk as a pending producer and awaits acknowledgement from the receiver.
|
||||
-/
|
||||
private partial def send' (stream : Stream) (chunk : Chunk) : Async Unit := do
|
||||
let done ← IO.Promise.new
|
||||
let result : Except IO.Error (Option Bool) ← stream.state.atomically do
|
||||
Channel.pruneFinishedWaiters
|
||||
let st ← get
|
||||
|
||||
if st.closed then
|
||||
return .error (IO.Error.userError "channel closed")
|
||||
|
||||
if let some consumer := st.pendingConsumer then
|
||||
let success ← consumer.resolve (some chunk)
|
||||
|
||||
if success then
|
||||
set {
|
||||
st with
|
||||
pendingConsumer := none
|
||||
knownSize := Channel.decreaseKnownSize st.knownSize chunk
|
||||
}
|
||||
return .ok (some true)
|
||||
else
|
||||
set { st with pendingConsumer := none }
|
||||
return .ok (some false)
|
||||
else if st.pendingProducer.isSome then
|
||||
return .error (IO.Error.userError "only one blocked producer is allowed")
|
||||
else
|
||||
set { st with pendingProducer := some { chunk, done } }
|
||||
return .ok none
|
||||
|
||||
match result with
|
||||
| .error err =>
|
||||
throw err
|
||||
| .ok (some true) =>
|
||||
return ()
|
||||
| .ok (some false) =>
|
||||
-- The select-mode consumer raced and lost; recurse to allocate a fresh `done` promise.
|
||||
send' stream chunk
|
||||
| .ok none =>
|
||||
match ← await done.result? with
|
||||
| some true => return ()
|
||||
| _ => throw (IO.Error.userError "channel closed")
|
||||
|
||||
/--
|
||||
Sends a chunk.
|
||||
|
||||
If `incomplete := true`, the chunk is buffered and collapsed with subsequent chunks, and is not
|
||||
delivered to the receiver yet.
|
||||
If `incomplete := false`, any buffered incomplete pieces are collapsed with this chunk and the
|
||||
single merged chunk is sent.
|
||||
-/
|
||||
def send (stream : Stream) (chunk : Chunk) (incomplete : Bool := false) : Async Unit := do
|
||||
match (← collapseForSend stream chunk incomplete) with
|
||||
| .error err => throw err
|
||||
| .ok none => pure ()
|
||||
| .ok (some toSend) =>
|
||||
if toSend.data.isEmpty ∧ toSend.extensions.isEmpty then
|
||||
return ()
|
||||
|
||||
send' stream toSend
|
||||
|
||||
/--
|
||||
Returns `true` when a consumer is currently blocked waiting for data.
|
||||
-/
|
||||
def hasInterest (stream : Stream) : Async Bool :=
|
||||
stream.state.atomically do
|
||||
Channel.pruneFinishedWaiters
|
||||
Channel.hasInterest'
|
||||
|
||||
open Internal.IO.Async in
|
||||
/--
|
||||
Creates a selector that resolves when consumer interest is present.
|
||||
Returns `true` when a consumer is waiting, `false` when the channel closes first.
|
||||
-/
|
||||
def interestSelector (stream : Stream) : Selector Bool where
|
||||
tryFn := do
|
||||
stream.state.atomically do
|
||||
Channel.pruneFinishedWaiters
|
||||
let st ← get
|
||||
if st.pendingConsumer.isSome then
|
||||
return some true
|
||||
else if st.closed then
|
||||
return some false
|
||||
else
|
||||
return none
|
||||
|
||||
registerFn waiter := do
|
||||
stream.state.atomically do
|
||||
Channel.pruneFinishedWaiters
|
||||
let st ← get
|
||||
|
||||
if st.pendingConsumer.isSome then
|
||||
let lose := return ()
|
||||
let win promise := do
|
||||
promise.resolve (.ok true)
|
||||
waiter.race lose win
|
||||
else if st.closed then
|
||||
let lose := return ()
|
||||
let win promise := do
|
||||
promise.resolve (.ok false)
|
||||
waiter.race lose win
|
||||
else if st.interestWaiter.isSome then
|
||||
throw (.userError "only one blocked interest selector is allowed")
|
||||
else
|
||||
set { st with interestWaiter := some waiter }
|
||||
|
||||
unregisterFn := do
|
||||
stream.state.atomically do
|
||||
Channel.pruneFinishedWaiters
|
||||
|
||||
end Stream
|
||||
|
||||
/--
|
||||
Creates a body from a producer function.
|
||||
Returns the stream immediately and runs `gen` in a detached task.
|
||||
The channel is always closed when `gen` returns or throws.
|
||||
Errors from `gen` are not rethrown here; consumers observe end-of-stream via `recv = none`.
|
||||
-/
|
||||
def stream (gen : Stream → Async Unit) : Async Stream := do
|
||||
let s ← mkStream
|
||||
background <| do
|
||||
try
|
||||
gen s
|
||||
finally
|
||||
s.close
|
||||
return s
|
||||
|
||||
/--
|
||||
Creates a body from a fixed byte array.
|
||||
-/
|
||||
def fromBytes (content : ByteArray) : Async Stream := do
|
||||
stream fun s => do
|
||||
s.setKnownSize (some (.fixed content.size))
|
||||
if content.size > 0 then
|
||||
s.send (Chunk.ofByteArray content)
|
||||
|
||||
/--
|
||||
Creates an empty `Stream` body channel (already closed, no data).
|
||||
|
||||
Prefer `Body.Empty` when you need a concrete zero-cost type. Use this when the calling
|
||||
context requires a `Stream` specifically.
|
||||
-/
|
||||
def empty : Async Stream := do
|
||||
let s ← mkStream
|
||||
s.setKnownSize (some (.fixed 0))
|
||||
s.close
|
||||
return s
|
||||
|
||||
instance : ForIn Async Stream Chunk where
|
||||
forIn := Stream.forIn
|
||||
|
||||
instance : ForIn ContextAsync Stream Chunk where
|
||||
forIn := Stream.forIn'
|
||||
|
||||
instance : Http.Body Stream where
|
||||
recv := Stream.recv
|
||||
close := Stream.close
|
||||
isClosed := Stream.isClosed
|
||||
recvSelector := Stream.recvSelector
|
||||
getKnownSize := Stream.getKnownSize
|
||||
setKnownSize := Stream.setKnownSize
|
||||
|
||||
instance : Coe Stream Any := ⟨Any.ofBody⟩
|
||||
|
||||
instance : Coe (Response Stream) (Response Any) where
|
||||
coe f := { f with }
|
||||
|
||||
instance : Coe (ContextAsync (Response Stream)) (ContextAsync (Response Any)) where
|
||||
coe action := do
|
||||
let response ← action
|
||||
pure (response : Response Any)
|
||||
|
||||
instance : Coe (Async (Response Stream)) (ContextAsync (Response Any)) where
|
||||
coe action := do
|
||||
let response ← action
|
||||
pure (response : Response Any)
|
||||
|
||||
end Body
|
||||
|
||||
namespace Request.Builder
|
||||
|
||||
open Internal.IO.Async
|
||||
|
||||
/--
|
||||
Builds a request with a streaming body generator.
|
||||
-/
|
||||
def stream
|
||||
(builder : Builder)
|
||||
(gen : Body.Stream → Async Unit) :
|
||||
Async (Request Body.Stream) := do
|
||||
let s ← Body.stream gen
|
||||
return Request.Builder.body builder s
|
||||
|
||||
end Request.Builder
|
||||
|
||||
namespace Response.Builder
|
||||
open Internal.IO.Async
|
||||
|
||||
/--
|
||||
Builds a response with a streaming body generator.
|
||||
-/
|
||||
def stream
|
||||
(builder : Builder)
|
||||
(gen : Body.Stream → Async Unit) :
|
||||
Async (Response Body.Stream) := do
|
||||
let s ← Body.stream gen
|
||||
return Response.Builder.body builder s
|
||||
|
||||
end Response.Builder
|
||||
@@ -124,12 +124,6 @@ def new : Builder := { }
|
||||
|
||||
namespace Builder
|
||||
|
||||
/--
|
||||
Creates a new HTTP request builder with the default head
|
||||
(method: GET, version: HTTP/1.1, target: `*`).
|
||||
-/
|
||||
def empty : Builder := { }
|
||||
|
||||
/--
|
||||
Sets the HTTP method for the request being built.
|
||||
-/
|
||||
|
||||
@@ -111,7 +111,7 @@ namespace Builder
|
||||
/--
|
||||
Creates a new HTTP Response builder with default head (status: 200 OK, version: HTTP/1.1).
|
||||
-/
|
||||
def empty : Builder := { }
|
||||
def new : Builder := { }
|
||||
|
||||
/--
|
||||
Sets the HTTP status code for the response being built.
|
||||
@@ -173,66 +173,66 @@ end Builder
|
||||
Creates a new HTTP Response builder with the 200 status code.
|
||||
-/
|
||||
def ok : Builder :=
|
||||
.empty |>.status .ok
|
||||
.new |>.status .ok
|
||||
|
||||
/--
|
||||
Creates a new HTTP Response builder with the provided status.
|
||||
-/
|
||||
def withStatus (status : Status) : Builder :=
|
||||
.empty |>.status status
|
||||
.new |>.status status
|
||||
|
||||
/--
|
||||
Creates a new HTTP Response builder with the 404 status code.
|
||||
-/
|
||||
def notFound : Builder :=
|
||||
.empty |>.status .notFound
|
||||
.new |>.status .notFound
|
||||
|
||||
/--
|
||||
Creates a new HTTP Response builder with the 500 status code.
|
||||
-/
|
||||
def internalServerError : Builder :=
|
||||
.empty |>.status .internalServerError
|
||||
.new |>.status .internalServerError
|
||||
|
||||
/--
|
||||
Creates a new HTTP Response builder with the 400 status code.
|
||||
-/
|
||||
def badRequest : Builder :=
|
||||
.empty |>.status .badRequest
|
||||
.new |>.status .badRequest
|
||||
|
||||
/--
|
||||
Creates a new HTTP Response builder with the 201 status code.
|
||||
-/
|
||||
def created : Builder :=
|
||||
.empty |>.status .created
|
||||
.new |>.status .created
|
||||
|
||||
/--
|
||||
Creates a new HTTP Response builder with the 202 status code.
|
||||
-/
|
||||
def accepted : Builder :=
|
||||
.empty |>.status .accepted
|
||||
.new |>.status .accepted
|
||||
|
||||
/--
|
||||
Creates a new HTTP Response builder with the 401 status code.
|
||||
-/
|
||||
def unauthorized : Builder :=
|
||||
.empty |>.status .unauthorized
|
||||
.new |>.status .unauthorized
|
||||
|
||||
/--
|
||||
Creates a new HTTP Response builder with the 403 status code.
|
||||
-/
|
||||
def forbidden : Builder :=
|
||||
.empty |>.status .forbidden
|
||||
.new |>.status .forbidden
|
||||
|
||||
/--
|
||||
Creates a new HTTP Response builder with the 409 status code.
|
||||
-/
|
||||
def conflict : Builder :=
|
||||
.empty |>.status .conflict
|
||||
.new |>.status .conflict
|
||||
|
||||
/--
|
||||
Creates a new HTTP Response builder with the 503 status code.
|
||||
-/
|
||||
def serviceUnavailable : Builder :=
|
||||
.empty |>.status .serviceUnavailable
|
||||
.new |>.status .serviceUnavailable
|
||||
|
||||
end Response
|
||||
|
||||
@@ -94,4 +94,3 @@ def parseOrRoot (s : String) : Std.Http.URI.Path :=
|
||||
parse? s |>.getD { segments := #[], absolute := true }
|
||||
|
||||
end Std.Http.URI.Path
|
||||
|
||||
|
||||
739
tests/elab/async_http_body.lean
Normal file
739
tests/elab/async_http_body.lean
Normal file
@@ -0,0 +1,739 @@
|
||||
import Std.Internal.Http.Data.Body
|
||||
|
||||
open Std.Internal.IO Async
|
||||
open Std.Http
|
||||
open Std.Http.Body
|
||||
|
||||
/-! ## Stream tests -/
|
||||
|
||||
-- Test send and recv on stream
|
||||
|
||||
def channelSendRecv : Async Unit := do
|
||||
let stream ← Body.mkStream
|
||||
let chunk := Chunk.ofByteArray "hello".toUTF8
|
||||
|
||||
let sendTask ← async (t := AsyncTask) <| stream.send chunk
|
||||
let result ← stream.recv
|
||||
|
||||
assert! result.isSome
|
||||
assert! result.get!.data == "hello".toUTF8
|
||||
await sendTask
|
||||
|
||||
#eval channelSendRecv.block
|
||||
|
||||
|
||||
-- Test tryRecv on empty stream returns none
|
||||
|
||||
def channelTryRecvEmpty : Async Unit := do
|
||||
let stream ← Body.mkStream
|
||||
let result ← stream.tryRecv
|
||||
assert! result.isNone
|
||||
|
||||
#eval channelTryRecvEmpty.block
|
||||
|
||||
-- Test tryRecv consumes a waiting producer
|
||||
|
||||
def channelTryRecvWithPendingSend : Async Unit := do
|
||||
let stream ← Body.mkStream
|
||||
|
||||
let sendTask ← async (t := AsyncTask) <| stream.send (Chunk.ofByteArray "data".toUTF8)
|
||||
let mut result := none
|
||||
let mut fuel := 100
|
||||
while result.isNone && fuel > 0 do
|
||||
result ← stream.tryRecv
|
||||
if result.isNone then
|
||||
let _ ← Selectable.one #[
|
||||
.case (← Selector.sleep 1) pure
|
||||
]
|
||||
fuel := fuel - 1
|
||||
|
||||
assert! result.isSome
|
||||
assert! result.get!.data == "data".toUTF8
|
||||
await sendTask
|
||||
|
||||
#eval channelTryRecvWithPendingSend.block
|
||||
|
||||
-- Test close sets closed flag
|
||||
|
||||
def channelClose : Async Unit := do
|
||||
let stream ← Body.mkStream
|
||||
assert! !(← stream.isClosed)
|
||||
stream.close
|
||||
assert! (← stream.isClosed)
|
||||
|
||||
#eval channelClose.block
|
||||
|
||||
-- Test recv on closed stream returns none
|
||||
|
||||
def channelRecvAfterClose : Async Unit := do
|
||||
let stream ← Body.mkStream
|
||||
stream.close
|
||||
let result ← stream.recv
|
||||
assert! result.isNone
|
||||
|
||||
#eval channelRecvAfterClose.block
|
||||
|
||||
-- Test for-in iteration collects chunks until close
|
||||
|
||||
def channelForIn : Async Unit := do
|
||||
let stream ← Body.mkStream
|
||||
|
||||
let producer ← async (t := AsyncTask) <| do
|
||||
stream.send (Chunk.ofByteArray "a".toUTF8)
|
||||
stream.send (Chunk.ofByteArray "b".toUTF8)
|
||||
stream.close
|
||||
|
||||
let mut acc : ByteArray := .empty
|
||||
for chunk in stream do
|
||||
acc := acc ++ chunk.data
|
||||
|
||||
assert! acc == "ab".toUTF8
|
||||
await producer
|
||||
|
||||
#eval channelForIn.block
|
||||
|
||||
-- Test chunk extensions are preserved
|
||||
|
||||
def channelExtensions : Async Unit := do
|
||||
let stream ← Body.mkStream
|
||||
let chunk := { data := "hello".toUTF8, extensions := #[(.mk "key", some (Chunk.ExtensionValue.ofString! "value"))] : Chunk }
|
||||
|
||||
let sendTask ← async (t := AsyncTask) <| stream.send chunk
|
||||
let result ← stream.recv
|
||||
|
||||
assert! result.isSome
|
||||
assert! result.get!.extensions.size == 1
|
||||
assert! result.get!.extensions[0]! == (Chunk.ExtensionName.mk "key", some <| .ofString! "value")
|
||||
await sendTask
|
||||
|
||||
#eval channelExtensions.block
|
||||
|
||||
-- Test known size metadata
|
||||
|
||||
def channelKnownSize : Async Unit := do
|
||||
let stream ← Body.mkStream
|
||||
stream.setKnownSize (some (.fixed 100))
|
||||
let size ← stream.getKnownSize
|
||||
assert! size == some (.fixed 100)
|
||||
|
||||
#eval channelKnownSize.block
|
||||
|
||||
-- Test known size decreases when a chunk is consumed
|
||||
|
||||
def channelKnownSizeDecreases : Async Unit := do
|
||||
let stream ← Body.mkStream
|
||||
stream.setKnownSize (some (.fixed 5))
|
||||
|
||||
let sendTask ← async (t := AsyncTask) <| stream.send (Chunk.ofByteArray "hello".toUTF8)
|
||||
let _ ← stream.recv
|
||||
await sendTask
|
||||
|
||||
let size ← stream.getKnownSize
|
||||
assert! size == some (.fixed 0)
|
||||
|
||||
#eval channelKnownSizeDecreases.block
|
||||
|
||||
-- Test only one blocked producer is allowed
|
||||
|
||||
def channelSingleProducerRule : Async Unit := do
|
||||
let stream ← Body.mkStream
|
||||
let send1 ← async (t := AsyncTask) <| stream.send (Chunk.ofByteArray "one".toUTF8)
|
||||
|
||||
-- Yield so `send1` can occupy the single pending-producer slot.
|
||||
let _ ← Selectable.one #[
|
||||
.case (← Selector.sleep 5) pure
|
||||
]
|
||||
|
||||
let send2Failed ←
|
||||
try
|
||||
stream.send (Chunk.ofByteArray "two".toUTF8)
|
||||
pure false
|
||||
catch _ =>
|
||||
pure true
|
||||
assert! send2Failed
|
||||
|
||||
let first ← stream.recv
|
||||
assert! first.isSome
|
||||
assert! first.get!.data == "one".toUTF8
|
||||
|
||||
await send1
|
||||
|
||||
#eval channelSingleProducerRule.block
|
||||
|
||||
-- Test only one blocked consumer is allowed
|
||||
|
||||
def channelSingleConsumerRule : Async Unit := do
|
||||
let stream ← Body.mkStream
|
||||
|
||||
let recv1 ← async (t := AsyncTask) <| stream.recv
|
||||
|
||||
let hasInterest ← Selectable.one #[
|
||||
.case stream.interestSelector pure
|
||||
]
|
||||
assert! hasInterest
|
||||
|
||||
let recv2Failed ←
|
||||
try
|
||||
let _ ← stream.recv
|
||||
pure false
|
||||
catch _ =>
|
||||
pure true
|
||||
|
||||
assert! recv2Failed
|
||||
|
||||
let sendTask ← async (t := AsyncTask) <| stream.send (Chunk.ofByteArray "ok".toUTF8)
|
||||
let r1 ← await recv1
|
||||
|
||||
assert! r1.isSome
|
||||
assert! r1.get!.data == "ok".toUTF8
|
||||
await sendTask
|
||||
|
||||
#eval channelSingleConsumerRule.block
|
||||
|
||||
-- Test hasInterest reflects blocked receiver state
|
||||
|
||||
def channelHasInterest : Async Unit := do
|
||||
let stream ← Body.mkStream
|
||||
assert! !(← stream.hasInterest)
|
||||
|
||||
let recvTask ← async (t := AsyncTask) <| stream.recv
|
||||
|
||||
let hasInterest ← Selectable.one #[
|
||||
.case stream.interestSelector pure
|
||||
]
|
||||
assert! hasInterest
|
||||
assert! (← stream.hasInterest)
|
||||
|
||||
let sendTask ← async (t := AsyncTask) <| stream.send (Chunk.ofByteArray "x".toUTF8)
|
||||
let _ ← await recvTask
|
||||
await sendTask
|
||||
|
||||
assert! !(← stream.hasInterest)
|
||||
|
||||
#eval channelHasInterest.block
|
||||
|
||||
-- Test interestSelector resolves false when stream closes first
|
||||
|
||||
def channelInterestSelectorClose : Async Unit := do
|
||||
let stream ← Body.mkStream
|
||||
|
||||
let waitInterest ← async (t := AsyncTask) <|
|
||||
Selectable.one #[
|
||||
.case stream.interestSelector pure
|
||||
]
|
||||
|
||||
stream.close
|
||||
let interested ← await waitInterest
|
||||
assert! interested == false
|
||||
|
||||
#eval channelInterestSelectorClose.block
|
||||
|
||||
-- Test incomplete sends are buffered and merged into one chunk on the final send
|
||||
|
||||
def channelIncompleteChunks : Async Unit := do
|
||||
let stream ← Body.mkStream
|
||||
|
||||
let sendTask ← async (t := AsyncTask) <| do
|
||||
stream.send (Chunk.ofByteArray "hel".toUTF8) (incomplete := true)
|
||||
stream.send (Chunk.ofByteArray "lo".toUTF8)
|
||||
|
||||
let result ← stream.recv
|
||||
|
||||
assert! result.isSome
|
||||
assert! result.get!.data == "hello".toUTF8
|
||||
await sendTask
|
||||
|
||||
#eval channelIncompleteChunks.block
|
||||
|
||||
-- Test sending to a closed stream raises an error
|
||||
|
||||
def channelSendAfterClose : Async Unit := do
|
||||
let stream ← Body.mkStream
|
||||
stream.close
|
||||
|
||||
let failed ←
|
||||
try
|
||||
stream.send (Chunk.ofByteArray "test".toUTF8)
|
||||
pure false
|
||||
catch _ =>
|
||||
pure true
|
||||
assert! failed
|
||||
|
||||
#eval channelSendAfterClose.block
|
||||
|
||||
-- Test Body.stream runs producer and returns the stream handle
|
||||
|
||||
def channelStreamHelper : Async Unit := do
|
||||
let stream ← Body.stream fun s => do
|
||||
s.send (Chunk.ofByteArray "hello".toUTF8)
|
||||
|
||||
let result ← stream.recv
|
||||
assert! result.isSome
|
||||
assert! result.get!.data == "hello".toUTF8
|
||||
|
||||
let eof ← stream.recv
|
||||
assert! eof.isNone
|
||||
|
||||
#eval channelStreamHelper.block
|
||||
|
||||
-- Test Body.fromBytes creates a Stream with correct known-size metadata
|
||||
|
||||
def channelFromBytesHelper : Async Unit := do
|
||||
let stream ← Body.fromBytes "world".toUTF8
|
||||
|
||||
let size ← stream.getKnownSize
|
||||
assert! size == some (.fixed 5)
|
||||
|
||||
let result ← stream.recv
|
||||
assert! result.isSome
|
||||
assert! result.get!.data == "world".toUTF8
|
||||
|
||||
#eval channelFromBytesHelper.block
|
||||
|
||||
-- Test Body.empty creates an already-closed Stream
|
||||
|
||||
def channelEmptyHelper : Async Unit := do
|
||||
let stream ← Body.empty
|
||||
assert! (← stream.isClosed)
|
||||
|
||||
let result ← stream.recv
|
||||
assert! result.isNone
|
||||
|
||||
#eval channelEmptyHelper.block
|
||||
|
||||
-- Test Stream.readAll concatenates all chunks
|
||||
|
||||
def channelReadAll : Async Unit := do
|
||||
let stream ← Body.mkStream
|
||||
|
||||
let sendTask ← async (t := AsyncTask) <| do
|
||||
stream.send (Chunk.ofByteArray "foo".toUTF8)
|
||||
stream.send (Chunk.ofByteArray "bar".toUTF8)
|
||||
stream.close
|
||||
|
||||
let result : ByteArray ← stream.readAll
|
||||
assert! result == "foobar".toUTF8
|
||||
await sendTask
|
||||
|
||||
#eval channelReadAll.block
|
||||
|
||||
-- Test Stream.readAll enforces a maximum size limit
|
||||
|
||||
def channelReadAllMaxSize : Async Unit := do
|
||||
let stream ← Body.mkStream
|
||||
|
||||
let sendTask ← async (t := AsyncTask) <| do
|
||||
stream.send (Chunk.ofByteArray "abcdefgh".toUTF8)
|
||||
stream.close
|
||||
|
||||
let failed ←
|
||||
try
|
||||
let _ : ByteArray ← stream.readAll (maximumSize := some 4)
|
||||
pure false
|
||||
catch _ =>
|
||||
pure true
|
||||
assert! failed
|
||||
await sendTask
|
||||
|
||||
#eval channelReadAllMaxSize.block
|
||||
|
||||
-- Test Stream.getKnownSize reflects the value set via setKnownSize
|
||||
|
||||
def channelKnownSizeRoundtrip : Async Unit := do
|
||||
let stream ← Body.mkStream
|
||||
stream.setKnownSize (some (.fixed 42))
|
||||
|
||||
let size ← stream.getKnownSize
|
||||
assert! size == some (.fixed 42)
|
||||
|
||||
#eval channelKnownSizeRoundtrip.block
|
||||
|
||||
/-! ## Full tests -/
|
||||
|
||||
-- Test Full.recv returns content once then EOF
|
||||
|
||||
def fullRecvConsumesOnce : Async Unit := do
|
||||
let full ← Body.Full.ofString "hello"
|
||||
let first ← full.recv
|
||||
let second ← full.recv
|
||||
|
||||
assert! first.isSome
|
||||
assert! first.get!.data == "hello".toUTF8
|
||||
assert! second.isNone
|
||||
|
||||
#eval fullRecvConsumesOnce.block
|
||||
|
||||
-- Test Full known-size metadata tracks consumption
|
||||
|
||||
def fullKnownSizeLifecycle : Async Unit := do
|
||||
let data := ByteArray.mk #[0x01, 0x02, 0x03, 0x04]
|
||||
let full ← Body.Full.ofByteArray data
|
||||
|
||||
assert! (← full.getKnownSize) == some (.fixed 4)
|
||||
let chunk ← full.recv
|
||||
assert! chunk.isSome
|
||||
assert! chunk.get!.data == data
|
||||
assert! (← full.getKnownSize) == some (.fixed 0)
|
||||
|
||||
#eval fullKnownSizeLifecycle.block
|
||||
|
||||
-- Test Full.close discards remaining content
|
||||
|
||||
def fullClose : Async Unit := do
|
||||
let full ← Body.Full.ofString "bye"
|
||||
assert! !(← full.isClosed)
|
||||
full.close
|
||||
assert! (← full.isClosed)
|
||||
assert! (← full.recv).isNone
|
||||
|
||||
#eval fullClose.block
|
||||
|
||||
-- Test Full from an empty ByteArray returns none on the first recv
|
||||
|
||||
def fullEmptyBytes : Async Unit := do
|
||||
let full ← Body.Full.ofByteArray ByteArray.empty
|
||||
let result ← full.recv
|
||||
assert! result.isNone
|
||||
|
||||
#eval fullEmptyBytes.block
|
||||
|
||||
-- Test Full.recvSelector resolves immediately with the stored chunk
|
||||
|
||||
def fullRecvSelectorResolves : Async Unit := do
|
||||
let full ← Body.Full.ofString "world"
|
||||
let result ← Selectable.one #[
|
||||
.case full.recvSelector pure
|
||||
]
|
||||
assert! result.isSome
|
||||
assert! result.get!.data == "world".toUTF8
|
||||
|
||||
#eval fullRecvSelectorResolves.block
|
||||
|
||||
-- Test Full.getKnownSize returns 0 after close
|
||||
|
||||
def fullKnownSizeAfterClose : Async Unit := do
|
||||
let full ← Body.Full.ofString "data"
|
||||
assert! (← full.getKnownSize) == some (.fixed 4)
|
||||
full.close
|
||||
assert! (← full.getKnownSize) == some (.fixed 0)
|
||||
|
||||
#eval fullKnownSizeAfterClose.block
|
||||
|
||||
-- Test Full.tryRecv succeeds once and returns none thereafter
|
||||
|
||||
def fullTryRecvIdempotent : Async Unit := do
|
||||
let full ← Body.Full.ofString "once"
|
||||
let first ← full.recv
|
||||
let second ← full.recv
|
||||
assert! first.isSome
|
||||
assert! first.get!.data == "once".toUTF8
|
||||
assert! second.isNone
|
||||
|
||||
#eval fullTryRecvIdempotent.block
|
||||
|
||||
/-! ## Empty tests -/
|
||||
|
||||
-- Test Empty.recv always returns none
|
||||
|
||||
def emptyBodyRecv : Async Unit := do
|
||||
let body : Body.Empty := {}
|
||||
let result ← body.recv
|
||||
assert! result.isNone
|
||||
|
||||
#eval emptyBodyRecv.block
|
||||
|
||||
-- Test Empty.isClosed is always true
|
||||
|
||||
def emptyBodyIsClosed : Async Unit := do
|
||||
let body : Body.Empty := {}
|
||||
assert! (← body.isClosed)
|
||||
|
||||
#eval emptyBodyIsClosed.block
|
||||
|
||||
-- Test Empty.close is a no-op: still closed and recv still returns none
|
||||
|
||||
def emptyBodyClose : Async Unit := do
|
||||
let body : Body.Empty := {}
|
||||
body.close
|
||||
assert! (← body.isClosed)
|
||||
let result ← body.recv
|
||||
assert! result.isNone
|
||||
|
||||
#eval emptyBodyClose.block
|
||||
|
||||
-- Test Empty.recvSelector resolves immediately with none
|
||||
|
||||
def emptyBodyRecvSelector : Async Unit := do
|
||||
let body : Body.Empty := {}
|
||||
let result ← Selectable.one #[
|
||||
.case body.recvSelector pure
|
||||
]
|
||||
assert! result.isNone
|
||||
|
||||
#eval emptyBodyRecvSelector.block
|
||||
|
||||
/-! ## Any tests -/
|
||||
|
||||
-- Test Any wrapping a Full body forwards recv correctly
|
||||
|
||||
def anyFromFull : Async Unit := do
|
||||
let full ← Body.Full.ofString "hello"
|
||||
let any : Body.Any := full
|
||||
let result ← any.recv
|
||||
assert! result.isSome
|
||||
assert! result.get!.data == "hello".toUTF8
|
||||
|
||||
#eval anyFromFull.block
|
||||
|
||||
-- Test Any wrapping an Empty body returns none and reports closed
|
||||
|
||||
def anyFromEmpty : Async Unit := do
|
||||
let empty : Body.Empty := {}
|
||||
let any : Body.Any := empty
|
||||
let result ← any.recv
|
||||
assert! result.isNone
|
||||
assert! (← any.isClosed)
|
||||
|
||||
#eval anyFromEmpty.block
|
||||
|
||||
-- Test Any.close closes the underlying body
|
||||
|
||||
def anyCloseForwards : Async Unit := do
|
||||
let full ← Body.Full.ofString "test"
|
||||
let any : Body.Any := full
|
||||
any.close
|
||||
assert! (← any.isClosed)
|
||||
let result ← any.recv
|
||||
assert! result.isNone
|
||||
|
||||
#eval anyCloseForwards.block
|
||||
|
||||
-- Test Any.recvSelector resolves immediately for a Full body
|
||||
|
||||
def anyRecvSelectorFromFull : Async Unit := do
|
||||
let full ← Body.Full.ofString "sel"
|
||||
let any : Body.Any := full
|
||||
let result ← Selectable.one #[
|
||||
.case any.recvSelector pure
|
||||
]
|
||||
assert! result.isSome
|
||||
assert! result.get!.data == "sel".toUTF8
|
||||
|
||||
#eval anyRecvSelectorFromFull.block
|
||||
|
||||
/-! ## Request.Builder body tests -/
|
||||
|
||||
private def recvBuiltBody (body : Body.Full) : Async (Option Chunk) :=
|
||||
body.recv
|
||||
|
||||
-- Test Request.Builder.text sets correct headers
|
||||
|
||||
def requestBuilderText : Async Unit := do
|
||||
let req ← Request.post (.originForm! "/api")
|
||||
|>.text "Hello, World!"
|
||||
|
||||
assert! req.line.headers.get? Header.Name.contentType == some (Header.Value.ofString! "text/plain; charset=utf-8")
|
||||
assert! req.line.headers.get? Header.Name.contentLength == none
|
||||
|
||||
let body ← recvBuiltBody req.body
|
||||
assert! body.isSome
|
||||
assert! body.get!.data == "Hello, World!".toUTF8
|
||||
|
||||
#eval requestBuilderText.block
|
||||
|
||||
-- Test Request.Builder.json sets correct headers
|
||||
|
||||
def requestBuilderJson : Async Unit := do
|
||||
let req ← Request.post (.originForm! "/api")
|
||||
|>.json "{\"key\": \"value\"}"
|
||||
|
||||
assert! req.line.headers.get? Header.Name.contentType == some (Header.Value.ofString! "application/json")
|
||||
assert! req.line.headers.get? Header.Name.contentLength == none
|
||||
let body ← recvBuiltBody req.body
|
||||
assert! body.isSome
|
||||
assert! body.get!.data == "{\"key\": \"value\"}".toUTF8
|
||||
|
||||
#eval requestBuilderJson.block
|
||||
|
||||
-- Test Request.Builder.fromBytes sets body
|
||||
|
||||
def requestBuilderFromBytes : Async Unit := do
|
||||
let data := ByteArray.mk #[0x01, 0x02, 0x03]
|
||||
let req ← Request.post (.originForm! "/api")
|
||||
|>.fromBytes data
|
||||
|
||||
assert! req.line.headers.get? Header.Name.contentLength == none
|
||||
let body ← recvBuiltBody req.body
|
||||
assert! body.isSome
|
||||
assert! body.get!.data == data
|
||||
|
||||
#eval requestBuilderFromBytes.block
|
||||
|
||||
-- Test Request.Builder.noBody creates empty body
|
||||
|
||||
def requestBuilderNoBody : Async Unit := do
|
||||
let req ← Request.get (.originForm! "/api")
|
||||
|>.empty
|
||||
|
||||
assert! req.body == {}
|
||||
|
||||
#eval requestBuilderNoBody.block
|
||||
|
||||
-- Test Request.Builder.bytes sets application/octet-stream content type
|
||||
|
||||
def requestBuilderBytes : Async Unit := do
|
||||
let data := ByteArray.mk #[0xde, 0xad, 0xbe, 0xef]
|
||||
let req ← Request.post (.originForm! "/api")
|
||||
|>.bytes data
|
||||
|
||||
assert! req.line.headers.get? Header.Name.contentType == some (Header.Value.ofString! "application/octet-stream")
|
||||
let body ← recvBuiltBody req.body
|
||||
assert! body.isSome
|
||||
assert! body.get!.data == data
|
||||
|
||||
#eval requestBuilderBytes.block
|
||||
|
||||
-- Test Request.Builder.html sets text/html content type
|
||||
|
||||
def requestBuilderHtml : Async Unit := do
|
||||
let req ← Request.post (.originForm! "/api")
|
||||
|>.html "<h1>Hello</h1>"
|
||||
|
||||
assert! req.line.headers.get? Header.Name.contentType == some (Header.Value.ofString! "text/html; charset=utf-8")
|
||||
let body ← recvBuiltBody req.body
|
||||
assert! body.isSome
|
||||
assert! body.get!.data == "<h1>Hello</h1>".toUTF8
|
||||
|
||||
#eval requestBuilderHtml.block
|
||||
|
||||
-- Test Request.Builder.stream creates a streaming body
|
||||
|
||||
def requestBuilderStream : Async Unit := do
|
||||
let req ← Request.post (.originForm! "/api")
|
||||
|>.stream fun s => do
|
||||
s.send (Chunk.ofByteArray "streamed".toUTF8)
|
||||
|
||||
let result ← req.body.recv
|
||||
assert! result.isSome
|
||||
assert! result.get!.data == "streamed".toUTF8
|
||||
|
||||
#eval requestBuilderStream.block
|
||||
|
||||
-- Test Request.Builder.noBody body is always closed and returns none
|
||||
|
||||
def requestBuilderNoBodyAlwaysClosed : Async Unit := do
|
||||
let req ← Request.get (.originForm! "/api")
|
||||
|>.empty
|
||||
|
||||
assert! (← req.body.isClosed)
|
||||
let result ← req.body.recv
|
||||
assert! result.isNone
|
||||
|
||||
#eval requestBuilderNoBodyAlwaysClosed.block
|
||||
|
||||
/-! ## Response.Builder body tests -/
|
||||
|
||||
-- Test Response.Builder.text sets correct headers
|
||||
|
||||
def responseBuilderText : Async Unit := do
|
||||
let res ← Response.ok
|
||||
|>.text "Hello, World!"
|
||||
|
||||
assert! res.line.headers.get? Header.Name.contentType == some (Header.Value.ofString! "text/plain; charset=utf-8")
|
||||
assert! res.line.headers.get? Header.Name.contentLength == none
|
||||
|
||||
let body ← recvBuiltBody res.body
|
||||
assert! body.isSome
|
||||
assert! body.get!.data == "Hello, World!".toUTF8
|
||||
|
||||
#eval responseBuilderText.block
|
||||
|
||||
-- Test Response.Builder.json sets correct headers
|
||||
|
||||
def responseBuilderJson : Async Unit := do
|
||||
let res ← Response.ok
|
||||
|>.json "{\"status\": \"ok\"}"
|
||||
|
||||
assert! res.line.headers.get? Header.Name.contentType == some (Header.Value.ofString! "application/json")
|
||||
assert! res.line.headers.get? Header.Name.contentLength == none
|
||||
let body ← recvBuiltBody res.body
|
||||
assert! body.isSome
|
||||
assert! body.get!.data == "{\"status\": \"ok\"}".toUTF8
|
||||
|
||||
#eval responseBuilderJson.block
|
||||
|
||||
-- Test Response.Builder.fromBytes sets body
|
||||
|
||||
def responseBuilderFromBytes : Async Unit := do
|
||||
let data := ByteArray.mk #[0xaa, 0xbb]
|
||||
let res ← Response.ok
|
||||
|>.fromBytes data
|
||||
|
||||
assert! res.line.headers.get? Header.Name.contentLength == none
|
||||
let body ← recvBuiltBody res.body
|
||||
assert! body.isSome
|
||||
assert! body.get!.data == data
|
||||
|
||||
#eval responseBuilderFromBytes.block
|
||||
|
||||
-- Test Response.Builder.noBody creates empty body
|
||||
|
||||
def responseBuilderNoBody : Async Unit := do
|
||||
let res ← Response.ok
|
||||
|>.empty
|
||||
|
||||
assert! res.body == {}
|
||||
|
||||
#eval responseBuilderNoBody.block
|
||||
|
||||
-- Test Response.Builder.bytes sets application/octet-stream content type
|
||||
|
||||
def responseBuilderBytes : Async Unit := do
|
||||
let data := ByteArray.mk #[0xca, 0xfe]
|
||||
let res ← Response.ok
|
||||
|>.bytes data
|
||||
|
||||
assert! res.line.headers.get? Header.Name.contentType == some (Header.Value.ofString! "application/octet-stream")
|
||||
let body ← recvBuiltBody res.body
|
||||
assert! body.isSome
|
||||
assert! body.get!.data == data
|
||||
|
||||
#eval responseBuilderBytes.block
|
||||
|
||||
-- Test Response.Builder.html sets text/html content type
|
||||
|
||||
def responseBuilderHtml : Async Unit := do
|
||||
let res ← Response.ok
|
||||
|>.html "<p>OK</p>"
|
||||
|
||||
assert! res.line.headers.get? Header.Name.contentType == some (Header.Value.ofString! "text/html; charset=utf-8")
|
||||
let body ← recvBuiltBody res.body
|
||||
assert! body.isSome
|
||||
assert! body.get!.data == "<p>OK</p>".toUTF8
|
||||
|
||||
#eval responseBuilderHtml.block
|
||||
|
||||
-- Test Response.Builder.stream creates a streaming body
|
||||
|
||||
def responseBuilderStream : Async Unit := do
|
||||
let res ← Response.ok
|
||||
|>.stream fun s => do
|
||||
s.send (Chunk.ofByteArray "streamed".toUTF8)
|
||||
|
||||
let result ← res.body.recv
|
||||
assert! result.isSome
|
||||
assert! result.get!.data == "streamed".toUTF8
|
||||
|
||||
#eval responseBuilderStream.block
|
||||
|
||||
-- Test Response.Builder.noBody body is always closed and returns none
|
||||
|
||||
def responseBuilderNoBodyAlwaysClosed : Async Unit := do
|
||||
let res ← Response.ok
|
||||
|>.empty
|
||||
|
||||
assert! (← res.body.isClosed)
|
||||
let result ← res.body.recv
|
||||
assert! result.isNone
|
||||
|
||||
#eval responseBuilderNoBodyAlwaysClosed.block
|
||||
Reference in New Issue
Block a user