Compare commits

...

5 Commits

Author SHA1 Message Date
Sofia Rodrigues
7a6a5a6faf fix: broadcast test 2026-01-23 17:26:31 -03:00
Sofia Rodrigues
60641faa10 fix: remove bracket 2026-01-22 15:56:09 -03:00
Sofia Rodrigues
bfb060641f fix: remove dup 2026-01-22 15:48:10 -03:00
Sofia Rodrigues
6a27cb38aa fix: ofIOPromise 2026-01-22 15:41:39 -03:00
Sofia Rodrigues
2a4ca6f5d0 refactor: move async namespace 2026-01-22 15:36:20 -03:00
40 changed files with 249 additions and 379 deletions

View File

@@ -6,6 +6,7 @@ Authors: Sebastian Ullrich
module
prelude
public import Std.Async
public import Std.Data
public import Std.Do
public import Std.Sat

19
src/Std/Async.lean Normal file
View File

@@ -0,0 +1,19 @@
/-
Copyright (c) 2024 Lean FRO, LLC. All rights reserved.
Released under Apache 2.0 license as described in the file LICENSE.
Authors: Henrik Böving
-/
module
prelude
public import Std.Async.Basic
public import Std.Async.ContextAsync
public import Std.Async.Timer
public import Std.Async.TCP
public import Std.Async.UDP
public import Std.Async.DNS
public import Std.Async.Select
public import Std.Async.Process
public import Std.Async.System
public import Std.Async.Signal
public import Std.Async.IO

View File

@@ -10,10 +10,7 @@ public import Init.System.Promise
public section
namespace Std
namespace Internal
namespace IO
namespace Async
namespace Std.Async
/-!
@@ -104,10 +101,6 @@ instance [Monad m] [MonadAwait t m] : MonadAwait t (ReaderT n m) where
instance [MonadAwait t m] : MonadAwait t (StateRefT' s n m) where
await := liftM (m := m) MonadAwait.await
@[default_instance]
instance [Monad m] [MonadAwait t m] : MonadAwait t (StateT s m) where
await := liftM (m := m) MonadAwait.await
@[default_instance]
instance [MonadAsync t m] : MonadAsync t (ReaderT n m) where
async p prio := MonadAsync.async (prio := prio) p
@@ -122,6 +115,19 @@ instance [Monad m] [Functor t] [inst : MonadAsync t m] : MonadAsync t (StateT s
let t inst.async (prio := prio) (p s)
pure (t <&> Prod.fst, s)
/--
Type class for converting a source type into an async computation.
This provides a uniform interface for lifting various types (tasks, promises, etc.)
into async monads.
-/
class ToAsync (source : Type) (target : outParam Type) where
/--
Convert the source value into an async computation using the default error message.
-/
toAsync : source target
/--
A `Task` that may resolve to either a value of type `α` or an error value of type `.
-/
@@ -177,14 +183,12 @@ protected def mapEIO (f : α → EIO ε β) (x : ETask ε α) (prio := Task.Prio
| .error e => throw e
/--
Block until the `ETask` in `x` finishes and returns its value. Propagates any error encountered
Wait until the `ETask` in `x` finishes and returns its value. Propagates any error encountered
during execution.
-/
@[inline]
def block (x : ETask ε α) : EIO ε α := do
match x.get with
| .ok a => return a
| .error e => throw e
def block (x : ETask ε α) : EIO ε α :=
liftExcept x.get
/--
Create an `ETask` that resolves to the value of the promise `x`. If the promise gets dropped then it
@@ -200,7 +204,7 @@ panics.
-/
@[inline]
def ofPurePromise (x : IO.Promise α) : ETask ε α :=
x.result!.map pure (sync := true)
x.result!.map .ok (sync := true)
/--
Obtain the `IO.TaskState` of `x`.
@@ -212,9 +216,8 @@ def getState (x : ETask ε α) : BaseIO IO.TaskState :=
instance : Functor (ETask ε) where
map := ETask.map
instance : Monad (ETask ε) where
pure := ETask.pure
bind := ETask.bind
instance : ToAsync (IO.Promise (Except ε α)) (ETask ε α) where
toAsync := ETask.ofPromise!
end ETask
@@ -225,6 +228,13 @@ abbrev AsyncTask := ETask IO.Error
namespace AsyncTask
/--
Waits for the result of the `AsyncTask`, blocking if necessary.
-/
@[inline]
protected def block (self : AsyncTask α) : IO α :=
ETask.block self
/--
Similar to `map`, however `f` has access to the `IO` monad. If `f` throws an error, the returned
`AsyncTask` resolves to that error.
@@ -235,87 +245,39 @@ protected def mapIO (f : α → IO β) (x : AsyncTask α) (prio := Task.Priority
| .ok a => f a
| .error e => throw e
/--
Construct an `AsyncTask` that is already resolved with value `x`.
-/
@[inline]
protected def pure (x : α) : AsyncTask α :=
Task.pure <| .ok x
/--
Create a new `AsyncTask` that will run after `x` has finished.
If `x`:
- errors, return an `AsyncTask` that resolves to the error.
- succeeds, run `f` on the result of `x` and return the `AsyncTask` produced by `f`.
-/
@[inline]
protected def bind (x : AsyncTask α) (f : α AsyncTask β) (prio := Task.Priority.default) (sync := false) : AsyncTask β :=
Task.bind x (prio := prio) (sync := sync) fun
| .ok a => f a
| .error e => Task.pure <| .error e
/--
Create a new `AsyncTask` that will run after `x` has finished.
If `x`:
- errors, return an `AsyncTask` that resolves to the error.
- succeeds, return an `AsyncTask` that resolves to `f x`.
-/
@[inline]
def map (f : α β) (x : AsyncTask α) (prio := Task.Priority.default) (sync := false) : AsyncTask β :=
Task.map (x := x) (f <$> ·) prio sync
/--
Similar to `bind`, however `f` has access to the `IO` monad. If `f` throws an error, the returned
`AsyncTask` resolves to that error.
-/
@[inline]
def bindIO (x : AsyncTask α) (f : α IO (AsyncTask β)) (prio := Task.Priority.default) (sync := false) : BaseIO (AsyncTask β) :=
protected def bindIO (x : AsyncTask α) (f : α IO (AsyncTask β)) (prio := Task.Priority.default) (sync := false) : BaseIO (AsyncTask β) :=
IO.bindTask x (prio := prio) (sync := sync) fun
| .ok a => f a
| .error e => throw e
/--
Similar to `map`, however `f` has access to the `IO` monad. If `f` throws an error, the returned
`AsyncTask` resolves to that error.
Create an `AsyncTask` that resolves to the value of `x`. Returns an error if the promise is dropped.
-/
@[inline]
def mapTaskIO (f : α IO β) (x : AsyncTask α) (prio := Task.Priority.default) (sync := false) : BaseIO (AsyncTask β) :=
IO.mapTask (t := x) (prio := prio) (sync := sync) fun
| .ok a => f a
| .error e => throw e
/--
Block until the `AsyncTask` in `x` finishes.
-/
def block (x : AsyncTask α) : IO α :=
match x.get with
| .ok a => return a
| .error e => throw e
/--
Create an `AsyncTask` that resolves to the value of `x`.
-/
@[inline]
def ofPromise (x : IO.Promise (Except IO.Error α)) (error : String := "the promise linked to the Async Task was dropped") : AsyncTask α :=
def ofPromise (x : IO.Promise (Except IO.Error α)) (error : String := "the promise linked to the AsyncTask was dropped") : AsyncTask α :=
x.result?.map fun
| none => .error error
| some res => res
/--
Create an `AsyncTask` that resolves to the value of `x`.
Create an `AsyncTask` that resolves to the value of `x`. Returns an error if the promise is dropped.
-/
@[inline]
def ofPurePromise (x : IO.Promise α) (error : String := "the promise linked to the Async Task was dropped") : AsyncTask α :=
def ofPurePromise (x : IO.Promise α) (error : String := "the promise linked to the AsyncTask was dropped") : AsyncTask α :=
x.result?.map (sync := true) fun
| none => .error error
| some res => pure res
/--
Obtain the `IO.TaskState` of `x`.
-/
@[inline]
def getState (x : AsyncTask α) : BaseIO IO.TaskState :=
IO.getTaskState x
instance : ToAsync (IO.Promise (Except IO.Error α)) (AsyncTask α) where
toAsync x := AsyncTask.ofPromise x "the promise linked to the AsyncTask was dropped"
instance : ToAsync (IO.Promise α) (AsyncTask α) where
toAsync x := AsyncTask.ofPurePromise x "the promise linked to the AsyncTask was dropped"
end AsyncTask
@@ -397,10 +359,11 @@ def mk (x : BaseIO (MaybeTask α)) : BaseAsync α :=
x
/--
Converts a `BaseAsync` into a `BaseIO`
Unwraps a `BaseAsync` to access the underlying `BaseIO (MaybeTask α)`.
This is an implementation detail and should not be used directly.
-/
@[inline]
def toRawBaseIO (x : BaseAsync α) : BaseIO (MaybeTask α) :=
protected def unwrap (x : BaseAsync α) : BaseIO (MaybeTask α) :=
x
/--
@@ -408,7 +371,7 @@ Converts a `BaseAsync` to a `BaseIO Task`.
-/
@[inline]
protected def toBaseIO (x : BaseAsync α) : BaseIO (Task α) :=
MaybeTask.toTask <$> x.toRawBaseIO
MaybeTask.toTask <$> x.unwrap
/--
Creates a new `BaseAsync` out of a `Task`.
@@ -429,32 +392,29 @@ Maps the result of a `BaseAsync` computation with a function.
-/
@[inline]
protected def map (f : α β) (self : BaseAsync α) (prio := Task.Priority.default) (sync := false) : BaseAsync β :=
mk <| (·.map f prio sync) <$> self.toRawBaseIO
mk <| (·.map f prio sync) <$> self.unwrap
/--
Sequences two computations, allowing the second to depend on the value computed by the first.
-/
@[inline]
protected def bind (self : BaseAsync α) (f : α BaseAsync β) (prio := Task.Priority.default) (sync := false) : BaseAsync β :=
mk <| self.toRawBaseIO >>= (bindAsyncTask · f |>.toRawBaseIO)
where
bindAsyncTask (t : MaybeTask α) (f : α BaseAsync β) : BaseAsync β := .mk <|
match t with
| .pure a => (f a) |>.toRawBaseIO
| .ofTask t => .ofTask <$> BaseIO.bindTask t (fun a => MaybeTask.toTask <$> (f a |>.toRawBaseIO)) prio sync
mk <| self.unwrap >>= fun
| .pure a => (f a).unwrap
| .ofTask t => .ofTask <$> BaseIO.bindTask t (fun a => MaybeTask.toTask <$> (f a).unwrap) prio sync
/--
Lifts a `BaseIO` action into a `BaseAsync` computation.
-/
@[inline]
protected def lift (x : BaseIO α) : BaseAsync α :=
.mk <| (pure pure) =<< x
.mk <| MaybeTask.pure <$> x
/--
Waits for the result of the `BaseAsync` computation, blocking if necessary.
-/
@[inline]
protected def wait (self : BaseAsync α) : BaseIO α :=
protected def block (self : BaseAsync α) : BaseIO α :=
pure Task.get =<< self.toBaseIO
/--
@@ -462,7 +422,7 @@ Lifts a `BaseAsync` computation into a `Task` that can be awaited and joined.
-/
@[inline]
protected def asTask (x : BaseAsync α) (prio := Task.Priority.default) : BaseIO (Task α) := do
let res BaseIO.asTask (prio := prio) x.toRawBaseIO
let res BaseIO.asTask (prio := prio) x.unwrap
return MaybeTask.joinTask res
/--
@@ -527,16 +487,20 @@ The other result is lost and the other task is not cancelled, so the task will c
until the end.
-/
@[inline, specialize]
def race [Inhabited α] (x : BaseAsync α) (y : BaseAsync α) (prio := Task.Priority.default) : BaseAsync α := do
let promise IO.Promise.new
def race (x : BaseAsync α) (y : BaseAsync α) (prio := Task.Priority.default) : BaseAsync α := do
-- Use Option α for the promise to avoid Inhabited constraint
let promise : IO.Promise (Option α) IO.Promise.new
let task₁ : Task _ MonadAsync.async (prio := prio) x
let task₂ : Task _ MonadAsync.async (prio := prio) y
BaseIO.chainTask task₁ (liftM promise.resolve)
BaseIO.chainTask task₂ (liftM promise.resolve)
BaseIO.chainTask task₁ (liftM promise.resolve some)
BaseIO.chainTask task₂ (liftM promise.resolve some)
MonadAwait.await promise.result!
let result MonadAwait.await promise.result!
match result with
| some a => pure a
| none => MonadAwait.await task₁ -- Fallback, shouldn't happen in practice
/--
Runs all computations in an `Array` concurrently and returns all results as an array.
@@ -561,6 +525,12 @@ def raceAll [Inhabited α] [ForM BaseAsync c (BaseAsync α)] (xs : c) (prio := T
MonadAwait.await promise.result!
instance : ToAsync (Task α) (BaseAsync α) where
toAsync := BaseAsync.ofTask
instance : ToAsync (Except Empty α) (BaseAsync α) where
toAsync := BaseAsync.ofExcept
end BaseAsync
/--
@@ -575,10 +545,10 @@ Converts a `EAsync` to a `ETask`.
-/
@[inline]
protected def toBaseIO (x : EAsync ε α) : BaseIO (ETask ε α) :=
MaybeTask.toTask <$> x.toRawBaseIO
MaybeTask.toTask <$> x.unwrap
/--
Creates a new `EAsync` out of a `RTask`.
Creates a new `EAsync` out of an `ETask`.
-/
@[inline]
protected def ofTask (x : ETask ε α) : EAsync ε α :=
@@ -589,14 +559,7 @@ Converts a `BaseAsync` to a `EIO ETask`.
-/
@[inline]
protected def toEIO (x : EAsync ε α) : EIO ε (ETask ε α) :=
MaybeTask.toTask <$> x.toRawBaseIO
/--
Creates a new `EAsync` out of a `ETask`.
-/
@[inline]
protected def ofETask (x : ETask ε α) : EAsync ε α :=
.mk <| BaseAsync.ofTask x
MaybeTask.toTask <$> x.unwrap
/--
Creates an `EAsync` computation that immediately returns the given value.
@@ -609,15 +572,15 @@ protected def pure (a : α) : EAsync ε α :=
Maps the result of an `EAsync` computation with a pure function.
-/
@[inline]
protected def map (f : α β) (self : EAsync ε α) : EAsync ε β :=
.mk <| BaseAsync.map (.map f) self
protected def map (f : α β) (self : EAsync ε α) (prio := Task.Priority.default) (sync := false) : EAsync ε β :=
.mk <| BaseAsync.map (.map f) self prio sync
/--
Sequences two computations, allowing the second to depend on the value computed by the first.
-/
@[inline]
protected def bind (self : EAsync ε α) (f : α EAsync ε β) : EAsync ε β :=
.mk <| BaseAsync.bind self fun
protected def bind (self : EAsync ε α) (f : α EAsync ε β) (prio := Task.Priority.default) (sync := false) : EAsync ε β :=
.mk <| BaseAsync.bind (prio := prio) (sync := sync) self fun
| .ok a => f a
| .error e => BaseAsync.pure (.error e)
@@ -632,11 +595,8 @@ protected def lift (x : EIO ε α) : EAsync ε α :=
Waits for the result of the `EAsync` computation, blocking if necessary.
-/
@[inline]
protected def wait (self : EAsync ε α) : EIO ε α := do
let result self |> BaseAsync.wait
match result with
| .ok a => return a
| .error e => throw e
protected def block (self : EAsync ε α) : EIO ε α :=
liftExcept =<< BaseAsync.block self
/--
Lifts an `EAsync` computation into an `ETask` that can be awaited and joined.
@@ -645,13 +605,6 @@ Lifts an `EAsync` computation into an `ETask` that can be awaited and joined.
protected def asTask (x : EAsync ε α) (prio := Task.Priority.default) : EIO ε (ETask ε α) :=
x |> BaseAsync.asTask (prio := prio)
/--
Block until the `EAsync` finishes and returns its value. Propagates any error encountered during execution.
-/
@[inline]
protected def block (x : EAsync ε α) (prio := Task.Priority.default) : EIO ε α :=
x.asTask (prio := prio) >>= ETask.block
/--
Raises an error of type ` within the `EAsync` monad.
-/
@@ -671,11 +624,9 @@ protected def tryCatch (x : EAsync ε α) (f : ε → EAsync ε α) (prio := Tas
/--
Runs an action, ensuring that some other action always happens afterward.
-/
protected def tryFinally'
(x : EAsync ε α) (f : Option α EAsync ε β)
(prio := Task.Priority.default) (sync := false) :
EAsync ε (α × β) :=
.mk <| BaseAsync.bind x (prio := prio) (sync := sync) fun
@[inline]
protected def tryFinally' (x : EAsync ε α) (f : Option α EAsync ε β) : EAsync ε (α × β) :=
.mk <| BaseAsync.bind x fun
| .ok a => do
match (f (some a)) with
| .ok b => BaseAsync.pure (.ok (a, b))
@@ -776,7 +727,7 @@ protected partial def forIn
| .ok (.yield b) => loop b
loop init
.mk <| EAsync.ofETask promise.result!
.mk <| EAsync.ofTask promise.result!
instance : ForIn (EAsync ε) Lean.Loop Unit where
forIn _ := EAsync.forIn
@@ -805,19 +756,22 @@ The other result is lost and the other task is not cancelled, so the task will c
until the end.
-/
@[inline, specialize]
def race [Inhabited α] (x : EAsync ε α) (y : EAsync ε α)
def race (x : EAsync ε α) (y : EAsync ε α)
(prio := Task.Priority.default) :
EAsync ε α := do
let promise IO.Promise.new
-- Use Option to avoid Inhabited constraint
let promise : IO.Promise (Option (Except ε α)) IO.Promise.new
let task₁ : ETask ε _ MonadAsync.async (prio := prio) x
let task₂ : ETask ε _ MonadAsync.async (prio := prio) y
BaseIO.chainTask task₁ (liftM promise.resolve)
BaseIO.chainTask task₂ (liftM promise.resolve)
BaseIO.chainTask task₁ (liftM promise.resolve some)
BaseIO.chainTask task₂ (liftM promise.resolve some)
let result MonadAwait.await promise.result!
EAsync.ofExcept result
match result with
| some res => EAsync.ofExcept res
| none => MonadAwait.await task₁ -- Fallback, shouldn't happen
/--
Runs all computations in an `Array` concurrently and returns all results as an array.
@@ -843,6 +797,12 @@ def raceAll [Inhabited α] [ForM (EAsync ε) c (EAsync ε α)] (xs : c) (prio :=
let result MonadAwait.await promise.result!
EAsync.ofExcept result
instance : ToAsync (ETask ε α) (EAsync ε α) where
toAsync := EAsync.ofTask
instance : ToAsync (Except ε α) (EAsync ε α) where
toAsync := EAsync.ofExcept
end EAsync
/--
@@ -857,20 +817,20 @@ Converts a `Async` to a `AsyncTask`.
-/
@[inline]
protected def toIO (x : Async α) : IO (AsyncTask α) :=
MaybeTask.toTask <$> x.toRawBaseIO
MaybeTask.toTask <$> x.unwrap
/--
Block until the `Async` finishes and returns its value. Propagates any error encountered during execution.
Waits for the result of the `Async` computation, blocking if necessary.
-/
@[inline]
protected def block (x : Async α) (prio := Task.Priority.default) : IO α :=
x.asTask (prio := prio) >>= ETask.block
protected def block (self : Async α) : IO α :=
EAsync.block self
/--
Converts `Promise` into `Async`.
-/
@[inline]
protected def ofPromise (task : IO (IO.Promise (Except IO.Error α))) (error : String := "the promise linked to the Async was dropped") : Async α := do
protected def ofIOPromise (task : IO (IO.Promise (Except IO.Error α))) (error : String := "the promise linked to the Async was dropped") : Async α := do
match task.toBaseIO with
| .ok data => pure (f := BaseIO) <| MaybeTask.ofTask <| data.result?.map fun
| none => .error error
@@ -932,12 +892,8 @@ instance : MonadAwait IO.Promise Async :=
Runs two computations concurrently and returns both results as a pair.
-/
@[inline, specialize]
def concurrently (x : Async α) (y : Async β) (prio := Task.Priority.default) : Async (α × β) := do
let taskX MonadAsync.async x (prio := prio)
let taskY MonadAsync.async y (prio := prio)
let resultX MonadAwait.await taskX
let resultY MonadAwait.await taskY
return (resultX, resultY)
def concurrently (x : Async α) (y : Async β) (prio := Task.Priority.default) : Async (α × β) :=
EAsync.concurrently x y prio
/--
Runs two computations concurrently and returns the result of the one that finishes first.
@@ -945,27 +901,15 @@ The other result is lost and the other task is not cancelled, so the task will c
until the end.
-/
@[inline, specialize]
def race [Inhabited α] (x : Async α) (y : Async α)
(prio := Task.Priority.default) :
Async α := do
let promise IO.Promise.new
let task₁ MonadAsync.async (t := AsyncTask) (prio := prio) x
let task₂ MonadAsync.async (t := AsyncTask) (prio := prio) y
BaseIO.chainTask task₁ (liftM promise.resolve)
BaseIO.chainTask task₂ (liftM promise.resolve)
let result MonadAwait.await promise
Async.ofExcept result
def race (x : Async α) (y : Async α) (prio := Task.Priority.default) : Async α :=
EAsync.race x y prio
/--
Runs all computations in an `Array` concurrently and returns all results as an array.
-/
@[inline, specialize]
def concurrentlyAll (xs : Array (Async α)) (prio := Task.Priority.default) : Async (Array α) := do
let tasks : Array (AsyncTask α) xs.mapM (MonadAsync.async (prio := prio))
tasks.mapM MonadAwait.await
def concurrentlyAll (xs : Array (Async α)) (prio := Task.Priority.default) : Async (Array α) :=
EAsync.concurrentlyAll xs prio
/--
Runs all computations concurrently and returns the result of the first one to finish.
@@ -973,15 +917,23 @@ All other results are lost, and the tasks are not cancelled, so they'll continue
until the end.
-/
@[inline, specialize]
def raceAll [ForM Async c (Async α)] (xs : c) (prio := Task.Priority.default) : Async α := do
let promise IO.Promise.new
def raceAll [Inhabited α] [ForM (EAsync IO.Error) c (EAsync IO.Error α)] (xs : c) (prio := Task.Priority.default) : Async α :=
EAsync.raceAll xs prio
ForM.forM xs fun x => do
let task₁ MonadAsync.async (t := AsyncTask) (prio := prio) x
BaseIO.chainTask task₁ (liftM promise.resolve)
instance : ToAsync (AsyncTask α) (Async α) where
toAsync := Async.ofAsyncTask
let result MonadAwait.await promise
Async.ofExcept result
instance : ToAsync (Task α) (Async α) where
toAsync := Async.ofTask
instance : ToAsync (Except IO.Error α) (Async α) where
toAsync := Async.ofExcept
instance : ToAsync (IO (IO.Promise (Except IO.Error α))) (Async α) where
toAsync x := Async.ofIOPromise x "the promise linked to the Async was dropped"
instance : ToAsync (IO (IO.Promise α)) (Async α) where
toAsync x := Async.ofPurePromise x "the promise linked to the Async was dropped"
end Async
@@ -995,7 +947,4 @@ This function transforms the operation inside the monad `m` into a task and let
def background [Monad m] [MonadAsync t m] (action : m α) (prio := Task.Priority.default) : m Unit :=
discard (async (t := t) (prio := prio) action)
end Async
end IO
end Internal
end Std
end Std.Async

View File

@@ -8,8 +8,8 @@ module
prelude
public import Std.Time
public import Std.Internal.UV
public import Std.Internal.Async.Basic
public import Std.Internal.Async.Timer
public import Std.Async.Basic
public import Std.Async.Timer
public import Std.Sync.CancellationContext
public section
@@ -19,10 +19,7 @@ This module contains the implementation of `ContextAsync`, a monad for asynchron
cooperative cancellation support that must be explicitly checked for and cancelled explicitly.
-/
namespace Std
namespace Internal
namespace IO
namespace Async
namespace Std.Async
/--
An asynchronous computation with cooperative cancellation support via a `CancellationContext`. `ContextAsync α`
@@ -267,7 +264,4 @@ This is useful for selecting on cancellation alongside other asynchronous operat
def Selector.cancelled : ContextAsync (Selector Unit) := do
ContextAsync.doneSelector
end Async
end IO
end Internal
end Std
end Std.Async

View File

@@ -8,17 +8,13 @@ module
prelude
public import Std.Time
public import Std.Internal.UV
public import Std.Internal.Async.Basic
public import Std.Async.Basic
public section
namespace Std
namespace Internal
namespace IO
namespace Async
namespace DNS
namespace Std.Async.DNS
open Std.Net
open Std.Net Internal UV
/--
Represents a resolved hostname and service name from a socket address.
@@ -39,7 +35,7 @@ Asynchronously resolves a hostname and service to an array of socket addresses.
-/
@[inline]
def getAddrInfo (host : String) (service : String) (addrFamily : Option AddressFamily := none) : Async (Array IPAddr) := do
Async.ofPromise <| UV.DNS.getAddrInfo
Async.ofIOPromise <| UV.DNS.getAddrInfo
host
service
(match addrFamily with
@@ -53,11 +49,7 @@ Performs a reverse DNS lookup on a `SocketAddress`.
@[inline]
def getNameInfo (host : @& SocketAddress) : Async NameInfo :=
UV.DNS.getNameInfo host
|> Async.ofPromise
|> Async.ofIOPromise
|>.map (Function.uncurry NameInfo.mk)
end DNS
end Async
end IO
end Internal
end Std
end Std.Async.DNS

View File

@@ -6,16 +6,11 @@ Authors: Sofia Rodrigues
module
prelude
public import Std.Internal.Async.Select
public import Std.Async.Select
public section
namespace Std
namespace Internal
namespace Async
namespace IO
open Std.Internal.IO.Async
namespace Std.Async
/-!
This module provides buffered asynchronous I/O operations for efficient reading and writing.
@@ -48,7 +43,4 @@ class AsyncStream (α : Type) (β : outParam Type) where
stop : α IO Unit :=
fun _ => pure ()
end IO
end Async
end Internal
end Std
end Std.Async

View File

@@ -12,13 +12,9 @@ public import Std.Data.HashMap
public section
namespace Std.Async.Process
open Std Time
open System
namespace Std
namespace Internal
namespace IO
namespace Process
open Internal UV System
/--
Represents resource usage statistics for a process or thread.
@@ -236,7 +232,4 @@ Returns the available memory for allocation in bytes.
def availableMemory : IO UInt64 :=
UV.System.availableMemory
end Process
end IO
end Internal
end Std
end Std.Async.Process

View File

@@ -7,7 +7,7 @@ module
prelude
public import Init.Data.Random
public import Std.Internal.Async.Basic
public import Std.Async.Basic
import Init.Data.ByteArray.Extra
public section
@@ -18,10 +18,7 @@ The main entrypoint for users is `Selectable.one` and the various functions to p
`Selector`s from other modules.
-/
namespace Std
namespace Internal
namespace IO
namespace Async
namespace Std.Async
/--
The core data structure for racing on winning a `Selectable.one` if multiple event sources are ready
@@ -166,7 +163,7 @@ partial def Selectable.one (selectables : Array (Selectable α)) : Async α := d
async.toBaseIO
Async.ofPromise (pure promise)
Async.ofIOPromise (pure promise)
/--
Performs fair and data-loss free non-blocking multiplexing on the `Selectable`s in `selectables`.
@@ -245,7 +242,4 @@ def Selectable.combine (selectables : Array (Selectable α)) : IO (Selector α)
selectable.selector.unregisterFn
}
end Async
end IO
end Internal
end Std
end Std.Async

View File

@@ -8,14 +8,11 @@ module
prelude
public import Std.Time
public import Std.Internal.UV.Signal
public import Std.Internal.Async.Select
public import Std.Async.Select
public section
namespace Std
namespace Internal
namespace IO
namespace Async
namespace Std.Async
/--
Unix style signals for Unix and Windows. SIGKILL and SIGSTOP are missing because they cannot be caught.
@@ -261,3 +258,5 @@ def selector (s : Signal.Waiter) : Selector Unit :=
unregisterFn := s.native.cancel
}
end Std.Async.Signal.Waiter

View File

@@ -18,13 +18,9 @@ manipulation.
-/
open Std Time
open System
open Internal UV System
namespace Std
namespace Internal
namespace IO
namespace Async
namespace System
namespace Std.Async.System
/--
A group identifier, represented by a numeric ID in UNIX systems (e.g. 1000).
@@ -314,8 +310,4 @@ def getGroup (groupId : GroupId) : IO (Option GroupInfo) := do
members := group.members
}
end System
end Async
end IO
end Internal
end Std
end Std.Async.System

View File

@@ -8,15 +8,11 @@ module
prelude
public import Std.Time
public import Std.Internal.UV.TCP
public import Std.Internal.Async.Select
public import Std.Async.Select
public section
namespace Std
namespace Internal
namespace IO
namespace Async
namespace TCP
namespace Std.Async.TCP
open Std.Net
namespace Socket
@@ -66,7 +62,7 @@ Accepts an incoming connection.
@[inline]
def accept (s : Server) : Async Client := do
s.native.accept
|> Async.ofPromise
|> Async.ofIOPromise
|>.map Client.ofNative
/--
@@ -153,21 +149,21 @@ Connects the client socket to the given address.
-/
@[inline]
def connect (s : Client) (addr : SocketAddress) : Async Unit :=
Async.ofPromise <| s.native.connect addr
Async.ofIOPromise <| s.native.connect addr
/--
Sends multiple data buffers through the client socket.
-/
@[inline]
def sendAll (s : Client) (data : Array ByteArray) : Async Unit :=
Async.ofPromise <| s.native.send data
Async.ofIOPromise <| s.native.send data
/--
Sends data through the client socket.
-/
@[inline]
def send (s : Client) (data : ByteArray) : Async Unit :=
Async.ofPromise <| s.native.send #[data]
Async.ofIOPromise <| s.native.send #[data]
/--
Receives data from the client socket. If data is received, its wrapped in .some. If EOF is reached,
@@ -177,7 +173,7 @@ Furthermore calling this function in parallel with `recvSelector` is not support
-/
@[inline]
def recv? (s : Client) (size : UInt64) : Async (Option ByteArray) :=
Async.ofPromise <| s.native.recv? size
Async.ofIOPromise <| s.native.recv? size
/--
Creates a `Selector` that resolves once `s` has data available, up to at most `size` bytes,
@@ -224,7 +220,7 @@ Shuts down the write side of the client socket.
-/
@[inline]
def shutdown (s : Client) : Async Unit :=
Async.ofPromise <| s.native.shutdown
Async.ofIOPromise <| s.native.shutdown
/--
Gets the remote address of the client socket.
@@ -256,8 +252,4 @@ def keepAlive (s : Client) (enable : Bool) (delay : Std.Time.Second.Offset) (_ :
end Client
end Socket
end TCP
end Async
end IO
end Internal
end Std
end Std.Async.TCP

View File

@@ -8,15 +8,12 @@ module
prelude
public import Std.Time
public import Std.Internal.UV.Timer
public import Std.Internal.Async.Select
public import Std.Async.Select
public section
namespace Std
namespace Internal
namespace IO
namespace Async
namespace Std.Async
/--
`Sleep` can be used to sleep for some duration once.
@@ -167,7 +164,4 @@ def stop (i : Interval) : IO Unit :=
end Interval
end Async
end IO
end Internal
end Std
end Std.Async

View File

@@ -8,15 +8,11 @@ module
prelude
public import Std.Time
public import Std.Internal.UV.UDP
public import Std.Internal.Async.Select
public import Std.Async.Select
public section
namespace Std
namespace Internal
namespace IO
namespace Async
namespace UDP
namespace Std.Async.UDP
open Std.Net
@@ -66,7 +62,7 @@ address. If `addr` is `none`, the data is sent to the default peer address set b
-/
@[inline]
def sendAll (s : Socket) (data : Array ByteArray) (addr : Option SocketAddress := none) : Async Unit :=
Async.ofPromise <| s.native.send data addr
Async.ofIOPromise <| s.native.send data addr
/--
Sends data through an UDP socket. The `addr` parameter specifies the destination address. If `addr`
@@ -74,7 +70,7 @@ is `none`, the data is sent to the default peer address set by `connect`.
-/
@[inline]
def send (s : Socket) (data : ByteArray) (addr : Option SocketAddress := none) : Async Unit :=
Async.ofPromise <| s.native.send #[data] addr
Async.ofIOPromise <| s.native.send #[data] addr
/--
Receives data from an UDP socket. `size` is for the maximum bytes to receive.
@@ -85,7 +81,7 @@ Furthermore calling this function in parallel with `recvSelector` is not support
-/
@[inline]
def recv (s : Socket) (size : UInt64) : Async (ByteArray × Option SocketAddress) :=
Async.ofPromise <| s.native.recv size
Async.ofIOPromise <| s.native.recv size
/--
Creates a `Selector` that resolves once `s` has data available, up to at most `size` bytes,
@@ -190,8 +186,4 @@ def setTTL (s : Socket) (ttl : UInt32) : IO Unit :=
end Socket
end UDP
end Async
end IO
end Internal
end Std
end Std.Async.UDP

View File

@@ -6,7 +6,6 @@ Authors: Henrik Böving
module
prelude
public import Std.Internal.Async
public import Std.Internal.Parsec
public import Std.Internal.UV

View File

@@ -1,19 +0,0 @@
/-
Copyright (c) 2024 Lean FRO, LLC. All rights reserved.
Released under Apache 2.0 license as described in the file LICENSE.
Authors: Henrik Böving
-/
module
prelude
public import Std.Internal.Async.Basic
public import Std.Internal.Async.ContextAsync
public import Std.Internal.Async.Timer
public import Std.Internal.Async.TCP
public import Std.Internal.Async.UDP
public import Std.Internal.Async.DNS
public import Std.Internal.Async.Select
public import Std.Internal.Async.Process
public import Std.Internal.Async.System
public import Std.Internal.Async.Signal
public import Std.Internal.Async.IO

View File

@@ -10,14 +10,13 @@ public import Std.Data
public import Init.Data.Queue
public import Init.Data.Vector
public import Std.Sync.Mutex
public import Std.Internal.Async.IO
public import Std.Async.IO
public section
namespace Std
open Std.Internal.Async.IO
open Std.Internal.IO.Async
open Std.Async
/-!
The `Std.Sync.Broadcast` module implements a broadcasting primitive for sending values
@@ -60,7 +59,7 @@ instance instMonadLiftBroadcastIO : MonadLift (EIO Broadcast.Error) IO where
private structure Broadcast.Consumer (α : Type) where
promise : IO.Promise Bool
waiter : Option (Internal.IO.Async.Waiter (Option α))
waiter : Option (Async.Waiter (Option α))
private def Broadcast.Consumer.resolve (c : Broadcast.Consumer α) (b : Bool) : BaseIO Unit :=
c.promise.resolve b
@@ -403,7 +402,6 @@ private def recvReady'
let slotVal slot.get
return slotVal.pos = next
open Internal.IO.Async in
private partial def recvSelector (ch : Bounded.Receiver α) : Selector (Option α) where
tryFn := do
ch.state.atomically do
@@ -537,8 +535,6 @@ the next available message. This will block until a message is available.
def recv [Inhabited α] (ch : Broadcast.Receiver α) : BaseIO (Task (Option α)) := do
Std.Bounded.Receiver.recv ch.inner
open Internal.IO.Async in
/--
Creates a `Selector` that resolves once the broadcast channel `ch` has data available and provides that data.
-/
@@ -567,7 +563,7 @@ instance [Inhabited α] : AsyncStream (Broadcast.Receiver α) (Option α) where
stop channel := channel.unsubscribe
instance [Inhabited α] : AsyncRead (Broadcast.Receiver α) (Option α) where
read receiver := Internal.IO.Async.Async.ofIOTask receiver.recv
read receiver := Async.ofIOTask receiver.recv
instance [Inhabited α] : AsyncWrite (Broadcast α) α where
write receiver x := do

View File

@@ -11,7 +11,7 @@ public import Init.System.Promise
public import Init.Data.Queue
public import Std.Sync.Mutex
public import Std.Sync.CancellationToken
public import Std.Internal.Async.Select
public import Std.Async.Select
public section
@@ -21,7 +21,7 @@ automatically cancels all child contexts.
-/
namespace Std
open Std.Internal.IO.Async
open Std.Async
structure CancellationContext.State where
/--

View File

@@ -9,7 +9,7 @@ prelude
public import Std.Data
public import Init.Data.Queue
public import Std.Sync.Mutex
public import Std.Internal.Async.Select
public import Std.Async.Select
public section
@@ -21,7 +21,7 @@ that a cancellation has occurred.
-/
namespace Std
open Std.Internal.IO.Async
open Std.Async
/--
Reasons for cancellation.

View File

@@ -8,13 +8,13 @@ module
prelude
public import Init.Data.Queue
public import Std.Sync.Mutex
public import Std.Internal.Async.IO
public import Std.Async.IO
import Init.Data.Vector.Basic
public section
open Std.Internal.Async.IO
open Std.Internal.IO.Async
open Std.Async
open Std.Async
/-!
This module contains the implementation of `Std.Channel`. `Std.Channel` is a multi-producer
@@ -51,7 +51,6 @@ instance : ToString Error where
instance : MonadLift (EIO Error) IO where
monadLift x := EIO.toIO (.userError <| toString ·) x
open Internal.IO.Async in
private inductive Consumer (α : Type) where
| normal (promise : IO.Promise (Option α))
| select (finished : Waiter (Option α))
@@ -172,7 +171,6 @@ private def recvReady' [Monad m] [MonadLiftT (ST IO.RealWorld) m] :
let st get
return !st.values.isEmpty || st.closed
open Internal.IO.Async in
private def recvSelector (ch : Unbounded α) : Selector (Option α) where
tryFn := do
ch.state.atomically do
@@ -322,7 +320,6 @@ private def recvReady' [Monad m] [MonadLiftT (ST IO.RealWorld) m] :
let st get
return !st.producers.isEmpty || st.closed
open Internal.IO.Async in
private def recvSelector (ch : Zero α) : Selector (Option α) where
tryFn := do
ch.state.atomically do
@@ -356,7 +353,6 @@ private def recvSelector (ch : Zero α) : Selector (Option α) where
end Zero
open Internal.IO.Async in
private structure Bounded.Consumer (α : Type) where
promise : IO.Promise Bool
waiter : Option (Waiter (Option α))
@@ -558,7 +554,6 @@ private def recvReady' [Monad m] [MonadLiftT (ST IO.RealWorld) m] :
let st get
return st.bufCount != 0 || st.closed
open Internal.IO.Async in
private partial def recvSelector (ch : Bounded α) : Selector (Option α) where
tryFn := do
ch.state.atomically do
@@ -732,7 +727,6 @@ def recv (ch : CloseableChannel α) : BaseIO (Task (Option α)) :=
| .zero ch => CloseableChannel.Zero.recv ch
| .bounded ch => CloseableChannel.Bounded.recv ch
open Internal.IO.Async in
/--
Creates a `Selector` that resolves once `ch` has data available and provides that data.
In particular if `ch` is closed while waiting on this `Selector` and no data is available already
@@ -759,7 +753,7 @@ instance [Inhabited α] : AsyncStream (CloseableChannel α) (Option α) where
next channel := channel.recvSelector
instance [Inhabited α] : AsyncRead (CloseableChannel α) (Option α) where
read receiver := Internal.IO.Async.Async.ofIOTask receiver.recv
read receiver := Async.ofIOTask receiver.recv
instance [Inhabited α] : AsyncWrite (CloseableChannel α) α where
write receiver x := do
@@ -877,7 +871,6 @@ def recv [Inhabited α] (ch : Channel α) : BaseIO (Task α) := do
| some val => return .pure val
| none => unreachable!
open Internal.IO.Async in
/--
Creates a `Selector` that resolves once `ch` has data available and provides that data.
-/
@@ -909,7 +902,7 @@ instance [Inhabited α] : AsyncStream (Channel α) α where
next channel := channel.recvSelector
instance [Inhabited α] : AsyncRead (Channel α) α where
read receiver := Internal.IO.Async.Async.ofIOTask receiver.recv
read receiver := Async.ofIOTask receiver.recv
instance [Inhabited α] : AsyncWrite (Channel α) α where
write receiver x := do

View File

@@ -8,7 +8,7 @@ module
prelude
public import Init.Data.Queue
public import Std.Sync.Mutex
public import Std.Internal.Async.Select
public import Std.Async.Select
public section
@@ -24,7 +24,7 @@ will be woken up per notification.
-/
namespace Std
open Std.Internal.IO.Async
open Std.Async
inductive Notify.Consumer (α : Type) where
| normal (promise : IO.Promise α)

View File

@@ -8,19 +8,17 @@ module
prelude
public import Std.Data
public import Init.Data.Queue
public import Std.Internal.Async.IO
public import Std.Async.IO
public section
open Std.Internal.Async.IO
open Std.Internal.IO.Async
/-!
This module provides `StreamMap`, a container that maps keys to async streams.
It allows for dynamic management of multiple named streams with async operations.
-/
namespace Std
open Std.Async
/--
This is an existential wrapper for AsyncStream that is used for the `.ofArray` function

View File

@@ -1,9 +1,9 @@
import Std.Internal.Async
import Std.Async
import Std.Internal.UV
import Std.Net.Addr
open Std.Internal.IO.Async.UDP
open Std.Internal.IO.Async
open Std.Async.UDP
open Std.Async
open Std.Net
def t : IO (Async Nat) := do

View File

@@ -1,9 +1,9 @@
import Std.Internal.Async
import Std.Async
import Std.Sync.Mutex
open Std
open Std.Internal.IO.Async
open Std.Async
def wait (ms : UInt32) (ref : Std.Mutex Nat) (val : Nat) : Async Unit := do
ref.atomically (·.modify (· * val))

View File

@@ -1,7 +1,7 @@
import Std.Internal.Async
import Std.Async
import Std.Sync
open Std.Internal.IO Async
open Std Async
def cancellableSelector [Monad m] [MonadLift IO m] [MonadAsync AsyncTask m] (fn : Std.CancellationToken m α) : m (Selector (Except IO.Error α)) := do
let signal Std.CancellationToken.new

View File

@@ -1,7 +1,7 @@
import Std.Internal.Async
import Std.Async
import Std.Sync
open Std.Internal.IO Async
open Std Async
-- Test basic cancellation with default reason
def testBasicCancellationWithReason : Async Unit := do

View File

@@ -1,8 +1,8 @@
import Std.Internal.Async
import Std.Async
import Std.Internal.UV
import Std.Net.Addr
open Std.Internal.IO Async
open Std Async
open Std.Net
open Std.Net

View File

@@ -1,6 +1,6 @@
import Std.Internal.Async.Timer
import Std.Internal.Async.TCP
import Std.Internal.Async.UDP
import Std.Async.Timer
import Std.Async.TCP
import Std.Async.UDP
#exit -- TODO: remove `#exit` after nondet issue is resolved.

View File

@@ -1,4 +1,4 @@
import Std.Internal.Async.Timer
import Std.Async.Timer
open Std Internal IO Async

View File

@@ -1,7 +1,7 @@
import Std.Internal.Async
import Std.Async
import Std.Sync
open Std.Internal.IO Async
open Std Async
-- Test basic message reception from multiple channels
def testSimpleMessages : Async Unit := do

View File

@@ -1,11 +1,11 @@
import Std.Internal.Async.Timer
import Std.Async.Timer
/-
these tests are just some preliminary ones as `async_sleep.lean` already contains extensive tests
for the entire timer state machine and `Async.Timer` is merely a light wrapper around it.
-/
open Std.Internal.IO.Async
open Std.Async
def BASE_DURATION : Std.Time.Millisecond.Offset := 10

View File

@@ -1,9 +1,9 @@
import Std.Internal.Async.System
import Std.Internal.Async.Process
import Std.Async.System
import Std.Async.Process
import Lean.Runtime
open Std.Internal.IO.Async.System
open Std.Internal.IO.Process
open Std.Async.System
open Std.Async.Process
#eval do
assert! ( getUpTime) > 0

View File

@@ -1,8 +1,8 @@
import Std.Internal.Async
import Std.Async
import Std.Internal.UV
import Std.Net.Addr
open Std.Internal.IO Async
open Std Async
open Std.Net
def assertBEq [BEq α] [ToString α] (actual expected : α) : IO Unit := do

View File

@@ -1,8 +1,8 @@
import Std.Internal.Async
import Std.Async
import Std.Internal.UV
import Std.Net.Addr
open Std.Internal.IO Async
open Std Async
open Std.Net
-- Using this function to create IO Error. For some reason the assert! is not pausing the execution.

View File

@@ -1,8 +1,8 @@
import Std.Internal.Async
import Std.Async
import Std.Internal.UV
import Std.Net.Addr
open Std.Internal.IO Async
open Std Async
open Std.Net
-- Using this function to create IO Error. For some reason the assert! is not pausing the execution.

View File

@@ -1,9 +1,9 @@
import Std.Internal.Async
import Std.Async
import Std.Internal.UV
import Std.Net.Addr
open Std.Internal.IO.Async.UDP
open Std.Internal.IO.Async
open Std.Async.UDP
open Std.Async
open Std.Net
def assertBEq [BEq α] [ToString α] (actual expected : α) : IO Unit := do

View File

@@ -1,7 +1,7 @@
import Std.Internal.Async
import Std.Async
import Std.Sync
open Std.Internal.IO Async
open Std Async
-- Test tryRecv with empty channel
def tryRecvEmpty : Async Unit := do
@@ -245,9 +245,9 @@ def recvConditions : Async Unit := do
let subs2 channel.subscribe
let subs3 channel.subscribe
discard <| EAsync.ofETask ( channel.send 1)
discard <| EAsync.ofETask ( channel.send 2)
discard <| EAsync.ofETask ( channel.send 3)
discard <| EAsync.ofTask ( channel.send 1)
discard <| EAsync.ofTask ( channel.send 2)
discard <| EAsync.ofTask ( channel.send 3)
channel.close
@@ -308,9 +308,9 @@ def selectableConditions : Async Unit := do
let subs2 channel.subscribe
let subs3 channel.subscribe
discard <| EAsync.ofETask ( channel.send 1)
discard <| EAsync.ofETask ( channel.send 2)
discard <| EAsync.ofETask ( channel.send 3)
discard <| EAsync.ofTask ( channel.send 1)
discard <| EAsync.ofTask ( channel.send 2)
discard <| EAsync.ofTask ( channel.send 3)
channel.close

View File

@@ -1,7 +1,7 @@
import Std.Internal.Async
import Std.Async
import Std.Sync
open Std.Internal.IO Async
open Std Async
/-- Test basic tree cancellation -/
partial def testCancelTree : IO Unit := do

View File

@@ -1,7 +1,7 @@
import Std.Internal.Async
import Std.Async
import Std.Sync
open Std.Internal.IO Async
open Std Async
/-- Test ContextAsync cancellation check -/
def testIsCancelled : IO Unit := do

View File

@@ -1,7 +1,7 @@
import Std.Internal.Async
import Std.Async
import Std.Sync
open Std.Internal.IO Async
open Std Async
-- Test basic wait and notifyOne functionality
def testBasicWaitNotifyOne : Async Unit := do

View File

@@ -1,8 +1,8 @@
import Std.Internal.Async.Signal
import Std.Internal.Async.Select
import Std.Internal.Async
import Std.Async.Signal
import Std.Async.Select
import Std.Async
open Std.Internal.IO.Async
open Std.Async
def assertBEq [BEq α] [Repr α] (actual expected : α) : IO Unit := do
unless actual == expected do