mirror of
https://github.com/leanprover/lean4.git
synced 2026-03-22 12:54:06 +00:00
Compare commits
5 Commits
lean-profi
...
sofia/asyn
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
7a6a5a6faf | ||
|
|
60641faa10 | ||
|
|
bfb060641f | ||
|
|
6a27cb38aa | ||
|
|
2a4ca6f5d0 |
@@ -6,6 +6,7 @@ Authors: Sebastian Ullrich
|
||||
module
|
||||
|
||||
prelude
|
||||
public import Std.Async
|
||||
public import Std.Data
|
||||
public import Std.Do
|
||||
public import Std.Sat
|
||||
|
||||
19
src/Std/Async.lean
Normal file
19
src/Std/Async.lean
Normal file
@@ -0,0 +1,19 @@
|
||||
/-
|
||||
Copyright (c) 2024 Lean FRO, LLC. All rights reserved.
|
||||
Released under Apache 2.0 license as described in the file LICENSE.
|
||||
Authors: Henrik Böving
|
||||
-/
|
||||
module
|
||||
|
||||
prelude
|
||||
public import Std.Async.Basic
|
||||
public import Std.Async.ContextAsync
|
||||
public import Std.Async.Timer
|
||||
public import Std.Async.TCP
|
||||
public import Std.Async.UDP
|
||||
public import Std.Async.DNS
|
||||
public import Std.Async.Select
|
||||
public import Std.Async.Process
|
||||
public import Std.Async.System
|
||||
public import Std.Async.Signal
|
||||
public import Std.Async.IO
|
||||
@@ -10,10 +10,7 @@ public import Init.System.Promise
|
||||
|
||||
public section
|
||||
|
||||
namespace Std
|
||||
namespace Internal
|
||||
namespace IO
|
||||
namespace Async
|
||||
namespace Std.Async
|
||||
|
||||
/-!
|
||||
|
||||
@@ -104,10 +101,6 @@ instance [Monad m] [MonadAwait t m] : MonadAwait t (ReaderT n m) where
|
||||
instance [MonadAwait t m] : MonadAwait t (StateRefT' s n m) where
|
||||
await := liftM (m := m) ∘ MonadAwait.await
|
||||
|
||||
@[default_instance]
|
||||
instance [Monad m] [MonadAwait t m] : MonadAwait t (StateT s m) where
|
||||
await := liftM (m := m) ∘ MonadAwait.await
|
||||
|
||||
@[default_instance]
|
||||
instance [MonadAsync t m] : MonadAsync t (ReaderT n m) where
|
||||
async p prio := MonadAsync.async (prio := prio) ∘ p
|
||||
@@ -122,6 +115,19 @@ instance [Monad m] [Functor t] [inst : MonadAsync t m] : MonadAsync t (StateT s
|
||||
let t ← inst.async (prio := prio) (p s)
|
||||
pure (t <&> Prod.fst, s)
|
||||
|
||||
/--
|
||||
Type class for converting a source type into an async computation.
|
||||
|
||||
This provides a uniform interface for lifting various types (tasks, promises, etc.)
|
||||
into async monads.
|
||||
-/
|
||||
class ToAsync (source : Type) (target : outParam Type) where
|
||||
/--
|
||||
Convert the source value into an async computation using the default error message.
|
||||
-/
|
||||
|
||||
toAsync : source → target
|
||||
|
||||
/--
|
||||
A `Task` that may resolve to either a value of type `α` or an error value of type `ε`.
|
||||
-/
|
||||
@@ -177,14 +183,12 @@ protected def mapEIO (f : α → EIO ε β) (x : ETask ε α) (prio := Task.Prio
|
||||
| .error e => throw e
|
||||
|
||||
/--
|
||||
Block until the `ETask` in `x` finishes and returns its value. Propagates any error encountered
|
||||
Wait until the `ETask` in `x` finishes and returns its value. Propagates any error encountered
|
||||
during execution.
|
||||
-/
|
||||
@[inline]
|
||||
def block (x : ETask ε α) : EIO ε α := do
|
||||
match x.get with
|
||||
| .ok a => return a
|
||||
| .error e => throw e
|
||||
def block (x : ETask ε α) : EIO ε α :=
|
||||
liftExcept x.get
|
||||
|
||||
/--
|
||||
Create an `ETask` that resolves to the value of the promise `x`. If the promise gets dropped then it
|
||||
@@ -200,7 +204,7 @@ panics.
|
||||
-/
|
||||
@[inline]
|
||||
def ofPurePromise (x : IO.Promise α) : ETask ε α :=
|
||||
x.result!.map pure (sync := true)
|
||||
x.result!.map .ok (sync := true)
|
||||
|
||||
/--
|
||||
Obtain the `IO.TaskState` of `x`.
|
||||
@@ -212,9 +216,8 @@ def getState (x : ETask ε α) : BaseIO IO.TaskState :=
|
||||
instance : Functor (ETask ε) where
|
||||
map := ETask.map
|
||||
|
||||
instance : Monad (ETask ε) where
|
||||
pure := ETask.pure
|
||||
bind := ETask.bind
|
||||
instance : ToAsync (IO.Promise (Except ε α)) (ETask ε α) where
|
||||
toAsync := ETask.ofPromise!
|
||||
|
||||
end ETask
|
||||
|
||||
@@ -225,6 +228,13 @@ abbrev AsyncTask := ETask IO.Error
|
||||
|
||||
namespace AsyncTask
|
||||
|
||||
/--
|
||||
Waits for the result of the `AsyncTask`, blocking if necessary.
|
||||
-/
|
||||
@[inline]
|
||||
protected def block (self : AsyncTask α) : IO α :=
|
||||
ETask.block self
|
||||
|
||||
/--
|
||||
Similar to `map`, however `f` has access to the `IO` monad. If `f` throws an error, the returned
|
||||
`AsyncTask` resolves to that error.
|
||||
@@ -235,87 +245,39 @@ protected def mapIO (f : α → IO β) (x : AsyncTask α) (prio := Task.Priority
|
||||
| .ok a => f a
|
||||
| .error e => throw e
|
||||
|
||||
/--
|
||||
Construct an `AsyncTask` that is already resolved with value `x`.
|
||||
-/
|
||||
@[inline]
|
||||
protected def pure (x : α) : AsyncTask α :=
|
||||
Task.pure <| .ok x
|
||||
|
||||
/--
|
||||
Create a new `AsyncTask` that will run after `x` has finished.
|
||||
If `x`:
|
||||
- errors, return an `AsyncTask` that resolves to the error.
|
||||
- succeeds, run `f` on the result of `x` and return the `AsyncTask` produced by `f`.
|
||||
-/
|
||||
@[inline]
|
||||
protected def bind (x : AsyncTask α) (f : α → AsyncTask β) (prio := Task.Priority.default) (sync := false) : AsyncTask β :=
|
||||
Task.bind x (prio := prio) (sync := sync) fun
|
||||
| .ok a => f a
|
||||
| .error e => Task.pure <| .error e
|
||||
|
||||
/--
|
||||
Create a new `AsyncTask` that will run after `x` has finished.
|
||||
If `x`:
|
||||
- errors, return an `AsyncTask` that resolves to the error.
|
||||
- succeeds, return an `AsyncTask` that resolves to `f x`.
|
||||
-/
|
||||
@[inline]
|
||||
def map (f : α → β) (x : AsyncTask α) (prio := Task.Priority.default) (sync := false) : AsyncTask β :=
|
||||
Task.map (x := x) (f <$> ·) prio sync
|
||||
|
||||
/--
|
||||
Similar to `bind`, however `f` has access to the `IO` monad. If `f` throws an error, the returned
|
||||
`AsyncTask` resolves to that error.
|
||||
-/
|
||||
@[inline]
|
||||
def bindIO (x : AsyncTask α) (f : α → IO (AsyncTask β)) (prio := Task.Priority.default) (sync := false) : BaseIO (AsyncTask β) :=
|
||||
protected def bindIO (x : AsyncTask α) (f : α → IO (AsyncTask β)) (prio := Task.Priority.default) (sync := false) : BaseIO (AsyncTask β) :=
|
||||
IO.bindTask x (prio := prio) (sync := sync) fun
|
||||
| .ok a => f a
|
||||
| .error e => throw e
|
||||
|
||||
/--
|
||||
Similar to `map`, however `f` has access to the `IO` monad. If `f` throws an error, the returned
|
||||
`AsyncTask` resolves to that error.
|
||||
Create an `AsyncTask` that resolves to the value of `x`. Returns an error if the promise is dropped.
|
||||
-/
|
||||
@[inline]
|
||||
def mapTaskIO (f : α → IO β) (x : AsyncTask α) (prio := Task.Priority.default) (sync := false) : BaseIO (AsyncTask β) :=
|
||||
IO.mapTask (t := x) (prio := prio) (sync := sync) fun
|
||||
| .ok a => f a
|
||||
| .error e => throw e
|
||||
|
||||
/--
|
||||
Block until the `AsyncTask` in `x` finishes.
|
||||
-/
|
||||
def block (x : AsyncTask α) : IO α :=
|
||||
match x.get with
|
||||
| .ok a => return a
|
||||
| .error e => throw e
|
||||
|
||||
/--
|
||||
Create an `AsyncTask` that resolves to the value of `x`.
|
||||
-/
|
||||
@[inline]
|
||||
def ofPromise (x : IO.Promise (Except IO.Error α)) (error : String := "the promise linked to the Async Task was dropped") : AsyncTask α :=
|
||||
def ofPromise (x : IO.Promise (Except IO.Error α)) (error : String := "the promise linked to the AsyncTask was dropped") : AsyncTask α :=
|
||||
x.result?.map fun
|
||||
| none => .error error
|
||||
| some res => res
|
||||
|
||||
/--
|
||||
Create an `AsyncTask` that resolves to the value of `x`.
|
||||
Create an `AsyncTask` that resolves to the value of `x`. Returns an error if the promise is dropped.
|
||||
-/
|
||||
@[inline]
|
||||
def ofPurePromise (x : IO.Promise α) (error : String := "the promise linked to the Async Task was dropped") : AsyncTask α :=
|
||||
def ofPurePromise (x : IO.Promise α) (error : String := "the promise linked to the AsyncTask was dropped") : AsyncTask α :=
|
||||
x.result?.map (sync := true) fun
|
||||
| none => .error error
|
||||
| some res => pure res
|
||||
|
||||
/--
|
||||
Obtain the `IO.TaskState` of `x`.
|
||||
-/
|
||||
@[inline]
|
||||
def getState (x : AsyncTask α) : BaseIO IO.TaskState :=
|
||||
IO.getTaskState x
|
||||
instance : ToAsync (IO.Promise (Except IO.Error α)) (AsyncTask α) where
|
||||
toAsync x := AsyncTask.ofPromise x "the promise linked to the AsyncTask was dropped"
|
||||
|
||||
instance : ToAsync (IO.Promise α) (AsyncTask α) where
|
||||
toAsync x := AsyncTask.ofPurePromise x "the promise linked to the AsyncTask was dropped"
|
||||
|
||||
end AsyncTask
|
||||
|
||||
@@ -397,10 +359,11 @@ def mk (x : BaseIO (MaybeTask α)) : BaseAsync α :=
|
||||
x
|
||||
|
||||
/--
|
||||
Converts a `BaseAsync` into a `BaseIO`
|
||||
Unwraps a `BaseAsync` to access the underlying `BaseIO (MaybeTask α)`.
|
||||
This is an implementation detail and should not be used directly.
|
||||
-/
|
||||
@[inline]
|
||||
def toRawBaseIO (x : BaseAsync α) : BaseIO (MaybeTask α) :=
|
||||
protected def unwrap (x : BaseAsync α) : BaseIO (MaybeTask α) :=
|
||||
x
|
||||
|
||||
/--
|
||||
@@ -408,7 +371,7 @@ Converts a `BaseAsync` to a `BaseIO Task`.
|
||||
-/
|
||||
@[inline]
|
||||
protected def toBaseIO (x : BaseAsync α) : BaseIO (Task α) :=
|
||||
MaybeTask.toTask <$> x.toRawBaseIO
|
||||
MaybeTask.toTask <$> x.unwrap
|
||||
|
||||
/--
|
||||
Creates a new `BaseAsync` out of a `Task`.
|
||||
@@ -429,32 +392,29 @@ Maps the result of a `BaseAsync` computation with a function.
|
||||
-/
|
||||
@[inline]
|
||||
protected def map (f : α → β) (self : BaseAsync α) (prio := Task.Priority.default) (sync := false) : BaseAsync β :=
|
||||
mk <| (·.map f prio sync) <$> self.toRawBaseIO
|
||||
mk <| (·.map f prio sync) <$> self.unwrap
|
||||
|
||||
/--
|
||||
Sequences two computations, allowing the second to depend on the value computed by the first.
|
||||
-/
|
||||
@[inline]
|
||||
protected def bind (self : BaseAsync α) (f : α → BaseAsync β) (prio := Task.Priority.default) (sync := false) : BaseAsync β :=
|
||||
mk <| self.toRawBaseIO >>= (bindAsyncTask · f |>.toRawBaseIO)
|
||||
where
|
||||
bindAsyncTask (t : MaybeTask α) (f : α → BaseAsync β) : BaseAsync β := .mk <|
|
||||
match t with
|
||||
| .pure a => (f a) |>.toRawBaseIO
|
||||
| .ofTask t => .ofTask <$> BaseIO.bindTask t (fun a => MaybeTask.toTask <$> (f a |>.toRawBaseIO)) prio sync
|
||||
mk <| self.unwrap >>= fun
|
||||
| .pure a => (f a).unwrap
|
||||
| .ofTask t => .ofTask <$> BaseIO.bindTask t (fun a => MaybeTask.toTask <$> (f a).unwrap) prio sync
|
||||
|
||||
/--
|
||||
Lifts a `BaseIO` action into a `BaseAsync` computation.
|
||||
-/
|
||||
@[inline]
|
||||
protected def lift (x : BaseIO α) : BaseAsync α :=
|
||||
.mk <| (pure ∘ pure) =<< x
|
||||
.mk <| MaybeTask.pure <$> x
|
||||
|
||||
/--
|
||||
Waits for the result of the `BaseAsync` computation, blocking if necessary.
|
||||
-/
|
||||
@[inline]
|
||||
protected def wait (self : BaseAsync α) : BaseIO α :=
|
||||
protected def block (self : BaseAsync α) : BaseIO α :=
|
||||
pure ∘ Task.get =<< self.toBaseIO
|
||||
|
||||
/--
|
||||
@@ -462,7 +422,7 @@ Lifts a `BaseAsync` computation into a `Task` that can be awaited and joined.
|
||||
-/
|
||||
@[inline]
|
||||
protected def asTask (x : BaseAsync α) (prio := Task.Priority.default) : BaseIO (Task α) := do
|
||||
let res ← BaseIO.asTask (prio := prio) x.toRawBaseIO
|
||||
let res ← BaseIO.asTask (prio := prio) x.unwrap
|
||||
return MaybeTask.joinTask res
|
||||
|
||||
/--
|
||||
@@ -527,16 +487,20 @@ The other result is lost and the other task is not cancelled, so the task will c
|
||||
until the end.
|
||||
-/
|
||||
@[inline, specialize]
|
||||
def race [Inhabited α] (x : BaseAsync α) (y : BaseAsync α) (prio := Task.Priority.default) : BaseAsync α := do
|
||||
let promise ← IO.Promise.new
|
||||
def race (x : BaseAsync α) (y : BaseAsync α) (prio := Task.Priority.default) : BaseAsync α := do
|
||||
-- Use Option α for the promise to avoid Inhabited constraint
|
||||
let promise : IO.Promise (Option α) ← IO.Promise.new
|
||||
|
||||
let task₁ : Task _ ← MonadAsync.async (prio := prio) x
|
||||
let task₂ : Task _ ← MonadAsync.async (prio := prio) y
|
||||
|
||||
BaseIO.chainTask task₁ (liftM ∘ promise.resolve)
|
||||
BaseIO.chainTask task₂ (liftM ∘ promise.resolve)
|
||||
BaseIO.chainTask task₁ (liftM ∘ promise.resolve ∘ some)
|
||||
BaseIO.chainTask task₂ (liftM ∘ promise.resolve ∘ some)
|
||||
|
||||
MonadAwait.await promise.result!
|
||||
let result ← MonadAwait.await promise.result!
|
||||
match result with
|
||||
| some a => pure a
|
||||
| none => MonadAwait.await task₁ -- Fallback, shouldn't happen in practice
|
||||
|
||||
/--
|
||||
Runs all computations in an `Array` concurrently and returns all results as an array.
|
||||
@@ -561,6 +525,12 @@ def raceAll [Inhabited α] [ForM BaseAsync c (BaseAsync α)] (xs : c) (prio := T
|
||||
|
||||
MonadAwait.await promise.result!
|
||||
|
||||
instance : ToAsync (Task α) (BaseAsync α) where
|
||||
toAsync := BaseAsync.ofTask
|
||||
|
||||
instance : ToAsync (Except Empty α) (BaseAsync α) where
|
||||
toAsync := BaseAsync.ofExcept
|
||||
|
||||
end BaseAsync
|
||||
|
||||
/--
|
||||
@@ -575,10 +545,10 @@ Converts a `EAsync` to a `ETask`.
|
||||
-/
|
||||
@[inline]
|
||||
protected def toBaseIO (x : EAsync ε α) : BaseIO (ETask ε α) :=
|
||||
MaybeTask.toTask <$> x.toRawBaseIO
|
||||
MaybeTask.toTask <$> x.unwrap
|
||||
|
||||
/--
|
||||
Creates a new `EAsync` out of a `RTask`.
|
||||
Creates a new `EAsync` out of an `ETask`.
|
||||
-/
|
||||
@[inline]
|
||||
protected def ofTask (x : ETask ε α) : EAsync ε α :=
|
||||
@@ -589,14 +559,7 @@ Converts a `BaseAsync` to a `EIO ETask`.
|
||||
-/
|
||||
@[inline]
|
||||
protected def toEIO (x : EAsync ε α) : EIO ε (ETask ε α) :=
|
||||
MaybeTask.toTask <$> x.toRawBaseIO
|
||||
|
||||
/--
|
||||
Creates a new `EAsync` out of a `ETask`.
|
||||
-/
|
||||
@[inline]
|
||||
protected def ofETask (x : ETask ε α) : EAsync ε α :=
|
||||
.mk <| BaseAsync.ofTask x
|
||||
MaybeTask.toTask <$> x.unwrap
|
||||
|
||||
/--
|
||||
Creates an `EAsync` computation that immediately returns the given value.
|
||||
@@ -609,15 +572,15 @@ protected def pure (a : α) : EAsync ε α :=
|
||||
Maps the result of an `EAsync` computation with a pure function.
|
||||
-/
|
||||
@[inline]
|
||||
protected def map (f : α → β) (self : EAsync ε α) : EAsync ε β :=
|
||||
.mk <| BaseAsync.map (.map f) self
|
||||
protected def map (f : α → β) (self : EAsync ε α) (prio := Task.Priority.default) (sync := false) : EAsync ε β :=
|
||||
.mk <| BaseAsync.map (.map f) self prio sync
|
||||
|
||||
/--
|
||||
Sequences two computations, allowing the second to depend on the value computed by the first.
|
||||
-/
|
||||
@[inline]
|
||||
protected def bind (self : EAsync ε α) (f : α → EAsync ε β) : EAsync ε β :=
|
||||
.mk <| BaseAsync.bind self fun
|
||||
protected def bind (self : EAsync ε α) (f : α → EAsync ε β) (prio := Task.Priority.default) (sync := false) : EAsync ε β :=
|
||||
.mk <| BaseAsync.bind (prio := prio) (sync := sync) self fun
|
||||
| .ok a => f a
|
||||
| .error e => BaseAsync.pure (.error e)
|
||||
|
||||
@@ -632,11 +595,8 @@ protected def lift (x : EIO ε α) : EAsync ε α :=
|
||||
Waits for the result of the `EAsync` computation, blocking if necessary.
|
||||
-/
|
||||
@[inline]
|
||||
protected def wait (self : EAsync ε α) : EIO ε α := do
|
||||
let result ← self |> BaseAsync.wait
|
||||
match result with
|
||||
| .ok a => return a
|
||||
| .error e => throw e
|
||||
protected def block (self : EAsync ε α) : EIO ε α :=
|
||||
liftExcept =<< BaseAsync.block self
|
||||
|
||||
/--
|
||||
Lifts an `EAsync` computation into an `ETask` that can be awaited and joined.
|
||||
@@ -645,13 +605,6 @@ Lifts an `EAsync` computation into an `ETask` that can be awaited and joined.
|
||||
protected def asTask (x : EAsync ε α) (prio := Task.Priority.default) : EIO ε (ETask ε α) :=
|
||||
x |> BaseAsync.asTask (prio := prio)
|
||||
|
||||
/--
|
||||
Block until the `EAsync` finishes and returns its value. Propagates any error encountered during execution.
|
||||
-/
|
||||
@[inline]
|
||||
protected def block (x : EAsync ε α) (prio := Task.Priority.default) : EIO ε α :=
|
||||
x.asTask (prio := prio) >>= ETask.block
|
||||
|
||||
/--
|
||||
Raises an error of type `ε` within the `EAsync` monad.
|
||||
-/
|
||||
@@ -671,11 +624,9 @@ protected def tryCatch (x : EAsync ε α) (f : ε → EAsync ε α) (prio := Tas
|
||||
/--
|
||||
Runs an action, ensuring that some other action always happens afterward.
|
||||
-/
|
||||
protected def tryFinally'
|
||||
(x : EAsync ε α) (f : Option α → EAsync ε β)
|
||||
(prio := Task.Priority.default) (sync := false) :
|
||||
EAsync ε (α × β) :=
|
||||
.mk <| BaseAsync.bind x (prio := prio) (sync := sync) fun
|
||||
@[inline]
|
||||
protected def tryFinally' (x : EAsync ε α) (f : Option α → EAsync ε β) : EAsync ε (α × β) :=
|
||||
.mk <| BaseAsync.bind x fun
|
||||
| .ok a => do
|
||||
match ← (f (some a)) with
|
||||
| .ok b => BaseAsync.pure (.ok (a, b))
|
||||
@@ -776,7 +727,7 @@ protected partial def forIn
|
||||
| .ok (.yield b) => loop b
|
||||
|
||||
loop init
|
||||
.mk <| EAsync.ofETask promise.result!
|
||||
.mk <| EAsync.ofTask promise.result!
|
||||
|
||||
instance : ForIn (EAsync ε) Lean.Loop Unit where
|
||||
forIn _ := EAsync.forIn
|
||||
@@ -805,19 +756,22 @@ The other result is lost and the other task is not cancelled, so the task will c
|
||||
until the end.
|
||||
-/
|
||||
@[inline, specialize]
|
||||
def race [Inhabited α] (x : EAsync ε α) (y : EAsync ε α)
|
||||
def race (x : EAsync ε α) (y : EAsync ε α)
|
||||
(prio := Task.Priority.default) :
|
||||
EAsync ε α := do
|
||||
let promise ← IO.Promise.new
|
||||
-- Use Option to avoid Inhabited constraint
|
||||
let promise : IO.Promise (Option (Except ε α)) ← IO.Promise.new
|
||||
|
||||
let task₁ : ETask ε _ ← MonadAsync.async (prio := prio) x
|
||||
let task₂ : ETask ε _ ← MonadAsync.async (prio := prio) y
|
||||
|
||||
BaseIO.chainTask task₁ (liftM ∘ promise.resolve)
|
||||
BaseIO.chainTask task₂ (liftM ∘ promise.resolve)
|
||||
BaseIO.chainTask task₁ (liftM ∘ promise.resolve ∘ some)
|
||||
BaseIO.chainTask task₂ (liftM ∘ promise.resolve ∘ some)
|
||||
|
||||
let result ← MonadAwait.await promise.result!
|
||||
EAsync.ofExcept result
|
||||
match result with
|
||||
| some res => EAsync.ofExcept res
|
||||
| none => MonadAwait.await task₁ -- Fallback, shouldn't happen
|
||||
|
||||
/--
|
||||
Runs all computations in an `Array` concurrently and returns all results as an array.
|
||||
@@ -843,6 +797,12 @@ def raceAll [Inhabited α] [ForM (EAsync ε) c (EAsync ε α)] (xs : c) (prio :=
|
||||
let result ← MonadAwait.await promise.result!
|
||||
EAsync.ofExcept result
|
||||
|
||||
instance : ToAsync (ETask ε α) (EAsync ε α) where
|
||||
toAsync := EAsync.ofTask
|
||||
|
||||
instance : ToAsync (Except ε α) (EAsync ε α) where
|
||||
toAsync := EAsync.ofExcept
|
||||
|
||||
end EAsync
|
||||
|
||||
/--
|
||||
@@ -857,20 +817,20 @@ Converts a `Async` to a `AsyncTask`.
|
||||
-/
|
||||
@[inline]
|
||||
protected def toIO (x : Async α) : IO (AsyncTask α) :=
|
||||
MaybeTask.toTask <$> x.toRawBaseIO
|
||||
MaybeTask.toTask <$> x.unwrap
|
||||
|
||||
/--
|
||||
Block until the `Async` finishes and returns its value. Propagates any error encountered during execution.
|
||||
Waits for the result of the `Async` computation, blocking if necessary.
|
||||
-/
|
||||
@[inline]
|
||||
protected def block (x : Async α) (prio := Task.Priority.default) : IO α :=
|
||||
x.asTask (prio := prio) >>= ETask.block
|
||||
protected def block (self : Async α) : IO α :=
|
||||
EAsync.block self
|
||||
|
||||
/--
|
||||
Converts `Promise` into `Async`.
|
||||
-/
|
||||
@[inline]
|
||||
protected def ofPromise (task : IO (IO.Promise (Except IO.Error α))) (error : String := "the promise linked to the Async was dropped") : Async α := do
|
||||
protected def ofIOPromise (task : IO (IO.Promise (Except IO.Error α))) (error : String := "the promise linked to the Async was dropped") : Async α := do
|
||||
match ← task.toBaseIO with
|
||||
| .ok data => pure (f := BaseIO) <| MaybeTask.ofTask <| data.result?.map fun
|
||||
| none => .error error
|
||||
@@ -932,12 +892,8 @@ instance : MonadAwait IO.Promise Async :=
|
||||
Runs two computations concurrently and returns both results as a pair.
|
||||
-/
|
||||
@[inline, specialize]
|
||||
def concurrently (x : Async α) (y : Async β) (prio := Task.Priority.default) : Async (α × β) := do
|
||||
let taskX ← MonadAsync.async x (prio := prio)
|
||||
let taskY ← MonadAsync.async y (prio := prio)
|
||||
let resultX ← MonadAwait.await taskX
|
||||
let resultY ← MonadAwait.await taskY
|
||||
return (resultX, resultY)
|
||||
def concurrently (x : Async α) (y : Async β) (prio := Task.Priority.default) : Async (α × β) :=
|
||||
EAsync.concurrently x y prio
|
||||
|
||||
/--
|
||||
Runs two computations concurrently and returns the result of the one that finishes first.
|
||||
@@ -945,27 +901,15 @@ The other result is lost and the other task is not cancelled, so the task will c
|
||||
until the end.
|
||||
-/
|
||||
@[inline, specialize]
|
||||
def race [Inhabited α] (x : Async α) (y : Async α)
|
||||
(prio := Task.Priority.default) :
|
||||
Async α := do
|
||||
let promise ← IO.Promise.new
|
||||
|
||||
let task₁ ← MonadAsync.async (t := AsyncTask) (prio := prio) x
|
||||
let task₂ ← MonadAsync.async (t := AsyncTask) (prio := prio) y
|
||||
|
||||
BaseIO.chainTask task₁ (liftM ∘ promise.resolve)
|
||||
BaseIO.chainTask task₂ (liftM ∘ promise.resolve)
|
||||
|
||||
let result ← MonadAwait.await promise
|
||||
Async.ofExcept result
|
||||
def race (x : Async α) (y : Async α) (prio := Task.Priority.default) : Async α :=
|
||||
EAsync.race x y prio
|
||||
|
||||
/--
|
||||
Runs all computations in an `Array` concurrently and returns all results as an array.
|
||||
-/
|
||||
@[inline, specialize]
|
||||
def concurrentlyAll (xs : Array (Async α)) (prio := Task.Priority.default) : Async (Array α) := do
|
||||
let tasks : Array (AsyncTask α) ← xs.mapM (MonadAsync.async (prio := prio))
|
||||
tasks.mapM MonadAwait.await
|
||||
def concurrentlyAll (xs : Array (Async α)) (prio := Task.Priority.default) : Async (Array α) :=
|
||||
EAsync.concurrentlyAll xs prio
|
||||
|
||||
/--
|
||||
Runs all computations concurrently and returns the result of the first one to finish.
|
||||
@@ -973,15 +917,23 @@ All other results are lost, and the tasks are not cancelled, so they'll continue
|
||||
until the end.
|
||||
-/
|
||||
@[inline, specialize]
|
||||
def raceAll [ForM Async c (Async α)] (xs : c) (prio := Task.Priority.default) : Async α := do
|
||||
let promise ← IO.Promise.new
|
||||
def raceAll [Inhabited α] [ForM (EAsync IO.Error) c (EAsync IO.Error α)] (xs : c) (prio := Task.Priority.default) : Async α :=
|
||||
EAsync.raceAll xs prio
|
||||
|
||||
ForM.forM xs fun x => do
|
||||
let task₁ ← MonadAsync.async (t := AsyncTask) (prio := prio) x
|
||||
BaseIO.chainTask task₁ (liftM ∘ promise.resolve)
|
||||
instance : ToAsync (AsyncTask α) (Async α) where
|
||||
toAsync := Async.ofAsyncTask
|
||||
|
||||
let result ← MonadAwait.await promise
|
||||
Async.ofExcept result
|
||||
instance : ToAsync (Task α) (Async α) where
|
||||
toAsync := Async.ofTask
|
||||
|
||||
instance : ToAsync (Except IO.Error α) (Async α) where
|
||||
toAsync := Async.ofExcept
|
||||
|
||||
instance : ToAsync (IO (IO.Promise (Except IO.Error α))) (Async α) where
|
||||
toAsync x := Async.ofIOPromise x "the promise linked to the Async was dropped"
|
||||
|
||||
instance : ToAsync (IO (IO.Promise α)) (Async α) where
|
||||
toAsync x := Async.ofPurePromise x "the promise linked to the Async was dropped"
|
||||
|
||||
end Async
|
||||
|
||||
@@ -995,7 +947,4 @@ This function transforms the operation inside the monad `m` into a task and let
|
||||
def background [Monad m] [MonadAsync t m] (action : m α) (prio := Task.Priority.default) : m Unit :=
|
||||
discard (async (t := t) (prio := prio) action)
|
||||
|
||||
end Async
|
||||
end IO
|
||||
end Internal
|
||||
end Std
|
||||
end Std.Async
|
||||
@@ -8,8 +8,8 @@ module
|
||||
prelude
|
||||
public import Std.Time
|
||||
public import Std.Internal.UV
|
||||
public import Std.Internal.Async.Basic
|
||||
public import Std.Internal.Async.Timer
|
||||
public import Std.Async.Basic
|
||||
public import Std.Async.Timer
|
||||
public import Std.Sync.CancellationContext
|
||||
|
||||
public section
|
||||
@@ -19,10 +19,7 @@ This module contains the implementation of `ContextAsync`, a monad for asynchron
|
||||
cooperative cancellation support that must be explicitly checked for and cancelled explicitly.
|
||||
-/
|
||||
|
||||
namespace Std
|
||||
namespace Internal
|
||||
namespace IO
|
||||
namespace Async
|
||||
namespace Std.Async
|
||||
|
||||
/--
|
||||
An asynchronous computation with cooperative cancellation support via a `CancellationContext`. `ContextAsync α`
|
||||
@@ -267,7 +264,4 @@ This is useful for selecting on cancellation alongside other asynchronous operat
|
||||
def Selector.cancelled : ContextAsync (Selector Unit) := do
|
||||
ContextAsync.doneSelector
|
||||
|
||||
end Async
|
||||
end IO
|
||||
end Internal
|
||||
end Std
|
||||
end Std.Async
|
||||
@@ -8,17 +8,13 @@ module
|
||||
prelude
|
||||
public import Std.Time
|
||||
public import Std.Internal.UV
|
||||
public import Std.Internal.Async.Basic
|
||||
public import Std.Async.Basic
|
||||
|
||||
public section
|
||||
|
||||
namespace Std
|
||||
namespace Internal
|
||||
namespace IO
|
||||
namespace Async
|
||||
namespace DNS
|
||||
namespace Std.Async.DNS
|
||||
|
||||
open Std.Net
|
||||
open Std.Net Internal UV
|
||||
|
||||
/--
|
||||
Represents a resolved hostname and service name from a socket address.
|
||||
@@ -39,7 +35,7 @@ Asynchronously resolves a hostname and service to an array of socket addresses.
|
||||
-/
|
||||
@[inline]
|
||||
def getAddrInfo (host : String) (service : String) (addrFamily : Option AddressFamily := none) : Async (Array IPAddr) := do
|
||||
Async.ofPromise <| UV.DNS.getAddrInfo
|
||||
Async.ofIOPromise <| UV.DNS.getAddrInfo
|
||||
host
|
||||
service
|
||||
(match addrFamily with
|
||||
@@ -53,11 +49,7 @@ Performs a reverse DNS lookup on a `SocketAddress`.
|
||||
@[inline]
|
||||
def getNameInfo (host : @& SocketAddress) : Async NameInfo :=
|
||||
UV.DNS.getNameInfo host
|
||||
|> Async.ofPromise
|
||||
|> Async.ofIOPromise
|
||||
|>.map (Function.uncurry NameInfo.mk)
|
||||
|
||||
end DNS
|
||||
end Async
|
||||
end IO
|
||||
end Internal
|
||||
end Std
|
||||
end Std.Async.DNS
|
||||
@@ -6,16 +6,11 @@ Authors: Sofia Rodrigues
|
||||
module
|
||||
|
||||
prelude
|
||||
public import Std.Internal.Async.Select
|
||||
public import Std.Async.Select
|
||||
|
||||
public section
|
||||
|
||||
namespace Std
|
||||
namespace Internal
|
||||
namespace Async
|
||||
namespace IO
|
||||
|
||||
open Std.Internal.IO.Async
|
||||
namespace Std.Async
|
||||
|
||||
/-!
|
||||
This module provides buffered asynchronous I/O operations for efficient reading and writing.
|
||||
@@ -48,7 +43,4 @@ class AsyncStream (α : Type) (β : outParam Type) where
|
||||
stop : α → IO Unit :=
|
||||
fun _ => pure ()
|
||||
|
||||
end IO
|
||||
end Async
|
||||
end Internal
|
||||
end Std
|
||||
end Std.Async
|
||||
@@ -12,13 +12,9 @@ public import Std.Data.HashMap
|
||||
|
||||
public section
|
||||
|
||||
namespace Std.Async.Process
|
||||
open Std Time
|
||||
open System
|
||||
|
||||
namespace Std
|
||||
namespace Internal
|
||||
namespace IO
|
||||
namespace Process
|
||||
open Internal UV System
|
||||
|
||||
/--
|
||||
Represents resource usage statistics for a process or thread.
|
||||
@@ -236,7 +232,4 @@ Returns the available memory for allocation in bytes.
|
||||
def availableMemory : IO UInt64 :=
|
||||
UV.System.availableMemory
|
||||
|
||||
end Process
|
||||
end IO
|
||||
end Internal
|
||||
end Std
|
||||
end Std.Async.Process
|
||||
@@ -7,7 +7,7 @@ module
|
||||
|
||||
prelude
|
||||
public import Init.Data.Random
|
||||
public import Std.Internal.Async.Basic
|
||||
public import Std.Async.Basic
|
||||
import Init.Data.ByteArray.Extra
|
||||
|
||||
public section
|
||||
@@ -18,10 +18,7 @@ The main entrypoint for users is `Selectable.one` and the various functions to p
|
||||
`Selector`s from other modules.
|
||||
-/
|
||||
|
||||
namespace Std
|
||||
namespace Internal
|
||||
namespace IO
|
||||
namespace Async
|
||||
namespace Std.Async
|
||||
|
||||
/--
|
||||
The core data structure for racing on winning a `Selectable.one` if multiple event sources are ready
|
||||
@@ -166,7 +163,7 @@ partial def Selectable.one (selectables : Array (Selectable α)) : Async α := d
|
||||
|
||||
async.toBaseIO
|
||||
|
||||
Async.ofPromise (pure promise)
|
||||
Async.ofIOPromise (pure promise)
|
||||
|
||||
/--
|
||||
Performs fair and data-loss free non-blocking multiplexing on the `Selectable`s in `selectables`.
|
||||
@@ -245,7 +242,4 @@ def Selectable.combine (selectables : Array (Selectable α)) : IO (Selector α)
|
||||
selectable.selector.unregisterFn
|
||||
}
|
||||
|
||||
end Async
|
||||
end IO
|
||||
end Internal
|
||||
end Std
|
||||
end Std.Async
|
||||
@@ -8,14 +8,11 @@ module
|
||||
prelude
|
||||
public import Std.Time
|
||||
public import Std.Internal.UV.Signal
|
||||
public import Std.Internal.Async.Select
|
||||
public import Std.Async.Select
|
||||
|
||||
public section
|
||||
|
||||
namespace Std
|
||||
namespace Internal
|
||||
namespace IO
|
||||
namespace Async
|
||||
namespace Std.Async
|
||||
|
||||
/--
|
||||
Unix style signals for Unix and Windows. SIGKILL and SIGSTOP are missing because they cannot be caught.
|
||||
@@ -261,3 +258,5 @@ def selector (s : Signal.Waiter) : Selector Unit :=
|
||||
unregisterFn := s.native.cancel
|
||||
|
||||
}
|
||||
|
||||
end Std.Async.Signal.Waiter
|
||||
@@ -18,13 +18,9 @@ manipulation.
|
||||
-/
|
||||
|
||||
open Std Time
|
||||
open System
|
||||
open Internal UV System
|
||||
|
||||
namespace Std
|
||||
namespace Internal
|
||||
namespace IO
|
||||
namespace Async
|
||||
namespace System
|
||||
namespace Std.Async.System
|
||||
|
||||
/--
|
||||
A group identifier, represented by a numeric ID in UNIX systems (e.g. 1000).
|
||||
@@ -314,8 +310,4 @@ def getGroup (groupId : GroupId) : IO (Option GroupInfo) := do
|
||||
members := group.members
|
||||
}
|
||||
|
||||
end System
|
||||
end Async
|
||||
end IO
|
||||
end Internal
|
||||
end Std
|
||||
end Std.Async.System
|
||||
@@ -8,15 +8,11 @@ module
|
||||
prelude
|
||||
public import Std.Time
|
||||
public import Std.Internal.UV.TCP
|
||||
public import Std.Internal.Async.Select
|
||||
public import Std.Async.Select
|
||||
|
||||
public section
|
||||
|
||||
namespace Std
|
||||
namespace Internal
|
||||
namespace IO
|
||||
namespace Async
|
||||
namespace TCP
|
||||
namespace Std.Async.TCP
|
||||
open Std.Net
|
||||
|
||||
namespace Socket
|
||||
@@ -66,7 +62,7 @@ Accepts an incoming connection.
|
||||
@[inline]
|
||||
def accept (s : Server) : Async Client := do
|
||||
s.native.accept
|
||||
|> Async.ofPromise
|
||||
|> Async.ofIOPromise
|
||||
|>.map Client.ofNative
|
||||
|
||||
/--
|
||||
@@ -153,21 +149,21 @@ Connects the client socket to the given address.
|
||||
-/
|
||||
@[inline]
|
||||
def connect (s : Client) (addr : SocketAddress) : Async Unit :=
|
||||
Async.ofPromise <| s.native.connect addr
|
||||
Async.ofIOPromise <| s.native.connect addr
|
||||
|
||||
/--
|
||||
Sends multiple data buffers through the client socket.
|
||||
-/
|
||||
@[inline]
|
||||
def sendAll (s : Client) (data : Array ByteArray) : Async Unit :=
|
||||
Async.ofPromise <| s.native.send data
|
||||
Async.ofIOPromise <| s.native.send data
|
||||
|
||||
/--
|
||||
Sends data through the client socket.
|
||||
-/
|
||||
@[inline]
|
||||
def send (s : Client) (data : ByteArray) : Async Unit :=
|
||||
Async.ofPromise <| s.native.send #[data]
|
||||
Async.ofIOPromise <| s.native.send #[data]
|
||||
|
||||
/--
|
||||
Receives data from the client socket. If data is received, it’s wrapped in .some. If EOF is reached,
|
||||
@@ -177,7 +173,7 @@ Furthermore calling this function in parallel with `recvSelector` is not support
|
||||
-/
|
||||
@[inline]
|
||||
def recv? (s : Client) (size : UInt64) : Async (Option ByteArray) :=
|
||||
Async.ofPromise <| s.native.recv? size
|
||||
Async.ofIOPromise <| s.native.recv? size
|
||||
|
||||
/--
|
||||
Creates a `Selector` that resolves once `s` has data available, up to at most `size` bytes,
|
||||
@@ -224,7 +220,7 @@ Shuts down the write side of the client socket.
|
||||
-/
|
||||
@[inline]
|
||||
def shutdown (s : Client) : Async Unit :=
|
||||
Async.ofPromise <| s.native.shutdown
|
||||
Async.ofIOPromise <| s.native.shutdown
|
||||
|
||||
/--
|
||||
Gets the remote address of the client socket.
|
||||
@@ -256,8 +252,4 @@ def keepAlive (s : Client) (enable : Bool) (delay : Std.Time.Second.Offset) (_ :
|
||||
|
||||
end Client
|
||||
end Socket
|
||||
end TCP
|
||||
end Async
|
||||
end IO
|
||||
end Internal
|
||||
end Std
|
||||
end Std.Async.TCP
|
||||
@@ -8,15 +8,12 @@ module
|
||||
prelude
|
||||
public import Std.Time
|
||||
public import Std.Internal.UV.Timer
|
||||
public import Std.Internal.Async.Select
|
||||
public import Std.Async.Select
|
||||
|
||||
public section
|
||||
|
||||
|
||||
namespace Std
|
||||
namespace Internal
|
||||
namespace IO
|
||||
namespace Async
|
||||
namespace Std.Async
|
||||
|
||||
/--
|
||||
`Sleep` can be used to sleep for some duration once.
|
||||
@@ -167,7 +164,4 @@ def stop (i : Interval) : IO Unit :=
|
||||
|
||||
end Interval
|
||||
|
||||
end Async
|
||||
end IO
|
||||
end Internal
|
||||
end Std
|
||||
end Std.Async
|
||||
@@ -8,15 +8,11 @@ module
|
||||
prelude
|
||||
public import Std.Time
|
||||
public import Std.Internal.UV.UDP
|
||||
public import Std.Internal.Async.Select
|
||||
public import Std.Async.Select
|
||||
|
||||
public section
|
||||
|
||||
namespace Std
|
||||
namespace Internal
|
||||
namespace IO
|
||||
namespace Async
|
||||
namespace UDP
|
||||
namespace Std.Async.UDP
|
||||
|
||||
open Std.Net
|
||||
|
||||
@@ -66,7 +62,7 @@ address. If `addr` is `none`, the data is sent to the default peer address set b
|
||||
-/
|
||||
@[inline]
|
||||
def sendAll (s : Socket) (data : Array ByteArray) (addr : Option SocketAddress := none) : Async Unit :=
|
||||
Async.ofPromise <| s.native.send data addr
|
||||
Async.ofIOPromise <| s.native.send data addr
|
||||
|
||||
/--
|
||||
Sends data through an UDP socket. The `addr` parameter specifies the destination address. If `addr`
|
||||
@@ -74,7 +70,7 @@ is `none`, the data is sent to the default peer address set by `connect`.
|
||||
-/
|
||||
@[inline]
|
||||
def send (s : Socket) (data : ByteArray) (addr : Option SocketAddress := none) : Async Unit :=
|
||||
Async.ofPromise <| s.native.send #[data] addr
|
||||
Async.ofIOPromise <| s.native.send #[data] addr
|
||||
|
||||
/--
|
||||
Receives data from an UDP socket. `size` is for the maximum bytes to receive.
|
||||
@@ -85,7 +81,7 @@ Furthermore calling this function in parallel with `recvSelector` is not support
|
||||
-/
|
||||
@[inline]
|
||||
def recv (s : Socket) (size : UInt64) : Async (ByteArray × Option SocketAddress) :=
|
||||
Async.ofPromise <| s.native.recv size
|
||||
Async.ofIOPromise <| s.native.recv size
|
||||
|
||||
/--
|
||||
Creates a `Selector` that resolves once `s` has data available, up to at most `size` bytes,
|
||||
@@ -190,8 +186,4 @@ def setTTL (s : Socket) (ttl : UInt32) : IO Unit :=
|
||||
|
||||
end Socket
|
||||
|
||||
end UDP
|
||||
end Async
|
||||
end IO
|
||||
end Internal
|
||||
end Std
|
||||
end Std.Async.UDP
|
||||
@@ -6,7 +6,6 @@ Authors: Henrik Böving
|
||||
module
|
||||
|
||||
prelude
|
||||
public import Std.Internal.Async
|
||||
public import Std.Internal.Parsec
|
||||
public import Std.Internal.UV
|
||||
|
||||
|
||||
@@ -1,19 +0,0 @@
|
||||
/-
|
||||
Copyright (c) 2024 Lean FRO, LLC. All rights reserved.
|
||||
Released under Apache 2.0 license as described in the file LICENSE.
|
||||
Authors: Henrik Böving
|
||||
-/
|
||||
module
|
||||
|
||||
prelude
|
||||
public import Std.Internal.Async.Basic
|
||||
public import Std.Internal.Async.ContextAsync
|
||||
public import Std.Internal.Async.Timer
|
||||
public import Std.Internal.Async.TCP
|
||||
public import Std.Internal.Async.UDP
|
||||
public import Std.Internal.Async.DNS
|
||||
public import Std.Internal.Async.Select
|
||||
public import Std.Internal.Async.Process
|
||||
public import Std.Internal.Async.System
|
||||
public import Std.Internal.Async.Signal
|
||||
public import Std.Internal.Async.IO
|
||||
@@ -10,14 +10,13 @@ public import Std.Data
|
||||
public import Init.Data.Queue
|
||||
public import Init.Data.Vector
|
||||
public import Std.Sync.Mutex
|
||||
public import Std.Internal.Async.IO
|
||||
public import Std.Async.IO
|
||||
|
||||
public section
|
||||
|
||||
namespace Std
|
||||
|
||||
open Std.Internal.Async.IO
|
||||
open Std.Internal.IO.Async
|
||||
open Std.Async
|
||||
|
||||
/-!
|
||||
The `Std.Sync.Broadcast` module implements a broadcasting primitive for sending values
|
||||
@@ -60,7 +59,7 @@ instance instMonadLiftBroadcastIO : MonadLift (EIO Broadcast.Error) IO where
|
||||
|
||||
private structure Broadcast.Consumer (α : Type) where
|
||||
promise : IO.Promise Bool
|
||||
waiter : Option (Internal.IO.Async.Waiter (Option α))
|
||||
waiter : Option (Async.Waiter (Option α))
|
||||
|
||||
private def Broadcast.Consumer.resolve (c : Broadcast.Consumer α) (b : Bool) : BaseIO Unit :=
|
||||
c.promise.resolve b
|
||||
@@ -403,7 +402,6 @@ private def recvReady'
|
||||
let slotVal ← slot.get
|
||||
return slotVal.pos = next
|
||||
|
||||
open Internal.IO.Async in
|
||||
private partial def recvSelector (ch : Bounded.Receiver α) : Selector (Option α) where
|
||||
tryFn := do
|
||||
ch.state.atomically do
|
||||
@@ -537,8 +535,6 @@ the next available message. This will block until a message is available.
|
||||
def recv [Inhabited α] (ch : Broadcast.Receiver α) : BaseIO (Task (Option α)) := do
|
||||
Std.Bounded.Receiver.recv ch.inner
|
||||
|
||||
open Internal.IO.Async in
|
||||
|
||||
/--
|
||||
Creates a `Selector` that resolves once the broadcast channel `ch` has data available and provides that data.
|
||||
-/
|
||||
@@ -567,7 +563,7 @@ instance [Inhabited α] : AsyncStream (Broadcast.Receiver α) (Option α) where
|
||||
stop channel := channel.unsubscribe
|
||||
|
||||
instance [Inhabited α] : AsyncRead (Broadcast.Receiver α) (Option α) where
|
||||
read receiver := Internal.IO.Async.Async.ofIOTask receiver.recv
|
||||
read receiver := Async.ofIOTask receiver.recv
|
||||
|
||||
instance [Inhabited α] : AsyncWrite (Broadcast α) α where
|
||||
write receiver x := do
|
||||
|
||||
@@ -11,7 +11,7 @@ public import Init.System.Promise
|
||||
public import Init.Data.Queue
|
||||
public import Std.Sync.Mutex
|
||||
public import Std.Sync.CancellationToken
|
||||
public import Std.Internal.Async.Select
|
||||
public import Std.Async.Select
|
||||
|
||||
public section
|
||||
|
||||
@@ -21,7 +21,7 @@ automatically cancels all child contexts.
|
||||
-/
|
||||
|
||||
namespace Std
|
||||
open Std.Internal.IO.Async
|
||||
open Std.Async
|
||||
|
||||
structure CancellationContext.State where
|
||||
/--
|
||||
|
||||
@@ -9,7 +9,7 @@ prelude
|
||||
public import Std.Data
|
||||
public import Init.Data.Queue
|
||||
public import Std.Sync.Mutex
|
||||
public import Std.Internal.Async.Select
|
||||
public import Std.Async.Select
|
||||
|
||||
public section
|
||||
|
||||
@@ -21,7 +21,7 @@ that a cancellation has occurred.
|
||||
-/
|
||||
|
||||
namespace Std
|
||||
open Std.Internal.IO.Async
|
||||
open Std.Async
|
||||
|
||||
/--
|
||||
Reasons for cancellation.
|
||||
|
||||
@@ -8,13 +8,13 @@ module
|
||||
prelude
|
||||
public import Init.Data.Queue
|
||||
public import Std.Sync.Mutex
|
||||
public import Std.Internal.Async.IO
|
||||
public import Std.Async.IO
|
||||
import Init.Data.Vector.Basic
|
||||
|
||||
public section
|
||||
|
||||
open Std.Internal.Async.IO
|
||||
open Std.Internal.IO.Async
|
||||
open Std.Async
|
||||
open Std.Async
|
||||
|
||||
/-!
|
||||
This module contains the implementation of `Std.Channel`. `Std.Channel` is a multi-producer
|
||||
@@ -51,7 +51,6 @@ instance : ToString Error where
|
||||
instance : MonadLift (EIO Error) IO where
|
||||
monadLift x := EIO.toIO (.userError <| toString ·) x
|
||||
|
||||
open Internal.IO.Async in
|
||||
private inductive Consumer (α : Type) where
|
||||
| normal (promise : IO.Promise (Option α))
|
||||
| select (finished : Waiter (Option α))
|
||||
@@ -172,7 +171,6 @@ private def recvReady' [Monad m] [MonadLiftT (ST IO.RealWorld) m] :
|
||||
let st ← get
|
||||
return !st.values.isEmpty || st.closed
|
||||
|
||||
open Internal.IO.Async in
|
||||
private def recvSelector (ch : Unbounded α) : Selector (Option α) where
|
||||
tryFn := do
|
||||
ch.state.atomically do
|
||||
@@ -322,7 +320,6 @@ private def recvReady' [Monad m] [MonadLiftT (ST IO.RealWorld) m] :
|
||||
let st ← get
|
||||
return !st.producers.isEmpty || st.closed
|
||||
|
||||
open Internal.IO.Async in
|
||||
private def recvSelector (ch : Zero α) : Selector (Option α) where
|
||||
tryFn := do
|
||||
ch.state.atomically do
|
||||
@@ -356,7 +353,6 @@ private def recvSelector (ch : Zero α) : Selector (Option α) where
|
||||
|
||||
end Zero
|
||||
|
||||
open Internal.IO.Async in
|
||||
private structure Bounded.Consumer (α : Type) where
|
||||
promise : IO.Promise Bool
|
||||
waiter : Option (Waiter (Option α))
|
||||
@@ -558,7 +554,6 @@ private def recvReady' [Monad m] [MonadLiftT (ST IO.RealWorld) m] :
|
||||
let st ← get
|
||||
return st.bufCount != 0 || st.closed
|
||||
|
||||
open Internal.IO.Async in
|
||||
private partial def recvSelector (ch : Bounded α) : Selector (Option α) where
|
||||
tryFn := do
|
||||
ch.state.atomically do
|
||||
@@ -732,7 +727,6 @@ def recv (ch : CloseableChannel α) : BaseIO (Task (Option α)) :=
|
||||
| .zero ch => CloseableChannel.Zero.recv ch
|
||||
| .bounded ch => CloseableChannel.Bounded.recv ch
|
||||
|
||||
open Internal.IO.Async in
|
||||
/--
|
||||
Creates a `Selector` that resolves once `ch` has data available and provides that data.
|
||||
In particular if `ch` is closed while waiting on this `Selector` and no data is available already
|
||||
@@ -759,7 +753,7 @@ instance [Inhabited α] : AsyncStream (CloseableChannel α) (Option α) where
|
||||
next channel := channel.recvSelector
|
||||
|
||||
instance [Inhabited α] : AsyncRead (CloseableChannel α) (Option α) where
|
||||
read receiver := Internal.IO.Async.Async.ofIOTask receiver.recv
|
||||
read receiver := Async.ofIOTask receiver.recv
|
||||
|
||||
instance [Inhabited α] : AsyncWrite (CloseableChannel α) α where
|
||||
write receiver x := do
|
||||
@@ -877,7 +871,6 @@ def recv [Inhabited α] (ch : Channel α) : BaseIO (Task α) := do
|
||||
| some val => return .pure val
|
||||
| none => unreachable!
|
||||
|
||||
open Internal.IO.Async in
|
||||
/--
|
||||
Creates a `Selector` that resolves once `ch` has data available and provides that data.
|
||||
-/
|
||||
@@ -909,7 +902,7 @@ instance [Inhabited α] : AsyncStream (Channel α) α where
|
||||
next channel := channel.recvSelector
|
||||
|
||||
instance [Inhabited α] : AsyncRead (Channel α) α where
|
||||
read receiver := Internal.IO.Async.Async.ofIOTask receiver.recv
|
||||
read receiver := Async.ofIOTask receiver.recv
|
||||
|
||||
instance [Inhabited α] : AsyncWrite (Channel α) α where
|
||||
write receiver x := do
|
||||
|
||||
@@ -8,7 +8,7 @@ module
|
||||
prelude
|
||||
public import Init.Data.Queue
|
||||
public import Std.Sync.Mutex
|
||||
public import Std.Internal.Async.Select
|
||||
public import Std.Async.Select
|
||||
|
||||
public section
|
||||
|
||||
@@ -24,7 +24,7 @@ will be woken up per notification.
|
||||
-/
|
||||
|
||||
namespace Std
|
||||
open Std.Internal.IO.Async
|
||||
open Std.Async
|
||||
|
||||
inductive Notify.Consumer (α : Type) where
|
||||
| normal (promise : IO.Promise α)
|
||||
|
||||
@@ -8,19 +8,17 @@ module
|
||||
prelude
|
||||
public import Std.Data
|
||||
public import Init.Data.Queue
|
||||
public import Std.Internal.Async.IO
|
||||
public import Std.Async.IO
|
||||
|
||||
public section
|
||||
|
||||
open Std.Internal.Async.IO
|
||||
open Std.Internal.IO.Async
|
||||
|
||||
/-!
|
||||
This module provides `StreamMap`, a container that maps keys to async streams.
|
||||
It allows for dynamic management of multiple named streams with async operations.
|
||||
-/
|
||||
|
||||
namespace Std
|
||||
open Std.Async
|
||||
|
||||
/--
|
||||
This is an existential wrapper for AsyncStream that is used for the `.ofArray` function
|
||||
|
||||
@@ -1,9 +1,9 @@
|
||||
import Std.Internal.Async
|
||||
import Std.Async
|
||||
import Std.Internal.UV
|
||||
import Std.Net.Addr
|
||||
|
||||
open Std.Internal.IO.Async.UDP
|
||||
open Std.Internal.IO.Async
|
||||
open Std.Async.UDP
|
||||
open Std.Async
|
||||
open Std.Net
|
||||
|
||||
def t : IO (Async Nat) := do
|
||||
|
||||
@@ -1,9 +1,9 @@
|
||||
import Std.Internal.Async
|
||||
import Std.Async
|
||||
import Std.Sync.Mutex
|
||||
|
||||
open Std
|
||||
|
||||
open Std.Internal.IO.Async
|
||||
open Std.Async
|
||||
|
||||
def wait (ms : UInt32) (ref : Std.Mutex Nat) (val : Nat) : Async Unit := do
|
||||
ref.atomically (·.modify (· * val))
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
import Std.Internal.Async
|
||||
import Std.Async
|
||||
import Std.Sync
|
||||
|
||||
open Std.Internal.IO Async
|
||||
open Std Async
|
||||
|
||||
def cancellableSelector [Monad m] [MonadLift IO m] [MonadAsync AsyncTask m] (fn : Std.CancellationToken → m α) : m (Selector (Except IO.Error α)) := do
|
||||
let signal ← Std.CancellationToken.new
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
import Std.Internal.Async
|
||||
import Std.Async
|
||||
import Std.Sync
|
||||
|
||||
open Std.Internal.IO Async
|
||||
open Std Async
|
||||
|
||||
-- Test basic cancellation with default reason
|
||||
def testBasicCancellationWithReason : Async Unit := do
|
||||
|
||||
@@ -1,8 +1,8 @@
|
||||
import Std.Internal.Async
|
||||
import Std.Async
|
||||
import Std.Internal.UV
|
||||
import Std.Net.Addr
|
||||
|
||||
open Std.Internal.IO Async
|
||||
open Std Async
|
||||
open Std.Net
|
||||
|
||||
open Std.Net
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
import Std.Internal.Async.Timer
|
||||
import Std.Internal.Async.TCP
|
||||
import Std.Internal.Async.UDP
|
||||
import Std.Async.Timer
|
||||
import Std.Async.TCP
|
||||
import Std.Async.UDP
|
||||
|
||||
#exit -- TODO: remove `#exit` after nondet issue is resolved.
|
||||
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
import Std.Internal.Async.Timer
|
||||
import Std.Async.Timer
|
||||
|
||||
open Std Internal IO Async
|
||||
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
import Std.Internal.Async
|
||||
import Std.Async
|
||||
import Std.Sync
|
||||
|
||||
open Std.Internal.IO Async
|
||||
open Std Async
|
||||
|
||||
-- Test basic message reception from multiple channels
|
||||
def testSimpleMessages : Async Unit := do
|
||||
|
||||
@@ -1,11 +1,11 @@
|
||||
import Std.Internal.Async.Timer
|
||||
import Std.Async.Timer
|
||||
|
||||
/-
|
||||
these tests are just some preliminary ones as `async_sleep.lean` already contains extensive tests
|
||||
for the entire timer state machine and `Async.Timer` is merely a light wrapper around it.
|
||||
-/
|
||||
|
||||
open Std.Internal.IO.Async
|
||||
open Std.Async
|
||||
|
||||
def BASE_DURATION : Std.Time.Millisecond.Offset := 10
|
||||
|
||||
|
||||
@@ -1,9 +1,9 @@
|
||||
import Std.Internal.Async.System
|
||||
import Std.Internal.Async.Process
|
||||
import Std.Async.System
|
||||
import Std.Async.Process
|
||||
import Lean.Runtime
|
||||
|
||||
open Std.Internal.IO.Async.System
|
||||
open Std.Internal.IO.Process
|
||||
open Std.Async.System
|
||||
open Std.Async.Process
|
||||
|
||||
#eval do
|
||||
assert! (← getUpTime) > 0
|
||||
|
||||
@@ -1,8 +1,8 @@
|
||||
import Std.Internal.Async
|
||||
import Std.Async
|
||||
import Std.Internal.UV
|
||||
import Std.Net.Addr
|
||||
|
||||
open Std.Internal.IO Async
|
||||
open Std Async
|
||||
open Std.Net
|
||||
|
||||
def assertBEq [BEq α] [ToString α] (actual expected : α) : IO Unit := do
|
||||
|
||||
@@ -1,8 +1,8 @@
|
||||
import Std.Internal.Async
|
||||
import Std.Async
|
||||
import Std.Internal.UV
|
||||
import Std.Net.Addr
|
||||
|
||||
open Std.Internal.IO Async
|
||||
open Std Async
|
||||
open Std.Net
|
||||
|
||||
-- Using this function to create IO Error. For some reason the assert! is not pausing the execution.
|
||||
|
||||
@@ -1,8 +1,8 @@
|
||||
import Std.Internal.Async
|
||||
import Std.Async
|
||||
import Std.Internal.UV
|
||||
import Std.Net.Addr
|
||||
|
||||
open Std.Internal.IO Async
|
||||
open Std Async
|
||||
open Std.Net
|
||||
|
||||
-- Using this function to create IO Error. For some reason the assert! is not pausing the execution.
|
||||
|
||||
@@ -1,9 +1,9 @@
|
||||
import Std.Internal.Async
|
||||
import Std.Async
|
||||
import Std.Internal.UV
|
||||
import Std.Net.Addr
|
||||
|
||||
open Std.Internal.IO.Async.UDP
|
||||
open Std.Internal.IO.Async
|
||||
open Std.Async.UDP
|
||||
open Std.Async
|
||||
open Std.Net
|
||||
|
||||
def assertBEq [BEq α] [ToString α] (actual expected : α) : IO Unit := do
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
import Std.Internal.Async
|
||||
import Std.Async
|
||||
import Std.Sync
|
||||
|
||||
open Std.Internal.IO Async
|
||||
open Std Async
|
||||
|
||||
-- Test tryRecv with empty channel
|
||||
def tryRecvEmpty : Async Unit := do
|
||||
@@ -245,9 +245,9 @@ def recvConditions : Async Unit := do
|
||||
let subs2 ← channel.subscribe
|
||||
let subs3 ← channel.subscribe
|
||||
|
||||
discard <| EAsync.ofETask (← channel.send 1)
|
||||
discard <| EAsync.ofETask (← channel.send 2)
|
||||
discard <| EAsync.ofETask (← channel.send 3)
|
||||
discard <| EAsync.ofTask (← channel.send 1)
|
||||
discard <| EAsync.ofTask (← channel.send 2)
|
||||
discard <| EAsync.ofTask (← channel.send 3)
|
||||
|
||||
channel.close
|
||||
|
||||
@@ -308,9 +308,9 @@ def selectableConditions : Async Unit := do
|
||||
let subs2 ← channel.subscribe
|
||||
let subs3 ← channel.subscribe
|
||||
|
||||
discard <| EAsync.ofETask (← channel.send 1)
|
||||
discard <| EAsync.ofETask (← channel.send 2)
|
||||
discard <| EAsync.ofETask (← channel.send 3)
|
||||
discard <| EAsync.ofTask (← channel.send 1)
|
||||
discard <| EAsync.ofTask (← channel.send 2)
|
||||
discard <| EAsync.ofTask (← channel.send 3)
|
||||
|
||||
channel.close
|
||||
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
import Std.Internal.Async
|
||||
import Std.Async
|
||||
import Std.Sync
|
||||
|
||||
open Std.Internal.IO Async
|
||||
open Std Async
|
||||
|
||||
/-- Test basic tree cancellation -/
|
||||
partial def testCancelTree : IO Unit := do
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
import Std.Internal.Async
|
||||
import Std.Async
|
||||
import Std.Sync
|
||||
|
||||
open Std.Internal.IO Async
|
||||
open Std Async
|
||||
|
||||
/-- Test ContextAsync cancellation check -/
|
||||
def testIsCancelled : IO Unit := do
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
import Std.Internal.Async
|
||||
import Std.Async
|
||||
import Std.Sync
|
||||
|
||||
open Std.Internal.IO Async
|
||||
open Std Async
|
||||
|
||||
-- Test basic wait and notifyOne functionality
|
||||
def testBasicWaitNotifyOne : Async Unit := do
|
||||
|
||||
@@ -1,8 +1,8 @@
|
||||
import Std.Internal.Async.Signal
|
||||
import Std.Internal.Async.Select
|
||||
import Std.Internal.Async
|
||||
import Std.Async.Signal
|
||||
import Std.Async.Select
|
||||
import Std.Async
|
||||
|
||||
open Std.Internal.IO.Async
|
||||
open Std.Async
|
||||
|
||||
def assertBEq [BEq α] [Repr α] (actual expected : α) : IO Unit := do
|
||||
unless actual == expected do
|
||||
|
||||
Reference in New Issue
Block a user