Compare commits

...

2 Commits

Author SHA1 Message Date
Henrik Böving
7b32567b89 fix: address review 2025-04-02 17:46:40 +02:00
Henrik Böving
a0f1d9f97c feat add Std.SharedMutex 2025-04-02 16:37:09 +02:00
6 changed files with 346 additions and 3 deletions

View File

@@ -9,3 +9,4 @@ import Std.Sync.Channel
import Std.Sync.Mutex
import Std.Sync.RecursiveMutex
import Std.Sync.Barrier
import Std.Sync.SharedMutex

View File

@@ -24,14 +24,14 @@ instance : Nonempty BaseRecursiveMutex := RecursiveMutexImpl.property
opaque BaseRecursiveMutex.new : BaseIO BaseRecursiveMutex
/--
Locks a `RecursiveBaseMutex`. Waits until no other thread has locked the mutex.
Locks a `BaseRecursiveMutex`. Waits until no other thread has locked the mutex.
If the current thread already holds the mutex this function doesn't block.
-/
@[extern "lean_io_baserecmutex_lock"]
opaque BaseRecursiveMutex.lock (mutex : @& BaseRecursiveMutex) : BaseIO Unit
/--
Attempts to lock a `RecursiveBaseMutex`. If the mutex is not available return `false`, otherwise
Attempts to lock a `BaseRecursiveMutex`. If the mutex is not available return `false`, otherwise
lock it and return `true`.
This function does not block. Furthermore the same thread may acquire the lock multiple times
@@ -41,7 +41,7 @@ through this function.
opaque BaseRecursiveMutex.tryLock (mutex : @& BaseRecursiveMutex) : BaseIO Bool
/--
Unlocks a `RecursiveBaseMutex`. The owning thread must make as many `unlock` calls as `lock` and
Unlocks a `BaseRecursiveMutex`. The owning thread must make as many `unlock` calls as `lock` and
`tryLock` calls in order to fully relinquish ownership of the mutex.
The current thread must have already locked the mutex at least once.

View File

@@ -0,0 +1,168 @@
/-
Copyright (c) 2025 Lean FRO, LLC. All rights reserved.
Released under Apache 2.0 license as described in the file LICENSE.
Authors: Henrik Böving
-/
prelude
import Std.Sync.Basic
namespace Std
private opaque SharedMutexImpl : NonemptyType.{0}
/--
An exclusion primitive that allows a number of readers or at most one writer.
If you want to guard shared state, use `SharedMutex α` instead.
-/
def BaseSharedMutex : Type := SharedMutexImpl.type
instance : Nonempty BaseSharedMutex := SharedMutexImpl.property
/-- Creates a new `BaseSharedMutex`. -/
@[extern "lean_io_basesharedmutex_new"]
opaque BaseSharedMutex.new : BaseIO BaseSharedMutex
/--
Locks a `BaseSharedMutex` for exclusive write access. This function blocks until no other
writers or readers have access to the lock.
The current thread must not have already locked the mutex.
Reentrant locking is undefined behavior (inherited from the C++ implementation).
-/
@[extern "lean_io_basesharedmutex_write"]
opaque BaseSharedMutex.write (mutex : @& BaseSharedMutex) : BaseIO Unit
/--
Attempts to lock a `BaseSharedMutex` for exclusive write access. If the mutex is not available
return `false`, otherwise lock it and return `true`.
The current thread must not have already locked the mutex.
Reentrant locking is undefined behavior (inherited from the C++ implementation).
-/
@[extern "lean_io_basesharedmutex_try_write"]
opaque BaseSharedMutex.tryWrite (mutex : @& BaseSharedMutex) : BaseIO Bool
/--
Unlocks a `BaseSharedMutex` write lock.
The current thread must have already locked the mutex for write access.
Unlocking an unlocked mutex is undefined behavior (inherited from the C++ implementation).
-/
@[extern "lean_io_basesharedmutex_unlock_write"]
opaque BaseSharedMutex.unlockWrite (mutex : @& BaseSharedMutex) : BaseIO Unit
/--
Locks a `BaseSharedMutex` for shared read access. This function blocks until there are no more
writers which hold the lock. There may be other readers currently inside the lock when this method
returns.
The current thread must not have already locked the mutex.
Reentrant locking is undefined behavior (inherited from the C++ implementation).
-/
@[extern "lean_io_basesharedmutex_read"]
opaque BaseSharedMutex.read (mutex : @& BaseSharedMutex) : BaseIO Unit
/--
Attempts to lock a `BaseSharedMutex` for shared read access. If the mutex is not available
return `false`, otherwise lock it and return `true`.
The current thread must not have already locked the mutex.
Reentrant locking is undefined behavior (inherited from the C++ implementation).
-/
@[extern "lean_io_basesharedmutex_try_read"]
opaque BaseSharedMutex.tryRead (mutex : @& BaseSharedMutex) : BaseIO Bool
/--
Unlocks a `BaseSharedMutex` read lock.
The current thread must have already locked the mutex for read access.
Unlocking an unlocked mutex is undefined behavior (inherited from the C++ implementation).
-/
@[extern "lean_io_basesharedmutex_unlock_read"]
opaque BaseSharedMutex.unlockRead (mutex : @& BaseSharedMutex) : BaseIO Unit
/--
An exclusion primitive that allows a number of readers or at most one writer access to a shared
state of type `α`.
-/
structure SharedMutex (α : Type) where private mk ::
private ref : IO.Ref α
mutex : BaseSharedMutex
deriving Nonempty
instance : CoeOut (SharedMutex α) BaseSharedMutex where coe := SharedMutex.mutex
/-- Creates a new shared mutex. -/
def SharedMutex.new (a : α) : BaseIO (SharedMutex α) :=
return { ref := IO.mkRef a, mutex := BaseSharedMutex.new }
/--
`mutex.atomically k` runs `k` with read and write access to the mutex's state while locking the
mutex for exclusive write access.
Calling `mutex.atomically` while already holding the underlying `BaseSharedMutex` in the same thread
is undefined behavior.
-/
def SharedMutex.atomically [Monad m] [MonadLiftT BaseIO m] [MonadFinally m]
(mutex : SharedMutex α) (k : AtomicT α m β) : m β := do
try
mutex.mutex.write
k mutex.ref
finally
mutex.mutex.unlockWrite
/--
`mutex.tryAtomically k` tries to lock `mutex` for exclusive write access and runs `k` with read
and write access to the mutex's state if it succeeds. If successful, it returns the value of `k`
as `some`, otherwise `none`.
Calling `mutex.tryAtomically` while already holding the underlying `BaseSharedMutex` in the same
thread is undefined behavior.
-/
def SharedMutex.tryAtomically [Monad m] [MonadLiftT BaseIO m] [MonadFinally m]
(mutex : SharedMutex α) (k : AtomicT α m β) : m (Option β) := do
if mutex.mutex.tryWrite then
try
k mutex.ref
finally
mutex.mutex.unlockWrite
else
return none
/--
`mutex.atomicallyRead k` runs `k` with read access to the mutex's state while locking the mutex
for shared read access.
Calling `mutex.atomicallyRead` while already holding the underlying `BaseSharedMutex` in the same
thread is undefined behavior.
-/
def SharedMutex.atomicallyRead [Monad m] [MonadLiftT BaseIO m] [MonadFinally m]
(mutex : SharedMutex α) (k : ReaderT α m β) : m β := do
try
mutex.mutex.read
let state (mutex.ref.get : BaseIO α)
k state
finally
mutex.mutex.unlockRead
/--
`mutex.tryAtomicallyRead k` tries to lock `mutex` for shared read access and runs `k` with read
access to the mutex's state if it succeeds. If successful, it returns the value of `k`
as `some`, otherwise `none`.
Calling `mutex.tryAtomicallyRead` while already holding the underlying `BaseSharedMutex` in the
same thread is undefined behavior.
-/
def SharedMutex.tryAtomicallyRead [Monad m] [MonadLiftT BaseIO m] [MonadFinally m]
(mutex : SharedMutex α) (k : ReaderT α m β) : m (Option β) := do
if mutex.mutex.tryRead then
try
let state (mutex.ref.get : BaseIO α)
k state
finally
mutex.mutex.unlockRead
else
return none
end Std

View File

@@ -101,10 +101,57 @@ extern "C" LEAN_EXPORT obj_res lean_io_baserecmutex_unlock(b_obj_arg mtx, obj_ar
return io_result_mk_ok(box(0));
}
// We use a `shared_timed_mutex` instead of a `shared_mutex` for now as the latter is only available
// in C++ 17 and we are currently on C++ 14.
static lean_external_class * g_basesharedmutex_external_class = nullptr;
static void basesharedmutex_finalizer(void * h) {
delete static_cast<shared_timed_mutex *>(h);
}
static void basesharedmutex_foreach(void *, b_obj_arg) {}
static shared_timed_mutex * basesharedmutex_get(lean_object * mtx) {
return static_cast<shared_timed_mutex *>(lean_get_external_data(mtx));
}
extern "C" LEAN_EXPORT obj_res lean_io_basesharedmutex_new(obj_arg) {
return io_result_mk_ok(lean_alloc_external(g_basesharedmutex_external_class, new shared_timed_mutex));
}
extern "C" LEAN_EXPORT obj_res lean_io_basesharedmutex_write(b_obj_arg mtx, obj_arg) {
basesharedmutex_get(mtx)->lock();
return io_result_mk_ok(box(0));
}
extern "C" LEAN_EXPORT obj_res lean_io_basesharedmutex_try_write(b_obj_arg mtx, obj_arg) {
bool success = basesharedmutex_get(mtx)->try_lock();
return io_result_mk_ok(box(success));
}
extern "C" LEAN_EXPORT obj_res lean_io_basesharedmutex_unlock_write(b_obj_arg mtx, obj_arg) {
basesharedmutex_get(mtx)->unlock();
return io_result_mk_ok(box(0));
}
extern "C" LEAN_EXPORT obj_res lean_io_basesharedmutex_read(b_obj_arg mtx, obj_arg) {
basesharedmutex_get(mtx)->lock_shared();
return io_result_mk_ok(box(0));
}
extern "C" LEAN_EXPORT obj_res lean_io_basesharedmutex_try_read(b_obj_arg mtx, obj_arg) {
bool success = basesharedmutex_get(mtx)->try_lock_shared();
return io_result_mk_ok(box(success));
}
extern "C" LEAN_EXPORT obj_res lean_io_basesharedmutex_unlock_read(b_obj_arg mtx, obj_arg) {
basesharedmutex_get(mtx)->unlock_shared();
return io_result_mk_ok(box(0));
}
void initialize_mutex() {
g_basemutex_external_class = lean_register_external_class(basemutex_finalizer, basemutex_foreach);
g_condvar_external_class = lean_register_external_class(condvar_finalizer, condvar_foreach);
g_baserecmutex_external_class = lean_register_external_class(baserecmutex_finalizer, baserecmutex_foreach);
g_basesharedmutex_external_class = lean_register_external_class(basesharedmutex_finalizer, basesharedmutex_foreach);
}
void finalize_mutex() {

View File

@@ -21,6 +21,7 @@ namespace chrono = std::chrono;
#if defined(LEAN_MULTI_THREAD)
#include <thread>
#include <mutex>
#include <shared_mutex>
#include <atomic>
#include <condition_variable>
#define LEAN_THREAD_LOCAL thread_local
@@ -29,6 +30,7 @@ namespace lean {
using std::thread;
using std::mutex;
using std::recursive_mutex;
using std::shared_timed_mutex;
using std::atomic;
using std::atomic_bool;
using std::atomic_ushort;
@@ -145,13 +147,24 @@ public:
class mutex {
public:
void lock() {}
bool try_lock() { return true; }
void unlock() {}
};
class recursive_mutex {
public:
void lock() {}
bool try_lock() { return true; }
void unlock() {}
};
class shared_timed_mutex {
public:
void lock() {}
bool try_lock() { return true; }
void unlock() {}
void lock_shared() {}
bool try_lock_shared() { return true; }
void unlock_shared() {}
};
class condition_variable {
public:
template<typename Lock> void wait(Lock const &) {}

View File

@@ -0,0 +1,114 @@
import Std.Sync.SharedMutex
def countIt (mutex : Std.SharedMutex Nat) : IO Unit := do
for _ in [0:1000] do
mutex.atomically do
modify fun s => s + 1
def atomically : IO Unit := do
let mutex Std.SharedMutex.new 0
let t1 IO.asTask (prio := .dedicated) (countIt mutex)
let t2 IO.asTask (prio := .dedicated) (countIt mutex)
let t3 IO.asTask (prio := .dedicated) (countIt mutex)
let t4 IO.asTask (prio := .dedicated) (countIt mutex)
IO.ofExcept t1.get
IO.ofExcept t2.get
IO.ofExcept t3.get
IO.ofExcept t4.get
mutex.atomically do
let val get
if val != 4000 then
throw <| .userError s!"Should be 4000 but was {val}"
def holdIt (mutex : Std.SharedMutex Nat) (ref : IO.Ref Nat) : IO Unit := do
mutex.atomically do
ref.set 1
modify fun s => s + 1
while ( ref.get) == 1 do
IO.sleep 1
def tryIt (mutex : Std.SharedMutex Nat) (ref : IO.Ref Nat) : IO Unit := do
while ( ref.get) == 0 do
IO.sleep 1
let success mutex.tryAtomically (modify fun s => s + 1)
if success.isSome then throw <| .userError s!"lock succeeded but shouldn't"
ref.set 2
mutex.atomically (modify fun s => s + 1)
let success mutex.tryAtomically (modify fun s => s + 1)
if success.isNone then throw <| .userError s!"lock didn't succeed but should"
def tryAtomically : IO Unit := do
let mutex Std.SharedMutex.new 0
let ref IO.mkRef 0
let t1 IO.asTask (prio := .dedicated) (holdIt mutex ref)
let t2 IO.asTask (prio := .dedicated) (tryIt mutex ref)
IO.ofExcept t1.get
IO.ofExcept t2.get
mutex.atomically do
let val get
if val != 3 then
throw <| .userError s!"Should be 3 but was {val}"
def readIt (mutex : Std.SharedMutex Nat) : IO Unit := do
mutex.atomicallyRead do
let val read
if val != 37 then
throw <| .userError s!"Value should be 37 but was {val}"
def atomicallyRead : IO Unit := do
let mutex Std.SharedMutex.new 37
let t1 IO.asTask (prio := .dedicated) (readIt mutex)
let t2 IO.asTask (prio := .dedicated) (readIt mutex)
IO.ofExcept t1.get
IO.ofExcept t2.get
def tryWriteIt (mutex : Std.SharedMutex Nat) (ref : IO.Ref Nat) : IO Unit := do
let success
mutex.tryAtomically do
ref.set 1
modify fun s => s + 1
while ( ref.get) == 1 do
IO.sleep 1
if success.isNone then throw <| .userError s!"write lock failed"
def tryReadIt (mutex : Std.SharedMutex Nat) (ref : IO.Ref Nat) : IO Unit := do
while ( ref.get) == 0 do
IO.sleep 1
let success mutex.tryAtomically (modify fun s => s + 1)
if success.isSome then throw <| .userError s!"write lock succeeded but shouldn't"
let success mutex.tryAtomicallyRead read
if success.isSome then throw <| .userError s!"read lock succeeded but shouldn't"
ref.set 2
let val mutex.atomicallyRead read
if val != 1 then throw <| .userError s!"value should be 1 but was {val}"
mutex.atomicallyRead do
ref.set 3
while ( ref.get) == 3 do
IO.sleep 1
def tryReadIt' (mutex : Std.SharedMutex Nat) (ref : IO.Ref Nat) : IO Unit := do
while ( ref.get) != 3 do
IO.sleep 1
let val mutex.tryAtomicallyRead read
if val != some 1 then throw <| .userError s!"value should be `some 1` but was {val}"
ref.set 4
def tryAtomicallyRead : IO Unit := do
let mutex Std.SharedMutex.new 0
let ref IO.mkRef 0
let t1 IO.asTask (prio := .dedicated) (tryWriteIt mutex ref)
let t2 IO.asTask (prio := .dedicated) (tryReadIt mutex ref)
let t3 IO.asTask (prio := .dedicated) (tryReadIt' mutex ref)
IO.ofExcept t1.get
IO.ofExcept t2.get
IO.ofExcept t3.get
mutex.atomically do
let val get
if val != 1 then
throw <| .userError s!"Should be 1 but was {val}"
#eval atomically
#eval atomicallyRead
#eval tryAtomically
#eval tryAtomicallyRead