Compare commits

...

18 Commits

Author SHA1 Message Date
Sofia Rodrigues
eac8a3d0f9 fix: remove redundancy 2025-10-16 20:05:16 -03:00
Sofia Rodrigues
cac3b795a6 fix: small comments 2025-10-09 13:22:40 -03:00
Sofia Rodrigues
414e00a2a0 fix: remove testing 2025-10-09 13:18:55 -03:00
Sofia Rodrigues
bc7b3b4ef6 fix: tests 2025-10-09 13:13:24 -03:00
Sofia Rodrigues
7e7ef97e12 fix: wrong behavior of cancel 2025-10-09 13:08:26 -03:00
Sofia Rodrigues
0008ca2014 fix: typo 2025-10-09 12:48:51 -03:00
Sofia Rodrigues
721d434c27 fix: smaller changes 2025-10-09 12:44:19 -03:00
Sofia Rodrigues
5163b5dfdf fix: selectors 2025-10-09 12:39:16 -03:00
Sofia Rodrigues
9fab3257cd fix: behavior that it just creates a loop with promises 2025-10-09 09:46:08 -03:00
Sofia Rodrigues
daf892e3b9 feat: remove useless changes 2025-10-09 08:33:52 -03:00
Sofia Rodrigues
02626492d0 fix: a bunch of selectors 2025-10-09 08:29:55 -03:00
Sofia Rodrigues
e41767ef99 feat: accept 2025-10-09 08:29:55 -03:00
Sofia Rodrigues
0c190cbdf9 fix: test orders 2025-10-09 08:29:54 -03:00
Sofia Rodrigues
487f09915a feat: add more tests 2025-10-09 08:29:41 -03:00
Sofia Rodrigues
d8e4ec9e70 fix: stop function 2025-10-09 08:29:34 -03:00
Sofia Rodrigues
bf6896c7a8 fix: some issues with rc and test 2025-10-09 08:29:33 -03:00
Sofia Rodrigues
ac0da74d54 fix: cancel 2025-10-09 08:29:07 -03:00
Sofia Rodrigues
ee4c004cd0 fix: behavior 2025-10-09 08:28:15 -03:00
15 changed files with 230 additions and 51 deletions

View File

@@ -238,23 +238,26 @@ def stop (s : Signal.Waiter) : IO Unit :=
s.native.stop
/--
Create a `Selector` that resolves once `s` has received the signal. Note that calling this function starts `s`
if it hasn't already started.
Create a `Selector` that resolves once `s` has received the signal. Note that calling this function will
only be started if used with `Selectable.one` or `Selectable.combine`.
-/
def selector (s : Signal.Waiter) : IO (Selector Unit) := do
let signalWaiter s.wait
return {
def selector (s : Signal.Waiter) : Selector Unit :=
{
tryFn := do
let signalWaiter : AsyncTask _ async s.wait
if IO.hasFinished signalWaiter then
return some ()
else
s.native.cancel
return none
registerFn waiter := do
let signalWaiter s.wait
discard <| AsyncTask.mapIO (x := signalWaiter) fun _ => do
let lose := return ()
let win promise := promise.resolve (.ok ())
waiter.race lose win
unregisterFn := s.stop
unregisterFn := s.native.cancel
}

View File

@@ -70,6 +70,44 @@ def accept (s : Server) : Async Client := do
|> Async.ofPromise
|>.map Client.ofNative
/--
Tries to accept an incoming connection.
-/
@[inline]
def tryAccept (s : Server) : IO (Option Client) := do
let res s.native.tryAccept
let socket IO.ofExcept res
return Client.ofNative <$> socket
/--
Creates a `Selector` that resolves once `s` has a connection available. Calling this function
does not start the connection wait, so it must not be called in parallel with `accept`.
-/
def acceptSelector (s : TCP.Socket.Server) : Selector Client :=
{
tryFn :=
s.tryAccept
registerFn waiter := do
let task s.native.accept
-- If we get cancelled the promise will be dropped so prepare for that
IO.chainTask (t := task.result?) fun res => do
match res with
| none => return ()
| some res =>
let lose := return ()
let win promise := do
try
let result IO.ofExcept res
promise.resolve (.ok (Client.ofNative result))
catch e =>
promise.resolve (.error e)
waiter.race lose win
unregisterFn := s.native.cancelAccept
}
/--
Gets the local address of the server socket.
-/
@@ -144,20 +182,25 @@ def recv? (s : Client) (size : UInt64) : Async (Option ByteArray) :=
/--
Creates a `Selector` that resolves once `s` has data available, up to at most `size` bytes,
and provides that data. Calling this function starts the data wait, so it must not be called
and provides that data. Calling this function does not starts the data wait, so it must not be called
in parallel with `recv?`.
-/
def recvSelector (s : TCP.Socket.Client) (size : UInt64) : Async (Selector (Option ByteArray)) := do
let readableWaiter s.native.waitReadable
return {
def recvSelector (s : TCP.Socket.Client) (size : UInt64) : Selector (Option ByteArray) :=
{
tryFn := do
let readableWaiter s.native.waitReadable
if readableWaiter.isResolved then
-- We know that this read should not block
let res (s.recv? size).block
return some res
else
s.native.cancelRecv
return none
registerFn waiter := do
let readableWaiter s.native.waitReadable
-- If we get cancelled the promise will be dropped so prepare for that
discard <| IO.mapTask (t := readableWaiter.result?) fun res => do
match res with
@@ -173,6 +216,7 @@ def recvSelector (s : TCP.Socket.Client) (size : UInt64) : Async (Selector (Opti
catch e =>
promise.resolve (.error e)
waiter.race lose win
unregisterFn := s.native.cancelRecv
}

View File

@@ -69,17 +69,18 @@ def stop (s : Sleep) : IO Unit :=
s.native.stop
/--
Create a `Selector` that resolves once `s` has finished. Note that calling this function starts `s`
if it hasn't already started.
Create a `Selector` that resolves once `s` has finished. `s` only starts when it runs inside of a Selectable.
-/
def selector (s : Sleep) : Async (Selector Unit) := do
return {
def selector (s : Sleep) : Selector Unit :=
{
tryFn := do
let sleepWaiter s.native.next
if sleepWaiter.isResolved then
return some ()
else
s.native.cancel
return none
registerFn waiter := do
let sleepWaiter s.native.next
BaseIO.chainTask sleepWaiter.result? fun
@@ -107,7 +108,7 @@ Return a `Selector` that completes after `duration`.
-/
def Selector.sleep (duration : Std.Time.Millisecond.Offset) : Async (Selector Unit) := do
let sleeper Sleep.mk duration
sleeper.selector
return sleeper.selector
/--
`Interval` can be used to repeatedly wait for some duration like a clock.

View File

@@ -92,20 +92,24 @@ def recv (s : Socket) (size : UInt64) : Async (ByteArray × Option SocketAddress
Creates a `Selector` that resolves once `s` has data available, up to at most `size` bytes,
and provides that data. If the socket has not been previously bound with `bind`, it is
automatically bound to `0.0.0.0` (all interfaces) with a random port.
Calling this function starts the data wait, so it must not be called in parallel with `recv`.
Calling this function does starts the data wait, only when it's used with `Selectable.one` or `combine`.
It must not be called in parallel with `recv`.
-/
def recvSelector (s : Socket) (size : UInt64) :
Async (Selector (ByteArray × Option SocketAddress)) := do
let readableWaiter s.native.waitReadable
return {
def recvSelector (s : Socket) (size : UInt64) : Selector (ByteArray × Option SocketAddress) :=
{
tryFn := do
let readableWaiter s.native.waitReadable
if readableWaiter.isResolved then
-- We know that this read should not block
let res (s.recv size).block
return some res
else
s.native.cancelRecv
return none
registerFn waiter := do
let readableWaiter s.native.waitReadable
-- If we get cancelled the promise will be dropped so prepare for that
discard <| IO.mapTask (t := readableWaiter.result?) fun res => do
match res with
@@ -121,6 +125,7 @@ def recvSelector (s : Socket) (size : UInt64) :
catch e =>
promise.resolve (.error e)
waiter.race lose win
unregisterFn := s.native.cancelRecv
}

View File

@@ -78,6 +78,15 @@ This function has different behavior depending on the state of the `Signal`:
@[extern "lean_uv_signal_stop"]
opaque stop (signal : @& Signal) : IO Unit
/--
This function has different behavior depending on the state of the `Signal`:
- If it is initial or finished this is a no-op.
- If it's running then it drops the accept promise and if it's not repeatable it sets
the signal handler to the initial state.
-/
@[extern "lean_uv_signal_cancel"]
opaque cancel (signal : @& Signal) : IO Unit
end Signal
end UV

View File

@@ -96,6 +96,18 @@ Accepts an incoming connection on a listening TCP socket.
@[extern "lean_uv_tcp_accept"]
opaque accept (socket : @& Socket) : IO (IO.Promise (Except IO.Error Socket))
/--
Tries to accept an incoming connection on a listening TCP socket.
-/
@[extern "lean_uv_tcp_try_accept"]
opaque tryAccept (socket : @& Socket) : IO (Except IO.Error (Option Socket))
/--
Cancels the accept request of a socket.
-/
@[extern "lean_uv_tcp_cancel_accept"]
opaque cancelAccept (socket : @& Socket) : IO Unit
/--
Shuts down an incoming connection on a listening TCP socket.
-/

View File

@@ -88,7 +88,7 @@ opaque stop (timer : @& Timer) : IO Unit
This function has different behavior depending on the state of the `Timer`:
- If it is initial or finished this is a no-op.
- If it is running, the promise generated by the `next` function is dropped.
- If `repeating` is `false` then it sets the timer to the finished state.
- If `repeating` is `false` then it sets the timer to the initial state.
-/
@[extern "lean_uv_timer_cancel"]
opaque cancel (timer : @& Timer) : IO Unit

View File

@@ -39,7 +39,7 @@ void initialize_libuv_signal() {
}
static bool signal_promise_is_finished(lean_uv_signal_object * signal) {
return lean_io_get_task_state_core((lean_object *)lean_to_promise(signal->m_promise)->m_result) == 2;
return signal->m_promise == NULL || lean_io_get_task_state_core((lean_object *)lean_to_promise(signal->m_promise)->m_result) == 2;
}
void handle_signal_event(uv_signal_t* handle, int signum) {
@@ -47,7 +47,6 @@ void handle_signal_event(uv_signal_t* handle, int signum) {
lean_uv_signal_object * signal = lean_to_uv_signal(obj);
lean_assert(signal->m_state == SIGNAL_STATE_RUNNING);
lean_assert(signal->m_promise != NULL);
if (signal->m_repeating) {
if (!signal_promise_is_finished(signal)) {
@@ -55,13 +54,14 @@ void handle_signal_event(uv_signal_t* handle, int signum) {
lean_dec(res);
}
} else {
lean_assert(!signal_promise_is_finished(signal));
if (signal->m_promise != NULL) {
lean_object* res = lean_io_promise_resolve(lean_box(signum), signal->m_promise, lean_io_mk_world());
lean_dec(res);
}
uv_signal_stop(signal->m_uv_signal);
signal->m_state = SIGNAL_STATE_FINISHED;
lean_object* res = lean_io_promise_resolve(lean_box(signum), signal->m_promise, lean_io_mk_world());
lean_dec(res);
lean_dec(obj);
}
}
@@ -154,21 +154,24 @@ extern "C" LEAN_EXPORT lean_obj_res lean_uv_signal_next(b_obj_arg obj, obj_arg /
}
case SIGNAL_STATE_RUNNING:
{
lean_assert(signal->m_promise != NULL);
// 2 indicates finished
if (signal_promise_is_finished(signal)) {
lean_dec(signal->m_promise);
if (signal->m_promise != NULL) {
lean_dec(signal->m_promise);
}
signal->m_promise = create_promise();
lean_inc(signal->m_promise);
return lean_io_result_mk_ok(signal->m_promise);
} else {
lean_inc(signal->m_promise);
return lean_io_result_mk_ok(signal->m_promise);
}
lean_inc(signal->m_promise);
return lean_io_result_mk_ok(signal->m_promise);
}
case SIGNAL_STATE_FINISHED:
{
lean_assert(signal->m_promise != NULL);
if (signal->m_promise == NULL) {
lean_object* finished_promise = create_promise();
return lean_io_result_mk_ok(finished_promise);
}
lean_inc(signal->m_promise);
return lean_io_result_mk_ok(signal->m_promise);
}
@@ -176,11 +179,12 @@ extern "C" LEAN_EXPORT lean_obj_res lean_uv_signal_next(b_obj_arg obj, obj_arg /
} else {
if (signal->m_state == SIGNAL_STATE_INITIAL) {
return setup_signal();
} else {
lean_assert(signal->m_promise != NULL);
} else if (signal->m_promise != NULL) {
lean_inc(signal->m_promise);
return lean_io_result_mk_ok(signal->m_promise);
} else {
lean_object* finished_promise = create_promise();
return lean_io_result_mk_ok(finished_promise);
}
}
}
@@ -190,12 +194,15 @@ extern "C" LEAN_EXPORT lean_obj_res lean_uv_signal_stop(b_obj_arg obj, obj_arg /
lean_uv_signal_object * signal = lean_to_uv_signal(obj);
if (signal->m_state == SIGNAL_STATE_RUNNING) {
lean_assert(signal->m_promise != NULL);
event_loop_lock(&global_ev);
int result = uv_signal_stop(signal->m_uv_signal);
event_loop_unlock(&global_ev);
if (signal->m_promise != NULL) {
lean_dec(signal->m_promise);
signal->m_promise = NULL;
}
signal->m_state = SIGNAL_STATE_FINISHED;
// The loop does not need to keep the signal alive anymore.
@@ -211,6 +218,30 @@ extern "C" LEAN_EXPORT lean_obj_res lean_uv_signal_stop(b_obj_arg obj, obj_arg /
}
}
/* Std.Internal.UV.Signal.cancel (signal : @& Signal) : IO Unit */
extern "C" LEAN_EXPORT lean_obj_res lean_uv_signal_cancel(b_obj_arg obj, obj_arg /* w */) {
lean_uv_signal_object * signal = lean_to_uv_signal(obj);
// It's locking here to avoid changing the state during other operations.
event_loop_lock(&global_ev);
if (signal->m_state == SIGNAL_STATE_RUNNING && signal->m_promise != NULL) {
if (signal->m_repeating) {
lean_dec(signal->m_promise);
signal->m_promise = NULL;
} else {
uv_signal_stop(signal->m_uv_signal);
lean_dec(signal->m_promise);
signal->m_promise = NULL;
signal->m_state = SIGNAL_STATE_INITIAL;
lean_dec(obj);
}
}
event_loop_unlock(&global_ev);
return lean_io_result_mk_ok(lean_box(0));
}
#else
@@ -235,6 +266,13 @@ extern "C" LEAN_EXPORT lean_obj_res lean_uv_signal_stop(b_obj_arg signal, obj_ar
);
}
/* Std.Internal.UV.Signal.cancel (signal : @& Signal) : IO Unit */
extern "C" LEAN_EXPORT lean_obj_res lean_uv_signal_cancel(b_obj_arg obj, obj_arg /* w */) {
lean_always_assert(
false && ("Please build a version of Lean4 with libuv to invoke this.")
);
}
#endif
}

View File

@@ -51,5 +51,6 @@ static inline lean_uv_signal_object* lean_to_uv_signal(lean_object * o) { return
extern "C" LEAN_EXPORT lean_obj_res lean_uv_signal_mk(uint32_t signum_obj, uint8_t repeating, obj_arg /* w */);
extern "C" LEAN_EXPORT lean_obj_res lean_uv_signal_next(b_obj_arg signal, obj_arg /* w */);
extern "C" LEAN_EXPORT lean_obj_res lean_uv_signal_stop(b_obj_arg signal, obj_arg /* w */);
extern "C" LEAN_EXPORT lean_obj_res lean_uv_signal_cancel(b_obj_arg obj, obj_arg /* w */);
}

View File

@@ -231,7 +231,7 @@ extern "C" LEAN_EXPORT lean_obj_res lean_uv_tcp_send(b_obj_arg socket, obj_arg d
lean_dec(socket);
lean_dec(data_array);
free(bufs);
free(write_uv->data);
free(write_uv);
@@ -524,6 +524,66 @@ extern "C" LEAN_EXPORT lean_obj_res lean_uv_tcp_accept(b_obj_arg socket, obj_arg
return lean_io_result_mk_ok(promise);
}
/* Std.Internal.UV.TCP.Socket.tryAccept (socket : @& Socket) : IO (Except IO.Error (Option Socket)) */
extern "C" LEAN_EXPORT lean_obj_res lean_uv_tcp_try_accept(b_obj_arg socket, obj_arg /* w */) {
lean_uv_tcp_socket_object* tcp_socket = lean_to_uv_tcp_socket(socket);
// Locking early prevents potential parallelism issues setting m_promise_accept.
event_loop_lock(&global_ev);
if (tcp_socket->m_promise_accept != nullptr) {
return lean_io_result_mk_error(lean_decode_uv_error(UV_EALREADY, mk_string("parallel accept is not allowed! consider binding multiple sockets to the same address and accepting on them instead")));
}
lean_object* client = lean_io_result_take_value(lean_uv_tcp_new(lean_box(0)));
lean_uv_tcp_socket_object* client_socket = lean_to_uv_tcp_socket(client);
int result = uv_accept((uv_stream_t*)tcp_socket->m_uv_tcp, (uv_stream_t*)client_socket->m_uv_tcp);
if (result < 0 && result != UV_EAGAIN) {
event_loop_unlock(&global_ev);
lean_dec(client);
return lean_io_result_mk_error(lean_decode_uv_error(result, NULL));
} else if (result >= 0) {
event_loop_unlock(&global_ev);
return lean_io_result_mk_ok(mk_except_ok(lean::mk_option_some(client)));
} else {
event_loop_unlock(&global_ev);
lean_dec(client);
return lean_io_result_mk_ok(mk_except_ok(lean::mk_option_none()));
}
}
/* Std.Internal.UV.TCP.Socket.cancelAccept (socket : @& Socket) : IO Unit */
extern "C" LEAN_EXPORT lean_obj_res lean_uv_tcp_cancel_accept(b_obj_arg socket, obj_arg /* w */) {
lean_uv_tcp_socket_object* tcp_socket = lean_to_uv_tcp_socket(socket);
event_loop_lock(&global_ev);
if (tcp_socket->m_promise_accept == nullptr) {
event_loop_unlock(&global_ev);
return lean_io_result_mk_ok(lean_box(0));
}
lean_object* promise = tcp_socket->m_promise_accept;
lean_dec(promise);
tcp_socket->m_promise_accept = nullptr;
lean_object* client = tcp_socket->m_client;
if (client != nullptr) {
lean_dec(client);
tcp_socket->m_client = nullptr;
}
lean_dec(socket);
event_loop_unlock(&global_ev);
return lean_io_result_mk_ok(lean_box(0));
}
/* Std.Internal.UV.TCP.Socket.shutdown (socket : @& Socket) : IO (IO.Promise (Except IO.Error Unit)) */
extern "C" LEAN_EXPORT lean_obj_res lean_uv_tcp_shutdown(b_obj_arg socket, obj_arg /* w */) {
lean_uv_tcp_socket_object* tcp_socket = lean_to_uv_tcp_socket(socket);
@@ -689,6 +749,12 @@ extern "C" LEAN_EXPORT lean_obj_res lean_uv_tcp_listen(b_obj_arg socket, int32_t
);
}
extern "C" LEAN_EXPORT lean_obj_res lean_uv_tcp_cancel_accept(b_obj_arg socket, obj_arg /* w */) {
lean_always_assert(
false && ("Please build a version of Lean4 with libuv to invoke this.")
);
}
extern "C" LEAN_EXPORT lean_obj_res lean_uv_tcp_accept(b_obj_arg socket, obj_arg /* w */) {
lean_always_assert(
false && ("Please build a version of Lean4 with libuv to invoke this.")

View File

@@ -35,7 +35,6 @@ typedef struct {
// Tcp socket object manipulation functions.
static inline lean_object* lean_uv_tcp_socket_new(lean_uv_tcp_socket_object* s) { return lean_alloc_external(g_uv_tcp_socket_external_class, s); }
static inline lean_uv_tcp_socket_object* lean_to_uv_tcp_socket(lean_object* o) { return (lean_uv_tcp_socket_object*)(lean_get_external_data(o)); }
#endif
// =======================================
@@ -50,7 +49,10 @@ extern "C" LEAN_EXPORT lean_obj_res lean_uv_tcp_cancel_recv(b_obj_arg socket, ob
extern "C" LEAN_EXPORT lean_obj_res lean_uv_tcp_bind(b_obj_arg socket, b_obj_arg addr, obj_arg /* w */);
extern "C" LEAN_EXPORT lean_obj_res lean_uv_tcp_listen(b_obj_arg socket, int32_t backlog, obj_arg /* w */);
extern "C" LEAN_EXPORT lean_obj_res lean_uv_tcp_accept(b_obj_arg socket, obj_arg /* w */);
extern "C" LEAN_EXPORT lean_obj_res lean_uv_tcp_cancel_accept(b_obj_arg socket, obj_arg /* w */);
extern "C" LEAN_EXPORT lean_obj_res lean_uv_tcp_try_accept(b_obj_arg socket, obj_arg /* w */);
extern "C" LEAN_EXPORT lean_obj_res lean_uv_tcp_shutdown(b_obj_arg socket, obj_arg /* w */);
extern "C" LEAN_EXPORT lean_obj_res lean_uv_tcp_try_accept(b_obj_arg socket, obj_arg /* w */);
// =======================================
// TCP Socket Utility Functions

View File

@@ -273,7 +273,7 @@ extern "C" LEAN_EXPORT lean_obj_res lean_uv_timer_cancel(b_obj_arg obj, obj_arg
lean_dec(timer->m_promise);
timer->m_promise = NULL;
timer->m_state = TIMER_STATE_FINISHED;
timer->m_state = TIMER_STATE_INITIAL;
// The loop does not need to keep the timer alive anymore.
lean_dec(obj);

View File

@@ -12,7 +12,7 @@ def testClient (addr : Net.SocketAddress) : Async String := do
Selectable.one #[
.case ( Selector.sleep 1000) fun _ => return "Timeout",
.case ( client.recvSelector 4096) fun data? => do
.case (client.recvSelector 4096) fun data? => do
if let some data := data? then
return String.fromUTF8! data
else
@@ -65,7 +65,7 @@ def testClient (addr : Net.SocketAddress) : Async String := do
Selectable.one #[
.case ( Selector.sleep 1000) fun _ => return "Timeout",
.case ( client.recvSelector 4096) fun (data, _) => do
.case (client.recvSelector 4096) fun (data, _) => do
return String.fromUTF8! data
]

View File

@@ -5,9 +5,10 @@ open Std Internal IO Async
def test1 : Async Nat := do
let s1 Sleep.mk 1000
let s2 Sleep.mk 1500
Selectable.one #[
.case ( s2.selector) fun _ => return 2,
.case ( s1.selector) fun _ => return 1,
.case (s2.selector) fun _ => return 2,
.case (s1.selector) fun _ => return 1,
]
/-- info: 1 -/

View File

@@ -77,9 +77,6 @@ def cancelBehaviorNoRepeat : IO Unit := do
assert! ( IO.getTaskState prom.result?) != IO.TaskState.finished
timer.cancel
assert! ( IO.getTaskState prom.result?) == IO.TaskState.finished
let prom timer.next
assert! ( IO.getTaskState prom.result?) == IO.TaskState.finished
assert! ( IO.getTaskState prom.result?) == IO.TaskState.finished
def stopBehaviorNoRepeat : IO Unit := do
let timer Timer.mk BASE_DURATION.toUInt64 false