Compare commits

...

3 Commits

Author SHA1 Message Date
Sofia Rodrigues
39c30b60d1 fix: free bufs 2025-09-21 18:10:52 -03:00
Sofia Rodrigues
de9fab6711 fix: free bufs 2025-09-21 18:09:55 -03:00
Sofia Rodrigues
d46924e08f feat: add vectored write and fix rc issue in tcp and udp cancel functions 2025-09-20 13:44:51 -03:00
8 changed files with 86 additions and 28 deletions

View File

@@ -118,12 +118,19 @@ Connects the client socket to the given address.
def connect (s : Client) (addr : SocketAddress) : Async Unit :=
Async.ofPromise <| s.native.connect addr
/--
Sends multiple data buffers through the client socket.
-/
@[inline]
def sendAll (s : Client) (data : Array ByteArray) : Async Unit :=
Async.ofPromise <| s.native.send data
/--
Sends data through the client socket.
-/
@[inline]
def send (s : Client) (data : ByteArray) : Async Unit :=
Async.ofPromise <| s.native.send data
Async.ofPromise <| s.native.send #[data]
/--
Receives data from the client socket. If data is received, its wrapped in .some. If EOF is reached,

View File

@@ -61,13 +61,21 @@ automatically sent to that destination.
def connect (s : Socket) (addr : SocketAddress) : IO Unit :=
s.native.connect addr
/--
Sends multiple data buffers through an UDP socket. The `addr` parameter specifies the destination
address. If `addr` is `none`, the data is sent to the default peer address set by `connect`.
-/
@[inline]
def sendAll (s : Socket) (data : Array ByteArray) (addr : Option SocketAddress := none) : Async Unit :=
Async.ofPromise <| s.native.send data addr
/--
Sends data through an UDP socket. The `addr` parameter specifies the destination address. If `addr`
is `none`, the data is sent to the default peer address set by `connect`.
-/
@[inline]
def send (s : Socket) (data : ByteArray) (addr : Option SocketAddress := none) : Async Unit :=
Async.ofPromise <| s.native.send data addr
Async.ofPromise <| s.native.send #[data] addr
/--
Receives data from an UDP socket. `size` is for the maximum bytes to receive.

View File

@@ -47,7 +47,7 @@ opaque connect (socket : @& Socket) (addr : @& SocketAddress) : IO (IO.Promise (
Sends data through a TCP socket.
-/
@[extern "lean_uv_tcp_send"]
opaque send (socket : @& Socket) (data : ByteArray) : IO (IO.Promise (Except IO.Error Unit))
opaque send (socket : @& Socket) (data : Array ByteArray) : IO (IO.Promise (Except IO.Error Unit))
/--
Receives data from a TCP socket with a maximum size of size bytes. The promise resolves when data is

View File

@@ -55,7 +55,7 @@ Sends data through an UDP socket. The `addr` parameter specifies the destination
is `none`, the data is sent to the default peer address set by `connect`.
-/
@[extern "lean_uv_udp_send"]
opaque send (socket : @& Socket) (data : ByteArray) (addr : @& Option SocketAddress) : IO (IO.Promise (Except IO.Error Unit))
opaque send (socket : @& Socket) (data : Array ByteArray) (addr : @& Option SocketAddress) : IO (IO.Promise (Except IO.Error Unit))
/--
Receives data from an UDP socket. `size` is for the maximum bytes to receive. The promise

View File

@@ -22,6 +22,7 @@ typedef struct {
lean_object* promise;
lean_object* data;
lean_object* socket;
uv_buf_t* bufs;
} tcp_send_data;
// =======================================
@@ -164,14 +165,31 @@ extern "C" LEAN_EXPORT lean_obj_res lean_uv_tcp_connect(b_obj_arg socket, b_obj_
return lean_io_result_mk_ok(promise);
}
/* Std.Internal.UV.TCP.Socket.send (socket : @& Socket) (data : ByteArray) : IO (IO.Promise (Except IO.Error Unit)) */
extern "C" LEAN_EXPORT lean_obj_res lean_uv_tcp_send(b_obj_arg socket, obj_arg data, obj_arg /* w */) {
/* Std.Internal.UV.TCP.Socket.send (socket : @& Socket) (data : Array ByteArray) : IO (IO.Promise (Except IO.Error Unit)) */
extern "C" LEAN_EXPORT lean_obj_res lean_uv_tcp_send(b_obj_arg socket, obj_arg data_array, obj_arg /* w */) {
lean_uv_tcp_socket_object* tcp_socket = lean_to_uv_tcp_socket(socket);
size_t data_len = lean_sarray_size(data);
char* data_str = (char*)lean_sarray_cptr(data);
size_t array_len = lean_array_size(data_array);
uv_buf_t buf = uv_buf_init(data_str, data_len);
if (array_len == 0) {
lean_dec(data_array);
lean_object* promise = lean_promise_new();
mark_mt(promise);
lean_promise_resolve_with_code(0, promise);
return lean_io_result_mk_ok(promise);
}
// Allocate buffer array for uv_write
uv_buf_t* bufs = (uv_buf_t*)malloc(array_len * sizeof(uv_buf_t));
for (size_t i = 0; i < array_len; i++) {
lean_object* byte_array = lean_array_get_core(data_array, i);
size_t data_len = lean_sarray_size(byte_array);
char* data_str = (char*)lean_sarray_cptr(byte_array);
bufs[i] = uv_buf_init(data_str, data_len);
}
lean_object* promise = lean_promise_new();
mark_mt(promise);
@@ -181,8 +199,9 @@ extern "C" LEAN_EXPORT lean_obj_res lean_uv_tcp_send(b_obj_arg socket, obj_arg d
tcp_send_data* send_data = (tcp_send_data*)write_uv->data;
send_data->promise = promise;
send_data->data = data;
send_data->data = data_array;
send_data->socket = socket;
send_data->bufs = bufs;
// These objects are going to enter the loop and be owned by it
lean_inc(promise);
@@ -190,7 +209,7 @@ extern "C" LEAN_EXPORT lean_obj_res lean_uv_tcp_send(b_obj_arg socket, obj_arg d
event_loop_lock(&global_ev);
int result = uv_write(write_uv, (uv_stream_t*)tcp_socket->m_uv_tcp, &buf, 1, [](uv_write_t* req, int status) {
int result = uv_write(write_uv, (uv_stream_t*)tcp_socket->m_uv_tcp, bufs, array_len, [](uv_write_t* req, int status) {
tcp_send_data* tup = (tcp_send_data*) req->data;
lean_promise_resolve_with_code(status, tup->promise);
@@ -199,6 +218,7 @@ extern "C" LEAN_EXPORT lean_obj_res lean_uv_tcp_send(b_obj_arg socket, obj_arg d
lean_dec(tup->data);
lean_dec(tup->socket);
free(tup->bufs);
free(req->data);
free(req);
});
@@ -209,8 +229,9 @@ extern "C" LEAN_EXPORT lean_obj_res lean_uv_tcp_send(b_obj_arg socket, obj_arg d
lean_dec(promise); // The structure does not own it.
lean_dec(promise); // We are not going to return it.
lean_dec(socket);
lean_dec(data);
lean_dec(data_array);
free(bufs);
free(write_uv->data);
free(write_uv);
@@ -385,7 +406,7 @@ extern "C" LEAN_EXPORT lean_obj_res lean_uv_tcp_cancel_recv(b_obj_arg socket, ob
tcp_socket->m_byte_array = nullptr;
}
lean_dec((lean_object*)tcp_socket);
lean_dec(socket);
event_loop_unlock(&global_ev);
return lean_io_result_mk_ok(lean_box(0));
@@ -709,4 +730,4 @@ extern "C" LEAN_EXPORT lean_obj_res lean_uv_tcp_keepalive(b_obj_arg socket, int3
}
#endif
}
}

View File

@@ -43,7 +43,7 @@ static inline lean_uv_tcp_socket_object* lean_to_uv_tcp_socket(lean_object* o) {
extern "C" LEAN_EXPORT lean_obj_res lean_uv_tcp_new(obj_arg /* w */);
extern "C" LEAN_EXPORT lean_obj_res lean_uv_tcp_connect(b_obj_arg socket, b_obj_arg addr, obj_arg /* w */);
extern "C" LEAN_EXPORT lean_obj_res lean_uv_tcp_send(b_obj_arg socket, obj_arg data, obj_arg /* w */);
extern "C" LEAN_EXPORT lean_obj_res lean_uv_tcp_send(b_obj_arg socket, obj_arg data_array, obj_arg /* w */);
extern "C" LEAN_EXPORT lean_obj_res lean_uv_tcp_recv(b_obj_arg socket, uint64_t buffer_size, obj_arg /* w */);
extern "C" LEAN_EXPORT lean_obj_res lean_uv_tcp_wait_readable(b_obj_arg socket, obj_arg /* w */);
extern "C" LEAN_EXPORT lean_obj_res lean_uv_tcp_cancel_recv(b_obj_arg socket, obj_arg /* w */);

View File

@@ -16,6 +16,7 @@ typedef struct {
lean_object *promise;
lean_object *data;
lean_object *socket;
uv_buf_t* bufs;
} udp_send_data;
void lean_uv_udp_socket_finalizer(void* ptr) {
@@ -123,14 +124,30 @@ extern "C" LEAN_EXPORT lean_obj_res lean_uv_udp_connect(b_obj_arg socket, b_obj_
return lean_io_result_mk_ok(lean_box(0));
}
/* Std.Internal.UV.UDP.Socket.send (socket : @& Socket) (data : ByteArray) (addr : @& Option SocketAddress) : IO (IO.Promise (Except IO.Error Unit)) */
extern "C" LEAN_EXPORT lean_obj_res lean_uv_udp_send(b_obj_arg socket, obj_arg data, b_obj_arg opt_addr, obj_arg /* w */) {
/* Std.Internal.UV.UDP.Socket.send (socket : @& Socket) (data : Array ByteArray) (addr : @& Option SocketAddress) : IO (IO.Promise (Except IO.Error Unit)) */
extern "C" LEAN_EXPORT lean_obj_res lean_uv_udp_send(b_obj_arg socket, obj_arg data_array, b_obj_arg opt_addr, obj_arg /* w */) {
lean_uv_udp_socket_object* udp_socket = lean_to_uv_udp_socket(socket);
size_t data_len = lean_sarray_size(data);
char* data_str = (char*)lean_sarray_cptr(data);
size_t array_len = lean_array_size(data_array);
uv_buf_t buf = uv_buf_init(data_str, data_len);
if (array_len == 0) {
lean_dec(data_array);
lean_object* promise = lean_promise_new();
mark_mt(promise);
lean_promise_resolve_with_code(0, promise);
return lean_io_result_mk_ok(promise);
}
uv_buf_t* bufs = (uv_buf_t*)malloc(array_len * sizeof(uv_buf_t));
for (size_t i = 0; i < array_len; i++) {
lean_object* byte_array = lean_array_get_core(data_array, i);
size_t data_len = lean_sarray_size(byte_array);
char* data_str = (char*)lean_sarray_cptr(byte_array);
bufs[i] = uv_buf_init(data_str, data_len);
}
lean_object* promise = lean_promise_new();
mark_mt(promise);
@@ -140,8 +157,9 @@ extern "C" LEAN_EXPORT lean_obj_res lean_uv_udp_send(b_obj_arg socket, obj_arg d
udp_send_data* send_data = (udp_send_data*)send_uv->data;
send_data->promise = promise;
send_data->data = data;
send_data->data = data_array;
send_data->socket = socket;
send_data->bufs = bufs;
// These objects are going to enter the loop and be owned by it
lean_inc(promise);
@@ -157,7 +175,7 @@ extern "C" LEAN_EXPORT lean_obj_res lean_uv_udp_send(b_obj_arg socket, obj_arg d
event_loop_lock(&global_ev);
int result = uv_udp_send(send_uv, udp_socket->m_uv_udp, &buf, 1, (sockaddr*)addr_ptr, [](uv_udp_send_t* req, int status) {
int result = uv_udp_send(send_uv, udp_socket->m_uv_udp, bufs, array_len, (sockaddr*)addr_ptr, [](uv_udp_send_t* req, int status) {
udp_send_data* tup = (udp_send_data*) req->data;
lean_promise_resolve_with_code(status, tup->promise);
@@ -165,6 +183,7 @@ extern "C" LEAN_EXPORT lean_obj_res lean_uv_udp_send(b_obj_arg socket, obj_arg d
lean_dec(tup->socket);
lean_dec(tup->data);
free(tup->bufs);
free(req->data);
free(req);
});
@@ -179,7 +198,8 @@ extern "C" LEAN_EXPORT lean_obj_res lean_uv_udp_send(b_obj_arg socket, obj_arg d
lean_dec(promise); // The structure does not own it.
lean_dec(promise); // We are not going to return it.
lean_dec(socket); // The loop does not own the object.
lean_dec(data); // The data is owned.
lean_dec(data_array); // The data is owned.
free(bufs);
free(send_uv->data);
free(send_uv);
@@ -344,10 +364,12 @@ extern "C" LEAN_EXPORT lean_obj_res lean_uv_udp_wait_readable(b_obj_arg socket,
extern "C" LEAN_EXPORT lean_obj_res lean_uv_udp_cancel_recv(b_obj_arg socket, obj_arg /* w */) {
lean_uv_udp_socket_object* udp_socket = lean_to_uv_udp_socket(socket);
lean_inc(socket);
event_loop_lock(&global_ev);
if (udp_socket->m_promise_read == nullptr) {
event_loop_unlock(&global_ev);
lean_dec(socket);
return lean_io_result_mk_ok(lean_box(0));
}
@@ -358,14 +380,14 @@ extern "C" LEAN_EXPORT lean_obj_res lean_uv_udp_cancel_recv(b_obj_arg socket, ob
udp_socket->m_promise_read = nullptr;
lean_object* byte_array = udp_socket->m_byte_array;
if (byte_array != nullptr) {
lean_dec(byte_array);
udp_socket->m_byte_array = nullptr;
}
lean_dec((lean_object*)udp_socket);
event_loop_unlock(&global_ev);
lean_dec(socket);
return lean_io_result_mk_ok(lean_box(0));
}
@@ -601,4 +623,4 @@ extern "C" LEAN_EXPORT lean_obj_res lean_uv_udp_set_ttl(b_obj_arg socket, uint32
}
#endif
}
}

View File

@@ -41,7 +41,7 @@ static inline lean_uv_udp_socket_object* lean_to_uv_udp_socket(lean_object * o)
extern "C" LEAN_EXPORT lean_obj_res lean_uv_udp_new(obj_arg /* w */);
extern "C" LEAN_EXPORT lean_obj_res lean_uv_udp_bind(b_obj_arg socket, b_obj_arg addr, obj_arg /* w */);
extern "C" LEAN_EXPORT lean_obj_res lean_uv_udp_connect(b_obj_arg socket, b_obj_arg addr, obj_arg /* w */);
extern "C" LEAN_EXPORT lean_obj_res lean_uv_udp_send(b_obj_arg socket, obj_arg data, b_obj_arg opt_addr, obj_arg /* w */);
extern "C" LEAN_EXPORT lean_obj_res lean_uv_udp_send(b_obj_arg socket, obj_arg data_array, b_obj_arg opt_addr, obj_arg /* w */);
extern "C" LEAN_EXPORT lean_obj_res lean_uv_udp_recv(b_obj_arg socket, uint64_t buffer_size, obj_arg /* w */);
extern "C" LEAN_EXPORT lean_obj_res lean_uv_udp_wait_readable(b_obj_arg socket, obj_arg /* w */);
extern "C" LEAN_EXPORT lean_obj_res lean_uv_udp_cancel_recv(b_obj_arg socket, obj_arg /* w */);