Compare commits

...

4 Commits

Author SHA1 Message Date
Sofia Rodrigues
8be5d6e1cb fix: stream can only return one type 2025-09-22 15:09:06 -03:00
Sofia Rodrigues
c6236eb4fe fix: remove useless function 2025-09-22 15:09:06 -03:00
Sofia Rodrigues
49b964053d feat: remove outparams 2025-09-22 15:09:06 -03:00
Sofia Rodrigues
e64416e047 feat: async type classes 2025-09-22 15:09:06 -03:00
3 changed files with 81 additions and 2 deletions

View File

@@ -15,5 +15,6 @@ public import Std.Internal.Async.Select
public import Std.Internal.Async.Process
public import Std.Internal.Async.System
public import Std.Internal.Async.Signal
public import Std.Internal.Async.IO
public section

View File

@@ -0,0 +1,54 @@
/-
Copyright (c) 2025 Lean FRO, LLC. All rights reserved.
Released under Apache 2.0 license as described in the file LICENSE.
Authors: Sofia Rodrigues
-/
module
prelude
public import Std.Internal.Async.Select
public section
namespace Std
namespace Internal
namespace Async
namespace IO
open Std.Internal.IO.Async
/-!
This module provides buffered asynchronous I/O operations for efficient reading and writing.
-/
/--
Interface for asynchronous reading operations.
-/
class AsyncRead (α : Type) (β : Type) where
read : α Async β
/--
Interface for asynchronous writing operations.
-/
class AsyncWrite (α : Type) (β : Type) where
write : α β Async Unit
writeAll : α Array β Async Unit :=
fun socket data => data.forM (write socket)
flush : α Async Unit :=
fun _ => pure ()
/--
Interface for asynchronous streaming with selector-based iteration.
-/
class AsyncStream (α : Type) (β : outParam Type) where
next : α Selector β
stop : α IO Unit :=
fun _ => pure ()
end IO
end Async
end Internal
end Std

View File

@@ -10,9 +10,13 @@ public import Init.System.Promise
public import Init.Data.Queue
public import Std.Sync.Mutex
public import Std.Internal.Async.Select
public import Std.Internal.Async.IO
public section
open Std.Internal.Async.IO
open Std.Internal.IO.Async
/-!
This module contains the implementation of `Std.Channel`. `Std.Channel` is a multi-producer
multi-consumer FIFO channel that offers both bounded and unbounded buffering as well as synchronous
@@ -24,7 +28,6 @@ for cleaner code.
-/
namespace Std
namespace CloseableChannel
/--
@@ -753,6 +756,17 @@ partial def forAsync (f : α → BaseIO Unit) (ch : CloseableChannel α)
| none => return .pure ()
| some v => do f v; ch.forAsync f prio
instance [Inhabited α] : AsyncStream (CloseableChannel α) (Option α) where
next channel := channel.recvSelector
instance [Inhabited α] : AsyncRead (CloseableChannel α) (Option α) where
read receiver := Internal.IO.Async.Async.ofIOTask receiver.recv
instance [Inhabited α] : AsyncWrite (CloseableChannel α) α where
write receiver x := do
let task receiver.send x
Async.ofAsyncTask <| task.map (Except.mapError (IO.userError toString))
/--
This function is a no-op and just a convenient way to expose the synchronous API of the channel.
-/
@@ -804,7 +818,6 @@ instance [MonadLiftT BaseIO m] : ForIn m (Sync α) α where
forIn ch b f := private ch.forIn f b
end Sync
end CloseableChannel
/--
@@ -893,6 +906,17 @@ partial def forAsync [Inhabited α] (f : α → BaseIO Unit) (ch : Channel α)
(prio : Task.Priority := .default) : BaseIO (Task Unit) := do
BaseIO.bindTask (prio := prio) ( ch.recv) fun v => do f v; ch.forAsync f prio
instance [Inhabited α] : AsyncStream (Channel α) α where
next channel := channel.recvSelector
instance [Inhabited α] : AsyncRead (Channel α) α where
read receiver := Internal.IO.Async.Async.ofIOTask receiver.recv
instance [Inhabited α] : AsyncWrite (Channel α) α where
write receiver x := do
let task receiver.send x
Async.ofTask task
@[inherit_doc CloseableChannel.sync, inline]
def sync (ch : Channel α) : Channel.Sync α := ch