Compare commits

...

1 Commits

Author SHA1 Message Date
Henrik Böving
e552497cac refactor: move Channel/Mutex to Std 2024-12-02 11:27:56 +01:00
8 changed files with 309 additions and 17 deletions

View File

@@ -8,6 +8,8 @@ import Init.Data.Queue
import Init.System.Promise
import Init.System.Mutex
set_option linter.deprecated false
namespace IO
/--
@@ -15,6 +17,7 @@ Internal state of an `Channel`.
We maintain the invariant that at all times either `consumers` or `values` is empty.
-/
@[deprecated "Use Std.Channel.State from Std.Sync.Channel instead" (since := "2024-12-02")]
structure Channel.State (α : Type) where
values : Std.Queue α :=
consumers : Std.Queue (Promise (Option α)) :=
@@ -27,12 +30,14 @@ FIFO channel with unbounded buffer, where `recv?` returns a `Task`.
A channel can be closed. Once it is closed, all `send`s are ignored, and
`recv?` returns `none` once the queue is empty.
-/
@[deprecated "Use Std.Channel from Std.Sync.Channel instead" (since := "2024-12-02")]
def Channel (α : Type) : Type := Mutex (Channel.State α)
instance : Nonempty (Channel α) :=
inferInstanceAs (Nonempty (Mutex _))
/-- Creates a new `Channel`. -/
@[deprecated "Use Std.Channel.new from Std.Sync.Channel instead" (since := "2024-12-02")]
def Channel.new : BaseIO (Channel α) :=
Mutex.new {}
@@ -41,6 +46,7 @@ Sends a message on an `Channel`.
This function does not block.
-/
@[deprecated "Use Std.Channel.send from Std.Sync.Channel instead" (since := "2024-12-02")]
def Channel.send (ch : Channel α) (v : α) : BaseIO Unit :=
ch.atomically do
let st get
@@ -54,6 +60,7 @@ def Channel.send (ch : Channel α) (v : α) : BaseIO Unit :=
/--
Closes an `Channel`.
-/
@[deprecated "Use Std.Channel.close from Std.Sync.Channel instead" (since := "2024-12-02")]
def Channel.close (ch : Channel α) : BaseIO Unit :=
ch.atomically do
let st get
@@ -67,6 +74,7 @@ Every message is only received once.
Returns `none` if the channel is closed and the queue is empty.
-/
@[deprecated "Use Std.Channel.recv? from Std.Sync.Channel instead" (since := "2024-12-02")]
def Channel.recv? (ch : Channel α) : BaseIO (Task (Option α)) :=
ch.atomically do
let st get
@@ -85,6 +93,7 @@ def Channel.recv? (ch : Channel α) : BaseIO (Task (Option α)) :=
Note that if this function is called twice, each `forAsync` only gets half the messages.
-/
@[deprecated "Use Std.Channel.forAsync from Std.Sync.Channel instead" (since := "2024-12-02")]
partial def Channel.forAsync (f : α BaseIO Unit) (ch : Channel α)
(prio : Task.Priority := .default) : BaseIO (Task Unit) := do
BaseIO.bindTask (prio := prio) ( ch.recv?) fun
@@ -96,11 +105,13 @@ Receives all currently queued messages from the channel.
Those messages are dequeued and will not be returned by `recv?`.
-/
@[deprecated "Use Std.Channel.recvAllCurrent from Std.Sync.Channel instead" (since := "2024-12-02")]
def Channel.recvAllCurrent (ch : Channel α) : BaseIO (Array α) :=
ch.atomically do
modifyGet fun st => (st.values.toArray, { st with values := })
/-- Type tag for synchronous (blocking) operations on a `Channel`. -/
@[deprecated "Use Std.Channel.Sync from Std.Sync.Channel instead" (since := "2024-12-02")]
def Channel.Sync := Channel
/--
@@ -110,6 +121,7 @@ For example, `ch.sync.recv?` blocks until the next message,
and `for msg in ch.sync do ...` iterates synchronously over the channel.
These functions should only be used in dedicated threads.
-/
@[deprecated "Use Std.Channel.sync from Std.Sync.Channel instead" (since := "2024-12-02")]
def Channel.sync (ch : Channel α) : Channel.Sync α := ch
/--
@@ -118,9 +130,11 @@ Synchronously receives a message from the channel.
Every message is only received once.
Returns `none` if the channel is closed and the queue is empty.
-/
@[deprecated "Use Std.Channel.Sync.recv? from Std.Sync.Channel instead" (since := "2024-12-02")]
def Channel.Sync.recv? (ch : Channel.Sync α) : BaseIO (Option α) := do
IO.wait ( Channel.recv? ch)
@[deprecated "Use Std.Channel.Sync.forIn from Std.Sync.Channel instead" (since := "2024-12-02")]
private partial def Channel.Sync.forIn [Monad m] [MonadLiftT BaseIO m]
(ch : Channel.Sync α) (f : α β m (ForInStep β)) : β m β := fun b => do
match ch.recv? with

View File

@@ -7,6 +7,9 @@ prelude
import Init.System.IO
import Init.Control.StateRef
set_option linter.deprecated false
namespace IO
private opaque BaseMutexImpl : NonemptyType.{0}
@@ -16,12 +19,13 @@ Mutual exclusion primitive (a lock).
If you want to guard shared state, use `Mutex α` instead.
-/
@[deprecated "Use Std.BaseMutex from Std.Sync.Mutex instead" (since := "2024-12-02")]
def BaseMutex : Type := BaseMutexImpl.type
instance : Nonempty BaseMutex := BaseMutexImpl.property
/-- Creates a new `BaseMutex`. -/
@[extern "lean_io_basemutex_new"]
@[extern "lean_io_basemutex_new", deprecated "Use Std.BaseMutex.new from Std.Sync.Mutex instead" (since := "2024-12-02")]
opaque BaseMutex.new : BaseIO BaseMutex
/--
@@ -30,7 +34,7 @@ Locks a `BaseMutex`. Waits until no other thread has locked the mutex.
The current thread must not have already locked the mutex.
Reentrant locking is undefined behavior (inherited from the C++ implementation).
-/
@[extern "lean_io_basemutex_lock"]
@[extern "lean_io_basemutex_lock", deprecated "Use Std.BaseMutex.lock from Std.Sync.Mutex instead" (since := "2024-12-02")]
opaque BaseMutex.lock (mutex : @& BaseMutex) : BaseIO Unit
/--
@@ -39,33 +43,35 @@ Unlocks a `BaseMutex`.
The current thread must have already locked the mutex.
Unlocking an unlocked mutex is undefined behavior (inherited from the C++ implementation).
-/
@[extern "lean_io_basemutex_unlock"]
@[extern "lean_io_basemutex_unlock", deprecated "Use Std.BaseMutex.unlock from Std.Sync.Mutex instead" (since := "2024-12-02")]
opaque BaseMutex.unlock (mutex : @& BaseMutex) : BaseIO Unit
private opaque CondvarImpl : NonemptyType.{0}
/-- Condition variable. -/
@[deprecated "Use Std.Condvar from Std.Sync.Mutex instead" (since := "2024-12-02")]
def Condvar : Type := CondvarImpl.type
instance : Nonempty Condvar := CondvarImpl.property
/-- Creates a new condition variable. -/
@[extern "lean_io_condvar_new"]
@[extern "lean_io_condvar_new", deprecated "Use Std.Condvar.new from Std.Sync.Mutex instead" (since := "2024-12-02")]
opaque Condvar.new : BaseIO Condvar
/-- Waits until another thread calls `notifyOne` or `notifyAll`. -/
@[extern "lean_io_condvar_wait"]
@[extern "lean_io_condvar_wait", deprecated "Use Std.Condvar.wait from Std.Sync.Mutex instead" (since := "2024-12-02")]
opaque Condvar.wait (condvar : @& Condvar) (mutex : @& BaseMutex) : BaseIO Unit
/-- Wakes up a single other thread executing `wait`. -/
@[extern "lean_io_condvar_notify_one"]
@[extern "lean_io_condvar_notify_one", deprecated "Use Std.Condvar.notifyOne from Std.Sync.Mutex instead" (since := "2024-12-02")]
opaque Condvar.notifyOne (condvar : @& Condvar) : BaseIO Unit
/-- Wakes up all other threads executing `wait`. -/
@[extern "lean_io_condvar_notify_all"]
@[extern "lean_io_condvar_notify_all", deprecated "Use Std.Condvar.notifyAll from Std.Sync.Mutex instead" (since := "2024-12-02")]
opaque Condvar.notifyAll (condvar : @& Condvar) : BaseIO Unit
/-- Waits on the condition variable until the predicate is true. -/
@[deprecated "Use Std.Condvar.waitUntil from Std.Sync.Mutex instead" (since := "2024-12-02")]
def Condvar.waitUntil [Monad m] [MonadLift BaseIO m]
(condvar : Condvar) (mutex : BaseMutex) (pred : m Bool) : m Unit := do
while !( pred) do
@@ -78,6 +84,7 @@ The type `Mutex α` is similar to `IO.Ref α`,
except that concurrent accesses are guarded by a mutex
instead of atomic pointer operations and busy-waiting.
-/
@[deprecated "Use Std.Mutex from Std.Sync.Mutex instead" (since := "2024-12-02")]
structure Mutex (α : Type) where private mk ::
private ref : IO.Ref α
mutex : BaseMutex
@@ -86,6 +93,7 @@ structure Mutex (α : Type) where private mk ::
instance : CoeOut (Mutex α) BaseMutex where coe := Mutex.mutex
/-- Creates a new mutex. -/
@[deprecated "Use Std.Mutex.new from Std.Sync.Mutex instead" (since := "2024-12-02")]
def Mutex.new (a : α) : BaseIO (Mutex α) :=
return { ref := mkRef a, mutex := BaseMutex.new }
@@ -94,9 +102,11 @@ def Mutex.new (a : α) : BaseIO (Mutex α) :=
with outside monad `m`.
The action has access to the state `α` of the mutex (via `get` and `set`).
-/
@[deprecated "Use Std.AtomicT from Std.Sync.Mutex instead" (since := "2024-12-02")]
abbrev AtomicT := StateRefT' IO.RealWorld
/-- `mutex.atomically k` runs `k` with access to the mutex's state while locking the mutex. -/
@[deprecated "Use Std.Mutex.atomically from Std.Sync.Mutex instead" (since := "2024-12-02")]
def Mutex.atomically [Monad m] [MonadLiftT BaseIO m] [MonadFinally m]
(mutex : Mutex α) (k : AtomicT α m β) : m β := do
try
@@ -110,6 +120,7 @@ def Mutex.atomically [Monad m] [MonadLiftT BaseIO m] [MonadFinally m]
waiting on `condvar` until `pred` returns true.
Both `k` and `pred` have access to the mutex's state.
-/
@[deprecated "Use Std.Mutex.atomicallyOnce from Std.Sync.Mutex instead" (since := "2024-12-02")]
def Mutex.atomicallyOnce [Monad m] [MonadLiftT BaseIO m] [MonadFinally m]
(mutex : Mutex α) (condvar : Condvar)
(pred : AtomicT α m Bool) (k : AtomicT α m β) : m β :=

View File

@@ -6,7 +6,7 @@ Authors: Marc Huisinga, Wojciech Nawrocki
-/
prelude
import Init.System.IO
import Init.Data.Channel
import Std.Sync.Channel
import Lean.Data.RBMap
import Lean.Environment
@@ -64,7 +64,7 @@ open Widget in
structure WorkerContext where
/-- Synchronized output channel for LSP messages. Notifications for outdated versions are
discarded on read. -/
chanOut : IO.Channel JsonRpc.Message
chanOut : Std.Channel JsonRpc.Message
/--
Latest document version received by the client, used for filtering out notifications from
previous versions.
@@ -75,7 +75,7 @@ structure WorkerContext where
Channel that receives a message for every a `$/lean/fileProgress` notification, indicating whether
the notification suggests that the file is currently being processed.
-/
chanIsProcessing : IO.Channel Bool
chanIsProcessing : Std.Channel Bool
/--
Diagnostics that are included in every single `textDocument/publishDiagnostics` notification.
-/
@@ -271,7 +271,7 @@ open Language Lean in
Callback from Lean language processor after parsing imports that requests necessary information from
Lake for processing imports.
-/
def setupImports (meta : DocumentMeta) (cmdlineOpts : Options) (chanOut : Channel JsonRpc.Message)
def setupImports (meta : DocumentMeta) (cmdlineOpts : Options) (chanOut : Std.Channel JsonRpc.Message)
(srcSearchPathPromise : Promise SearchPath) (stx : Syntax) :
Language.ProcessingT IO (Except Language.Lean.HeaderProcessedSnapshot SetupImportsResult) := do
let importsAlreadyLoaded importsLoadedRef.modifyGet ((·, true))
@@ -337,7 +337,7 @@ section Initialization
let clientHasWidgets := initParams.initializationOptions?.bind (·.hasWidgets?) |>.getD false
let maxDocVersionRef IO.mkRef 0
let freshRequestIdRef IO.mkRef (0 : Int)
let chanIsProcessing IO.Channel.new
let chanIsProcessing Std.Channel.new
let stickyDiagnosticsRef IO.mkRef
let chanOut mkLspOutputChannel maxDocVersionRef chanIsProcessing
let srcSearchPathPromise IO.Promise.new
@@ -380,8 +380,8 @@ section Initialization
the output FS stream after discarding outdated notifications. This is the only component of
the worker with access to the output stream, so we can synchronize messages from parallel
elaboration tasks here. -/
mkLspOutputChannel maxDocVersion chanIsProcessing : IO (IO.Channel JsonRpc.Message) := do
let chanOut IO.Channel.new
mkLspOutputChannel maxDocVersion chanIsProcessing : IO (Std.Channel JsonRpc.Message) := do
let chanOut Std.Channel.new
let _ chanOut.forAsync (prio := .dedicated) fun msg => do
-- discard outdated notifications; note that in contrast to responses, notifications can
-- always be silently discarded

View File

@@ -6,7 +6,7 @@ Authors: Marc Huisinga, Wojciech Nawrocki
-/
prelude
import Init.System.IO
import Init.System.Mutex
import Std.Sync.Mutex
import Init.Data.ByteArray
import Lean.Data.RBMap
@@ -113,7 +113,7 @@ section FileWorker
structure FileWorker where
doc : DocumentMeta
proc : Process.Child workerCfg
exitCode : IO.Mutex (Option UInt32)
exitCode : Std.Mutex (Option UInt32)
commTask : Task WorkerEvent
state : WorkerState
-- This should not be mutated outside of namespace FileWorker,
@@ -392,7 +392,7 @@ section ServerM
-- open session for `kill` above
setsid := true
}
let exitCode IO.Mutex.new none
let exitCode Std.Mutex.new none
let pendingRequestsRef IO.mkRef (RBMap.empty : PendingRequestMap)
let initialDependencyBuildMode := m.dependencyBuildMode
let updatedDependencyBuildMode :=

View File

@@ -6,6 +6,7 @@ Authors: Sebastian Ullrich
prelude
import Std.Data
import Std.Sat
import Std.Sync
import Std.Time
import Std.Tactic
import Std.Internal

8
src/Std/Sync.lean Normal file
View File

@@ -0,0 +1,8 @@
/-
Copyright (c) 2024 Lean FRO, LLC. All rights reserved.
Released under Apache 2.0 license as described in the file LICENSE.
Authors: Henrik Böving
-/
prelude
import Std.Sync.Channel
import Std.Sync.Mutex

137
src/Std/Sync/Channel.lean Normal file
View File

@@ -0,0 +1,137 @@
/-
Copyright (c) 2022 Microsoft Corporation. All rights reserved.
Released under Apache 2.0 license as described in the file LICENSE.
Authors: Gabriel Ebner
-/
prelude
import Init.System.Promise
import Init.Data.Queue
import Std.Sync.Mutex
namespace Std
/--
Internal state of an `Channel`.
We maintain the invariant that at all times either `consumers` or `values` is empty.
-/
structure Channel.State (α : Type) where
values : Std.Queue α :=
consumers : Std.Queue (IO.Promise (Option α)) :=
closed := false
deriving Inhabited
/--
FIFO channel with unbounded buffer, where `recv?` returns a `Task`.
A channel can be closed. Once it is closed, all `send`s are ignored, and
`recv?` returns `none` once the queue is empty.
-/
def Channel (α : Type) : Type := Mutex (Channel.State α)
instance : Nonempty (Channel α) :=
inferInstanceAs (Nonempty (Mutex _))
/-- Creates a new `Channel`. -/
def Channel.new : BaseIO (Channel α) :=
Mutex.new {}
/--
Sends a message on an `Channel`.
This function does not block.
-/
def Channel.send (ch : Channel α) (v : α) : BaseIO Unit :=
ch.atomically do
let st get
if st.closed then return
if let some (consumer, consumers) := st.consumers.dequeue? then
consumer.resolve (some v)
set { st with consumers }
else
set { st with values := st.values.enqueue v }
/--
Closes an `Channel`.
-/
def Channel.close (ch : Channel α) : BaseIO Unit :=
ch.atomically do
let st get
for consumer in st.consumers.toArray do consumer.resolve none
set { st with closed := true, consumers := }
/--
Receives a message, without blocking.
The returned task waits for the message.
Every message is only received once.
Returns `none` if the channel is closed and the queue is empty.
-/
def Channel.recv? (ch : Channel α) : BaseIO (Task (Option α)) :=
ch.atomically do
let st get
if let some (a, values) := st.values.dequeue? then
set { st with values }
return .pure a
else if !st.closed then
let promise IO.Promise.new
set { st with consumers := st.consumers.enqueue promise }
return promise.result
else
return .pure none
/--
`ch.forAsync f` calls `f` for every messages received on `ch`.
Note that if this function is called twice, each `forAsync` only gets half the messages.
-/
partial def Channel.forAsync (f : α BaseIO Unit) (ch : Channel α)
(prio : Task.Priority := .default) : BaseIO (Task Unit) := do
BaseIO.bindTask (prio := prio) ( ch.recv?) fun
| none => return .pure ()
| some v => do f v; ch.forAsync f prio
/--
Receives all currently queued messages from the channel.
Those messages are dequeued and will not be returned by `recv?`.
-/
def Channel.recvAllCurrent (ch : Channel α) : BaseIO (Array α) :=
ch.atomically do
modifyGet fun st => (st.values.toArray, { st with values := })
/-- Type tag for synchronous (blocking) operations on a `Channel`. -/
def Channel.Sync := Channel
/--
Accesses synchronous (blocking) version of channel operations.
For example, `ch.sync.recv?` blocks until the next message,
and `for msg in ch.sync do ...` iterates synchronously over the channel.
These functions should only be used in dedicated threads.
-/
def Channel.sync (ch : Channel α) : Channel.Sync α := ch
/--
Synchronously receives a message from the channel.
Every message is only received once.
Returns `none` if the channel is closed and the queue is empty.
-/
def Channel.Sync.recv? (ch : Channel.Sync α) : BaseIO (Option α) := do
IO.wait ( Channel.recv? ch)
private partial def Channel.Sync.forIn [Monad m] [MonadLiftT BaseIO m]
(ch : Channel.Sync α) (f : α β m (ForInStep β)) : β m β := fun b => do
match ch.recv? with
| some a =>
match f a b with
| .done b => pure b
| .yield b => ch.forIn f b
| none => pure b
/-- `for msg in ch.sync do ...` receives all messages in the channel until it is closed. -/
instance [MonadLiftT BaseIO m] : ForIn m (Channel.Sync α) α where
forIn ch b f := ch.forIn f b
end Std

121
src/Std/Sync/Mutex.lean Normal file
View File

@@ -0,0 +1,121 @@
/-
Copyright (c) 2022 Microsoft Corporation. All rights reserved.
Released under Apache 2.0 license as described in the file LICENSE.
Authors: Gabriel Ebner
-/
prelude
import Init.System.IO
import Init.Control.StateRef
namespace Std
private opaque BaseMutexImpl : NonemptyType.{0}
/--
Mutual exclusion primitive (a lock).
If you want to guard shared state, use `Mutex α` instead.
-/
def BaseMutex : Type := BaseMutexImpl.type
instance : Nonempty BaseMutex := BaseMutexImpl.property
/-- Creates a new `BaseMutex`. -/
@[extern "lean_io_basemutex_new"]
opaque BaseMutex.new : BaseIO BaseMutex
/--
Locks a `BaseMutex`. Waits until no other thread has locked the mutex.
The current thread must not have already locked the mutex.
Reentrant locking is undefined behavior (inherited from the C++ implementation).
-/
@[extern "lean_io_basemutex_lock"]
opaque BaseMutex.lock (mutex : @& BaseMutex) : BaseIO Unit
/--
Unlocks a `BaseMutex`.
The current thread must have already locked the mutex.
Unlocking an unlocked mutex is undefined behavior (inherited from the C++ implementation).
-/
@[extern "lean_io_basemutex_unlock"]
opaque BaseMutex.unlock (mutex : @& BaseMutex) : BaseIO Unit
private opaque CondvarImpl : NonemptyType.{0}
/-- Condition variable. -/
def Condvar : Type := CondvarImpl.type
instance : Nonempty Condvar := CondvarImpl.property
/-- Creates a new condition variable. -/
@[extern "lean_io_condvar_new"]
opaque Condvar.new : BaseIO Condvar
/-- Waits until another thread calls `notifyOne` or `notifyAll`. -/
@[extern "lean_io_condvar_wait"]
opaque Condvar.wait (condvar : @& Condvar) (mutex : @& BaseMutex) : BaseIO Unit
/-- Wakes up a single other thread executing `wait`. -/
@[extern "lean_io_condvar_notify_one"]
opaque Condvar.notifyOne (condvar : @& Condvar) : BaseIO Unit
/-- Wakes up all other threads executing `wait`. -/
@[extern "lean_io_condvar_notify_all"]
opaque Condvar.notifyAll (condvar : @& Condvar) : BaseIO Unit
/-- Waits on the condition variable until the predicate is true. -/
def Condvar.waitUntil [Monad m] [MonadLift BaseIO m]
(condvar : Condvar) (mutex : BaseMutex) (pred : m Bool) : m Unit := do
while !( pred) do
condvar.wait mutex
/--
Mutual exclusion primitive (lock) guarding shared state of type `α`.
The type `Mutex α` is similar to `IO.Ref α`,
except that concurrent accesses are guarded by a mutex
instead of atomic pointer operations and busy-waiting.
-/
structure Mutex (α : Type) where private mk ::
private ref : IO.Ref α
mutex : BaseMutex
deriving Nonempty
instance : CoeOut (Mutex α) BaseMutex where coe := Mutex.mutex
/-- Creates a new mutex. -/
def Mutex.new (a : α) : BaseIO (Mutex α) :=
return { ref := IO.mkRef a, mutex := BaseMutex.new }
/--
`AtomicT α m` is the monad that can be atomically executed inside a `Mutex α`,
with outside monad `m`.
The action has access to the state `α` of the mutex (via `get` and `set`).
-/
abbrev AtomicT := StateRefT' IO.RealWorld
/-- `mutex.atomically k` runs `k` with access to the mutex's state while locking the mutex. -/
def Mutex.atomically [Monad m] [MonadLiftT BaseIO m] [MonadFinally m]
(mutex : Mutex α) (k : AtomicT α m β) : m β := do
try
mutex.mutex.lock
k mutex.ref
finally
mutex.mutex.unlock
/--
`mutex.atomicallyOnce condvar pred k` runs `k`,
waiting on `condvar` until `pred` returns true.
Both `k` and `pred` have access to the mutex's state.
-/
def Mutex.atomicallyOnce [Monad m] [MonadLiftT BaseIO m] [MonadFinally m]
(mutex : Mutex α) (condvar : Condvar)
(pred : AtomicT α m Bool) (k : AtomicT α m β) : m β :=
let _ : MonadLift BaseIO (AtomicT α m) := liftM
mutex.atomically do
condvar.waitUntil mutex pred
k
end Std