From 499f76bb267da4f5da0787743920fd8436ccf424 Mon Sep 17 00:00:00 2001 From: Joe Hendrix Date: Tue, 19 Dec 2023 17:13:49 -0800 Subject: [PATCH] feat: Additional bugfixes and cleanups to streams. --- LibUV/Check.lean | 23 +- LibUV/Idle.lean | 22 +- LibUV/Loop.lean | 98 ++++-- LibUV/Request.lean | 10 + LibUV/Stream.lean | 660 +++++++++++++++++++++++++++++++++-------- LibUV/Timer.lean | 42 +-- README.md | 26 +- examples/tcp.lean | 238 ++++----------- include/lean_uv.h | 38 ++- scripts/runExamples.sh | 1 + 10 files changed, 760 insertions(+), 398 deletions(-) create mode 100644 LibUV/Request.lean diff --git a/LibUV/Check.lean b/LibUV/Check.lean index dab87cb..c8144b8 100644 --- a/LibUV/Check.lean +++ b/LibUV/Check.lean @@ -20,22 +20,23 @@ static void Check_foreach(void* ptr, b_lean_obj_arg f) { fatal_st_only("Check"); } +// Close the check handle if the loop stops +void lean_uv_check_loop_stop(uv_handle_t* h) { + lean_uv_check_t* check = (lean_uv_check_t*) h; + lean_dec_optref(check->callback); + uv_close(h, (uv_close_cb) &free); +} + static void Check_finalize(void* ptr) { - lean_uv_check_t* check = (lean_uv_check_t*) ptr; - if (check->callback != NULL) { - check->uv.data = 0; - } else { - uv_close((uv_handle_t*) ptr, (uv_close_cb) &free); - } - // Release loop object. Note that this may free the loop object - lean_dec(loop_object(check->uv.loop)); + bool is_active = ((lean_uv_check_t*) ptr)->callback != NULL; + lean_uv_finalize_handle((uv_handle_t*) ptr, is_active); } static void check_invoke_callback(uv_check_t* check) { // Get callback and handler objects lean_uv_check_t* luv_check = (lean_uv_check_t*) check; lean_object* cb = luv_check->callback; lean_inc(cb); - check_callback_result(luv_check->uv.loop, lean_apply_1(cb, lean_box(0))); + check_callback_result(luv_check->uv.loop, lean_apply_1(cb, lean_io_mk_world())); } end @@ -61,7 +62,7 @@ alloy c extern "lean_uv_check_start" def Check.start (check : @&Check) (callback : UV.IO Unit) : UV.IO Unit := { lean_uv_check_t* luv_check = lean_get_external_data(check); if (luv_check->callback) { - lean_dec(luv_check->callback); + lean_dec_ref(luv_check->callback); } else { uv_check_start(&luv_check->uv, &check_invoke_callback); } @@ -77,7 +78,7 @@ def Check.stop (check : @&Check) : UV.IO Unit := { lean_uv_check_t* luv_check = lean_get_external_data(check); if (luv_check->callback) { uv_check_stop(&luv_check->uv); - lean_dec(luv_check->callback); + lean_dec_ref(luv_check->callback); luv_check->callback = 0; } return lean_io_unit_result_ok(); diff --git a/LibUV/Idle.lean b/LibUV/Idle.lean index 315108b..5aff80d 100644 --- a/LibUV/Idle.lean +++ b/LibUV/Idle.lean @@ -25,27 +25,29 @@ static lean_object** idle_callback(lean_uv_idle_t* p) { return &(p->callback); } +// Close the check handle if the loop stops +void lean_uv_idle_loop_stop(uv_handle_t* h) { + lean_uv_idle_t* idle = (lean_uv_idle_t*) h; + if (idle->callback != NULL) { + lean_dec_ref(idle->callback); + } + uv_close(h, (uv_close_cb) &free); +} + static void Idle_foreach(void* ptr, b_lean_obj_arg f) { fatal_st_only("Idle"); } static void Idle_finalize(void* ptr) { - lean_uv_idle_t* idle = ptr; - assert((idle->callback != null) == uv_is_active((uv_handle_t*) idle)); - if (idle->callback) { - idle->uv.data = 0; - } else { - uv_close((uv_handle_t*) ptr, (uv_close_cb) free); - } - // Release loop object. Note that this may free the loop object - lean_dec(loop_object(idle->uv.loop)); + bool is_active = ((lean_uv_idle_t*) ptr)->callback != NULL; + lean_uv_finalize_handle((uv_handle_t*) ptr, is_active); } static void idle_invoke_callback(uv_idle_t* idle) { lean_uv_idle_t* luv_idle = (lean_uv_idle_t*) idle; lean_object* cb = luv_idle->callback; lean_inc(cb); - check_callback_result(luv_idle->uv.loop, lean_apply_1(cb, lean_box(0))); + check_callback_result(luv_idle->uv.loop, lean_apply_1(cb, lean_io_mk_world())); } end diff --git a/LibUV/Loop.lean b/LibUV/Loop.lean index c4c2761..14ce336 100644 --- a/LibUV/Loop.lean +++ b/LibUV/Loop.lean @@ -5,26 +5,57 @@ alloy c include namespace UV -alloy c enum - ErrorCode => int - | EALREADY => UV_EALREADY - | EINVAL => UV_EINVAL - deriving Inhabited, Repr +inductive ErrorCode where +| EALREADY +| ECANCELED +| EINVAL +| ETIMEDOUT +deriving Inhabited, Repr + +alloy c section + +#define LUV_EALREADY 0 +#define LUV_ECANCELED 1 +#define LUV_EINVAL 2 +#define LUV_ETIMEDOUT 3 + +lean_object* lean_uv_error_mk(int code) { + if (code >= 0) + fatal_error("Unexpected success code %d.\n", code); + size_t r; + switch (code) { + case UV_EALREADY: + r = LUV_EALREADY; + break; + case UV_ECANCELED: + r = LUV_ECANCELED; + break; + case UV_EINVAL: + r = LUV_EINVAL; + break; + case UV_ETIMEDOUT: + r = LUV_ETIMEDOUT; + break; + default: + fatal_error("Unexpected error code %s.\n", uv_err_name(code)); + } + return lean_box(r); +} +end protected inductive Error where | errorcode : ErrorCode → UV.Error | user : String → UV.Error -attribute [export lean_uv_error_errorcode] UV.Error.errorcode +@[export lean_uv_error_errorcode] +def UV.Error.errorcode_c := Error.errorcode alloy c section lean_object* lean_uv_error_errorcode(lean_object* err); /* Returns an IO error for the given error code. */ -lean_object* lean_uv_io_error(int err) { - lean_object* r = lean_box(to_lean(err)); - r = lean_uv_error_errorcode(r); - return lean_io_result_mk_error(r); +extern lean_object* lean_uv_io_error(int err) { + return lean_io_result_mk_error(lean_uv_error_errorcode(lean_uv_error_mk(err))); } end @@ -35,12 +66,20 @@ protected def IO := EIO UV.Error protected def IO.run (act : UV.IO α) : IO α := do match ← act.toBaseIO with - | .error (.errorcode e) => dbg_trace "A" throw (IO.userError s!"UV.IO failed (error = {repr e})") - | .error (.user msg) => dbg_trace "B" throw (IO.userError msg) - | .ok r => dbg_trace "C" pure r + | .error (.errorcode e) => throw (IO.userError s!"UV.IO failed (error = {repr e})") + | .error (.user msg) => throw (IO.userError msg) + | .ok r => pure r + +@[extern "lean_uv_log"] +protected opaque log (s : @& String) : UV.IO Unit + +alloy c section +lean_object* lean_uv_log(b_lean_obj_arg s, b_lean_obj_arg rw) { + printf("%s\n", lean_string_cstr(s)); + return lean_io_result_mk_ok(lean_unit()); +} +end -protected opaque log (s : @& String) : UV.IO Unit := do - (IO.println s).toBaseIO >>= fun _ => pure () protected def fatal {α} (msg : String) : UV.IO α := (throw (.user msg) : EIO UV.Error α) @@ -70,19 +109,28 @@ private def raiseInvalidArgument (message:String) : UV.IO α := alloy c section -static void close_stream(uv_handle_t* h) { - free(lean_stream_base(h)); -} +void lean_uv_check_loop_stop(uv_handle_t* h); +void lean_uv_idle_loop_stop(uv_handle_t* h); +void lean_uv_tcp_loop_stop(uv_handle_t* h); +void lean_uv_timer_loop_stop(uv_handle_t* h); static void stop_handles(uv_handle_t* h, void* arg) { + if (uv_is_closing(h)) return; switch (h->type) { - case UV_NAMED_PIPE: + case UV_CHECK: + lean_uv_check_loop_stop(h); + break; + case UV_IDLE: + lean_uv_idle_loop_stop(h); + break; case UV_TCP: - case UV_TTY: - uv_close(h, &close_stream); + lean_uv_tcp_loop_stop(h); + break; + case UV_TIMER: + lean_uv_timer_loop_stop(h); break; default: - uv_close(h, (uv_close_cb) &free); + fatal_error("Unsupported handle type %s", uv_handle_type_name(h->type)); break; } } @@ -108,10 +156,14 @@ static void Loop_finalize(void* ptr) { } } +static void Loop_foreach(void* ptr, b_lean_obj_arg f) { + fatal_st_only("Loop"); +} + end alloy c extern_type Loop => lean_uv_loop_t := { - foreach := `lean_uv_null_foreach + foreach := `Loop_foreach finalize := `Loop_finalize } diff --git a/LibUV/Request.lean b/LibUV/Request.lean new file mode 100644 index 0000000..4f1c9f6 --- /dev/null +++ b/LibUV/Request.lean @@ -0,0 +1,10 @@ +import LibUV.Stream + +namespace UV + +inductive Req where + | connect : ConnectReq -> Req + | shutdown : ∀{H : Type}, ShutdownReq H -> Req + | write : WriteReq -> Req + +end UV diff --git a/LibUV/Stream.lean b/LibUV/Stream.lean index f3bd2ff..b6efccf 100644 --- a/LibUV/Stream.lean +++ b/LibUV/Stream.lean @@ -54,7 +54,7 @@ void init_stream_callbacks(lean_stream_callbacks_t* cbs) { static inline lean_stream_callbacks_t* stream_callbacks(uv_stream_t* stream) { - return ((lean_stream_callbacks_t*) stream) - 1; + return lean_stream_base((uv_handle_t*) stream); } end @@ -64,6 +64,16 @@ end StreamDeclaration section Shutdown +/-- References -/ +opaque ShutdownReqPointed : NonemptyType.{0} + +/-- A shutdown request -/ +structure ShutdownReq (α : Type) : Type where + ref : ShutdownReqPointed.type + +instance : Nonempty (ShutdownReq α) := + Nonempty.intro { ref := Classical.choice ShutdownReqPointed.property } + alloy c section /* The external data of a ShutdownReq in Lean is a lean_uv_shutdown_t req where: @@ -71,8 +81,8 @@ The external data of a ShutdownReq in Lean is a lean_uv_shutdown_t req where: req.uv.handle is a pointer to a uv_stream_t. req.uv.data is a pointer to the Lean shutdown_req object. This is set to null if the shutdown request memory is released. - req.callback is a pointer to the callback to invoke when the shutdown - completes. This is set to null after the callback returns. + req.callback is a pointer to the callback to invoke when the shutdown completes. + This is set to null after the callback returns. */ struct lean_uv_shutdown_s { uv_shutdown_t uv; @@ -81,8 +91,8 @@ struct lean_uv_shutdown_s { typedef struct lean_uv_shutdown_s lean_uv_shutdown_t; -static void Shutdown_foreach(void* ptr, b_lean_obj_arg f) { - fatal_st_only("ShutdownReq"); +static void Req_foreach(void* ptr, b_lean_obj_arg f) { + fatal_st_only("Req"); } static void Shutdown_finalize(void* ptr) { @@ -97,46 +107,13 @@ static void Shutdown_finalize(void* ptr) { } static lean_external_class * shutdown_class = NULL; - -static void shutdown_cb(uv_shutdown_t *req, int status) { - lean_object* success = lean_bool(status == 0); - lean_uv_shutdown_t* lreq = (lean_uv_shutdown_t*) req; - - uv_loop_t* loop = req->handle->loop; - - lean_object* cb = lreq->callback; - // If the request object has been freed, then we can free the request - // object as well. - if (lreq->uv.data) { - lreq->callback = 0; - } else { - free(lreq); - } - // N.B. We intentionally have not incremented reference count to - // `cb` so it may get finalized when returning from lean_apply. - check_callback_result(loop, lean_apply_1(cb, success)); -} end -/-- References -/ -opaque ShutdownReqPointed : NonemptyType.{0} - -/-- -A shutdown rewquest --/ -structure ShutdownReq (α : Type) : Type where - ref : ShutdownReqPointed.type - -instance : Nonempty (ShutdownReq α) := - Nonempty.intro { ref := Classical.choice ShutdownReqPointed.property } - -namespace ShutdownReq - /-- This returns the handle associated with the shutdown request. -/ @[extern "lean_uv_shutdown_req_handle"] -opaque handle [Inhabited α] (req : @&ShutdownReq α) : α +opaque ShutdownReq.handle [Inhabited α] (req : @&ShutdownReq α) : α alloy c section lean_obj_res lean_uv_shutdown_req_handle(b_lean_obj_arg reqObj, b_lean_obj_arg _rw) { @@ -147,68 +124,198 @@ lean_obj_res lean_uv_shutdown_req_handle(b_lean_obj_arg reqObj, b_lean_obj_arg _ } end -end ShutdownReq - -namespace Stream /-- Shutdown the outgoing (write) side of a duplex stream. It waits for pending write requests to complete. The cb is called after shutdown is complete. -/ @[extern "lean_uv_shutdown"] -opaque shutdown [Stream α] (handle : α) (cb : Bool → BaseIO Unit) : UV.IO (ShutdownReq α) +opaque Stream.shutdown [Stream α] (handle : α) (cb : Bool → BaseIO Unit) : UV.IO (ShutdownReq α) alloy c section -lean_obj_res lean_uv_shutdown(lean_obj_arg handle, lean_obj_arg cb, b_lean_obj_arg _rw) { - uv_stream_t* hdl = lean_get_external_data(handle); - lean_uv_shutdown_t* req = malloc(sizeof(lean_uv_shutdown_t)); - req->callback = cb; - if (shutdown_class == NULL) { - shutdown_class = lean_register_external_class(Shutdown_finalize, Shutdown_foreach); +static void invoke_rec_callback(uv_loop_t* loop, uv_req_t* req, lean_object** cbp, lean_object* status) { + // If the request object has been freed, then we can free the request + // object as well. + lean_object* cb = *cbp; + if (req->data) { + *cbp = 0; + } else { + free(req); } - lean_object* reqObj = lean_alloc_external(shutdown_class, req); - req->uv.data = reqObj; + check_callback_result(loop, lean_apply_2(cb, status, lean_io_mk_world())); +} + +static lean_obj_res mk_req(lean_external_class* cl, void* req) { + lean_object* reqObj = lean_alloc_external(cl, req); + ((uv_req_t*) req)->data = reqObj; + return reqObj; +} + +static void shutdown_cb(uv_shutdown_t *req, int status) { + lean_uv_shutdown_t* lreq = (lean_uv_shutdown_t*) req; + invoke_rec_callback(req->handle->loop, (uv_req_t*) req, &lreq->callback, lean_bool(status == 0)); +} + +lean_obj_res lean_uv_shutdown(b_lean_obj_arg handle, lean_obj_arg cb, b_lean_obj_arg _rw) { + lean_uv_shutdown_t* req = malloc(sizeof(lean_uv_shutdown_t)); + if (shutdown_class == NULL) + shutdown_class = lean_register_external_class(Shutdown_finalize, Req_foreach); + lean_object* reqObj = mk_req(shutdown_class, req); + uv_stream_t* hdl = lean_get_external_data(handle); int ec = uv_shutdown(&req->uv, hdl, &shutdown_cb); if (ec < 0) { // Release callback - lean_dec(cb); + lean_dec_ref(cb); req->callback = 0; // Make sure handle is initialized to avoid special case in Shutdown_finalize. req->uv.handle = hdl; - // This will free reqObj and decrement hdl and then free reqObj and req. lean_free_object(reqObj); fatal_error("uv_shutdown_req failed (error = %d)", ec); } + req->callback = cb; return lean_io_result_mk_ok(reqObj); } end -end Stream end Shutdown +section Listen namespace Stream -opaque ConnectReq (α : Type) : Type +alloy c section + +static void listen_callback(uv_stream_t* server, int status) { + if (status != 0) + fatal_error("listen callback status != 0"); + // Get callback and increment + lean_object* cb = stream_callbacks(server)->listen_callback; + lean_inc(cb); + check_callback_result(server->loop, lean_apply_1(cb, lean_io_mk_world())); +} + +end + +/-- +Start listening for incoming connections. + +`backlog` indicates the number of connections the kernel might queue, +same as `listen(2)`. -opaque WriteReq : Type +When a new incoming connection is received the connection callback is called. +-/ +@[extern "lean_uv_listen"] +opaque listen {α : Type} [Stream α] (server : @&α) (backlog : UInt32) (cb : UV.IO Unit) : UV.IO Unit +end Stream + +alloy c section +/** Return true if this is closed or closing. */ +static bool lean_uv_is_closing(uv_stream_t* stream) { + return stream->loop == 0 + || uv_is_closing((uv_handle_t*) stream); +} + +lean_obj_res lean_uv_listen(b_lean_obj_arg socketObj, uint32_t backlog, lean_obj_arg cb, b_lean_obj_arg _rw) { + uv_stream_t* socket = lean_get_external_data(socketObj); + if (lean_uv_is_closing(socket)) { + lean_dec_ref(cb); + return lean_uv_io_error(UV_EINVAL); + } + + lean_stream_callbacks_t* cbs = stream_callbacks(socket); + if (cbs->listen_callback) { + lean_dec_ref(cb); + return lean_uv_io_error(UV_EALREADY); + } + int err = uv_listen(socket, backlog, &listen_callback); + if (err < 0) { + lean_dec_ref(cb); + fatal_error("uv_listen failed (error = %d).\n", err); + } + cbs->listen_callback = cb; + return lean_io_unit_result_ok(); +} +end + +@[extern "lean_uv_stream_stop"] +opaque Stream.stop {α : Type} [Stream α] (server : @&α) : UV.IO Unit + +alloy c section + +static void close_stream_stop(uv_handle_t* h) { + // If finalize has already deleted object then free memory, + // otherwise we set loop and rely on it to free memory. + if (h->data == 0) { + free(lean_stream_base(h)); + } else { + h->loop = 0; + } +} + +lean_obj_res lean_uv_stream_stop(b_lean_obj_arg socketObj, b_lean_obj_arg _rw) { + uv_stream_t* socket = lean_get_external_data(socketObj); + + if (lean_uv_is_closing(socket)) + return lean_uv_io_error(UV_EINVAL); + + lean_stream_callbacks_t* cbs = stream_callbacks(socket); + if (cbs->listen_callback == 0) + return lean_uv_io_error(UV_EINVAL); + + lean_dec_ref(cbs->listen_callback); + cbs->listen_callback = 0; + + // Stop read callback. + if (cbs->read_callback) { + lean_dec_ref(cbs->read_callback); + cbs->read_callback = 0; + } + uv_close((uv_handle_t*) socket, close_stream_stop); + return lean_io_unit_result_ok(); +} +end + + +end Listen + +section Accept @[extern "lean_uv_accept"] -opaque accept [Stream α] [Stream β] (server : α) (client : β) : BaseIO Unit +opaque Stream.accept [Stream α] [Stream β] (server : @&α) (client : @&β) : UV.IO Unit alloy c section -lean_obj_res lean_uv_accept(b_lean_obj_arg server, lean_obj_arg client) { - // FIXME. Ensure accept can be called. - uv_stream_t* uv_server = lean_get_external_data(server); - uv_stream_t* uv_client = lean_get_external_data(client); - int err = uv_accept(uv_server, uv_client); +lean_obj_res lean_uv_accept(b_lean_obj_arg serverObj, lean_obj_arg clientObj, b_lean_obj_arg rw) { + uv_stream_t* server = lean_get_external_data(serverObj); + uv_stream_t* client = lean_get_external_data(clientObj); + if (lean_uv_is_closing(server) || lean_uv_is_closing(client)) { + return lean_uv_io_error(UV_EINVAL); + } + + int err = uv_accept(server, client); if (err < 0) fatal_error("uv_accept failed (error = %d).\n", err); return lean_io_unit_result_ok(); } end +end Accept + +section Read + +inductive ReadResult where +| ok : ByteArray → ReadResult +| eof : ReadResult +| error : ErrorCode → ReadResult + +@[export lean_uv_read_ok] +def ReadResult.ok_c := ReadResult.ok + +@[export lean_uv_read_eof] +def ReadResult.eof_c := ReadResult.eof + +@[export lean_uv_read_error] +def ReadResult.error_c := ReadResult.error /-- This starts reading data from the stream by invoking the callback when data is @@ -218,9 +325,15 @@ It throws `EALREADY` if a reader is already reading from the stream and `EINVAL if the stream is closing. -/ @[extern "lean_uv_read_start"] -opaque read_start [Stream α] (stream : α) (callback : ByteArray -> BaseIO Unit) : UV.IO Unit +opaque Stream.read_start [Stream α] (stream : @&α) (callback : ReadResult -> UV.IO Unit) : UV.IO Unit alloy c section + +static inline void init_buf(uv_buf_t *buf, lean_object* byteArray) { + buf->base = (char*) lean_sarray_cptr(byteArray); + buf->len = lean_sarray_size(byteArray); +} + static void alloc_callback(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf) { lean_object* sarray = lean_alloc_sarray(1, 0, suggested_size); @@ -228,29 +341,65 @@ void alloc_callback(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf) { buf->len = suggested_size; } +lean_object* lean_uv_read_ok(lean_obj_arg bytes); +extern lean_object* lean_uv_read_eof; +lean_object* lean_uv_read_error(lean_obj_arg e); + +static lean_object* bufArrayObj(const uv_buf_t* buf) { + return (lean_object*) (buf->base - offsetof(lean_sarray_object, m_data)); +} + +static void dec_buf_array(const uv_buf_t* buf) { + lean_dec_ref(bufArrayObj(buf)); +} + static void read_callback(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf) { - printf("read_callback(%p, %zd, %p)\n", stream, nread, buf); lean_object* cb = stream_callbacks(stream)->read_callback; lean_inc(cb); - char* data = buf->base; - lean_object* array_obj = (lean_object*) (buf->base - offsetof(lean_sarray_object, m_data)); - lean_dec(lean_apply_2(cb, array_obj, lean_box(0))); + + lean_object* arg; + + if (nread > 0) { + // Recover the array object created by alloc_callback from buf. + lean_object* array_obj = bufArrayObj(buf); + // Set size + lean_sarray_set_size(array_obj, nread); + arg = lean_uv_read_ok(array_obj); + } else { + if (buf->base != 0) + dec_buf_array(buf); + if (nread == UV_EOF) { + arg = lean_uv_read_eof; + } else if (nread == UV_ENOBUFS) { + // This should never occur alloc_callback either allocates + // the required amount of memory or fails. + fatal_error("Internal error - out of memory.\n"); + } else { + arg = lean_uv_read_error(lean_uv_error_mk(nread)); + } + } + + // Pass array into callback. + check_callback_result(stream->loop, lean_apply_2(cb, arg, lean_io_mk_world())); } -lean_object* lean_uv_read_start(lean_object* stream_obj, lean_object* cb) { +lean_object* lean_uv_read_start(b_lean_obj_arg stream_obj, lean_obj_arg cb, b_lean_obj_arg _rw) { uv_stream_t* stream = lean_get_external_data(stream_obj); + if (lean_uv_is_closing(stream)) { + lean_dec_ref(cb); + return lean_uv_io_error(UV_EINVAL); + } + lean_stream_callbacks_t* cbs = stream_callbacks(stream); if (cbs->read_callback) { - lean_dec(stream_obj); - lean_dec(cb); + lean_dec_ref(cb); return lean_uv_io_error(UV_EALREADY); } cbs->read_callback = cb; int err = uv_read_start(stream, &alloc_callback, &read_callback); if (err < 0) { - lean_dec(stream_obj); - lean_dec(cb); + lean_dec_ref(cb); return lean_uv_io_error(err); } return lean_io_unit_result_ok(); @@ -263,46 +412,160 @@ This stops reading data from the stream. It always succeeds even if data is not being read from stream. -/ @[extern "lean_uv_read_stop"] -opaque read_stop [Stream α] (stream : α) : BaseIO Unit +opaque Stream.read_stop [Stream α] (stream : @&α) : BaseIO Unit alloy c section lean_object* lean_uv_read_stop(lean_object* stream_obj) { uv_stream_t* stream = lean_get_external_data(stream_obj); - // Only need to do things if stream is active. - if (uv_is_active((uv_handle_t*) stream)) { + lean_stream_callbacks_t* cbs = stream_callbacks(stream); + + // Only need to do things if stream is reading. + if (cbs->read_callback) { uv_read_stop(stream); // Clear stream read callback. - lean_stream_callbacks_t* cbs = stream_callbacks(stream); - if (cbs->read_callback) { - lean_dec(cbs->read_callback); - cbs->read_callback = 0; - } - - // If stream is no longer active, release it from implicit ownership by loop. - if (!uv_is_active((uv_handle_t*) stream)) { - lean_dec(stream_obj); - } + lean_dec_ref(cbs->read_callback); + cbs->read_callback = 0; } - // Release stream object - lean_dec(stream_obj); return lean_io_unit_result_ok(); } end -/- -@[extern "lean_uv_write"] -opaque write [Stream α] (stream : α) : BaseIO Unit +end Read + +section Write + +/-- References -/ +opaque WriteReqPointed : NonemptyType.{0} + +/-- A shutdown request -/ +structure WriteReq (α : Type) : Type where + ref : WriteReqPointed.type + +instance : Nonempty (WriteReq α) := + Nonempty.intro { ref := Classical.choice WriteReqPointed.property } + +alloy c section +/* +The external data of a ShutdownReq in Lean is a lean_uv_shutdown_t req where: + + req.uv.handle is a pointer to a uv_stream_t. + req.uv.data is a pointer to the Lean shutdown_req object. This is set + to null if the shutdown request memory is released. + req.callback is a pointer to the callback to invoke when the shutdown completes. + This is set to null after the callback returns. +*/ +struct lean_uv_write_s { + uv_write_t uv; + lean_object* callback; + uv_buf_t* bufs; + size_t bufcnt; +}; + +typedef struct lean_uv_write_s lean_uv_write_t; + +static void Write_finalize(void* ptr) { + lean_uv_write_t* req = (lean_uv_write_t*) ptr; + // Release counter on handle. + lean_dec_ref(req->uv.handle->data); + if (req->callback) { + req->uv.data = 0; + } else { + free(req); + } +} + +static lean_external_class * write_class = NULL; +end + +/-- +This returns the handle associated with the shutdown request. -/ +@[extern "lean_uv_write_handle"] +opaque WriteReq.handle [Inhabited α] (req : @&WriteReq α) : α + +alloy c section +lean_obj_res lean_uv_write_handle(b_lean_obj_arg reqObj, b_lean_obj_arg _rw) { + uv_write_t* req = lean_get_external_data(reqObj); + lean_object* hdl = req->handle->data; + lean_inc_ref(hdl); + return hdl; +} +end + +end Write + +@[extern "lean_uv_write"] +opaque Stream.write [Stream α] (stream : α) (bufs : @&Array ByteArray) + (callback : Bool → UV.IO Unit) : UV.IO (WriteReq α) + +alloy c section +static void write_cb(uv_write_t *req, int status) { + lean_uv_write_t* lreq = (lean_uv_write_t*) req; + uv_loop_t* loop = req->handle->loop; + lean_object* success = lean_bool(status == 0); + // If the request object has been freed, then we can free the request + // object as well. + lean_object* cb = lreq->callback; + if (req->data) { + lreq->callback = 0; + uv_buf_t* bufs = lreq->bufs; + uv_buf_t* bufEnd = bufs + lreq->bufcnt; + for (uv_buf_t* buf = bufs; buf != bufEnd; ++buf) + dec_buf_array(buf); + free(bufs); + } else { + free(req); + } + check_callback_result(loop, lean_apply_2(cb, success, lean_io_mk_world())); +} + +lean_obj_res lean_uv_write(b_lean_obj_arg streamObj, b_lean_obj_arg bufObj, + lean_obj_arg callback, b_lean_obj_arg _rw) { + size_t nbufs = lean_array_size(bufObj); + if (nbufs == 0) { + return lean_uv_io_error(UV_EINVAL); + } + + lean_uv_write_t* req = checked_malloc(sizeof(lean_uv_write_t)); + if (write_class == NULL) + write_class = lean_register_external_class(Write_finalize, Req_foreach); + lean_object* reqObj = mk_req(write_class, req); + uv_stream_t* hdl = lean_get_external_data(streamObj); + + lean_object** bufObjs = lean_array_cptr(bufObj); + + uv_buf_t* bufArray = malloc(sizeof(uv_buf_t) * nbufs); + for (size_t i = 0; i != nbufs; ++i) { + init_buf(bufArray + i, bufObjs[i]); + } + + int ec = uv_write(&req->uv, hdl, bufArray, nbufs, &write_cb); + if (ec < 0) { + lean_dec_ref(callback); + req->callback = 0; + req->uv.handle = hdl; + lean_free_object(req->uv.data); + fatal_error("uv_write failed (error = %d)", ec); + } else { + req->callback = callback; + return lean_io_result_mk_ok(reqObj); + } +} + +end -end Stream section SockAddr alloy c section +static void SockAddr_finalize(void* ptr) { + free(ptr); +} + static void SockAddr_foreach(void* ptr, b_lean_obj_arg f) { } @@ -313,7 +576,7 @@ A IPV4 or IPv6 socket address -/ alloy c extern_type SockAddr => struct sockaddr := { foreach := `SockAddr_foreach - finalize := `free + finalize := `SockAddr_finalize } namespace SockAddr @@ -322,12 +585,11 @@ namespace SockAddr Parses the string to create an IPV4 address with the given name and port. -/ alloy c extern "lean_uv_ipv4_addr" -def mkIPv4 (addr:String) (port:UInt16) : IO SockAddr := { +def mkIPv4 (addr:String) (port:UInt16) : UV.IO SockAddr := { struct sockaddr_in* r = checked_malloc(sizeof(struct sockaddr_in)); if (uv_ip4_addr(lean_string_cstr(addr), port, r) != 0) { free(r); - return lean_io_result_mk_error(lean_mk_io_user_error( - lean_mk_string("Could not parse IPV4 address"))); + return lean_uv_io_error(UV_EINVAL); } return lean_io_result_mk_ok(to_lean((struct sockaddr*) r)); } @@ -336,12 +598,11 @@ def mkIPv4 (addr:String) (port:UInt16) : IO SockAddr := { Parses the string to create an IPV6 address with the given name and port. -/ alloy c extern "lean_uv_ipv6_addr" -def mkIPv6 (addr:String) (port:UInt16) : IO SockAddr := { +def mkIPv6 (addr:String) (port:UInt16) : UV.IO SockAddr := { struct sockaddr_in6* r = checked_malloc(sizeof(struct sockaddr_in6)); if (uv_ip6_addr(lean_string_cstr(addr), port, r) != 0) { free(r); - return lean_io_result_mk_error(lean_mk_io_user_error( - lean_mk_string("Could not parse IPV6 address"))); + return lean_uv_io_error(UV_EINVAL); } return lean_io_result_mk_ok(to_lean((struct sockaddr*) r)); } @@ -357,55 +618,66 @@ alloy c section struct lean_uv_tcp_s { lean_stream_callbacks_t callbacks; uv_tcp_t uv; + bool connecting; }; typedef struct lean_uv_tcp_s lean_uv_tcp_t; -static lean_uv_tcp_t* tcp_from_handle(uv_handle_t* h) { - lean_stream_callbacks_t* op = (lean_stream_callbacks_t*) h; - return (lean_uv_tcp_t*) (op - 1); -} - -/* Return lean object representing this check */ -static uv_handle_t* tcp_handle(lean_uv_tcp_t* p) { - return (uv_handle_t*) &(p->uv); -} - static void TCP_foreach(void* ptr, b_lean_obj_arg f) { fatal_st_only("TCP"); } -static void tcp_close_cb(uv_handle_t* h) { - lean_uv_tcp_t* tcp = tcp_from_handle(h); +void lean_uv_close_stream(uv_handle_t* h) { + free(lean_stream_base(h)); +} + +// Close the check handle if the loop stops +extern void lean_uv_tcp_loop_stop(uv_handle_t* h) { + lean_uv_tcp_t* tcp = lean_stream_base(h); lean_stream_callbacks_t* callbacks = &tcp->callbacks; - if (callbacks->listen_callback != 0) - lean_dec(callbacks->listen_callback); - free_handle(h); + lean_dec_optref(callbacks->listen_callback); + lean_dec_optref(callbacks->read_callback); + uv_close(h, &lean_uv_close_stream); } static void TCP_finalize(void* ptr) { - lean_uv_tcp_t* tcp = tcp_from_handle(ptr); + lean_uv_tcp_t* tcp = lean_stream_base((uv_handle_t*) ptr); lean_stream_callbacks_t* callbacks = &tcp->callbacks; - if (callbacks->read_callback != 0) - lean_dec(callbacks->read_callback); - uv_close((uv_handle_t*) ptr, &tcp_close_cb); + uv_handle_t* handle = (uv_handle_t*)(&tcp->uv); + if (uv_is_closing(handle)) { + // This indicates user called stop explicitly. We should check if the callback + // from stop has already run and either clear data or free memory dependending on status. + if (handle->loop != 0) { + tcp->uv.data = 0; + } else { + free(tcp); + } + } else if (callbacks->read_callback == 0 + && callbacks->listen_callback == 0 + && !tcp->connecting) { + uv_close(handle, &lean_uv_close_stream); + } else { + tcp->uv.data = 0; + } + // Release loop object. Note that this may free the loop object + lean_dec_ref(loop_object(tcp->uv.loop)); } end -alloy c extern_type TCP => uv_handle_t := { +alloy c extern_type TCP => uv_tcp_t := { foreach := `TCP_foreach finalize := `TCP_finalize } --- FIXME: Support uv_tcp_init_ex alloy c extern "lean_uv_tcp_init" def Loop.mkTCP (loop : Loop) : BaseIO TCP := { lean_uv_tcp_t* tcp = checked_malloc(sizeof(lean_uv_tcp_t)); init_stream_callbacks(&tcp->callbacks); - lean_object* r = to_lean(tcp_handle(tcp)); - *handle_object(tcp_handle(tcp)) = r; + tcp->connecting = false; uv_tcp_init(of_loop(loop), &tcp->uv); + lean_object* r = to_lean(&tcp->uv); + tcp->uv.data = r; return lean_io_result_mk_ok(r); } @@ -416,24 +688,152 @@ opaque instStreamTCP : Stream TCP instance : Stream TCP := instStreamTCP alloy c extern "lean_uv_tcp_bind" -def bind (tcp : TCP) (addr : SockAddr) : BaseIO Unit := { - int err = uv_tcp_bind(lean_get_external_data(tcp), of_lean(addr), 0); - if (err != 0) { +def bind (tcp : @&TCP) (addr : @&SockAddr) : UV.IO Unit := { + uv_tcp_t* uv_tcp = lean_get_external_data(tcp); + if (lean_uv_is_closing((uv_stream_t*) uv_tcp)) + return lean_uv_io_error(UV_EINVAL); + + int err = uv_tcp_bind(uv_tcp, of_lean(addr), 0); + if (err != 0) fatal_error("uv_tcp_bind failed (error = %d)\n", err); - } return lean_io_unit_result_ok(); } ---def listen (tcp : TCP) (backlog : UInt32) (cb : BaseIO Unit) : UV.IO Unit := --- Stream.listen tcp backlog cb +/-- References -/ +opaque ConnectReqPointed : NonemptyType.{0} + +/-- A shutdown request -/ +structure ConnectReq (α : Type) : Type where + ref : ConnectReqPointed.type + +instance : Nonempty (ConnectReq α) := + Nonempty.intro { ref := Classical.choice ConnectReqPointed.property } + + +alloy c section +/* +The external data of a ConnectReq in Lean is a lean_uv_connect_t req where: + + req.uv.handle is a pointer to a uv_stream_t. + req.uv.data is a pointer to the Lean shutdown_req object. This is set + to null if the shutdown request memory is released. + req.callback is a pointer to the callback to invoke when the shutdown completes. + This is set to null after the callback returns. +*/ +struct lean_uv_connect_s { + uv_connect_t uv; + lean_object* callback; +}; + +typedef struct lean_uv_connect_s lean_uv_connect_t; + +static void Connect_finalize(void* ptr) { + lean_uv_connect_t* req = (lean_uv_connect_t*) ptr; + lean_dec_ref(req->uv.handle->data); + if (req->callback) { + req->uv.data = 0; + } else { + free(req); + } +} + +static lean_external_class * connect_class = NULL; +end -def accept (server : TCP) (client : TCP) : BaseIO Unit := +inductive ConnectionResult where +| ok : ConnectionResult +| canceled : ConnectionResult +| timedout : ConnectionResult +| unknown : ConnectionResult +deriving Inhabited, Repr + +alloy c section +static void tcp_connect_cb(uv_connect_t *req, int status) { + lean_object* statusObj; + switch (status) { + case UV_ECANCELED: + statusObj = lean_box(1); + break; + case UV_ETIMEDOUT: + statusObj = lean_box(2); + break; + default: + if (status >= 0) { + statusObj = lean_box(0); + } else { + statusObj = lean_box(3); + } + break; + } + + lean_uv_connect_t* lreq = (lean_uv_connect_t*) req; + lean_uv_tcp_t* luv_tcp = lean_stream_base((uv_handle_t*) req->handle); + luv_tcp->connecting = false; + invoke_rec_callback(req->handle->loop, (uv_req_t*) req, &lreq->callback, statusObj); +} +end + +alloy c extern "lean_uv_tcp_connect" +def connect (tcp : TCP) (addr : @&SockAddr) (callback : ConnectionResult → UV.IO Unit) : UV.IO (ConnectReq TCP) := { + lean_uv_tcp_t* luv_tcp = lean_stream_base(lean_get_external_data(tcp)); + uv_tcp_t* uv_tcp = &luv_tcp->uv; + uv_stream_t* hdl = (uv_stream_t*) uv_tcp; + if (luv_tcp->connecting) { + return lean_uv_io_error(UV_EINVAL); + } + lean_uv_connect_t* req = malloc(sizeof(lean_uv_connect_t)); + + if (connect_class == NULL) + connect_class = lean_register_external_class(Connect_finalize, Req_foreach); + lean_object* reqObj = mk_req(connect_class, req); + req->callback = callback; + + const struct sockaddr *uv_addr = of_lean(addr); + + int ec = uv_tcp_connect(&req->uv, uv_tcp, uv_addr, &tcp_connect_cb); + if (ec < 0) { + lean_dec_ref(tcp); + lean_dec_ref(callback); + req->callback = 0; + // Make sure handle is initialized to avoid special case in Shutdown_finalize. + req->uv.handle = hdl; + lean_free_object(reqObj); + fatal_error("uv_tcp_connect failed (error = %d)", ec); + } + luv_tcp->connecting = true; + return lean_io_result_mk_ok(reqObj); +} + +def listen (tcp : TCP) (backlog : UInt32) (callback : UV.IO Unit) : UV.IO Unit := + Stream.listen tcp backlog callback + +def stop (tcp : TCP) : UV.IO Unit := + Stream.stop tcp + +def accept (server : TCP) (client : TCP) : UV.IO Unit := Stream.accept server client -def read_start (tcp : TCP) (callback : ByteArray -> BaseIO Unit) : UV.IO Unit := +def read_start (tcp : TCP) (callback : ReadResult -> UV.IO Unit) : UV.IO Unit := Stream.read_start tcp callback -def read_stop (tcp : TCP) : BaseIO Unit := Stream.read_stop tcp +@[extern "lean_uv_test_impl"] +def test_impl {α:Type} (c : @&TCP) (d : α): UV.IO Unit := pure () + +alloy c section +lean_obj_res lean_uv_test_impl(b_lean_obj_arg x, b_lean_obj_arg y, b_lean_obj_arg rw) { + lean_dec_ref(y); + return lean_io_result_mk_ok(lean_box(0)); +} +end + +@[noinline] +def test (c : TCP) (d : α) : UV.IO Unit := test_impl c d + +def read_stop (tcp : TCP) : UV.IO Unit := + Stream.read_stop tcp + +def write (stream : TCP) (bufs : @&Array ByteArray) (callback : Bool → UV.IO Unit) := + Stream.write stream bufs callback end TCP diff --git a/LibUV/Timer.lean b/LibUV/Timer.lean index ba4e424..a9af3c2 100644 --- a/LibUV/Timer.lean +++ b/LibUV/Timer.lean @@ -18,22 +18,23 @@ static void Timer_foreach(void* ptr, b_lean_obj_arg f) { fatal_st_only("Timer"); } +// Close the check handle if the loop stops +void lean_uv_timer_loop_stop(uv_handle_t* h) { + lean_uv_timer_t* timer = (lean_uv_timer_t*) h; + lean_dec_optref(timer->callback); + uv_close(h, (uv_close_cb) &free); +} + static void Timer_finalize(void* ptr) { - lean_uv_timer_t* timer = (lean_uv_timer_t*) ptr; - if (timer->callback != 0) { - timer->uv.data = 0; - } else { - uv_close((uv_handle_t*) timer, (uv_close_cb) &free); - } - // Release loop object. Note that this may free the loop object - lean_dec(loop_object(timer->uv.loop)); + bool is_active = ((lean_uv_timer_t*) ptr)->callback != NULL; + lean_uv_finalize_handle((uv_handle_t*) ptr, is_active); } static void timer_callback(uv_timer_t* timer) { lean_uv_timer_t* t = (lean_uv_timer_t*) timer; assert(t->callback != 0); lean_inc(t->callback); - check_callback_result(t->loop, lean_apply_1(t->callback, lean_box(0))); + check_callback_result(t->uv.loop, lean_apply_1(t->callback, lean_io_mk_world())); } end @@ -43,12 +44,15 @@ alloy c extern_type Timer => lean_uv_timer_t := { } alloy c extern "lean_uv_timer_init" -def Loop.mkTimer (loop : Loop) : BaseIO Timer := { +def Loop.mkTimer (loop : Loop) : UV.IO Timer := { lean_uv_timer_t* timer = checked_malloc(sizeof(lean_uv_timer_t)); lean_object* r = to_lean(timer); timer->uv.data = r; timer->callback = 0; - uv_timer_init(of_loop(loop), &timer->uv); + + int ec = uv_timer_init(of_loop(loop), &timer->uv); + if (ec < 0) + fatal_error("uv_timer_init failed (error = ec)\n", ec); return lean_io_result_mk_ok(r); } @@ -62,21 +66,23 @@ def start (timer : @&Timer) (timeout repeat_timeout : UInt64) (callback : UV.IO return lean_uv_io_error(UV_EINVAL); } l->callback = callback; - if (uv_timer_start(&l->uv, &timer_callback, timeout, repeat_timeout) != 0) + if (uv_timer_start(&l->uv, &timer_callback, timeout, repeat_timeout) != 0) { fatal_error("uv_timer_start failed\n"); - return lean_io_unit_result_ok(); + } + return lean_io_result_mk_ok(lean_unit()); } alloy c extern "lean_uv_timer_stop" def stop (timer : @&Timer) : UV.IO Unit := { lean_uv_timer_t* l = of_lean(timer); + if (l->callback == 0) { + return lean_io_unit_result_ok(); + } + lean_dec(l->callback); + l->callback = 0; if (uv_timer_stop(&l->uv) != 0) { fatal_error("uv_timer_stop failed¬"); } - if (l->callback) { - lean_dec(l->callback); - l->callback = 0; - } return lean_io_unit_result_ok(); } @@ -93,7 +99,7 @@ def again (timer : @&Timer) : UV.IO Unit := { } if (uv_timer_again(&l->uv) != 0) { - fatal_error("uv_timer_again failed.¬"); + fatal_error("uv_timer_again failed."); } return lean_io_unit_result_ok(); } diff --git a/README.md b/README.md index 8399cff..bdb22f2 100644 --- a/README.md +++ b/README.md @@ -61,7 +61,7 @@ called. We adopt the following scheme: -* Every handle object holds a reference to the associated loop. +* Every handle object holds a reference to the associated loop object. * Depending on their type, requests may hold references to handles or loops. * Every loop, handle and requests hold their corresponding object in the @@ -69,30 +69,46 @@ We adopt the following scheme: but may be null for `uv_handle_t` and `uv_req_t`. * When a Lean request object is freed, then the following steps are taken: 1. The data field for the request object is set to null. - 2. The reference count for the handle is decremented. + 2. Any references to Lean objects are released. + 3. If the request has been fulfilled, then the memory is released. Otherwise + we must wait for callback. * When a Lean handle object is freed, then the following steps are taken: 1. The data field for the handle object is set to null. 2. If the handle is not active, then `uv_close` is called with a callback that will free the handle resources. Once the callback is invoked, `uv_walk` will no longer return the handle. - 3. The reference to the loop is decremented. + 3. The reference to the loop is released. * When a Lean loop object is freed, then we call `uv_loop_close` and if it succeeds we free the loop resources and are done. If not, then we close all active handles with the following steps: 1. The loop walks the list of handles, and invokes close on each handle - that are not closing. + that is not closing. 2. If there were any handles in the loop encountered in step 1, then `uv_run` is called with `UV_RUN_DEFAULT` to run all closing callbacks. If this returns non-zero, then we exit with a fatal error since this reflects a bug in Lean LibUV. 3. We call `uv_loop_close` again. If it fails again, then we report a fatal error since this reflects a bug in Lean LibUV code. - +* Handles may need to be closed explicitly. This is particularly + true for streams that are listening on a port, since there is no function to + stop listening. We currently only allow streams to be closed, but may + eventually allow all streams to be explicitly closed. * If a handle or rquest data field is set to null but needed again for a callback, then a new Lean object of the appropriate type will be created and `uv_run` invokes a callback on a handle with a null `data` field, then a new handle object is created and assigned to `uv_handle_t.data`. +## Sockets closing + +Streams have additional potential states as there is no way to stop listening +once `uv_listen` is invoked. To work around this, we have an explicit +`Stream.close` operation that closes the socket, but does not free the underlying +`uv_stream_t` struct (until the Lean object is finalized). + +LibUV does not provide a function to see if a handle has been closed, so we set +the stream handle loop field to null if the stream has fully closed, but not freed. +The finalize procedure for a stream must detect this and free the object. + ## Limitations We do not allow any of the `lean-libuv` types to be shared between Lean diff --git a/examples/tcp.lean b/examples/tcp.lean index d27ed46..5cb231e 100644 --- a/examples/tcp.lean +++ b/examples/tcp.lean @@ -2,196 +2,64 @@ import LibUV -- This is example code from: -- https://medium.com/@padam.singh/an-event-driven-tcp-server-using-libuv-50cce9a473c0 +def read_cb (client : UV.TCP) (timer : UV.Timer) (res : UV.ReadResult) : UV.IO Unit := do + UV.log " read_cb: Got read" + match res with + | .eof => + UV.log s!"EOF" + timer.stop + client.read_stop + | .error e => + throw (.errorcode e) + | .ok bytes => + let cmd := String.fromUTF8Unchecked bytes + UV.log s!"Read '{cmd}'" -/- -#include -#include -#include -#include -#include -static uv_loop_t * loop; -static uv_tcp_t server; -/* - * Stores everything about a request - */ -struct client_request_data { - time_t start; - char *text; - size_t text_len; - char *response; - int work_started; - uv_tcp_t *client; - uv_work_t *work_req; - uv_write_t *write_req; - uv_timer_t *timer; -}; -/* Allocate buffers as requested by UV */ -static void alloc_buffer(uv_handle_t * handle, size_t size, - uv_buf_t *buf) { - char *base; - base = (char *) calloc(1, size); - if(!base) - *buf = uv_buf_init(NULL, 0); - else - *buf = uv_buf_init(base, size); -} -/* Callback to free the handle */ -static void on_close_free(uv_handle_t* handle) -{ - free(handle); -} -/* - * Callback for freeing up all resources allocated for request - */ -static void close_data(struct client_request_data *data) -{ - if(!data) - return; - if(data->client) - uv_close((uv_handle_t *)data->client, - on_close_free); - if(data->work_req) - free(data->work_req); - if(data->write_req) - free(data->write_req); - if(data->timer) - uv_close((uv_handle_t *)data->timer, on_close_free); - if(data->text) - free(data->text); - if(data->response) - free(data->response); - free(data); -} -/* - * Callback for when the TCP write is complete - */ -static void on_write_end(uv_write_t *req, int status) { - struct client_request_data *data; - data = req->data; - close_data(data); -} -/* - * Callback for post completion of the work associated with the - * request - */ -static void after_process_command(uv_work_t *req, int status) { - struct client_request_data *data; - data = req->data; - uv_buf_t buf = uv_buf_init(data->response, strlen(data->response)+1); - data->write_req = malloc(sizeof(*data->write_req)); - data->write_req->data = data; - uv_timer_stop(data->timer); - uv_write(data->write_req, (uv_stream_t *)data->client, - &buf, 1, on_write_end); -} -/* - * Callback for doing the actual work. - */ -static void process_command(uv_work_t *req) { - struct client_request_data *data; - char *x; - data = req->data; - // Do the actual time-consuming request processing here - data->response = strdup("Hello World, work's done\n"); -} +def String.escapeNewLines (s : String) := s.map (fun c => if c == '\n' then ' ' else c) -/* Callback for read function, called multiple times per request */ -static void read_cb(uv_stream_t * stream, ssize_t nread, const uv_buf_t *buf) { - uv_tcp_t *client; - struct client_request_data *data; - char *tmp; - client = (uv_tcp_t *)stream; - data = stream->data; - if (nread == -1 || nread==UV_EOF) { - free(buf->base); - uv_timer_stop(data->timer); - close_data(data); - } else { - if(!data->text) { - data->text = malloc(nread+1); - memcpy(data->text, buf->base, nread); - data->text[nread] = '\0'; - data->text_len = nread; - } else { - tmp = realloc(data->text, data->text_len + nread + 1); - memcpy(tmp + data->text_len, buf->base, nread); - tmp[data->text_len + nread] = '\0'; - data->text = tmp; - data->text_len += nread; - } - free(buf->base); - if(!data->work_started && data->text_len && - (strstr(data->text,"\r\n") - || strstr(data->text,"\n\n"))) { - data->work_req = - malloc(sizeof(*data->work_req)); - data->work_req->data = data; - data->work_started = 1; - uv_read_stop(stream); - uv_queue_work(loop, data->work_req, - process_command, - after_process_command); - } - } -} -/* Callback for the timer which signifies a timeout */ -static void client_timeout_cb(uv_timer_t* handle) -{ - struct client_request_data *data; - data = (struct client_request_data *)handle->data; - uv_timer_stop(handle); - if(data->work_started) - return; - close_data(data); -} -/* Callback for handling the new connection */ -static void connection_cb(uv_stream_t * server, int status) { - printf("connection_cb\n"); - fflush(stdout); - struct client_request_data *data; - /* if status not zero there was an error */ - if (status == -1) { - return; - } - data = calloc(1, sizeof(*data)); - data->start = time(NULL); - uv_tcp_t * client = malloc(sizeof(uv_tcp_t)); - client->data = data; - data->client = client; - /* initialize the new client */ - uv_tcp_init(loop, client); - if (uv_accept(server, (uv_stream_t *) client) == 0) { - /* start reading from stream */ - uv_timer_t *timer; - timer = malloc(sizeof(*timer)); - timer->data = data; - data->timer = timer; - uv_timer_init(loop, timer); - uv_timer_set_repeat(timer, 1); - uv_timer_start(timer, client_timeout_cb, 10000, 20000); - uv_read_start((uv_stream_t *) client, alloc_buffer, read_cb); - } else { - /* close client stream on error */ - close_data(data); - } -} --/ +def UV.TCP.readString (socket : UV.TCP) (eof : UV.IO Unit) (read : String → UV.IO Unit) : UV.IO Unit := do + socket.read_start fun res => do + match res with + | .error e => throw (.errorcode e) + | .eof => eof + | .ok bytes => + let cmd := String.fromUTF8Unchecked bytes + read cmd -def read_cb (data : ByteString) : BaseIO Unit := do - pure () +def UV.TCP.writeString (socket : UV.TCP) (msg : String) (next : UV.IO Unit): UV.IO Unit := do + let bytes := String.toUTF8 msg + let _ ← socket.write #[bytes] fun success => do + if not success then + throw (.errorcode .EINVAL) + next - -def connection_cb (loop : UV.Loop) (server : UV.TCP) : UV.UvIO Unit := do - let client ← loop.mkTCP - server.accept client - client.read_start read_cb - pure () - -def main : IO Unit := do +def main : IO Unit := UV.IO.run do + UV.log "Started loop" let loop ← UV.mkLoop + let addr ← UV.SockAddr.mkIPv4 "127.0.0.1" 10000 + UV.log "Starting listening" let server ← loop.mkTCP - let addr ← UV.SockAddr.mkIPv4 "0.0.0.0" 12000 server.bind addr - server.listen 128 (connection_cb loop server) + server.listen 128 do + UV.log "Received connection" + let client ← loop.mkTCP + server.accept client + let timer ← loop.mkTimer + timer.start 10000 20000 do UV.fatal "Timeout" + let onEOF := do + UV.log "Client disconnected" + timer.stop + client.read_stop + server.stop + client.readString onEOF fun msg => do + UV.log s!"Read '{msg.escapeNewLines}'" + let client ← loop.mkTCP + let _ ← client.connect addr $ fun success => do + match success with + | .ok => pure () + | _ => throw (.errorcode .EINVAL) + client.writeString "test\n" $ do + UV.log "Data written" + --client.stop let _stillActive ← loop.run - pure () + UV.log "Finished" diff --git a/include/lean_uv.h b/include/lean_uv.h index 49d2be1..e586d01 100644 --- a/include/lean_uv.h +++ b/include/lean_uv.h @@ -27,12 +27,7 @@ static lean_object* lean_bool(bool b) { } static lean_object* lean_io_unit_result_ok() { - static lean_object* r = 0; - if (r == 0) { - r = lean_io_result_mk_ok(lean_unit()); - lean_mark_persistent(r); - } - return r; + return lean_io_result_mk_ok(lean_unit()); } struct lean_uv_loop_s { @@ -58,6 +53,16 @@ static lean_object* loop_object(uv_loop_t* l) { return (lean_object*) l->data; } +/** Finalize a non-stream handle. */ +static inline void lean_uv_finalize_handle(uv_handle_t* h, bool is_active) { + if (is_active) { + h->data = 0; + } else { + uv_close(h, (uv_close_cb) free); + } + lean_dec(h->loop->data); +} + /* Check the callback result from a handle. The callback result should have type `IO Unit`. @@ -71,7 +76,12 @@ static void check_callback_result(uv_loop_t* loop, lean_object* io_result) { return; } } - lean_dec(io_result); + lean_dec_ref(io_result); +} + +/** Decrement an object that may be null. */ +static void lean_dec_optref(lean_object* o) { + if (o != NULL) lean_dec_ref(o); } /* @@ -81,12 +91,6 @@ static lean_object** handle_object(uv_handle_t* p) { return (lean_object**) &(p->data); } -// This decrements the loop object and frees the handle. -static void free_handle(uv_handle_t* handle) { - lean_dec(loop_object(handle->loop)); - free(handle); -} - lean_obj_res lean_uv_raise_invalid_argument(lean_obj_arg msg, lean_obj_arg _s); static @@ -113,7 +117,9 @@ void fatal_st_only(const char *name) { fatal_error("%s cannot be made multi-threaded or persistent.", name); } -lean_object* lean_uv_io_error(int err); +extern lean_object* lean_uv_error_mk(int err); + +extern lean_object* lean_uv_io_error(int err); /* This struct contains callbacks used by the stream API. @@ -121,7 +127,7 @@ This struct contains callbacks used by the stream API. See "uv_stream_t implementation note" above for more information. */ struct lean_stream_callbacks_s { - lean_object* listen_callback; // Object referencing method to call. + lean_object* listen_callback; lean_object* read_callback; // Object referencing method to call. }; @@ -130,4 +136,4 @@ typedef struct lean_stream_callbacks_s lean_stream_callbacks_t; static void* lean_stream_base(uv_handle_t* h) { lean_stream_callbacks_t* op = (lean_stream_callbacks_t*) h; return (op - 1); -} +} \ No newline at end of file diff --git a/scripts/runExamples.sh b/scripts/runExamples.sh index 6211e0c..6349a52 100755 --- a/scripts/runExamples.sh +++ b/scripts/runExamples.sh @@ -19,6 +19,7 @@ if [ $# -eq 0 ] then runExample "examples/counter.lean" runExample "examples/phases.lean" + runExample "examples/tcp.lean" runExample "examples/timer.lean" else runExample $1