Skip to content

Commit

Permalink
feat: Add support for work queues; cleanup stream requests.
Browse files Browse the repository at this point in the history
joehendrix committed Dec 21, 2023
1 parent 35334f4 commit 0b29e79
Showing 8 changed files with 202 additions and 165 deletions.
1 change: 1 addition & 0 deletions LibUV.lean
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
-- This module serves as the root of the `Libuv` library.
-- Import modules here that should be built as part of the library.
import LibUV.Handle
import LibUV.Work
6 changes: 2 additions & 4 deletions LibUV/Loop.lean
Original file line number Diff line number Diff line change
@@ -14,10 +14,6 @@ deriving Inhabited, Repr

alloy c section

#define LUV_EALREADY 0
#define LUV_ECANCELED 1
#define LUV_EINVAL 2
#define LUV_ETIMEDOUT 3

LEAN_EXPORT lean_object* lean_uv_error_mk(int code) {
if (code >= 0)
@@ -46,6 +42,7 @@ end
protected inductive Error where
| errorcode : ErrorCode → UV.Error
| user : String → UV.Error
deriving Repr

@[export lean_uv_error_errorcode]
def UV.Error.errorcode_c := Error.errorcode
@@ -164,6 +161,7 @@ static void stop_handles(uv_handle_t* h, void* arg) {
}

static void Loop_finalize(void* ptr) {
printf("Loop_finalize\n");
uv_loop_t* loop = (uv_loop_t*) ptr;
int err = uv_loop_close(loop);
if (err == UV_EBUSY) {
6 changes: 6 additions & 0 deletions LibUV/NonemptyProp.lean
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
def NonemptyProp := Subtype fun α : Prop => Nonempty α

instance : Inhabited NonemptyProp := ⟨⟨PUnit, ⟨⟨⟩⟩⟩⟩

/-- The underlying type of a `NonemptyType`. -/
abbrev NonemptyProp.type (type : NonemptyProp) : Prop := type.val
208 changes: 49 additions & 159 deletions LibUV/Stream.lean
Original file line number Diff line number Diff line change
@@ -1,20 +1,9 @@
import LibUV.Loop
import LibUV.NonemptyProp

open scoped Alloy.C
alloy c include <stdlib.h> <lean_uv.h>

section NonemptyProp


def NonemptyProp := Subtype fun α : Prop => Nonempty α

instance : Inhabited NonemptyProp := ⟨⟨PUnit, ⟨⟨⟩⟩⟩⟩

/-- The underlying type of a `NonemptyType`. -/
abbrev NonemptyProp.type (type : NonemptyProp) : Prop := type.val

end NonemptyProp

namespace UV

section StreamDeclaration
@@ -95,7 +84,7 @@ static void Req_foreach(void* ptr, b_lean_obj_arg f) {
fatal_st_only("Req");
}

static void Shutdown_finalize(void* ptr) {
static void ShutdownFree(void* ptr) {
lean_uv_shutdown_t* req = (lean_uv_shutdown_t*) ptr;
// Release counter on handle.
lean_dec_ref(req->uv.handle->data);
@@ -105,23 +94,6 @@ static void Shutdown_finalize(void* ptr) {
free(req);
}
}

static lean_external_class * shutdown_class = NULL;
end

/--
This returns the handle associated with the shutdown request.
-/
@[extern "lean_uv_shutdown_req_handle"]
opaque ShutdownReq.handle [Inhabited α] (req : @&ShutdownReq α) : α

alloy c section
lean_obj_res lean_uv_shutdown_req_handle(b_lean_obj_arg reqObj, b_lean_obj_arg _rw) {
lean_uv_shutdown_t* req = lean_get_external_data(reqObj);
lean_object* hdl = req->uv.handle->data;
lean_inc(hdl);
return hdl;
}
end

/--
@@ -130,7 +102,7 @@ It waits for pending write requests to complete.
The cb is called after shutdown is complete.
-/
@[extern "lean_uv_shutdown"]
opaque Stream.shutdown [Stream α] (handle : α) (cb : Bool → BaseIO Unit) : UV.IO (ShutdownReq α)
opaque Stream.shutdown [Stream α] (handle : @&α) (cb : Bool → UV.IO Unit) : UV.IO Unit

alloy c section

@@ -146,35 +118,22 @@ static void invoke_rec_callback(uv_loop_t* loop, uv_req_t* req, lean_object** cb
check_callback_result(loop, lean_apply_2(cb, status, lean_io_mk_world()));
}

static lean_obj_res mk_req(lean_external_class* cl, void* req) {
lean_object* reqObj = lean_alloc_external(cl, req);
((uv_req_t*) req)->data = reqObj;
return reqObj;
}

static void shutdown_cb(uv_shutdown_t *req, int status) {
lean_uv_shutdown_t* lreq = (lean_uv_shutdown_t*) req;
invoke_rec_callback(req->handle->loop, (uv_req_t*) req, &lreq->callback, lean_bool(status == 0));
}

lean_obj_res lean_uv_shutdown(b_lean_obj_arg handle, lean_obj_arg cb, b_lean_obj_arg _rw) {
lean_uv_shutdown_t* req = malloc(sizeof(lean_uv_shutdown_t));
if (shutdown_class == NULL)
shutdown_class = lean_register_external_class(Shutdown_finalize, Req_foreach);
lean_object* reqObj = mk_req(shutdown_class, req);
uv_stream_t* hdl = lean_get_external_data(handle);
int ec = uv_shutdown(&req->uv, hdl, &shutdown_cb);
if (ec < 0) {
// Release callback
lean_dec_ref(cb);
req->callback = 0;
// Make sure handle is initialized to avoid special case in Shutdown_finalize.
req->uv.handle = hdl;
lean_free_object(reqObj);
free(req);
fatal_error("uv_shutdown_req failed (error = %d)", ec);
}
req->callback = cb;
return lean_io_result_mk_ok(reqObj);
return lean_io_result_mk_ok(lean_unit());
}

end
@@ -412,7 +371,7 @@ This stops reading data from the stream.
It always succeeds even if data is not being read from stream.
-/
@[extern "lean_uv_read_stop"]
opaque Stream.read_stop [Stream α] (stream : @&α) : BaseIO Unit
opaque Stream.read_stop [Stream α] (stream : @&α) : UV.IO Unit

alloy c section
lean_object* lean_uv_read_stop(lean_object* stream_obj) {
@@ -459,25 +418,10 @@ The external data of a ShutdownReq in Lean is a lean_uv_shutdown_t req where:
*/
struct lean_uv_write_s {
uv_write_t uv;
lean_object* callback;
uv_buf_t* bufs;
size_t bufcnt;
lean_object* array;
};

typedef struct lean_uv_write_s lean_uv_write_t;

static void Write_finalize(void* ptr) {
lean_uv_write_t* req = (lean_uv_write_t*) ptr;
// Release counter on handle.
lean_dec_ref(req->uv.handle->data);
if (req->callback) {
req->uv.data = 0;
} else {
free(req);
}
}

static lean_external_class * write_class = NULL;
end

/--
@@ -498,27 +442,21 @@ end
end Write

@[extern "lean_uv_write"]
opaque Stream.write [Stream α] (stream : α) (bufs : @&Array ByteArray)
(callback : Bool → UV.IO Unit) : UV.IO (WriteReq α)
opaque Stream.write [Stream α] (stream : @&α) (bufs : @&Array ByteArray)
(callback : Bool → UV.IO Unit) : UV.IO Unit

alloy c section
static void write_cb(uv_write_t *req, int status) {
lean_uv_write_t* lreq = (lean_uv_write_t*) req;
uv_loop_t* loop = req->handle->loop;
lean_object* success = lean_bool(status == 0);
// If the request object has been freed, then we can free the request
// object as well.
lean_object* cb = lreq->callback;
if (req->data) {
lreq->callback = 0;
uv_buf_t* bufs = lreq->bufs;
uv_buf_t* bufEnd = bufs + lreq->bufcnt;
for (uv_buf_t* buf = bufs; buf != bufEnd; ++buf)
dec_buf_array(buf);
free(bufs);
} else {
free(req);
}
lean_object* cb = req->data;

lean_uv_write_t* lreq = (lean_uv_write_t*) req;
// Release array references
lean_dec_ref(lreq->array);
free(req);
check_callback_result(loop, lean_apply_2(cb, success, lean_io_mk_world()));
}

@@ -530,29 +468,26 @@ lean_obj_res lean_uv_write(b_lean_obj_arg streamObj, b_lean_obj_arg bufObj,
}

lean_uv_write_t* req = checked_malloc(sizeof(lean_uv_write_t));
if (write_class == NULL)
write_class = lean_register_external_class(Write_finalize, Req_foreach);
lean_object* reqObj = mk_req(write_class, req);
uv_stream_t* hdl = lean_get_external_data(streamObj);

lean_object** bufObjs = lean_array_cptr(bufObj);

uv_buf_t* bufArray = malloc(sizeof(uv_buf_t) * nbufs);
uv_buf_t* bufArray = checked_malloc(sizeof(uv_buf_t) * nbufs);
for (size_t i = 0; i != nbufs; ++i) {
init_buf(bufArray + i, bufObjs[i]);
lean_object* a = bufObjs[i];
uv_buf_t* buf = bufArray + i;
buf->base = (char*) lean_sarray_cptr(a);
buf->len = lean_sarray_size(a);
}

int ec = uv_write(&req->uv, hdl, bufArray, nbufs, &write_cb);
free(bufArray);
if (ec < 0) {
lean_dec_ref(callback);
req->callback = 0;
req->uv.handle = hdl;
lean_free_object(req->uv.data);
lean_dec_ref(bufObj);
free(req);
fatal_error("uv_write failed (error = %d)", ec);
} else {
req->callback = callback;
return lean_io_result_mk_ok(reqObj);
}
req->uv.data = callback;
req->array = bufObj;
return lean_io_result_mk_ok(lean_unit());
}
end

@@ -646,11 +581,14 @@ alloy c opaque_extern_type TCP => uv_tcp_t := {
}

alloy c extern "lean_uv_tcp_init"
def Loop.mkTCP (loop : Loop) : BaseIO TCP := {
def Loop.mkTCP (loop : Loop) : UV.IO TCP := {
lean_uv_tcp_t* tcp = checked_malloc(sizeof(lean_uv_tcp_t));
init_stream_callbacks(&tcp->callbacks);
tcp->connecting = false;
uv_tcp_init(of_loop(loop), &tcp->uv);
int ec = uv_tcp_init(of_loop(loop), &tcp->uv);
if (ec < 0) {
fatal_error("Loop.mkTCP failed %d\n", ec);
}
lean_object* r = to_lean<TCP>(&tcp->uv);
tcp->uv.data = r;
return lean_io_result_mk_ok(r);
@@ -674,47 +612,6 @@ def bind (tcp : @&TCP) (addr : @&SockAddr) : UV.IO Unit := {
return lean_io_unit_result_ok();
}

/-- References -/
opaque ConnectReqPointed : NonemptyType.{0}

/-- A shutdown request -/
structure ConnectReq (α : Type) : Type where
ref : ConnectReqPointed.type

instance : Nonempty (ConnectReq α) :=
Nonempty.intro { ref := Classical.choice ConnectReqPointed.property }


alloy c section
/*
The external data of a ConnectReq in Lean is a lean_uv_connect_t req where:

req.uv.handle is a pointer to a uv_stream_t.
req.uv.data is a pointer to the Lean shutdown_req object. This is set
to null if the shutdown request memory is released.
req.callback is a pointer to the callback to invoke when the shutdown completes.
This is set to null after the callback returns.
*/
struct lean_uv_connect_s {
uv_connect_t uv;
lean_object* callback;
};

typedef struct lean_uv_connect_s lean_uv_connect_t;

static void Connect_finalize(void* ptr) {
lean_uv_connect_t* req = (lean_uv_connect_t*) ptr;
lean_dec_ref(req->uv.handle->data);
if (req->callback) {
req->uv.data = 0;
} else {
free(req);
}
}

static lean_external_class * connect_class = NULL;
end

inductive ConnectionResult where
| ok : ConnectionResult
| canceled : ConnectionResult
@@ -740,43 +637,36 @@ static void tcp_connect_cb(uv_connect_t *req, int status) {
}
break;
}

lean_uv_connect_t* lreq = (lean_uv_connect_t*) req;
lean_uv_tcp_t* luv_tcp = lean_stream_base((uv_handle_t*) req->handle);
uv_handle_t* handle = (uv_handle_t*) req->handle;
lean_uv_tcp_t* luv_tcp = lean_stream_base(handle);
luv_tcp->connecting = false;
invoke_rec_callback(req->handle->loop, (uv_req_t*) req, &lreq->callback, statusObj);

// If the request object has been freed, then we can free the request
// object as well.
lean_object* cb = req->data;
free(req);
check_callback_result(handle->loop, lean_apply_2(cb, statusObj, lean_io_mk_world()));
}

end

alloy c extern "lean_uv_tcp_connect"
def connect (tcp : TCP) (addr : @&SockAddr) (callback : ConnectionResult → UV.IO Unit) : UV.IO (ConnectReq TCP) := {
lean_uv_tcp_t* luv_tcp = lean_stream_base(lean_get_external_data(tcp));
uv_tcp_t* uv_tcp = &luv_tcp->uv;
uv_stream_t* hdl = (uv_stream_t*) uv_tcp;
if (luv_tcp->connecting) {
def connect (tcp : @&TCP) (addr : @&SockAddr) (callback : ConnectionResult → UV.IO Unit) : UV.IO Unit := {
uv_tcp_t* uv_tcp = lean_get_external_data(tcp);
lean_uv_tcp_t* luv_tcp = lean_stream_base((uv_handle_t*) uv_tcp);
if (luv_tcp->connecting)
return lean_uv_io_error(UV_EINVAL);
}
lean_uv_connect_t* req = malloc(sizeof(lean_uv_connect_t));

if (connect_class == NULL)
connect_class = lean_register_external_class(Connect_finalize, Req_foreach);
lean_object* reqObj = mk_req(connect_class, req);
req->callback = callback;

uv_connect_t* req = malloc(sizeof(uv_connect_t));
const struct sockaddr *uv_addr = of_lean<SockAddr>(addr);

int ec = uv_tcp_connect(&req->uv, uv_tcp, uv_addr, &tcp_connect_cb);
int ec = uv_tcp_connect(req, uv_tcp, uv_addr, &tcp_connect_cb);
if (ec < 0) {
lean_dec_ref(tcp);
lean_dec_ref(callback);
req->callback = 0;
// Make sure handle is initialized to avoid special case in Shutdown_finalize.
req->uv.handle = hdl;
lean_free_object(reqObj);
free(req);
fatal_error("uv_tcp_connect failed (error = %d)", ec);
}
req->data = callback;
luv_tcp->connecting = true;
return lean_io_result_mk_ok(reqObj);
return lean_io_result_mk_ok(lean_unit());
}

def listen (tcp : TCP) (backlog : UInt32) (callback : UV.IO Unit) : UV.IO Unit :=
125 changes: 125 additions & 0 deletions LibUV/Work.lean
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
import LibUV.Loop

open scoped Alloy.C
alloy c include <stdlib.h> <lean_uv.h>

namespace UV

/-- References -/
opaque WorkPointed : NonemptyType.{0}

/-- A shutdown request -/
structure Work : Type where
ref : WorkPointed.type

instance : Nonempty Work :=
Nonempty.intro { ref := Classical.choice WorkPointed.property }

alloy c section
/*
The external data of a ShutdownReq in Lean is a lean_uv_shutdown_t req where:

req.uv.handle is a pointer to a uv_stream_t.
req.uv.data is a pointer to the Lean shutdown_req object. This is set
to null if the shutdown request memory is released.
req.callback is a pointer to the callback to invoke when the shutdown completes.
This is set to null after the callback returns.
*/
struct lean_uv_work_s {
uv_work_t uv;
// Callback until work_cb is called and work result to pass to after_work_cb if it is done.
lean_object* work_cb;
// Callback to run after work completes or null after work is all done.
lean_object* after_work_cb;
};

typedef struct lean_uv_work_s lean_uv_work_t;

static void Work_foreach(void* ptr, b_lean_obj_arg f) {
}

static void Work_finalize(void* ptr) {
lean_uv_work_t* req = (lean_uv_work_t*) ptr;
if (req->after_work_cb) {
req->uv.data = 0;
} else {
free(req);
}
}

static uv_once_t work_class_once = UV_ONCE_INIT;
static lean_external_class* work_class = NULL;

static void initWorkClass(void) {
work_class = lean_register_external_class(Work_finalize, Work_foreach);
}
end


@[extern "lean_uv_work_cancel"]
opaque Work.cancel (work : @&Work) : UV.IO Unit
-- TODO: Implement me

/--
Queue a work item
-/
@[extern "lean_uv_queue_work"]
opaque Loop.queue_work (loop : @&Loop) (work_cb : UV.IO α) (after_work_cb : Except UV.Error α → UV.IO Unit) : UV.IO Work

alloy c section

LEAN_EXPORT void lean_init_thread_heap(void);

static void lean_uv_work_cb(uv_work_t *req) {
lean_init_thread_heap();
lean_uv_work_t* lreq = (lean_uv_work_t*) req;
lean_object* cb = lreq->work_cb;
lreq->work_cb = lean_apply_1(cb, lean_io_mk_world());
}

static void lean_uv_after_work_cb(uv_work_t *req, int status) {
lean_uv_work_t* lreq = (lean_uv_work_t*) req;
uv_loop_t* loop = req->loop;
lean_object* cb = lreq->after_work_cb;
lean_object* result;
if (status >= 0) {
lean_object* res = lreq->work_cb;
unsigned tag = 1 - lean_ptr_tag(res);
result = lean_alloc_ctor(tag, 1, 0);
lean_object* val = lean_ctor_get(res, 0);
lean_inc(val);
lean_dec_ref(res);
lean_ctor_set(result, 0, val);
} else if (status == UV_ECANCELED) {
result = lean_alloc_ctor(0, 1, 0);
lean_ctor_set(result, 0, lean_box(LUV_ECANCELED));
} else {
fatal_error("lean_uv_after_work_cb had unexpected failure %d.\n", status);
}
if (req->data != 0) {
req->after_work_cb = 0;
} else {
free(req);
}
check_callback_result(loop, lean_apply_2(cb, result, lean_io_mk_world()));
}

lean_obj_res lean_uv_queue_work(b_lean_obj_arg loopObj, lean_obj_arg work_cb, lean_obj_arg after_work_cb, b_lean_obj_arg _rw) {
uv_once(&work_class_once, initWorkClass);
uv_loop_t* loop = lean_get_external_data(loopObj);
lean_uv_work_t* req = checked_malloc(sizeof(lean_uv_work_t));
lean_mark_mt(work_cb);
req->work_cb = work_cb;
int ec = uv_queue_work(loop, &req->uv, lean_uv_work_cb, lean_uv_after_work_cb);
if (ec < 0) {
lean_dec_ref(work_cb);
lean_dec_ref(after_work_cb);
free(req);
fatal_error("uv_shutdown_req failed (error = %d)", ec);
}
lean_object* reqObj = lean_alloc_external(work_class, req);
req->after_work_cb = after_work_cb;
req->uv.data = reqObj;
return lean_io_result_mk_ok(reqObj);
}
end
10 changes: 10 additions & 0 deletions examples/work.lean
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
import LibUV

def main : IO Unit := UV.IO.run do
let l ← UV.mkLoop
let _ ← l.queue_work (UV.log s!"Hello" >>= fun _ => pure 1) fun (r : Except _ Nat) => do
match r with
| .error e => UV.log s!"Error {repr e}"
| .ok v => UV.log s!"Returned {v}"
let _ ← l.run
pure ()
7 changes: 6 additions & 1 deletion include/lean_uv.h
Original file line number Diff line number Diff line change
@@ -179,4 +179,9 @@ struct lean_uv_timer_s {
lean_object* callback; // Callback for timer event
};

typedef struct lean_uv_timer_s lean_uv_timer_t;
typedef struct lean_uv_timer_s lean_uv_timer_t;

#define LUV_EALREADY 0
#define LUV_ECANCELED 1
#define LUV_EINVAL 2
#define LUV_ETIMEDOUT 3
4 changes: 3 additions & 1 deletion scripts/runExamples.sh
Original file line number Diff line number Diff line change
@@ -2,8 +2,9 @@
lake build

search_dir=/the/path/to/base/dir
# Load dylibs for OSX (FIXME: Support Windows and Linux)
dylibs=""
for entry in "build/lib"/*.dylib;
for entry in ".lake/build/lib"/*.dylib;
do
dylibs+=" --load-dynlib=$entry"
done
@@ -21,6 +22,7 @@ then
runExample "examples/phases.lean"
runExample "examples/tcp.lean"
runExample "examples/timer.lean"
runExample "examples/work.lean"
else
runExample $1
fi

0 comments on commit 0b29e79

Please sign in to comment.